How to check if I'm already in a message handler when dispatching a new message in Symfony Messenger?

I'm trying to log every message that goes through Symfony Messenger, along with all the information I can gather:

  • message dispatched at <timestamp>
  • message handled / failed / retried at <timestamp>

I created a subscriber for all Messenger events I'm interested in:

final readonly class MessengerAuditSubscriber implements EventSubscriberInterface
{
    public static function getSubscribedEvents(): array
    {
        return [
            SendMessageToTransportsEvent::class => 'onSendMessageToTransportsEvent',
            WorkerMessageFailedEvent::class => 'onWorkerMessageFailedEvent',
            WorkerMessageHandledEvent::class => 'onWorkerMessageHandledEvent',
            WorkerMessageReceivedEvent::class => 'onWorkerMessageReceivedEvent',
            WorkerMessageRetriedEvent::class => 'onWorkerMessageRetriedEvent',
        ];
    }

    // ...
}

I attach a custom stamp to the Envelope when the message is dispatched:

    public function onSendMessageToTransportsEvent(SendMessageToTransportsEvent $event): void
    {
        $envelope = $event->getEnvelope();
        $envelope = $envelope->with(new MessageIdStamp(uniqid()));
        $event->setEnvelope($envelope);

        // ...log the event
    }

And retrieve it in subsequent events:

    public function onWorkerMessageHandledEvent(WorkerMessageHandledEvent $event): void
    {
        $envelope = $event->getEnvelope();
        $messageIdStamp = $envelope->last(MessageIdStamp::class);

        // ...log the event
    }

So far, so good.

Now, when a message handler itself dispatches a new message, I'd like to log the parent message ID as well, so that I can later retrieve the whole hierarchy of messages that occurred starting from a root message ID:

  • ✉️ Message 1 (parent null)
    • Handler
      • ✉️ Message 2 (parent 1)
        • Handler
      • ✉️ Message 3 (parent 1)
        • Handler
    • Handler
      • ✉️ Message 4 (parent 1)
        • Handler
          • ✉️ Message 5 (parent 4)
            • Handler
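
To make the goal concrete, here is a minimal sketch of rebuilding that hierarchy from logged data, assuming each log entry can be reduced to a message ID mapped to its parent ID. The function name `collectDescendants`, the array shape, and the `m1`…`m5` IDs are hypothetical and not part of Messenger.

```php
<?php

declare(strict_types=1);

/**
 * Returns the IDs of all messages descending from $rootId, depth-first.
 * Assumes the parent links form a tree (no cycles).
 *
 * @param array<string, ?string> $parentById hypothetical log data: message ID => parent message ID
 *
 * @return list<string>
 */
function collectDescendants(array $parentById, string $rootId): array
{
    $descendants = [];

    foreach ($parentById as $messageId => $parentId) {
        if ($parentId === $rootId) {
            $descendants[] = $messageId;
            // Recurse so that grandchildren follow their own parent.
            array_push($descendants, ...collectDescendants($parentById, $messageId));
        }
    }

    return $descendants;
}

// The hierarchy from the list above, with hypothetical IDs.
$parentById = [
    'm1' => null,
    'm2' => 'm1',
    'm3' => 'm1',
    'm4' => 'm1',
    'm5' => 'm4',
];

$tree = collectDescendants($parentById, 'm1');
```

With this data, `$tree` contains `m2`, `m3`, `m4` and `m5`, in that order. The lookup only works if every dispatched message records its parent ID, which is what the question below is about.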

How can I tell, from within my listener's onSendMessageToTransportsEvent() method, whether it is being called from within another message handler?

Is there some mechanism in Messenger like the RequestStack, but for messages?

Best answer, by BenMorel

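The solution is to keep a stack of message IDs in the subscriber.

The stack is:

  • pushed to, with the message ID, when a worker receives a message
  • popped when the message has been handled or has failed
  • read when a new message is dispatched, the top of the stack being the parent ID

use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Stamp\StampInterface;

final readonly class MessageIdStamp implements StampInterface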
{
    public function __construct(
        public string $messageId,
        public ?string $parentMessageId,
    ) {
    }
}

final class MessengerAuditSubscriber implements EventSubscriberInterface
{
    /**
     * @var list<string>
     */
    public array $messageIdStack = [];

    public static function getSubscribedEvents(): array
    {
        return [
            SendMessageToTransportsEvent::class => 'onSendMessageToTransportsEvent',
            WorkerMessageReceivedEvent::class => 'onWorkerMessageReceivedEvent',
            WorkerMessageHandledEvent::class => 'onWorkerMessageHandledOrFailedEvent',
            WorkerMessageFailedEvent::class => 'onWorkerMessageHandledOrFailedEvent',
        ];
    }

    public function onSendMessageToTransportsEvent(SendMessageToTransportsEvent $event): void
    {
        // The message currently being handled (top of the stack), if any, is the parent.
        $stackSize = count($this->messageIdStack);
        $parentMessageId = ($stackSize !== 0)
            ? $this->messageIdStack[$stackSize - 1]
            : null;

        $messageId = $this->generateMessageId();
        $messageIdStamp = new MessageIdStamp($messageId, $parentMessageId);

        $envelope = $event->getEnvelope()->with($messageIdStamp);
        $event->setEnvelope($envelope);
    }

    public function onWorkerMessageReceivedEvent(WorkerMessageReceivedEvent $event): void
    {
        $messageIdStamp = $event->getEnvelope()->last(MessageIdStamp::class);

        if ($messageIdStamp === null) {
            // redelivery of a legacy message that was not stamped, ignore
            return;
        }

        $this->messageIdStack[] = $messageIdStamp->messageId;
    }

    public function onWorkerMessageHandledOrFailedEvent(WorkerMessageHandledEvent|WorkerMessageFailedEvent $event): void
    {
        $messageIdStamp = $event->getEnvelope()->last(MessageIdStamp::class);

        if ($messageIdStamp === null) {
            // redelivery of a legacy message that was not stamped, ignore
            return;
        }

        array_pop($this->messageIdStack);
    }

    private function generateMessageId(): string
    {
        // ...
    }
}
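
To see the push / pop / read behaviour in isolation, here is a dependency-free sketch that replays part of the hierarchy from the question. The `MessageIdStack` class and its method names are hypothetical stand-ins for the three listener methods above, and the IDs are hard-coded rather than generated.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the subscriber's stack logic, without Symfony.
final class MessageIdStack
{
    /** @var list<string> */
    private array $ids = [];

    // Mirrors onWorkerMessageReceivedEvent(): a handler is about to run.
    public function push(string $messageId): void
    {
        $this->ids[] = $messageId;
    }

    // Mirrors onWorkerMessageHandledOrFailedEvent(): the handler is done.
    public function pop(): void
    {
        array_pop($this->ids);
    }

    // Mirrors onSendMessageToTransportsEvent(): the top of the stack is the parent.
    public function current(): ?string
    {
        return $this->ids === [] ? null : $this->ids[array_key_last($this->ids)];
    }
}

$stack = new MessageIdStack();
$parents = [];

$parents['m1'] = $stack->current(); // dispatched outside any handler

$stack->push('m1');                 // worker receives m1
$parents['m2'] = $stack->current(); // m1's handler dispatches m2
$parents['m4'] = $stack->current(); // ...and m4
$stack->pop();                      // m1 handled

$stack->push('m4');                 // worker receives m4
$parents['m5'] = $stack->current(); // m4's handler dispatches m5
$stack->pop();                      // m4 handled

var_dump($parents);
```

Here `$parents` ends up mapping `m1` to `null`, `m2` and `m4` to `m1`, and `m5` to `m4`. The sketch assumes, as the subscriber does, that a worker process handles one message at a time, so the top of the stack is always the message whose handler is currently running.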