Save Kafka streaming DataFrame to Redis in Databricks after data transformation


I am using PySpark to direct Kafka streams to Redis after performing aggregations on the data. The final output is a streaming DataFrame.

Here is the code I use to connect to the Kafka stream. (You might find my code is a layman's job; please bear with me.)

from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import from_json, col, when, sum

app_schema = StructType([
    StructField("applicationId", StringType(), True),
    StructField("applicationTimeStamp", StringType(), True)
])

# group_id = "mygroup"
topic = "com.mobile-v1"
bootstrap_servers = "server-1:9093,server-2:9093,server-3:9093"

options = {
    "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="[email protected]" password="xxxxx";',
    "kafka.ssl.ca.location": "/tmp/cert.crt",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.bootstrap.servers": bootstrap_servers,
    "failOnDataLoss": "false",
    "subscribe": topic,
    "startingOffsets": "latest",
    "enable.auto.commit": "false",
    "auto.offset.reset": "false",
    "enable.partition.eof": "true",
    "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
}
kafka_mobile_apps_df = spark.readStream.format("kafka").options(**options).load()

kafka_mobile_apps_df = kafka_mobile_apps_df\
    .select(from_json(col("value").cast("string"), app_schema).alias("mob_apps"))

Subscribing to the broker gives me a streaming DataFrame. After this, I aggregate the data into count_df as shown:

count_df = kafka_mobile_apps_df.withColumn("diff_days", (col("TimeStamp_") - col("TimeStamp")) / (60.0 * 60.0 * 24))\
    .withColumn("within_7d_ind", when(col("diff_days") < 7.0, 1).otherwise(0))\
    .groupBy("_applicationId")\
    .agg(sum(col("within_7d_ind")).alias(feature + "_7day_velocity"))

Now I am trying to write this count_df stream to Redis. After my research I found I can use "spark-redis_2.11" for Spark-Redis connectivity.

I don't know Scala, but I found a spark-redis GitHub example in Scala. Could someone help with the exact way in PySpark to writeStream this count_df to Redis with authentication?

Please find the spark-redis GitHub repo here: https://github.com/RedisLabs/spark-redis

I have installed the required jar "com.redislabs:spark-redis_2.12:2.5.0" on the cluster.
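For context, spark-redis reads its connection settings (including the password) from the Spark config. A minimal sketch of those settings with placeholder host and password values; on Databricks they would normally go into the cluster's Spark config rather than a hand-built session:

# Hedged sketch: spark-redis picks up spark.redis.* settings from the Spark
# conf. Host and password below are placeholders; on Databricks, set these
# in the cluster's Spark config instead of building a session by hand.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .config("spark.redis.host", "my-redis-host")  # placeholder
    .config("spark.redis.port", "6379")
    .config("spark.redis.auth", "xxxxx")          # Redis password (authentication)
    .getOrCreate())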

Thanks.

Just found out they don't support Python yet. Please let me know: is there any other way to write this?


1 Answer


You should be able to do it in PySpark; I have answered this question here: https://stackoverflow.com/a/68218806/2986344
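In short: use foreachBatch and write each micro-batch with the spark-redis DataFrame writer. A minimal sketch, assuming the jar you installed and the spark.redis.* settings above are in place; the table name, key column, and checkpoint path are placeholders, not from the original question:

# Minimal sketch: stream count_df to Redis via foreachBatch + spark-redis.
# Assumes the spark-redis jar is installed and spark.redis.host/port/auth
# are set in the Spark config. Table name and checkpoint path are placeholders.

def write_to_redis(batch_df, batch_id):
    (batch_df.write
        .format("org.apache.spark.sql.redis")
        .option("table", "app_counts")            # keys become app_counts:<_applicationId>
        .option("key.column", "_applicationId")   # column used as the Redis hash key
        .mode("append")                           # re-writing a key updates its hash
        .save())

query = (count_df.writeStream
    .outputMode("update")                         # emit only the aggregates that changed
    .foreachBatch(write_to_redis)
    .option("checkpointLocation", "/tmp/checkpoints/redis_counts")
    .start())

With outputMode("update"), each micro-batch carries only the keys whose counts changed, so every batch upserts just those hashes in Redis.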

Another helpful link: https://github.com/RedisLabs/spark-redis/issues/307
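If the lack of first-class Python support is a blocker, you can also skip the connector entirely and use the plain redis-py client inside foreachBatch; the function runs on the driver, so a driver-side client is fine for small aggregated batches. A hedged sketch, with placeholder connection details and a placeholder field name standing in for your feature + "_7day_velocity" alias:

import redis  # pip-install `redis` on the cluster

def write_batch_with_redis_py(batch_df, batch_id):
    # collect() is fine for small aggregated batches; avoid it on large ones
    rows = batch_df.collect()
    r = redis.Redis(host="my-redis-host", port=6379, password="xxxxx")
    pipe = r.pipeline()
    for row in rows:
        # one hash per application id; the field name is a placeholder
        pipe.hset("app_counts:" + row["_applicationId"],
                  "app_7day_velocity", row["app_7day_velocity"])
    pipe.execute()

(count_df.writeStream
    .outputMode("update")
    .foreachBatch(write_batch_with_redis_py)
    .option("checkpointLocation", "/tmp/checkpoints/redis_py")  # placeholder
    .start())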