Hadoop RecordReader only reads the first line, then the input stream seems to be closed

I'm trying to implement a Hadoop job that counts how often an object (Click) appears in a dataset. For that I wrote a custom file input format. The record reader seems to read only the first line of the given file, and then the input stream appears to be closed.

Here is the code:

The POJO class:

package model;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class Click implements WritableComparable<Click>  {
    private String user;
    private String clickStart;
    private String date;
    private String clickTarget;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(user);
        out.writeUTF(clickStart);
        out.writeUTF(date);
        out.writeUTF(clickTarget);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        user = in.readUTF();
        clickStart = in.readUTF();
        date = in.readUTF();
        clickTarget = in.readUTF();
    }

    @Override
    public int compareTo(Click arg0) {
        int response = clickTarget.compareTo(arg0.clickTarget);
        if (response == 0) {
            response = date.compareTo(arg0.date);
        }
        return response;
    }
    public String getUser() {
        return this.user;
    }
    public void setUser(String user) {
        this.user = user;
    }

    public String getClickStart() {
        return clickStart;
    }

    public void setClickStart(String clickStart) {
        this.clickStart = clickStart;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public String getClickTarget() {
        return clickTarget;
    }

    public void setClickTarget(String clickTarget) {
        this.clickTarget = clickTarget;
    }

    public String toString() {
        return clickStart + "\t" + date;
    }
}
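
To rule out the Writable implementation itself, the Click can be round-tripped outside of Hadoop. This is only a minimal stand-alone check with made-up sample values (note that DataOutput.writeUTF throws a NullPointerException for null fields, which is why all four fields are set here):

package model;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ClickRoundTripCheck {

    public static void main(String[] args) throws IOException {
        Click original = new Click();
        original.setUser("someUser");                                    // made-up sample values
        original.setClickStart("/web/big-data-test-site/test-seite-1");
        original.setDate("2014-07-08");
        original.setClickTarget("ein ziel");

        // serialize
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // deserialize into a fresh instance
        Click copy = new Click();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy);                     // should print clickStart, a tab and the date
        System.out.println(original.compareTo(copy)); // should print 0
    }
}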

Here is the FileInputFormat class:

package ClickAnalysis;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import model.Click;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.StringUtils;

public class ClickAnalysisInputFormat extends FileInputFormat<Click, IntWritable>{

    @Override
    public RecordReader<Click, IntWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        System.out.println("Creating Record Reader");
        return new ClickReader();
    }


    public static class ClickReader extends RecordReader<Click, IntWritable> {
        private BufferedReader in;
        private Click key;
        private IntWritable value;

        @Override
        public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
            key = new Click();
            value = new IntWritable(1);
            System.out.println("Starting to read ...");

            FileSplit split = (FileSplit) inputSplit;
            Configuration conf = context.getConfiguration();

            Path path = split.getPath();
            InputStream is = path.getFileSystem(conf).open(path);
            in = new BufferedReader(new InputStreamReader(is));
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            String line = in.readLine();
            System.out.println("line: " + line);
            boolean hasNextKeyValue;
            if (line == null) {
                System.out.println("line is null");
                hasNextKeyValue = false;
            } else {
                String[] click = StringUtils.split(line, '\\', ';');
                System.out.println(click[0].toString());
                System.out.println(click[1].toString());
                System.out.println(click[2].toString());

                key.setClickStart(click[0].toString());
                key.setDate(click[1].toString());
                key.setClickTarget(click[2].toString());
                value.set(1);
                System.out.println("done with first line");
                hasNextKeyValue = true;
            }
            System.out.println(hasNextKeyValue);
            return hasNextKeyValue;
        }

        @Override
        public Click getCurrentKey() throws IOException, InterruptedException {
            return this.key;
        }

        @Override
        public IntWritable getCurrentValue() throws IOException, InterruptedException {
            return this.value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }

        public void close() throws IOException {
            in.close();
            System.out.println("in closed");
        }
    }
}
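
To check whether the reader itself really stops after one line, it can also be driven directly, outside of MapReduce. This is only a sketch; it assumes a Hadoop 2.x classpath (for org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl) and a local copy of the test file at a made-up path:

package ClickAnalysis;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class ClickReaderCheck {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path file = new Path("file:///tmp/testdata1.csv");   // assumed local copy of the test data
        long length = file.getFileSystem(conf).getFileStatus(file).getLen();

        // one split covering the whole file
        FileSplit split = new FileSplit(file, 0, length, new String[0]);

        ClickAnalysisInputFormat.ClickReader reader = new ClickAnalysisInputFormat.ClickReader();
        reader.initialize(split, new TaskAttemptContextImpl(conf, new TaskAttemptID()));

        int records = 0;
        while (reader.nextKeyValue()) {
            System.out.println(reader.getCurrentKey() + " -> " + reader.getCurrentValue());
            records++;
        }
        reader.close();
        System.out.println("records read: " + records);      // expected: 8 for the sample data
    }
}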

The Mapper class:

package ClickAnalysis;

import java.io.IOException;

import model.Click;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;

public class ClickAnalysisMapper extends Mapper<Click, IntWritable, Click, IntWritable> {

    private static final IntWritable outputValue = new IntWritable();

    @Override
    protected void map(Click key, IntWritable value, Context context) throws IOException, InterruptedException {
        System.out.println("Key: " + key.getClickStart() + " " + key.getDate() + " " + key.getClickTarget() + " Value: " + value);

        outputValue.set(value.get());
        System.out.println(outputValue.get());
        context.write(key, outputValue);
        System.out.println("nach context");
    }
}
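
The mapper just forwards each (Click, 1) pair. The ClickAnalysisReducer that is commented out in the job below is not included here; a summing reducer for this key/value layout would typically look like the following sketch (hypothetical, not the original class):

package ClickAnalysis;

import java.io.IOException;

import model.Click;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class ClickAnalysisReducer extends Reducer<Click, IntWritable, Click, IntWritable> {

    private final IntWritable total = new IntWritable();

    @Override
    protected void reduce(Click key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum up the counts for one distinct Click key
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        total.set(sum);
        context.write(key, total);
    }
}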

Partitioner class:

package ClickAnalysis;

import model.Click;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class ClickAnalysisPartitioner extends Partitioner<Click, IntWritable> {

    @Override
    public int getPartition(Click key, IntWritable value, int numPartitions) {
        System.out.println("in Partitioner drinnen");
        int partition = numPartitions;
        return partition;
    }
}
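
For reference, Hadoop expects getPartition to return a value between 0 and numPartitions - 1. The usual convention, sketched here with a hypothetical class name, hashes the key into that range (with a single reduce task the framework short-circuits to partition 0 and may not call a custom partitioner at all):

package ClickAnalysis;

import model.Click;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class HashClickPartitioner extends Partitioner<Click, IntWritable> {

    @Override
    public int getPartition(Click key, IntWritable value, int numPartitions) {
        // mask the sign bit so the result always falls in [0, numPartitions)
        return (key.getClickTarget().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}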

The Hadoop job, which is triggered via a RESTful web service call in a servlet container (but this shouldn't be the problem):

package ClickAnalysis;

import model.Click;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class ClickAnalysisJob {

    public int run() throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "ClickAnalysisJob");
        job.setJarByClass(ClickAnalysisJob.class);

        // Job Input path
        FileInputFormat.setInputPaths(job, "hdfs://localhost:9000/user/hadoop/testdata1.csv");
        // Job Output path

        Path out = new Path("hdfs://localhost:9000/user/hadoop/clickdataAnalysis_out");
        FileOutputFormat.setOutputPath(job, out);
        out.getFileSystem(conf).delete(out,true);


        job.setMapperClass(ClickAnalysisMapper.class);
        job.setReducerClass(Reducer.class);
        job.setPartitionerClass(ClickAnalysisPartitioner.class);
        //job.setReducerClass(ClickAnalysisReducer.class);

        job.setInputFormatClass(ClickAnalysisInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Click.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapOutputKeyClass(Click.class);
        job.setMapOutputValueClass(IntWritable.class);

        System.out.println("in run drinnen");


        //job.setGroupingComparatorClass(ClickTargetAnalysisComparator.class);

        job.setNumReduceTasks(1);

        int result = job.waitForCompletion(true)? 0:1;
        return result;
    }
}
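
The job itself is started from the REST layer; for local testing it could also be kicked off from a plain main method, e.g. with a trivial wrapper like this (hypothetical, not part of the servlet code):

package ClickAnalysis;

public class ClickAnalysisMain {

    public static void main(String[] args) throws Exception {
        int exitCode = new ClickAnalysisJob().run();   // 0 = success, 1 = failure
        System.exit(exitCode);
    }
}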

Next, an example of the dataset:

/web/big-data-test-site/test-seite-1;2014-07-08;ein ziel
/web/big-data-test-site/test-seite-1;2014-07-08;ein anderes ziel
/web/big-data-test-site/test-seite-1;2014-07-08;ein anderes ziel
/web/big-data-test-site/test-seite-1;2014-07-08;ein ziel
/web/big-data-test-site/test-seite-1;2014-07-08;ein drittes ziel
/web/big-data-test-site/test-seite-1;2014-07-08;ein ziel
/web/big-data-test-site/test-seite-1;2014-07-08;ein viertes ziel
/web/big-data-test-site/test-seite-1;2014-07-08;ein ziel

When I run the program, the System.out statements show the following:

in run drinnen
Creating Record Reader
Starting to read ...
line: /web/big-data-test-site/test-seite-1;2014-07-08;ein ziel
/web/big-data-test-site/test-seite-1
2014-07-08
ein ziel
done with first line
true
Key: /web/big-data-test-site/test-seite-1 2014-07-08 ein ziel Value: 1
1
in closed
analyze Method: 1

From that I conclude that the record reader only reads the first line. Why is this happening and how can it be fixed?
