Kafka Streams cast to string issues with KTable when grouping and aggregating


I have a Kafka stream with incoming messages that look like sensor_code: x, time: 1526978768, address: Y. I want to create a KTable that stores each unique address for each sensor code.

KTable

KTable<String, Long> numCount = streams
            .map(kvm1)
            .groupByKey(Serialized.with(stringSerde, stringSerde))
            .count()
            .groupBy(kvm2, Serialized.with(stringSerde, longSerde))
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("StateStore"));

Where kvm1 and kvm2 are my own KeyValueMappers. My idea was to replace the existing key with sensor_code=x, address=y, then perform a groupByKey() and count(). After that, another groupBy(kvm2, Serialized.with(stringSerde, longSerde)), where kvm2 modifies the key so that it contains only the sensor_code, and the value would be the count for that key. But it is not working, so maybe I am doing it wrong: it throws a ClassCastException because the Long count is being serialized with a String serde. The count should be a Long, right?

Here is the first KeyValueMapper I use with its respective help function:

private static String getKeySensorIdAddress(String o) {
    String x = "sensor_id=\"x\", address=\"y\"";
    try {
        WifiStringEvent event = mapper.readValue(o, WifiStringEvent.class);
        x = x.replace("x", event.getSensor_code());
        x = x.replace("y", event.getAddress());
        return x;
    } catch (Exception ex) {
        System.out.println("Error... " + ex);
        return "Error";
    }
}

//KeyValueMapper1
KeyValueMapper<String, String, KeyValue<String, String>> kvm1 =
    new KeyValueMapper<String, String, KeyValue<String, String>>() {
        public KeyValue<String, String> apply(String key, String value) {
            return new KeyValue<>(getKeySensorIdAddress(value), value);
        }
    };

Here is the second KeyValueMapper and its help function.

private static String getKeySensorId(String o) {
    int a = o.indexOf(",");
    return o.substring(0, a);
}

//KeyValueMapper2
KeyValueMapper<String, Long, KeyValue<String, Long>> kvm2 =
    new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
        public KeyValue<String, Long> apply(String key, Long value) {
            return new KeyValue<>(getKeySensorId(key), value);
        }
    };
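
To make the intended key transformations concrete, here is a minimal, self-contained sketch (with hardcoded sample values instead of the real WifiStringEvent class) of what the two mappers do to one record's key:

```java
public class KeyMappingDemo {
    // Mirrors getKeySensorIdAddress: builds a composite key from sensor code and address
    static String keySensorIdAddress(String sensorCode, String address) {
        return "sensor_id=\"" + sensorCode + "\", address=\"" + address + "\"";
    }

    // Mirrors getKeySensorId: keeps only the part before the first comma
    static String keySensorId(String compositeKey) {
        return compositeKey.substring(0, compositeKey.indexOf(","));
    }

    public static void main(String[] args) {
        String composite = keySensorIdAddress("x", "Y");
        System.out.println(composite);              // sensor_id="x", address="Y"
        System.out.println(keySensorId(composite)); // sensor_id="x"
    }
}
```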

Here is the exception and error that are returned when I try to run the code.

[2018-05-29 15:28:40,119] ERROR stream-thread [testUniqueAddresses-ed48daf8-fff0-42e4-bb5a-687584734b45-StreamThread-1] Failed to process stream task 2_0 due to the following error: (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks:105)
java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:66)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
    at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:95)
    at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:56)

Note the java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String error.

Any ideas why I get this error and how I can fix it, or advice on how I can edit the code to reach the desired output described above?

Many thanks in advance!

EDIT: Made major overhaul of my question since I have abandoned one of the approaches.

1 Answer
In the first case, if you want to use a HashMap as the value type, you need to define a custom serde for it and pass it using Materialized.withValueSerde.
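
As an illustration, the value-side logic of such a serde could look like the sketch below, which encodes a HashMap<String, Long> as a simple delimited string (the class name and wire format are made up for this example; in Kafka Streams you would wrap the two functions in a Serializer/Deserializer pair via Serdes.serdeFrom(serializer, deserializer) and hand the result to Materialized.withValueSerde):

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Core of a hypothetical HashMap<String, Long> serde.
// Assumes map keys never contain the ';' delimiter.
public class MapSerdeSketch {
    static byte[] serialize(Map<String, Long> map) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Long> e : map.entrySet()) {
            if (sb.length() > 0) sb.append(';');
            sb.append(e.getKey()).append('=').append(e.getValue());
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    static Map<String, Long> deserialize(byte[] bytes) {
        Map<String, Long> map = new HashMap<>();
        String s = new String(bytes, StandardCharsets.UTF_8);
        if (s.isEmpty()) return map;
        for (String pair : s.split(";")) {
            // lastIndexOf so keys may themselves contain '=' (the value is a Long)
            int i = pair.lastIndexOf('=');
            map.put(pair.substring(0, i), Long.parseLong(pair.substring(i + 1)));
        }
        return map;
    }
}
```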

In the second case, I can't say without seeing the return types of your KeyValueMappers and the exact error message: is it trying to cast a String to a Long, or vice versa?

EDIT: Thanks for sharing extra info.

I think what you need in the second case is to also specify the value serde in the second count operation. There seems to have been an inconsistency between count() on a KGroupedStream and a KGroupedTable in that the former automatically sets the value serde to LongSerde:

https://github.com/apache/kafka/blob/1.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java#L281-L283

but the KGroupedTable doesn't:

https://github.com/apache/kafka/blob/1.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java#L253

It seems to have been fixed on trunk already but not released yet:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java#L158-L160
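
Until that fix is released, a workaround (a sketch based on the topology from the question, not tested against your exact setup) is to set the serdes explicitly on the second count() via Materialized:

```java
KTable<String, Long> numCount = streams
        .map(kvm1)
        .groupByKey(Serialized.with(stringSerde, stringSerde))
        .count()
        .groupBy(kvm2, Serialized.with(stringSerde, longSerde))
        // Explicit serdes, since count() on a KGroupedTable does not
        // set the Long value serde automatically in this version
        .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("StateStore")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));
```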