I have a bunch of math with real-time constraints. My main loop will call this function repeatedly, and it will always store results into an existing buffer. I want to spawn the threads once at init time, then let them run, do their work, and wait for more data. For synchronization I will use a Barrier, and I have that part working. What I can't get working, despite trying various iterations of Arc and crossbeam, is separating the thread spawning from the actual workload. This is what I have now.
pub const WORK_SIZE: usize = 524_288;
pub const NUM_THREADS: usize = 6;
pub const NUM_TASKS_PER_THREAD: usize = WORK_SIZE / NUM_THREADS;

fn main() {
    let mut work: Vec<f64> = Vec::with_capacity(WORK_SIZE);
    for i in 0..WORK_SIZE {
        work.push(i as f64);
    }
    crossbeam::scope(|scope| {
        // Spawn one scoped thread per chunk; each returns the sum of its chunk.
        let threads: Vec<_> = work
            .chunks(NUM_TASKS_PER_THREAD)
            .map(|chunk| scope.spawn(move |_| chunk.iter().cloned().sum::<f64>()))
            .collect();
        let threaded_time = std::time::Instant::now();
        let thread_sum: f64 = threads.into_iter().map(|t| t.join().unwrap()).sum();
        let threaded_micros = threaded_time.elapsed().as_micros() as f64;
        println!("threaded took: {:#?}", threaded_micros);
        // Same summation on a single thread, for comparison.
        let serial_time = std::time::Instant::now();
        let no_thread_sum: f64 = work.iter().cloned().sum();
        let serial_micros = serial_time.elapsed().as_micros() as f64;
        println!("serial took: {:#?}", serial_micros);
        assert_eq!(thread_sum, no_thread_sum);
        println!(
            "Threaded performance was {:?}",
            serial_micros / threaded_micros
        );
    })
    .unwrap();
}
But I can't find a way to spin these threads up in an init function and then pass work to them from a do_work function. I attempted something like this with Arcs and Mutexes but couldn't get everything straight there either. What I want to turn this into is something like the following:
use std::sync::{Arc, Barrier, Mutex};
use std::{slice::Chunks, thread::JoinHandle};

pub const WORK_SIZE: usize = 524_288;
pub const NUM_THREADS: usize = 6;
pub const NUM_TASKS_PER_THREAD: usize = WORK_SIZE / NUM_THREADS;

// Simplified version of the actual work the code base will do.
fn do_work(data: &[f64], result: Arc<Mutex<f64>>, barrier: Arc<Barrier>) {
    loop {
        barrier.wait();
        let sum = data.iter().cloned().sum::<f64>();
        // Add through the guard; dereferencing into a local would only update a copy.
        *result.lock().unwrap() += sum;
    }
}

fn init(
    mut data: Chunks<'_, f64>,
    result: &Arc<Mutex<f64>>,
    barrier: &Arc<Barrier>,
) -> Vec<JoinHandle<()>> {
    let mut handles = Vec::with_capacity(NUM_THREADS);
    // Spawn threads; in the actual code these would be stored in a lib crate struct.
    for _ in 0..NUM_THREADS {
        let result = result.clone();
        let barrier = barrier.clone();
        // next() hands out consecutive chunks; nth(i) would skip i chunks on every call.
        let chunk = data.next().unwrap();
        // This is the part that does not compile: std::thread::spawn needs a
        // 'static closure, but chunk borrows from the Vec owned by main.
        handles.push(std::thread::spawn(move || {
            // Pass the particular thread the particular chunk it will operate on.
            do_work(chunk, result, barrier);
        }));
    }
    handles
}

fn main() {
    let mut work: Vec<f64> = Vec::with_capacity(WORK_SIZE);
    let result = Arc::new(Mutex::new(0.0));
    for i in 0..WORK_SIZE {
        work.push(i as f64);
    }
    let work_barrier = Arc::new(Barrier::new(NUM_THREADS + 1));
    let _threads = init(work.chunks(NUM_TASKS_PER_THREAD), &result, &work_barrier);
    loop {
        work_barrier.wait();
        // The actual code base would do something with the summation stored in result.
        println!("{:?}", result.lock().unwrap());
    }
}
I hope this expresses the intent of what I need to do clearly enough. The issue with this specific implementation is that the chunks don't seem to live long enough. When I tried wrapping them in an Arc, that just moved the "argument doesn't live long enough" error to the Arc::new(data.chunk(_)) line.
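For illustration, here is a minimal sketch of one arrangement that may avoid the lifetime error, using only the standard library. It rests on several assumptions that are not in the code above: the input lives in an `Arc<Vec<f64>>` that each worker clones, workers receive index ranges instead of borrowed chunks, a second `Barrier::wait` marks the end of a cycle, and an `AtomicBool` stop flag lets the threads exit. `WorkerPool`, `worker`, `run_once`, and `shutdown` are hypothetical names chosen for this sketch.

```rust
use std::ops::Range;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Barrier, Mutex};
use std::thread::JoinHandle;

// Hypothetical helper type; all names here are illustrative only.
pub struct WorkerPool {
    handles: Vec<JoinHandle<()>>,
    barrier: Arc<Barrier>,
    stop: Arc<AtomicBool>,
    result: Arc<Mutex<f64>>,
}

// Body of each worker thread: wait for a start signal, sum its range, report.
fn worker(
    data: Arc<Vec<f64>>,
    range: Range<usize>,
    barrier: Arc<Barrier>,
    stop: Arc<AtomicBool>,
    result: Arc<Mutex<f64>>,
) {
    loop {
        barrier.wait(); // start of a cycle (or shutdown)
        if stop.load(Ordering::Acquire) {
            return;
        }
        let sum: f64 = data[range.clone()].iter().sum();
        *result.lock().unwrap() += sum;
        barrier.wait(); // end of a cycle: every worker has added its part
    }
}

impl WorkerPool {
    /// Spawn the workers once. Each closure owns an Arc clone and a range,
    /// so nothing borrowed from the caller crosses the spawn boundary.
    pub fn init(data: Arc<Vec<f64>>, num_threads: usize) -> Self {
        assert!(num_threads > 0);
        let barrier = Arc::new(Barrier::new(num_threads + 1));
        let stop = Arc::new(AtomicBool::new(false));
        let result = Arc::new(Mutex::new(0.0));
        // Ceiling division so the ranges cover every element.
        let per_thread = (data.len() + num_threads - 1) / num_threads;
        let handles = (0..num_threads)
            .map(|i| {
                let start = (i * per_thread).min(data.len());
                let end = (start + per_thread).min(data.len());
                let data = data.clone();
                let barrier = barrier.clone();
                let stop = stop.clone();
                let result = result.clone();
                std::thread::spawn(move || worker(data, start..end, barrier, stop, result))
            })
            .collect();
        WorkerPool { handles, barrier, stop, result }
    }

    /// Run one cycle on the already-running threads and return the total.
    pub fn run_once(&self) -> f64 {
        *self.result.lock().unwrap() = 0.0;
        self.barrier.wait(); // release the workers
        self.barrier.wait(); // wait until all of them have finished
        *self.result.lock().unwrap()
    }

    /// Ask the workers to exit and join them.
    pub fn shutdown(self) {
        self.stop.store(true, Ordering::Release);
        self.barrier.wait();
        for handle in self.handles {
            handle.join().unwrap();
        }
    }
}

fn main() {
    let work: Arc<Vec<f64>> = Arc::new((0..524_288).map(|i| i as f64).collect());
    let pool = WorkerPool::init(work.clone(), 6);
    for _ in 0..3 {
        println!("{}", pool.run_once());
    }
    pool.shutdown();
}
```

In this sketch the data is read-only once shared. If the main loop has to write new input every cycle, the buffer would need some interior mutability (for example an `RwLock` around the `Vec`), which is a further assumption and is not shown here.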