How to do a Reduce Side Join as a Map Reduce Job with mrjob in Python


I have two datasets that I am trying to combine, namely the transactions dataset and the contracts dataset. I want to use address (contracts) and to_address (transactions) as the join attributes, and the value attribute as the value.

contract dataset fields: 
address, is_erc20, is_erc721, block_number, block_timestamp

transactions dataset fields: 
block_number, from_address, to_address, value, gas, gas_price, timestamp

So the join should produce output of the form: address, value

example:

transactions dataset: 
to_address        value
0x412270b1f0f3884 240648550000000000
0x8d5a0a7c555602f 984699000000000000

contract dataset:
address
0x412270b1f0f3884

The output should be:
to_address        value
0x412270b1f0f3884 240648550000000000

because 0x8d5a0a7c555602f is not present in the contract dataset.
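For reference, the intended result amounts to filtering the transactions: keep a row only if its to_address appears among the contract addresses. The plain-Python sketch below does that for the example rows above, reduced to the two relevant columns. The function name expected_join is illustrative, and an in-memory set like this is only practical while the contract addresses fit in memory, so treat it as a way to check a MapReduce job's output on a small sample.

```python
def expected_join(contract_addresses, transactions):
    """Keep (to_address, value) pairs whose to_address is a known contract.

    contract_addresses: iterable of address strings
    transactions: iterable of (to_address, value) pairs
    """
    contracts = set(contract_addresses)  # set gives fast membership tests
    return [(addr, value) for addr, value in transactions if addr in contracts]


contracts = ["0x412270b1f0f3884"]
transactions = [
    ("0x412270b1f0f3884", 240648550000000000),
    ("0x8d5a0a7c555602f", 984699000000000000),
]

print(expected_join(contracts, transactions))
# [('0x412270b1f0f3884', 240648550000000000)]
```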

Below is my code; I'm not sure what I'm doing wrong. Any help?

from mrjob.job import MRJob

class repartition_join(MRJob):

    def mapper(self, _, line):
        try:
            if(len(line.split(','))==5): #contracts dataset
                fields=line.split(',')
                join_key=fields[0] #key is address
                yield (join_key, 1) #yield join key given id 1?
            elif(len(line.split(','))==7): #transactions dataset
                fields=line.split(',')
                join_key=fields[2] #to_address, which is the key
                join_value=int(fields[3]) #[3] = value
                yield (join_key,(join_value,2)) #gives key with value
        except:
            pass

    def reducer(self, key, values):
        val = None
        for value in values:
            if value[1] == 2:
                val = (value[0])
        yield(key, val)
 

if __name__=='__main__':
    repartition_join.run()
Answer:

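Think about your MapReduce pipeline for the reduce-side join again; the mapper and reducer don't fit together yet. Your mapper yields a plain 1 for contract lines but a (value, 2) pair for transaction lines, so value[1] in the reducer fails as soon as it meets the integer (TypeError: 'int' object is not subscriptable). And even without that error, the reducer yields a result for every key, so transactions whose to_address is not a contract address would still end up in the output.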

To tell which of your two relations a key-value pair comes from, add a relation symbol to the value your mapper yields. Assuming you want an inner join, the reducer of a reduce-side join should yield a tuple only if the key occurs in both your contracts and your transactions dataset. So the reducer has to hold the tuples of the two relations in separate lists, telling them apart by the relation symbol. This is easily adjusted for other joins, e.g. left/right/full outer joins or semi/anti joins.
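To make the tagging concrete, here is a plain-Python sketch of the reduce step for a single join key (no mrjob required). Values arrive tagged 'C' or 'T' as described above; the function name join_values and its how parameter are hypothetical, chosen to show how the same per-relation buffering covers three of the join variants. Because contract tuples carry no columns of their own, emitting each transaction once whenever a contract exists is strictly a left semi-join; it gives the same rows as the inner join as long as each address occurs only once in the contracts dataset.

```python
def join_values(key, values, how="inner"):
    """Reduce step of a reduce-side join for one join key (illustrative sketch).

    values: (tag, payload) pairs; tag 'C' = contracts, 'T' = transactions.
            After mrjob's JSON serialization they may arrive as lists,
            which unpack the same way.
    how:    'inner', 'semi' or 'anti' (hypothetical parameter for this sketch).
    """
    contracts, transactions = [], []
    for tag, payload in values:
        if tag == "C":
            contracts.append(payload)
        elif tag == "T":
            transactions.append(payload)
        # any other tag is ignored here

    if how == "inner":
        # one output row per (contract, transaction) combination
        for _ in contracts:
            for value in transactions:
                yield key, value
    elif how == "semi":
        # each transaction once, if at least one contract has this address
        if contracts:
            for value in transactions:
                yield key, value
    elif how == "anti":
        # transactions whose to_address is not a contract address
        if not contracts:
            for value in transactions:
                yield key, value
    else:
        raise ValueError(f"unsupported join type: {how!r}")


matched = [("C", 1), ("T", 240648550000000000)]
unmatched = [("T", 984699000000000000)]

print(list(join_values("0x412270b1f0f3884", matched)))
# [('0x412270b1f0f3884', 240648550000000000)]
print(list(join_values("0x8d5a0a7c555602f", unmatched)))
# []
print(list(join_values("0x8d5a0a7c555602f", unmatched, how="anti")))
# [('0x8d5a0a7c555602f', 984699000000000000)]
```

An outer join would follow the same pattern, additionally emitting the unmatched side with a placeholder such as None.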

In the following example, I use the relation symbol 'C' for the contracts dataset and 'T' for the transactions dataset. I can't try it out myself because I don't have the dataset, but it should work like this. If you run into any trouble, let me know in a comment.

I suggest having a look at the book "MapReduce Design Patterns" by Donald Miner and Adam Shook, which also explains common join algorithms for MapReduce jobs. Also check out the latest mrjob documentation.

from mrjob.job import MRJob
from mrjob.step import MRStep

class repartition_join(MRJob):

    def mapper(self, _, line):
        fields=line.split(',')
        if len(fields) == 5: # contracts dataset
            join_key = fields[0] # key is in attribute address
            yield (join_key, ('C', 1)) # yield join key, value not used
        
        elif len(fields) == 7: # transactions dataset
            join_key = fields[2] # key is in attribute to_address
            try:
                join_value = int(fields[3]) # value is in attribute value
            except ValueError:
                return # skip lines whose value is not an integer, e.g. a CSV header row
            yield (join_key, ('T', join_value)) # yields join key with value
        else:
            pass # TODO handle error

    def reducer(self, key, values):

        address = key # the join key
        contracts_tuples = []
        transactions_tuples = []

        for value in values:
            relation_symbol = value[0] # either 'T' or 'C'
            if relation_symbol == 'C': # contracts dataset
                contracts_tuples.append(value[1]) # always 1 - just to know that there is a tuple in contracts
            elif relation_symbol == 'T': # transactions dataset
                transactions_tuples.append(value[1]) # append the value inside value attribute
            else:
                pass # TODO handle error

        # inner join contract and transaction, generalize if needed
        if len(contracts_tuples) > 0 and len(transactions_tuples) > 0:
            for value in transactions_tuples:
                yield (address, value)

    def steps(self):
        return [MRStep(
            mapper=self.mapper,
            reducer=self.reducer)
        ]

if __name__=='__main__':
    repartition_join.run()
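Since the code above hasn't been run against real data, one way to sanity-check the logic is to imitate the job's three phases in plain Python: map each CSV line to a tagged pair, shuffle by sorting and grouping on the join key (the step the framework performs between mapper and reducer), then reduce each group. The functions map_line, reduce_group and run_join are hypothetical stand-ins mirroring the mapper and reducer above, and the sample lines use the example addresses and values from the question with placeholder values in the other columns.

```python
from itertools import groupby
from operator import itemgetter


def map_line(line):
    """Mapper: tag each record with its relation ('C' or 'T')."""
    fields = line.split(",")
    if len(fields) == 5:  # contracts: address, is_erc20, is_erc721, block_number, block_timestamp
        yield fields[0], ("C", 1)
    elif len(fields) == 7:  # transactions: block_number, from_address, to_address, value, ...
        try:
            value = int(fields[3])
        except ValueError:
            return  # e.g. a header line whose value column is not a number
        yield fields[2], ("T", value)


def reduce_group(key, values):
    """Reducer: emit transactions only when the key also occurs in contracts."""
    contracts, transactions = [], []
    for tag, payload in values:
        if tag == "C":
            contracts.append(payload)
        elif tag == "T":
            transactions.append(payload)
    if contracts and transactions:
        for value in transactions:
            yield key, value


def run_join(lines):
    """Map, shuffle (sort + group by key), then reduce, all in memory."""
    mapped = [pair for line in lines for pair in map_line(line)]
    mapped.sort(key=itemgetter(0))  # the shuffle: bring equal keys together
    output = []
    for key, group in groupby(mapped, key=itemgetter(0)):
        output.extend(reduce_group(key, (value for _, value in group)))
    return output


# Placeholder rows (only address, to_address and value come from the question)
lines = [
    "address,is_erc20,is_erc721,block_number,block_timestamp",
    "0x412270b1f0f3884,true,false,1000,1500000000",
    "block_number,from_address,to_address,value,gas,gas_price,timestamp",
    "1001,0xaaa,0x412270b1f0f3884,240648550000000000,21000,1000000000,1500000100",
    "1002,0xbbb,0x8d5a0a7c555602f,984699000000000000,21000,1000000000,1500000200",
]

print(run_join(lines))
# [('0x412270b1f0f3884', 240648550000000000)]
```

With mrjob installed, the real job runs locally by default when both CSV files are passed as input paths, e.g. python repartition_join.py contracts.csv transactions.csv (file names hypothetical); mrjob then performs the sort-and-group step between mapper and reducer itself.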