Processing MongoDB in AWS EMR with Python

I'm trying to run a MapReduce job using mrjob and Python against a MongoDB database. The mongodb-hadoop connector has examples of how to use it on AWS EMR, but not with mrjob, and I can't quite fit the pieces together. Here is what I have so far in mrjob.conf:

    enable_emr_debugging: true
    ami_version: 3.0.4
    interpreter: python2.7

    upload_files:
    - tweets-clean.txt
    - train_model.py

    python_archives:
    - mrcc.py.tar.gz

    setup:
    #- python2.7 train_model.py

    jobconf:
        mongo.job.input.format : com.mongodb.hadoop.MongoInputFormat
        mongo.input.uri : myserver:27017/twitter_db
        stream.io.identifier.resolver : com.mongodb.streaming.io.MongoIdentifierResolver


    bootstrap:
        - sudo yum --releasever=2014.09 install -y python27 python27-devel gcc-c++ numpy scipy
        - sudo python2.7 get-pip.py#
        - sudo pip2.7 install boto mrjob simplejson scikit-learn sklearn pymongo-hadoop
        - python2.7 train_model.py# tweets-clean.txt#
        - mongo-hadoop-bootstrap.sh#

When using mrjob Python mapper/reducers I've used code like this:

    from mrjob.job import MRJob

    class MRWordFrequencyCount(MRJob):

        def mapper(self, _, line):
            words = line.split()
            for word in words:
                yield word, 1

        def reducer(self, key, values):
            yield key, sum(values)

    if __name__ == '__main__':
        MRWordFrequencyCount.run()

To modify this to use the mongodb-hadoop connector, I'm trying to do this:

    from pymongo_hadoop import BSONMapper
    from pymongo_hadoop import BSONReducer
    from mrjob.job import MRJob

    class MRWordFrequencyCount(MRJob):

        def mapper(self, _, documents):
            BSONMapper(self.bsonmapper)

        def reducer(self, key, values):
            BSONReducer(self.bsonreducer)

        def bsonmapper(documents):
            for doc in documents:
                yield {'_id' : doc['id']['user.id']}, {'count' : 1}

        def bsonreducer(self, key, values):
            count = 0
            for v in values:
                count += v['count']
            return {'_id' : key, 'count' : count}

    if __name__ == '__main__':
        MRWordFrequencyCount.run()

The problem is that I'm not passing the methods to BSONMapper and BSONReducer correctly. According to the error, the BSONMapper class expects 1 argument in its __init__() but is getting 2.
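
For reference, here is a minimal sketch of one way a "1 argument expected, 2 given" error can arise in plain Python, independent of mrjob or pymongo_hadoop. It assumes the wrapper simply calls the function it is handed with a stream of documents, which has not been confirmed against the pymongo_hadoop source. FakeWrapper, Job, and the sample documents are hypothetical stand-ins, not part of either library. A method defined without self, like bsonmapper above, receives the instance as an extra argument when it is passed as self.bsonmapper.

```python
class FakeWrapper(object):
    """Hypothetical stand-in: calls target(documents) and collects the output."""

    def __init__(self, target):
        documents = [{'user_id': 1}, {'user_id': 1}, {'user_id': 2}]
        self.results = list(target(documents))


class Job(object):

    # No `self` parameter: when passed as job.broken_mapper, Python binds
    # the instance as the first argument, so the call supplies two
    # arguments to a function that accepts one.
    def broken_mapper(documents):
        for doc in documents:
            yield {'_id': doc['user_id']}, {'count': 1}

    # With `self`, the bound method accepts the single documents argument.
    def working_mapper(self, documents):
        for doc in documents:
            yield {'_id': doc['user_id']}, {'count': 1}


job = Job()

try:
    FakeWrapper(job.broken_mapper)
    error_message = None
except TypeError as exc:
    # The argument-count mismatch is reported here as a TypeError.
    error_message = str(exc)

results = FakeWrapper(job.working_mapper).results
```

If the real error has the same cause, it may be raised by the mapper function itself rather than by BSONMapper's constructor, so the traceback is worth reading for the exact function name.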
