isSplitable in CombineFileInputFormat does not work


I have thousands of small files, and I want to process them with CombineFileInputFormat.

With CombineFileInputFormat, one mapper handles multiple small files, and each file should not be split.

A snippet of one of the small input files looks like this:

vers,3
period,2015-01-26-18-12-00,438469546,449329626,complete
config,libdvm.so,chromeview
pkgproc,com.futuredial.digitchat,10021,,0ns:10860078
pkgpss,com.futuredial.digitchat,10021,,0ns:9:6627:6627:6637:5912:5912:5912
pkgsvc-run,com.futuredial.digitchat,10021,.LiveScreenService,1,0n:10860078
pkgsvc-start,com.futuredial.digitchat,10021,.LiveScreenService,1,0n:10860078
pkgproc,com.google.android.youtube,10103,,0ns:10860078
pkgpss,com.google.android.youtube,10103,,0ns:9:12986:13000:13021:11552:11564:11580
pkgsvc-run,com.google.android.youtube,10103,com.google.android.apps.youtube.app.offline.transfer.OfflineTransferService,1,0n:10860078
pkgsvc-start,com.google.android.youtube,10103,com.google.android.apps.youtube.app.offline.transfer.OfflineTransferService,1,0n:10860078

I want to pass the whole file content to the mapper. However, Hadoop splits the file in half.

For example, the above file may be split into

vers,3
period,2015-01-26-18-12-00,438469546,449329626,complete
config,libdvm.so,chromeview
pkgproc,com.futuredial.digitchat,#the line has been cut

But I want the content of the whole file to be processed.

Here is my code, which references Reading file as single record in hadoop.

The driver code

public class CombineSmallfiles {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: combinesmallfiles <in> <out>");
        System.exit(2);
    }

    conf.setInt("mapred.min.split.size", 1);
    conf.setLong("mapred.max.split.size", 26214400); // 25m
    //conf.setLong("mapred.max.split.size", 134217728); // 128m
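    // Note: these are the old (Hadoop 1.x) property names; on Hadoop 2.x the equivalents are
    // mapreduce.input.fileinputformat.split.minsize / mapreduce.input.fileinputformat.split.maxsize.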

    //conf.setInt("mapred.reduce.tasks", 5);

    Job job = new Job(conf, "combine smallfiles");
    job.setJarByClass(CombineSmallfiles.class);
    job.setMapperClass(CombineSmallfileMapper.class);
    //job.setReducerClass(IdentityReducer.class);
    job.setNumReduceTasks(0);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    MultipleOutputs.addNamedOutput(job,"pkgproc",TextOutputFormat.class,Text.class,Text.class);
    MultipleOutputs.addNamedOutput(job,"pkgpss",TextOutputFormat.class,Text.class,Text.class);
    MultipleOutputs.addNamedOutput(job,"pkgsvc",TextOutputFormat.class,Text.class,Text.class);

    job.setInputFormatClass(CombineSmallfileInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    int exitFlag = job.waitForCompletion(true) ? 0 : 1;
    System.exit(exitFlag);

}

}

My Mapper code

public class CombineSmallfileMapper extends Mapper<NullWritable, Text, Text, Text> {

    private Text file = new Text();
    private MultipleOutputs mos;
    private String period;
    private Long elapsed;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        mos = new MultipleOutputs(context);
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // MultipleOutputs must be closed, otherwise the named outputs may not be flushed.
        mos.close();
    }
    @Override
    protected void map(NullWritable key, Text value, Context context) throws IOException, InterruptedException {
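        // The input file name is expected to look like "<uuid>_<year>-<month>-<day>-<hour>-<minute>-<sec>-<msec>..."
        // (inferred from the parsing below).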
        String file_name = context.getConfiguration().get("map.input.file.name");
        String [] filename_tokens = file_name.split("_");
        String uuid = filename_tokens[0];
        String [] datetime_tokens;
        try{
        datetime_tokens = filename_tokens[1].split("-");
        }catch(ArrayIndexOutOfBoundsException err){
            throw new ArrayIndexOutOfBoundsException(file_name);
        }
        String year,month,day,hour,minute,sec,msec;
        year = datetime_tokens[0];
        month = datetime_tokens[1];
        day = datetime_tokens[2];
        hour = datetime_tokens[3];
        minute = datetime_tokens[4];
        sec = datetime_tokens[5];
        msec = datetime_tokens[6];
        String datetime = year+"-"+month+"-"+day+" "+hour+":"+minute+":"+sec+"."+msec;
        String content = value.toString();
        String []lines = content.split("\n");
        for(int u = 0;u<lines.length;u++){
            String line = lines[u];
            String []tokens = line.split(",");
            if(tokens[0].equals("period")){
                period = tokens[1];
                try{
                long startTime = Long.valueOf(tokens[2]);
                long endTime = Long.valueOf(tokens[3]);
                elapsed = endTime-startTime;
                }catch(NumberFormatException err){
                    throw new NumberFormatException(line);
                }
            }else if(tokens[0].equals("pkgproc")){
                String proc_info = "";
                try{
                proc_info += period+","+String.valueOf(elapsed)+","+tokens[2]+","+tokens[3];
                }catch(ArrayIndexOutOfBoundsException err){
                    throw new ArrayIndexOutOfBoundsException("pkgproc: "+content+ "line:"+line);
                }
                for(int i = 4;i<tokens.length;i++){
                    String []state_info = tokens[i].split(":");
                    String state = "";
                    state += ","+state_info[0].charAt(0)+","+state_info[0].charAt(1)+","+state_info[0].charAt(2)+","+state_info[1];
                    mos.write("pkgproc",new Text(tokens[1]), new Text(proc_info+state+','+uuid+','+datetime));
                }
            }else if(tokens[0].equals("pkgpss")){
                String proc_info = "";
                proc_info += period+","+String.valueOf(elapsed)+","+tokens[2]+","+tokens[3];
                for(int i = 4;i<tokens.length;i++){
                    String []state_info = tokens[i].split(":");
                    String state = "";
                    state += ","+state_info[0].charAt(0)+","+state_info[0].charAt(1)+","+state_info[0].charAt(2)+","+state_info[1]+","+state_info[2]+","+state_info[3]+","+state_info[4]+","+state_info[5]+","+state_info[6]+","+state_info[7];
                    mos.write("pkgpss",new Text(tokens[1]), new Text(proc_info+state+','+uuid+','+datetime));
                }
            }else if(tokens[0].startsWith("pkgsvc")){
                String []stateName = tokens[0].split("-");
                String proc_info = "";
                //tokens[2] = uid, tokens[3] =  serviceName
                proc_info += stateName[1]+','+period+","+String.valueOf(elapsed)+","+tokens[2]+","+tokens[3];
                String opcount = tokens[4];
                for(int i = 5;i<tokens.length;i++){
                    String []state_info = tokens[i].split(":");
                    String state = "";
                    state += ","+state_info[0].charAt(0)+","+state_info[0].charAt(1)+","+state_info[1];
                    mos.write("pkgsvc",new Text(tokens[1]), new Text(proc_info+state+','+opcount+','+uuid+','+datetime));
                }
            }
        }
    }

}

My CombineFileInputFormat, which overrides isSplitable and returns false:

public class CombineSmallfileInputFormat extends CombineFileInputFormat<NullWritable, Text> {

    @Override
    public RecordReader<NullWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {

        return new CombineFileRecordReader<NullWritable,Text>((CombineFileSplit) split,context,WholeFileRecordReader.class);
    }
    @Override
    protected boolean isSplitable(JobContext context,Path file ){
        return false;
    }

}

The WholeFileRecordReader

   public class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
    //private static final Logger LOG = Logger.getLogger(WholeFileRecordReader.class);

      /** The path to the file to read. */
      private final Path mFileToRead;
      /** The length of this file. */
      private final long mFileLength;

      /** The Configuration. */
      private final Configuration mConf;

      /** Whether this FileSplit has been processed. */
      private boolean mProcessed;
      /** Single Text to store the file name of the current file. */
    //  private final Text mFileName;
      /** Single Text to store the value of this file (the value) when it is read. */
      private final Text mFileText;

      /**
       * Implementation detail: This constructor is built to be called via
       * reflection from within CombineFileRecordReader.
       *
       * @param fileSplit The CombineFileSplit that this will read from.
       * @param context The context for this task.
       * @param pathToProcess The path index from the CombineFileSplit to process in this record.
       */
      public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext context,
          Integer pathToProcess) {
        mProcessed = false;
        mFileToRead = fileSplit.getPath(pathToProcess);
        mFileLength = fileSplit.getLength(pathToProcess);
        mConf = context.getConfiguration();
        context.getConfiguration().set("map.input.file.name", mFileToRead.getName());
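        // "map.input.file.name" is a custom key used to pass the file name to the mapper
        // (read there via context.getConfiguration()); it is not a standard Hadoop property.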

        assert 0 == fileSplit.getOffset(pathToProcess);
        //if (LOG.isDebugEnabled()) {
          //LOG.debug("FileToRead is: " + mFileToRead.toString());
          //LOG.debug("Processing path " + pathToProcess + " out of " + fileSplit.getNumPaths());

          //try {
            //FileSystem fs = FileSystem.get(mConf);
            //assert fs.getFileStatus(mFileToRead).getLen() == mFileLength;
          //} catch (IOException ioe) {
            //// oh well, I was just testing.
          //}
        //}

        //mFileName = new Text();
        mFileText = new Text();
      }

      /** {@inheritDoc} */
      @Override
      public void close() throws IOException {
        mFileText.clear();
      }

      /**
       * Returns the key, which is always NullWritable for this reader.
       *
       * @return NullWritable.
       * @throws IOException never.
       * @throws InterruptedException never.
       */
      @Override
      public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
      }

      /**
       * <p>Returns the current value.  If the file has been read with a call to nextKeyValue(),
       * this returns the contents of the file as Text.  Otherwise, it returns an
       * empty Text.</p>
       *
       * <p>Throws an IllegalStateException if initialize() is not called first.</p>
       *
       * @return A Text object containing the contents of the file to read.
       * @throws IOException never.
       * @throws InterruptedException never.
       */
      @Override
      public Text getCurrentValue() throws IOException, InterruptedException {
        return mFileText;
      }

      /**
       * Returns whether the file has been processed or not.  Since only one record
       * will be generated for a file, progress will be 0.0 if it has not been processed,
       * and 1.0 if it has.
       *
       * @return 0.0 if the file has not been processed.  1.0 if it has.
       * @throws IOException never.
       * @throws InterruptedException never.
       */
      @Override
      public float getProgress() throws IOException, InterruptedException {
        return (mProcessed) ? (float) 1.0 : (float) 0.0;
      }

      /**
       * All of the internal state is already set on instantiation.  This is a no-op.
       *
       * @param split The InputSplit to read.  Unused.
       * @param context The context for this task.  Unused.
       * @throws IOException never.
       * @throws InterruptedException never.
       */
      @Override
      public void initialize(InputSplit split, TaskAttemptContext context)
          throws IOException, InterruptedException {
        // no-op.
      }

      /**
       * <p>If the file has not already been read, this reads it into memory, so that a call
       * to getCurrentValue() will return the entire contents of this file as Text.  Then, returns
       * true.  If it has already been read, then returns false without updating any internal state.</p>
       *
       * @return Whether the file was read or not.
       * @throws IOException if there is an error reading the file.
       * @throws InterruptedException if there is an error.
       */
      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!mProcessed) {
          if (mFileLength > (long) Integer.MAX_VALUE) {
            throw new IOException("File is longer than Integer.MAX_VALUE.");
          }
          byte[] contents = new byte[(int) mFileLength];

          FileSystem fs = mFileToRead.getFileSystem(mConf);
          FSDataInputStream in = null;
          try {
            // Set the contents of this file.
            in = fs.open(mFileToRead);
            IOUtils.readFully(in, contents, 0, contents.length);
            mFileText.set(contents, 0, contents.length);

          } finally {
            IOUtils.closeQuietly(in);
          }
          mProcessed = true;
          return true;
        }
        return false;
      }

}

I want every mapper to parse multiple small files, and each small file must not be split.

However, the above code still cuts (splits) my input files, which raises a parsing error (since my parser splits each line into tokens).

My understanding is that CombineFileInputFormat gathers multiple files into one split, and each split is fed to one mapper. Therefore, one mapper can handle multiple files.
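
One way to check how the files are actually being grouped is to dump the splits the input format produces before submitting the job. The following is only a diagnostic sketch (the InspectSplits class name is made up; it assumes the CombineSmallfileInputFormat class above is in the same package, and reuses the 25 MB cap from the driver):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class InspectSplits {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.setLong("mapred.max.split.size", 26214400); // same 25 MB cap as the driver
        Job job = new Job(conf, "inspect splits");
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // CombineSmallfileInputFormat is the custom input format defined above.
        List<InputSplit> splits = new CombineSmallfileInputFormat().getSplits(job);
        for (InputSplit split : splits) {
            CombineFileSplit cfs = (CombineFileSplit) split;
            for (int i = 0; i < cfs.getNumPaths(); i++) {
                // A non-zero offset, or a length shorter than the file itself,
                // means that file has been chopped across chunks.
                System.out.println(cfs.getPath(i)
                        + " offset=" + cfs.getStartOffsets()[i]
                        + " length=" + cfs.getLength(i));
            }
        }
    }
}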

In my code, the maximum input split size is set to 25 MB, so I think the problem is that CombineFileInputFormat splits the last small file of an input split to satisfy the split size limit.

However, I have overridden isSplitable to return false, but it still splits the small files.

What is the correct way to do that?

Also, is it possible to specify the number of files per mapper, rather than an input split size?

1 Answer

Use the setMaxSplitSize() method in your constructor; it should work. It tells CombineFileInputFormat the maximum split size:

public class CFInputFormat extends CombineFileInputFormat<FileLineWritable, Text> {
  public CFInputFormat(){
    super();
    setMaxSplitSize(67108864); // 64 MB, default block size on hadoop
  }
  public RecordReader<FileLineWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException{
    return new CombineFileRecordReader<FileLineWritable, Text>((CombineFileSplit)split, context, CFRecordReader.class);
  }
  @Override
  protected boolean isSplitable(JobContext context, Path file){
    return false;
  }
}
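
If you go this route, the input format still has to be wired into the driver. A minimal usage sketch (the key/value types follow the answer's CFInputFormat, so the mapper signature would need to change accordingly):

job.setInputFormatClass(CFInputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

As far as I can tell, calling setMaxSplitSize() in the constructor has the same effect as setting the max-split-size property in the configuration; the setter simply takes precedence and keeps the limit with the input format itself.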