sorting RDD elements


For a research project, I tried sorting the elements in an RDD. I did this in two different ways.

In the first approach, I applied mapPartitions() to the RDD so that each partition sorts its own contents and emits the sorted list as its only record. I then applied a reduce() function that merges the sorted lists.

I ran these experiments on an EC2 cluster containing 30 nodes. I set it up using the spark ec2 script. The data file was stored in HDFS.

In the second approach I used the sortBy method in Spark.

I performed these operations on the US census data (100 MB) found here

A single line looks like this:

9, Not in universe, 0, 0, Children, 0, Not in universe, Never married, Not in universe or children, Not in universe, White, All other, Female, Not in universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler, Not in universe, Not in universe, Child <18 never marr not in subfamily, Child under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not in universe, 0, Both parents present, United-States, United-States, United-States, Native- Born in the United States, 0, Not in universe, 0, 0, 94, - 50000.

I sorted based on the 25th value in the CSV. In this line that is 1758.14.
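
As a minimal sketch of that key extraction (the names `SortKeyExample` and `extractKey` are hypothetical and not part of the code in this post): the 25th value sits at zero-based index 24 after splitting on commas, and `Double.parseDouble` ignores the leading space left over from the ", " separator.

```java
// Hypothetical helper illustrating the sort key used in this question.
final class SortKeyExample {
    // Returns the 25th comma-separated value (zero-based index 24) as a double.
    static double extractKey(String line) {
        String[] fields = line.split(",");
        // Double.parseDouble ignores leading and trailing whitespace.
        return Double.parseDouble(fields[24]);
    }

    public static void main(String[] args) {
        // Shortened stand-in record: 24 placeholder fields, then the key, then one more field.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 24; i++) {
            sb.append("x, ");
        }
        sb.append("1758.14, Nonmover");
        System.out.println(extractKey(sb.toString()));
    }
}
```

A line with fewer than 25 fields or a non-numeric value at that position would throw, so real input may need validation first.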

I noticed that sortBy performs worse than the other method. Is this the expected scenario? If it is, why wouldn't the mapPartitions() and reduce() be the default sorting approach?

Here is my implementation:

public static void sortBy(JavaSparkContext sc) {
    JavaRDD<String> rdd = sc.textFile("/data.txt", 32);
    long start = System.currentTimeMillis();
    rdd.sortBy(new Function<String, Double>() {
        @Override
        public Double call(String v1) throws Exception {
            // Sort key: the 25th comma-separated value (index 24)
            String[] arr = v1.split(",");
            return Double.parseDouble(arr[24]);
        }
    }, true, 9).collect();
    long end = System.currentTimeMillis();
    System.out.println("SortBy: " + (end - start));
}

public static void sortList(JavaSparkContext sc) {
    JavaRDD<String> rdd = sc.textFile("/data.txt", 32);
    long start = System.currentTimeMillis();
    JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 = rdd.mapPartitions(
            new FlatMapFunction<Iterator<String>, LinkedList<Tuple2<Double, String>>>() {
        @Override
        public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t)
                throws Exception {
            // Pair every line of this partition with its sort key (index 24)
            LinkedList<Tuple2<Double, String>> lines = new LinkedList<Tuple2<Double, String>>();
            while (t.hasNext()) {
                String s = t.next();
                String[] arr1 = s.split(",");
                Tuple2<Double, String> t1 =
                        new Tuple2<Double, String>(Double.parseDouble(arr1[24]), s);
                lines.add(t1);
            }
            Collections.sort(lines, new IncomeComparator());
            // Emit the sorted partition as a single record
            LinkedList<LinkedList<Tuple2<Double, String>>> list =
                    new LinkedList<LinkedList<Tuple2<Double, String>>>();
            list.add(lines);
            return list;
        }
    });
    rdd3.reduce(new Function2<LinkedList<Tuple2<Double, String>>,
            LinkedList<Tuple2<Double, String>>, LinkedList<Tuple2<Double, String>>>() {
        @Override
        public LinkedList<Tuple2<Double, String>> call(
                LinkedList<Tuple2<Double, String>> a,
                LinkedList<Tuple2<Double, String>> b) throws Exception {
            // Two-way merge of two sorted lists
            LinkedList<Tuple2<Double, String>> result = new LinkedList<Tuple2<Double, String>>();
            while (a.size() > 0 && b.size() > 0) {
                if (a.getFirst()._1.compareTo(b.getFirst()._1) <= 0)
                    result.add(a.poll());
                else
                    result.add(b.poll());
            }
            while (a.size() > 0)
                result.add(a.poll());
            while (b.size() > 0)
                result.add(b.poll());
            return result;
        }
    });
    long end = System.currentTimeMillis();
    System.out.println("MapPartitions: " + (end - start));
}

There is 1 best solution below


collect() is a major bottleneck because it returns all the results to the driver.
It incurs both an I/O hit and additional network traffic to a single destination (in this case, the driver).
It also blocks other operations.

Instead of collect() in your first code segment, sortBy(), try performing a parallel operation such as saveAsTextFile(tmp), then read the result back using sc.textFile(tmp).

The other code segment, sortList(), uses the parallel mapPartitions() and reduce() APIs, so the per-partition sorting is done in parallel.
It would seem that this is the cause of the difference in end-to-end times.
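
To make the data flow of that second approach concrete, here is a Spark-free sketch in plain Java. The names `SortThenMergeExample`, `merge` and `sortChunks` are hypothetical, each inner list stands in for one partition, and the sequential loop only illustrates the sort-then-merge logic; in Spark the per-chunk sorts would run on the executors.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Spark-free sketch: sort each chunk independently, then merge the sorted chunks pairwise.
final class SortThenMergeExample {
    // Merges two ascending lists into one ascending list in linear time.
    static List<Double> merge(List<Double> a, List<Double> b) {
        List<Double> result = new ArrayList<>(a.size() + b.size());
        int i = 0;
        int j = 0;
        while (i < a.size() && j < b.size()) {
            if (a.get(i).compareTo(b.get(j)) <= 0) {
                result.add(a.get(i++));
            } else {
                result.add(b.get(j++));
            }
        }
        while (i < a.size()) {
            result.add(a.get(i++));
        }
        while (j < b.size()) {
            result.add(b.get(j++));
        }
        return result;
    }

    // Sorts every chunk on its own (the mapPartitions() step), then folds the
    // sorted chunks together with merge (the reduce() step).
    static List<Double> sortChunks(List<List<Double>> chunks) {
        List<Double> merged = new ArrayList<>();
        for (List<Double> chunk : chunks) {
            List<Double> sorted = new ArrayList<>(chunk);
            Collections.sort(sorted);
            merged = merge(merged, sorted);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<Double>> chunks = Arrays.asList(
                Arrays.asList(3.0, 1.0),
                Arrays.asList(2.0, 5.0, 4.0));
        System.out.println(sortChunks(chunks));
    }
}
```

The sketch uses ArrayList with index cursors instead of polling a LinkedList; that is only a presentation choice and does not change the merge logic.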

Note that your findings don't necessarily mean that the sum of execution times over all machines is worse.