I have an Express Node.js application and a machine learning algorithm written in Python. I'm using RabbitMQ to integrate Node.js and Python, and it works; it performs better than using the child_process library with spawn, for example. But I'm having trouble matching workers' responses to their respective requests.

My code is similar to the toy example below and is based on this post. This implementation has two main problems: it either sends the wrong answer to the client (Postman) or never completes the request.

This code should take a request from a client (an image and a type), put the task on the tasks queue, wait for a worker to finish the job, and send the result back to the correct client.

import express from "express";
import bodyParser from 'body-parser';
import cors from 'cors';
import path from 'path';
import multer from "multer";
import amqp from "amqplib/callback_api.js";

const port = 3000;

const app = express();

app.use(express.json());
app.use(bodyParser.urlencoded({extended: true}));
app.use(cors());


const storage = multer.memoryStorage();
const upload = multer({ storage: storage });

app.get('/', (req, res) => {
    res.status(200).send({message: "ok"});
});

app.post("/classify", upload.single("image"), async(req, res) => {

    const { type } = req.body;

    const task = Buffer.from(
        JSON.stringify({
            type: type, 
            image: req.file.buffer.toString("base64")
        })
    );

    amqp.connect("amqp://localhost", (err, conn) => {
        conn.createChannel((err, ch) => {
            ch.assertQueue("tasks", {durable: false});
            ch.assertQueue("results", {durable: false});
            ch.sendToQueue("tasks", task);

            ch.consume("results", (msg) => {
                const predictions = JSON.parse(msg.content.toString());
                res.status(200).send({message: predictions });
                
            },{noAck: true});
        });
        setTimeout(() => {conn.close();}, 500)
    });
});

console.clear();

app.listen(port, () => {
    console.log(`Server listening at http://localhost:${port}`)
})

This code should take tasks from the tasks queue, run them, and put the results into the results queue.

#!/usr/bin/env python
import pika
import json
import time 
import random

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.queue_declare(queue="tasks")
channel.queue_declare(queue="results")


# Simulate the execution of a machine learning model
def run_model(image, type):
    time.sleep(random.randint(1, 4))
    return random.choice([" Iris setosa", "Iris virginica", "Iris versicolor"])

   
def callback(ch, method, properties, body):
    params = json.loads(body.decode('utf-8'))
    type = str(params["type"])
    image = params['image']

    print("Worker received a new task...")

    results = run_model(image, type)

    # send a message back
    channel.basic_publish(
        exchange="", 
        routing_key="results", 
        body=json.dumps(results, ensure_ascii=False)
    )
  
    #connection.close()

# receive message and complete the task
channel.basic_consume("tasks", callback, auto_ack=True)
channel.start_consuming()

How can I solve these problems?

There is 1 answer below.

I'm not super familiar with RabbitMQ, but I think you need a way to uniquely identify each task sent to the Python worker.

Try generating a UUID on the Node.js side (e.g. with the uuid package) and passing it along with the job data. Then consume the results-${uuid} queue in Node.

When the Python worker finishes, have it publish to the results-${uuid} routing key taken from the passed-in params. That way, each request listens only on its own result queue.
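The idea above can be sketched as follows. This is an untested sketch, not your code verbatim: the `makeTask` helper and the `results-${id}` queue-naming convention are assumptions, and Node's built-in `crypto.randomUUID` is used here in place of the uuid package.

```javascript
import { randomUUID } from "crypto"; // built-in alternative to the uuid package

// Build the task payload plus a per-request result queue name,
// so each HTTP request gets its own place to wait for an answer.
function makeTask(type, imageBase64) {
    const id = randomUUID();
    return {
        id,
        resultQueue: `results-${id}`,
        payload: Buffer.from(JSON.stringify({ id, type, image: imageBase64 })),
    };
}

// Inside the /classify handler, the broker interaction would look roughly like:
//
//   const task = makeTask(type, req.file.buffer.toString("base64"));
//   ch.assertQueue(task.resultQueue, { durable: false, autoDelete: true });
//   ch.sendToQueue("tasks", task.payload);
//   ch.consume(task.resultQueue, (msg) => {
//       res.status(200).send({ message: JSON.parse(msg.content.toString()) });
//   }, { noAck: true });
//
// and the Python worker would publish with routing_key=params["resultQueue"]
// (or rebuild it as "results-" + params["id"]) instead of the shared "results" queue.
```

Declaring the per-request queue with autoDelete means RabbitMQ removes it once the consumer disconnects, so the queues don't accumulate.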