pyspark publish google-pubsublite

The code is from the Google doc: https://cloud.google.com/pubsub/lite/docs/write-messages-apache-spark

I'm trying to publish to a Pub/Sub Lite topic from PySpark.

import os
import uuid

from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType

# TODO(developer):
project_number = xxx
location = "us-central1"
topic_id = "kosmin"

# Must be set before the SparkSession is created so the connector jars
# are added to the driver and executor classpaths.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.google.cloud:pubsublite-spark-sql-streaming:0.4.1,com.google.cloud:google-cloud-pubsublite:1.6.1 pyspark-shell'

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a rate source that generates consecutive numbers with timestamps
# (columns: timestamp, value), as in the linked doc.
sdf = spark.readStream.format("rate").option("rampUpTime", "10s").load()

sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field. For example, an even value will
    # have {"key1", [b"even"]}.
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)


query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required. Use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    # .trigger(processingTime="1 second")
    .start()
)

# Wait 60 seconds to terminate the query.
query.awaitTermination(60)
query.stop()

but I'm getting:

22/07/29 19:09:38 ERROR Utils: Aborting task
com.google.api.gax.rpc.ApiException: 
    at com.google.cloud.pubsublite.internal.CheckedApiException.<init>(CheckedApiException.java:51)
    at com.google.cloud.pubsublite.internal.CheckedApiException.<init>(CheckedApiException.java:55)
    at com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical(ExtractStatus.java:53)
    at com.google.cloud.pubsublite.spark.PslWriteDataSourceOptions.newServiceClient(PslWriteDataSourceOptions.java:131)
    ......
Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;CLjava/lang/Object;)V
    at io.grpc.Metadata$Key.validateName(Metadata.java:754)
    at io.grpc.Metadata$Key.<init>(Metadata.java:762)
    at io.grpc.Metadata$Key.<init>(Metadata.java:671)
    at io.grpc.Metadata$AsciiKey.<init>(Metadata.java:971)
    at io.grpc.Metadata$AsciiKey.<init>(Metadata.java:966)
    at io.grpc.Metadata$Key.of(Metadata.java:708)
    at io.grpc.Metadata$Key.of(Metadata.java:704)
    at com.google.api.gax.grpc.GrpcHeaderInterceptor.<init>(GrpcHeaderInterceptor.java:60)
    ....

What am I missing? The credentials are set; maybe a missing package at submit time?

There is 1 best solution below

It worked with:

os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars=pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar pyspark-shell'

The java.lang.NoSuchMethodError on Preconditions.checkArgument is the classic symptom of a Guava version conflict: the dependencies resolved via --packages pulled in a Guava older than the one gRPC needs. The -with-dependencies jar bundles its own consistent copies of those libraries, which avoids the conflict.
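
For a quick sanity check that messages actually land on the topic, here is a minimal read-back sketch using the same connector. It assumes the same --jars setup as above and a Pub/Sub Lite subscription attached to the topic; the subscription name kosmin-sub is a placeholder.

import os
import uuid

from pyspark.sql import SparkSession

# Same shaded connector jar as in the fix above; set before creating the session.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars=pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar pyspark-shell'

# TODO(developer): same values as in the write script.
project_number = xxx
location = "us-central1"
subscription_id = "kosmin-sub"  # placeholder: a subscription on the topic

spark = SparkSession.builder.appName("read-app").getOrCreate()

# Stream messages from the Pub/Sub Lite subscription.
rdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

# Dump the received rows (key, data, attributes, timestamps) to the console.
query = (
    rdf.writeStream.format("console")
    .option("checkpointLocation", "/tmp/read-app" + uuid.uuid4().hex)
    .outputMode("append")
    .start()
)

# Wait 60 seconds, then stop the query.
query.awaitTermination(60)
query.stop()

If rows show up on the console, both the write job and the jar setup are working.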