Aggregate Java objects in a list with Kafka Streams DSL windows


I have the most straightforward of use cases for the Kafka Streams DSL: read in CSV sensor data, group by timestamp, and output. The following code does not compile:

public static void main(String[] args) {

    StreamsConfig streamingConfig = new StreamsConfig(getProperties());

    Serde<String> stringSerde = Serdes.String();

    CSVDeserializer<SensorData> sensorDataDeserializer = new CSVDeserializer<>(SensorData.class);
    JsonSerializer<SensorData> sensorDataSerializer = new JsonSerializer<>();
    Serde sensorDataSerde = Serdes.serdeFrom(sensorDataSerializer, sensorDataDeserializer);
    JsonDeserializer<SensorData> sensorDataJsonDeserializer = new JsonDeserializer<>(SensorData.class);
    Serde sensorDataJSONSerde = Serdes.serdeFrom(sensorDataSerializer, sensorDataJsonDeserializer);

    StringSerializer stringSerializer = new StringSerializer();
    StringDeserializer stringDeserializer = new StringDeserializer();
    WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(stringSerializer);
    WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
    Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);

    JsonSerializer<SensorDataAccumulator> accSerializer = new JsonSerializer<>();
    JsonDeserializer accDeserializer = new JsonDeserializer<>(SensorDataAccumulator.class);
    Serde<SensorDataAccumulator> accSerde = Serdes.serdeFrom(accSerializer, accDeserializer);


    KStreamBuilder kStreamBuilder = new KStreamBuilder();
    KStream<String,SensorData> initialStream =  kStreamBuilder.stream(stringSerde,sensorDataSerde,"e40_orig");

    final KStream<String, SensorData> sensorDataKStream = initialStream
            .filter((k, v) -> (v != null))
            .map((k, v) -> new KeyValue<>(v.getMeasurementDateTime().toString(), v));

    sensorDataKStream
            .filter((k, v) -> (v != null))
            .groupBy((k,v) -> k, stringSerde, sensorDataJSONSerde)
            .aggregate(SensorDataAccumulator::new,
                    (k, v, list) -> list.add(v), // <== error here; casting instead: ((SensorDataAccumulator) list).add((SensorData) v)
                    TimeWindows.of(10000),
                    accSerde, "acc")
            .to(windowedSerde, accSerde, "out");

    KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder,streamingConfig);
    kafkaStreams.start();
}

due to

Error:(90, 45) java: cannot find symbol
  symbol:   method add(java.lang.Object)
  location: variable list of type java.lang.Object

Weird.
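For what it's worth, my current suspicion: sensorDataJSONSerde and accDeserializer are declared raw, and in Java a method call that needs an unchecked conversion gets its return type erased. So groupBy(...) hands back a raw KGroupedStream, and when a generic method is invoked on a raw receiver, every generic in its signature collapses to Object. A tiny plain-Java sketch of that last step (Box and fold are made-up names, not Kafka API):

```java
import java.util.function.BiFunction;

// Hypothetical stand-in for KGroupedStream: fold() plays the role of aggregate().
class Box<V> {
    <A> A fold(A init, BiFunction<A, V, A> f, V value) {
        return f.apply(init, value);
    }
}

class RawInferenceDemo {
    public static void main(String[] args) {
        Box<String> typed = new Box<>();
        String out = typed.fold("", (acc, v) -> acc + v, "x"); // infers fine

        Box raw = new Box<String>(); // raw receiver, like the result of a raw groupBy(...)
        // String broken = raw.fold("", (acc, v) -> acc + v, "x");
        // ^ does not compile: on a raw receiver, fold's signature erases to
        //   Object fold(Object, BiFunction, Object), so acc and v are Object
        System.out.println(out); // prints "x"
    }
}
```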

public class SensorDataAccumulator {

    ArrayList list = new ArrayList<SensorData>();

    public SensorDataAccumulator add(SensorData s) {
        list.add(s);
        return this;
    }
}

Casting as commented leads to the following runtime exception (right before the windowed accumulation is output).

[2017-01-02 13:00:45,614] INFO task [1_0] Initializing processor nodes of the topology (org.apache.kafka.streams.processor.internals.StreamTask:123)
[2017-01-02 13:01:04,173] WARN Error while fetching metadata with correlation id 779 : {out=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:600)
[2017-01-02 13:01:04,662] INFO stream-thread [StreamThread-1] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread:268)
[2017-01-02 13:01:04,663] INFO stream-thread [StreamThread-1] Committing consumer offsets of task 0_0 (org.apache.kafka.streams.processor.internals.StreamThread:358)
[2017-01-02 13:01:04,666] INFO stream-thread [StreamThread-1] Committing consumer offsets of task 1_0 (org.apache.kafka.streams.processor.internals.StreamThread:358)
[2017-01-02 13:01:04,668] INFO stream-thread [StreamThread-1] Closing a task 0_0 (org.apache.kafka.streams.processor.internals.StreamThread:751)
[2017-01-02 13:01:04,668] INFO stream-thread [StreamThread-1] Closing a task 1_0 (org.apache.kafka.streams.processor.internals.StreamThread:751)
[2017-01-02 13:01:04,668] INFO stream-thread [StreamThread-1] Flushing state stores of task 0_0 (org.apache.kafka.streams.processor.internals.StreamThread:368)
[2017-01-02 13:01:04,669] INFO stream-thread [StreamThread-1] Flushing state stores of task 1_0 (org.apache.kafka.streams.processor.internals.StreamThread:368)
Exception in thread "StreamThread-1" java.lang.NoSuchMethodError: org.rocksdb.RocksIterator.close()V
    at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDbIterator.close(RocksDBStore.java:468)
    at org.apache.kafka.streams.state.internals.RocksDBStore.closeOpenIterators(RocksDBStore.java:411)
    at org.apache.kafka.streams.state.internals.RocksDBStore.close(RocksDBStore.java:397)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.close(RocksDBWindowStore.java:276)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.close(MeteredWindowStore.java:109)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.close(CachingWindowStore.java:125)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:349)
    at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:120)
    at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:348)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:328)
    at org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:344)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:305)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:269)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:252)
[2017-01-02 13:01:05,316] INFO stream-thread [StreamThread-1] Closing the state manager of task 0_0 (org.apache.kafka.streams.processor.internals.StreamThread:347)
[2017-01-02 13:01:05,316] INFO stream-thread [StreamThread-1] Closing the state manager of task 1_0 (org.apache.kafka.streams.processor.internals.StreamThread:347) 

Debugging the add method of SensorDataAccumulator should give a clue:

(debugger screenshot: the elements of list are LinkedTreeMap instances, not SensorData)

So, if I understand correctly, I declare ArrayList list = new ArrayList<SensorData>(); but somewhere along the way its members are changed to LinkedTreeMap. The type checker lost me here...
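To convince myself that nothing even could complain at insertion time, here's a minimal plain-Java sketch (no Kafka, no Gson) of what a raw list field allows: erasure removes the element-type check, so a deserializer can store map objects where SensorData was expected, and nothing blows up until something finally casts an element.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;

class RawListDemo {
    // Raw, like the field in SensorDataAccumulator: no element type at runtime.
    static ArrayList list = new ArrayList<String>();

    public static String firstElementClass() {
        list.clear();
        list.add(new LinkedHashMap<String, Object>()); // compiles and runs without complaint
        return list.get(0).getClass().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(firstElementClass()); // prints "LinkedHashMap", not "String"
    }
}
```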

OK, LinkedTreeMap is the underlying data structure Gson uses in my JsonDeserializer and JsonSerializer classes, so I'll add those below for completeness.
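Out of curiosity I also looked at what a reflection-based library like Gson can actually see in such a field. This is a sketch with SensorData stubbed out: for the raw field there is simply no element type left to find, which would explain Gson falling back to its generic LinkedTreeMap, while a parameterized field keeps SensorData visible.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

class FieldTypeDemo {
    static class SensorData {} // stub, just to have a concrete element type

    static class RawAcc   { ArrayList list = new ArrayList<SensorData>(); } // as in SensorDataAccumulator
    static class TypedAcc { List<SensorData> list = new ArrayList<>(); }    // parameterized alternative

    static String genericTypeOf(Class<?> c) {
        try {
            // Field.getGenericType() is what reflection exposes to a deserializer
            return c.getDeclaredField("list").getGenericType().toString();
        } catch (NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(genericTypeOf(RawAcc.class));   // no element type visible
        System.out.println(genericTypeOf(TypedAcc.class)); // SensorData visible
    }
}
```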

Currently I'm not sure what I'm doing wrong or where to fix it. Should I use different serializers, or different data structures? A different language ;) ?

Any input is welcome.

public class JsonSerializer<T> implements Serializer<T> {

    private Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public byte[] serialize(String topic, T t) {
        return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
    }

    @Override
    public void close() {

    }
} 

public class JsonDeserializer<T> implements Deserializer<T> {

    private Gson gson = new Gson();
    private Class<T> deserializedClass;

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    public JsonDeserializer() {
    }

    @Override
    public void configure(Map<String, ?> map, boolean b) {
        if(deserializedClass == null) {
            deserializedClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
         if(bytes == null){
             return null;
         }

        return gson.fromJson(new String(bytes),deserializedClass);

    }

    @Override
    public void close() {

    }
}