Scio / apache beam java.lang.IllegalArgumentException: unable to serialize method


I am trying to use Dataflow to move some data from Pub/Sub to Cloud Storage. I need to provide a timestamp to Scio/Beam so it can group the data into windows.

I have a simple case class that models my event. It looks like this (some fields removed):

case class DataEvent(source: String,                      
                     record: AnyRef,
                     timestampUtc: DateTime,
                     publishedUtc: DateTime)

My pipeline begins with this. The events in Pub/Sub are in JSON format and I'm using json4s to deserialize:

sc
  .pubsubSubscription[String]("subscription")
  .map(event => parse(event).camelizeKeys.extract[DataEvent])
  .timestampBy({ event => event.timestampUtc.toInstant })

In the same scope I have an implicit Formats for json4s defined:

implicit val formats: Formats = new DefaultFormats {
  override def dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
} ++ org.json4s.ext.JodaTimeSerializers.all

I'm using json4s-ext, which has Joda-Time support (note the DateTime fields in the case class are Joda-Time). There seems to be some issue with this extension library, because I get the following exception:

java.lang.IllegalArgumentException: unable to serialize anonymous function map@{PubSubToGcsJob.scala:78}
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
    at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
    at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
    at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
    at com.spotify.scio.values.PCollectionWrapper$class.parDo(PCollectionWrapper.scala:58)
    at com.spotify.scio.values.SCollectionImpl.parDo(SCollection.scala:1181)
    at com.spotify.scio.values.SCollection$class.map(SCollection.scala:359)
    at com.spotify.scio.values.SCollectionImpl.map(SCollection.scala:1181)
    at PubSubToGcsJob$.main(PubSubToGcsJob.scala:78)
    at PubSubToGcsJob.main(PubSubToGcsJob.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
Caused by: java.io.NotSerializableException: org.json4s.ext.IntervalSerializer$$anon$1
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
    at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
    at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
    at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
    at com.spotify.scio.values.PCollectionWrapper$class.parDo(PCollectionWrapper.scala:58)
    at com.spotify.scio.values.SCollectionImpl.parDo(SCollection.scala:1181)
    at com.spotify.scio.values.SCollection$class.map(SCollection.scala:359)
    at com.spotify.scio.values.SCollectionImpl.map(SCollection.scala:1181)
    at PubSubToGcsJob$.main(PubSubToGcsJob.scala:78)
    at PubSubToGcsJob.main(PubSubToGcsJob.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)

I tried a workaround where I changed timestampUtc and publishedUtc to String and just parsed them inside my pipeline, like so:

val formatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")

and in the pipeline:

  .timestampBy({ event => formatter.parseDateTime(event.timestampUtc).toInstant })

But I get a similar exception:

java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.transforms.WithTimestamps$AddTimestampsDoFn@7b7a1a8f
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
    at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
... etc

Why is this happening, and how do I resolve it?

Thanks

1 Answer

This happens when a lambda function pulls in something non-serializable from its closure. In this case I suspect val formatter in the second attempt, and the implicit json4s formats in the first: the Caused by points at org.json4s.ext.IntervalSerializer, one of the Joda serializers that formats contains.
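
Here is a minimal sketch of the failure mode outside of Scio (the names NotSerializableConfig and ClosureCaptureDemo are made up for illustration). Beam Java-serializes every map/timestampBy lambda at graph-construction time and again before shipping it to workers, and that serialization walks everything the closure captured:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for anything without a Serializable marker, e.g. the json4s
// IntervalSerializer pulled in via the implicit formats.
class NotSerializableConfig

object ClosureCaptureDemo {
  def main(args: Array[String]): Unit = {
    val config = new NotSerializableConfig

    // The lambda's closure captures config from the enclosing scope.
    val fn: String => Int = s => config.hashCode + s.length

    // Roughly what Beam's SerializableUtils.serializeToByteArray does; this
    // throws java.io.NotSerializableException for NotSerializableConfig,
    // which Beam then wraps in the IllegalArgumentException you're seeing.
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(fn)
  }
}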

One workaround is to move the val to a companion object so it's initialized statically on workers and doesn't need to go through ser/de. For example:

object Util { val formatter = ... }
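
A slightly fuller sketch of the same idea, assuming the formatter pattern from your question (the name Util is arbitrary):

import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}

object Util {
  // Initialized once per JVM when the object is first referenced on a worker,
  // so it never has to travel through Java serialization as part of a closure.
  val formatter: DateTimeFormatter =
    DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
}

In the pipeline, reference it through the object instead of a local val:

  .timestampBy({ event => Util.formatter.parseDateTime(event.timestampUtc).toInstant })

The same move should work for the implicit json4s Formats that the first stack trace complains about.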

For more details see: https://www.lyh.me/lambda-serialization.html