Queries on poller in spring Integration

117 Views Asked by At

I am new to spring integration and pretty much all the things stated below. I have a inboundChannelAdapter to pull messages from a subscription in pubsub. As it is, it's pulling messages from the pubsub as soon it's received in the subscriber(i.e.) upon receipt, and hand it off to a @serviceactivator method. I want to wrap it in a cron so that this subscription of messages happens only in that interval(the expectation was that it would behave as a general task cron job using @Scheduled). Naturally, I stumbled into poller. I'm facing some issues and out of my depth here:

  1. the polling frequency is not working as whatever specified, be it fixed rate or cron trigger [I tried 60000 with fixedRate for 1 minutes and trigger="cronTrigger" with a bean returning cron expression(* * * * * *)]

  2. And it starts pulling messages as soon as the application is started rather than waiting for the scheduled/fixed time.

3.I've also got @Scheduled cron jobs unrelated to this that will be running. So I don't want run into any thread blocking issues as well.

  1. I've also noticed that after 1 message is pulled and handed off to serviceactivator method, it goes through the entire motion and completes that process and then the next message is received, which does not seem ideal for my usecase.

  2. Is it a good practice to use poller for the use case I've prescribed?

  3. Pl suggest what should be done to achieved the behavior of pulling messages from pubsub with channel adapters on a scheduled frequency using poller.

I am not using IntegrationFlow and I've tried something as follows annotated on the channel adapter bean;

@InboundChannelAdapter(channel="inboundchannel", poller = @Poller(fixedRate="60000", maxMessagesPerPoll="10")

UPDATE:

I'm still facing issue with the execution time/maxMessagesPerPoll, not sure which. I had 20+ messages in my subscription. As per the configuration below, I'd assume, it should pull 10 messages per MINUTE. but the execution happened as follows & each time it pulled & seemed to have processed what's defined in @serviceactivator method:

{"time":"2023-10-14 18:39:29,618": Message Received"}
{"time":"2023-10-14 18:39:31,314": Message Received"}
{"time":"2023-10-14 18:39:31,690": Message Received"}
{"time":"2023-10-14 18:39:32,494": Message Received"}
{"time":"2023-10-14 18:39:34,546": Message Received"}
{"time":"2023-10-14 18:39:35,665": Message Received"}
{"time":"2023-10-14 18:39:35,982": Message Received"}
{"time":"2023-10-14 18:39:36,175": Message Received"}
{"time":"2023-10-14 18:39:37,306": Message Received"}
{"time":"2023-10-14 18:39:38,423": Message Received"}
{"time":"2023-10-14 18:44:49,737": Message Received"}
{"time":"2023-10-14 18:44:51,059": Message Received"}
{"time":"2023-10-14 18:46:59,977": Message Received"}
{"time":"2023-10-14 18:49:08,561": Message Received"}

Trigger config:

@Bean
  public PollerMetadata periodicTrigger() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    PeriodicTrigger periodicTrigger = new PeriodicTrigger(1L, TimeUnit.MINUTES);
    periodicTrigger.setInitialDelay(1L);
//    periodicTrigger.setFixedRate(true);
    pollerMetadata.setMaxMessagesPerPoll(10L);
    pollerMetadata.setTrigger(periodicTrigger);
    return pollerMetadata;
  }
1

There are 1 best solutions below

2
On

It indeed starts the first polling cycle just after application startup. There was just no any previous task to calculate a delay for the current. For the purpose the PeriodicTrigger has an initialDelay property. We don't expose it in the @Poller annotation since it is a trigger-specific and does not apply for other triggers like CronTrigger. you might consider to look into a fixedDelay instead of fixedRate. See PeriodicTrigger Javadocs for more information about those properties. In two words: the fixedRate starts a new task after starting the previous, the fixedDelay only when the previous has been finished.

You have that maxMessagesPerPoll="10" which means loop in a single polling cycle for 10 messages max. And only after that it goes to the next scheduled task.

If you would like to process those polled messages in parallel, consider to set a taskExecutor option of that @Poller annotation.

See more info in docs:

https://docs.spring.io/spring-integration/reference/polling-consumer.html

https://docs.spring.io/spring-integration/reference/channel-adapter.html#channel-adapter-namespace-inbound

https://docs.spring.io/spring-integration/reference/configuration/annotations.html#configuration-using-poller-annotation