KStream-KTable join with CloudEvent value not working


I want to join a Kafka stream with a KTable. The PoC works fine with plain data; however, as soon as I use CloudEvent values, I keep running into one serialization issue or another.

Here is my code sample:

// Structured encoding with the JSON event format
Map<String, Object> ceSerializerConfigs = new HashMap<>();
ceSerializerConfigs.put(ENCODING_CONFIG, Encoding.STRUCTURED);
ceSerializerConfigs.put(EVENT_FORMAT_CONFIG, JsonFormat.CONTENT_TYPE);

CloudEventSerializer serializer = new CloudEventSerializer();
serializer.configure(ceSerializerConfigs, false);

CloudEventDeserializer deserializer = new CloudEventDeserializer();
deserializer.configure(ceSerializerConfigs, false);

Serde<CloudEvent> cloudEventSerde = Serdes.serdeFrom(serializer, deserializer);

KStream<String, CloudEvent> kStream = builder.stream("stream-topic", Consumed.with(Serdes.String(), cloudEventSerde));
KTable<String, CloudEvent> kTable = builder.table("ktable-topic", Consumed.with(Serdes.String(), cloudEventSerde));

// Join stream and table records by key and emit a new CloudEvent.
// Note: id, source, and type are all mandatory for a v1 event, so the
// builder needs more than withId() or build() fails at runtime.
KStream<String, CloudEvent> joined = kStream
    .join(kTable, (left, right) -> CloudEventBuilder.v1()
        .withId(left.getId().concat(right.getId()))
        .withSource(left.getSource())
        .withType(left.getType())
        .build());
joined.to(output, Produced.with(Serdes.String(), cloudEventSerde));

KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamProps);
kafkaStreams.start();

I also tried using a WrapperSerde, as suggested in "Issue with configuring Serdes for Kafka Streams".

However, I keep running into this exception:

18:12:08.691 [basic-streams-updated-0630c691-0080-4e02-8c85-7bff650f34e9-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [basic-streams-updated-0630c691-0080-4e02-8c85-7bff650f34e9] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000002, topic=cloudevent-ktable, partition=0, offset=80, stacktrace=java.lang.UnsupportedOperationException: CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)

Caused by: java.lang.UnsupportedOperationException: CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:84) ~[cloudevents-kafka-2.5.0.jar:?]
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:38) ~[cloudevents-kafka-2.5.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:82) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:73) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:30) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$4(MeteredKeyValueStore.java:200) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:200) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:120) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:122) ~[kafka-streams-2.8.0.jar:?]

Has anyone used CloudEvent successfully with a KTable?



Answer by Artem Bilan:

Your stack trace says this:

Caused by: java.lang.UnsupportedOperationException: CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:84) ~[cloudevents-kafka-2.5.0.jar:?]
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:38) ~[cloudevents-kafka-2.5.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:82) ~[kafka-streams-2.8.0.jar:?]

Which is essentially this code:

    final byte[] rawValue = valueSerializer.serialize(topic, data);

So, the CloudEventSerializer you use is not adapted for Kafka Streams:

public byte[] serialize(String topic, CloudEvent data) {
    throw new UnsupportedOperationException("CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)");
}

This is because Kafka Streams does not pass record headers through on (de)serialization; in particular, when it materializes the KTable's state store it invokes only the two-argument serialize(String, CloudEvent) variant, which this serializer rejects.

I suggest extending that CloudEventSerializer and overriding its serialize(String topic, CloudEvent data) to delegate to serialize(String topic, Headers headers, CloudEvent data) with an empty new RecordHeaders().
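A minimal sketch of that suggestion, assuming the structured JSON encoding from the question. The subclass names and the synthetic content-type header in the deserializer are my own illustration, not part of the cloudevents library:

import io.cloudevents.CloudEvent;
import io.cloudevents.jackson.JsonFormat;
import io.cloudevents.kafka.CloudEventDeserializer;
import io.cloudevents.kafka.CloudEventSerializer;
import org.apache.kafka.common.header.internals.RecordHeaders;

import java.nio.charset.StandardCharsets;

// Delegates the headers-less serialize() that Kafka Streams state stores
// call to the Headers-based variant. With structured encoding the whole
// event is written into the record value, so passing empty headers loses
// nothing essential.
public class StreamsCloudEventSerializer extends CloudEventSerializer {

    @Override
    public byte[] serialize(String topic, CloudEvent data) {
        return serialize(topic, new RecordHeaders(), data);
    }
}

// The deserializer has the same restriction, so reading values back from
// the state store needs a headers-less deserialize() as well. Structured-mode
// detection relies on the content-type header, so we synthesize it here
// (assumption: events were written with the structured JSON format).
public class StreamsCloudEventDeserializer extends CloudEventDeserializer {

    @Override
    public CloudEvent deserialize(String topic, byte[] data) {
        RecordHeaders headers = new RecordHeaders();
        headers.add("content-type", JsonFormat.CONTENT_TYPE.getBytes(StandardCharsets.UTF_8));
        return deserialize(topic, headers, data);
    }
}

Then build the serde from these subclasses instead, configuring them with the same ceSerializerConfigs as before:

Serde<CloudEvent> cloudEventSerde = Serdes.serdeFrom(new StreamsCloudEventSerializer(), new StreamsCloudEventDeserializer());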