I am facing an issue with the Subscription being lost after the Master node gets down and another slave node becomes a master that time it not getting Re-subscribed again.

I have a cluster with 3 Redis replicas and 3 Sentinel replicas

Getting the following error and after that in onMessage() no message gets received.

2023-11-03 04:31:48.784  INFO 1 --- [ter.local:5000]] r.c.j.p.SentineledConnectionProvider     : Created connection pool to master at imm-db-2.redis.imm-ns.svc.cluster.local:6379.
2023-11-03 04:31:58.855  INFO 1 --- [   scheduling-1] o.s.d.r.c.l.LettuceConnectionFactory     : Validation of shared connection failed. Creating a new connection.
2023-11-03 04:32:02.746  WARN 1 --- [ioEventLoop-4-1] i.l.c.m.MasterSlaveTopologyRefresh       : Unable to connect to imm-db-1.redis.imm-ns.svc.cluster.local:6379

I also used handleSubscriptionException() to Re-subscribe it but no call received in handleSubscriptionException()

Following is my code.

@Bean
    public LettuceConnectionFactory lettuceConnectionFactory() {
                       LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder().build();

                ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
                    .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(60)).enableAllAdaptiveRefreshTriggers()
                    .enablePeriodicRefresh().dynamicRefreshSources(true).build();

                 ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder()
                    .topologyRefreshOptions(topologyRefreshOptions).build();
                    
                                RedisSentinelConfiguration sentinelConfig = new 
                                 RedisSentinelConfiguration().master("mymaster")
                    .sentinel("imm-db-0.redis.imm-ns.svc.cluster.local", 5000)
                    .sentinel("imm-db-1.redis.imm-ns.svc.cluster.local", 5000)
                    .sentinel("imm-db-2.redis.imm-ns.svc.cluster.local", 5000);
                    
                    return new LettuceConnectionFactory(sentinelConfig, LettuceClientConfiguration
                    .builder().readFrom(ReadFrom.MASTER).clientOptions(clusterClientOptions).build());

}
              @Bean
              public MessageListener messageListener() {
                  return new Http2IFR();
              }

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,
            MessageListener messageListener) {

        log.info("********************RedisMessageListenerContainer**************************init done");
        RedisMessageListenerContainer container = new RedisMessageListenerContainer() {
            @Override
            protected void handleSubscriptionException(CompletableFuture<Void> future,
                    BackOffExecution backOffExecution, Throwable ex) {
                super.handleSubscriptionException(future, backOffExecution, ex);

                if (ex instanceof RedisConnectionFailureException) {
                    log.error("***************RedisConnectionFailureException :{}", ex.getMessage());

                } else if (ex instanceof InterruptedException) {
                    // can ignore those I guess
                    log.error("***************InterruptedException :{}", ex.getMessage());
                } else if (ex instanceof TransientDataAccessException || ex instanceof RecoverableDataAccessException) {
                    // try to restart in those cases?
                    log.error("***************RecoverableDataAccessException :{} isRunning:{}", ex.getMessage(),
                            isRunning());
                    log.error(
                            "***************Connection failure occurred. Restarting subscription task manually due to "
                                    + ex,
                            ex);
                    if (isRunning()) {
                        log.error(
                                "***************INSIDE Connection failure occurred. Restarting subscription task manually due to "
                                        + ex,
                                ex);
                        try {
                            Thread.sleep(5000);
                        } catch (InterruptedException e) {
                            log.error("thred inturrepted....:{}", e.getMessage());
                        }
                        start(); // best we can do
                    }
                } else {
                    // otherwise shutdown and hope for the best next time
                    log.error("*****************Shutting down application due to unknown exception is running:{}", ex);
                    if (isRunning()) {
                        log.warn("*****************Shutting down application due to unknown exception " + ex, ex);
//                          context.close();
                    }
                }
            }
        };

        container.setConnectionFactory(connectionFactory);
        container.setErrorHandler(new ErrorHandler() {
            @Override
            public void handleError(Throwable t) {
                log.info("********Inside exception handler:{}", t.getMessage());
            }
        });
        container.addMessageListener(messageListener, new ChannelTopic("TOPIC_HTTP_TRIGGER"));
        return container;
    }

Is there any configuration setting that I am missing here?

How to re-subscribe it?

0

There are 0 best solutions below