Rabbit Mq Listner does not work after spring boot 3 upgrade

325 Views Asked by At

I am upgrading a component from spring boot 2.6.6 to 3.0.6 ,After upgradation my rabbit listener is not working properly as consumer. When i use it as consumer with spring boot 2.6.6 then it works fine where as with spring boot 3.0.6 it does not work properly.PLease check below code snippet for more details

Code snippet : RabbitMq Config

         @Configuration
          public class RabbitConfig implements RabbitListenerConfigurer {
    
    private static final Logger LOGGER = LogManager.getLogger();
   
       @Value("${spring.rabbitmq.host:localhost}")
       private String host;
       
    @Value("${spring.rabbitmq.virtual-host:#{null}}")
       private String virtualHost;
    
       @Value("${spring.rabbitmq.port:5672}")
       private int port;
       
       @Value("${csn.traceability.component_name}")
       private String connectionName;
   
       @Value("${spring.rabbitmq.username}")
    private String username;
        
    @Value("${spring.rabbitmq.password:#{null}}")
    private String password;
        
    @Value("${spring.rabbitmq.passwordFile:#{null}}")
    private String passwordFile;
        
    @Value("${spring.rabbitmq.useSSL:false}")
    private boolean useSSL;
        
    @Value("${spring.rabbitmq.sslAlgorithm:TLSv1.1}")
    private String sslAlgorithm;
       
       
       @Bean
       MessageConverter messageConverter(ObjectMapper objectMapper) {
           return new Jackson2JsonMessageConverter(objectMapper, "*");
       }
       
       @Bean
    public CachingConnectionFactory connectionFactory() {
   
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setPort(port);
        connectionFactory.setConnectionNameStrategy(cf -> connectionName);
   
        String amqpProtocol = useSSL ? "amqps" : "amqp";
   
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("RabbitConfig values [{}] [{}] [{}] [{}] [{}]", amqpProtocol, username, host, port, virtualHost);
        }
                
        String connectionString = String.format("%s://%s:%s@%s:%s", amqpProtocol, username, password, host, port);
        if (StringUtils.isNotBlank(virtualHost)) {
            connectionString = connectionString + String.format("/%s", virtualHost);
            connectionFactory.setVirtualHost(virtualHost);
        }   
   
        connectionFactory.setUri(connectionString);
        return connectionFactory;
    }
    
    @Bean(name = "rabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory listenerFactory(MessageConverter messageConverter,
            CachingConnectionFactory connectionFactory) {
   
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory() {
            @Override
            protected void initializeContainer(SimpleMessageListenerContainer instance,
                    RabbitListenerEndpoint endpoint) {
                super.initializeContainer(instance, endpoint);
                // set value appropriately for what the consumer does and how long they typically take to process (depends on prefetch count also)
                instance.setShutdownTimeout(30000); 
            };
        };
        factory.setConcurrentConsumers(1);
        factory.setPrefetchCount(10);
           factory.setBatchSize(1);
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter);
        return factory;
    }
   
       
       
       /**
        * Method to configure rabbit listener
        */
       @Override
       public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
           registrar.setMessageHandlerMethodFactory(validatingHandlerMethodFactory());
       }
   
       @Bean
       DefaultMessageHandlerMethodFactory validatingHandlerMethodFactory() {
           DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
           factory.setValidator(amqpValidator());
           return factory;
       }
   
       @Bean
       Validator amqpValidator() {
           return new OptionalValidatorFactoryBean();
       }
   
   
    @Bean
    public RabbitAdmin rabbitAdmin(CachingConnectionFactory cachingConnectionFactory) {
        return new RabbitAdmin(cachingConnectionFactory);
   
    }
       
       /**
        * Static Bean for Property Configuration Place Holder
        * @return Static Property Source Place Holder
        */
       @Bean
       public static PropertySourcesPlaceholderConfigurer propertyConfigInDev() {
           return new PropertySourcesPlaceholderConfigurer();
       }
   
       @PostConstruct
       private void readPropertiesFromFiles() throws IOException {          
            
            if (password == null && passwordFile != null) {
                byte[] encoded = Files.readAllBytes(Paths.get(passwordFile));
                password = new String(encoded, StandardCharsets.UTF_8);
            }
        }
   }

Code snippet of Listner :

        @Component
  @EnableMongoRepositories("com.csn.filing.ors.update.repository")
  public class OrsUpdateListener {
    private static final Logger logger = 
    LoggerFactory.getLogger(OrsUpdateListener.class);
    private static final String LISTENER_ID = "print-ors-xfr-ack";
    private static final String PAYX_PREFIX = "x-payx-";
  
    private final UpdateStatusTracker updateStatusTracker;
    private OrsPDFAvailabilityTrackerRepository orsPDFTrackerRepository;
  
    @Value("${csn.traceability.component_name}")
    private String componentName;
  
    /**
     * update listener constructor
     * @param orsPDFTrackerRepository MongoDB Repository Interface
     */
    @Autowired
    public OrsUpdateListener(OrsPDFAvailabilityTrackerRepository orsPDFTrackerRepository, UpdateStatusTracker updateStatusTracker) {
        this.orsPDFTrackerRepository = orsPDFTrackerRepository;
        this.updateStatusTracker = updateStatusTracker;
    }
  
    /**
     * Incoming ors update messages
     * @param updateMessage Rabbit ORS Update Message
     */
    @RabbitListener(id = LISTENER_ID, bindings = @QueueBinding(
            exchange = @Exchange(value = "${listener.exchange}", type = "topic"), 
            value = @Queue(value = "#{'${spring.rabbitmq.virtual-host:}' == 'filing' ? '${listener.queue}-cd' : '${listener.queue}'}", durable = "true"), 
            key = "${listener.routingKey}"),
            containerFactory= "rabbitListenerContainerFactory")
    public void receiveMessage(@Valid @Payload OrsUpdateMessage updateMessage, @Headers Map<String, Object> requestHeaders) {
        boolean success = false;
        long startTime = 0;
  
        // Extract Traceability Information from Rabbit Message Headers
        setThreadTraceability(requestHeaders);
  
        try {
            // Capture start time and inject into ThreadContext
            startTime = System.currentTimeMillis();
            ThreadContext.put(MarkAttributeEnum.start_time.toString(), String.valueOf(startTime));
  
            // Ensure Service Name is Populated
            if (ThreadContext.get(MarkAttributeEnum.service_name.toString()) == null) {
                ThreadContext.put(MarkAttributeEnum.service_name.toString(), "OrsUpdateListener");
            }
  
            // Mark Request Accepted
            logger.info(String.format("%s=%s", MarkAttributeEnum.mark, MarkEnum.request_accepted));
  
            // Mark Transaction Start
            logger.info(String.format("%s=%s", MarkAttributeEnum.mark, MarkEnum.transaction_start));
  
            final OrsPDFAvailabilityTracker orsPDFTracker = MongoTraceability.capture(() -> orsPDFTrackerRepository.findById(updateMessage.getRequestId()).orElse(null));
  
            if (orsPDFTracker != null ) {
                
                String orsErrorCode = "";
                StatusType orsStatus = StatusType.COMPLETED;
  
                if (updateMessage.getErrorCode() == OrsErrorCode.SUCCESS) {
                    /*
                     * If a message is successfully sent, we want to update its status and remove it from the pdfTracker repository.
                     */
                    updateStatus(orsPDFTracker, orsStatus, orsErrorCode);
                    MongoTraceability.captureNoReturn(() -> orsPDFTrackerRepository.delete(orsPDFTracker));
                    success = true;
  
                } else {
                    logger.error("Received ORS response with error code: {}", updateMessage.getErrorCode());
  
                    /*
                     * If a message contains an error code we want to update its status (as failed) and save it back into the database
                     */
                    orsErrorCode = updateMessage.getErrorCode().toString();
                    orsStatus = StatusType.FAILED;
  
                    updateStatus(orsPDFTracker, orsStatus, orsErrorCode);
                    orsPDFTracker.setStatus(OrsStatus.NOT_SENT);
                    MongoTraceability.captureNoReturn(() -> orsPDFTrackerRepository.save(orsPDFTracker));
  
                }
            } else {
                logger.error("ORS response invalid requestId: {}", updateMessage.toString());
            }
  
        } catch (Exception e) {
            success = false;
            logger.error("Error Processing ORS Update", e);
            throw new AmqpRejectAndDontRequeueException(e.getMessage());
  
        } finally {
            // Retrieve start time and calculate duration
            long endTime = System.currentTimeMillis();
            long duration = endTime - startTime;
  
            // Mark and Measure Transaction End
            logger.info(String.format("%s=%s,%s=%s,%s=%d",
                        MarkAttributeEnum.status.toString(), success ? "PASS": "FAIL",
                        MarkAttributeEnum.mark.toString(), MarkEnum.transaction_end,
                        MarkAttributeEnum.duration.toString(), duration));
  
            // Clear ThreadContext
            ThreadContext.clearAll();
        }
    }
    
    /**
     * Method responsible in updating ORS status on the taxfilingstatus records
     * @param orsPDFTracker
     * @param orsStatus status that need to updated on trackers
     * @param orsErrorCode error code for ors processing
     */
    private void updateStatus(OrsPDFAvailabilityTracker orsPDFTracker, StatusType orsStatus, String orsErrorCode){
        
        ReportType reportType = getReportType(orsPDFTracker.getState(), orsPDFTracker.getReportType());
        StateType miscState = null;
        // If report doesn't exits for the state and returntype, then it could be a misc report by ReportType                   
        if(reportType == null){
            reportType = getReportType(null, orsPDFTracker.getReportType());
            miscState = orsPDFTracker.getState();
        }
            
        if(StringUtils.isBlank(orsPDFTracker.getInternalEmployeeNumber())){
  
            updateStatusTracker.updateOrsInfo(orsPDFTracker.getClientId(),
                orsPDFTracker.getBranchNumber(),
                reportType,
                orsPDFTracker.getExtractId(),
                orsPDFTracker.getYear(),
                orsPDFTracker.getQuarter(),
                orsPDFTracker.getLocalCode(),
                orsStatus, orsErrorCode, miscState);
        }else {
                updateStatusTracker.updateEmployeeOrsInfo(orsPDFTracker.getClientId(),
                        orsPDFTracker.getBranchNumber(),
                        orsPDFTracker.getInternalEmployeeNumber(),
                        reportType,
                        orsPDFTracker.getExtractId(),
                        orsPDFTracker.getYear(),
                        orsPDFTracker.getQuarter(),
                        orsStatus, orsErrorCode);
        }
    }
  
    /**
     * Initiate Thread Context with Message Traceability Information
     * 
     * @param rabbitHeaders
     *            Rabbit Message Headers
     */
    private void setThreadTraceability(Map<String, Object> rabbitHeaders) {
        // Clear Any Possible Traceability Remnants
        ThreadContext.clearAll();
  
        if (rabbitHeaders != null) {
            // Store payx headers in ThreadContext
            rabbitHeaders.keySet().stream().filter(h -> h.toLowerCase().startsWith(PAYX_PREFIX))
                    .forEach(h -> ThreadContext.put(h.toLowerCase(), rabbitHeaders.get(h).toString()));
  
            // If transaction id was not received, create one along with a
            // transaction-start mark
            if (!ThreadContext.containsKey(MarkAttributeEnum.transaction_id.toString())) {
                ThreadContext.put(MarkAttributeEnum.transaction_id.toString(), UUID.randomUUID().toString());
                ThreadContext.put(MarkAttributeEnum.transaction_unknown.toString(), "true");
                ThreadContext.put(MarkAttributeEnum.business_process_name.toString(), "tax_print");
                ThreadContext.put(MarkAttributeEnum.session_id.toString(), "0");
                if (!ThreadContext.containsKey(MarkAttributeEnum.user.toString())) {
                    ThreadContext.put(MarkAttributeEnum.user.toString(), "unk");
                }
            }
  
            // Add Component Name
            ThreadContext.put(MarkAttributeEnum.component_name.toString(), componentName);
          }
      }
  }

There is not application log , But i can see error in rabbit mq log.

RabbitMq Error Log:

2023-08-21 18:09:19.488000+05:30 [error] <0.1437.0> Channel error on connection <0.926.0> (127.0.0.1:60999 -> 127.0.0.1:5672, vhost: '/', user: 'guest'), channel 1: 2023-08-21 18:09:19.488000+05:30 [error] <0.1437.0> operation basic.publish caused a channel exception not_found: no exchange 'tng.ors.service-fed' in vhost '/' 2023-08-21 18:09:21.316000+05:30 [error] <0.1444.0> Channel error on connection <0.926.0> (127.0.0.1:60999 -> 127.0.0.1:5672, vhost: '/', user: 'guest'), channel 1: 2023-08-21 18:09:21.316000+05:30 [error] <0.1444.0> operation basic.publish caused a channel exception not_found: no exchange 'tng.ors.service-fed' in vhost '/' 2023-08-21 18:09:23.272000+05:30 [error] <0.1460.0> Channel error on connection <0.926.0> (127.0.0.1:60999 -> 127.0.0.1:5672, vhost: '/', user: 'guest'), channel 1: 2023-08-21 18:09:23.272000+05:30 [error] <0.1460.0> operation basic.publish caused a channel exception not_found: no exchange 'tng.ors.service-fed' in vhost '/' 2023-08-22 08:52:04.500000+05:30 [warning] <0.718.0> Consumer 51 on channel 1 has timed out waiting for delivery acknowledgement. Timeout used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more 2023-08-22 08:52:09.420000+05:30 [error] <0.718.0> Channel error on connection <0.709.0> (127.0.0.1:60864 -> 127.0.0.1:5672, vhost: '/', user: 'guest'), channel 1: 2023-08-22 08:52:09.420000+05:30 [error] <0.718.0> operation none caused a channel exception precondition_failed: delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more 2023-08-22 08:53:07.551000+05:30 [error] <0.1475.0> Channel error on connection <0.926.0> (127.0.0.1:60999 -> 127.0.0.1:5672, vhost: '/', user: 'guest'), channel 1: 2023-08-22 08:53:07.551000+05:30 [error] <0.1475.0> operation basic.publish caused a channel exception not_found: no exchange 'tng.ors.service-fed' in vhost '/' 2023-08-22 11:20:08.987000+05:30 [error] <0.1753.0> Channel error on connection <0.926.0> (127.0.0.1:60999 -> 127.0.0.1:5672, vhost: '/', user: 'guest'), channel 1: 2023-08-22 11:20:08.987000+05:30 [error] <0.1753.0> operation basic.publish caused a channel exception not_found: no exchange 'tng.ors.service-fed' in vhost '/' 2023-08-22 11:20:11.562000+05:30 [error] <0.1771.0> Channel error on connection <0.926.0> (127.0.0.1:60999 -> 127.0.0.1:5672, vhost: '/', user: 'guest'), channel 1: 2023-08-22 11

0

There are 0 best solutions below