I need to intercept every incoming message from RabbitMQ, extract some specific headers from it and put it into thread context. I see different solutions on the internet but they are used with code configuration while i have xml configuration and can't change it. This is my configuration:
<bean id="eventPublisherService"
class="...">
...
</bean>
<rabbit:connection-factory id="connectionFactory"
host="${queuing.connection.host}" virtual-host="${queuing.connection.virtualHost}"
username="${queuing.connection.username}" password="${queuing.connection.password}"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="${some.queue}"/>
... more queues
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory" channel-transacted="true"/>
<bean id="mailingConsumer" class="..."/>
... more beans
<rabbit:listener-container connection-factory="connectionFactory" advice-chain="retryAdvice">
<rabbit:listener ref="mailingConsumer" queue-names="${...}" id="mailingConsumerId"/>
... more listeners
</rabbit:listener-container>
I tried to use MessagePostProcessor but I'm not sure how to integrate it into this existing setup.
That's correct. The
<rabbit:listener-container>does not expose a property for thesetAfterReceivePostProcessors().You can achieve that, thought, via
advice-chain. I see you already set aretryAdvice. So, implement aMethodInterceptorand set a desired thread context before callinginvocation.proceed().You may consider to use
ListenerContainerFactoryBeanas a<bean>definition instead of that<rabbit:listener-container>.