I have two datasets that I am trying to combine: the transactions dataset and the contract dataset. I want to join them on address (contract dataset) and to_address (transactions dataset), and output the value attribute from the transactions dataset.
contract dataset fields:
address, is_erc20, is_erc721, block_number, block_timestamp
transactions dataset fields:
block_number, from_address, to_address, value, gas, gas_price, timestamp
So what I'm trying to do is make a join with an output of: address, value
example:
transactions dataset:
to_address value
0x412270b1f0f3884 240648550000000000
0x8d5a0a7c555602f 984699000000000000
contract dataset:
address
0x412270b1f0f3884
the output should be:
to_address value
0x412270b1f0f3884 240648550000000000
as 0x8d5a0a7c555602f is not present in the contract dataset.
Below is the code I have and I'm not sure what I'm doing wrong. Any help??
from mrjob.job import MRJob

class repartition_join(MRJob):

    def mapper(self, _, line):
        try:
            if(len(line.split(','))==5): #contracts dataset
                fields=line.split(',')
                join_key=fields[0] #key is address
                yield (join_key, 1) #yield join key given id 1?
            elif(len(line.split(','))==7): #transactions dataset
                fields=line.split(',')
                join_key=fields[2] #to_address, which is the key
                join_value=int(fields[3]) #[3] = value
                yield (join_key,(join_value,2)) #gives key with value
        except:
            pass

    def reducer(self, key, values):
        val = None
        for value in values:
            if value[1] == 2:
                val = (value[0])
        yield(key, val)

if __name__=='__main__':
    repartition_join.run()
Think about your map-reduce pipeline for the Reduce Side Join again. It looks like you have difficulties in understanding it.
To distinguish the key-value pairs of your two relations, you have to add a relation symbol to the value your mapper yields. Assuming you want an inner join, the reducer should yield a tuple only if the key has at least one tuple in your contracts dataset and at least one in your transactions dataset. Thus, you have to hold the tuples of those relations in separate lists and identify each tuple by its relation symbol. This can be easily adjusted for other joins, e.g. (left/right/full) outer join or semi/anti-join.

In the following example, I used the relation symbol 'C' for the contracts and 'T' for the transactions dataset. I cannot try it out myself because I lack the dataset, but it should work like this. If you have any trouble, let me know in a comment.

I suggest having a look at the book "MapReduce Design Patterns" by Donald Miner and Adam Shook, because it also explains common join algorithms for MapReduce tasks. Also check out the latest mrjob documentation.
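The reduce-side join described above can be sketched roughly as follows. This is a minimal, dependency-free illustration rather than a tested mrjob job: `mapper` and `reducer` are written as plain generator functions, `run_local` is a hypothetical helper that imitates the shuffle step in memory, and the rows in `SAMPLE_LINES` are made up apart from the addresses and values taken from the question. It emits one output row per matching transaction, which is an assumption based on the expected output shown in the question.

```python
from collections import defaultdict


def mapper(line):
    """Tag each record with a relation symbol: 'C' (contracts) or 'T' (transactions)."""
    fields = line.split(',')
    try:
        if len(fields) == 5:
            # contracts row: address is field 0, no payload is needed
            yield fields[0], ('C', None)
        elif len(fields) == 7:
            # transactions row: to_address is field 2, value is field 3
            yield fields[2], ('T', int(fields[3]))
    except ValueError:
        # skip header rows or rows whose value is not an integer
        pass


def reducer(key, values):
    """Inner join: emit transaction values only if the key is also a contract."""
    contracts, transactions = [], []
    for symbol, payload in values:
        if symbol == 'C':
            contracts.append(payload)
        elif symbol == 'T':
            transactions.append(payload)
    if contracts and transactions:
        for value in transactions:
            yield key, value


def run_local(lines):
    """Hypothetical stand-in for the shuffle phase: group mapper output by key."""
    groups = defaultdict(list)
    for line in lines:
        for key, value in mapper(line):
            groups[key].append(value)
    results = []
    for key in sorted(groups):
        results.extend(reducer(key, groups[key]))
    return results


# Illustrative rows; only the addresses and values come from the question.
SAMPLE_LINES = [
    "0x412270b1f0f3884,false,false,100,1500000000",
    "101,0xaaa,0x412270b1f0f3884,240648550000000000,21000,50,1500000100",
    "102,0xbbb,0x8d5a0a7c555602f,984699000000000000,21000,50,1500000200",
]
```

To use this with mrjob, the two function bodies would go into `mapper(self, _, line)` and `reducer(self, key, values)` of the `MRJob` subclass. Note that the original reducer evaluates `value[1]` on the bare integer `1` yielded for contract rows, which raises a `TypeError`; tagging both relations with a tuple of the same shape avoids that.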