mapreduce.TextInputFormat hadoop


I am a Hadoop beginner. I came across this custom RecordReader program, which reads 3 lines at a time and outputs the number of times a 3-line input was given to the mapper.

I understand why a RecordReader is used, but I don't see how each InputSplit can contain 3 lines when the input format class simply extends mapreduce.TextInputFormat. As I understand it, TextInputFormat emits one InputSplit for each line (that is, for each \n).

So how can the RecordReader read 3 lines from each InputSplit? Could someone please explain how this is possible? Thanks in advance!


You need to understand the implementation of TextInputFormat to discover the answer.

Let's dive into the code. I will discuss the new mapreduce API, but the "old" mapred API is quite similar.

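As you said, from a user's point of view a TextInputFormat breaks its input into records at newline characters. Those records are not InputSplits, though: FileInputFormat, the parent class of TextInputFormat, computes each InputSplit as a byte range of the file (by default, about one HDFS block), and a RecordReader then turns each split into records. Let's check the implementation.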
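
To make the split/record distinction concrete, here is a minimal plain-Java model (no Hadoop dependency) of how FileInputFormat cuts a file into splits. The class and method names (SplitSketch, getSplits, computeSplitSize) are illustrative assumptions, not Hadoop's API; only the clamp formula and the 10% "slop" for the last split are modeled on FileInputFormat's behavior. Note that line boundaries are never consulted.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model (no Hadoop dependency) of how FileInputFormat cuts a file
 * into InputSplits. Class and method names are illustrative, not Hadoop's API.
 */
public class SplitSketch {

    /** The last split may run up to 10% over splitSize instead of leaving a tiny one. */
    static final double SPLIT_SLOP = 1.1;

    /** A split is only a byte range: a start offset and a length. */
    public static final class Split {
        public final long start;
        public final long length;

        Split(long start, long length) {
            this.start = start;
            this.length = length;
        }

        @Override
        public String toString() {
            return "Split[start=" + start + ", length=" + length + "]";
        }
    }

    /** Clamp the block size between the configured min and max split sizes. */
    public static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    /** Cuts [0, fileLength) into byte ranges; line boundaries are never looked at. */
    public static List<Split> getSplits(long fileLength, long splitSize) {
        List<Split> splits = new ArrayList<>();
        long bytesRemaining = fileLength;
        while ((double) bytesRemaining / splitSize > SPLIT_SLOP) {
            splits.add(new Split(fileLength - bytesRemaining, splitSize));
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            splits.add(new Split(fileLength - bytesRemaining, bytesRemaining));
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Assumed defaults: minSize 1, maxSize unbounded, so split size == block size.
        long splitSize = computeSplitSize(128 * mb, 1, Long.MAX_VALUE);
        // A 300 MB text file gets 3 splits however many lines it contains.
        for (Split s : getSplits(300 * mb, splitSize)) {
            System.out.println(s);
        }
    }
}
```

With a 300 MB file and 128 MB blocks this yields splits starting at 0, 128 MB and 256 MB, the last one 44 MB long. A line that happens to cross the 128 MB mark is simply cut in two by the split boundary; it is the RecordReader's job to hand out whole lines anyway.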

Looking at its source, you can see that the class is almost empty. The key method is createRecordReader, which is declared by InputFormat:

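```java
@Override
public RecordReader<LongWritable, Text> createRecordReader(
        InputSplit split,
        TaskAttemptContext context
) {
    // Every split gets a LineRecordReader, which emits one record per line.
    // (Newer Hadoop releases also pass the optional
    // textinputformat.record.delimiter bytes to the constructor.)
    return new LineRecordReader();
}
```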
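
How does a LineRecordReader produce whole lines from a split that is only a byte range? The sketch below is a plain-Java model of the boundary rule, loosely following older LineRecordReader versions (newer ones differ in detail): a split that does not start at offset 0 discards its first, possibly partial line, and every split keeps reading while the next line *starts* before its end offset. LineSplitSketch, readLines and afterNextNewline are hypothetical names, and the file is assumed to be an in-memory byte array.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model of how a line-oriented RecordReader turns a byte-range
 * split into whole lines. Names are illustrative, not Hadoop's API.
 */
public class LineSplitSketch {

    /** Returns the lines owned by the split [start, start + length) of data. */
    public static List<String> readLines(byte[] data, int start, int length) {
        int end = start + length;
        int pos = start;
        if (start != 0) {
            // Back up one byte and discard everything through the next '\n'.
            // If the previous byte is '\n', nothing is discarded; otherwise the
            // partial first line belongs to the previous split, which reads it.
            pos = afterNextNewline(data, start - 1);
        }
        List<String> lines = new ArrayList<>();
        // A line belongs to this split if it STARTS before 'end',
        // even if its bytes run past 'end' into the next split.
        while (pos < end && pos < data.length) {
            int next = afterNextNewline(data, pos);
            int lineEnd = data[next - 1] == '\n' ? next - 1 : next;
            lines.add(new String(data, pos, lineEnd - pos, StandardCharsets.UTF_8));
            pos = next;
        }
        return lines;
    }

    /** Index just past the first '\n' at or after 'from', or data.length if none. */
    private static int afterNextNewline(byte[] data, int from) {
        int i = from;
        while (i < data.length && data[i] != '\n') {
            i++;
        }
        return Math.min(i + 1, data.length);
    }

    public static void main(String[] args) {
        byte[] data = "aaa\nbbb\nccc\nddd\n".getBytes(StandardCharsets.UTF_8);
        // Three 6/6/4-byte splits; the first boundary (offset 6) falls inside "bbb".
        int[][] splits = {{0, 6}, {6, 6}, {12, 4}};
        for (int[] s : splits) {
            System.out.println(readLines(data, s[0], s[1]));
        }
    }
}
```

This prints [aaa, bbb], [ccc] and [ddd]: every line is emitted exactly once, even though the first split ends in the middle of bbb.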

The general contract is that an InputFormat is used to obtain a RecordReader. If you look inside Mapper and MapContextImpl, you will see that the mapper only uses the RecordReader to get the next key and value; it knows nothing else about how the input is read.

Mapper:

```java
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);
}
```

MapContextImpl:

```java
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  return reader.nextKeyValue();
}
```
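
The consequence of this delegation is that the number of map() calls equals the number of records the reader produces, whatever a "record" contains. The plain-Java model below mimics the Mapper.run() loop against a hypothetical, trimmed-down reader interface (SimpleRecordReader, ListReader and run are illustrative names, not Hadoop classes; keys here are record indexes, not byte offsets as in LineRecordReader).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Plain-Java model of the Mapper.run() / MapContextImpl delegation.
 * SimpleRecordReader is a hypothetical stand-in for Hadoop's RecordReader.
 */
public class PullLoopSketch {

    /** Hypothetical, trimmed-down RecordReader: no initialize(), close(), or progress. */
    public interface SimpleRecordReader<K, V> {
        boolean nextKeyValue();
        K getCurrentKey();
        V getCurrentValue();
    }

    /** Hands out pre-built records; keys are record indexes, not byte offsets. */
    public static class ListReader implements SimpleRecordReader<Long, String> {
        private final Iterator<String> records;
        private long key = -1;
        private String value;

        public ListReader(List<String> records) {
            this.records = records.iterator();
        }

        @Override
        public boolean nextKeyValue() {
            if (!records.hasNext()) {
                return false;
            }
            key++;
            value = records.next();
            return true;
        }

        @Override
        public Long getCurrentKey() {
            return key;
        }

        @Override
        public String getCurrentValue() {
            return value;
        }
    }

    /** Same loop shape as Mapper.run(): one map() call per record, nothing more. */
    public static List<String> run(SimpleRecordReader<Long, String> reader) {
        List<String> mapCalls = new ArrayList<>();
        while (reader.nextKeyValue()) {
            // Stand-in for map(key, value, context): record what map() received.
            mapCalls.add(reader.getCurrentKey() + " -> " + reader.getCurrentValue());
        }
        return mapCalls;
    }

    public static void main(String[] args) {
        // Each value holds three lines, so map() runs twice, not six times.
        List<String> threeLineRecords = Arrays.asList("l1\nl2\nl3", "l4\nl5\nl6");
        System.out.println(run(new ListReader(threeLineRecords)).size());
    }
}
```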

Now carefully re-read the code at the link you provided. You will see that:

  • NLinesInputFormat extends TextInputFormat and only overrides createRecordReader. Rather than using a LineRecordReader, you provide your own RecordReader. You want to extend TextInputFormat rather than a class higher in the hierarchy because it already takes care of everything done at that level that you might need (compression, non-splittable formats, etc.).
  • NLinesRecordReader does the real work. In initialize, it does what is required to obtain an InputStream seeked to the right offset for the provided InputSplit. It also creates a LineReader, the same class that TextInputFormat's LineRecordReader uses.
  • In the nextKeyValue method, you will see that LineReader.readLine() is invoked three times to get three lines (plus some logic to properly handle corner cases such as a record that is too large, end of line, and end of split).
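
The core of that nextKeyValue logic, calling readLine() up to N times to build one record, can be sketched in plain Java. This is not the linked NLinesRecordReader: it reads from a BufferedReader over a String instead of a Hadoop LineReader, and it deliberately ignores split offsets and maximum line length. NLineGroupingSketch, nextRecord and readRecords are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model of an N-lines-per-record reader. This is NOT the linked
 * NLinesRecordReader: it ignores split offsets and maximum line length.
 */
public class NLineGroupingSketch {

    /**
     * Builds one record by calling readLine() up to n times, like nextKeyValue().
     * Returns null at end of input; the last record may have fewer than n lines.
     */
    static String nextRecord(BufferedReader in, int n) throws IOException {
        StringBuilder record = new StringBuilder();
        int linesRead = 0;
        String line;
        // Test the count first so no line is consumed beyond the n-th.
        while (linesRead < n && (line = in.readLine()) != null) {
            if (linesRead > 0) {
                record.append('\n');
            }
            record.append(line);
            linesRead++;
        }
        return linesRead == 0 ? null : record.toString();
    }

    /** Splits text into records of n lines each. */
    public static List<String> readRecords(String text, int n) {
        List<String> records = new ArrayList<>();
        try (BufferedReader in = new BufferedReader(new StringReader(text))) {
            String record;
            while ((record = nextRecord(in, n)) != null) {
                records.add(record);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }

    public static void main(String[] args) {
        // 7 lines with n = 3: two full records and a final 1-line record.
        List<String> records = readRecords("1\n2\n3\n4\n5\n6\n7\n", 3);
        System.out.println(records.size() + " records");
    }
}
```

Each string returned here corresponds to one value handed to map(), which is why a mapper behind such a reader sees one call per three lines. Whether a short final record is emitted or dropped is a design choice; this sketch keeps it.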

I hope this helps. The key is to understand the overall design of the API and how its parts interact with each other.