Scala to PySpark


I am trying to perform a join between a DStream and a static RDD.

PySpark

    #Create static data
    ip_classification_rdd = sc.parallelize([('log_name', 'enrichment_success')])
    #Broadcast it to all nodes
    ip_classification_rdd_broadcast = sc.broadcast(ip_classification_rdd)
    #Join stream with static dataset on field log_name
    joinedStream = kafkaStream.transform(lambda rdd: rdd.join(ip_classification_rdd[log_name]))

I get this exception: "It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation."

Scala

However, someone has the same requirement here: How to join a DStream with a non-stream file?

And this was the solution:

val vdpJoinedGeo = goodIPsFltrBI.flatMap{ ip => geoDataBC.value.get(ip).map(data => (ip, data)) }

What is the equivalent of this in PySpark?


1 Answer


A couple of changes are required in your code:

  • You cannot broadcast an RDD; broadcast the underlying data (the list of tuples) instead.
  • You then access the contents of the broadcast variable inside the closure through its value attribute.

Here is an approximation of what your updated code might look like:

    #Create static data
    data = [('log_name', 'enrichment_success')]
    #Broadcast it to all nodes
    ip_classification_broadcast = sc.broadcast(data)
    #Join stream with static dataset on field log_name
    joinedStream = kafkaStream.transform(
        lambda rdd: rdd.join(sc.parallelize(ip_classification_broadcast.value)))
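For reference, a closer PySpark analogue of the Scala flatMap above is a map-side lookup against the broadcast value rather than an RDD-to-RDD join. The sketch below is only an approximation: it assumes kafkaStream yields (log_name, payload) pairs and that the static data fits in a dict; the helper name lookup is illustrative, not part of the original answer.

    #Broadcast the static data as a plain dict so executors can look keys up
    ip_classification_bc = sc.broadcast(dict([('log_name', 'enrichment_success')]))

    def lookup(kv):
        #kv is assumed to be a (log_name, payload) pair from the stream
        match = ip_classification_bc.value.get(kv[0])
        #Emit nothing when the key is absent, mirroring Option.map in the Scala code
        return [(kv[0], (kv[1], match))] if match is not None else []

    joinedStream = kafkaStream.flatMap(lookup)

Because the lookup runs inside the executors against the broadcast dict, no shuffle is needed, which is the main benefit of the Scala solution being translated.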