AWS SQS Listerner Unable to deserialize the message

3.4k Views Asked by At

Jackson is unable to deserialize it. please find the stack trace and the SQS configuration and the listener

org.springframework.cloud.aws.messaging.listener.QueueMessageHandler.processHandlerMethodException(QueueMessageHandler.java:248)
      t stack_trace   JsonParseException: Unrecognized token 's3': was expecting (JSON String, Number, Array, Object or token 'null', 'true'

or 'false') at [Source: (String)"s3://abc-invoices45-invoices/www-abc-at/2020/09/07/504000101-3436547667.pdf"; line: 1, column: 3] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:722) at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2867) at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1913) at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:772) at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4340) at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4189) at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3205) at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:230) ... MessageConversionException: Could not read JSON: Unrecognized token 's3': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false') at [Source: (String)"s3://abc-invoices/www-abc-at/2020/09/07/504000101-3436547667.pdf"; line: 1, column: 3]; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 's3': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false') at [Source: (String)"s3://abc-invoices/www-k24-at/2020/09/07/504000101-3436547667.pdf"; line: 1, column: 3]

This is my SQS configuration



import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.abc.properties.LocalPropertyService;
import java.util.Collections;
import org.springframework.cloud.aws.messaging.config.QueueMessageHandlerFactory;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.config.annotation.SqsConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;

@Configuration
@Import(SqsConfiguration.class)
public class InvoiceSqsConfig {

  @Bean(name = "amazonSQSAsyncClient")
  public AmazonSQSAsync getAmazonSQSAsyncClient(LocalPropertyService propertyService) {
    System.setProperty("invoice.queue.name",
                       propertyService.getString("sqs.invoice.queueName"));
    return AmazonSQSAsyncClientBuilder.standard()
                                      .withRegion(
                                          propertyService.getString("sqs.aws.region"))
                                      .withCredentials(
                                          new AWSStaticCredentialsProvider(new BasicAWSCredentials(
                                              propertyService.getString("sqs.invoice.accessKey"),
                                              propertyService.getString("sqs.invoice.secretKey"))))
                                      .build();
  }

  @Bean
  public QueueMessageHandlerFactory queueMessageHandlerFactory() {
    QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
    MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
    messageConverter.setStrictContentTypeMatch(false);
    messageConverter.getObjectMapper().registerModule(new JavaTimeModule());
    factory.setMessageConverters(Collections.singletonList(messageConverter));
    return factory;
  }

  @Bean
  public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) {
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSqs);
    factory.setMaxNumberOfMessages(10);
    factory.setWaitTimeOut(20);
    return factory;
  }
}

And this is the Message Listener

import static org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy.ON_SUCCESS;

import com.abc.order.invoice.exception.InvalidInvoiceMessageException;
import com.abc.order.invoice.exception.OrderNotFoundException;
import com.abc.order.order.OrderService;
import com.abc.order.order.model.Order;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationMessage;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@RequiredArgsConstructor
public class InvoiceUpdateListener {

  @NonNull
  private final OrderService orderService;

  @SqsListener(value = "${invoice.queue.name}", deletionPolicy = ON_SUCCESS)
  @SneakyThrows
  public void receiveInvoice(@NotificationMessage EnvelopedMessage envelopedMessage) {
    log.debug("Received message from the invoice queue : {} ", envelopedMessage.getMessage());
    String message = envelopedMessage.getMessage();
    if (StringUtils.isBlank(message)) {
      throw new InvalidInvoiceMessageException("Received invalid message from the invoice queue");
    }
    String orderNumber = extractOrderNumber(message);
    Order order = orderService.getOrderByOrderNumber(orderNumber);
    if (order == null) {
      throw new OrderNotFoundException(
          "Could not find the order with order number : " + orderNumber);
    }
    order.setInvoiceUrl(message);
    log.debug("Saving invoice url for the orderNumber : {} ", orderNumber);
    orderService.save(order);
  }
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@JsonInclude(Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class EnvelopedMessage {

  @JsonProperty("Type")
  private String type;
  @JsonProperty("MessageId")
  private String messageId;
  @JsonProperty("TopicArn")
  private String topicArn;
  @JsonProperty("Subject")
  private String subject;
  @JsonProperty("Message")
  private String message;
  @JsonProperty("Timestamp")
  private ZonedDateTime createdAt;
  @JsonProperty("SignatureVersion")
  private String signatureVersion;
  @JsonProperty("Signature")
  private String signature;
  @JsonProperty("SigningCertURL")
  private String certUrl;
  @JsonProperty("UnsubscribeURL")
  private String unsubscribeUrl;
}

and this is the message I receive from the producer

{
  "Type" : "Notification",
  "MessageId" : "d77fa67e-6aa6-5b87-a707-f1ae5ba5922f",
  "TopicArn" : "arn:aws:sns:eu-central-1:726569450381:invoices-from-core",
  "Subject" : "File uploaded: invoices/www-abc-at/2020/09/07/504000101-3436547667.pdf",
  "Message" : "s3://k24-invoices/www-abc-at/2020/09/07/504000101-3436547667.pdf",
  "Timestamp" : "2020-09-07T12:59:47.192Z",
  "SignatureVersion" : "1",
  "Signature" : "dummysignature",
  "SigningCertURL" : "dummy url",
  "UnsubscribeURL" : "dummy url"
} 
2

There are 2 best solutions below

0
On BEST ANSWER

Your error suggests that the issue is around the Message key of your message.

The message you shared looks like it actually comes from SNS, this is a common pattern in which a producer posts a message into an SNS Topic, the SNS Topic publishes its messages in a SQS Queue and a consumer on the other side is triggered/polls the queue for messages.

In your case, what is happening is that your consumer expects the Message to be a JSON string like "{\"some_key\": \"some_value\"}" so after reading the value of the Message key in the enveloped message it tries to parse it to an actual dictionary/object.

You should either instruct your code to treat this value as an actual string and avoid conversion or enclose your message in a JSON format.

0
On

got this working by making the following changes to the message listener

  public void receiveInvoice(@NotificationMessage String message) {
//
}

and removed this line messageConverter.setSerializedPayloadClass(String.class); from the Bean from InvoiceSqsConfig.java

@Bean
  public QueueMessageHandlerFactory queueMessageHandlerFactory() {
    QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
    MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
    messageConverter.setStrictContentTypeMatch(false);
    messageConverter.getObjectMapper().registerModule(new JavaTimeModule());
    factory.setMessageConverters(Collections.singletonList(messageConverter));
    return factory;
  }