How to create threads that last the entire duration of the program and pass them immutable chunks to operate on?


I have a bunch of math that has real-time constraints. My main loop will call this function repeatedly, and it will always store its results into an existing buffer. However, I want to be able to spawn the threads at init time, let them run and do their work, and then have them wait for more data. For synchronization I will use a Barrier, and I have that part working. What I can't get working, despite trying various iterations of Arc and crossbeam, is splitting the thread spawning from the actual workload. This is what I have now:

pub const WORK_SIZE: usize = 524_288;
pub const NUM_THREADS: usize = 6;
pub const NUM_TASKS_PER_THREAD: usize = WORK_SIZE / NUM_THREADS;

fn main() {
    let mut work: Vec<f64> = Vec::with_capacity(WORK_SIZE);
    for i in 0..WORK_SIZE {
        work.push(i as f64);
    }
    crossbeam::scope(|scope| {
        let threads: Vec<_> = work
            .chunks(NUM_TASKS_PER_THREAD)
            .map(|chunk| scope.spawn(move |_| chunk.iter().cloned().sum::<f64>()))
            .collect();
        let threaded_time = std::time::Instant::now();
        let thread_sum: f64 = threads.into_iter().map(|t| t.join().unwrap()).sum();
        let threaded_micros = threaded_time.elapsed().as_micros() as f64;
        println!("threaded took: {:#?}", threaded_micros);
        let serial_time = std::time::Instant::now();
        let no_thread_sum: f64 = work.iter().cloned().sum();
        let serial_micros = serial_time.elapsed().as_micros() as f64;
        println!("serial took: {:#?}", serial_micros);

        assert_eq!(thread_sum, no_thread_sum);
        println!(
            "Threaded performace was {:?}",
            serial_micros / threaded_micros
        );
    })
    .unwrap();
}

But I can't find a way to spin these threads up in an init function and then pass work into them from a do_work function. I attempted something like this with Arcs and Mutexes but couldn't get everything straight there either. What I want to turn this into is something like the following:

use std::sync::{Arc, Barrier, Mutex};
use std::{slice::Chunks, thread::JoinHandle};

pub const WORK_SIZE: usize = 524_288;
pub const NUM_THREADS: usize = 6;
pub const NUM_TASKS_PER_THREAD: usize = WORK_SIZE / NUM_THREADS;

//simplified version of what actual work that code base will do
fn do_work(data: &[f64], result: Arc<Mutex<f64>>, barrier: Arc<Barrier>) {
    loop {
        barrier.wait();
        let sum = data.iter().copied().sum::<f64>();
        *result.lock().unwrap() += sum;
    }
}
fn init(
    mut data: Chunks<'_, f64>,
    result: &Arc<Mutex<f64>>,
    barrier: &Arc<Barrier>,
) -> Vec<JoinHandle<()>> {
    let mut handles = Vec::with_capacity(NUM_THREADS);
    //spawn threads, in actual code these would be stored in a lib crate struct
    for _ in 0..NUM_THREADS {
        let result = result.clone();
        let barrier = barrier.clone();
        let chunk = data.next().unwrap();
        handles.push(std::thread::spawn(move || {
            //Pass the particular thread the particular chunk it will operate on.
            do_work(chunk, result, barrier);
        }));
    }
    handles
}
fn main() {
    let mut work: Vec<f64> = Vec::with_capacity(WORK_SIZE);
    let result = Arc::new(Mutex::new(0.0));
    for i in 0..WORK_SIZE {
        work.push(i as f64);
    }
    let work_barrier = Arc::new(Barrier::new(NUM_THREADS + 1));
    let threads = init(work.chunks(NUM_TASKS_PER_THREAD), &result, &work_barrier);
    loop {
        work_barrier.wait();
        //actual code base would do something with summation stored in result.
        println!("{:?}", result.lock().unwrap());
    }
}

I hope this expresses the intent of what I need to do clearly enough. The issue with this specific implementation is that the chunks don't seem to live long enough, and when I tried wrapping them in an Arc, that just moved the "argument doesn't live long enough" error to the Arc::new(data.chunk(_)) line.
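
For reference, here is a minimal sketch of an alternative layout I have considered, which avoids the borrow entirely by sharing the whole buffer behind an Arc<Vec<f64>> and handing each thread only an index range, so the spawned closures own everything they capture. This is only to show the kind of structure I mean, not necessarily what I want to end up with:

use std::sync::{Arc, Barrier, Mutex};
use std::thread;

pub const WORK_SIZE: usize = 524_288;
pub const NUM_THREADS: usize = 6;
pub const NUM_TASKS_PER_THREAD: usize = WORK_SIZE / NUM_THREADS;

fn main() {
    // The whole buffer lives behind an Arc, so every thread can hold a
    // 'static handle to it; the data itself is never mutated.
    let work: Arc<Vec<f64>> = Arc::new((0..WORK_SIZE).map(|i| i as f64).collect());
    let result = Arc::new(Mutex::new(0.0));
    let barrier = Arc::new(Barrier::new(NUM_THREADS + 1));

    let _handles: Vec<_> = (0..NUM_THREADS)
        .map(|i| {
            let work = Arc::clone(&work);
            let result = Arc::clone(&result);
            let barrier = Arc::clone(&barrier);
            // Each thread only remembers which index range of the shared
            // buffer it is responsible for.
            let range = i * NUM_TASKS_PER_THREAD..(i + 1) * NUM_TASKS_PER_THREAD;
            thread::spawn(move || loop {
                barrier.wait();
                let sum: f64 = work[range.clone()].iter().sum();
                *result.lock().unwrap() += sum;
            })
        })
        .collect();

    // Release one round of work, give the workers a moment, then read back.
    barrier.wait();
    thread::sleep(std::time::Duration::from_millis(100));
    println!("{:?}", result.lock().unwrap());
}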

1 Answer
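
The code below gives each thread an owned copy of its chunk with to_owned(), so the spawned closures own everything they capture and the 'static bound on std::thread::spawn is satisfied:
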
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

pub const WORK_SIZE: usize = 524_288;
pub const NUM_THREADS: usize = 6;
pub const NUM_TASKS_PER_THREAD: usize = WORK_SIZE / NUM_THREADS;

//simplified version of what actual work that code base will do
fn do_work(data: &[f64], result: Arc<Mutex<f64>>, barrier: Arc<Barrier>) {
    loop {
        barrier.wait();
        let sum = data.iter().sum::<f64>();
        *result.lock().unwrap() += sum;
    }
}

fn init(
    work: Vec<f64>,
    result: Arc<Mutex<f64>>,
    barrier: Arc<Barrier>,
) -> Vec<thread::JoinHandle<()>> {
    let mut handles = Vec::with_capacity(NUM_THREADS);
    //spawn threads, in actual code these would be stored in a lib crate struct
    for i in 0..NUM_THREADS {
        // .to_owned() gives each thread its own Vec, so the closure below
        // captures no borrows from `work` and satisfies the 'static bound.
        let slice = work[i * NUM_TASKS_PER_THREAD..(i + 1) * NUM_TASKS_PER_THREAD].to_owned();
        let result = Arc::clone(&result);
        let barrier = Arc::clone(&barrier);
        handles.push(thread::spawn(move || {
            do_work(&slice, result, barrier);
        }));
    }
    handles
}

fn main() {
    let mut work: Vec<f64> = Vec::with_capacity(WORK_SIZE);
    let result = Arc::new(Mutex::new(0.0));
    for i in 0..WORK_SIZE {
        work.push(i as f64);
    }
    let work_barrier = Arc::new(Barrier::new(NUM_THREADS + 1));

    let _threads = init(work, Arc::clone(&result), Arc::clone(&work_barrier));

    loop {
        thread::sleep(std::time::Duration::from_secs(3));
        // All NUM_THREADS workers are parked on this barrier; waiting here
        // releases them (and main) for another round of summing.
        work_barrier.wait();
        //actual code base would do something with summation stored in result.
        println!("{:?}", result.lock().unwrap());
    }
}
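
One detail worth noting about this layout: when work_barrier.wait() returns in main, the workers have only been released to start a round, not necessarily to finish it, so the printed value may be missing some of the current round's partial sums. A common variation is to add a second barrier that everyone crosses after accumulating; the sketch below does that (the done_barrier name and the per-round reset of result are additions for illustration, not part of the answer above):

use std::sync::{Arc, Barrier, Mutex};
use std::thread;

pub const WORK_SIZE: usize = 524_288;
pub const NUM_THREADS: usize = 6;
pub const NUM_TASKS_PER_THREAD: usize = WORK_SIZE / NUM_THREADS;

fn do_work(
    data: &[f64],
    result: Arc<Mutex<f64>>,
    start_barrier: Arc<Barrier>,
    done_barrier: Arc<Barrier>,
) {
    loop {
        start_barrier.wait(); // wait for main to release the round
        let sum = data.iter().sum::<f64>();
        *result.lock().unwrap() += sum;
        done_barrier.wait(); // signal that this thread's sum is in `result`
    }
}

fn main() {
    let work: Vec<f64> = (0..WORK_SIZE).map(|i| i as f64).collect();
    let result = Arc::new(Mutex::new(0.0));
    let start_barrier = Arc::new(Barrier::new(NUM_THREADS + 1));
    let done_barrier = Arc::new(Barrier::new(NUM_THREADS + 1));

    for i in 0..NUM_THREADS {
        let slice = work[i * NUM_TASKS_PER_THREAD..(i + 1) * NUM_TASKS_PER_THREAD].to_owned();
        let result = Arc::clone(&result);
        let start = Arc::clone(&start_barrier);
        let done = Arc::clone(&done_barrier);
        thread::spawn(move || do_work(&slice, result, start, done));
    }

    loop {
        *result.lock().unwrap() = 0.0; // reset before releasing a new round
        start_barrier.wait();          // all workers begin summing
        done_barrier.wait();           // all workers have added their partial sums
        println!("{:?}", result.lock().unwrap());
        thread::sleep(std::time::Duration::from_secs(3));
    }
}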