I'm adding the Google Cloud Pub/Sub connector to my SparkSession so I can use Spark Streaming, but every time I run it I get an error saying the Pub/Sub data source was not found. How can I fix this?
from pyspark.sql import SparkSession
# Initialize SparkSession
spark = SparkSession \
    .builder \
    .appName("PubSubSpark") \
    .config("spark.jars.packages", "org.apache.bahir:spark-streaming-pubsub_2.11:2.4.0") \
    .getOrCreate()
project_number = "********"
topic = "*****"
sub = "*****"
# Read from Pub/Sub into Spark DataFrame
df = spark.read \
    .format("pubsub") \
    .option("topic", f"projects/{project_number}/topics/{topic}") \
    .option("subscription", f"projects/{project_number}/subscriptions/{sub}") \
    .load()
# Show DataFrame
df.show()
# Write DataFrame to Pub/Sub
df.write \
    .format("pubsub") \
    .option("topic", f"projects/{project_number}/topics/{topic}") \
    .save()
# Stop SparkSession
spark.stop()
When I run this code, I get the following error:
Py4JJavaError Traceback (most recent call last)
<ipython-input-7-e4bd27732667> in <cell line: 15>()
17 .option(f"topic", "projects/{project_number}/topics/{topic}") \
18 .option(f"subscription", "projects/{project_number}/subscriptions/{sub}") \
---> 19 .load()
20
21 # Show DataFrame
/usr/local/lib/python3.10/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o70.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: pubsub. Please find packages at `https://spark.apache.org/third-party-projects.html`.
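In case it matters: as far as I understand, spark.jars.packages is only resolved when the SparkSession is first created, so it has to be set before getOrCreate(). This is a minimal sketch (assuming a fresh notebook runtime with no existing session) of how I would verify that the package setting actually reached the session:

from pyspark.sql import SparkSession

# Assumes no SparkSession exists yet in this runtime; if one does,
# the spark.jars.packages setting below would be silently ignored.
spark = SparkSession.builder \
    .appName("PubSubSpark") \
    .config("spark.jars.packages", "org.apache.bahir:spark-streaming-pubsub_2.11:2.4.0") \
    .getOrCreate()

# Echo the setting back from the live session config.
print(spark.conf.get("spark.jars.packages", "not set"))

The setting does echo back, but the "pubsub" format is still not found.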