fix(tenant): route runtime traffic to tenant databases
Wire Doctrine's default connection to the tenant database resolved from the subdomain for HTTP requests and tenant-scoped Messenger messages while keeping master-only services on the master connection. This removes the production inconsistency where demo data, migrations and tenant commands used the tenant database but the web runtime still read from master.
This commit is contained in:
@@ -0,0 +1,44 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Shared\Infrastructure\Messenger;
|
||||
|
||||
use App\Shared\Infrastructure\Tenant\TenantDatabaseSwitcher;
|
||||
use App\Shared\Infrastructure\Tenant\TenantId;
|
||||
use App\Shared\Infrastructure\Tenant\TenantRegistry;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
|
||||
use Symfony\Component\Messenger\Middleware\StackInterface;
|
||||
|
||||
final readonly class TenantDatabaseMiddleware implements MiddlewareInterface
|
||||
{
|
||||
public function __construct(
|
||||
private TenantRegistry $tenantRegistry,
|
||||
private TenantDatabaseSwitcher $databaseSwitcher,
|
||||
private TenantMessageTenantIdResolver $tenantIdResolver,
|
||||
) {
|
||||
}
|
||||
|
||||
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
||||
{
|
||||
$tenantId = $this->tenantIdResolver->resolve($envelope->getMessage());
|
||||
if ($tenantId === null) {
|
||||
return $stack->next()->handle($envelope, $stack);
|
||||
}
|
||||
|
||||
$previousDatabaseUrl = $this->databaseSwitcher->currentDatabaseUrl();
|
||||
$tenantConfig = $this->tenantRegistry->getConfig(TenantId::fromString($tenantId));
|
||||
$this->databaseSwitcher->useTenantDatabase($tenantConfig->databaseUrl);
|
||||
|
||||
try {
|
||||
return $stack->next()->handle($envelope, $stack);
|
||||
} finally {
|
||||
if ($previousDatabaseUrl !== null) {
|
||||
$this->databaseSwitcher->useTenantDatabase($previousDatabaseUrl);
|
||||
} else {
|
||||
$this->databaseSwitcher->useDefaultDatabase();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Shared\Infrastructure\Messenger;
|
||||
|
||||
use App\Shared\Domain\Tenant\TenantId as DomainTenantId;
|
||||
|
||||
use function get_object_vars;
|
||||
use function is_string;
|
||||
use function trim;
|
||||
|
||||
final readonly class TenantMessageTenantIdResolver
|
||||
{
|
||||
public function resolve(object $message): ?string
|
||||
{
|
||||
$vars = get_object_vars($message);
|
||||
if (!isset($vars['tenantId'])) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$tenantId = $vars['tenantId'];
|
||||
|
||||
if ($tenantId instanceof DomainTenantId) {
|
||||
return (string) $tenantId;
|
||||
}
|
||||
|
||||
if (!is_string($tenantId)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$tenantId = trim($tenantId);
|
||||
|
||||
return $tenantId !== '' ? $tenantId : null;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,167 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Shared\Infrastructure\Persistence\Doctrine;
|
||||
|
||||
use App\Shared\Infrastructure\Tenant\TenantDatabaseSwitcher;
|
||||
|
||||
use function array_merge;
|
||||
|
||||
use Doctrine\DBAL\Configuration;
|
||||
use Doctrine\DBAL\Connection;
|
||||
use Doctrine\DBAL\Connection\StaticServerVersionProvider;
|
||||
use Doctrine\DBAL\Driver;
|
||||
use Doctrine\DBAL\Driver\Connection as DriverConnection;
|
||||
use Doctrine\DBAL\DriverManager;
|
||||
use Doctrine\DBAL\Platforms\AbstractPlatform;
|
||||
use Doctrine\DBAL\Tools\DsnParser;
|
||||
|
||||
use function is_array;
|
||||
use function is_string;
|
||||
|
||||
use RuntimeException;
|
||||
use SensitiveParameter;
|
||||
|
||||
/**
|
||||
* @phpstan-import-type Params from DriverManager
|
||||
*/
|
||||
final class TenantAwareConnection extends Connection implements TenantDatabaseSwitcher
|
||||
{
|
||||
private const array URL_SCHEME_MAP = [
|
||||
'db2' => 'ibm_db2',
|
||||
'mssql' => 'pdo_sqlsrv',
|
||||
'mysql' => 'pdo_mysql',
|
||||
'mysql2' => 'pdo_mysql',
|
||||
'postgres' => 'pdo_pgsql',
|
||||
'postgresql' => 'pdo_pgsql',
|
||||
'pgsql' => 'pdo_pgsql',
|
||||
'sqlite' => 'pdo_sqlite',
|
||||
'sqlite3' => 'pdo_sqlite',
|
||||
];
|
||||
|
||||
/** @phpstan-var Params */
|
||||
private array $defaultConnectionParams;
|
||||
|
||||
/** @phpstan-var Params */
|
||||
private array $currentConnectionParams;
|
||||
|
||||
private ?string $currentDatabaseUrl = null;
|
||||
|
||||
private ?AbstractPlatform $currentPlatform = null;
|
||||
|
||||
private readonly DsnParser $dsnParser;
|
||||
|
||||
/**
|
||||
* @phpstan-param Params $params
|
||||
*/
|
||||
public function __construct(
|
||||
#[SensitiveParameter]
|
||||
array $params,
|
||||
Driver $driver,
|
||||
?Configuration $config = null,
|
||||
) {
|
||||
parent::__construct($params, $driver, $config);
|
||||
|
||||
$this->defaultConnectionParams = $params;
|
||||
$this->currentConnectionParams = $params;
|
||||
$this->dsnParser = new DsnParser(self::URL_SCHEME_MAP);
|
||||
}
|
||||
|
||||
public function useTenantDatabase(string $databaseUrl): void
|
||||
{
|
||||
/** @phpstan-var Params $connectionParams */
|
||||
$connectionParams = array_merge($this->defaultConnectionParams, $this->dsnParser->parse($databaseUrl));
|
||||
|
||||
$this->applyConnectionParams($connectionParams, $databaseUrl);
|
||||
}
|
||||
|
||||
public function useDefaultDatabase(): void
|
||||
{
|
||||
$this->applyConnectionParams($this->defaultConnectionParams, null);
|
||||
}
|
||||
|
||||
public function currentDatabaseUrl(): ?string
|
||||
{
|
||||
return $this->currentDatabaseUrl;
|
||||
}
|
||||
|
||||
/**
|
||||
* @phpstan-return Params
|
||||
*/
|
||||
public function getParams(): array
|
||||
{
|
||||
return $this->currentConnectionParams;
|
||||
}
|
||||
|
||||
public function getDatabasePlatform(): AbstractPlatform
|
||||
{
|
||||
if ($this->currentPlatform === null) {
|
||||
$versionProvider = $this;
|
||||
|
||||
$serverVersion = $this->currentConnectionParams['serverVersion'] ?? null;
|
||||
if (is_string($serverVersion)) {
|
||||
$versionProvider = new StaticServerVersionProvider($serverVersion);
|
||||
} else {
|
||||
$primaryConnection = $this->currentConnectionParams['primary'] ?? null;
|
||||
if (is_array($primaryConnection)) {
|
||||
$primaryServerVersion = $primaryConnection['serverVersion'] ?? null;
|
||||
if (is_string($primaryServerVersion)) {
|
||||
$versionProvider = new StaticServerVersionProvider($primaryServerVersion);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$this->currentPlatform = $this->getDriver()->getDatabasePlatform($versionProvider);
|
||||
}
|
||||
|
||||
return $this->currentPlatform;
|
||||
}
|
||||
|
||||
public function close(): void
|
||||
{
|
||||
parent::close();
|
||||
$this->currentPlatform = null;
|
||||
}
|
||||
|
||||
protected function connect(): DriverConnection
|
||||
{
|
||||
if ($this->_conn !== null) {
|
||||
return $this->_conn;
|
||||
}
|
||||
|
||||
try {
|
||||
$connection = $this->_conn = $this->getDriver()->connect($this->currentConnectionParams);
|
||||
} catch (Driver\Exception $e) {
|
||||
throw $this->convertException($e);
|
||||
}
|
||||
|
||||
if (!$this->isAutoCommit()) {
|
||||
$this->beginTransaction();
|
||||
}
|
||||
|
||||
return $connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* @phpstan-param Params $params
|
||||
*/
|
||||
private function applyConnectionParams(array $params, ?string $databaseUrl): void
|
||||
{
|
||||
if ($this->currentConnectionParams === $params && $this->currentDatabaseUrl === $databaseUrl) {
|
||||
return;
|
||||
}
|
||||
|
||||
if ($this->isConnected()) {
|
||||
if ($this->isTransactionActive()) {
|
||||
throw new RuntimeException('Cannot switch database while a transaction is active.');
|
||||
}
|
||||
|
||||
$this->close();
|
||||
}
|
||||
|
||||
$this->currentConnectionParams = $params;
|
||||
$this->currentDatabaseUrl = $databaseUrl;
|
||||
$this->currentPlatform = null;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,46 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Shared\Infrastructure\Tenant;
|
||||
|
||||
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||
use Symfony\Component\HttpKernel\Event\RequestEvent;
|
||||
use Symfony\Component\HttpKernel\KernelEvents;
|
||||
|
||||
final readonly class TenantDatabaseRequestSubscriber implements EventSubscriberInterface
|
||||
{
|
||||
public function __construct(
|
||||
private TenantDatabaseSwitcher $databaseSwitcher,
|
||||
) {
|
||||
}
|
||||
|
||||
public static function getSubscribedEvents(): array
|
||||
{
|
||||
return [
|
||||
KernelEvents::REQUEST => ['onKernelRequest', 255],
|
||||
KernelEvents::TERMINATE => 'onKernelTerminate',
|
||||
];
|
||||
}
|
||||
|
||||
public function onKernelRequest(RequestEvent $event): void
|
||||
{
|
||||
if (!$event->isMainRequest()) {
|
||||
return;
|
||||
}
|
||||
|
||||
$tenant = $event->getRequest()->attributes->get('_tenant');
|
||||
if (!$tenant instanceof TenantConfig) {
|
||||
$this->databaseSwitcher->useDefaultDatabase();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
$this->databaseSwitcher->useTenantDatabase($tenant->databaseUrl);
|
||||
}
|
||||
|
||||
public function onKernelTerminate(): void
|
||||
{
|
||||
$this->databaseSwitcher->useDefaultDatabase();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Shared\Infrastructure\Tenant;
|
||||
|
||||
interface TenantDatabaseSwitcher
|
||||
{
|
||||
public function useTenantDatabase(string $databaseUrl): void;
|
||||
|
||||
public function useDefaultDatabase(): void;
|
||||
|
||||
public function currentDatabaseUrl(): ?string;
|
||||
}
|
||||
@@ -49,6 +49,8 @@ final readonly class TenantMiddleware implements EventSubscriberInterface
|
||||
return;
|
||||
}
|
||||
|
||||
$this->context->clear();
|
||||
|
||||
$request = $event->getRequest();
|
||||
$path = $request->getPathInfo();
|
||||
$host = $request->getHost();
|
||||
|
||||
Reference in New Issue
Block a user