AWS Glue Scala Spark job failing - org.apache.spark.util.collection.CompactBuffer[] not registered in Kryo


The code segment below is the one failing, according to the Spark UI history server.

        segmentIdToTripIdsRDD.join(segmentIdToRSMSegmentRDD)
                .map(tuple => {
                    val tripIds: Iterable[String] = tuple._2._1._1
                    val segment: Segment = tuple._2._1._2
                    val rsmSegment: RSMJSONSegment = tuple._2._2
                    enrichTripSegmentFromRsmSegmentRef.apply(segment, rsmSegment)
                    totalSegmentsEnrichedCounter.add(1)
                    (tripIds, segment)
                })
                .flatMap {
                    case (tripIds, segment) => tripIds.map(tripId => (tripId, segment))
                }
                .aggregateByKey(
                    Iterable.empty[Segment])(
                    (segments, segment) => segments ++ Iterable[Segment](segment),
                    (segmentsOne, segmentsTwo) => segmentsOne.toSet ++ segmentsTwo.toSet)
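
For context, the two RDDs being joined have roughly these shapes (a sketch with simplified names, not the exact declarations; segmentIdToTripIdsRDD is itself the output of an earlier join, which is why the mapping above reaches through tuple._2._1):

    // Assumed shapes, keyed by segment ID:
    val segmentIdToTripIdsRDD: RDD[(String, (Iterable[String], Segment))] = ???
    val segmentIdToRSMSegmentRDD: RDD[(String, RSMJSONSegment)] = ???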

Failure stack trace:

org.apache.spark.util.collection.CompactBuffer[]
Note: To register this class use: kryo.register(org.apache.spark.util.collection.CompactBuffer[].class);
    at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:503)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:97)
    at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:540)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:645)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:260)
    at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:263)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.org$apache$spark$util$collection$ExternalAppendOnlyMap$$spillMemoryIteratorToDisk(ExternalAppendOnlyMap.scala:235)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap$SpillableIterator.spill(ExternalAppendOnlyMap.scala:587)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.forceSpill(ExternalAppendOnlyMap.scala:197)
    at org.apache.spark.util.collection.Spillable.spill(Spillable.scala:112)
    at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:177)
    at org.apache.spark.memory.MemoryConsumer.acquireMemory(MemoryConsumer.java:136)
    at org.apache.spark.util.collection.Spillable.maybeSpill(Spillable.scala:87)

I am using Glue version 3.0, which uses Spark version 3.1.1-amzn-0 and Scala 2.12.

According to Spark's built-in KryoSerializer, the CompactBuffer class is already registered with Kryo on initialization (although the error names the array class CompactBuffer[], not CompactBuffer itself). On my end I also registered Tuple1, Tuple2, and Tuple3, even though these were already registered.
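
For reference, the Kryo setup looks roughly like this (a sketch of the intent rather than the exact Glue job code, where these settings are passed as job parameters; the config keys are the standard Spark ones, and spark.kryo.registrationRequired is evidently on, since the exception is thrown from Kryo.getRegistration with the registration hint):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrationRequired", "true")
      // Redundant while debugging: Spark's KryoSerializer already registers
      // the Tuple classes (and CompactBuffer itself) on initialization.
      .registerKryoClasses(Array(
        classOf[Tuple1[_]],
        classOf[Tuple2[_, _]],
        classOf[Tuple3[_, _, _]]
      ))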

ChatGPT suggested that there may be a version mismatch between the CompactBuffer class used at compile time and the one used at runtime. That suggestion is hard to act on, though, since Glue ships its own spark-core module, and the one available to me at compile time may well differ from it.
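
One way to sanity-check that hypothesis would be to log which jar the class is actually loaded from on the driver and on an executor, along these lines (CompactBuffer is private[spark], so it has to be looked up via Class.forName; sc is the job's SparkContext):

    // Where does CompactBuffer come from on the driver?
    val cls = Class.forName("org.apache.spark.util.collection.CompactBuffer")
    println(s"driver:   ${cls.getProtectionDomain.getCodeSource.getLocation}")

    // And on an executor? (this println ends up in the executor's stderr log)
    sc.parallelize(Seq(1)).foreach { _ =>
      val c = Class.forName("org.apache.spark.util.collection.CompactBuffer")
      println(s"executor: ${c.getProtectionDomain.getCodeSource.getLocation}")
    }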

The exception message may well be misleading, but the failure always occurs in the block shared above. I am fairly sure spills during joins have happened before without this error. What is it that I am not catching?
