How can I upload file to s3 using rusoto, without reading file content to memory (streamed)?
With this code:
use std::fs::File;
use std::io::BufReader;
use rusoto_core::Region;
use rusoto_s3::{PutObjectRequest, S3, S3Client, StreamingBody};
fn main() {
let file = File::open("input.txt").unwrap();
let mut reader = BufReader::new(file);
let s3_client = S3Client::new(Region::UsEast1);
let result = s3_client.put_object(PutObjectRequest {
bucket: String::from("example_bucket"),
key: "example_filename".to_string(),
// this works:
// body: Some("example string".to_owned().into_bytes().into()),
// this doesn't:
body: Some(StreamingBody::new(reader)),
..Default::default()
}).sync().expect("could not upload");
}
I receive the following error:
error[E0277]: the trait bound `std::io::BufReader<std::fs::File>: futures::stream::Stream` is not satisfied --> src/bin/example.rs:18:20 | 18 | body: Some(StreamingBody::new(reader)), | ^^^^^^^^^^^^^^^^^^ the trait `futures::stream::Stream` is not implemented for `std::io::BufReader<std::fs::File>` | = note: required by `rusoto_core::stream::ByteStream::new`
Okay. Strap yourself in, this is a fun one.
StreamingBody
is an alias forByteStream
, which itself takes a parameter typeS: Stream<Item = Bytes, Error = Error> + Send + 'static
. In short, it needs to be a stream of bytes.BufReader
, evidently, does not implement this trait, as it predates futures and streams by a long while. There is also no easy conversion toStream<Item = Bytes>
that you can use to implicitly convert into this.The reason the first (commented) example works is because
String::into_bytes().into()
will follow the typecast chain:String
->Vec<u8>
->ByteStream
thanks to the implementation ofFrom<Vec<u8>>
onByteStream
.Now that we know why this doesn't work, we can fix it. There is a fast way, and then there is a right way. I'll show you both.
The fast way
The fast (but not optimal) way is simply to call
File::read_to_end()
. This will fill up aVec<u8>
, which you can then use like you did before:This is inefficient and suboptimal for two reasons:
read_to_end()
is a blocking call. Based on where you are reading the file from, this blocking time may prove unreasonableVec
definition + some extra we don't really care about)The good way
The good way turns your file into a structure implementing
AsyncRead
. From this, we can then form aStream
.Since you already have a
std::fs::File
, we will first convert it into atokio::fs::File
. This implementsAsyncRead
, which is very important for later:From this, we sadly need to do some pipework to get it into a
Stream
. Multiple crates have implemented it; the way to do so from scratch is the following:byte_stream
is an instance oftokio_util::codec::FramedRead
which implementsStream
with a specific item based on our decoder. As our decoder isBytesCodec
, your stream is thereforeStream<Item = BytesMut>
.As the playground doesn't know
rusoto_core
, I cannot show you the full flow. I can, however, show you that you can generate aStream<Item = Vec<u8>, Error = io::Error>
, which is the crux of this: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=38e4ae8be0d70abd134b5331d6bf4133