Sort MapReduce WordCount output by value


I am currently writing a Hadoop program that outputs the top 100 most-tweeted hashtags given a data set of tweets. I was able to output all the hashtags with the WordCount program, so the output looks like this (ignore the quotation marks):

"#USA 2" 

"#Holy 5"

"#SOS 3"

"#Love 66"

However, I ran into trouble when I attempted to sort them by their word frequencies (the values) using the code from here.

I noticed that the keys are integers instead of strings in the sample input for the program linked above. I tried changing a few parameters in the code to fit my use case, but it didn't work out, as I don't understand them well enough. Please help me!

1 Answer

You need a second MapReduce job, where the input is the output of your first job.
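For example, the driver can run the two jobs back to back. This is a minimal sketch, assuming your existing WordCount job is already configured; the intermediate path "/tmp/hashtag-counts" is just an illustration:

 // Sketch of chaining: the word-count job writes to an intermediate
 // directory, and the sort job reads from it.
 Path intermediate = new Path("/tmp/hashtag-counts");

 Job countJob = Job.getInstance(conf, "hashtag count");
 // ... set the mapper/reducer/types of your existing WordCount here ...
 FileOutputFormat.setOutputPath(countJob, intermediate);
 if (!countJob.waitForCompletion(true)) {
  System.exit(1); // abort if the first job fails
 }

 Job sortJob = Job.getInstance(conf, "sort by count");
 // ... configure exactly as in the full example below ...
 FileInputFormat.addInputPath(sortJob, intermediate);
 System.exit(sortJob.waitForCompletion(true) ? 0 : 1);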

I have tweaked the code to do what you want.

For input

#USA 2

#Holy 5

#SOS 3

#Love 66 

The output should be

66 #Love 

5 #Holy 

3 #SOS 

2 #USA

I have assumed that the hashtag and the count are separated by a tab. If the delimiter is something else, please change it in the mapper. The code is not tested, so please let me know if it works.
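By the way, if the delimiter really is a tab, you can let Hadoop do the splitting: KeyValueTextInputFormat (in org.apache.hadoop.mapreduce.lib.input) breaks each line at the first tab and hands the mapper the hashtag as key and the count as value. A sketch, where KVMapTask is my own name and not part of the code below:

 // Sketch of a mapper for use with KeyValueTextInputFormat, which splits
 // each line at the first tab, so no manual split() is needed.
 public static class KVMapTask extends Mapper<Text, Text, IntWritable, Text> {
  @Override
  public void map(Text key, Text value, Context context)
    throws java.io.IOException, InterruptedException {
   context.write(new IntWritable(Integer.parseInt(value.toString().trim())), key);
  }
 }

In the driver you would then call job.setInputFormatClass(KeyValueTextInputFormat.class) instead of TextInputFormat. Anyway, here is the full tweaked code: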

package com.my.cert.example;

import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class ValueSortExp {
 public static void main(String[] args) throws Exception {

  Path inputPath = new Path("C:\\hadoop\\test\\test.txt");
  Path outputDir = new Path("C:\\hadoop\\test\\test1");

  // Path inputPath = new Path(args[0]);
  // Path outputDir = new Path(args[1]);

  // Create configuration
  Configuration conf = new Configuration(true);

  // Create job
  Job job = Job.getInstance(conf, "sort hashtags by count");
  job.setJarByClass(ValueSortExp.class);

  // Setup MapReduce
  job.setMapperClass(ValueSortExp.MapTask.class);
  job.setReducerClass(ValueSortExp.ReduceTask.class);
  job.setNumReduceTasks(1);

  // Specify key / value
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(Text.class);
  job.setSortComparatorClass(IntComparator.class);
  // Input
  FileInputFormat.addInputPath(job, inputPath);
  job.setInputFormatClass(TextInputFormat.class);

  // Output
  FileOutputFormat.setOutputPath(job, outputDir);
  job.setOutputFormatClass(TextOutputFormat.class);

  // Delete the output directory if it already exists, so re-runs don't fail
  FileSystem hdfs = FileSystem.get(conf);
  if (hdfs.exists(outputDir))
   hdfs.delete(outputDir, true);

  // Execute job
  int code = job.waitForCompletion(true) ? 0 : 1;
  System.exit(code);

 }

 // Comparator that sorts the IntWritable map-output keys in descending order,
 // so the biggest counts come first. An IntWritable is serialized as a 4-byte
 // big-endian int, which is why the raw bytes can be read back with ByteBuffer.
 public static class IntComparator extends WritableComparator {

     public IntComparator() {
         super(IntWritable.class);
     }

     @Override
     public int compare(byte[] b1, int s1, int l1,
             byte[] b2, int s2, int l2) {

         Integer v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
         Integer v2 = ByteBuffer.wrap(b2, s2, l2).getInt();

         // Reverse the natural order to get largest-to-smallest
         return v1.compareTo(v2) * (-1);
     }
 }

 // Swaps the columns: the count becomes the key so the framework sorts by it,
 // and the hashtag becomes the value.
 public static class MapTask extends
   Mapper<LongWritable, Text, IntWritable, Text> {
  @Override
  public void map(LongWritable key, Text value, Context context)
    throws java.io.IOException, InterruptedException {
   String line = value.toString();
   String[] tokens = line.split("\t"); // This is the delimiter between key and value
   int valuePart = Integer.parseInt(tokens[1].trim());
   context.write(new IntWritable(valuePart), new Text(tokens[0]));
  }
 }

 // Writes the pairs back out as "count<TAB>hashtag". Thanks to IntComparator,
 // the counts arrive from largest to smallest.
 public static class ReduceTask extends
   Reducer<IntWritable, Text, IntWritable, Text> {
  @Override
  public void reduce(IntWritable key, Iterable<Text> list, Context context)
    throws java.io.IOException, InterruptedException {
   // Several hashtags can share the same count; write each of them
   for (Text value : list) {
    context.write(key, value);
   }
  }
 }

}
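One more thing: since you only want the top 100 hashtags, and the single reducer sees the counts from largest to smallest (because of setNumReduceTasks(1) plus IntComparator), you can simply stop after 100 output lines. A sketch; Top100ReduceTask and its counter are my own additions and untested like the rest:

 // Sketch: emit only the first 100 lines. This relies on the descending
 // sort order guaranteed by IntComparator and on there being one reducer.
 public static class Top100ReduceTask extends
   Reducer<IntWritable, Text, IntWritable, Text> {
  private int written = 0; // lines emitted so far

  @Override
  public void reduce(IntWritable key, Iterable<Text> list, Context context)
    throws java.io.IOException, InterruptedException {
   for (Text value : list) {
    if (written >= 100) {
     return; // we already have the top 100
    }
    context.write(key, value);
    written++;
   }
  }
 }

Swap it in with job.setReducerClass(Top100ReduceTask.class).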