When my HTTP listener receives a request, it returns a response immediately.
Instead, I want the handler to return the value of queue.pop() only when the queue contains a value. If the queue is empty, the request should wait until a value arrives.
How should I modify my code?
Here is the relevant snippet:
#[tokio::main]
async fn main() -> Result<()> {
    //...
    let (tx, rx) = unbounded::<RequestMessage>();
    let (tx2, rx2) = unbounded::<SessionIdsWithTokens>();
    let priority_queue: Arc<Mutex<BinaryHeap<Reverse<RequestMessage>>>> =
        Arc::new(Mutex::new(BinaryHeap::new()));
    tokio::spawn(process_flume1_data(rx, tx2.clone(), tokenizer.clone()).map(|_| ()));
    tokio::spawn(
        process_flume2_data(
            rx2,
            priority_queue.clone(),
            llama,
            device,
            args.sample_len,
            logits_processor,
            tokenizer,
        )
        .map(|_| ()),
    );
    let addr = ([172, 17, 0, 4], 6006).into();
    let make_svc = make_service_fn(|_conn| {
        let tx = tx.clone();
        let priority_queue = priority_queue.clone();
        async {
            core::result::Result::Ok::<_, hyper::Error>(service_fn(move |req| {
                handle_request(req, tx.clone(), priority_queue.clone())
            }))
        }
    });
    let server = Server::bind(&addr).serve(make_svc);
    println!("Server running at http://{}", addr);
    if let Err(e) = server.await {
        eprintln!("Server error: {}", e);
    }
    // }
    Ok(())
}
async fn handle_request(
    req: Request<Body>,
    tx: Sender<RequestMessage>,
    priority_queue: Arc<Mutex<BinaryHeap<Reverse<RequestMessage>>>>,
) -> Result<Response<Body>, hyper::Error> {
    //...
    // Wait for response from the priority queue using await
    if let Some(Reverse(event)) = process_response_from_queue(priority_queue.clone()).await {
        // Use the response to generate the HTTP response
        core::result::Result::Ok(Response::new(Body::from(format!(
            "Data from priority queue: {}",
            event.message
        ))))
    } else {
        core::result::Result::Ok(Response::new(Body::from("No data available in the priority queue")))
    }
}
async fn process_response_from_queue(
    priority_queue: Arc<Mutex<BinaryHeap<Reverse<RequestMessage>>>>,
) -> Option<Reverse<RequestMessage>> {
    let mut queue = priority_queue.lock().unwrap();
    queue.pop()
}
Currently, when I send a request to my HTTP listener and the queue is empty, it immediately returns the line below. I don't want to return anything until a value is available:
core::result::Result::Ok(Response::new(Body::from("No data available in the priority queue")))
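
To make the desired behavior concrete, here is a minimal sketch of a "wait until non-empty" pop. It is not taken from the code above: `WaitingQueue`, `push`, and `pop_blocking` are hypothetical names, and it uses only the standard library (`std::sync::Condvar`) so that it compiles without extra crates. A blocking wait like this would stall a worker thread if called directly inside a tokio handler, so in async code the same check-then-wait loop would probably need an async primitive instead, for example `tokio::sync::Notify` with `notified().await`. That substitution is an assumption and is not shown here.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical wrapper (not part of the original code): a min-priority
/// queue whose `pop_blocking` waits until at least one item is present.
pub struct WaitingQueue<T: Ord> {
    heap: Mutex<BinaryHeap<Reverse<T>>>,
    not_empty: Condvar,
}

impl<T: Ord> WaitingQueue<T> {
    /// Creates an empty queue behind an `Arc` so it can be shared across threads.
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            heap: Mutex::new(BinaryHeap::new()),
            not_empty: Condvar::new(),
        })
    }

    /// Inserts an item and wakes one waiting consumer, if any.
    pub fn push(&self, item: T) {
        // The lock guard is dropped at the end of this statement.
        self.heap.lock().unwrap().push(Reverse(item));
        self.not_empty.notify_one();
    }

    /// Removes and returns the smallest item, blocking while the queue is empty.
    pub fn pop_blocking(&self) -> T {
        let mut heap = self.heap.lock().unwrap();
        loop {
            if let Some(Reverse(item)) = heap.pop() {
                return item;
            }
            // Releases the lock while sleeping and re-acquires it on wake-up.
            // The loop re-checks the heap, which handles spurious wake-ups.
            heap = self.not_empty.wait(heap).unwrap();
        }
    }
}
```

With this shape, the producer side (here, whatever fills the priority queue) would call `push`, and the consumer would call `pop_blocking` instead of returning `None` on an empty queue. The key point is that the emptiness check and the wait happen under the same lock, so a push cannot slip in between them unnoticed.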