Rust cache async traits

518 Views Asked by At

I'm running into an issue when I attempt to cache a value for as long as it's valid and update it when it becomes invalid. I believe the issue is due to my attempt to share state across async executions. Further, this component lives in a multi-threaded / concurrent environment.

The error I'm seeing that I don't know how to fix is

future is not `Send` as this value is used across an await

Following is a minimum example that I could come up with (it also features some ownership issues) that generally captures my use-case and the issue I'm seeing. Here is a playground of the code.

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use std::sync::{Arc, Mutex};

struct Creds {
    expires_at: DateTime<Utc>,
}

impl Creds {
    fn is_expired(&self) -> bool {
        self.expires_at.le(&Utc::now())
    }
}

#[async_trait]
trait CredsProvider {
    async fn get_creds(&self) -> Creds;
}

struct MyCredsProvider {
    cached_creds: Arc<Mutex<Option<Creds>>>,
}

impl MyCredsProvider {
    fn new() -> Self {
        MyCredsProvider {
            cached_creds: Arc::new(Mutex::new(None)),
        }
    }
    async fn inner_get_creds(&self) -> Creds {
        todo!()
    }
}

#[async_trait]
impl CredsProvider for MyCredsProvider {
    async fn get_creds(&self) -> Creds {
        let mg = self
            .cached_creds
            .lock()
            .expect("Unable to get lock on creds mutex");
        if mg.is_some() && !mg.as_ref().unwrap().is_expired() {
            return mg.unwrap();
        }
        let new_creds = self.inner_get_creds().await;
        *mg = Some(new_creds);
        return new_creds;
    }
}

#[tokio::main]
async fn main() {
    MyCredsProvider::new();
    // Some multi-threaded / concurrent logic to periodically refresh creds
    todo!()
}

I wasn't sure how to include this in the example but in main imagine multiple worker threads running concurrently / parallel that each call CredsProvider.get_creds and then use these creds to perform some work (if you can add that to a complete working example, that'd be much appreciated for my edification). Assume MyCredsProvider.inner_get_creds is expensive and should only be called when the cached creds expire.

How do I solve this? I thought that the Arc<Mutex<>> would be enough but it seems not. At one point, I tried making Creds and trait so that I could have Arc<Mutex<Option<Box<dyn Creds + Send + Sync>>>> but that felt like the wrong path and didn't work.

Thanks.

1

There are 1 best solutions below

0
On

You may would like to switch to tokio::sync::Mutex (playground).

It solves

future is not `Send` as this value is used across an await

Code:

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Clone)]
struct Creds {
    expires_at: DateTime<Utc>,
}

impl Creds {
    fn is_expired(&self) -> bool {
        self.expires_at.le(&Utc::now())
    }
}

#[async_trait]
trait CredsProvider {
    async fn get_creds(&self) -> Creds;
}

struct MyCredsProvider {
    cached_creds: Arc<Mutex<Option<Creds>>>,
}

impl MyCredsProvider {
    fn new() -> Self {
        MyCredsProvider {
            cached_creds: Arc::new(Mutex::new(None)),
        }
    }
    async fn inner_get_creds(&self) -> Creds {
        todo!()
    }
}

#[async_trait]
impl CredsProvider for MyCredsProvider {
    async fn get_creds(&self) -> Creds {
        let mut mg = self
            .cached_creds
            .lock()
            .await;
        if mg.is_some() && !mg.as_ref().unwrap().is_expired() {
            return mg.clone().unwrap();
        } else {
            let new_creds = self.inner_get_creds().await;
            *mg = Some(new_creds.clone());
            return new_creds;
        }
    }
}

#[tokio::main]
async fn main() {
    MyCredsProvider::new();
    // Some multi-threaded / concurrent logic to periodically refresh creds
    todo!()
}