I am new to spring integration and pretty much all the things stated below. I have a inboundChannelAdapter to pull messages from a subscription in pubsub. As it is, it's pulling messages from the pubsub as soon it's received in the subscriber(i.e.) upon receipt, and hand it off to a @serviceactivator method. I want to wrap it in a cron so that this subscription of messages happens only in that interval(the expectation was that it would behave as a general task cron job using @Scheduled). Naturally, I stumbled into poller. I'm facing some issues and out of my depth here:
the polling frequency is not working as whatever specified, be it fixed rate or cron trigger [I tried 60000 with fixedRate for 1 minutes and trigger="cronTrigger" with a bean returning cron expression(* * * * * *)]
And it starts pulling messages as soon as the application is started rather than waiting for the scheduled/fixed time.
3.I've also got @Scheduled cron jobs unrelated to this that will be running. So I don't want run into any thread blocking issues as well.
I've also noticed that after 1 message is pulled and handed off to serviceactivator method, it goes through the entire motion and completes that process and then the next message is received, which does not seem ideal for my usecase.
Is it a good practice to use poller for the use case I've prescribed?
Pl suggest what should be done to achieved the behavior of pulling messages from pubsub with channel adapters on a scheduled frequency using poller.
I am not using IntegrationFlow and I've tried something as follows annotated on the channel adapter bean;
@InboundChannelAdapter(channel="inboundchannel", poller = @Poller(fixedRate="60000", maxMessagesPerPoll="10")
UPDATE:
I'm still facing issue with the execution time/maxMessagesPerPoll, not sure which. I had 20+ messages in my subscription. As per the configuration below, I'd assume, it should pull 10 messages per MINUTE. but the execution happened as follows & each time it pulled & seemed to have processed what's defined in @serviceactivator method:
{"time":"2023-10-14 18:39:29,618": Message Received"}
{"time":"2023-10-14 18:39:31,314": Message Received"}
{"time":"2023-10-14 18:39:31,690": Message Received"}
{"time":"2023-10-14 18:39:32,494": Message Received"}
{"time":"2023-10-14 18:39:34,546": Message Received"}
{"time":"2023-10-14 18:39:35,665": Message Received"}
{"time":"2023-10-14 18:39:35,982": Message Received"}
{"time":"2023-10-14 18:39:36,175": Message Received"}
{"time":"2023-10-14 18:39:37,306": Message Received"}
{"time":"2023-10-14 18:39:38,423": Message Received"}
{"time":"2023-10-14 18:44:49,737": Message Received"}
{"time":"2023-10-14 18:44:51,059": Message Received"}
{"time":"2023-10-14 18:46:59,977": Message Received"}
{"time":"2023-10-14 18:49:08,561": Message Received"}
Trigger config:
@Bean
public PollerMetadata periodicTrigger() {
PollerMetadata pollerMetadata = new PollerMetadata();
PeriodicTrigger periodicTrigger = new PeriodicTrigger(1L, TimeUnit.MINUTES);
periodicTrigger.setInitialDelay(1L);
// periodicTrigger.setFixedRate(true);
pollerMetadata.setMaxMessagesPerPoll(10L);
pollerMetadata.setTrigger(periodicTrigger);
return pollerMetadata;
}
It indeed starts the first polling cycle just after application startup. There was just no any previous task to calculate a delay for the current. For the purpose the
PeriodicTrigger
has aninitialDelay
property. We don't expose it in the@Poller
annotation since it is a trigger-specific and does not apply for other triggers likeCronTrigger
. you might consider to look into afixedDelay
instead offixedRate
. SeePeriodicTrigger
Javadocs for more information about those properties. In two words: thefixedRate
starts a new task after starting the previous, thefixedDelay
only when the previous has been finished.You have that
maxMessagesPerPoll="10"
which means loop in a single polling cycle for 10 messages max. And only after that it goes to the next scheduled task.If you would like to process those polled messages in parallel, consider to set a
taskExecutor
option of that@Poller
annotation.See more info in docs:
https://docs.spring.io/spring-integration/reference/polling-consumer.html
https://docs.spring.io/spring-integration/reference/channel-adapter.html#channel-adapter-namespace-inbound
https://docs.spring.io/spring-integration/reference/configuration/annotations.html#configuration-using-poller-annotation