Hadoop - Multiple Files from Record Reader to Map Function

I have implemented a custom CombineFileInputFormat in order to create splits for the map task, each composed of a group of files. I built a solution that passes each file of the split through the record reader individually, and everything works fine. Now I am trying to pass the whole set of files in a split to the map function at once.
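
For reference, the input format hands each CombineFileSplit to the record reader through a CombineFileRecordReader wrapper, roughly like the sketch below (the class name MultiImagesInputFormat is only illustrative):

import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class MultiImagesInputFormat
        extends CombineFileInputFormat<Text[], BytesWritable[]> {

    @Override
    public RecordReader<Text[], BytesWritable[]> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException {
        // CombineFileRecordReader instantiates the wrapped reader through its
        // (CombineFileSplit, TaskAttemptContext, Integer) constructor.
        return new CombineFileRecordReader<Text[], BytesWritable[]>(
                (CombineFileSplit) split, context, MultiImagesRecordReader.class);
    }
}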

This is my record reader code:
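
import java.io.IOException;
import java.net.InetAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;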

public class MultiImagesRecordReader extends
        RecordReader<Text[], BytesWritable[]> {
private long start = 0;
private long end = 0;
private int pos = 0;
private BytesWritable[] value;
private Text key[];
private CombineFileSplit split;
private Configuration conf;
private FileSystem fs;
private static boolean recordsRead;

public MultiImagesRecordReader(CombineFileSplit split,
        TaskAttemptContext context, Integer index) throws IOException {
    this.split = split;
    this.conf = context.getConfiguration();
}

@Override
public void initialize(InputSplit genericSplit, TaskAttemptContext context)
        throws IOException, InterruptedException {
    start = split.getOffset(0);
    end = start + split.getLength();
    recordsRead = false;
    this.pos = (int) start;
    fs = FileSystem.get(conf);
    value = new BytesWritable[split.getNumPaths()];
    key = new Text[split.getNumPaths()];
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (recordsRead) {
        System.out.println("In nextKeyValue: records already read " + InetAddress.getLocalHost());
        return false;
    } else {
        recordsRead = true;
        System.out.println("In nextKeyValue: reading records " + InetAddress.getLocalHost());
        for (int i = 0; i < split.getNumPaths(); i++) {

            int fileLength = (int) split.getLength(i);
            Path path = split.getPath(i);
            byte[] result = new byte[fileLength];

            FSDataInputStream in = null;

            String file_path = path.toString();
            key[i] = new Text(file_path);
            try {
                in = fs.open(path);
                IOUtils.readFully(in, result, 0, fileLength);

            } finally {
                IOUtils.closeStream(in);
            }

            value[i] = new BytesWritable(result);
        }
        return true;
    }
}
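
// Sketch of the remaining RecordReader overrides, assuming they simply
// return the arrays built in nextKeyValue():
@Override
public Text[] getCurrentKey() throws IOException, InterruptedException {
    return key;
}

@Override
public BytesWritable[] getCurrentValue() throws IOException, InterruptedException {
    return value;
}

@Override
public float getProgress() throws IOException, InterruptedException {
    // A single logical record per split, so progress is all-or-nothing.
    return recordsRead ? 1.0f : 0.0f;
}

@Override
public void close() throws IOException {
    // Streams are opened and closed inside nextKeyValue(), nothing to do here.
}
}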

With this code, the map function correctly receives the arrays of keys and values, but repeatedly: I expected the map function to be called once, yet it is called multiple times. What am I doing wrong?

1 Answer

As you probably know, the Mapper's map() method is called once for every record your RecordReader returns via getCurrentKey()/getCurrentValue(), until all key-value pairs in the given split have been consumed. I understand that your map function is being called repeatedly with the same key-value pair, even though it should be called only once per pair. That means your record reader is emitting the same record (key-value pair) repeatedly. I have also implemented custom CombineFileInputFormats and record readers; you can see their generic forms here and the implementations here, within the same project.
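
For illustration, a mapper consuming these array-valued records could look like the minimal sketch below (the class name and output types are my own assumptions, not taken from your code). With a reader that emits a single record per split, map() should fire exactly once per split:

import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MultiImagesMapper
        extends Mapper<Text[], BytesWritable[], Text, Text> {

    @Override
    protected void map(Text[] keys, BytesWritable[] values, Context context)
            throws IOException, InterruptedException {
        // One call per record: iterate over all files packed into this record.
        for (int i = 0; i < keys.length; i++) {
            // Hypothetical processing: emit each file path with its size in bytes.
            context.write(keys[i], new Text(String.valueOf(values[i].getLength())));
        }
    }
}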