Initial FastAPI request is not being processed and stays on hold

52 Views Asked by At

I have a FastAPI worker service. The API creates worker processes, which then process some data. My problem arises when starting these workers: the first request I make stays on hold forever and never reaches the API. When I cancel that request and send another one, everything works perfectly and my workers start.

This is the endpoint which I'm using to create workers:

@app.get("/connectors")
async def start_process(conf: Union[Connector, None] = None):
    """Start a worker process for the connector described by ``conf``.

    Only the first entry of ``conf.config`` is used. Its connection settings
    are stored in the module-level ``globalConfig``, which is then handed to
    a new ``multiprocessing.Process`` running ``main``.

    Args:
        conf: Connector definition from the request body. It is optional in
            the signature, so a request without a body arrives as ``None``.

    Returns:
        A ``{"message": ...}`` dict describing the outcome: missing/empty
        configuration, duplicate connector name, or the started process.
    """
    print(conf)
    # Guard before indexing: the body may be absent and ``config`` may be an
    # empty list. (Checking the truthiness of the first entry afterwards is
    # useless because a model instance is always truthy.)
    if conf is None or not conf.config:
        return {"message": "Empty configuration list"}
    conf_list = conf.config[0]
    log.info(f"Cassandra Host: {conf_list.cassandra_host}, Type: {type(conf_list.cassandra_host)}")

    # Reject duplicates *before* touching the shared configuration so that a
    # refused request leaves no side effects behind.
    if any(entry["name"] == conf.name for entry in processes):
        return {"message": "Connector is not created. Duplicated error."}

    globalConfig.set(conf_list.cassandra_host,
            conf_list.cassandra_port,
            conf_list.cassandra_keyspace,
            conf_list.kafka_bootstrap_server,
            conf_list.topic)
    p = multiprocessing.Process(target=main, args=(len(processes) + 1,conf_list.connector_class,globalConfig,))
    p.start()
    processes.append({"p": p, "name": conf.name})
    return {"message": f"Started process with ID {p.pid} or {conf.name}"}

This is the body I'm sending to the api:

{
    "name":"cassandra-sink-dog1",
    "config":[
        {
        "connector_class":"io.inovasyon.connect.cassandra.ShardedSinkConnector",
        "cassandra_host":"192.168.2.165",
        "cassandra_port":"9042",
        "cassandra_keyspace":"orion_dog",
        "kafka_bootstrap_server":"192.168.2.165:29092",
        "topic":"eys.orion-dog.entities"
        }
    ]
}

Why is my first request blocked, so that I have to cancel it and retry before it works?

Edit 1: The entrypoint is shown below:

from fastapi import FastAPI
import multiprocessing
from config.Config import Config 
from typing import List,Union
from pydantic import BaseModel
from functions.workerReplication import ReplicationSinkConnector
from functions.workerShard import KafkaConsumerSinkConnector
from functions.workerLogstash import KafkaConsumerLogstashConnector
import logging
import coloredlogs
import uvicorn
from termcolor import colored
# Install colored log output at INFO level.
coloredlogs.install(level="INFO")
# Module-level logger used by the endpoints and by the worker entry point.
log = logging.getLogger("Functions")

# FastAPI application on which the routes in this module are registered.
app = FastAPI()
# Registry of started workers; each entry is a dict of the form
# {"p": <multiprocessing.Process>, "name": <connector name>}.
processes = []
# Shared configuration object: filled through ``globalConfig.set(...)`` when a
# connector is started and passed as an argument to the worker process.
globalConfig = Config()
# Request model for one entry of ``Connector.config``.
# ``#`` comments are used instead of a docstring so the generated schema is
# left untouched.
class ConnectorConf(BaseModel):
    # Connector class name; ``main`` picks the worker by substring match on it.
    connector_class: str
    # The remaining fields are forwarded to ``globalConfig.set(...)`` when a
    # connector is started. All of them, including the port, are strings.
    cassandra_port: str
    cassandra_host: str
    cassandra_keyspace: str
    kafka_bootstrap_server: str
    topic: str

# Request body model for the ``/connectors`` endpoint.
class Connector(BaseModel):
    # Connector name; used to detect duplicates and to look a worker up when
    # stopping it.
    name: str
    # List of configurations; the start endpoint only reads the first entry.
    config: List[ConnectorConf]

def main(num,classType,globalConfig):
    """Worker entry point: run the connector selected by ``classType``.

    Intended to be used as the ``target`` of a ``multiprocessing.Process``.
    The connector is chosen by substring match on ``str(classType)``.

    Args:
        num: Worker number, used only for logging.
        classType: Connector class name (e.g. a dotted name ending in
            ``ShardedSinkConnector``).
        globalConfig: Configuration object forwarded to the connector worker.
    """
    class_name = str(classType)
    log.info(colored("Worker: ","blue")+ "{}".format(num) + colored(" Running","green"))

    # Without this check an unrecognised class name makes the process exit
    # silently even though the caller was told the worker had started.
    known_connectors = (
        "ShardedSinkConnector",
        "ReplicationSinkConnector",
        "LogstashSinkConnector",
    )
    if not any(marker in class_name for marker in known_connectors):
        log.warning("Worker %s: unknown connector class %r, nothing to run.", num, class_name)
        return

    # Independent ``if`` statements are kept on purpose so that matching
    # behaves exactly as before for every recognised class name.
    if "ShardedSinkConnector" in class_name:
        log.info(colored("ShardedSinkConnector: ","green")+ colored(" started.","green"))
        KafkaConsumerSinkConnector().workerKafkaShard(globalConfig)
    if "ReplicationSinkConnector" in class_name:
        log.info(colored("ReplicationSinkConnector: ","green")+ colored(" started.","green"))
        ReplicationSinkConnector.workerCassandraReplication(globalConfig)
    if "LogstashSinkConnector" in class_name:
        log.info(colored("LogstashSinkConnector: ","green")+ colored(" started.","green"))
        KafkaConsumerLogstashConnector.workerKafkaLogstash(globalConfig,classType)

@app.get("/connectors")
async def start_process(conf: Union[Connector, None] = None):
    """Start a worker process for the connector described by ``conf``.

    Only the first entry of ``conf.config`` is used. Its connection settings
    are stored in the module-level ``globalConfig``, which is then handed to
    a new ``multiprocessing.Process`` running ``main``.

    Args:
        conf: Connector definition from the request body. It is optional in
            the signature, so a request without a body arrives as ``None``.

    Returns:
        A ``{"message": ...}`` dict describing the outcome: missing/empty
        configuration, duplicate connector name, or the started process.
    """
    # Guard before indexing: the body may be absent and ``config`` may be an
    # empty list; either case used to crash the handler.
    if conf is None or not conf.config:
        return {"message": "Empty configuration list"}
    conf_list = conf.config[0]
    log.info(f"Cassandra Host: {conf_list.cassandra_host}, Type: {type(conf_list.cassandra_host)}")

    # Reject duplicates *before* touching the shared configuration so that a
    # refused request leaves no side effects behind.
    if any(entry["name"] == conf.name for entry in processes):
        return {"message": "Connector is not created. Duplicated error."}

    globalConfig.set(conf_list.cassandra_host,
            conf_list.cassandra_port,
            conf_list.cassandra_keyspace,
            conf_list.kafka_bootstrap_server,
            conf_list.topic)
    p = multiprocessing.Process(target=main, args=(len(processes) + 1,conf_list.connector_class,globalConfig,))
    p.start()
    processes.append({"p": p, "name": conf.name})
    return {"message": f"Started process with ID {p.pid} or {conf.name}"}


@app.get("/connectors/{name}")
async def stop_process(name: Union[str, None] = None):
    """Terminate the worker process registered under ``name``.

    Args:
        name: Connector name taken from the URL path.

    Returns:
        A ``{"message": ...}`` dict: a confirmation with the process ID when
        a worker was stopped, otherwise a "not found" message.
    """
    for idx, entry in enumerate(processes):
        if entry["name"] != name:
            continue
        proc = entry["p"]
        print(proc)
        process_id = proc.pid
        proc.terminate()
        # Removing while iterating is safe here only because we return
        # immediately afterwards.
        processes.pop(idx)
        log.info(f"Stopped process with ID: {process_id} - {name} Stopped")
        # Report success; previously the handler fell through and always
        # answered "not found".
        return {"message": f"Stopped process with ID {process_id} - {name}"}
    # The old message interpolated the builtin ``id`` function instead of the
    # requested connector name.
    return {"message": f"Process with name {name} not found"}


# if __name__ == "__main__":
#     uvicorn.run(app, host="0.0.0.0", port=8000)
# python -m uvicorn CassandraBackend:app --host 0.0.0.0 --port 8000 --reload
0

There are 0 best solutions below