Hadoop streaming KeyFieldBasedPartitioner


I am extracting data from a Freebase dump (title, aliases, type names) into Avro (not yet in this job). I am using MapReduce streaming with Python.

The reducer in this job expects a type title (which, in general, is any object title) and type-id-to-object references. Each record has the form: id%relation\tvalue

For example:

common.topic%title  Topic
common.topic%used_by    m.01dyhm
common.topic%used_by    m.03x5qm
common.topic%used_by    m.04pm6

Reducer emits:

m.01dyhm%type   Topic
m.03x5qm%type   Topic
m.04pm6%type    Topic

The title precedes the references (so the reducer remembers it and emits dereferenced records), and all records related to one type must be partitioned to a single reducer. This is ensured by key sorting. Since I am using a composite key, I need to partition the records correctly. I am using KeyFieldBasedPartitioner with the option "-k1,1" and I set the key field separator to "%". It should partition the data on the object identifier, e.g. "common.topic" or "m.01dyhm". But I think my configuration is wrong. It works with a single reducer (Hortonworks VM), but emits blank files on a 32-node cluster (to which I do not have direct access, so I can't experiment effectively). I suspect the partitioning is wrong and there is no data to join on a single reducer.
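
For reference, the reducer logic is roughly the following (a simplified sketch of the idea described above, not the exact job3reducer.py):

#!/usr/bin/env python
# Simplified sketch: remember the title for each type, then emit a
# dereferenced "<object>%type\t<title>" record for every reference.
import sys

current_id = None
current_title = None

for line in sys.stdin:
    key, value = line.rstrip('\n').split('\t', 1)
    obj_id, relation = key.split('%', 1)

    if obj_id != current_id:
        # New object identifier: forget the previous title.
        current_id = obj_id
        current_title = None

    if relation == 'title':
        # The title record sorts before the references of the same object.
        current_title = value
    elif current_title is not None:
        # value is a referenced object id such as m.01dyhm
        print '%s%%type\t%s' % (value, current_title)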

This is my hadoop command:

hadoop \
jar $streaming \
-D mapred.job.name='Freebase extract - phase 3' \
-D mapreduce.map.output.key.field.separator='%' \
-D mapreduce.partition.keypartitioner.options=-k1,1 \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-input freebase/job1output \
-input freebase/job2output \
-output freebase/job3output \
-mapper "python job3mapper.py" \
-reducer "python job3reducer.py" \
-file job3mapper.py \
-file job3reducer.py 

Is my partitioner configuration right? Thanks for any help.

1 Answer:
This looks good to me. You are splitting the key into two subkeys and using the first part for partitioning.

You might want to add the following option to tell streaming that the key consists of both fields, so that your reducer input is sorted on the full compound key.

-D stream.num.map.output.key.fields=2

If you are getting empty lines in your output, that indicates you are writing extra linefeeds. The lines are fed in through sys.stdin with a trailing \n. Try print line, or print line.strip() in your mappers and reducers to see if that is the case.
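
For example (just an illustration of the stripping, not your actual reducer code):

import sys

for line in sys.stdin:
    # line already ends with '\n', so a plain 'print line' adds a second
    # newline and produces blank lines in the output.
    print line.strip()   # or: print line,  (trailing comma suppresses the extra newline)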

If you are getting no output at all the problem might be in the python code.