I'm trying to do a map-reduce with mrjob and Python against a MongoDB database. The mongodb-hadoop connector has examples for AWS EMR, but not for mrjob, and I'm not quite getting all the pieces to fit together. Here is what I have so far in my mrjob.conf:
    enable_emr_debugging: true
    ami_version: 3.0.4
    interpreter: python2.7
    upload_files:
      - tweets-clean.txt
      - train_model.py
    python_archives:
      - mrcc.py.tar.gz
    setup:
      #- python2.7 train_model.py
    jobconf:
      mongo.job.input.format: com.mongodb.hadoop.MongoInputFormat
      mongo.input.uri: myserver:27017/twitter_db
      stream.io.identifier.resolver: com.mongodb.streaming.io.MongoIdentifierResolver
    bootstrap:
      - sudo yum --releasever=2014.09 install -y python27 python27-devel gcc-c++ numpy scipy
      - sudo python2.7 get-pip.py#
      - sudo pip2.7 install boto mrjob simplejson scikit-learn sklearn pymongo-hadoop
      - python2.7 train_model.py# tweets-clean.txt#
      - mongo-hadoop-bootstrap.sh#
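For reference, mrjob expects these options nested under a runner section in mrjob.conf; a minimal sketch of the layout I believe is required, assuming the EMR runner:

```yaml
runners:
  emr:
    ami_version: 3.0.4
    interpreter: python2.7
    jobconf:
      mongo.job.input.format: com.mongodb.hadoop.MongoInputFormat
```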
When writing plain mrjob mappers/reducers I've used code like this:
    from mrjob.job import MRJob

    class MRWordFrequencyCount(MRJob):

        def mapper(self, _, line):
            words = line.split()
            for word in words:
                yield word, 1

        def reducer(self, key, values):
            yield key, sum(values)

    if __name__ == '__main__':
        MRWordFrequencyCount.run()
To modify this to use the mongodb-hadoop connector, I'm trying this:
    from pymongo_hadoop import BSONMapper
    from pymongo_hadoop import BSONReducer
    from mrjob.job import MRJob

    class MRWordFrequencyCount(MRJob):

        def mapper(self, _, documents):
            BSONMapper(self.bsonmapper)

        def reducer(self, key, values):
            BSONReducer(self.bsonreducer)

        def bsonmapper(documents):
            for doc in documents:
                yield {'_id': doc['id']['user.id']}, {'count': 1}

        def bsonreducer(self, key, values):
            count = 0
            for v in values:
                count += v['count']
            return {'_id': key, 'count': count}

    if __name__ == '__main__':
        MRWordFrequencyCount.run()
The problem is that I'm not passing the methods to BSONMapper and BSONReducer correctly: BSONMapper's __init__() expects 1 argument, but it's getting 2.
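For comparison, the streaming examples shipped with mongo-hadoop don't use an MRJob class at all; they register plain module-level functions with BSONMapper/BSONReducer. A minimal sketch of that pattern (the 'user' and 'count' field names here are placeholders, not taken from my actual data):

```python
# Sketch of mongo-hadoop's streaming-style usage: module-level functions,
# no MRJob class. Field names 'user'/'count' are illustrative assumptions.

def mapper(documents):
    # documents is an iterable of decoded BSON documents
    for doc in documents:
        yield {'_id': doc['user']['id'], 'count': 1}

def reducer(key, values):
    # values is an iterable of the dicts emitted by mapper for this key
    return {'_id': key, 'count': sum(v['count'] for v in values)}

# In a real streaming job these are registered at module level,
# one function per script:
#   from pymongo_hadoop import BSONMapper
#   BSONMapper(mapper)        # in the mapper script
#   from pymongo_hadoop import BSONReducer
#   BSONReducer(reducer)      # in the reducer script
```

If that's the intended usage, maybe the fix is to drop MRJob for the MongoDB I/O path entirely, but I'd like to confirm whether the two can be combined.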