I have a PySpark DataFrame and now I want to iterate over each row and insert/update it in a MongoDB collection.
# all the required imports are done
# DataFrame:
+---+----+
|age|name|
+---+----+
| 30| c|
| 5| e|
| 6| f|
+---+----+
db = mongodbclient['mydatabase']
collection = db['mycollection']

# created the function below to insert/update
def customFunction(row):
    key = {'name': row['name']}
    # 'columns' holds the DataFrame column names (my_dataframe.columns)
    data = dict(zip(columns, [row[x] for x in columns]))
    collection.update(key, data, upsert=True)
    # return a_flag  # commented out for now; a_flag can be 0 or 1
If a name exists in the MongoDB collection 'mycollection', it should update that record; otherwise it should insert a new record.
I get the following error when I try to map this function over the Spark DataFrame:
result = my_dataframe.rdd.map(customFunction)
#.....TypeError: can't pickle _thread.lock objects....
#AttributeError: 'TypeError' object has no attribute 'message'
Can anybody figure out what is wrong in this function (or anywhere else), or suggest any alternative for this type of task?
Basically: iterate over each row (is that even possible without a collect call?)
and, on each row, apply a function to run work outside Spark.
Please suggest. Thanks in advance. :)
My data in MongoDB:
name  age
a     1
b     2
c     3    # the update should change this age to 30, and 2 more new records should be inserted
It looks like the connection object cannot be pickled. I'd use foreachPartition, but keep in mind that a fatal failure might leave the database in an inconsistent state.
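A minimal sketch of that approach, assuming pymongo is installed and MongoDB is reachable (the connection URI, database, and collection names below are placeholders):

from pymongo import MongoClient

def insert_partition(rows):
    # Create the client inside the partition function, so it is
    # constructed on the executor and never needs to be pickled.
    client = MongoClient('mongodb://localhost:27017/')
    collection = client['mydatabase']['mycollection']
    for row in rows:
        # Upsert: update the record matching this name, or insert it.
        collection.update_one(
            {'name': row['name']},
            {'$set': row.asDict()},
            upsert=True
        )
    client.close()

my_dataframe.rdd.foreachPartition(insert_partition)

Note that foreachPartition is an action, so it runs immediately (unlike map, which is lazy), and one MongoClient is created per partition instead of per row.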