Custom RecordReader initialize not called

I've recently started messing with Hadoop and just created my own InputFormat to handle PDFs.

For some reason my custom RecordReader class never has its initialize method called. (I checked with a sysout, because I haven't set up a debugging environment.)

I'm running Hadoop 2.2.0 on Windows 7 32-bit. I'm submitting with yarn jar, as hadoop jar is bugged under Windows...

import ...

public class PDFInputFormat extends FileInputFormat<Text, Text>
{
    @Override
    public RecordReader<Text, Text> getRecordReader(InputSplit arg0,
            JobConf arg1, Reporter arg2) throws IOException
    {
        return new PDFRecordReader();
    }

    public static class PDFRecordReader implements RecordReader<Text, Text>
    {
        private FSDataInputStream fileIn;
        public String fileName = null;
        HashSet<String> hset = new HashSet<String>();

        private Text key = null;
        private Text value = null;

        private byte[] output = null;
        private int position = 0;

        @Override
        public Text createValue() {
            int endpos = -1;
            for (int i = position; i < output.length; i++) {
                if (output[i] == (byte) '\n') {
                    endpos = i;
                }
            }
            if (endpos == -1) {
                return new Text(Arrays.copyOfRange(output, position, output.length));
            }
            return new Text(Arrays.copyOfRange(output, position, endpos));
        }

        @Override
        public void initialize(InputSplit genericSplit, TaskAttemptContext job)
                throws IOException, InterruptedException
        {
            System.out.println("initialization is called");
            FileSplit split = (FileSplit) genericSplit;
            Configuration conf = job.getConfiguration();

            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(conf);
            fileIn = fs.open(split.getPath());

            fileName = split.getPath().getName().toString();

            System.out.println(fileIn.toString());

            PDDocument docum = PDDocument.load(fileIn);

            ByteArrayOutputStream boss = new ByteArrayOutputStream();
            OutputStreamWriter ow = new OutputStreamWriter(boss);

            PDFTextStripper stripper = new PDFTextStripper();
            stripper.writeText(docum, ow);
            ow.flush();

            output = boss.toByteArray();
        }
    }
}

2 Answers

Accepted answer:

I figured it out last night, so this might help someone else:

RecordReader in the old org.apache.hadoop.mapred API is a deprecated interface, and it doesn't actually declare an initialize method, which explains why mine never got called automatically.

Extending the RecordReader class from the new org.apache.hadoop.mapreduce API (and returning it from the matching FileInputFormat) lets you override its initialize method, which the framework then calls for you.
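
For reference, here is a minimal sketch of what the new-API pair might look like. The class names are illustrative and the actual PDF extraction is stubbed out, so treat it as a starting point rather than a drop-in replacement:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// New-API InputFormat: note createRecordReader instead of getRecordReader.
public class NewApiPDFInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new NewApiPDFRecordReader();
    }

    // A PDF can't be split mid-file, so hand each file to a single reader.
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    // New-API RecordReader is an abstract class, not an interface;
    // the framework calls initialize() once before the first nextKeyValue().
    public static class NewApiPDFRecordReader extends RecordReader<Text, Text> {

        private FSDataInputStream fileIn;
        private Text key;
        private Text value;
        private boolean processed = false;

        @Override
        public void initialize(InputSplit genericSplit, TaskAttemptContext context)
                throws IOException, InterruptedException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration conf = context.getConfiguration();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(conf);
            fileIn = fs.open(file);          // this IS called by the new API
            key = new Text(file.getName());
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (processed) {
                return false;
            }
            // Real PDF extraction (e.g. PDFBox) would go here; emit a placeholder.
            value = new Text("extracted text");
            processed = true;
            return true;
        }

        @Override
        public Text getCurrentKey() { return key; }

        @Override
        public Text getCurrentValue() { return value; }

        @Override
        public float getProgress() { return processed ? 1.0f : 0.0f; }

        @Override
        public void close() throws IOException {
            if (fileIn != null) {
                fileIn.close();
            }
        }
    }
}

With this version, calling job.setInputFormatClass(NewApiPDFInputFormat.class) on an org.apache.hadoop.mapreduce.Job wires it in, and the framework invokes initialize() once per split before the first nextKeyValue().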

Another answer:

A System.out.println() may not be visible while the job is running. To make sure whether your initialize() is called or not, try throwing a RuntimeException there, as below:

@Override
public void initialize(InputSplit genericSplit, TaskAttemptContext job)
        throws IOException, InterruptedException
{
    throw new NullPointerException("inside initialize()");
    ....

This settles it definitively: if initialize() is being invoked, the tasks will fail with that exception.