I'm writing an application using actix_web and rusoto_s3.
When I run a command outside of an actix request, directly from main, it runs fine and get_object works as expected. When the same call is encapsulated inside an actix_web request, the stream is blocked forever.

I have a client that is shared by all requests and wrapped in an Arc (this happens inside actix's Data internals).
Full code:
fn index(
    _req: HttpRequest,
    path: web::Path<String>,
    s3: web::Data<S3Client>,
) -> impl Future<Item = HttpResponse, Error = actix_web::Error> {
    s3.get_object(GetObjectRequest {
        bucket: "my_bucket".to_owned(),
        key: path.to_owned(),
        ..Default::default()
    })
    .and_then(move |res| {
        info!("Response {:?}", res);
        let mut stream = res.body.unwrap().into_blocking_read();
        let mut body = Vec::new();
        // The request hangs here: read_to_end never returns.
        stream.read_to_end(&mut body).unwrap();
        match process_file(body.as_slice()) {
            Ok(result) => Ok(result),
            Err(error) => Err(RusotoError::from(error)),
        }
    })
    .map_err(|e| match e {
        RusotoError::Service(GetObjectError::NoSuchKey(key)) => {
            actix_web::error::ErrorNotFound(format!("{} not found", key))
        }
        error => {
            error!("Error: {:?}", error);
            actix_web::error::ErrorInternalServerError("error")
        }
    })
    .from_err()
    .and_then(move |img| HttpResponse::Ok().body(Body::from(img)))
}
fn health() -> HttpResponse {
    HttpResponse::Ok().finish()
}
fn main() -> std::io::Result<()> {
    let name = "rust_s3_test";
    env::set_var("RUST_LOG", "debug");
    pretty_env_logger::init();
    let sys = actix_rt::System::builder().stop_on_panic(true).build();
    let prometheus = PrometheusMetrics::new(name, "/metrics");
    let s3 = S3Client::new(Region::Custom {
        name: "eu-west-1".to_owned(),
        endpoint: "http://localhost:9000".to_owned(),
    });
    let s3_client_data = web::Data::new(s3);
    Server::build()
        .bind(name, "0.0.0.0:8080", move || {
            HttpService::build().keep_alive(KeepAlive::Os).h1(App::new()
                .register_data(s3_client_data.clone())
                .wrap(prometheus.clone())
                .wrap(actix_web::middleware::Logger::default())
                .service(web::resource("/health").route(web::get().to(health)))
                .service(web::resource("/{file_name}").route(web::get().to_async(index))))
        })?
        .start();
    sys.run()
}
In stream.read_to_end the thread is blocked and the future never resolves.
I have tried cloning the client per request and also creating a new client per request, but I get the same result in every case.
Am I doing something wrong?
It works if I don't use it asynchronously:
// .sync() performs the request on the current thread, outside the event loop.
let mut stream = s3
    .get_object(GetObjectRequest {
        bucket: "my_bucket".to_owned(),
        key: path.to_owned(),
        ..Default::default()
    })
    .sync()
    .unwrap()
    .body
    .unwrap()
    .into_blocking_read();
let mut body = Vec::new();
io::copy(&mut stream, &mut body).unwrap();
Is this an issue with Tokio?
Check the implementation of into_blocking_read(): it calls .wait(). You shouldn't call blocking code inside a Future.

Since Rusoto's body is a Stream, there is a way to read it asynchronously.
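For example, a minimal sketch of the handler built on futures 0.1's concat2, assuming the actix-web 1.x / rusoto 0.40-era APIs the question uses (error handling is collapsed into ErrorInternalServerError to keep it short):

fn index(
    _req: HttpRequest,
    path: web::Path<String>,
    s3: web::Data<S3Client>,
) -> impl Future<Item = HttpResponse, Error = actix_web::Error> {
    s3.get_object(GetObjectRequest {
        bucket: "my_bucket".to_owned(),
        key: path.to_owned(),
        ..Default::default()
    })
    .map_err(actix_web::error::ErrorInternalServerError)
    .and_then(|res| {
        // concat2() resolves once every chunk of the ByteStream has
        // arrived, concatenated into one buffer -- no thread is ever
        // parked waiting for the body.
        res.body
            .unwrap()
            .concat2()
            .map_err(actix_web::error::ErrorInternalServerError)
    })
    .map(|body| HttpResponse::Ok().body(Body::from(body)))
}

Like the original read_to_end, concat2 buffers the entire object in memory, but it does so by yielding to the event loop between chunks instead of blocking it.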
process_file should not block the enclosing Future. If it needs to block, you may consider running it on a new thread or encapsulating it with tokio_threadpool's blocking.

Note: You can use tokio_threadpool's blocking in your implementation, but I recommend you understand how it works first.
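To illustrate, a standalone sketch (assuming tokio_threadpool 0.1 and futures 0.1; expensive_work is a hypothetical stand-in for process_file). Note that blocking only succeeds on a tokio_threadpool worker thread, which is why the future is spawned onto a pool explicitly:

use futures::future::poll_fn;
use futures::{Future, Poll};
use tokio_threadpool::{blocking, ThreadPool};

// Hypothetical stand-in for the blocking process_file call.
fn expensive_work(input: &[u8]) -> usize {
    input.iter().map(|b| *b as usize).sum()
}

fn main() {
    let pool = ThreadPool::new();
    let data = vec![1u8, 2, 3];
    let task = poll_fn(move || -> Poll<usize, ()> {
        // blocking() marks this worker as blocked so the pool can
        // spawn a replacement thread while expensive_work runs.
        blocking(|| expensive_work(&data)).map_err(|_| panic!("the thread pool shut down"))
    });
    let result = pool.spawn_handle(task).wait().unwrap();
    println!("sum: {}", result);
}

Inside an actix-web handler, web::block is the more idiomatic way to ship a blocking closure onto a dedicated thread pool.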
If you are not aiming to load the whole file into memory, you can use for_each:
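For instance, a sketch that handles each chunk as it arrives (the hypothetical dump handler writes chunks to stdout as a placeholder for real per-chunk work, and the error mapping again assumes ErrorInternalServerError):

use std::io::Write;

fn dump(
    path: web::Path<String>,
    s3: web::Data<S3Client>,
) -> impl Future<Item = HttpResponse, Error = actix_web::Error> {
    s3.get_object(GetObjectRequest {
        bucket: "my_bucket".to_owned(),
        key: path.to_owned(),
        ..Default::default()
    })
    .map_err(actix_web::error::ErrorInternalServerError)
    .and_then(|res| {
        res.body.unwrap().for_each(|chunk| {
            // Each chunk is processed as it arrives; the whole object
            // is never resident in memory at once.
            std::io::stdout().write_all(&chunk)
        })
        .map_err(actix_web::error::ErrorInternalServerError)
    })
    .map(|_| HttpResponse::Ok().finish())
}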