Sharing arrays between threads in Rust

I'm new to Rust and I'm struggling with some ownership semantics.

The goal is to do some throwaway measurements: multiply two f64 arrays element-wise and write the result into a third array.

In the single-threaded version, a single thread takes care of the whole range. In the multi-threaded version, each thread takes care of a segment of the range.
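A note on splitting the range: computing each segment as j * (nCells / concurrency) .. (j + 1) * (nCells / concurrency) silently skips the last nCells % concurrency cells whenever the length is not a multiple of the thread count (harmless for 1_000_000 cells with 1 or 2 threads). A minimal sketch of a split that covers every cell, using a hypothetical helper job_ranges that hands the remainder to the first jobs:

```rust
use std::ops::Range;

// Hypothetical helper: split 0..n into `concurrency` contiguous,
// non-overlapping ranges whose lengths differ by at most one,
// so no cells are dropped when n % concurrency != 0.
fn job_ranges(n: usize, concurrency: usize) -> Vec<Range<usize>> {
    assert!(concurrency > 0, "need at least one job");
    let base = n / concurrency;
    let extra = n % concurrency;
    let mut start = 0;
    (0..concurrency)
        .map(|j| {
            // The first `extra` jobs take one extra cell each.
            let len = base + usize::from(j < extra);
            let range = start..start + len;
            start += len;
            range
        })
        .collect()
}

fn main() {
    println!("{:?}", job_ranges(10, 3)); // [0..4, 4..7, 7..10]
}
```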

The single-threaded version is easy; my problem is with the multi-threaded version, where I'm struggling with the ownership rules.

I was thinking of using raw pointers to bypass the borrow checker, but I still can't get it to compile.

#![feature(box_syntax)]

use std::time::SystemTime;
use rand::Rng;
use std::thread;


fn main() {
    let nCells = 1_000_000;
    let concurrency = 1;

    let mut one = box [0f64; 1_000_000];
    let mut two = box [0f64; 1_000_000];
    let mut res = box [0f64; 1_000_000];

    println!("Creating data");
    let mut rng = rand::thread_rng();

    for i in 0..nCells {
        one[i] = rng.gen::<f64>();
        two[i] = rng.gen::<f64>();
        res[i] = 0 as f64;
    }
    println!("Finished creating data");

    let rounds = 100000;
    let start = SystemTime::now();
    let one_raw = Box::into_raw(one);
    let two_raw = Box::into_raw(two);
    let res_raw = Box::into_raw(res);

    let mut handlers = Vec::new();
    for _ in 0..rounds {
        let sizePerJob = nCells / concurrency;
        for j in 0..concurrency {
            let from = j * sizePerJob;
            let to = (j + 1) * sizePerJob;
            handlers.push(thread::spawn(|| {
                unsafe {
                    processData(one_raw, two_raw, res_raw, from, to);
                }
            }));
        }

        // join() takes the handle by value, so drain the vector rather than borrowing into it.
        for handle in handlers.drain(..) {
            handle.join().unwrap();
        }
    }

    let durationUs = SystemTime::now().duration_since(start).unwrap().as_micros();
    let durationPerRound = durationUs / rounds;
    println!("duration per round {} us", durationPerRound);
}

// Make sure we can find the function in the generated Assembly
#[inline(never)]
pub fn processData(one: *const [f64;1000000],
                   two: *const [f64;1000000],
                   res: *mut [f64;1000000],
                   from: usize,
                   to: usize) {
    unsafe {
        for i in from..to {
            (*res)[i] = (*one)[i] * (*two)[i];
        }
    }
}

This is the error I'm getting:

error[E0277]: `*mut [f64; 1000000]` cannot be shared between threads safely
   --> src/main.rs:38:27
    |
38  |             handlers.push(thread::spawn(|| {
    |                           ^^^^^^^^^^^^^ `*mut [f64; 1000000]` cannot be shared between threads safely
    |
    = help: the trait `Sync` is not implemented for `*mut [f64; 1000000]`
    = note: required because of the requirements on the impl of `Send` for `&*mut [f64; 1000000]`
note: required because it's used within this closure
   --> src/main.rs:38:41
    |
38  |             handlers.push(thread::spawn(|| {
    |                                         ^^
note: required by a bound in `spawn`
   --> /home/pveentjer/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:653:8
    |
653 |     F: Send + 'static,
    |        ^^^^ required by this bound in `spawn`
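Why this fails: the non-move closure captures one_raw, two_raw and res_raw by reference (hence the &*mut [f64; 1000000] in the note). Raw pointers are neither Send nor Sync, so neither the pointer nor a reference to it may cross a thread boundary; thread::spawn additionally requires a 'static closure, which borrowed locals such as from and to would not satisfy either. If you really want to keep the raw-pointer route, one common workaround is to wrap the pointer in a type that you unsafely mark Send. The sketch below uses a hypothetical SendPtr wrapper and a made-up function double_in_parallel, and leaks the buffer so the pointer stays valid for 'static (it is never freed). Its soundness rests entirely on the threads touching disjoint index ranges, which nothing in the compiler checks.

```rust
use std::thread;

// Hypothetical wrapper that promises the compiler the raw pointer may be sent
// to another thread. Nothing checks this: the code using it must ensure the
// threads touch disjoint elements and that the buffer outlives every thread.
#[derive(Clone, Copy)]
struct SendPtr(*mut f64);

// SAFETY: only sound because each thread below writes a disjoint index range.
unsafe impl Send for SendPtr {}

impl SendPtr {
    // Going through a by-value method makes the `move` closure capture the
    // whole wrapper; with edition-2021 disjoint captures, writing `ptr.0`
    // inside the closure would capture only the bare `*mut f64` (not Send).
    fn get(self) -> *mut f64 {
        self.0
    }
}

// Doubles a buffer of `n` ones, split across `concurrency` threads.
fn double_in_parallel(n: usize, concurrency: usize) -> Vec<f64> {
    // Leak the buffer so the pointer stays valid for 'static, as thread::spawn requires.
    let data: &'static mut [f64] = Box::leak(vec![1.0f64; n].into_boxed_slice());
    let ptr = SendPtr(data.as_mut_ptr());
    let per_job = n / concurrency;

    let handles: Vec<_> = (0..concurrency)
        .map(|j| {
            let from = j * per_job;
            // The last job also takes the remainder when n % concurrency != 0.
            let to = if j + 1 == concurrency { n } else { from + per_job };
            thread::spawn(move || {
                let p = ptr.get();
                for i in from..to {
                    // SAFETY: i < n, and no other thread touches index i.
                    unsafe { *p.add(i) *= 2.0 };
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    // Every writer has been joined, so reading through `data` is fine again.
    data.to_vec()
}

fn main() {
    println!("{:?}", double_in_parallel(8, 2));
}
```

Scoped threads avoid both the leak and the unsafe Send promise, which is why they are usually the better choice here.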

Edit: I know that spawning threads is expensive. Once this code works, I'll convert it to a pool of reusable worker threads.

Best answer by Özgür Murat Sağdıçoğlu

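You can use chunks_mut or split_at_mut to get non-overlapping slices of one, two and res. Because these methods hand out &mut slices that are guaranteed not to overlap, each slice can be given to a different thread without any unsafe code. The inputs one and two are only read, so the shared-reference counterparts chunks and split_at are enough for them. See the documentation for chunks_mut and for split_at_mut.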
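As a small illustration of that idea (a sketch; the function name scale_left_shift_right is made up for this example), split_at_mut produces two disjoint halves that two scoped threads can mutate at the same time:

```rust
use std::thread;

// Hypothetical example: split the buffer into two disjoint halves and let two
// scoped threads mutate them concurrently, with no unsafe code and no 'static bound.
fn scale_left_shift_right(data: &mut [f64]) {
    let mid = data.len() / 2;
    // split_at_mut hands out two non-overlapping &mut slices of the same buffer.
    let (left, right) = data.split_at_mut(mid);
    thread::scope(|s| {
        s.spawn(move || left.iter_mut().for_each(|x| *x *= 10.0));
        s.spawn(move || right.iter_mut().for_each(|x| *x += 1.0));
    }); // both threads are joined here
}

fn main() {
    let mut data = vec![1.0, 2.0, 3.0, 4.0];
    scale_left_shift_right(&mut data);
    println!("{:?}", data); // [10.0, 20.0, 4.0, 5.0]
}
```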

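I was able to get it to compile using scoped threads together with chunks_mut, and I removed all the unsafe code because it isn't needed. Threads spawned inside thread::scope are joined before scope returns, so they can borrow data from the enclosing stack frame instead of requiring 'static. Note that std::thread::scope has been stable since Rust 1.63, so the scoped_threads feature gate is only needed on older nightly toolchains. See the code: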

use rand::Rng;
use std::thread;
use std::time::Instant;

fn main() {
    let n_cells = 1_000_000;
    let concurrency = 2;

    println!("Creating data");
    let mut rng = rand::thread_rng();
    // Heap-allocated inputs filled with random values (stable Rust, no box syntax).
    let one: Vec<f64> = (0..n_cells).map(|_| rng.gen::<f64>()).collect();
    let two: Vec<f64> = (0..n_cells).map(|_| rng.gen::<f64>()).collect();
    let mut res = vec![0f64; n_cells];
    println!("Finished creating data");

    let rounds: u32 = 1000;
    // Round up so all cells are covered even if n_cells % concurrency != 0.
    let size_per_job = (n_cells + concurrency - 1) / concurrency;
    // Instant is monotonic, which makes it a better fit than SystemTime for timing.
    let start = Instant::now();

    for _ in 0..rounds {
        // Threads spawned on `s` are joined before scope() returns, so they
        // may borrow `one`, `two` and `res` without a 'static bound.
        thread::scope(|s| {
            for ((a, b), r) in one
                .chunks(size_per_job)
                .zip(two.chunks(size_per_job))
                .zip(res.chunks_mut(size_per_job))
            {
                // Read-only input chunks plus a disjoint, exclusive output chunk.
                s.spawn(move || process_data(a, b, r));
            }
        });
    }

    let duration_per_round = start.elapsed() / rounds;
    println!("duration per round {} us", duration_per_round.as_micros());
}

// Make sure we can find the function in the generated assembly
#[inline(never)]
pub fn process_data(one: &[f64], two: &[f64], res: &mut [f64]) {
    // Zipping the slices typically lets the compiler drop per-element bounds checks.
    for ((r, a), b) in res.iter_mut().zip(one).zip(two) {
        *r = a * b;
    }
}
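On the edit about thread-spawning cost: for this particular benchmark a full pool may not be necessary. A minimal sketch, assuming you want to keep the per-round synchronization of the original loop, spawns each worker once inside a single thread::scope and separates rounds with a std::sync::Barrier. The name multiply_rounds is hypothetical, and this is one option rather than a general-purpose pool.

```rust
use std::sync::Barrier;
use std::thread;

// Hypothetical helper: spawn one worker per chunk once, then let each worker
// run every round on its own disjoint chunk. A Barrier ends a round only when
// all workers are done, replacing a spawn/join per round.
fn multiply_rounds(one: &[f64], two: &[f64], res: &mut [f64], concurrency: usize, rounds: usize) {
    assert!(concurrency > 0, "need at least one worker");
    assert!(one.len() == res.len() && two.len() == res.len(), "length mismatch");
    // Round up so every cell is covered; max(1) keeps chunks() from panicking on empty input.
    let size_per_job = ((res.len() + concurrency - 1) / concurrency).max(1);
    // The barrier must count exactly the number of workers actually spawned.
    let barrier = Barrier::new(res.chunks(size_per_job).count());

    thread::scope(|s| {
        for ((a, b), r) in one
            .chunks(size_per_job)
            .zip(two.chunks(size_per_job))
            .zip(res.chunks_mut(size_per_job))
        {
            let barrier = &barrier;
            s.spawn(move || {
                for _ in 0..rounds {
                    for ((out, x), y) in r.iter_mut().zip(a).zip(b) {
                        *out = x * y;
                    }
                    // Wait until every worker has finished this round.
                    barrier.wait();
                }
            });
        }
    });
}

fn main() {
    let one = vec![1.0, 2.0, 3.0];
    let two = vec![4.0, 5.0, 6.0];
    let mut res = vec![0.0; 3];
    multiply_rounds(&one, &two, &mut res, 2, 10);
    println!("{:?}", res); // [4.0, 10.0, 18.0]
}
```

The barrier makes each round as slow as its slowest worker, which mirrors the join-per-round timing of the code above while paying the thread-creation cost only once.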