I am facing an issue where a Pub/Sub subscription is lost after the master node goes down: when a replica is promoted to master, the listener is not re-subscribed.
I have a cluster with 3 Redis replicas and 3 Sentinel replicas.
I get the following log output, and after that no messages are received in onMessage().
2023-11-03 04:31:48.784 INFO 1 --- [ter.local:5000]] r.c.j.p.SentineledConnectionProvider : Created connection pool to master at imm-db-2.redis.imm-ns.svc.cluster.local:6379.
2023-11-03 04:31:58.855 INFO 1 --- [ scheduling-1] o.s.d.r.c.l.LettuceConnectionFactory : Validation of shared connection failed. Creating a new connection.
2023-11-03 04:32:02.746 WARN 1 --- [ioEventLoop-4-1] i.l.c.m.MasterSlaveTopologyRefresh : Unable to connect to imm-db-1.redis.imm-ns.svc.cluster.local:6379
I also overrode handleSubscriptionException() to re-subscribe, but that method is never called.
The following is my code:
@Bean
public LettuceConnectionFactory lettuceConnectionFactory() {
    LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder().build();
    ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
            .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(60)).enableAllAdaptiveRefreshTriggers()
            .enablePeriodicRefresh().dynamicRefreshSources(true).build();
    ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder()
            .topologyRefreshOptions(topologyRefreshOptions).build();
    RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration().master("mymaster")
            .sentinel("imm-db-0.redis.imm-ns.svc.cluster.local", 5000)
            .sentinel("imm-db-1.redis.imm-ns.svc.cluster.local", 5000)
            .sentinel("imm-db-2.redis.imm-ns.svc.cluster.local", 5000);
    return new LettuceConnectionFactory(sentinelConfig, LettuceClientConfiguration
            .builder().readFrom(ReadFrom.MASTER).clientOptions(clusterClientOptions).build());
}
@Bean
public MessageListener messageListener() {
    return new Http2IFR();
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,
        MessageListener messageListener) {
    log.info("********************RedisMessageListenerContainer**************************init done");
    RedisMessageListenerContainer container = new RedisMessageListenerContainer() {
        @Override
        protected void handleSubscriptionException(CompletableFuture<Void> future,
                BackOffExecution backOffExecution, Throwable ex) {
            super.handleSubscriptionException(future, backOffExecution, ex);
            if (ex instanceof RedisConnectionFailureException) {
                log.error("***************RedisConnectionFailureException :{}", ex.getMessage());
            } else if (ex instanceof InterruptedException) {
                // can ignore those I guess
                log.error("***************InterruptedException :{}", ex.getMessage());
            } else if (ex instanceof TransientDataAccessException || ex instanceof RecoverableDataAccessException) {
                // try to restart in those cases?
                log.error("***************RecoverableDataAccessException :{} isRunning:{}", ex.getMessage(),
                        isRunning());
                log.error(
                        "***************Connection failure occurred. Restarting subscription task manually due to "
                                + ex,
                        ex);
                if (isRunning()) {
                    log.error(
                            "***************INSIDE Connection failure occurred. Restarting subscription task manually due to "
                                    + ex,
                            ex);
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        log.error("Thread interrupted: {}", e.getMessage());
                    }
                    start(); // best we can do
                }
            } else {
                // otherwise shutdown and hope for the best next time
                log.error("*****************Shutting down application due to unknown exception, isRunning:{}",
                        isRunning(), ex);
                if (isRunning()) {
                    log.warn("*****************Shutting down application due to unknown exception " + ex, ex);
                    // context.close();
                }
            }
        }
    };
    container.setConnectionFactory(connectionFactory);
    container.setErrorHandler(new ErrorHandler() {
        @Override
        public void handleError(Throwable t) {
            log.info("********Inside exception handler:{}", t.getMessage());
        }
    });
    container.addMessageListener(messageListener, new ChannelTopic("TOPIC_HTTP_TRIGGER"));
    return container;
}
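To make clear what I expect the override above to do, here is the retry behaviour reduced to a library-independent sketch. The names `SubscribeAction` and `subscribeWithRetry` are hypothetical and mine, not part of Spring Data Redis or Lettuce. In the real application the action would be whatever re-establishes the subscription (for example calling start() on the container), which is an assumption on my side, and the failover is only simulated here.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

// Library-independent sketch: retry a subscribe action with a fixed delay between attempts.
final class ResubscribeSketch {

    /** Hypothetical stand-in for "open the subscription"; throws while the master is unreachable. */
    interface SubscribeAction {
        void subscribe() throws Exception;
    }

    /**
     * Calls action.subscribe() until it succeeds or maxAttempts is reached.
     * Returns the number of attempts used, or -1 if every attempt failed.
     */
    static int subscribeWithRetry(SubscribeAction action, int maxAttempts, Duration delay)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.subscribe();
                return attempt;
            } catch (InterruptedException e) {
                // Do not swallow interruption; let the caller decide what to do.
                throw e;
            } catch (Exception e) {
                // Wait before the next attempt, but not after the last one.
                if (attempt < maxAttempts) {
                    Thread.sleep(delay.toMillis());
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        // Simulated failover: the first two attempts fail, the third succeeds.
        SubscribeAction flaky = () -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("master unreachable");
            }
        };
        int attempts = subscribeWithRetry(flaky, 5, Duration.ofMillis(10));
        System.out.println("subscribed after " + attempts + " attempts");
    }
}
```

In my case the problem is that nothing ever triggers this kind of retry, because handleSubscriptionException() is not invoked after the failover.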
Is there any configuration setting that I am missing here?
How can I re-subscribe after a failover?