file not found when using DistributedCache on Hadoop

I have a file-not-found problem with the code below. It is a simple test of DistributedCache, and I can't figure out what the problem is.

The path of the file is right, but I cannot find the file on the datanode:

package mapJoinTest2;

/*
 * A map-side join test using DistributedCache,
 * using the Path class to get the cache file on the datanode.
 *
 * 2012.1.13
 */
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.*;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class wordCount {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

        private Text word = new Text();
        private Text mapKey = new Text();
        private Path[] localFile = new Path[1];
        private FileSystem fs;

        public void configure(JobConf job) {
            try {
                fs = FileSystem.getLocal(new Configuration());
                localFile = DistributedCache.getLocalCacheFiles(job);
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }

        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            for (Path f : localFile) {
                System.out.println(f.toString());
            }

            mapKey.set("success");
            output.collect(mapKey, value);
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(wordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Map.class);
        conf.setNumReduceTasks(0);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        String path = "hdfs://namenode:9000/hadoop/test1";  // this file has already been put on HDFS
        Path filePath = new Path(path);
        String uriWithLink = filePath.toUri().toString();
        DistributedCache.addCacheFile(new URI(uriWithLink), conf);

        JobClient.runJob(conf);
    }
}

I get a NullPointerException at this point:

    for (Path f : localFile) {
        System.out.println(f.toString());
    }

The problem is that the value of f is null.
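
For context, this is roughly what I want configure() to do inside the Map class once the cache path resolves. It is only a sketch: the readCacheFile helper and the null guard are mine, added to show my intent, not code from a working job.

    // Sketch: what I intend configure() to do once getLocalCacheFiles() returns real paths.
    // readCacheFile is an illustrative helper name, not part of my working job.
    private void readCacheFile(Path cachePath) throws IOException {
        BufferedReader reader = new BufferedReader(new FileReader(cachePath.toString()));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                // this is where the in-memory join table would be built
                System.out.println(line);
            }
        } finally {
            reader.close();
        }
    }

    public void configure(JobConf job) {
        try {
            localFile = DistributedCache.getLocalCacheFiles(job);
            if (localFile != null) {    // guard against the null values I am currently seeing
                for (Path p : localFile) {
                    readCacheFile(p);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }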

I also tried the code below, but it wouldn't work either.

  DistributedCache.createSymlink(conf);
  DistributedCache.addCacheFile(new URI("hdfs://namenode:9000/hadoop/test1" + "#" + "test1"), conf);
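For completeness, here is how I understand the symlink variant is supposed to fit together end to end. This is just a sketch to show my intent (the class name symlinkTest and the link name test1 are only for this test), not code that behaves any better for me:

package mapJoinTest2;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class symlinkTest {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

        public void configure(JobConf job) {
            // With createSymlink(), the cached file should appear in the task's
            // working directory under the fragment name ("test1").
            try {
                BufferedReader reader = new BufferedReader(new FileReader("test1"));
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
                reader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            output.collect(new Text("success"), value);
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(symlinkTest.class);
        conf.setJobName("symlinkTest");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(Map.class);
        conf.setNumReduceTasks(0);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // Register the cache file with a "#test1" fragment and ask for a symlink.
        DistributedCache.createSymlink(conf);
        DistributedCache.addCacheFile(new URI("hdfs://namenode:9000/hadoop/test1" + "#" + "test1"), conf);

        JobClient.runJob(conf);
    }
}

What am I missing here?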