Rust axum connect to RabbitMQ

110 Views Asked by At

I have a problem with forwarding a RabbitMQ connection via with_state When I create an instance, Rmq channel.is_open() returns true (it returns the same thing in the main.rs file), but in the handler itself I access state.infrastructure.rmq.channel.is_open() and in the response I see false and this is strange because I do similar actions with the Clickhouse connection and it works (I think it connects on request) I would like the connection to be created once at the start and then be present in the state Below I will present the code for better understanding.

Here is the Rmq structure that connects and saves the channel (src/infrastructure/rmq/mod.rs)

mod config;

use std::time;
use amqprs::{
    callbacks::DefaultConnectionCallback,
    channel::Channel,
    connection::{Connection, OpenConnectionArguments}
};

use self::config::RmqConfig;

#[derive(Clone)]
pub struct Rmq {
    pub channel: Channel
}

impl Rmq {
    pub async fn new() -> Self {
        let config = envy::from_env::<RmqConfig>().unwrap();
    
        let connection_options = OpenConnectionArguments::new(
            &config.amqp_host,
            config.amqp_port,
            &config.amqp_user,
            &config.amqp_pass,
        );
    
        let mut res = Connection::open(&connection_options)
        .await;
    
        while res.is_err() {
            tracing::info!("trying to connect after error");
            std::thread::sleep(time::Duration::from_millis(2000));
            res = Connection::open(&connection_options)
            .await;
        }
    
        let connection = res.unwrap();
        
        tracing::info!("Connection name: {}", connection.connection_name());
    
        connection
            .register_callback(DefaultConnectionCallback)
            .await
            .unwrap();

        let channel = connection.open_channel(None).await.unwrap();
        tracing::info!("Channel id: {}", channel.channel_id());
        
        Self {
            channel
        }
    }
}

Here is the general module where connections are created (src/infrastructure/mod.rs)

use self::{clickhouse::ClickHouse, rmq::Rmq};

pub mod rmq;
pub mod clickhouse;

#[derive(Clone)]
pub struct InfrastructureState {
    pub rmq: Rmq,
    pub ch: ClickHouse
}

pub async fn main() -> InfrastructureState {
    let mut ch = clickhouse::ClickHouse::new();
    let _ = ch.get_schema().await;

    let rmq = Rmq::new().await;

    InfrastructureState { rmq, ch }
}

Main file (src/main.rs)

mod config;
mod features;
mod infrastructure;

use std::net::SocketAddr;

use axum::{
    routing::{get, post},
    Router
};
use config::{AppConfig, AppState};
use dotenv::dotenv;

#[tokio::main]
async fn main() {
    dotenv().ok();

    let config = envy::from_env::<AppConfig>().unwrap();

    if cfg!(debug_assertions) {
        tracing_subscriber::fmt().init();
        tracing::info!("debugging enabled");
    } else {
        tracing_subscriber::fmt().json().init();
    }

    let infrastructure = infrastructure::main().await;
    let state = AppState {
        config,
        infrastructure
    };

    let addr = SocketAddr::from(([0, 0, 0, 0], state.config.api_port));
    let app = Router::new()
    .route("/upload_metric", post(features::metrics::main))
    .with_state(state.clone())


    tracing::info!("Listen on {addr} address");
    axum::Server::bind(&addr)
    .serve(app.into_make_service())
    .await
    .unwrap()
}

Here is the handler itself

use std::collections::HashMap;
use serde_json::Value;


use crate::{config::AppState, infrastructure::rmq::Rmq};

pub async fn main(State(mut state): State<AppState>, Json(body): Json<Vec<HashMap<String, Value>>>) -> Result<Response, Response> {
    // is false
    println!("{}", state.infrastructure.rmq.channel.is_open());

    Ok((StatusCode::CREATED, "Success").into_response())
}

And just in case the versions of my dependencies

[dependencies]
serde = { version = "1.0.167", features = ["derive"] }
serde_json = "1.0.107"
tokio = { version = "1.33.0", features = ["full"] }
dotenv = "0.15.0"
envy = "0.4.2"
axum = "0.6.20"
tracing = "0.1.39"
clickhouse = "0.11.6"
tracing-subscriber = { version = "0.3.17", features = ["env-filter", "json"] }
amqp_serde = "0.3.0"
amqprs = "1.2.0"

I'm new to Rust, so I don't even know where to start in this matter.

0

There are 0 best solutions below