How to register integration flows at runtime?

I'm building a microservice that serves multiple properties, each with its own configuration. To do that, I've implemented something like this:

    @Autowired
    IntegrationFlowContext flowContext;

    @Bean
    public void setFlowContext() {
        List<Login> loginList = DAO.getLoginList(); // a web service
        loginList.forEach(e -> {
            IntegrationFlow flow = IntegrationFlows.from(() -> e, c -> c.poller(Pollers.fixedRate(e.getPeriod(), TimeUnit.SECONDS, 5)))
                    .channel("X_CHANNEL")
                    .get();
            flowContext.registration(flow).register();
        });
    }

With this implementation, loginList is fetched only once, before the application has started. After startup I can no longer get loginList from the web service, since nothing polls it. The problem is that loginList can change: login credentials can be added or deleted. So I want something that runs every X time period, gets loginList from the web service, and then registers a flow for each Login in it. To achieve that, I've implemented something like this:

    @Bean
    public IntegrationFlow setFlowContext() {
        return IntegrationFlows
                .from(this::getSpecification, p -> p.poller(Pollers.fixedRate(X))) // the specification is constant.
                .transform(payload -> DAO.getLoginList(payload))
                .split()
                .<Login>handle((payload, header) -> {
                    IntegrationFlow flow = IntegrationFlows.from(() -> payload, c -> c.poller(Pollers.fixedRate(payload.getPeriod(), TimeUnit.SECONDS, 5)))
                            .channel("X_CHANNEL")
                            .get();
                    flowContext.registration(flow).register().start();
                    return null;
                })
                .get();
    }

Basically, I've used the start() method, but this is not working as expected. See this:

    flowContext.registration(flow).register().start();

Lastly, I've read the "Dynamic and Runtime Integration Flows" section of the documentation, but I still couldn't implement this feature.

There is 1 best solution below

Dynamic flow registration cannot be used within a @Bean definition.

It is designed to be used at runtime AFTER the application context is fully initialized.
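
Since the login list can gain and lose entries between polls, the runtime side also needs to track which flows are already registered. Below is a minimal, framework-free sketch of that bookkeeping, not a definitive implementation. `FlowRegistry` and `FlowReconciler` are hypothetical names: `FlowRegistry` stands in for the two `IntegrationFlowContext` operations you would call in a real application (`registration(flow).id(flowId).register()` and `remove(flowId)`). It also assumes each login has a unique string id and a polling period in seconds. `reconcile` is meant to be invoked from code that runs after the context is initialized (a scheduled task, for example), not from a `@Bean` method.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the IntegrationFlowContext calls used here. */
interface FlowRegistry {
    // In Spring Integration: build the flow, then registration(flow).id(flowId).register()
    void register(String flowId, long periodSeconds);

    // In Spring Integration: remove(flowId)
    void remove(String flowId);
}

/** Keeps the set of registered flows in sync with the latest login list. */
class FlowReconciler {
    private final FlowRegistry registry;
    private final Set<String> registered = new HashSet<>();

    FlowReconciler(FlowRegistry registry) {
        this.registry = registry;
    }

    /**
     * @param logins login id -> polling period in seconds (assumed shape of
     *               the data returned by the web service)
     */
    synchronized void reconcile(Map<String, Long> logins) {
        // Remove flows whose login has disappeared from the list.
        for (String id : new HashSet<>(registered)) {
            if (!logins.containsKey(id)) {
                registry.remove(id);
                registered.remove(id);
            }
        }
        // Register a flow only for logins seen for the first time.
        logins.forEach((id, period) -> {
            if (registered.add(id)) {
                registry.register(id, period);
            }
        });
    }
}
```

Registering each flow under an explicit id is what makes later removal possible, and skipping ids that are already registered avoids creating a duplicate poller on every poll of the web service. This sketch does not handle a login whose period changes; one option would be to remove and re-register that flow.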