Mapreduce Custom TextOutputFormat - Strange characters NUL, SOH, etc

260 Views Asked by At

I have implemented a custom output format for converting key value pairs to a Json format.

public class JSONOutputFormat extends TextOutputFormat<Text, IntWritable> {
@Override
public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    Path path = getOutputPath(context);
    FileSystem fs = path.getFileSystem(conf);
    FSDataOutputStream out = fs.create(new Path(path,context.getJobName()));
    return new JsonRecordWriter(out);
}

}

private static class JsonRecordWriter extends LineRecordWriter<Text,IntWritable>{
    boolean firstRecord = true;
    @Override
    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.writeBytes("}");
        super.close(context);
    }

    @Override
    public synchronized void write(Text key, IntWritable value)
            throws IOException {
        if (!firstRecord){

            out.writeBytes(",\r\n");
            firstRecord = false;
        }
        out.writeUTF(key.toString() + ":" +value.toString());
    }

    public JsonRecordWriter(DataOutputStream out) throws IOException{
        super(out);
        out.writeBytes("{");
    }
}

However, the output from the Mapreduce job has some undesirable chars such as: {NUL Chair:12 NUL BS Book:1}

My driver class is as follows:

public class Driver {

public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

    IntWritable one = new IntWritable(1);
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split(" ");
        for(String word: words)
            context.write(new Text(word), one);
    }
}
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        Iterator it = values.iterator();

        int count = 0;
        while (it.hasNext()){
            IntWritable c = (IntWritable) it.next();
            count+=c.get();
        }
        context.write(key, new IntWritable(count));
    }
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "wordcountjson");
    job.setJarByClass(Driver.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);
    job.setOutputFormatClass(JSONOutputFormat.class);

    job.setNumReduceTasks(1);

    System.exit(job.waitForCompletion(true)?0:1);



}

}

Any ideas why those chars are appearing in the output?

0

There are 0 best solutions below