Apache Spark Structured Streaming - not writing to checkpoint location

I have a simple Apache Spark Structured Streaming job in Python that reads data from Kafka and writes the messages to the console.

I've set up a checkpoint location, but the code is not writing anything to it. Any ideas why?

Here is the code:

from pyspark.sql import SparkSession, Window


spark = SparkSession.builder.appName('StructuredStreaming_KafkaProducer').getOrCreate()
# os.environ["SPARK_HOME"] = "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2"
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.2.0'

# kafkaBrokers='localhost:9092'
kafkaBrokers='<host>:<port>'
topic = "my-topic"
# bootstrap.servers=my-cluster-lb-ssl-cert-kafka-bootstrap:9093
security_protocol="SSL"
ssl_truststore_location="/Users/karanalang/Documents/Technology/strimzi/gcp_certs_nov28/ca.p12"
ssl_truststore_password="<pwd_1>"
ssl_keystore_location="/Users/karanalang/Documents/Technology/strimzi/gcp_certs_nov28/user.p12"
ssl_keystore_password="<pwd_2>"
consumerGroupId = "my-group"

spark.sparkContext.setLogLevel("ERROR")

df = spark.read.format('kafka')\
    .option("kafka.bootstrap.servers",kafkaBrokers)\
    .option("kafka.security.protocol","SSL") \
    .option("kafka.ssl.truststore.location",ssl_truststore_location) \
    .option("kafka.ssl.truststore.password",ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location)\
    .option("kafka.ssl.keystore.password", ssl_keystore_password)\
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId)\
    .option("startingOffsets", "earliest") \
    .load()

query = df.selectExpr("CAST(value AS STRING)") \
    .write \
    .format("console") \
    .option("numRows",100)\
    .option("checkpointLocation", "~/PycharmProjects/Kafka/checkpoint/") \
    .option("outputMode", "complete")\
    .save("output")

There are 2 best solutions below

Warren Zhu

According to the official documentation, the checkpoint location has to be a path in an HDFS-compatible file system, and it can be set as an option in the DataStreamWriter when starting a query. In your code it is a local path on the driver, so it can be lost when the streaming job restarts.

For details, please refer to https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
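
One more thing that may be worth checking: the code in the question uses spark.read, .write and .save, which belong to the batch API, while the checkpoint location is only used by streaming queries started through the DataStreamWriter. A minimal sketch of the streaming form is below. The helper name start_console_query and its parameters are made up for illustration, and the "append" output mode is an assumption based on the query having no aggregation.

```python
def start_console_query(stream_df, checkpoint_dir):
    """Start a console sink on a streaming DataFrame and return the StreamingQuery.

    `stream_df` is assumed to come from spark.readStream (not spark.read).
    `checkpoint_dir` is assumed to be an absolute path the driver can write to.
    """
    return (
        stream_df.selectExpr("CAST(value AS STRING)")
        .writeStream                                   # DataStreamWriter, not .write
        .format("console")
        .option("numRows", 100)
        .option("checkpointLocation", checkpoint_dir)
        .outputMode("append")                          # "complete" needs an aggregation
        .start()                                       # streaming queries use start(), not save()
    )

# Usage sketch (assumes an existing SparkSession `spark` and the Kafka options
# from the question; the checkpoint path is a placeholder):
# stream_df = spark.readStream.format("kafka").option("subscribe", topic).load()
# query = start_console_query(stream_df, "/absolute/path/to/checkpoint")
# query.awaitTermination()
```

The options stay the same as in the question; only the reader and writer entry points and the final call change.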

Mozart Sindeaux Neto

I had the same problem and solved it by using the absolute path to the folder.

Maybe it will work in your case as well.

Try replacing the line: .option("checkpointLocation", "~/PycharmProjects/Kafka/checkpoint/")

with: .option("checkpointLocation", "/home/YOUR_USER/PycharmProjects/Kafka/checkpoint/")
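
If you would rather not hard-code the user directory, the path can be expanded in Python before it is handed to Spark. As far as I know, "~" is expanded by the shell and not by Spark, so a literal "~/..." string ends up being treated as a relative directory. A small sketch, where the helper name resolve_checkpoint_dir is hypothetical:

```python
import os


def resolve_checkpoint_dir(path):
    """Expand a leading "~" and return an absolute path."""
    return os.path.abspath(os.path.expanduser(path))


# Same path as in the question, resolved against the current user's home directory.
checkpoint_dir = resolve_checkpoint_dir("~/PycharmProjects/Kafka/checkpoint/")
print(checkpoint_dir)
```

The resulting string can then be passed as .option("checkpointLocation", checkpoint_dir).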