I am trying to perform a join between a DStream and a static RDD.
PySpark
# Create the static data
ip_classification_rdd = sc.parallelize([('log_name', 'enrichment_success')])
# Broadcast it to all nodes
ip_classification_rdd_broadcast = sc.broadcast(ip_classification_rdd)
# Join the stream with the static dataset on the log_name field
joinedStream = kafkaStream.transform(lambda rdd: rdd.join(ip_classification_rdd_broadcast))
I get this exception: "It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation."
However, someone has the same requirement here: How to join a DStream with a non-stream file?
And this was the solution:
Scala
val vdpJoinedGeo = goodIPsFltrBI.flatMap{ip => geoDataBC.value.get(ip).map(data => (ip, data))}
What is the equivalent of this in PySpark?
A couple of changes are required in your code:

- You cannot broadcast an RDD: instead, broadcast the underlying "data" (the plain Python collection, not the RDD that wraps it).
- To read the broadcast data back inside a transformation, use the broadcast variable's value attribute.

Here is an approximation of what your updated code might look like:
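This is a minimal sketch, assuming the records in kafkaStream are (log_name, payload) pairs, as your join implies; the helper name lookup_classification and the broadcast variable name are illustrative. The flatMap lookup mirrors the Scala flatMap over geoDataBC.value shown above.

PySpark
# Broadcast the plain dict, not an RDD
ip_classification_broadcast = sc.broadcast(dict([('log_name', 'enrichment_success')]))

def lookup_classification(record):
    # record is assumed to be a (log_name, payload) pair
    key, payload = record
    classification = ip_classification_broadcast.value.get(key)
    # Emit (key, (payload, classification)) when the key is known, drop it
    # otherwise, just like the Scala Option/flatMap idiom
    return [(key, (payload, classification))] if classification is not None else []

joinedStream = kafkaStream.flatMap(lookup_classification)

Alternatively, since the function passed to transform() runs on the driver for each batch, you can keep the static RDD un-broadcast and join it there directly, e.g. kafkaStream.transform(lambda rdd: rdd.join(ip_classification_rdd)); this is the stream-dataset join pattern described in the Spark Streaming programming guide.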