I have a problem passing a RabbitMQ connection to a handler via with_state. When I create the Rmq instance, channel.is_open() returns true (it also returns true in main.rs), but when I call state.infrastructure.rmq.channel.is_open() inside the handler, the response shows false. This is strange, because I do the same thing with the ClickHouse connection and it works (I think it connects on each request). I would like the connection to be created once at startup and then stay available in the state. The code is below.
Here is the Rmq structure that connects and stores the channel (src/infrastructure/rmq/mod.rs):
mod config;
use std::time;
use amqprs::{
    callbacks::DefaultConnectionCallback,
    channel::Channel,
    connection::{Connection, OpenConnectionArguments},
};
use self::config::RmqConfig;
#[derive(Clone)]
pub struct Rmq {
    pub channel: Channel,
}
impl Rmq {
    pub async fn new() -> Self {
        let config = envy::from_env::<RmqConfig>().unwrap();
        let connection_options = OpenConnectionArguments::new(
            &config.amqp_host,
            config.amqp_port,
            &config.amqp_user,
            &config.amqp_pass,
        );
        // Retry every 2 seconds until the broker accepts the connection.
        let mut res = Connection::open(&connection_options).await;
        while res.is_err() {
            tracing::info!("trying to connect after error");
            // Async sleep, so the runtime thread is not blocked while waiting.
            tokio::time::sleep(time::Duration::from_millis(2000)).await;
            res = Connection::open(&connection_options).await;
        }
        let connection = res.unwrap();
        tracing::info!("Connection name: {}", connection.connection_name());
        connection
            .register_callback(DefaultConnectionCallback)
            .await
            .unwrap();
        let channel = connection.open_channel(None).await.unwrap();
        tracing::info!("Channel id: {}", channel.channel_id());
        // Only the channel is stored; `connection` goes out of scope here.
        Self { channel }
    }
}
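A possible lead, in case it helps narrow things down: in Rmq::new only the channel is returned, so the local connection value is dropped when the function ends. The sketch below uses only the standard library and hypothetical stand-in types (this Connection, Channel and RmqKeepingConnection are not the amqprs types), and it assumes that dropping the connection handle closes the channel. I have not confirmed that amqprs behaves this way. If it does, the real close would presumably happen asynchronously, which might explain why the check in main.rs still returns true.

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

/// Hypothetical stand-in for a connection handle (NOT the amqprs type).
/// Assumption: dropping the handle marks everything opened on it as closed.
struct Connection {
    open: Arc<AtomicBool>,
}

impl Connection {
    fn open() -> Self {
        Connection { open: Arc::new(AtomicBool::new(true)) }
    }

    /// The channel shares the connection's open/closed flag.
    fn open_channel(&self) -> Channel {
        Channel { open: Arc::clone(&self.open) }
    }
}

impl Drop for Connection {
    fn drop(&mut self) {
        self.open.store(false, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for a channel (NOT the amqprs type).
#[derive(Clone)]
struct Channel {
    open: Arc<AtomicBool>,
}

impl Channel {
    fn is_open(&self) -> bool {
        self.open.load(Ordering::SeqCst)
    }
}

/// Same shape as Rmq::new: only the channel leaves the function,
/// so `connection` is dropped on return.
fn channel_only() -> Channel {
    let connection = Connection::open();
    let channel = connection.open_channel();
    channel
}

/// Variant that stores the connection next to the channel,
/// so it lives as long as the struct does.
struct RmqKeepingConnection {
    _connection: Connection,
    channel: Channel,
}

fn channel_with_connection() -> RmqKeepingConnection {
    let connection = Connection::open();
    let channel = connection.open_channel();
    RmqKeepingConnection { _connection: connection, channel }
}
```

In this model, channel_only().is_open() is false as soon as the function returns, while the channel inside the value returned by channel_with_connection() stays open for as long as that value is kept. Whether the real Connection can be stored in a Clone state struct in the same way is something I would still need to check in the amqprs documentation.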
Here is the general module where connections are created (src/infrastructure/mod.rs):
use self::{clickhouse::ClickHouse, rmq::Rmq};
pub mod rmq;
pub mod clickhouse;
#[derive(Clone)]
pub struct InfrastructureState {
    pub rmq: Rmq,
    pub ch: ClickHouse,
}
pub async fn main() -> InfrastructureState {
    let mut ch = clickhouse::ClickHouse::new();
    let _ = ch.get_schema().await;
    let rmq = Rmq::new().await;
    InfrastructureState { rmq, ch }
}
Main file (src/main.rs):
mod config;
mod features;
mod infrastructure;
use std::net::SocketAddr;
use axum::{
    routing::{get, post},
    Router,
};
use config::{AppConfig, AppState};
use dotenv::dotenv;
#[tokio::main]
async fn main() {
    dotenv().ok();
    let config = envy::from_env::<AppConfig>().unwrap();
    if cfg!(debug_assertions) {
        tracing_subscriber::fmt().init();
        tracing::info!("debugging enabled");
    } else {
        tracing_subscriber::fmt().json().init();
    }
    let infrastructure = infrastructure::main().await;
    let state = AppState {
        config,
        infrastructure,
    };
    let addr = SocketAddr::from(([0, 0, 0, 0], state.config.api_port));
    let app = Router::new()
        .route("/upload_metric", post(features::metrics::main))
        .with_state(state.clone());
    tracing::info!("Listen on {addr} address");
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}
Here is the handler itself:
use std::collections::HashMap;
use axum::{
    extract::State,
    http::StatusCode,
    response::{IntoResponse, Response},
    Json,
};
use serde_json::Value;
use crate::{config::AppState, infrastructure::rmq::Rmq};
pub async fn main(
    State(mut state): State<AppState>,
    Json(body): Json<Vec<HashMap<String, Value>>>,
) -> Result<Response, Response> {
    // Prints false here, although the same check returned true at startup.
    println!("{}", state.infrastructure.rmq.channel.is_open());
    Ok((StatusCode::CREATED, "Success").into_response())
}
And, just in case, here are the versions of my dependencies:
[dependencies]
serde = { version = "1.0.167", features = ["derive"] }
serde_json = "1.0.107"
tokio = { version = "1.33.0", features = ["full"] }
dotenv = "0.15.0"
envy = "0.4.2"
axum = "0.6.20"
tracing = "0.1.39"
clickhouse = "0.11.6"
tracing-subscriber = { version = "0.3.17", features = ["env-filter", "json"] }
amqp_serde = "0.3.0"
amqprs = "1.2.0"
I'm new to Rust, so I don't even know where to start in this matter.