feat: Messaging asynchrone fiable avec retry, dead-letter et métriques

Les événements métier (emails d'invitation, reset password, activation)
bloquaient la réponse API en étant traités de manière synchrone. Ce commit
route ces événements vers un transport AMQP asynchrone avec un worker
dédié, garantissant des réponses API rapides et une gestion robuste des
échecs.

Le retry utilise une stratégie Fibonacci (1s, 1s, 2s, 3s, 5s, 8s, 13s)
qui offre un bon compromis entre réactivité et protection des services
externes. Les messages qui épuisent leurs tentatives arrivent dans une
dead-letter queue Doctrine avec alerte email à l'admin.

La commande console CreateTestActivationTokenCommand détecte désormais
les comptes déjà actifs et génère un token de réinitialisation de mot
de passe au lieu d'un token d'activation, évitant une erreur bloquante
lors de la ré-invitation par un admin.
This commit is contained in:
2026-02-08 21:38:20 +01:00
parent 4005c70082
commit 9ccad77bf0
29 changed files with 1706 additions and 33 deletions

View File

@@ -32,7 +32,7 @@ final readonly class AuditAuthenticationHandler
/**
* T4.1: Successful login.
*/
#[AsMessageHandler]
#[AsMessageHandler(bus: 'event.bus')]
public function handleConnexionReussie(ConnexionReussie $event): void
{
$this->auditLogger->logAuthentication(
@@ -50,7 +50,7 @@ final readonly class AuditAuthenticationHandler
/**
* T4.2: Failed login.
*/
#[AsMessageHandler]
#[AsMessageHandler(bus: 'event.bus')]
public function handleConnexionEchouee(ConnexionEchouee $event): void
{
$this->auditLogger->logAuthentication(
@@ -68,7 +68,7 @@ final readonly class AuditAuthenticationHandler
/**
* T4.3: Account temporarily locked.
*/
#[AsMessageHandler]
#[AsMessageHandler(bus: 'event.bus')]
public function handleCompteBloqueTemporairement(CompteBloqueTemporairement $event): void
{
$this->auditLogger->logAuthentication(
@@ -86,7 +86,7 @@ final readonly class AuditAuthenticationHandler
/**
* T4.4: Password changed (via reset or update).
*/
#[AsMessageHandler]
#[AsMessageHandler(bus: 'event.bus')]
public function handleMotDePasseChange(MotDePasseChange $event): void
{
$this->auditLogger->logAuthentication(

View File

@@ -0,0 +1,222 @@
<?php
declare(strict_types=1);
namespace App\Shared\Infrastructure\Console;
use App\Shared\Infrastructure\Messenger\ClassNameHelper;
use function count;
use Doctrine\DBAL\Connection;
use function is_array;
use function is_numeric;
use function is_string;
use const JSON_PRETTY_PRINT;
use const JSON_UNESCAPED_UNICODE;
use Override;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
/**
* Reviews and inspects failed messages in the dead-letter queue.
*
* Provides list and show subcommands for failed messages
* stored in the messenger_messages table (queue_name = 'failed').
*
* For retry/delete operations, use the built-in Symfony commands:
* - messenger:failed:retry
* - messenger:failed:remove
*/
#[AsCommand(
name: 'app:messenger:review-failed',
description: 'List or inspect failed messages in the dead-letter queue',
)]
final class ReviewFailedMessagesCommand extends Command
{
public function __construct(
private readonly Connection $connection,
) {
parent::__construct();
}
#[Override]
protected function configure(): void
{
$this
->addArgument(
'action',
InputArgument::OPTIONAL,
'Action: list (default), show',
'list',
)
->addOption(
'id',
null,
InputOption::VALUE_REQUIRED,
'Message ID (required for show)',
)
->addOption(
'limit',
'l',
InputOption::VALUE_REQUIRED,
'Maximum number of messages to display',
'20',
)
->setHelp(<<<'HELP'
The <info>%command.name%</info> command lists and inspects failed messages.
To retry or delete a failed message, use the built-in Symfony Messenger commands:
<info>php bin/console messenger:failed:retry {id}</info>
<info>php bin/console messenger:failed:remove {id}</info>
HELP);
}
#[Override]
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
/** @var string $action */
$action = $input->getArgument('action');
return match ($action) {
'list' => $this->listMessages($input, $io),
'show' => $this->showMessage($input, $io),
default => $this->invalidAction($action, $io),
};
}
private function listMessages(InputInterface $input, SymfonyStyle $io): int
{
/** @var string $limitOption */
$limitOption = $input->getOption('limit');
if (!is_numeric($limitOption) || (int) $limitOption < 1) {
$io->error('L\'option --limit doit être un entier positif.');
return Command::FAILURE;
}
$limit = (int) $limitOption;
$rows = $this->connection->fetchAllAssociative(
'SELECT id, headers, created_at FROM messenger_messages WHERE queue_name = :queue ORDER BY created_at DESC LIMIT :limit',
['queue' => 'failed', 'limit' => $limit],
);
if ($rows === []) {
$io->success('Aucun message en echec.');
return Command::SUCCESS;
}
$tableRows = [];
foreach ($rows as $row) {
$headersRaw = is_string($row['headers']) ? $row['headers'] : '';
$headers = $this->decodeHeaders($headersRaw);
$type = $headers['type'] ?? 'inconnu';
$tableRows[] = [
$row['id'],
ClassNameHelper::short($type),
$row['created_at'],
];
}
$io->table(['ID', 'Type', 'Date'], $tableRows);
$io->info(count($rows) . ' message(s) en echec.');
$io->note('Pour rejouer ou supprimer : messenger:failed:retry {id} / messenger:failed:remove {id}');
return Command::SUCCESS;
}
private function showMessage(InputInterface $input, SymfonyStyle $io): int
{
$id = $this->requireId($input, $io);
if ($id === null) {
return Command::FAILURE;
}
$row = $this->connection->fetchAssociative(
'SELECT * FROM messenger_messages WHERE id = :id AND queue_name = :queue',
['id' => $id, 'queue' => 'failed'],
);
if ($row === false) {
$io->error("Message #{$id} introuvable dans la dead-letter queue.");
return Command::FAILURE;
}
$headersRaw = is_string($row['headers']) ? $row['headers'] : '';
$headers = $this->decodeHeaders($headersRaw);
$type = $headers['type'] ?? 'inconnu';
$io->title("Message #{$id}");
$io->definitionList(
['Type' => ClassNameHelper::short($type)],
['Date de creation' => $row['created_at']],
['Date de livraison' => $row['delivered_at'] ?? 'N/A'],
);
$io->section('Headers');
$io->text(json_encode($headers, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) ?: '{}');
$io->note('Pour rejouer : messenger:failed:retry ' . $id);
$io->note('Pour supprimer : messenger:failed:remove ' . $id);
return Command::SUCCESS;
}
private function invalidAction(string $action, SymfonyStyle $io): int
{
$io->error("Action inconnue : '{$action}'. Actions valides : list, show");
return Command::FAILURE;
}
private function requireId(InputInterface $input, SymfonyStyle $io): ?int
{
/** @var string|null $idOption */
$idOption = $input->getOption('id');
if ($idOption === null) {
$io->error("L'option --id est requise pour cette action.");
return null;
}
if (!is_numeric($idOption) || (int) $idOption < 1) {
$io->error("L'option --id doit être un entier positif.");
return null;
}
return (int) $idOption;
}
/**
* @return array<string, string>
*/
private function decodeHeaders(string $json): array
{
$decoded = json_decode($json, true);
if (!is_array($decoded)) {
return [];
}
$result = [];
foreach ($decoded as $key => $value) {
$result[(string) $key] = is_string($value) ? $value : '';
}
return $result;
}
}

View File

@@ -0,0 +1,18 @@
<?php
declare(strict_types=1);
namespace App\Shared\Infrastructure\Messenger;
/**
* Extracts the short (unqualified) class name from a FQCN.
*/
final class ClassNameHelper
{
public static function short(string $fqcn): string
{
$parts = explode('\\', $fqcn);
return end($parts);
}
}

View File

@@ -0,0 +1,71 @@
<?php
declare(strict_types=1);
namespace App\Shared\Infrastructure\Messenger;
use Psr\Log\LoggerInterface;
use Symfony\Component\Mailer\MailerInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Mime\Email;
use Throwable;
use Twig\Environment;
/**
* Sends an admin alert email when a message exhausts all retries
* and lands in the dead-letter queue.
*
* The alert contains event type, attempt count, timestamps, and last error
* but never exposes PII (no user emails, no tokens).
*/
final readonly class DeadLetterAlertHandler
{
private const int MAX_ERROR_LENGTH = 500;
public function __construct(
private MailerInterface $mailer,
private Environment $twig,
private LoggerInterface $logger,
private string $adminEmail,
private string $fromEmail = 'noreply@classeo.fr',
) {
}
public function __invoke(WorkerMessageFailedEvent $event): void
{
if ($event->willRetry()) {
return;
}
$envelope = $event->getEnvelope();
$message = $envelope->getMessage();
$throwable = $event->getThrowable();
$retryCount = RedeliveryStamp::getRetryCountFromEnvelope($envelope);
$errorMessage = mb_substr($throwable->getMessage(), 0, self::MAX_ERROR_LENGTH);
$html = $this->twig->render('emails/dead_letter_alert.html.twig', [
'eventType' => $message::class,
'retryCount' => $retryCount,
'lastError' => $errorMessage,
'transportName' => $event->getReceiverName(),
]);
$email = (new Email())
->from($this->fromEmail)
->to($this->adminEmail)
->subject('[Classeo] Message en dead-letter : ' . ClassNameHelper::short($message::class))
->html($html)
->priority(Email::PRIORITY_HIGH);
try {
$this->mailer->send($email);
} catch (Throwable $e) {
$this->logger->error('Failed to send dead-letter alert email.', [
'error' => $e->getMessage(),
'messageType' => $message::class,
]);
}
}
}

View File

@@ -0,0 +1,76 @@
<?php
declare(strict_types=1);
namespace App\Shared\Infrastructure\Messenger;
use InvalidArgumentException;
use LogicException;
use Override;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Throwable;
use Twig\Error\LoaderError;
/**
* Fibonacci-based retry strategy for Symfony Messenger.
*
* Delays: 1s, 1s, 2s, 3s, 5s, 8s, 13s (~33s total).
* Transient errors (SMTP timeout, network) trigger retries.
* Permanent errors (invalid template, bad argument) reject immediately.
*/
final readonly class FibonacciRetryStrategy implements RetryStrategyInterface
{
private const int MAX_RETRIES = 7;
/** @var list<int> Fibonacci delays in milliseconds */
private const array FIBONACCI_DELAYS = [1000, 1000, 2000, 3000, 5000, 8000, 13000];
/** @var list<class-string<Throwable>> */
private const array PERMANENT_EXCEPTIONS = [
InvalidArgumentException::class,
LogicException::class,
LoaderError::class,
];
#[Override]
public function isRetryable(Envelope $message, ?Throwable $throwable = null): bool
{
$retries = RedeliveryStamp::getRetryCountFromEnvelope($message);
if ($retries >= self::MAX_RETRIES) {
return false;
}
if ($throwable === null) {
return true;
}
return !$this->isPermanentError($throwable);
}
#[Override]
public function getWaitingTime(Envelope $message, ?Throwable $throwable = null): int
{
$retries = RedeliveryStamp::getRetryCountFromEnvelope($message);
return self::FIBONACCI_DELAYS[$retries] ?? self::FIBONACCI_DELAYS[self::MAX_RETRIES - 1];
}
private function isPermanentError(Throwable $throwable): bool
{
foreach (self::PERMANENT_EXCEPTIONS as $permanentClass) {
if ($throwable instanceof $permanentClass) {
return true;
}
}
$previous = $throwable->getPrevious();
if ($previous !== null) {
return $this->isPermanentError($previous);
}
return false;
}
}

View File

@@ -0,0 +1,74 @@
<?php
declare(strict_types=1);
namespace App\Shared\Infrastructure\Messenger;
use Prometheus\CollectorRegistry;
use Prometheus\Counter;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;
use Throwable;
/**
* Middleware that counts handled and failed messages for Prometheus.
*
* Increments messenger_messages_handled_total on success and
* messenger_messages_failed_total on failure, labeled by message type.
*/
final class MessengerMetricsMiddleware implements MiddlewareInterface
{
private const string NAMESPACE = 'classeo';
private Counter $handledTotal;
private Counter $failedTotal;
public function __construct(CollectorRegistry $registry)
{
$this->handledTotal = $registry->getOrRegisterCounter(
self::NAMESPACE,
'messenger_messages_handled_total',
'Total messages handled successfully',
['message'],
);
$this->failedTotal = $registry->getOrRegisterCounter(
self::NAMESPACE,
'messenger_messages_failed_total',
'Total messages that failed',
['message'],
);
}
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$messageLabel = ClassNameHelper::short($envelope->getMessage()::class);
try {
$envelope = $stack->next()->handle($envelope, $stack);
} catch (Throwable $e) {
$this->safeIncrement($this->failedTotal, $messageLabel);
throw $e;
}
/** @var HandledStamp[] $handledStamps */
$handledStamps = $envelope->all(HandledStamp::class);
if ($handledStamps !== []) {
$this->safeIncrement($this->handledTotal, $messageLabel);
}
return $envelope;
}
private function safeIncrement(Counter $counter, string $label): void
{
try {
$counter->inc([$label]);
} catch (Throwable) {
// Metrics storage failure must not break message handling
}
}
}

View File

@@ -33,12 +33,9 @@ final readonly class HealthCheckController
{
$checks = $this->healthChecker->checkAll();
$allHealthy = !in_array(false, $checks, true);
$status = $allHealthy ? 'healthy' : 'unhealthy';
// Return 200 for healthy (instance is operational)
// Return 503 when unhealthy (core dependencies are down)
$httpStatus = $status === 'unhealthy' ? Response::HTTP_SERVICE_UNAVAILABLE : Response::HTTP_OK;
$healthy = !in_array(false, $checks, true);
$status = $healthy ? 'healthy' : 'unhealthy';
$httpStatus = $healthy ? Response::HTTP_OK : Response::HTTP_SERVICE_UNAVAILABLE;
return new JsonResponse([
'status' => $status,

View File

@@ -0,0 +1,60 @@
<?php
declare(strict_types=1);
namespace App\Shared\Infrastructure\Monitoring;
use Prometheus\CollectorRegistry;
use Prometheus\Gauge;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Throwable;
/**
* Collects Messenger queue metrics for Prometheus.
*
* Exposes a gauge for the number of messages waiting in the async transport.
* Uses the RabbitMQ management API to query queue depth.
*/
final class MessengerMetricsCollector implements MessengerMetricsCollectorInterface
{
private const string NAMESPACE = 'classeo';
private Gauge $messagesWaiting;
public function __construct(
private readonly CollectorRegistry $registry,
private readonly HttpClientInterface $httpClient,
private readonly string $rabbitmqManagementUrl = 'http://rabbitmq:15672',
private readonly string $rabbitmqUser = 'guest',
private readonly string $rabbitmqPassword = 'guest',
) {
$this->messagesWaiting = $this->registry->getOrRegisterGauge(
self::NAMESPACE,
'messenger_messages_waiting',
'Number of messages waiting in transport queue',
['transport'],
);
}
public function collect(): void
{
try {
$response = $this->httpClient->request(
'GET',
$this->rabbitmqManagementUrl . '/api/queues/%2f/messages',
[
'auth_basic' => [$this->rabbitmqUser, $this->rabbitmqPassword],
'timeout' => 2,
],
);
if ($response->getStatusCode() === 200) {
$data = $response->toArray();
$messageCount = $data['messages'] ?? 0;
$this->messagesWaiting->set((float) $messageCount, ['async']);
}
} catch (Throwable) {
// If RabbitMQ management API is unavailable, skip metric update
}
}
}

View File

@@ -0,0 +1,10 @@
<?php
declare(strict_types=1);
namespace App\Shared\Infrastructure\Monitoring;
interface MessengerMetricsCollectorInterface
{
public function collect(): void;
}

View File

@@ -39,6 +39,7 @@ final readonly class MetricsController
public function __construct(
private CollectorRegistry $registry,
private HealthMetricsCollectorInterface $healthMetrics,
private MessengerMetricsCollectorInterface $messengerMetrics,
private string $appEnv = 'dev',
) {
}
@@ -50,8 +51,9 @@ final readonly class MetricsController
throw new AccessDeniedHttpException('Metrics endpoint is restricted to internal networks.');
}
// Collect fresh health metrics before rendering
// Collect fresh metrics before rendering
$this->healthMetrics->collect();
$this->messengerMetrics->collect();
$renderer = new RenderTextFormat();
$metrics = $renderer->render($this->registry->getMetricFamilySamples());