Rust AWS multipart upload using rusoto, multithreaded (rayon): panicked at 'there is no reactor running...'

I'm trying to upload a file to AWS S3 in Rust using the rusoto_s3 client. I managed to get the multipart upload code working when the parts are sent from a single thread; however, that is not what I want. I want to upload big files and be able to send the parts from multiple threads, so I did a bit of googling and came across rayon.

For reference, multipart upload works as follows:

  1. Initiate the multipart upload -> AWS returns an upload ID.
  2. Use this ID to send the individual parts, passing the file chunk and a part number -> AWS returns an ETag for each part.
  3. Once all the parts are sent, send a complete-upload request whose body is an array of the completed parts, each entry containing an ETag and a part number (a minimal sketch of these steps follows).
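
For concreteness, here is a minimal single-threaded sketch of those three steps with rusoto_s3 0.46. The bucket and key names and the chunks argument are placeholders of mine, and error handling is elided:

use rusoto_core::Region;
use rusoto_s3::{
    CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart,
    CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3,
};

async fn multipart_roundtrip(chunks: Vec<Vec<u8>>) {
    let s3 = S3Client::new(Region::UsEast1);

    // 1. Initiate the upload -> upload_id
    let created = s3
        .create_multipart_upload(CreateMultipartUploadRequest {
            bucket: "my-bucket".to_owned(),
            key: "my-key".to_owned(),
            ..Default::default()
        })
        .await
        .unwrap();
    let upload_id = created.upload_id.unwrap();

    // 2. Upload each chunk -> ETag (part numbers start at 1)
    let mut parts = Vec::new();
    for (i, chunk) in chunks.into_iter().enumerate() {
        let part_number = (i + 1) as i64;
        let response = s3
            .upload_part(UploadPartRequest {
                body: Some(chunk.into()),
                bucket: "my-bucket".to_owned(),
                key: "my-key".to_owned(),
                upload_id: upload_id.clone(),
                part_number,
                ..Default::default()
            })
            .await
            .unwrap();
        parts.push(CompletedPart {
            e_tag: response.e_tag,
            part_number: Some(part_number),
        });
    }

    // 3. Complete the upload with the (ETag, part number) pairs
    s3.complete_multipart_upload(CompleteMultipartUploadRequest {
        bucket: "my-bucket".to_owned(),
        key: "my-key".to_owned(),
        upload_id,
        multipart_upload: Some(CompletedMultipartUpload { parts: Some(parts) }),
        ..Default::default()
    })
    .await
    .unwrap();
}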

I'm new to Rust, coming from a C++ and Java background. Here is my code:

#[tokio::test]
async fn if_multipart_then_upload_multiparts_dicom() {
    let now = Instant::now();
    dotenv().ok();
    let local_filename = "./files/test_big.DCM";
    let destination_filename = "24_time_test.dcm";

    let mut file = std::fs::File::open(local_filename).unwrap();
    const CHUNK_SIZE: usize = 7_000_000;
    let mut buffer = Vec::with_capacity(CHUNK_SIZE);

    let client = super::get_client().await;
    let create_multipart_request = CreateMultipartUploadRequest {
        bucket: client.bucket_name.to_owned(),
        key: destination_filename.to_owned(),
        ..Default::default()
    };

    // Start the multipart upload and note the upload_id generated
    let response = client
        .s3
        .create_multipart_upload(create_multipart_request)
        .await
        .expect("Couldn't create multipart upload");
    let upload_id = response.upload_id.unwrap();

    // Create upload parts
    let create_upload_part = |body: Vec<u8>, part_number: i64| -> UploadPartRequest {
        UploadPartRequest {
            body: Some(body.into()),
            bucket: client.bucket_name.to_owned(),
            key: destination_filename.to_owned(),
            upload_id: upload_id.to_owned(),
            part_number,
            ..Default::default()
        }
    };

    let completed_parts = Arc::new(Mutex::new(vec![]));

    rayon::scope(|scope| {
        let mut part_number = 1;
        loop {
            let maximum_bytes_to_read = CHUNK_SIZE - buffer.len();
            println!("maximum_bytes_to_read: {}", maximum_bytes_to_read);
            file.by_ref()
                .take(maximum_bytes_to_read as u64)
                .read_to_end(&mut buffer)
                .unwrap();

            println!("length: {}", buffer.len());
            println!("part_number: {}", part_number);
            if buffer.len() == 0 {
                // The file has ended.
                break;
            }

            let next_buffer = Vec::with_capacity(CHUNK_SIZE);
            let data_to_send = buffer;
            let completed_parts_cloned = completed_parts.clone();
            scope.spawn(move |_| {
                let part = create_upload_part(data_to_send.to_vec(), part_number);
                {
                    let part_number = part.part_number;
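                    // These block_on calls poll rusoto's tokio-based futures on a
                    // rayon worker thread with no tokio runtime context -- this is
                    // where the panic below fires.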
                    let client = executor::block_on(super::get_client());
                    let response = executor::block_on(client.s3.upload_part(part));

                    completed_parts_cloned.lock().unwrap().push(CompletedPart {
                        e_tag: response
                            .expect("Couldn't complete multipart upload")
                            .e_tag
                            .clone(),
                        part_number: Some(part_number),
                    });
                }
            });

            buffer = next_buffer;
            part_number = part_number + 1;
        }
    });

    let completed_upload = CompletedMultipartUpload {
        parts: Some(completed_parts.lock().unwrap().to_vec()),
    };

    let complete_req = CompleteMultipartUploadRequest {
        bucket: client.bucket_name.to_owned(),
        key: destination_filename.to_owned(),
        upload_id: upload_id.to_owned(),
        multipart_upload: Some(completed_upload),
        ..Default::default()
    };
    client
        .s3
        .complete_multipart_upload(complete_req)
        .await
        .expect("Couldn't complete multipart upload");
    println!(
        "time taken: {}, with chunk:: {}",
        now.elapsed().as_secs(),
        CHUNK_SIZE
    );
}

Here is a sample of the output and the error I'm getting:

maximum_bytes_to_read: 7000000
length: 7000000
part_number: 1
maximum_bytes_to_read: 7000000
length: 7000000
part_number: 2
maximum_bytes_to_read: 7000000
thread '<unnamed>' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', C:\Users\DNDT\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\blocking\pool.rs:85:33
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread '<unnamed>' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', C:\Users\DNDT\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\blocking\pool.rs:85:33
length: 7000000

I googled this error but did not get a clear understanding of what it actually is:

"there is no reactor running, must be called from the context of a Tokio 1.x runtime"

Here is what I found: another question with the same error, and another question.

It seems like it could be a compatibility issue: rusoto might be using a version of tokio that is not compatible with the version of tokio I have.

Here are some relevant dependencies:

tokio = { version = "1", features = ["full"] }
tokio-compat-02 = "0.1.2"
rusoto_s3 = "0.46.0"
rusoto_core = "0.46.0"
rusoto_credential = "0.46.0"
rayon = "1.5.0"

I think the main issue comes from actually wanting to run async code on a rayon thread. I tried changing my async code to blocking code using executor::block_on. I also spent some time trying to make the compiler happy: I have multiple threads that all want to write to let completed_parts = Arc::new(Mutex::new(vec![]));, so I did some cloning here to satisfy the compiler.
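
From what I can tell, the panic happens because futures::executor::block_on polls the future right on the rayon worker thread, which has never entered a tokio runtime context, so rusoto's tokio-based I/O cannot find its reactor. One possible workaround that keeps rayon (a sketch under my own assumptions, not verified against rusoto) is to capture a tokio runtime handle before entering rayon and drive the futures through Handle::block_on (tokio >= 1.1). This needs the multi-threaded runtime flavor, because rayon::scope blocks the calling thread, and on a single-threaded runtime no thread would be left to drive the I/O:

use std::time::Duration;

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    // Capture a handle to the running tokio runtime before entering rayon.
    let handle = tokio::runtime::Handle::current();
    rayon::scope(|scope| {
        for i in 0..4u64 {
            let handle = handle.clone();
            scope.spawn(move |_| {
                // Handle::block_on enters the runtime context on this rayon
                // worker thread, so tokio-based clients can find their reactor.
                handle.block_on(async move {
                    tokio::time::sleep(Duration::from_millis(10 * i)).await;
                    println!("part {} done (stand-in for an upload)", i);
                });
            });
        }
    });
}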

Also, in case the crates I use matter, here they are:

extern crate dotenv;
extern crate tokio;
use bytes::Bytes;
use dotenv::dotenv;
use futures::executor;
use futures::*;
use rusoto_core::credential::{EnvironmentProvider, ProvideAwsCredentials};
use rusoto_s3::util::{PreSignedRequest, PreSignedRequestOption};
use rusoto_s3::PutObjectRequest;
use rusoto_s3::StreamingBody;
use rusoto_s3::{
    CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart,
    CreateMultipartUploadRequest, UploadPartRequest, S3,
};

use std::io::Read;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::Instant;
use tokio::fs;

New to Rust, so there are a lot of moving pieces to get this one right!

Best answer:

Thanks @Jmb for the discussion. I got rid of the rayon threads and instead spawn a tokio task for each part, as follows:

Create a vector to hold all the futures so we can wait for them:

let mut multiple_parts_futures = Vec::new();

Spawn an async task for each chunk:

loop { // loop over the file chunks
    ...
    let send_part_task_future = tokio::task::spawn(async move {
        // Upload part
        ...
    });
    multiple_parts_futures.push(send_part_task_future);
}

and then later wait for all the futures:

let _results = futures::future::join_all(multiple_parts_futures).await;
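
Since tokio::task::spawn returns a JoinHandle, each element of _results is a Result wrapping a possible JoinError. A stricter variant (my addition, not part of the original answer) would surface panics from the upload tasks instead of silently discarding them:

for result in futures::future::join_all(multiple_parts_futures).await {
    // A JoinError here means the spawned upload task panicked or was cancelled.
    result.expect("upload task failed");
}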

Worth mentioning: the completed parts need to be sorted, because the tasks can finish in any order and S3 requires the parts to be listed in ascending part-number order:

let mut completed_parts_vector = completed_parts.lock().unwrap().to_vec();
completed_parts_vector.sort_by_key(|part| part.part_number);

The whole code is:

#[tokio::test]
async fn if_multipart_then_upload_multiparts_dicom() {
    let now = Instant::now();
    dotenv().ok();
    let local_filename = "./files/test_big.DCM";
    let destination_filename = generate_unique_name();
    let destination_filename_clone = destination_filename.clone();
    let mut file = std::fs::File::open(local_filename).unwrap();
    const CHUNK_SIZE: usize = 6_000_000;
    let mut buffer = Vec::with_capacity(CHUNK_SIZE);

    let client = super::get_client().await;
    let create_multipart_request = CreateMultipartUploadRequest {
        bucket: client.bucket_name.to_owned(),
        key: destination_filename.to_owned(),
        ..Default::default()
    };

    // Start the multipart upload and note the upload_id generated
    let response = client
        .s3
        .create_multipart_upload(create_multipart_request)
        .await
        .expect("Couldn't create multipart upload");
    let upload_id = response.upload_id.unwrap();

    let upload_id_clone = upload_id.clone();
    // Create upload parts
    let create_upload_part = move |body: Vec<u8>, part_number: i64| -> UploadPartRequest {
        UploadPartRequest {
            body: Some(body.into()),
            bucket: client.bucket_name.to_owned(),
            key: destination_filename_clone.to_owned(),
            upload_id: upload_id_clone.to_owned(),
            part_number,
            ..Default::default()
        }
    };

    let create_upload_part_arc = Arc::new(create_upload_part);
    let completed_parts = Arc::new(Mutex::new(vec![]));

    let mut part_number = 1;

    let mut multiple_parts_futures = Vec::new();
    loop {
        let maximum_bytes_to_read = CHUNK_SIZE - buffer.len();
        println!("maximum_bytes_to_read: {}", maximum_bytes_to_read);
        file.by_ref()
            .take(maximum_bytes_to_read as u64)
            .read_to_end(&mut buffer)
            .unwrap();
        println!("length: {}", buffer.len());
        println!("part_number: {}", part_number);
        if buffer.len() == 0 {
            // The file has ended.
            break;
        }
        let next_buffer = Vec::with_capacity(CHUNK_SIZE);
        let data_to_send = buffer;
        let completed_parts_cloned = completed_parts.clone();
        let create_upload_part_arc_cloned = create_upload_part_arc.clone();
        let send_part_task_future = tokio::task::spawn(async move {
            let part = create_upload_part_arc_cloned(data_to_send.to_vec(), part_number);
            {
                let part_number = part.part_number;
                let client = super::get_client().await;
                let response = client.s3.upload_part(part).await;
                completed_parts_cloned.lock().unwrap().push(CompletedPart {
                    e_tag: response
                        .expect("Couldn't complete multipart upload")
                        .e_tag
                        .clone(),
                    part_number: Some(part_number),
                });
            }
        });
        multiple_parts_futures.push(send_part_task_future);
        buffer = next_buffer;
        part_number = part_number + 1;
    }
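    // The first client was moved into the create_upload_part closure above,
    // so fetch a fresh one for the completion call.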
    let client = super::get_client().await;
    println!("waiting for futures");
    let _results = futures::future::join_all(multiple_parts_futures).await;

    let mut completed_parts_vector = completed_parts.lock().unwrap().to_vec();
    completed_parts_vector.sort_by_key(|part| part.part_number);
    println!("futures done");
    let completed_upload = CompletedMultipartUpload {
        parts: Some(completed_parts_vector),
    };

    let complete_req = CompleteMultipartUploadRequest {
        bucket: client.bucket_name.to_owned(),
        key: destination_filename.to_owned(),
        upload_id: upload_id.to_owned(),
        multipart_upload: Some(completed_upload),
        ..Default::default()
    };

    client
        .s3
        .complete_multipart_upload(complete_req)
        .await
        .expect("Couldn't complete multipart upload");
    println!(
        "time taken: {}, with chunk:: {}",
        now.elapsed().as_secs(),
        CHUNK_SIZE
    );
}
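
One caveat about this design: it spawns a task per chunk up front, so every chunk of the file is buffered in memory at once. A bounded alternative (a sketch of mine, with a hypothetical upload_chunk standing in for client.s3.upload_part) would cap how many parts are in flight using futures' buffer_unordered:

use futures::stream::{self, StreamExt};

// Hypothetical stand-in for the real upload_part call; returns the part
// number in place of an ETag.
async fn upload_chunk(part_number: i64) -> i64 {
    part_number
}

#[tokio::main]
async fn main() {
    // At most 4 uploads in flight at any time; results arrive in completion
    // order, so they would still need sorting before completing the upload.
    let finished: Vec<i64> = stream::iter(1..=10i64)
        .map(upload_chunk)
        .buffer_unordered(4)
        .collect()
        .await;
    println!("{:?}", finished);
}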