Counting sort in PySpark

I have an RDD of over 140 million key-value pairs. From my analysis, I found that there are only around 500 unique keys. I have to sort this RDD by key.

I tried using groupByKey() to group all the key-value pairs by key, then sortBy() to sort the keys, and finally map() to emit the sorted key-value pairs.
The code looks like this:

def func(list_x):
    # Expand one (key, [values]) record back into a list of (key, value) pairs.
    output = []
    for element in list_x[1]:
        output.append((list_x[0], element))
    return output


textFile = (
    context.textFile(input_features)
    .map(lambda line: score_data(line))        # parse each line into a (key, value) pair
    .combineByKey(lambda v: [v],               # start a list for the first value of a key
                  lambda x, y: x + [y],        # append further values within a partition
                  lambda x, y: x + y)          # merge lists across partitions
    .sortBy(lambda line: line[0])              # sort the (key, [values]) records by key
    .map(lambda x: func((x[0], list(x[1]))))   # expand each record back into (key, value) pairs
    .saveAsTextFile(output_features)
)

With this approach, I am getting this error in Spark:

Container killed by YARN for exceeding memory limits. 6.3 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

This error occurs because some keys hold around 10 million values after the groupByKey().

I replaced groupByKey() with combineByKey(lambda v: [v], lambda x, y: x + [y], lambda x, y: x + y), but the job now runs for more than an hour, while a plain sort takes only ~30 minutes.

Since the number of unique keys is very small compared to the number of key-value pairs, it should be possible to implement something similar to counting sort.

I also applied repartition(), which reduced the time taken by the Spark job, but I feel it can be reduced further by implementing counting sort, since that operation would have linear complexity.

Is there a way to implement this in PySpark?
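
To make the question concrete, here is a rough sketch of the counting-sort-style approach I have in mind (assuming, as above, that score_data turns a line into a (key, value) tuple and that the ~500 distinct keys fit easily on the driver; the variable names are just placeholders):

# Parse the input into (key, value) pairs, as in the pipeline above.
pairs = context.textFile(input_features).map(score_data)

# With only ~500 distinct keys, collecting and sorting them on the driver is cheap.
keys = sorted(pairs.keys().distinct().collect())

# Broadcast a key -> rank lookup table to the executors.
rank = context.broadcast({k: i for i, k in enumerate(keys)})

# Route every record to the partition matching its key's rank. Each partition
# then holds exactly one key, so no comparison sort of the 140 million records
# is needed; this is essentially the counting-sort idea.
partitioned = pairs.partitionBy(len(keys), lambda k: rank.value[k])

# Partitions are written in order (part-00000, part-00001, ...), so the part
# files taken together give the data in key order.
partitioned.saveAsTextFile(output_features)

The idea is that sorting the ~500 keys on the driver plus one shuffle by key rank would replace the comparison-based sort over all 140 million records.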
