I get this error when I try to consume records from Kinesis: class [B cannot be cast to class java.util.List ([B and java.util.List are in module java.base of loader 'bootstrap'), so it looks like the payload is arriving as a different type.
Note: if I replace batch with record and change the consumer's signature to receive a single message, Consumer<byte[]> fizzBuzzConsumer(), it works fine and I can convert the byte array to a string with no issues. So why does it not work in batch mode?
spring:
  profiles:
    active: local
  application:
    name: my-consumer
  cloud:
    function:
      definition: fizzBuzzConsumer
    stream:
      function:
        bindings:
          fizzBuzzConsumer-in-0: input
      bindings:
        input:
          consumer:
            batch-mode: true
            use-native-decoding: true
          destination: kinesis-writer-stream
          content-type: text/plain
          group: kinesis-reader-app-group
      kinesis:
        bindings:
          kinesis-writer-stream:
            consumer:
              listener-mode: batch
              checkpoint-mode: periodic
              checkpoint-interval: 3000
              idle-between-polls: ${KINESIS_CONSUMER_IDLE_BETWEEN_POLLS:1000}
              consumer-backoff: ${KINESIS_CONSUMER_BACKOFF:1000}
              records-limit: ${KINESIS_CONSUMER_RECORDS_LIMIT:2000}
              shard-iterator-type: TRIM_HORIZON
              worker-id: kinesis-reader-worker-id
        binder:
          checkpoint:
            table: kinesis-reader-stream-metadata
          locks:
            table: kinesis-reader-lock-registry
            lease-duration: 30
            refresh-period: 3000
            read-capacity: 10
          kpl-kcl-enabled: true
          auto-create-stream: true
          auto-add-shards: true
          min-shard-count: 1
And here is my consumer:
@Bean
public Consumer<Message<List<byte[]>>> fizzBuzzConsumer() {
    return message -> {
        for (byte[] record : message.getPayload()) {
            String json = new String(Objects.requireNonNull(record), StandardCharsets.UTF_8);
            log.info("New Record comes.... {}", json);
        }
    };
}
Error:
2022-11-03 22:22:13.496 ERROR 549022 --- [cTaskExecutor-4] s.i.a.i.k.KclMessageDrivenChannelAdapter : Got an exception during sending a 'GenericMessage [payload=byte[1165], headers={aws_shard=shardId-000000000000, id=d1433639-dcf9-53eb-9980-cc893406c3e8, sourceData=UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49634843198456367976521810433671650607349108380513861634, getData()=java.nio.HeapByteBuffer[pos=0 lim=1165 cap=1165], getPartitionKey()=1082619945], aws_receivedPartitionKey=1082619945, aws_receivedStream=kinesis-writer-stream, aws_receivedSequenceNumber=49634843198456367976521810433671650607349108380513861634, timestamp=1667510514911}]'
for the 'UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49634843198456367976521810433671650607349108380513861634, getData()=java.nio.HeapByteBuffer[pos=0 lim=1165 cap=1165], getPartitionKey()=1082619945]'.
Consider to use 'errorChannel' flow for the compensation logic.
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@4f4bbdbb]; nested exception is java.lang.ClassCastException: class [B cannot be cast to class java.util.List ([B and java.util.List are in module java.base of loader 'bootstrap')
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter.access$1600(KclMessageDrivenChannelAdapter.java:84) ~[spring-integration-aws-2.5.1.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.performSend(KclMessageDrivenChannelAdapter.java:520) ~[spring-integration-aws-2.5.1.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.processSingleRecord(KclMessageDrivenChannelAdapter.java:435) ~[spring-integration-aws-2.5.1.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.processRecords(KclMessageDrivenChannelAdapter.java:418) ~[spring-integration-aws-2.5.1.jar:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.V1ToV2RecordProcessorAdapter.processRecords(V1ToV2RecordProcessorAdapter.java:42) ~[amazon-kinesis-client-1.14.8.jar:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.callProcessRecords(ProcessTask.java:221) ~[amazon-kinesis-client-1.14.8.jar:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:176) ~[amazon-kinesis-client-1.14.8.jar:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49) ~[amazon-kinesis-client-1.14.8.jar:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24) ~[amazon-kinesis-client-1.14.8.jar:na]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.util.List ([B and java.util.List are in module java.base of loader 'bootstrap')
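The cast failure itself can be reproduced outside Spring. The log above shows payload=byte[1165] and a processSingleRecord frame, which suggests that a single byte[] reached a function declared for List<byte[]>. The following is only a minimal sketch of that mismatch under that assumption, not the binder's actual code path; the class BatchCastDemo and its invoke helper are hypothetical names made up for the illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Consumer;

public class BatchCastDemo {

    // Declared like the batch consumer in the question: expects a list of raw records.
    public static Consumer<List<byte[]>> batchConsumer() {
        return records -> {
            for (byte[] record : records) {
                System.out.println(new String(record, StandardCharsets.UTF_8));
            }
        };
    }

    // Hypothetical helper: mimics a caller that invokes the function through its
    // raw type, so the generic parameter is not checked at compile time.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static String invoke(Consumer consumer, Object payload) {
        try {
            consumer.accept(payload);
            return "ok";
        } catch (ClassCastException e) {
            // Same failure as in the trace: [B cannot be cast to java.util.List
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        byte[] single = "{\"n\":1}".getBytes(StandardCharsets.UTF_8);

        // A single record handed to the batch-typed consumer fails at the cast.
        System.out.println(invoke(batchConsumer(), single)); // prints ClassCastException

        // The same record wrapped in a list is processed normally.
        System.out.println(invoke(batchConsumer(), List.of(single))); // prints {"n":1} then ok
    }
}
```

Because generics are erased at runtime, nothing stops a byte[] from being passed in; the exception only appears when the lambda's parameter is cast to List.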
For me (using Spring Boot 3.0.2) the error was, ironically, caused by how the message was sent to RabbitMQ (RMQ). On the producer side, I was sending the messages through the standard RabbitTemplate and not Spring's StreamBridge. While I am not entirely familiar with Spring Cloud Stream's message decoding feature, RabbitTemplate in its standard configuration sends a base64-encoded Java object, including its package name.
The package name also persists if RabbitTemplate is set to use JSON: sending the message that way adds an additional header, __TypeId__, to the Rabbit message. If the Spring Cloud Stream receiver receives a message with the __TypeId__ header set, it then fails to cast the message into the correct object, seemingly even if the header is set to the exact type that it is supposed to deserialize into. At the same time, if the same payload is sent (manually) without this header, with all other parameters unchanged, the message is consumed successfully.
Thus, my fix was to send the message through Spring Cloud Stream's StreamBridge instead.
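The producer code from this answer is not shown, so what follows is only a rough sketch of sending through StreamBridge, not the author's actual fix. The class name FizzBuzzPublisher and the binding name fizzBuzz-out-0 are assumptions made up for the illustration. It needs Spring Cloud Stream on the classpath.

```java
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Service;

@Service
public class FizzBuzzPublisher {

    // Hypothetical output binding name (an assumption, not from the original post).
    private static final String OUTPUT_BINDING = "fizzBuzz-out-0";

    private final StreamBridge streamBridge;

    // StreamBridge is auto-configured by Spring Cloud Stream and injected here.
    public FizzBuzzPublisher(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    // Sends the payload through Spring Cloud Stream's own message conversion
    // rather than through RabbitTemplate; returns whether the send was accepted.
    public boolean publish(Object payload) {
        return streamBridge.send(OUTPUT_BINDING, payload);
    }
}
```

The point of going this way is that producer and consumer then use the same Spring Cloud Stream conversion and content-type handling, rather than the converter configured on RabbitTemplate.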