How to use the Pub/Sub Lite Spark connector in a custom PySpark container for Dataproc Serverless


Edit

Using image version 2.0 works as expected (no error), so this looks like a regression from --version 2.0 to --version 2.1.


Initial message

TL;DR

I submit a Dataproc Serverless job that reads from Pub/Sub Lite with the following command:

gcloud dataproc batches submit pyspark \
  ./main.py \
  --region europe-west1 \
  --deps-bucket gs://... \
  --jars gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-1.0.0-with-dependencies.jar \
  --version 2.1
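
As noted in the edit above, pinning the runtime to image version 2.0 avoids the error. For reference, this is the same submission with only the --version flag changed (the truncated --deps-bucket path is a placeholder, exactly as in the command above):

gcloud dataproc batches submit pyspark \
  ./main.py \
  --region europe-west1 \
  --deps-bucket gs://... \
  --jars gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-1.0.0-with-dependencies.jar \
  --version 2.0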

I get the following error:

23/10/04 14:02:23 ERROR MicroBatchExecution: Query [id = e1d920ae-c3fd-47ab-8139-474080e76a16, runId = 40153dad-ce4f-4d2a-ae7f-b860ab145ab2] terminated with error
java.lang.IncompatibleClassChangeError: Class com.google.longrunning.GetOperationRequest does not implement the requested interface repackaged.com.google.protobuf.MessageLite
        at com.google.cloud.pubsublite.repackaged.io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.<init>(ProtoLiteUtils.java:128)
        at com.google.cloud.pubsublite.repackaged.io.grpc.protobuf.lite.ProtoLiteUtils.marshaller(ProtoLiteUtils.java:84)
        at com.google.cloud.pubsublite.repackaged.io.grpc.protobuf.ProtoUtils.marshaller(ProtoUtils.java:57)
        at com.google.longrunning.stub.GrpcOperationsStub.<clinit>(GrpcOperationsStub.java:68)
        at com.google.cloud.pubsublite.v1.stub.GrpcCursorServiceStub.<init>(GrpcCursorServiceStub.java:130)
        at com.google.cloud.pubsublite.v1.stub.GrpcCursorServiceStub.<init>(GrpcCursorServiceStub.java:116)
        at com.google.cloud.pubsublite.v1.stub.GrpcCursorServiceStub.create(GrpcCursorServiceStub.java:96)
        at com.google.cloud.pubsublite.v1.stub.CursorServiceStubSettings.createStub(CursorServiceStubSettings.java:201)
        at com.google.cloud.pubsublite.v1.CursorServiceClient.<init>(CursorServiceClient.java:160)
        at com.google.cloud.pubsublite.v1.CursorServiceClient.create(CursorServiceClient.java:142)
        at com.google.cloud.pubsublite.spark.PslReadDataSourceOptions.newCursorServiceClient(PslReadDataSourceOptions.java:155)
        at com.google.cloud.pubsublite.spark.PslReadDataSourceOptions.newCursorClient(PslReadDataSourceOptions.java:169)
        at com.google.cloud.pubsublite.spark.PslMicroBatchStream.<init>(PslMicroBatchStream.java:59)
        at com.google.cloud.pubsublite.spark.PslScanBuilder.toMicroBatchStream(PslScanBuilder.java:110)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$4(MicroBatchExecution.scala:107)
        at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:454)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:100)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:84)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:456)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:84)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:64)
        at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:298)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Exception in thread "stream execution thread for [id = e1d920ae-c3fd-47ab-8139-474080e76a16, runId = 40153dad-ce4f-4d2a-ae7f-b860ab145ab2]" java.lang.IncompatibleClassChangeError: Class com.google.longrunning.GetOperationRequest does not implement the requested interface repackaged.com.google.protobuf.MessageLite
        ... (stack trace identical to the one above)

How can I fix this?


From what I understand, this seems to be a Java packaging (class shading) issue:

  • The com.google.longrunning.stub.GrpcOperationsStub code expects message classes that implement repackaged.com.google.protobuf.MessageLite.
  • That interface looks like a shaded copy of protobuf, relocated under the repackaged package.
  • But com.google.longrunning.GetOperationRequest apparently does not implement it; I suspect it implements the un-shaded com.google.protobuf.MessageLite instead.

However, all of these classes come from the provided gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-1.0.0-with-dependencies.jar, which I don't control.
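
One way to sanity-check that hypothesis (a diagnostic sketch only, not a fix; it assumes gsutil, a JDK, and grep are available locally) is to look at what the with-dependencies jar actually bundles, since a copy of the protobuf or longrunning classes that conflicts with the ones shipped in the 2.1 runtime image would explain the IncompatibleClassChangeError:

# Download the connector jar referenced by --jars.
gsutil cp gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-1.0.0-with-dependencies.jar .

# Does the jar ship a shaded and/or an un-shaded copy of protobuf's MessageLite?
jar tf pubsublite-spark-sql-streaming-1.0.0-with-dependencies.jar | grep 'protobuf/MessageLite\.class'

# Does the jar ship its own com.google.longrunning.GetOperationRequest,
# or is that class expected to come from the runtime image?
jar tf pubsublite-spark-sql-streaming-1.0.0-with-dependencies.jar | grep 'longrunning/GetOperationRequest\.class'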

Here is my Spark code:

from pyspark import sql as psql

# Create (or reuse) the Spark session provided by the Dataproc Serverless runtime.
spark = psql.SparkSession.builder.getOrCreate()

# Read a micro-batch stream from the Pub/Sub Lite subscription
# (full subscription path elided here).
input_ = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        "...",
    )
    .load()
)

# Print each micro-batch to the console every 2 seconds.
query = (
    input_.writeStream
    .trigger(processingTime="2 seconds")
    .format("console")
    .start()
)

# Let the stream run for up to 10 seconds before the driver exits.
query.awaitTermination(10)
