Flink with RocksDB fails when doing aggregation


I have a job that does an aggregation based on a model; for example, the model contains a Map<MetricDim, Long> field. But after the aggregation starts, the job switches from 'RUNNING' to 'FAILING':


${time} INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job $JobName ($jobId) switched from state RUNNING to FAILING.
java.lang.ClassCastException: com.aggregation.MetricDim cannot be cast to java.lang.String
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:166)
        at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:84)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:274)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:357)
        at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.getValueBytes(AbstractRocksDBState.java:184)
        at org.apache.flink.contrib.streaming.state.AbstractRocksDBAppendingState.updateInternal(AbstractRocksDBAppendingState.java:79)
        at org.apache.flink.contrib.streaming.state.RocksDBAggregatingState.add(RocksDBAggregatingState.java:103)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
        at java.lang.Thread.run(Thread.java:750)
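
For reference, here is a simplified sketch of the kind of model I mean (the class and field names are simplified here; the real class has more fields):

import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the aggregation model; the important part is
// that the map is keyed by MetricDim, not by String.
public class MetricAggregate {

    private Map<MetricDim, Long> metrics = new HashMap<>();

    public Map<MetricDim, Long> getMetrics() {
        return metrics;
    }

    public void setMetrics(Map<MetricDim, Long> metrics) {
        this.metrics = metrics;
    }
}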

The same code runs fine with a non-RocksDB state backend such as FsStateBackend, but with RocksDB it always fails. My Flink version is 1.7.1.
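
The state backends are switched roughly like this (the checkpoint path is a placeholder, not my real configuration):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Works: FsStateBackend keeps working state as objects on the heap,
        // so the map field is not serialized on every update.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        // Fails: RocksDB serializes every state value on add(), which is the
        // Kryo path shown in the stack trace above. (Only the last call takes
        // effect; both are shown here only to contrast the two setups.)
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
    }
}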

The job was not started from a savepoint.

I don't know how to resolve this.

Could anyone help?


2 Answers

fightchwang (accepted answer):

Defining a custom Kryo MapSerializer and registering it with the job solved it.
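
A minimal sketch of such a serializer (class name, registration target, and wiring are assumptions, not necessarily what was used). The idea is to write the concrete class of every key and value instead of relying on a fixed per-key serializer such as StringSerializer:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.util.HashMap;
import java.util.Map;

// Writes class information for every key and value, so keys of any type
// (e.g. MetricDim) round-trip without being forced through StringSerializer.
public class GenericMapSerializer extends Serializer<HashMap<Object, Object>> {

    @Override
    public void write(Kryo kryo, Output output, HashMap<Object, Object> map) {
        output.writeInt(map.size(), true);
        for (Map.Entry<Object, Object> entry : map.entrySet()) {
            kryo.writeClassAndObject(output, entry.getKey());
            kryo.writeClassAndObject(output, entry.getValue());
        }
    }

    @Override
    public HashMap<Object, Object> read(Kryo kryo, Input input, Class<HashMap<Object, Object>> type) {
        int size = input.readInt(true);
        HashMap<Object, Object> map = new HashMap<>(size);
        for (int i = 0; i < size; i++) {
            map.put(kryo.readClassAndObject(input), kryo.readClassAndObject(input));
        }
        return map;
    }
}

It would then be registered on the execution config before the job graph is built, for example:

// Tell Flink's Kryo to use the custom serializer for HashMap values.
env.getConfig().addDefaultKryoSerializer(HashMap.class, GenericMapSerializer.class);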

kkrugler:

Without seeing your code this is more of a qualified guess, but I think it's likely you've given Flink some kind of type hint that says your Map uses String as the key rather than MetricDim. The reason it works with the in-memory state backend is that entries aren't serialized in that case (so no serializer error), but they are when you're using RocksDB.
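
For illustration (MetricDim is your class; the helper name and the explicit TypeHint here are just a sketch, not your actual code), the mismatch might look like this:

import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class MapTypeInfoExample {

    public static TypeInformation<Map<MetricDim, Long>> mapTypeInfo() {
        // A hint like the commented-out one below would tell Flink the map
        // keys are Strings, which is what the ClassCastException suggests:
        // TypeInformation.of(new TypeHint<Map<String, Long>>() {});

        // Declaring the real key type keeps the serializer consistent with
        // the data; the resulting TypeInformation can be passed to
        // returns(...) on the stream that produces the aggregate.
        return TypeInformation.of(new TypeHint<Map<MetricDim, Long>>() {});
    }
}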