How could I store closures and use them with Actix actors?

1k Views Asked by At

I'm trying to use Actix to communicate capture events through WebSockets and process them using something like https://github.com/foochi/how-store-closures-with-actix. The idea is providing a library that can be used to store closures (events), and run them when a WebSockets text message is received.

use actix::*;
use actix_web::ws::{Client, Message, ProtocolError};
use futures::Future;

struct MyActor {
    handler: Box<Fn(String) + 'static>,
}

impl Actor for MyActor {
    type Context = Context<Self>;
}

impl StreamHandler<Message, ProtocolError> for MyActor {
    fn handle(&mut self, msg: Message, _ctx: &mut Context<Self>) {
        match msg {
            Message::Text(text) => {
                (self.handler)(text)
            },
            _ => panic!(),
        }
    }
}

pub struct Event {
    handler: Box<Fn(String) + 'static>,
}

pub struct EventManager {
    events: Vec<Event>,
}

impl EventManager {

    pub fn new() -> Self {
        Self { events: vec![] }
    }

    pub fn capture<F>(&mut self, function: F)
    where
        F: for<'h> Fn(String) + 'static
    {
        let event = Event { handler: Box::new(function), };
        self.events.push(event);
    }

    pub fn run(&self) {
        let runner = System::new("example");
        let event = &self.events[0];

        Arbiter::spawn(
            Client::new("example")
                .connect()
                .map(|(reader, _writer)| {
                    MyActor::create(|ctx| {
                        MyActor::add_stream(reader, ctx);
                        MyActor { handler: event.handler }
                    });
                })
                .map_err(|err| {})
        );

        runner.run();
    }
}

My problem is that I have this error:

error[E0495]: cannot infer an appropriate lifetime for lifetime parameter in function call due to conflicting requirements
  --> src/events.rs:48:22
   |
48 |         let event = &self.events[0];
   |                      ^^^^^^^^^^^^^^
   |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 46:5...
  --> src/events.rs:46:5
   |
46 | /     pub fn run(&self) {
47 | |         let runner = System::new("example");
48 | |         let event = &self.events[0];
49 | |
...  |
62 | |         runner.run();
63 | |     }
   | |_____^
note: ...so that reference does not outlive borrowed content
  --> src/events.rs:48:22
   |
48 |         let event = &self.events[0];
   |                      ^^^^^^^^^^^
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `[closure@src/events.rs:54:37: 57:22 reader:actix_web::ws::ClientReader, event:&&events::Event]` will meet its required lifetime bounds
  --> src/events.rs:54:21
   |
54 |                     MyActor::create(|ctx| {
   |                     ^^^^^^^^^^^^^^^

I think that I partly understand the root cause: I'm trying to pass a reference (the event) to the StreamHandler, but the lifetimes don't match.

How could I fix it?

1

There are 1 best solutions below

1
On BEST ANSWER

Disclaimer: I'm not able to comment if this is a good design pattern for Actix as I'm just beginning understanding the framework.

As you have already discovered, the problem is related to lifetime requirements.

The Actor::create method requires a 'static lifetime for the closure argument:

fn create<F>(f: F) -> Addr<Self> 
where
    Self: Actor<Context = Context<Self>>,
    F: FnOnce(&mut Context<Self>) -> Self + 'static, 

&self.events[0] does not satisfy the 'static lifetime requirement.

One solution is to move ownership of the EventManager object to MyActor:

use actix::*;
use actix_web::ws::{Client, Message, ProtocolError};
use futures::Future;

struct MyActor {
    evm: EventManager,
}

impl Actor for MyActor {
    type Context = Context<Self>;
}

impl StreamHandler<Message, ProtocolError> for MyActor {
    fn handle(&mut self, msg: Message, _ctx: &mut Context<Self>) {
        match msg {
            Message::Text(text) => {
                // just for sake of demo: execute all event handlers
                for idx in 0..self.evm.events.len() {
                    (self.evm.events[idx].handler)(text.clone())
                }
            }
            _ => panic!(),
        }
    }
}

pub struct Event {
    handler: Box<Fn(String) + 'static>,
}

pub struct EventManager {
    events: Vec<Event>,
}

impl EventManager {
    pub fn new() -> Self {
        Self { events: vec![] }
    }

    pub fn capture<F>(&mut self, function: F)
    where
        F: Fn(String) + 'static,
    {
        let event = Event {
            handler: Box::new(function),
        };
        self.events.push(event);
    }

    pub fn run(self) {
        let runner = System::new("example");

        Arbiter::spawn(
            Client::new("http://127.0.0.1:8080/ws/")
                .connect()
                .map(|(reader, _writer)| {
                    MyActor::create(|ctx| {
                        MyActor::add_stream(reader, ctx);

                        // move event manager inside the actor
                        MyActor { evm: self }
                    });
                }).map_err(|err| println!("FATAL: {}", err)),
        );

        runner.run();
    }
}

pub fn ready() {
    let mut client = EventManager::new();

    client.capture(|data| println!("processing the data: {:?}", data));
    client.capture(|data| println!("processing AGAIN the data: {:?}", data));
    client.run();

    // here run client is not more available: it was moved inside the actor
}

fn main() {
    ready();
}