I'm trying to run an Apache Beam pipeline on Flink on our test cluster. It has been failing with an EOFException
at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView:79
during the encoding of an object through serialisation. I haven't been able to reproduce the error locally yet. You can find the entire job log here. Some values have been replaced with fake data.
The command used to run the pipeline:
bin/flink run \
-m yarn-cluster \
--yarncontainer 1 \
--yarnslots 4 \
--yarnjobManagerMemory 2000 \
--yarntaskManagerMemory 2000 \
--yarnname "EBI" \
pipeline.jar \
--runner=FlinkRunner \
--zookeeperQuorum=hdp-master-001.fake.org:2181
While I don't think it's related: the object being serialised implements Serializable, and I have tried it with both an implicit and an explicit coder; neither made a difference.
What might be causing this, and what can I do to address it?
For now, increasing the heap memory of the job and task managers to somewhere between 4 and 8 GiB seems to prevent the exception. I'm still unsure whether this is supposed to be normal Flink behaviour (shouldn't it spill to disk?); either way, it doesn't seem like a solution that would scale.
The EOFException is thrown because Flink ran out of memory buffers. Flink expects an EOFException as the notification to start writing data to disk.

The problem is that Beam's SerializableCoder wraps the EOFException in a CoderException. Hence, Flink does not catch the expected EOFException and fails.

The problem can be solved by using a custom coder that forwards the EOFException instead of wrapping it.
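To illustrate why the wrapping matters, here is a minimal, self-contained sketch in plain Java. It is not actual Flink or Beam code (the Encoder interface, class names, and messages are made up for the example); it only models a caller that, like Flink, treats a raw EOFException as the signal to spill to disk:

```java
import java.io.EOFException;
import java.io.IOException;

class CoderWrappingDemo {

    @FunctionalInterface
    interface Encoder {
        void encode() throws IOException;
    }

    // Models SerializableCoder's behaviour: the EOFException from the
    // full buffer is wrapped, hiding its type from the caller.
    static final Encoder WRAPPING = () -> {
        try {
            throw new EOFException("memory buffers exhausted");
        } catch (EOFException e) {
            throw new IOException("CoderException: could not encode value", e);
        }
    };

    // A forwarding coder simply lets the EOFException propagate untouched.
    static final Encoder FORWARDING = () -> {
        throw new EOFException("memory buffers exhausted");
    };

    // Simplified model of Flink's handling (not Flink's actual code):
    // a raw EOFException means "start spilling records to disk";
    // any other exception fails the job.
    static String flinkWrite(Encoder encoder) {
        try {
            encoder.encode();
            return "written in memory";
        } catch (EOFException e) {
            return "spilled to disk";
        } catch (IOException e) {
            return "job failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(flinkWrite(WRAPPING));   // job failed: CoderException: could not encode value
        System.out.println(flinkWrite(FORWARDING)); // spilled to disk
    }
}
```

The same structure applies to a real custom coder: in its encode method, let IOException (and thus EOFException) propagate rather than catching it and rethrowing a wrapped exception.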