How can I wait for the return value of queue.pop()?

90 Views Asked by At

When my HTTP listener receives a request, it returns a response immediately. Instead, I want it to return the value of queue.pop() only once there is a value in the queue; if the queue is empty, the request should wait until a value becomes available. How should I modify my code? Here is my code snippet.

/// Program entry point: wires up two unbounded channels and a shared priority
/// queue, spawns the two background processing tasks, and then serves HTTP
/// requests until the server stops.
///
/// Always returns `Ok(())`; a server failure is only reported on stderr.
#[tokio::main]
async fn main() -> Result<()> {
    //...

    // Channel carrying `RequestMessage`s; the sender is handed to every HTTP handler.
    let (tx, rx) = unbounded::<RequestMessage>();
    // Channel linking the first background task (sender) to the second (receiver).
    let (tx2, rx2) = unbounded::<SessionIdsWithTokens>();

    // Queue shared between the second background task and the HTTP handlers.
    // NOTE(review): assuming the std `BinaryHeap`/`Reverse`, the wrapper makes
    // `pop()` yield the smallest `RequestMessage` first — confirm the imports.
    let priority_queue: Arc<Mutex<BinaryHeap<Reverse<RequestMessage>>>> =
        Arc::new(Mutex::new(BinaryHeap::new()));

    // `.map(|_| ())` discards each task's output so the spawned future resolves to `()`.
    tokio::spawn(process_flume1_data(rx, tx2.clone(), tokenizer.clone()).map(|_| ()));
    tokio::spawn(
        process_flume2_data(
            rx2,
            priority_queue.clone(), 
            llama,
            device,
            args.sample_len,
            logits_processor,
            tokenizer,
        )
        .map(|_| ()),
    );

    // The listen address is hard-coded: 172.17.0.4, port 6006.
    let addr = ([172, 17, 0, 4], 6006).into();
    let make_svc = make_service_fn(|_conn| {
        // One clone per connection; the inner closure clones again per request
        // because `handle_request` takes both handles by value.
        let tx = tx.clone();
        let priority_queue = priority_queue.clone(); 
        async {
            core::result::Result::Ok::<_, hyper::Error>(service_fn(move |req| {
                handle_request(req, tx.clone(), priority_queue.clone())
            }))
        }
    });

    let server = Server::bind(&addr).serve(make_svc);

    println!("Server running at http://{}", addr);

    // Runs until the server future completes; an error is logged, not propagated.
    if let Err(e) = server.await {
        eprintln!("Server error: {}", e);
    }

    Ok(())
}

/// Builds the HTTP response for one request from the next entry of the shared
/// priority queue.
///
/// * `req` - the incoming request (not used in the visible part of the body).
/// * `tx` - sender for `RequestMessage`s (not used in the visible part of the body).
/// * `priority_queue` - shared queue the response payload is taken from.
///
/// Returns a response whose body carries the popped entry's `message`, or a
/// fixed "no data" text when the queue helper yields `None`. The visible code
/// never produces the `Err` variant.
async fn handle_request(
    req: Request<Body>,
    tx: Sender<RequestMessage>,
    priority_queue: Arc<Mutex<BinaryHeap<Reverse<RequestMessage>>>>,
) -> Result<Response<Body>, hyper::Error> {
    //...
    // Fetch the next queue entry and turn it into the response body; both
    // outcomes are wrapped in a response by the single `Ok` below.
    let body = match process_response_from_queue(priority_queue.clone()).await {
        Some(Reverse(event)) => {
            Body::from(format!("Data from priority queue: {}", event.message))
        }
        None => Body::from("No data available in the priority queue"),
    };

    core::result::Result::Ok(Response::new(body))
}

async fn process_response_from_queue( 
    priority_queue: Arc<Mutex<BinaryHeap<Reverse<RequestMessage>>>>, 
) -> Option<Reverse<RequestMessage>> {
    let mut queue = priority_queue.lock().unwrap();
    queue.pop()
}

Currently, when I send a request to my HTTP listener while the queue is empty, it immediately returns the following response, but I don't want it to return anything until a value is available:

core::result::Result::Ok(Response::new(Body::from("No data available in the priority queue")))
0

There are 0 best solutions below