Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
395 views
in Technique[技术] by (71.8m points)

multithreading - What is the idiom for cooperative interruption of a CPU-bound worker thread?

I am experimenting with multithreading in Rust. I have created a toy example based on the Sieve of Eratosthenes for finding primes. Each worker thread is given a list of primes to check as divisors for the new candidate.

for chunk in &primes {
    let tx2 = mpsc::Sender::clone(&tx);
    let p2 = Arc::clone(chunk);
    thread::spawn(move || {
        let result = divisible_by_any(i, &p2.lock().unwrap());
        tx2.send(result).unwrap();
    });
}
let mut any = false;
for _i in 0..primes.len() {
    let result = rx.recv().unwrap();
    if result { any = true }
}

One possible optimization is to allow the ringleader process to "interrupt" the worker threads as soon as any of the threads finds a divisor. I am imagining that the worker threads (divisible_by_any) would not be killed, but would check for some kind of semaphore that would indicate if the "interruption" has been requested so it can stop scanning for divisors.

What is the Rust idiom for raising a semaphore that can be noticed/sampled by multiple CPU-bound worker threads?


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Literal interruption in Rust is not generally done because side effects of the interrupt can interfere with language guarantees involving borrowing and dropping. The best practice here, if you're not ok with simply letting the worker threads waste their effort on the remainder of their chunks, is for workers to periodically poll some kind of simple status object.

let quit_flag = Arc::new(AtomicBool::new(false));
for chunk in &primes {
    let tx2 = mpsc::Sender::clone(&tx);
    let p2 = Arc::clone(chunk);
    let qf = Arc::clone(quit_flag);
    thread::spawn(move || {
        // repeatedly accessing an &AtomicBool is cheaper than an 
        // Arc<AtomicBool>, so we're unwrapping it here.
        let quit = qf.as_ref(); 
        // divisible_by_any needs to check the quit flag occasionally.
        let result = divisible_by_any(i, &p2.lock().unwrap(), quit);
        tx2.send(result).unwrap();
    });
}
for _i in 0..primes.len() {
    let result = rx.recv().unwrap();
    if result.is_some() { (*quit_flag).store(true, Ordering::Relaxed); }
}

// Some parts of this are best-guesses and pseudocode
fn divisible_by_any(i: Integer, chunk: Chunk, quit: &AtomicBool) -> Option<Integer>
{
    for sub_chunk in chunk.sub_chunks() {
        if quit.load(Ordering::Relaxed) {
            return None;
        }
        for j in sub_chunk {
            // do work, maybe returning Some(Integer)
        }
    }
    None
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...