wasmtime/runtime/vm/
parking_spot.rs

1//! Implements thread wait and notify primitives with `std::sync` primitives.
2//!
3//! This is a simplified version of the `parking_lot_core` crate.
4//!
5//! There are two main operations that can be performed:
6//!
7//! - *Parking* refers to suspending the thread while simultaneously enqueuing it
8//!   on a queue keyed by some address.
9//! - *Unparking* refers to dequeuing a thread from a queue keyed by some address
10//!   and resuming it.
11
12#![deny(missing_docs)]
13
14use crate::prelude::*;
15use crate::runtime::vm::{SendSyncPtr, WaitResult};
16use std::collections::BTreeMap;
17use std::ptr::NonNull;
18use std::sync::atomic::{AtomicU32, AtomicU64, Ordering::SeqCst};
19use std::sync::Mutex;
20use std::thread::{self, Thread};
21use std::time::{Duration, Instant};
22
23#[derive(Default, Debug)]
24struct Spot {
25    head: Option<SendSyncPtr<WaiterInner>>,
26    tail: Option<SendSyncPtr<WaiterInner>>,
27}
28
29/// The thread global `ParkingSpot`.
30#[derive(Default, Debug)]
31pub struct ParkingSpot {
32    inner: Mutex<BTreeMap<u64, Spot>>,
33}
34
35#[derive(Default)]
36pub struct Waiter {
37    inner: Option<Box<WaiterInner>>,
38}
39
40struct WaiterInner {
41    // NB: this field may be read concurrently, but is only written under the
42    // lock of a `ParkingSpot`.
43    thread: Thread,
44
45    // NB: these fields are only modified/read under the lock of a
46    // `ParkingSpot`.
47    notified: bool,
48    next: Option<SendSyncPtr<WaiterInner>>,
49    prev: Option<SendSyncPtr<WaiterInner>>,
50}
51
52impl ParkingSpot {
53    /// Atomically validates if `atomic == expected` and, if so, blocks the
54    /// current thread.
55    ///
56    /// This method will first check to see if `atomic == expected` using a
57    /// `SeqCst` load ordering. If the values are not equal then the method
58    /// immediately returns with `WaitResult::Mismatch`. Otherwise the thread
59    /// will be blocked and can only be woken up with `notify` on the same
60    /// address. Note that the check-and-block operation is atomic with respect
61    /// to `notify`.
62    ///
63    /// The optional `deadline` specified can indicate a point in time after
64    /// which this thread will be unblocked. If this thread is not notified and
65    /// `deadline` is reached then `WaitResult::TimedOut` is returned. If
66    /// `deadline` is `None` then this thread will block forever waiting for
67    /// `notify`.
68    ///
69    /// The `waiter` argument is metadata used by this structure to block
70    /// the current thread.
71    ///
72    /// This method will not spuriously wake up one blocked.
73    pub fn wait32(
74        &self,
75        atomic: &AtomicU32,
76        expected: u32,
77        deadline: impl Into<Option<Instant>>,
78        waiter: &mut Waiter,
79    ) -> WaitResult {
80        self.wait(
81            atomic.as_ptr() as u64,
82            || atomic.load(SeqCst) == expected,
83            deadline.into(),
84            waiter,
85        )
86    }
87
88    /// Same as `wait32`, but for 64-bit values.
89    pub fn wait64(
90        &self,
91        atomic: &AtomicU64,
92        expected: u64,
93        deadline: impl Into<Option<Instant>>,
94        waiter: &mut Waiter,
95    ) -> WaitResult {
96        self.wait(
97            atomic.as_ptr() as u64,
98            || atomic.load(SeqCst) == expected,
99            deadline.into(),
100            waiter,
101        )
102    }
103
104    fn wait(
105        &self,
106        key: u64,
107        validate: impl FnOnce() -> bool,
108        deadline: Option<Instant>,
109        waiter: &mut Waiter,
110    ) -> WaitResult {
111        let mut inner = self
112            .inner
113            .lock()
114            .expect("failed to lock inner parking table");
115
116        // This is the "atomic" part of the `validate` check which ensure that
117        // the memory location still indicates that we're allowed to block.
118        if !validate() {
119            return WaitResult::Mismatch;
120        }
121
122        // Lazily initialize the `waiter` node if it hasn't been already, and
123        // additionally ensure it's not accidentally in some other queue.
124        let waiter = waiter.inner.get_or_insert_with(|| {
125            Box::new(WaiterInner {
126                next: None,
127                prev: None,
128                notified: false,
129                thread: thread::current(),
130            })
131        });
132        assert!(waiter.next.is_none());
133        assert!(waiter.prev.is_none());
134
135        // Clear the `notified` flag if it was previously notified and
136        // configure the thread to wakeup as our own.
137        waiter.notified = false;
138        waiter.thread = thread::current();
139
140        let ptr = SendSyncPtr::new(NonNull::from(&mut **waiter));
141        let spot = inner.entry(key).or_insert_with(Spot::default);
142        unsafe {
143            // Enqueue our `waiter` in the internal queue for this spot.
144            spot.push(ptr);
145
146            // Wait for a notification to arrive. This is done through
147            // `std::thread::park_timeout` by dropping the lock that is held.
148            // This loop is somewhat similar to a condition variable.
149            //
150            // If no timeout was given then the maximum duration is effectively
151            // infinite (500 billion years), otherwise the timeout is
152            // calculated relative to the `deadline` specified.
153            //
154            // To handle spurious wakeups if the thread wakes up but a
155            // notification wasn't received then the thread goes back to sleep.
156            let timed_out = loop {
157                let timeout = match deadline {
158                    Some(deadline) => {
159                        let now = Instant::now();
160                        if deadline <= now {
161                            break true;
162                        } else {
163                            deadline - now
164                        }
165                    }
166                    None => Duration::MAX,
167                };
168
169                drop(inner);
170                thread::park_timeout(timeout);
171                inner = self.inner.lock().unwrap();
172
173                if ptr.as_ref().notified {
174                    break false;
175                }
176            };
177
178            if timed_out {
179                // If this thread timed out then it is still present in the
180                // waiter queue, so remove it.
181                inner.get_mut(&key).unwrap().remove(ptr);
182                WaitResult::TimedOut
183            } else {
184                // If this node was notified then we should not be in a queue
185                // at this point.
186                assert!(ptr.as_ref().next.is_none());
187                assert!(ptr.as_ref().prev.is_none());
188                WaitResult::Ok
189            }
190        }
191    }
192
193    /// Notify at most `n` threads that are blocked on the given address.
194    ///
195    /// Returns the number of threads that were actually unparked.
196    pub fn notify<T>(&self, addr: &T, n: u32) -> u32 {
197        if n == 0 {
198            return 0;
199        }
200        let mut unparked = 0;
201
202        // It's known here that `n > 0` so dequeue items until `unparked`
203        // equals `n` or the queue runs out. Each thread dequeued is signaled
204        // that it's been notified and then woken up.
205        self.with_lot(addr, |spot| unsafe {
206            while let Some(mut head) = spot.pop() {
207                let head = head.as_mut();
208                assert!(head.next.is_none());
209                head.notified = true;
210                head.thread.unpark();
211                unparked += 1;
212                if unparked == n {
213                    break;
214                }
215            }
216        });
217
218        unparked
219    }
220
221    fn with_lot<T, F: FnMut(&mut Spot)>(&self, addr: &T, mut f: F) {
222        let key = addr as *const _ as u64;
223        let mut inner = self
224            .inner
225            .lock()
226            .expect("failed to lock inner parking table");
227        if let Some(spot) = inner.get_mut(&key) {
228            f(spot);
229        }
230    }
231}
232
233impl Waiter {
234    pub const fn new() -> Waiter {
235        Waiter { inner: None }
236    }
237}
238
239impl Spot {
240    /// Adds `waiter` to the queue at the end.
241    ///
242    /// # Unsafety
243    ///
244    /// This method is `unsafe` as it can only be invoked under the parking
245    /// spot's mutex. Additionally `waiter` must be a valid pointer not already
246    /// in any other queue and additionally only exclusively used by this queue
247    /// now.
248    unsafe fn push(&mut self, mut waiter: SendSyncPtr<WaiterInner>) {
249        assert!(waiter.as_ref().next.is_none());
250        assert!(waiter.as_ref().prev.is_none());
251
252        waiter.as_mut().prev = self.tail;
253        match self.tail {
254            Some(mut tail) => tail.as_mut().next = Some(waiter),
255            None => self.head = Some(waiter),
256        }
257        self.tail = Some(waiter);
258    }
259
260    /// Removes `waiter` from the queue.
261    ///
262    /// # Unsafety
263    ///
264    /// This method is `unsafe` as it can only be invoked under the parking
265    /// spot's mutex. Additionally `waiter` must be a valid pointer in this
266    /// queue.
267    unsafe fn remove(&mut self, mut waiter: SendSyncPtr<WaiterInner>) {
268        let w = waiter.as_mut();
269        match w.prev {
270            Some(mut prev) => prev.as_mut().next = w.next,
271            None => self.head = w.next,
272        }
273        match w.next {
274            Some(mut next) => next.as_mut().prev = w.prev,
275            None => self.tail = w.prev,
276        }
277        w.prev = None;
278        w.next = None;
279    }
280
281    /// Pops the head of the queue from this linked list to wake up a waiter.
282    ///
283    /// # Unsafety
284    ///
285    /// This method is `unsafe` as it can only be invoked under the parking
286    /// spot's mutex.
287    unsafe fn pop(&mut self) -> Option<SendSyncPtr<WaiterInner>> {
288        let ret = self.head?;
289        self.remove(ret);
290        Some(ret)
291    }
292
293    #[cfg(test)]
294    fn num_parked(&self) -> u32 {
295        let mut ret = 0;
296        let mut cur = self.head;
297        while let Some(next) = cur {
298            ret += 1;
299            cur = unsafe { next.as_ref().next };
300        }
301        ret
302    }
303}
304
#[cfg(test)]
mod tests {
    use super::{ParkingSpot, Waiter};
    use crate::prelude::*;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::thread;
    use std::time::{Duration, Instant};

    /// Hands a counter from thread to thread: each participant waits for its
    /// predecessor's value, publishes the next one, notifies everybody and
    /// then waits for the value to move on again.
    #[test]
    fn atomic_wait_notify() {
        let parking_spot = ParkingSpot::default();
        let atomic = AtomicU64::new(0);

        // Blocks the caller until `atomic` holds `target`, parking on the
        // currently observed value in between checks.
        let wait_until_value = |target: u64, waiter: &mut Waiter| {
            let mut seen = atomic.load(Ordering::SeqCst);
            while seen != target {
                parking_spot.wait64(&atomic, seen, None, waiter);
                seen = atomic.load(Ordering::SeqCst);
            }
        };

        thread::scope(|s| {
            let first = s.spawn(|| {
                let mut waiter = Waiter::default();
                atomic.store(1, Ordering::SeqCst);
                parking_spot.notify(&atomic, u32::MAX);
                parking_spot.wait64(&atomic, 1, None, &mut waiter);
            });

            let second = s.spawn(|| {
                let mut waiter = Waiter::default();
                wait_until_value(1, &mut waiter);
                atomic.store(2, Ordering::SeqCst);
                parking_spot.notify(&atomic, u32::MAX);
                parking_spot.wait64(&atomic, 2, None, &mut waiter);
            });

            let third = s.spawn(|| {
                let mut waiter = Waiter::default();
                wait_until_value(2, &mut waiter);
                atomic.store(3, Ordering::SeqCst);
                parking_spot.notify(&atomic, u32::MAX);
                parking_spot.wait64(&atomic, 3, None, &mut waiter);
            });

            // The spawning thread is the last link of the chain.
            let mut waiter = Waiter::default();
            wait_until_value(3, &mut waiter);
            atomic.store(4, Ordering::SeqCst);
            parking_spot.notify(&atomic, u32::MAX);

            first.join().unwrap();
            second.join().unwrap();
            third.join().unwrap();
        });
    }

    mod parking_lot {
        // This is a modified version of the parking_lot_core tests,
        // which are licensed under the MIT and Apache 2.0 licenses.
        use super::*;
        use std::sync::atomic::AtomicU32;
        use std::sync::Arc;

        // Generates one `#[test]` per entry; each test runs
        // `run_parking_test` `repeats` times with the given parameters
        // (`delay` is in microseconds). The tests are skipped under miri and
        // when `WASMTIME_TEST_NO_HOG_MEMORY` is set.
        macro_rules! test {
            ( $( $name:ident(
                repeats: $repeats:expr,
                latches: $latches:expr,
                delay: $delay:expr,
                threads: $threads:expr,
                single_unparks: $single_unparks:expr);
            )* ) => {
                $(
                #[test]
                #[cfg_attr(miri, ignore)]
                fn $name() {
                    if std::env::var("WASMTIME_TEST_NO_HOG_MEMORY").is_err() {
                        let delay = Duration::from_micros($delay);
                        for _ in 0..$repeats {
                            run_parking_test($latches, delay, $threads, $single_unparks);
                        }
                    }
                })*
            };
        }

        test! {
            unpark_all_one_fast(
                repeats: 10000, latches: 1, delay: 0, threads: 1, single_unparks: 0
            );
            unpark_all_hundred_fast(
                repeats: 100, latches: 1, delay: 0, threads: 100, single_unparks: 0
            );
            unpark_one_one_fast(
                repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 1
            );
            unpark_one_hundred_fast(
                repeats: 20, latches: 1, delay: 0, threads: 100, single_unparks: 100
            );
            unpark_one_fifty_then_fifty_all_fast(
                repeats: 50, latches: 1, delay: 0, threads: 100, single_unparks: 50
            );
            unpark_all_one(
                repeats: 100, latches: 1, delay: 10000, threads: 1, single_unparks: 0
            );
            unpark_all_hundred(
                repeats: 100, latches: 1, delay: 10000, threads: 100, single_unparks: 0
            );
            unpark_one_one(
                repeats: 10, latches: 1, delay: 10000, threads: 1, single_unparks: 1
            );
            unpark_one_fifty(
                repeats: 1, latches: 1, delay: 10000, threads: 50, single_unparks: 50
            );
            unpark_one_fifty_then_fifty_all(
                repeats: 2, latches: 1, delay: 10000, threads: 100, single_unparks: 50
            );
            hundred_unpark_all_one_fast(
                repeats: 100, latches: 100, delay: 0, threads: 1, single_unparks: 0
            );
            hundred_unpark_all_one(
                repeats: 1, latches: 100, delay: 10000, threads: 1, single_unparks: 0
            );
        }

        /// Runs `num_latches` latch tests against one shared `ParkingSpot`.
        ///
        /// Every latch gets `num_threads` threads which all try to acquire
        /// it. `num_single_unparks` of them are then released one at a time
        /// (sleeping `delay` before each round) and the rest are released in
        /// bulk by `SingleLatchTest::finish`.
        fn run_parking_test(
            num_latches: usize,
            delay: Duration,
            num_threads: u32,
            num_single_unparks: u32,
        ) {
            let spot = ParkingSpot::default();

            thread::scope(|s| {
                // Spawn the worker threads of every latch up front.
                let latches: Vec<_> = (0..num_latches)
                    .map(|_| {
                        let test = Arc::new(SingleLatchTest::new(num_threads, &spot));
                        let workers: Vec<_> = (0..num_threads)
                            .map(|_| {
                                let test = Arc::clone(&test);
                                s.spawn(move || test.run())
                            })
                            .collect();
                        (test, workers)
                    })
                    .collect();

                for round in 0..num_single_unparks {
                    thread::sleep(delay);
                    for (test, _) in &latches {
                        test.unpark_one(round);
                    }
                }

                for (test, workers) in latches {
                    test.finish(num_single_unparks);
                    for worker in workers {
                        worker.join().expect("Test thread panic");
                    }
                }
            });
        }

        /// A semaphore built on `ParkingSpot` plus the bookkeeping needed to
        /// observe how many worker threads have got past it.
        struct SingleLatchTest<'a> {
            /// Semaphore counter; read as an `i32`, values below zero mean
            /// that threads are waiting (or about to wait).
            semaphore: AtomicU32,
            /// Number of worker threads that have acquired the semaphore.
            num_awake: AtomicU32,
            /// Total number of threads participating in this test.
            num_threads: u32,
            /// Parking spot shared by all latches of one test run.
            spot: &'a ParkingSpot,
        }

        impl<'a> SingleLatchTest<'a> {
            /// Creates a latch for `num_threads` workers parking on `spot`.
            pub fn new(num_threads: u32, spot: &'a ParkingSpot) -> Self {
                Self {
                    // This implements a fair (FIFO) semaphore, and it starts out unavailable.
                    semaphore: AtomicU32::new(0),
                    num_awake: AtomicU32::new(0),
                    num_threads,
                    spot,
                }
            }

            /// Body of each worker thread: acquire one slot from the
            /// semaphore, then record that this thread is awake.
            pub fn run(&self) {
                self.down();
                self.num_awake.fetch_add(1, Ordering::SeqCst);
            }

            /// Releases exactly one worker and waits until it has recorded
            /// itself in `num_awake`.
            pub fn unpark_one(&self, _single_unpark_index: u32) {
                let expected_awake = self.num_awake.load(Ordering::SeqCst) + 1;

                self.up();

                // Wait for a parked thread to wake up and update `num_awake`.
                while self.num_awake.load(Ordering::SeqCst) != expected_awake {
                    thread::yield_now();
                }
            }

            /// Wakes every worker not already released through `unpark_one`
            /// and checks that nobody is left parked afterwards.
            pub fn finish(&self, num_single_unparks: u32) {
                // The amount of threads not unparked via `unpark_one`.
                let mut remaining = self.num_threads.checked_sub(num_single_unparks).unwrap();

                // Wake remaining threads up with a notify-all. Has to be in a
                // loop, because there might still be threads that have not
                // yet parked.
                while remaining > 0 {
                    let parked = self.parked_on_semaphore();
                    assert!(parked <= remaining);

                    let awake_before = self.num_awake.load(Ordering::SeqCst);

                    let woken = self.spot.notify(&self.semaphore, u32::MAX);
                    assert!(woken >= parked);
                    assert!(woken <= remaining);

                    // Wait for all unparked threads to wake up and update
                    // `num_awake`.
                    while self.num_awake.load(Ordering::SeqCst) != awake_before + woken {
                        thread::yield_now();
                    }

                    remaining = remaining.checked_sub(woken).unwrap();
                }

                // By now, all threads should have been woken up
                assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads);

                // Make sure no thread is parked on our semaphore address
                assert_eq!(self.parked_on_semaphore(), 0);
            }

            /// Number of threads currently queued on the semaphore's address
            /// (zero when no queue exists for it).
            fn parked_on_semaphore(&self) -> u32 {
                let mut parked = 0;
                self.spot.with_lot(&self.semaphore, |queue| {
                    parked = queue.num_parked();
                });
                parked
            }

            /// Acquires the semaphore, parking the calling thread if it is
            /// not available.
            pub fn down(&self) {
                let mut observed = self.semaphore.fetch_sub(1, Ordering::SeqCst);

                if (observed as i32) > 0 {
                    // We acquired the semaphore. Done.
                    return;
                }

                // Force this thread to go to sleep. A mismatch only means the
                // counter moved before we parked, so retry with the value
                // that is current now.
                let mut waiter = Waiter::new();
                while matches!(
                    self.spot
                        .wait32(&self.semaphore, observed, None, &mut waiter),
                    crate::runtime::vm::WaitResult::Mismatch
                ) {
                    observed = self.semaphore.load(Ordering::SeqCst);
                }
            }

            /// Releases the semaphore, handing it to exactly one waiter if
            /// there is one.
            pub fn up(&self) {
                let previous = self.semaphore.fetch_add(1, Ordering::SeqCst) as i32;

                // Nobody was waiting on the semaphore, so there is no one to
                // pass ownership to.
                if previous >= 0 {
                    return;
                }

                // We need to continue until we have actually unparked someone. It might be that
                // the thread we want to pass ownership to has decremented the semaphore counter,
                // but not yet parked.
                loop {
                    match self.spot.notify(&self.semaphore, 1) {
                        1 => break,
                        0 => (),
                        i => panic!("Should not wake up {i} threads"),
                    }
                }
            }
        }
    }

    /// One thread repeatedly waits with a 1ms deadline while `N` other
    /// threads each bump the counter `M` times, notifying after every bump.
    #[test]
    fn wait_with_timeout() {
        let parking_spot = ParkingSpot::default();
        let atomic = AtomicU64::new(0);

        thread::scope(|s| {
            const N: u64 = 5;
            const M: u64 = if cfg!(miri) { 10 } else { 1000 };

            // Waits, with a short timeout each time, until every increment
            // has been observed.
            let waiting = s.spawn(|| {
                let mut waiter = Waiter::new();
                let mut cur = atomic.load(Ordering::SeqCst);
                while cur != N * M {
                    let timeout = Instant::now() + Duration::from_millis(1);
                    parking_spot.wait64(&atomic, cur, Some(timeout), &mut waiter);
                    cur = atomic.load(Ordering::SeqCst);
                }
            });

            let mut threads = vec![waiting];
            threads.extend((0..N).map(|_| {
                s.spawn(|| {
                    for _ in 0..M {
                        atomic.fetch_add(1, Ordering::SeqCst);
                        parking_spot.notify(&atomic, 1);
                    }
                })
            }));

            for thread in threads {
                thread.join().unwrap();
            }
        });
    }
}