1#![deny(missing_docs)]
13
14use crate::prelude::*;
15use crate::runtime::vm::{SendSyncPtr, WaitResult};
16use std::collections::BTreeMap;
17use std::ptr::NonNull;
18use std::sync::atomic::{AtomicU32, AtomicU64, Ordering::SeqCst};
19use std::sync::Mutex;
20use std::thread::{self, Thread};
21use std::time::{Duration, Instant};
22
23#[derive(Default, Debug)]
24struct Spot {
25 head: Option<SendSyncPtr<WaiterInner>>,
26 tail: Option<SendSyncPtr<WaiterInner>>,
27}
28
29#[derive(Default, Debug)]
31pub struct ParkingSpot {
32 inner: Mutex<BTreeMap<u64, Spot>>,
33}
34
35#[derive(Default)]
36pub struct Waiter {
37 inner: Option<Box<WaiterInner>>,
38}
39
40struct WaiterInner {
41 thread: Thread,
44
45 notified: bool,
48 next: Option<SendSyncPtr<WaiterInner>>,
49 prev: Option<SendSyncPtr<WaiterInner>>,
50}
51
52impl ParkingSpot {
53 pub fn wait32(
74 &self,
75 atomic: &AtomicU32,
76 expected: u32,
77 deadline: impl Into<Option<Instant>>,
78 waiter: &mut Waiter,
79 ) -> WaitResult {
80 self.wait(
81 atomic.as_ptr() as u64,
82 || atomic.load(SeqCst) == expected,
83 deadline.into(),
84 waiter,
85 )
86 }
87
88 pub fn wait64(
90 &self,
91 atomic: &AtomicU64,
92 expected: u64,
93 deadline: impl Into<Option<Instant>>,
94 waiter: &mut Waiter,
95 ) -> WaitResult {
96 self.wait(
97 atomic.as_ptr() as u64,
98 || atomic.load(SeqCst) == expected,
99 deadline.into(),
100 waiter,
101 )
102 }
103
104 fn wait(
105 &self,
106 key: u64,
107 validate: impl FnOnce() -> bool,
108 deadline: Option<Instant>,
109 waiter: &mut Waiter,
110 ) -> WaitResult {
111 let mut inner = self
112 .inner
113 .lock()
114 .expect("failed to lock inner parking table");
115
116 if !validate() {
119 return WaitResult::Mismatch;
120 }
121
122 let waiter = waiter.inner.get_or_insert_with(|| {
125 Box::new(WaiterInner {
126 next: None,
127 prev: None,
128 notified: false,
129 thread: thread::current(),
130 })
131 });
132 assert!(waiter.next.is_none());
133 assert!(waiter.prev.is_none());
134
135 waiter.notified = false;
138 waiter.thread = thread::current();
139
140 let ptr = SendSyncPtr::new(NonNull::from(&mut **waiter));
141 let spot = inner.entry(key).or_insert_with(Spot::default);
142 unsafe {
143 spot.push(ptr);
145
146 let timed_out = loop {
157 let timeout = match deadline {
158 Some(deadline) => {
159 let now = Instant::now();
160 if deadline <= now {
161 break true;
162 } else {
163 deadline - now
164 }
165 }
166 None => Duration::MAX,
167 };
168
169 drop(inner);
170 thread::park_timeout(timeout);
171 inner = self.inner.lock().unwrap();
172
173 if ptr.as_ref().notified {
174 break false;
175 }
176 };
177
178 if timed_out {
179 inner.get_mut(&key).unwrap().remove(ptr);
182 WaitResult::TimedOut
183 } else {
184 assert!(ptr.as_ref().next.is_none());
187 assert!(ptr.as_ref().prev.is_none());
188 WaitResult::Ok
189 }
190 }
191 }
192
193 pub fn notify<T>(&self, addr: &T, n: u32) -> u32 {
197 if n == 0 {
198 return 0;
199 }
200 let mut unparked = 0;
201
202 self.with_lot(addr, |spot| unsafe {
206 while let Some(mut head) = spot.pop() {
207 let head = head.as_mut();
208 assert!(head.next.is_none());
209 head.notified = true;
210 head.thread.unpark();
211 unparked += 1;
212 if unparked == n {
213 break;
214 }
215 }
216 });
217
218 unparked
219 }
220
221 fn with_lot<T, F: FnMut(&mut Spot)>(&self, addr: &T, mut f: F) {
222 let key = addr as *const _ as u64;
223 let mut inner = self
224 .inner
225 .lock()
226 .expect("failed to lock inner parking table");
227 if let Some(spot) = inner.get_mut(&key) {
228 f(spot);
229 }
230 }
231}
232
233impl Waiter {
234 pub const fn new() -> Waiter {
235 Waiter { inner: None }
236 }
237}
238
239impl Spot {
240 unsafe fn push(&mut self, mut waiter: SendSyncPtr<WaiterInner>) {
249 assert!(waiter.as_ref().next.is_none());
250 assert!(waiter.as_ref().prev.is_none());
251
252 waiter.as_mut().prev = self.tail;
253 match self.tail {
254 Some(mut tail) => tail.as_mut().next = Some(waiter),
255 None => self.head = Some(waiter),
256 }
257 self.tail = Some(waiter);
258 }
259
260 unsafe fn remove(&mut self, mut waiter: SendSyncPtr<WaiterInner>) {
268 let w = waiter.as_mut();
269 match w.prev {
270 Some(mut prev) => prev.as_mut().next = w.next,
271 None => self.head = w.next,
272 }
273 match w.next {
274 Some(mut next) => next.as_mut().prev = w.prev,
275 None => self.tail = w.prev,
276 }
277 w.prev = None;
278 w.next = None;
279 }
280
281 unsafe fn pop(&mut self) -> Option<SendSyncPtr<WaiterInner>> {
288 let ret = self.head?;
289 self.remove(ret);
290 Some(ret)
291 }
292
293 #[cfg(test)]
294 fn num_parked(&self) -> u32 {
295 let mut ret = 0;
296 let mut cur = self.head;
297 while let Some(next) = cur {
298 ret += 1;
299 cur = unsafe { next.as_ref().next };
300 }
301 ret
302 }
303}
304
305#[cfg(test)]
306mod tests {
307 use super::{ParkingSpot, Waiter};
308 use crate::prelude::*;
309 use std::sync::atomic::{AtomicU64, Ordering};
310 use std::thread;
311 use std::time::{Duration, Instant};
312
/// Hands a counter through three spawned threads and the main thread: each
/// participant waits for its predecessor's value, publishes the next one
/// and wakes everybody.
#[test]
fn atomic_wait_notify() {
    let parking_spot = ParkingSpot::default();
    let atomic = AtomicU64::new(0);

    // Sleeps on the counter until it reads `val`.
    let wait_until_value = |val: u64, waiter: &mut Waiter| {
        let mut seen = atomic.load(Ordering::SeqCst);
        while seen != val {
            parking_spot.wait64(&atomic, seen, None, waiter);
            seen = atomic.load(Ordering::SeqCst);
        }
    };

    // Publishes `val`, wakes every sleeper, then blocks while the counter
    // still reads `val`.
    let publish_then_wait = |val: u64, waiter: &mut Waiter| {
        atomic.store(val, Ordering::SeqCst);
        parking_spot.notify(&atomic, u32::MAX);
        parking_spot.wait64(&atomic, val, None, waiter);
    };

    thread::scope(|s| {
        let thread1 = s.spawn(|| {
            let mut waiter = Waiter::default();
            publish_then_wait(1, &mut waiter);
        });

        let thread2 = s.spawn(|| {
            let mut waiter = Waiter::default();
            wait_until_value(1, &mut waiter);
            publish_then_wait(2, &mut waiter);
        });

        let thread3 = s.spawn(|| {
            let mut waiter = Waiter::default();
            wait_until_value(2, &mut waiter);
            publish_then_wait(3, &mut waiter);
        });

        // The main thread closes the chain; it does not wait afterwards.
        let mut waiter = Waiter::default();
        wait_until_value(3, &mut waiter);
        atomic.store(4, Ordering::SeqCst);
        parking_spot.notify(&atomic, u32::MAX);

        for handle in [thread1, thread2, thread3] {
            handle.join().unwrap();
        }
    });
}
361
362 mod parking_lot {
363 use super::*;
366 use std::sync::atomic::AtomicU32;
367 use std::sync::Arc;
368
/// Generates one `#[test]` function per listed case.
///
/// Each case runs `run_parking_test` `repeats` times with the given number
/// of latches, threads per latch and single unparks; `delay` is the pause
/// before each round of single unparks, in microseconds. The generated
/// tests are ignored under miri and return immediately when the
/// `WASMTIME_TEST_NO_HOG_MEMORY` environment variable is set.
macro_rules! test {
    ( $( $case:ident(
        repeats: $rounds:expr,
        latches: $latch_count:expr,
        delay: $delay_micros:expr,
        threads: $thread_count:expr,
        single_unparks: $singles:expr);
    )* ) => {
        $(
        #[test]
        #[cfg_attr(miri, ignore)]
        fn $case() {
            let opted_out = std::env::var("WASMTIME_TEST_NO_HOG_MEMORY").is_ok();
            if !opted_out {
                let pause = Duration::from_micros($delay_micros);
                for _round in 0..$rounds {
                    run_parking_test($latch_count, pause, $thread_count, $singles);
                }
            }
        })*
    };
}

test! {
    // One latch, no pause before single unparks.
    unpark_all_one_fast(
        repeats: 10000, latches: 1, delay: 0, threads: 1, single_unparks: 0
    );
    unpark_all_hundred_fast(
        repeats: 100, latches: 1, delay: 0, threads: 100, single_unparks: 0
    );
    unpark_one_one_fast(
        repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 1
    );
    unpark_one_hundred_fast(
        repeats: 20, latches: 1, delay: 0, threads: 100, single_unparks: 100
    );
    unpark_one_fifty_then_fifty_all_fast(
        repeats: 50, latches: 1, delay: 0, threads: 100, single_unparks: 50
    );

    // One latch, 10 ms pause before each single unpark.
    unpark_all_one(
        repeats: 100, latches: 1, delay: 10000, threads: 1, single_unparks: 0
    );
    unpark_all_hundred(
        repeats: 100, latches: 1, delay: 10000, threads: 100, single_unparks: 0
    );
    unpark_one_one(
        repeats: 10, latches: 1, delay: 10000, threads: 1, single_unparks: 1
    );
    unpark_one_fifty(
        repeats: 1, latches: 1, delay: 10000, threads: 50, single_unparks: 50
    );
    unpark_one_fifty_then_fifty_all(
        repeats: 2, latches: 1, delay: 10000, threads: 100, single_unparks: 50
    );

    // A hundred latches sharing one `ParkingSpot`, one thread each.
    hundred_unpark_all_one_fast(
        repeats: 100, latches: 100, delay: 0, threads: 1, single_unparks: 0
    );
    hundred_unpark_all_one(
        repeats: 1, latches: 100, delay: 10000, threads: 1, single_unparks: 0
    );
}
430
431 fn run_parking_test(
432 num_latches: usize,
433 delay: Duration,
434 num_threads: u32,
435 num_single_unparks: u32,
436 ) {
437 let spot = ParkingSpot::default();
438
439 thread::scope(|s| {
440 let mut tests = Vec::with_capacity(num_latches);
441
442 for _ in 0..num_latches {
443 let test = Arc::new(SingleLatchTest::new(num_threads, &spot));
444 let mut threads = Vec::with_capacity(num_threads as _);
445 for _ in 0..num_threads {
446 let test = test.clone();
447 threads.push(s.spawn(move || test.run()));
448 }
449 tests.push((test, threads));
450 }
451
452 for unpark_index in 0..num_single_unparks {
453 thread::sleep(delay);
454 for (test, _) in &tests {
455 test.unpark_one(unpark_index);
456 }
457 }
458
459 for (test, threads) in tests {
460 test.finish(num_single_unparks);
461 for thread in threads {
462 thread.join().expect("Test thread panic");
463 }
464 }
465 });
466 }
467
468 struct SingleLatchTest<'a> {
469 semaphore: AtomicU32,
470 num_awake: AtomicU32,
471 num_threads: u32,
473 spot: &'a ParkingSpot,
474 }
475
476 impl<'a> SingleLatchTest<'a> {
477 pub fn new(num_threads: u32, spot: &'a ParkingSpot) -> Self {
478 Self {
479 semaphore: AtomicU32::new(0),
481 num_awake: AtomicU32::new(0),
482 num_threads,
483 spot,
484 }
485 }
486
487 pub fn run(&self) {
488 self.down();
490
491 self.num_awake.fetch_add(1, Ordering::SeqCst);
492 }
493
494 pub fn unpark_one(&self, _single_unpark_index: u32) {
495 let num_awake_before_up = self.num_awake.load(Ordering::SeqCst);
496
497 self.up();
498
499 while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 {
501 thread::yield_now();
502 }
503 }
504
505 pub fn finish(&self, num_single_unparks: u32) {
506 let mut num_threads_left =
508 self.num_threads.checked_sub(num_single_unparks).unwrap();
509
510 while num_threads_left > 0 {
513 let mut num_waiting_on_address = 0;
514 self.spot.with_lot(&self.semaphore, |thread_data| {
515 num_waiting_on_address = thread_data.num_parked();
516 });
517 assert!(num_waiting_on_address <= num_threads_left);
518
519 let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst);
520
521 let num_unparked = self.spot.notify(&self.semaphore, u32::MAX);
522 assert!(num_unparked >= num_waiting_on_address);
523 assert!(num_unparked <= num_threads_left);
524
525 while self.num_awake.load(Ordering::SeqCst)
527 != num_awake_before_unpark + num_unparked
528 {
529 thread::yield_now();
530 }
531
532 num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap();
533 }
534 assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads);
536
537 let mut num_waiting_on_address = 0;
539 self.spot.with_lot(&self.semaphore, |thread_data| {
540 num_waiting_on_address = thread_data.num_parked();
541 });
542 assert_eq!(num_waiting_on_address, 0);
543 }
544
545 pub fn down(&self) {
546 let mut old_semaphore_value = self.semaphore.fetch_sub(1, Ordering::SeqCst);
547
548 if (old_semaphore_value as i32) > 0 {
549 return;
551 }
552
553 let mut waiter = Waiter::new();
555 loop {
556 match self
557 .spot
558 .wait32(&self.semaphore, old_semaphore_value, None, &mut waiter)
559 {
560 crate::runtime::vm::WaitResult::Mismatch => {}
561 _ => break,
562 }
563 old_semaphore_value = self.semaphore.load(Ordering::SeqCst);
564 }
565 }
566
567 pub fn up(&self) {
568 let old_semaphore_value = self.semaphore.fetch_add(1, Ordering::SeqCst) as i32;
569
570 if old_semaphore_value < 0 {
572 loop {
576 match self.spot.notify(&self.semaphore, 1) {
577 1 => break,
578 0 => (),
579 i => panic!("Should not wake up {i} threads"),
580 }
581 }
582 }
583 }
584 }
585 }
586
/// One reader repeatedly waits on the counter with a 1 ms timeout while
/// several writers increment it and notify; the test ends once the reader
/// has observed the final total.
#[test]
fn wait_with_timeout() {
    const WRITERS: u64 = 5;
    const BUMPS_PER_WRITER: u64 = if cfg!(miri) { 10 } else { 1000 };
    const TOTAL: u64 = WRITERS * BUMPS_PER_WRITER;

    let parking_spot = ParkingSpot::default();
    let atomic = AtomicU64::new(0);

    thread::scope(|s| {
        let reader = s.spawn(|| {
            let mut waiter = Waiter::new();
            let mut cur = atomic.load(Ordering::SeqCst);
            while cur != TOTAL {
                let timeout = Instant::now() + Duration::from_millis(1);
                parking_spot.wait64(&atomic, cur, Some(timeout), &mut waiter);
                cur = atomic.load(Ordering::SeqCst);
            }
        });

        let mut threads = vec![reader];
        threads.extend((0..WRITERS).map(|_| {
            s.spawn(|| {
                for _ in 0..BUMPS_PER_WRITER {
                    atomic.fetch_add(1, Ordering::SeqCst);
                    parking_spot.notify(&atomic, 1);
                }
            })
        }));

        threads
            .into_iter()
            .for_each(|handle| handle.join().unwrap());
    });
}
623}