Performing a 3-Way Join in MapReduce with Python

I have the following tables stored as separate csv files:

customers (c_id, gender, address, dob)

meals (r_id, c_id, date) (so a customer having a meal at a restaurant)

restaurants (type, r_id)

The tables have the following sizes: restaurants 10,000 rows, meals 1,000,000 and customers 2,000,000.

I need the following MapReduce job: for every restaurant of type 'bistro', count the meals eaten there by male customers.

This translates to the following SQL query:

SELECT r.r_id, COUNT(*) AS count_meals
FROM restaurants r
INNER JOIN meals m ON m.r_id = r.r_id
INNER JOIN customers c ON m.c_id = c.c_id
WHERE c.gender = 'MALE' AND r.type = 'bistro'
GROUP BY r.r_id

The conditions above reduce the table sizes as follows: restaurants to 300, customers to 900,000; meals stays the same.
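(Aside: since only about 300 restaurants survive the type filter, a map-side join would also be an option: load the small restaurants table into memory in every mapper and never shuffle it at all. A minimal sketch, assuming data/restaurants.csv is readable at that path from the running task; on Hadoop the file would have to be shipped to the tasks, e.g. with mrjob's --files option:)

    def mapper_init(self):
        import csv
        # Load the small restaurants table once per mapper task,
        # keeping only the ids of bistros.
        self.bistro_ids = set()
        with open('data/restaurants.csv') as f:
            for row in csv.reader(f):
                if row and row[0] == 'bistro':
                    self.bistro_ids.add(row[1])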

The map reduce job is started with the following command:

python3 mrCustomers.py data/restaurants.csv data/customers.csv data/meals.csv > output.csv

To tell the input files apart I rely on the number of columns in each row, and my mapper looks like this:

from mrjob.job import MRJob
from mrjob.step import MRStep
from mr3px.csvprotocol import CsvProtocol
import csv

class MRCustomers(MRJob):
    OUTPUT_PROTOCOL = CsvProtocol

    def mapper1(self, _, line):
        # Skip the header row of each file.
        if line.startswith('c_id') or line.startswith('r_id') or line.startswith('type'):
            return
        reader = csv.reader([line])
        columns = next(reader)
        if len(columns) == 4:  # customers (c_id, gender, address, dob)
            if columns[1] != 'MALE':
                return
            c_id = columns[0]
            yield c_id, "customer"
        elif len(columns) == 3:  # meals (r_id, c_id, date)
            r_id = columns[0]
            c_id = columns[1]
            yield r_id, ("M", c_id)
        else:  # restaurants (type, r_id)
            r_type = columns[0]
            if r_type == 'bistro':
                r_id = columns[1]
                yield r_id, "restaurant"

The problem I have is that in the reducer I always receive chunks of each table, e.g. in the first round I get 30 customers, 20,000 meals and no restaurants. It is impossible for me to perform a join if I don't receive matching sets from each mapper round. Is my mapper logic flawed? And how should I go about writing the reducer?

Also, the reducer should in theory receive all tuples that share a key grouped together, so yield r_id, "restaurant" and yield r_id, ("M", c_id) ought to arrive as one match in the reducer, but this doesn't happen.
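
(For reference: the shuffle is only supposed to group both yields under the same key once the whole map phase has finished, so per-mapper output will always look like ungrouped chunks. A tiny simulation of that grouping step with itertools.groupby, standing in for the shuffle:)

from itertools import groupby

pairs = [("r1", "restaurant"), ("r1", ("M", "c42")), ("c42", "customer")]
for key, group in groupby(sorted(pairs, key=lambda kv: kv[0]), key=lambda kv: kv[0]):
    print(key, [v for _, v in group])
# c42 ['customer']
# r1 ['restaurant', ('M', 'c42')]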

Edit: I've managed to join meals and restaurants on the reducer side in the first job step as follows:

    def reducer1(self, key, values):
        joins = [x for x in values]
        if len(joins) > 1:
            if joins[0][0] == "restaurants":
                for tup in joins[1:]:
                    c_id = tup[1]
                    yield c_id, (tup[0], key)
        elif joins[0][0] == "customer":
            for customer in joins:
                yield key, ("customer", key)

and then in the second job step I pass the results through an identity mapper to a reducer that joins on customers:

    def mapper2(self, key, value):
        yield key, value

    def reducer2(self, key, values):
        joins = [x for x in values]
        if len(joins) > 1:
            if joins[0][0] == "customer":
                for tup in joins[1:]:
                    yield tup[1], 1

    def reducer3(self, key, values):
        yield None, (key, sum(values))

    def steps(self):
        first_step = MRStep(
            mapper=self.mapper1,
            reducer=self.reducer1,
        )
        second_step = MRStep(
            mapper=self.mapper2,
            reducer=self.reducer2,
        )
        third_step = MRStep(
            reducer=self.reducer3
        )

        return [first_step, second_step, third_step]

Now I have the following problem: the output contains only part of the full result of the SQL query, so the solution is incomplete. I don't understand why the join isn't working as expected.
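
For reference, here is a restructured sketch of the whole job that avoids the pitfalls above: all values are uniform (tag, payload) pairs, keys carry a prefix so customer and restaurant ids cannot collide, and no reducer assumes which value arrives first. Step 1 joins meals with the filtered customers on c_id while bistro restaurants pass straight through; step 2 joins on r_id and counts. This is only one possible design, untested against the real data, and the class and method names are just for illustration:

from mrjob.job import MRJob
from mrjob.step import MRStep
from mr3px.csvprotocol import CsvProtocol
import csv

class MRBistroMeals(MRJob):
    OUTPUT_PROTOCOL = CsvProtocol

    def mapper1(self, _, line):
        # Skip the header row of each file.
        if line.startswith(('c_id', 'r_id', 'type')):
            return
        columns = next(csv.reader([line]))
        if len(columns) == 4:                    # customers (c_id, gender, address, dob)
            if columns[1] == 'MALE':
                yield 'c|' + columns[0], ('C', None)
        elif len(columns) == 3:                  # meals (r_id, c_id, date)
            yield 'c|' + columns[1], ('M', columns[0])
        else:                                    # restaurants (type, r_id)
            if columns[0] == 'bistro':
                yield 'r|' + columns[1], ('R', None)

    def reducer1(self, key, values):
        if key.startswith('r|'):
            # Bistro records pass through untouched to the second step.
            yield key, ('R', None)
            return
        # key is 'c|<c_id>': join this customer's meals with the customer
        # record; scan all values because their order is arbitrary.
        male = False
        r_ids = []
        for tag, payload in values:
            if tag == 'C':
                male = True
            else:
                r_ids.append(payload)
        if male:
            for r_id in r_ids:
                yield 'r|' + r_id, ('M', None)

    def reducer2(self, key, values):
        # key is 'r|<r_id>': count the male-customer meals per bistro.
        is_bistro = False
        n_meals = 0
        for tag, _ in values:
            if tag == 'R':
                is_bistro = True
            else:
                n_meals += 1
        if is_bistro and n_meals:
            yield None, (key[2:], n_meals)       # strip the 'r|' prefix

    def steps(self):
        return [
            MRStep(mapper=self.mapper1, reducer=self.reducer1),
            MRStep(reducer=self.reducer2),
        ]

if __name__ == '__main__':
    MRBistroMeals.run()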
