1use self::error_contexts::GlobalErrorContextRefCount;
54use crate::bail_bug;
55use crate::component::func::{Func, call_post_return};
56use crate::component::{
57 HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
58};
59use crate::fiber::{self, StoreFiber, StoreFiberYield};
60use crate::hash_set::HashSet;
61use crate::prelude::*;
62use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
63use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
64use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
65use crate::{
66 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
67};
68use alloc::borrow::ToOwned;
69use alloc::collections::{BTreeMap, BTreeSet, VecDeque};
70use core::any::Any;
71use core::cell::UnsafeCell;
72use core::fmt;
73use core::future;
74use core::future::Future;
75use core::marker::PhantomData;
76use core::mem::{self, ManuallyDrop, MaybeUninit};
77use core::ops::DerefMut;
78use core::pin::{Pin, pin};
79use core::ptr::{self, NonNull};
80use core::task::{Context, Poll, Waker};
81use futures::channel::oneshot;
82use futures::stream::{FuturesUnordered, StreamExt};
83use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
84use table::{TableDebug, TableId};
85use wasmtime_environ::component::{
86 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
87 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
88 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
89 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
90 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
91};
92use wasmtime_environ::packed_option::ReservedValue;
93use wasmtime_environ::{NUM_COMPONENT_CONTEXT_SLOTS, Trap};
94
95pub use abort::JoinHandle;
96pub use func::{FuncCallConcurrent, TypedFuncCallConcurrent};
97pub use future_stream_any::{FutureAny, StreamAny};
98pub use futures_and_streams::{
99 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
100 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
101 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
102};
103pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
104
105mod abort;
106mod error_contexts;
107mod func;
108mod future_stream_any;
109mod futures_and_streams;
110pub(crate) mod table;
111pub(crate) mod tls;
112
113const BLOCKED: u32 = 0xffff_ffff;
116
117#[derive(Clone, Copy, Eq, PartialEq, Debug)]
119pub enum Status {
120 Starting = 0,
121 Started = 1,
122 Returned = 2,
123 StartCancelled = 3,
124 ReturnCancelled = 4,
125}
126
127impl Status {
128 pub fn pack(self, waitable: Option<u32>) -> u32 {
134 assert!(matches!(self, Status::Returned) == waitable.is_none());
135 let waitable = waitable.unwrap_or(0);
136 assert!(waitable < (1 << 28));
137 (waitable << 4) | (self as u32)
138 }
139}
140
141#[derive(Clone, Copy, Debug)]
144enum Event {
145 None,
146 Subtask {
147 status: Status,
148 },
149 StreamRead {
150 code: ReturnCode,
151 pending: Option<(TypeStreamTableIndex, u32)>,
152 },
153 StreamWrite {
154 code: ReturnCode,
155 pending: Option<(TypeStreamTableIndex, u32)>,
156 },
157 FutureRead {
158 code: ReturnCode,
159 pending: Option<(TypeFutureTableIndex, u32)>,
160 },
161 FutureWrite {
162 code: ReturnCode,
163 pending: Option<(TypeFutureTableIndex, u32)>,
164 },
165 Cancelled,
166}
167
168impl Event {
169 fn parts(self) -> (u32, u32) {
174 const EVENT_NONE: u32 = 0;
175 const EVENT_SUBTASK: u32 = 1;
176 const EVENT_STREAM_READ: u32 = 2;
177 const EVENT_STREAM_WRITE: u32 = 3;
178 const EVENT_FUTURE_READ: u32 = 4;
179 const EVENT_FUTURE_WRITE: u32 = 5;
180 const EVENT_CANCELLED: u32 = 6;
181 match self {
182 Event::None => (EVENT_NONE, 0),
183 Event::Cancelled => (EVENT_CANCELLED, 0),
184 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
185 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
186 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
187 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
188 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
189 }
190 }
191}
192
193mod callback_code {
195 pub const EXIT: u32 = 0;
196 pub const YIELD: u32 = 1;
197 pub const WAIT: u32 = 2;
198}
199
200const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
204
205pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
211 store: StoreContextMut<'a, T>,
212 get_data: fn(&mut T) -> D::Data<'_>,
213}
214
215impl<'a, T, D> Access<'a, T, D>
216where
217 D: HasData + ?Sized,
218 T: 'static,
219{
220 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
222 Self { store, get_data }
223 }
224
225 pub fn data_mut(&mut self) -> &mut T {
227 self.store.data_mut()
228 }
229
230 pub fn get(&mut self) -> D::Data<'_> {
232 (self.get_data)(self.data_mut())
233 }
234
235 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
239 where
240 T: 'static,
241 {
242 let accessor = Accessor {
243 get_data: self.get_data,
244 token: StoreToken::new(self.store.as_context_mut()),
245 };
246 self.store
247 .as_context_mut()
248 .spawn_with_accessor(accessor, task)
249 }
250
251 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
254 self.get_data
255 }
256}
257
258impl<'a, T, D> AsContext for Access<'a, T, D>
259where
260 D: HasData + ?Sized,
261 T: 'static,
262{
263 type Data = T;
264
265 fn as_context(&self) -> StoreContext<'_, T> {
266 self.store.as_context()
267 }
268}
269
270impl<'a, T, D> AsContextMut for Access<'a, T, D>
271where
272 D: HasData + ?Sized,
273 T: 'static,
274{
275 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
276 self.store.as_context_mut()
277 }
278}
279
280pub struct Accessor<T: 'static, D = HasSelf<T>>
340where
341 D: HasData + ?Sized,
342{
343 token: StoreToken<T>,
344 get_data: fn(&mut T) -> D::Data<'_>,
345}
346
347pub trait AsAccessor {
364 type Data: 'static;
366
367 type AccessorData: HasData + ?Sized;
370
371 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
373}
374
375impl<T: AsAccessor + ?Sized> AsAccessor for &T {
376 type Data = T::Data;
377 type AccessorData = T::AccessorData;
378
379 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
380 T::as_accessor(self)
381 }
382}
383
384impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
385 type Data = T;
386 type AccessorData = D;
387
388 fn as_accessor(&self) -> &Accessor<T, D> {
389 self
390 }
391}
392
393const _: () = {
416 const fn assert<T: Send + Sync>() {}
417 assert::<Accessor<UnsafeCell<u32>>>();
418};
419
420impl<T> Accessor<T> {
421 pub(crate) fn new(token: StoreToken<T>) -> Self {
430 Self {
431 token,
432 get_data: |x| x,
433 }
434 }
435}
436
437impl<T, D> Accessor<T, D>
438where
439 D: HasData + ?Sized,
440{
441 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
459 tls::get(|vmstore| {
460 fun(Access {
461 store: self.token.as_context_mut(vmstore),
462 get_data: self.get_data,
463 })
464 })
465 }
466
467 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
470 self.get_data
471 }
472
473 pub fn with_getter<D2: HasData>(
490 &self,
491 get_data: fn(&mut T) -> D2::Data<'_>,
492 ) -> Accessor<T, D2> {
493 Accessor {
494 token: self.token,
495 get_data,
496 }
497 }
498
499 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
515 where
516 T: 'static,
517 {
518 let accessor = self.clone_for_spawn();
519 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
520 }
521
522 fn clone_for_spawn(&self) -> Self {
523 Self {
524 token: self.token,
525 get_data: self.get_data,
526 }
527 }
528
529 pub fn poll_no_interesting_tasks(&self, cx: &mut Context<'_>) -> Poll<()> {
565 self.with(|mut access| {
566 let store = access.as_context_mut().0;
567 let state = store.concurrent_state_mut();
568 if state.interesting_tasks == 0 {
569 Poll::Ready(())
570 } else {
571 state.interesting_tasks_empty_waker = Some(cx.waker().clone());
572 Poll::Pending
573 }
574 })
575 }
576}
577
578pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
590where
591 D: HasData + ?Sized,
592{
593 fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
595}
596
597enum CallerInfo {
600 Async {
602 params: Vec<ValRaw>,
603 has_result: bool,
604 },
605 Sync {
607 params: Vec<ValRaw>,
608 result_count: u32,
609 },
610}
611
612enum WaitMode {
614 Fiber(StoreFiber<'static>),
616 Callback(Instance),
619}
620
621#[derive(Debug)]
623enum SuspendReason {
624 Waiting {
627 set: TableId<WaitableSet>,
628 thread: QualifiedThreadId,
629 skip_may_block_check: bool,
630 },
631 NeedWork,
634 Yielding {
637 thread: QualifiedThreadId,
638 cancellable: bool,
639 skip_may_block_check: bool,
640 },
641 ExplicitlySuspending {
643 thread: QualifiedThreadId,
644 skip_may_block_check: bool,
645 },
646}
647
648enum GuestCallKind {
650 DeliverEvent {
653 instance: Instance,
655 set: Option<TableId<WaitableSet>>,
660 },
661 StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
667 StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
668}
669
670impl fmt::Debug for GuestCallKind {
671 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
672 match self {
673 Self::DeliverEvent { instance, set } => f
674 .debug_struct("DeliverEvent")
675 .field("instance", instance)
676 .field("set", set)
677 .finish(),
678 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
679 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
680 }
681 }
682}
683
684#[derive(Copy, Clone, Debug)]
686pub enum SuspensionTarget {
687 SomeSuspended(u32),
688 Some(u32),
689 None,
690}
691
692impl SuspensionTarget {
693 fn is_none(&self) -> bool {
694 matches!(self, SuspensionTarget::None)
695 }
696 fn is_some(&self) -> bool {
697 !self.is_none()
698 }
699}
700
701#[derive(Debug)]
703struct GuestCall {
704 thread: QualifiedThreadId,
705 kind: GuestCallKind,
706}
707
708impl GuestCall {
709 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
719 let instance = store
720 .concurrent_state_mut()
721 .get_mut(self.thread.task)?
722 .instance;
723 let state = store.instance_state(instance).concurrent_state();
724
725 let ready = match &self.kind {
726 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
727 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
728 GuestCallKind::StartExplicit(_) => true,
729 };
730 log::trace!(
731 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
732 state.do_not_enter,
733 state.backpressure
734 );
735 Ok(ready)
736 }
737}
738
739enum WorkerItem {
741 GuestCall(GuestCall),
742 Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
743}
744
745enum WorkItem {
748 PushFuture(AlwaysMut<HostTaskFuture>),
750 ResumeFiber(StoreFiber<'static>),
752 ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
754 GuestCall(RuntimeComponentInstanceIndex, GuestCall),
756 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
758}
759
760impl fmt::Debug for WorkItem {
761 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
762 match self {
763 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
764 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
765 Self::ResumeThread(instance, thread) => f
766 .debug_tuple("ResumeThread")
767 .field(instance)
768 .field(thread)
769 .finish(),
770 Self::GuestCall(instance, call) => f
771 .debug_tuple("GuestCall")
772 .field(instance)
773 .field(call)
774 .finish(),
775 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
776 }
777 }
778}
779
780#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
782pub(crate) enum WaitResult {
783 Cancelled,
784 Completed,
785}
786
787pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
795 store: &mut dyn VMStore,
796 future: impl Future<Output = Result<R>> + Send + 'static,
797) -> Result<R> {
798 let state = store.concurrent_state_mut();
799 let task = state.current_host_thread()?;
800
801 let mut future = Box::pin(async move {
805 let result = future.await?;
806 tls::get(move |store| {
807 let state = store.concurrent_state_mut();
808 let host_state = &mut state.get_mut(task)?.state;
809 assert!(matches!(host_state, HostTaskState::CalleeStarted));
810 *host_state = HostTaskState::CalleeFinished(Box::new(result));
811
812 Waitable::Host(task).set_event(
813 state,
814 Some(Event::Subtask {
815 status: Status::Returned,
816 }),
817 )?;
818
819 Ok(())
820 })
821 }) as HostTaskFuture;
822
823 let poll = tls::set(store, || {
827 future
828 .as_mut()
829 .poll(&mut Context::from_waker(&Waker::noop()))
830 });
831
832 match poll {
833 Poll::Ready(result) => result?,
835
836 Poll::Pending => {
841 let state = store.concurrent_state_mut();
842 state.push_future(future);
843
844 let caller = state.get_mut(task)?.caller;
845 let set = state.get_mut(caller.thread)?.sync_call_set;
846 Waitable::Host(task).join(state, Some(set))?;
847
848 store.suspend(SuspendReason::Waiting {
849 set,
850 thread: caller,
851 skip_may_block_check: false,
852 })?;
853
854 Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
858 }
859 }
860
861 let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
863 match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
864 HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
865 Ok(result) => *result,
866 Err(_) => bail_bug!("host task finished with wrong type of result"),
867 }),
868 _ => bail_bug!("unexpected host task state after completion"),
869 }
870}
871
872fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
874 let mut next = Some(call);
875 while let Some(call) = next.take() {
876 match call.kind {
877 GuestCallKind::DeliverEvent { instance, set } => {
878 let (event, waitable) =
879 match instance.get_event(store, call.thread.task, set, true)? {
880 Some(pair) => pair,
881 None => bail_bug!("delivering non-present event"),
882 };
883 let state = store.concurrent_state_mut();
884 let task = state.get_mut(call.thread.task)?;
885 let runtime_instance = task.instance;
886 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
887
888 log::trace!(
889 "use callback to deliver event {event:?} to {:?} for {waitable:?}",
890 call.thread,
891 );
892
893 let old_thread = store.set_thread(call.thread)?;
894 log::trace!(
895 "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
896 call.thread
897 );
898
899 store.enter_instance(runtime_instance);
900
901 let Some(callback) = store
902 .concurrent_state_mut()
903 .get_mut(call.thread.task)?
904 .callback
905 .take()
906 else {
907 bail_bug!("guest task callback field not present")
908 };
909
910 let code = callback(store, event, handle)?;
911
912 store
913 .concurrent_state_mut()
914 .get_mut(call.thread.task)?
915 .callback = Some(callback);
916
917 store.exit_instance(runtime_instance)?;
918
919 store.set_thread(old_thread)?;
920
921 next = instance.handle_callback_code(
922 store,
923 call.thread,
924 runtime_instance.index,
925 code,
926 )?;
927
928 log::trace!(
929 "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
930 );
931 }
932 GuestCallKind::StartImplicit(fun) => {
933 next = fun(store)?;
934 }
935 GuestCallKind::StartExplicit(fun) => {
936 fun(store)?;
937 }
938 }
939 }
940
941 Ok(())
942}
943
944impl<T> Store<T> {
945 pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
947 where
948 T: Send + 'static,
949 {
950 ensure!(
951 self.as_context().0.concurrency_support(),
952 "cannot use `run_concurrent` when Config::concurrency_support disabled",
953 );
954 self.as_context_mut().run_concurrent(fun).await
955 }
956
957 #[doc(hidden)]
958 pub fn assert_concurrent_state_empty(&mut self) {
959 self.as_context_mut().assert_concurrent_state_empty();
960 }
961
962 #[doc(hidden)]
963 pub fn concurrent_state_table_size(&mut self) -> usize {
964 self.as_context_mut().concurrent_state_table_size()
965 }
966
967 pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
969 where
970 T: 'static,
971 {
972 self.as_context_mut().spawn(task)
973 }
974}
975
976impl<T> StoreContextMut<'_, T> {
977 #[doc(hidden)]
988 pub fn assert_concurrent_state_empty(self) {
989 let store = self.0;
990 store
991 .store_data_mut()
992 .components
993 .assert_instance_states_empty();
994 let state = store.concurrent_state_mut();
995 assert!(
996 state.table.get_mut().is_empty(),
997 "non-empty table: {:?}",
998 state.table.get_mut()
999 );
1000 assert!(state.high_priority.is_empty());
1001 assert!(state.low_priority.is_empty());
1002 assert!(state.current_thread.is_none());
1003 assert!(state.futures_mut().unwrap().is_empty());
1004 assert!(state.global_error_context_ref_counts.is_empty());
1005 }
1006
1007 #[doc(hidden)]
1012 pub fn concurrent_state_table_size(&mut self) -> usize {
1013 self.0
1014 .concurrent_state_mut()
1015 .table
1016 .get_mut()
1017 .iter_mut()
1018 .count()
1019 }
1020
1021 pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
1031 where
1032 T: 'static,
1033 {
1034 let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
1035 self.spawn_with_accessor(accessor, task)
1036 }
1037
1038 fn spawn_with_accessor<D>(
1041 self,
1042 accessor: Accessor<T, D>,
1043 task: impl AccessorTask<T, D>,
1044 ) -> JoinHandle
1045 where
1046 T: 'static,
1047 D: HasData + ?Sized,
1048 {
1049 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
1053 self.0
1054 .concurrent_state_mut()
1055 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1056 handle
1057 }
1058
1059 pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1143 where
1144 T: Send + 'static,
1145 {
1146 ensure!(
1147 self.0.concurrency_support(),
1148 "cannot use `run_concurrent` when Config::concurrency_support disabled",
1149 );
1150 self.do_run_concurrent(fun, false).await
1151 }
1152
1153 pub(super) async fn run_concurrent_trap_on_idle<R>(
1154 self,
1155 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1156 ) -> Result<R>
1157 where
1158 T: Send + 'static,
1159 {
1160 self.do_run_concurrent(fun, true).await
1161 }
1162
1163 async fn do_run_concurrent<R>(
1164 mut self,
1165 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1166 trap_on_idle: bool,
1167 ) -> Result<R>
1168 where
1169 T: Send + 'static,
1170 {
1171 debug_assert!(self.0.concurrency_support());
1172 check_recursive_run();
1173 let token = StoreToken::new(self.as_context_mut());
1174
1175 struct Dropper<'a, T: 'static, V> {
1176 store: StoreContextMut<'a, T>,
1177 value: ManuallyDrop<V>,
1178 }
1179
1180 impl<'a, T, V> Drop for Dropper<'a, T, V> {
1181 fn drop(&mut self) {
1182 tls::set(self.store.0, || {
1183 unsafe { ManuallyDrop::drop(&mut self.value) }
1188 });
1189 }
1190 }
1191
1192 let accessor = &Accessor::new(token);
1193 let dropper = &mut Dropper {
1194 store: self,
1195 value: ManuallyDrop::new(fun(accessor)),
1196 };
1197 let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1199
1200 dropper
1201 .store
1202 .as_context_mut()
1203 .poll_until(future, trap_on_idle)
1204 .await
1205 }
1206
1207 async fn poll_until<R>(
1213 mut self,
1214 mut future: Pin<&mut impl Future<Output = R>>,
1215 trap_on_idle: bool,
1216 ) -> Result<R>
1217 where
1218 T: Send + 'static,
1219 {
1220 struct Reset<'a, T: 'static> {
1221 store: StoreContextMut<'a, T>,
1222 futures: Option<FuturesUnordered<HostTaskFuture>>,
1223 }
1224
1225 impl<'a, T> Drop for Reset<'a, T> {
1226 fn drop(&mut self) {
1227 if let Some(futures) = self.futures.take() {
1228 *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1229 }
1230 }
1231 }
1232
1233 loop {
1234 let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1238 let mut reset = Reset {
1239 store: self.as_context_mut(),
1240 futures,
1241 };
1242 let mut next = match reset.futures.as_mut() {
1243 Some(f) => pin!(f.next()),
1244 None => bail_bug!("concurrent state missing futures field"),
1245 };
1246
1247 enum PollResult<R> {
1248 Complete(R),
1249 ProcessWork {
1250 ready: Vec<WorkItem>,
1251 low_priority: bool,
1252 },
1253 }
1254
1255 let result = future::poll_fn(|cx| {
1256 if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1259 return Poll::Ready(Ok(PollResult::Complete(value)));
1260 }
1261
1262 let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1266 Poll::Ready(Some(output)) => {
1267 match output {
1268 Err(e) => return Poll::Ready(Err(e)),
1269 Ok(()) => {}
1270 }
1271 Poll::Ready(true)
1272 }
1273 Poll::Ready(None) => Poll::Ready(false),
1274 Poll::Pending => Poll::Pending,
1275 };
1276
1277 let state = reset.store.0.concurrent_state_mut();
1281 let mut ready = mem::take(&mut state.high_priority);
1282 let mut low_priority = false;
1283 if ready.is_empty() {
1284 if let Some(item) = state.low_priority.pop_back() {
1285 ready.push(item);
1286 low_priority = true;
1287 }
1288 }
1289 if !ready.is_empty() {
1290 return Poll::Ready(Ok(PollResult::ProcessWork {
1291 ready,
1292 low_priority,
1293 }));
1294 }
1295
1296 return match next {
1300 Poll::Ready(true) => {
1301 Poll::Ready(Ok(PollResult::ProcessWork {
1307 ready: Vec::new(),
1308 low_priority: false,
1309 }))
1310 }
1311 Poll::Ready(false) => {
1312 if let Poll::Ready(value) =
1316 tls::set(reset.store.0, || future.as_mut().poll(cx))
1317 {
1318 Poll::Ready(Ok(PollResult::Complete(value)))
1319 } else {
1320 if trap_on_idle {
1326 Poll::Ready(Err(Trap::AsyncDeadlock.into()))
1329 } else {
1330 Poll::Pending
1334 }
1335 }
1336 }
1337 Poll::Pending => Poll::Pending,
1342 };
1343 })
1344 .await;
1345
1346 drop(reset);
1350
1351 match result? {
1352 PollResult::Complete(value) => break Ok(value),
1355 PollResult::ProcessWork {
1358 ready,
1359 low_priority,
1360 } => {
1361 struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1362 store: StoreContextMut<'a, T>,
1363 ready: I,
1364 }
1365
1366 impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1367 fn drop(&mut self) {
1368 while let Some(item) = self.ready.next() {
1369 match item {
1370 WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1371 WorkItem::PushFuture(future) => {
1372 tls::set(self.store.0, move || drop(future))
1373 }
1374 _ => {}
1375 }
1376 }
1377 }
1378 }
1379
1380 let mut dispose = Dispose {
1381 store: self.as_context_mut(),
1382 ready: ready.into_iter(),
1383 };
1384
1385 if low_priority {
1407 dispose.store.0.yield_now().await
1408 }
1409
1410 while let Some(item) = dispose.ready.next() {
1411 dispose
1412 .store
1413 .as_context_mut()
1414 .handle_work_item(item)
1415 .await?;
1416 }
1417 }
1418 }
1419 }
1420 }
1421
1422 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1424 where
1425 T: Send,
1426 {
1427 log::trace!("handle work item {item:?}");
1428 match item {
1429 WorkItem::PushFuture(future) => {
1430 self.0
1431 .concurrent_state_mut()
1432 .futures_mut()?
1433 .push(future.into_inner());
1434 }
1435 WorkItem::ResumeFiber(fiber) => {
1436 self.0.resume_fiber(fiber).await?;
1437 }
1438 WorkItem::ResumeThread(_, thread) => {
1439 if let GuestThreadState::Ready { fiber, .. } = mem::replace(
1440 &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
1441 GuestThreadState::Running,
1442 ) {
1443 self.0.resume_fiber(fiber).await?;
1444 } else {
1445 bail_bug!("cannot resume non-pending thread {thread:?}");
1446 }
1447 }
1448 WorkItem::GuestCall(_, call) => {
1449 if call.is_ready(self.0)? {
1450 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1451 } else {
1452 let state = self.0.concurrent_state_mut();
1453 let task = state.get_mut(call.thread.task)?;
1454 if !task.starting_sent {
1455 task.starting_sent = true;
1456 if let GuestCallKind::StartImplicit(_) = &call.kind {
1457 Waitable::Guest(call.thread.task).set_event(
1458 state,
1459 Some(Event::Subtask {
1460 status: Status::Starting,
1461 }),
1462 )?;
1463 }
1464 }
1465
1466 let instance = state.get_mut(call.thread.task)?.instance;
1467 self.0
1468 .instance_state(instance)
1469 .concurrent_state()
1470 .pending
1471 .insert(call.thread, call.kind);
1472 }
1473 }
1474 WorkItem::WorkerFunction(fun) => {
1475 self.run_on_worker(WorkerItem::Function(fun)).await?;
1476 }
1477 }
1478
1479 Ok(())
1480 }
1481
1482 async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1484 where
1485 T: Send,
1486 {
1487 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1488 fiber
1489 } else {
1490 fiber::make_fiber(self.0, move |store| {
1491 loop {
1492 let Some(item) = store.concurrent_state_mut().worker_item.take() else {
1493 bail_bug!("worker_item not present when resuming fiber")
1494 };
1495 match item {
1496 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1497 WorkerItem::Function(fun) => fun.into_inner()(store)?,
1498 }
1499
1500 store.suspend(SuspendReason::NeedWork)?;
1501 }
1502 })?
1503 };
1504
1505 let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1506 assert!(worker_item.is_none());
1507 *worker_item = Some(item);
1508
1509 self.0.resume_fiber(worker).await
1510 }
1511
1512 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1517 where
1518 T: 'static,
1519 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1520 + Send
1521 + Sync
1522 + 'static,
1523 R: Send + Sync + 'static,
1524 {
1525 let token = StoreToken::new(self);
1526 async move {
1527 let mut accessor = Accessor::new(token);
1528 closure(&mut accessor).await
1529 }
1530 }
1531
1532 pub fn async_call_stack(&mut self) -> impl Iterator<Item = GuestTaskId> {
1542 let state = self.0.concurrent_state_mut();
1543 let mut cur = Some(state.current_thread);
1544 core::iter::from_fn(move || {
1545 while let Some(t) = cur {
1546 cur = state.parent(t);
1547 if let Some(thread) = t.guest() {
1548 return Some(GuestTaskId(thread.task));
1549 }
1550 }
1551
1552 None
1553 })
1554 }
1555}
1556
1557impl StoreOpaque {
1558 pub(crate) fn enter_guest_sync_call(
1565 &mut self,
1566 guest_caller: Option<RuntimeInstance>,
1567 callee_async: bool,
1568 callee: RuntimeInstance,
1569 ) -> Result<()> {
1570 log::trace!("enter sync call {callee:?}");
1571 if !self.concurrency_support() {
1572 return self.enter_call_not_concurrent();
1573 }
1574
1575 let state = self.concurrent_state_mut();
1576 let thread = state.current_thread;
1577 let instance = if let Some(thread) = thread.guest() {
1578 Some(state.get_mut(thread.task)?.instance)
1579 } else {
1580 None
1581 };
1582 if guest_caller.is_some() {
1583 debug_assert_eq!(instance, guest_caller);
1584 }
1585 let guest_thread = GuestTask::new(
1586 state,
1587 Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
1588 LiftResult {
1589 lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
1590 ty: TypeTupleIndex::reserved_value(),
1591 memory: None,
1592 string_encoding: StringEncoding::Utf8,
1593 },
1594 if let Some(thread) = thread.guest() {
1595 Caller::Guest { thread: *thread }
1596 } else {
1597 Caller::Host {
1598 tx: None,
1599 host_future_present: false,
1600 caller: thread,
1601 }
1602 },
1603 None,
1604 callee,
1605 callee_async,
1606 )?;
1607
1608 Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1609 guest_thread.thread,
1610 self,
1611 callee.index,
1612 )?;
1613 self.set_thread(guest_thread)?;
1614
1615 Ok(())
1616 }
1617
1618 pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
1620 if !self.concurrency_support() {
1621 return Ok(self.exit_call_not_concurrent());
1622 }
1623 let thread = match self.set_thread(CurrentThread::None)?.guest() {
1624 Some(t) => *t,
1625 None => bail_bug!("expected task when exiting"),
1626 };
1627 let task = self.concurrent_state_mut().get_mut(thread.task)?;
1628 let instance = task.instance;
1629 let caller = match &task.caller {
1630 &Caller::Guest { thread } => thread.into(),
1631 &Caller::Host { caller, .. } => caller,
1632 };
1633 task.lift_result = None;
1634 task.exited = true;
1635 self.set_thread(caller)?;
1636
1637 log::trace!("exit sync call {instance:?}");
1638 self.cleanup_thread(thread, instance, CleanupTask::Yes)?;
1639
1640 Ok(())
1641 }
1642
1643 pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
1651 if !self.concurrency_support() {
1652 self.enter_call_not_concurrent()?;
1653 return Ok(None);
1654 }
1655 let state = self.concurrent_state_mut();
1656 let caller = state.current_guest_thread()?;
1657 let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
1658 log::trace!("new host task {task:?}");
1659 self.set_thread(task)?;
1660 Ok(Some(task))
1661 }
1662
1663 pub fn host_task_reenter_caller(&mut self) -> Result<()> {
1669 if !self.concurrency_support() {
1670 return Ok(());
1671 }
1672 let task = self.concurrent_state_mut().current_host_thread()?;
1673 let caller = self.concurrent_state_mut().get_mut(task)?.caller;
1674 self.set_thread(caller)?;
1675 Ok(())
1676 }
1677
1678 pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1685 match task {
1686 Some(task) => {
1687 log::trace!("delete host task {task:?}");
1688 self.concurrent_state_mut().delete(task)?;
1689 }
1690 None => {
1691 self.exit_call_not_concurrent();
1692 }
1693 }
1694 Ok(())
1695 }
1696
1697 pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
1705 if self.trapped() {
1706 return Ok(false);
1707 }
1708 if !self.concurrency_support() {
1709 return Ok(true);
1710 }
1711 let state = self.concurrent_state_mut();
1712 let mut cur = Some(state.current_thread);
1713 while let Some(t) = cur {
1714 if let Some(thread) = t.guest() {
1715 let task = state.get_mut(thread.task)?;
1716 if task.instance.instance == instance.instance {
1723 return Ok(false);
1724 }
1725 }
1726 cur = state.parent(t);
1727 }
1728 Ok(true)
1729 }
1730
1731 fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1734 self.component_instance_mut(instance.instance)
1735 .instance_state(instance.index)
1736 }
1737
1738 fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
1744 let thread = thread.into();
1745 let state = self.concurrent_state_mut();
1746 let old_thread = mem::replace(&mut state.current_thread, thread);
1747
1748 if let Some(old_thread) = old_thread.guest() {
1756 let old_context = self.vm_store_context().component_context;
1757 self.concurrent_state_mut()
1758 .get_mut(old_thread.thread)?
1759 .context = old_context;
1760 }
1761 if cfg!(debug_assertions) {
1762 self.vm_store_context_mut().component_context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
1763 }
1764 if let Some(thread) = thread.guest() {
1765 let thread = self.concurrent_state_mut().get_mut(thread.thread)?;
1766 let context = thread.context;
1767 if cfg!(debug_assertions) {
1768 thread.context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
1769 }
1770 self.vm_store_context_mut().component_context = context;
1771 }
1772
1773 let state = self.concurrent_state_mut();
1781 if let Some(old_thread) = old_thread.guest() {
1782 let instance = state.get_mut(old_thread.task)?.instance.instance;
1783 self.component_instance_mut(instance)
1784 .set_task_may_block(false)
1785 }
1786
1787 if thread.guest().is_some() {
1788 self.set_task_may_block()?;
1789 }
1790
1791 Ok(old_thread)
1792 }
1793
1794 fn set_task_may_block(&mut self) -> Result<()> {
1797 let state = self.concurrent_state_mut();
1798 let guest_thread = state.current_guest_thread()?;
1799 let instance = state.get_mut(guest_thread.task)?.instance.instance;
1800 let may_block = self.concurrent_state_mut().may_block(guest_thread.task)?;
1801 self.component_instance_mut(instance)
1802 .set_task_may_block(may_block);
1803 Ok(())
1804 }
1805
1806 pub(crate) fn check_blocking(&mut self) -> Result<()> {
1807 if !self.concurrency_support() {
1808 return Ok(());
1809 }
1810 let state = self.concurrent_state_mut();
1811 let task = state.current_guest_thread()?.task;
1812 let instance = state.get_mut(task)?.instance.instance;
1813 let task_may_block = self.component_instance(instance).get_task_may_block();
1814
1815 if task_may_block {
1816 Ok(())
1817 } else {
1818 Err(Trap::CannotBlockSyncTask.into())
1819 }
1820 }
1821
1822 fn enter_instance(&mut self, instance: RuntimeInstance) {
1826 log::trace!("enter {instance:?}");
1827 self.instance_state(instance)
1828 .concurrent_state()
1829 .do_not_enter = true;
1830 }
1831
1832 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1836 log::trace!("exit {instance:?}");
1837 self.instance_state(instance)
1838 .concurrent_state()
1839 .do_not_enter = false;
1840 self.partition_pending(instance)
1841 }
1842
1843 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1848 for (thread, kind) in
1849 mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1850 {
1851 let call = GuestCall { thread, kind };
1852 if call.is_ready(self)? {
1853 self.concurrent_state_mut()
1854 .push_high_priority(WorkItem::GuestCall(instance.index, call));
1855 } else {
1856 self.instance_state(instance)
1857 .concurrent_state()
1858 .pending
1859 .insert(call.thread, call.kind);
1860 }
1861 }
1862
1863 Ok(())
1864 }
1865
1866 pub(crate) fn backpressure_modify(
1868 &mut self,
1869 caller_instance: RuntimeInstance,
1870 modify: impl FnOnce(u16) -> Option<u16>,
1871 ) -> Result<()> {
1872 let state = self.instance_state(caller_instance).concurrent_state();
1873 let old = state.backpressure;
1874 let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
1875 state.backpressure = new;
1876
1877 if old > 0 && new == 0 {
1878 self.partition_pending(caller_instance)?;
1881 }
1882
1883 Ok(())
1884 }
1885
1886 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1889 let old_thread = self.concurrent_state_mut().current_thread;
1890 log::trace!("resume_fiber: save current thread {old_thread:?}");
1891
1892 let fiber = fiber::resolve_or_release(self, fiber).await?;
1893
1894 self.set_thread(old_thread)?;
1895
1896 let state = self.concurrent_state_mut();
1897
1898 if let Some(ot) = old_thread.guest() {
1899 state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1900 }
1901 log::trace!("resume_fiber: restore current thread {old_thread:?}");
1902
1903 if let Some(mut fiber) = fiber {
1904 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1905 let reason = match state.suspend_reason.take() {
1907 Some(r) => r,
1908 None => bail_bug!("suspend reason missing when resuming fiber"),
1909 };
1910 match reason {
1911 SuspendReason::NeedWork => {
1912 if state.worker.is_none() {
1913 state.worker = Some(fiber);
1914 } else {
1915 fiber.dispose(self);
1916 }
1917 }
1918 SuspendReason::Yielding {
1919 thread,
1920 cancellable,
1921 ..
1922 } => {
1923 state.get_mut(thread.thread)?.state =
1924 GuestThreadState::Ready { fiber, cancellable };
1925 let instance = state.get_mut(thread.task)?.instance.index;
1926 state.push_low_priority(WorkItem::ResumeThread(instance, thread));
1927 }
1928 SuspendReason::ExplicitlySuspending { thread, .. } => {
1929 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1930 }
1931 SuspendReason::Waiting { set, thread, .. } => {
1932 let old = state
1933 .get_mut(set)?
1934 .waiting
1935 .insert(thread, WaitMode::Fiber(fiber));
1936 assert!(old.is_none());
1937 }
1938 };
1939 } else {
1940 log::trace!("resume_fiber: fiber has exited");
1941 }
1942
1943 Ok(())
1944 }
1945
1946 fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1952 log::trace!("suspend fiber: {reason:?}");
1953
1954 let task = match &reason {
1958 SuspendReason::Yielding { thread, .. }
1959 | SuspendReason::Waiting { thread, .. }
1960 | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1961 SuspendReason::NeedWork => None,
1962 };
1963
1964 let old_guest_thread = if task.is_some() {
1965 self.concurrent_state_mut().current_thread
1966 } else {
1967 CurrentThread::None
1968 };
1969
1970 debug_assert!(
1976 matches!(
1977 reason,
1978 SuspendReason::ExplicitlySuspending {
1979 skip_may_block_check: true,
1980 ..
1981 } | SuspendReason::Waiting {
1982 skip_may_block_check: true,
1983 ..
1984 } | SuspendReason::Yielding {
1985 skip_may_block_check: true,
1986 ..
1987 }
1988 ) || old_guest_thread
1989 .guest()
1990 .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1991 .transpose()?
1992 .unwrap_or(true)
1993 );
1994
1995 let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1996 assert!(suspend_reason.is_none());
1997 *suspend_reason = Some(reason);
1998
1999 self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
2000
2001 if task.is_some() {
2002 self.set_thread(old_guest_thread)?;
2003 }
2004
2005 Ok(())
2006 }
2007
2008 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
2009 let state = self.concurrent_state_mut();
2010
2011 if waitable.common(state)?.set.is_some() {
2012 bail!(Trap::WaitableSyncAndAsync);
2013 }
2014
2015 let caller = state.current_guest_thread()?;
2016 let set = state.get_mut(caller.thread)?.sync_call_set;
2017 waitable.join(state, Some(set))?;
2018 self.suspend(SuspendReason::Waiting {
2019 set,
2020 thread: caller,
2021 skip_may_block_check: false,
2022 })?;
2023 let state = self.concurrent_state_mut();
2024 waitable.join(state, None)
2025 }
2026
2027 fn cleanup_thread(
2049 &mut self,
2050 guest_thread: QualifiedThreadId,
2051 runtime_instance: RuntimeInstance,
2052 cleanup_task: CleanupTask,
2053 ) -> Result<()> {
2054 let state = self.concurrent_state_mut();
2055 let thread_data = state.get_mut(guest_thread.thread)?;
2056 let sync_call_set = thread_data.sync_call_set;
2057 if let Some(guest_id) = thread_data.instance_rep {
2058 self.instance_state(runtime_instance)
2059 .thread_handle_table()
2060 .guest_thread_remove(guest_id)?;
2061 }
2062 let state = self.concurrent_state_mut();
2063
2064 for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
2066 if let Some(Event::Subtask {
2067 status: Status::Returned | Status::ReturnCancelled,
2068 }) = waitable.common(state)?.event
2069 {
2070 waitable.delete_from(state)?;
2071 }
2072 }
2073
2074 state.delete(guest_thread.thread)?;
2075 state.delete(sync_call_set)?;
2076 let task = state.get_mut(guest_thread.task)?;
2077 task.threads.remove(&guest_thread.thread);
2078
2079 if task.threads.is_empty() && !task.returned_or_cancelled() {
2080 bail!(Trap::NoAsyncResult);
2081 }
2082 let ready_to_delete = task.ready_to_delete();
2083
2084 if !task.decremented_interesting_task_count && task.exited && task.returned_or_cancelled() {
2085 task.decremented_interesting_task_count = true;
2086
2087 debug_assert!(state.interesting_tasks > 0);
2088 state.interesting_tasks -= 1;
2089 if state.interesting_tasks == 0
2090 && let Some(waker) = state.interesting_tasks_empty_waker.take()
2091 {
2092 waker.wake();
2093 }
2094 }
2095
2096 match cleanup_task {
2097 CleanupTask::Yes => {
2098 if ready_to_delete {
2099 Waitable::Guest(guest_thread.task).delete_from(state)?;
2100 }
2101 }
2102 CleanupTask::No => {}
2103 }
2104
2105 Ok(())
2106 }
2107
2108 fn cancel_guest_subtask_without_lowered_parameters(
2121 &mut self,
2122 caller_instance: RuntimeInstance,
2123 guest_task: TableId<GuestTask>,
2124 ) -> Result<()> {
2125 let concurrent_state = self.concurrent_state_mut();
2126 let task = concurrent_state.get_mut(guest_task)?;
2127 assert!(!task.already_lowered_parameters());
2128 task.lower_params = None;
2132 task.lift_result = None;
2133 task.exited = true;
2134 let instance = task.instance;
2135
2136 assert_eq!(1, task.threads.len());
2139 let thread = *task.threads.iter().next().unwrap();
2140 self.cleanup_thread(
2141 QualifiedThreadId {
2142 task: guest_task,
2143 thread,
2144 },
2145 caller_instance,
2146 CleanupTask::No,
2147 )?;
2148
2149 let pending = &mut self.instance_state(instance).concurrent_state().pending;
2151 let pending_count = pending.len();
2152 pending.retain(|thread, _| thread.task != guest_task);
2153 if pending.len() == pending_count {
2155 bail!(Trap::SubtaskCancelAfterTerminal);
2156 }
2157 Ok(())
2158 }
2159}
2160
2161enum CleanupTask {
2162 Yes,
2163 No,
2164}
2165
2166impl Instance {
2167 fn get_event(
2170 self,
2171 store: &mut StoreOpaque,
2172 guest_task: TableId<GuestTask>,
2173 set: Option<TableId<WaitableSet>>,
2174 cancellable: bool,
2175 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
2176 let state = store.concurrent_state_mut();
2177
2178 let event = &mut state.get_mut(guest_task)?.event;
2179 if let Some(ev) = event
2180 && (cancellable || !matches!(ev, Event::Cancelled))
2181 {
2182 log::trace!("deliver event {ev:?} to {guest_task:?}");
2183 let ev = *ev;
2184 *event = None;
2185 return Ok(Some((ev, None)));
2186 }
2187
2188 let set = match set {
2189 Some(set) => set,
2190 None => return Ok(None),
2191 };
2192 let waitable = match state.get_mut(set)?.ready.pop_first() {
2193 Some(v) => v,
2194 None => return Ok(None),
2195 };
2196
2197 let common = waitable.common(state)?;
2198 let handle = match common.handle {
2199 Some(h) => h,
2200 None => bail_bug!("handle not set when delivering event"),
2201 };
2202 let event = match common.event.take() {
2203 Some(e) => e,
2204 None => bail_bug!("event not set when delivering event"),
2205 };
2206
2207 log::trace!(
2208 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
2209 );
2210
2211 waitable.on_delivery(store, self, event)?;
2212
2213 Ok(Some((event, Some((waitable, handle)))))
2214 }
2215
2216 fn handle_callback_code(
2222 self,
2223 store: &mut StoreOpaque,
2224 guest_thread: QualifiedThreadId,
2225 runtime_instance: RuntimeComponentInstanceIndex,
2226 code: u32,
2227 ) -> Result<Option<GuestCall>> {
2228 let (code, set) = unpack_callback_code(code);
2229
2230 log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
2231
2232 let state = store.concurrent_state_mut();
2233
2234 let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
2235 let set = store
2236 .instance_state(self.runtime_instance(runtime_instance))
2237 .handle_table()
2238 .waitable_set_rep(handle)?;
2239
2240 Ok(TableId::<WaitableSet>::new(set))
2241 };
2242
2243 Ok(match code {
2244 callback_code::EXIT => {
2245 log::trace!("implicit thread {guest_thread:?} completed");
2246 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2247 task.exited = true;
2248 task.callback = None;
2249 store.cleanup_thread(
2250 guest_thread,
2251 self.runtime_instance(runtime_instance),
2252 CleanupTask::Yes,
2253 )?;
2254 None
2255 }
2256 callback_code::YIELD => {
2257 let task = state.get_mut(guest_thread.task)?;
2258 if let Some(event) = task.event {
2263 assert!(matches!(event, Event::None | Event::Cancelled));
2264 } else {
2265 task.event = Some(Event::None);
2266 }
2267 let call = GuestCall {
2268 thread: guest_thread,
2269 kind: GuestCallKind::DeliverEvent {
2270 instance: self,
2271 set: None,
2272 },
2273 };
2274 if state.may_block(guest_thread.task)? {
2275 state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
2278 None
2279 } else {
2280 Some(call)
2284 }
2285 }
2286 callback_code::WAIT => {
2287 state.check_blocking_for(guest_thread.task)?;
2290
2291 let set = get_set(store, set)?;
2292 let state = store.concurrent_state_mut();
2293
2294 if state.get_mut(guest_thread.task)?.event.is_some()
2295 || !state.get_mut(set)?.ready.is_empty()
2296 {
2297 state.push_high_priority(WorkItem::GuestCall(
2299 runtime_instance,
2300 GuestCall {
2301 thread: guest_thread,
2302 kind: GuestCallKind::DeliverEvent {
2303 instance: self,
2304 set: Some(set),
2305 },
2306 },
2307 ));
2308 } else {
2309 let old = state
2317 .get_mut(guest_thread.thread)?
2318 .wake_on_cancel
2319 .replace(set);
2320 if !old.is_none() {
2321 bail_bug!("thread unexpectedly had wake_on_cancel set");
2322 }
2323 let old = state
2324 .get_mut(set)?
2325 .waiting
2326 .insert(guest_thread, WaitMode::Callback(self));
2327 if !old.is_none() {
2328 bail_bug!("set's waiting set already had this thread registered");
2329 }
2330 }
2331 None
2332 }
2333 _ => bail!(Trap::UnsupportedCallbackCode),
2334 })
2335 }
2336
2337 unsafe fn queue_call<T: 'static>(
2344 self,
2345 mut store: StoreContextMut<T>,
2346 guest_thread: QualifiedThreadId,
2347 callee: SendSyncPtr<VMFuncRef>,
2348 param_count: usize,
2349 result_count: usize,
2350 async_: bool,
2351 callback: Option<SendSyncPtr<VMFuncRef>>,
2352 post_return: Option<SendSyncPtr<VMFuncRef>>,
2353 ) -> Result<()> {
2354 unsafe fn make_call<T: 'static>(
2369 store: StoreContextMut<T>,
2370 guest_thread: QualifiedThreadId,
2371 callee: SendSyncPtr<VMFuncRef>,
2372 param_count: usize,
2373 result_count: usize,
2374 ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2375 + Send
2376 + Sync
2377 + 'static
2378 + use<T> {
2379 let token = StoreToken::new(store);
2380 move |store: &mut dyn VMStore| {
2381 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2382
2383 store
2384 .concurrent_state_mut()
2385 .get_mut(guest_thread.thread)?
2386 .state = GuestThreadState::Running;
2387 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2388 let lower = match task.lower_params.take() {
2389 Some(l) => l,
2390 None => bail_bug!("lower_params missing"),
2391 };
2392
2393 lower(store, &mut storage[..param_count])?;
2394
2395 let mut store = token.as_context_mut(store);
2396
2397 unsafe {
2400 crate::Func::call_unchecked_raw(
2401 &mut store,
2402 callee.as_non_null(),
2403 NonNull::new(
2404 &mut storage[..param_count.max(result_count)]
2405 as *mut [MaybeUninit<ValRaw>] as _,
2406 )
2407 .unwrap(),
2408 )?;
2409 }
2410
2411 Ok(storage)
2412 }
2413 }
2414
2415 let call = unsafe {
2419 make_call(
2420 store.as_context_mut(),
2421 guest_thread,
2422 callee,
2423 param_count,
2424 result_count,
2425 )
2426 };
2427
2428 let callee_instance = store
2429 .0
2430 .concurrent_state_mut()
2431 .get_mut(guest_thread.task)?
2432 .instance;
2433
2434 let fun = if callback.is_some() {
2435 assert!(async_);
2436
2437 Box::new(move |store: &mut dyn VMStore| {
2438 self.add_guest_thread_to_instance_table(
2439 guest_thread.thread,
2440 store,
2441 callee_instance.index,
2442 )?;
2443 let old_thread = store.set_thread(guest_thread)?;
2444 log::trace!(
2445 "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2446 );
2447
2448 store.enter_instance(callee_instance);
2449
2450 let storage = call(store)?;
2457
2458 store.exit_instance(callee_instance)?;
2459
2460 store.set_thread(old_thread)?;
2461 let state = store.concurrent_state_mut();
2462 if let Some(t) = old_thread.guest() {
2463 state.get_mut(t.thread)?.state = GuestThreadState::Running;
2464 }
2465 log::trace!("stackless call: restored {old_thread:?} as current thread");
2466
2467 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2470
2471 self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2472 })
2473 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2474 } else {
2475 let token = StoreToken::new(store.as_context_mut());
2476 Box::new(move |store: &mut dyn VMStore| {
2477 self.add_guest_thread_to_instance_table(
2478 guest_thread.thread,
2479 store,
2480 callee_instance.index,
2481 )?;
2482 let old_thread = store.set_thread(guest_thread)?;
2483 log::trace!(
2484 "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2485 );
2486 let flags = self.id().get(store).instance_flags(callee_instance.index);
2487
2488 if !async_ {
2492 store.enter_instance(callee_instance);
2493 }
2494
2495 let storage = call(store)?;
2502
2503 if !async_ {
2504 let lift = {
2510 store.exit_instance(callee_instance)?;
2511
2512 let state = store.concurrent_state_mut();
2513 if !state.get_mut(guest_thread.task)?.result.is_none() {
2514 bail_bug!("task has already produced a result");
2515 }
2516
2517 match state.get_mut(guest_thread.task)?.lift_result.take() {
2518 Some(lift) => lift,
2519 None => bail_bug!("lift_result field is missing"),
2520 }
2521 };
2522
2523 let result = (lift.lift)(store, unsafe {
2526 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2527 &storage[..result_count],
2528 )
2529 })?;
2530
2531 let post_return_arg = match result_count {
2532 0 => ValRaw::i32(0),
2533 1 => unsafe { storage[0].assume_init() },
2536 _ => unreachable!(),
2537 };
2538
2539 unsafe {
2540 call_post_return(
2541 token.as_context_mut(store),
2542 post_return.map(|v| v.as_non_null()),
2543 post_return_arg,
2544 flags,
2545 )?;
2546 }
2547
2548 self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2549 }
2550
2551 store.set_thread(old_thread)?;
2552
2553 store
2554 .concurrent_state_mut()
2555 .get_mut(guest_thread.task)?
2556 .exited = true;
2557
2558 store.cleanup_thread(guest_thread, callee_instance, CleanupTask::Yes)?;
2560 Ok(None)
2561 })
2562 };
2563
2564 store
2565 .0
2566 .concurrent_state_mut()
2567 .push_high_priority(WorkItem::GuestCall(
2568 callee_instance.index,
2569 GuestCall {
2570 thread: guest_thread,
2571 kind: GuestCallKind::StartImplicit(fun),
2572 },
2573 ));
2574
2575 Ok(())
2576 }
2577
2578 unsafe fn prepare_call<T: 'static>(
2591 self,
2592 mut store: StoreContextMut<T>,
2593 start: NonNull<VMFuncRef>,
2594 return_: NonNull<VMFuncRef>,
2595 caller_instance: RuntimeComponentInstanceIndex,
2596 callee_instance: RuntimeComponentInstanceIndex,
2597 task_return_type: TypeTupleIndex,
2598 callee_async: bool,
2599 memory: *mut VMMemoryDefinition,
2600 string_encoding: StringEncoding,
2601 caller_info: CallerInfo,
2602 ) -> Result<()> {
2603 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2604 store.0.check_blocking()?;
2608 }
2609
2610 enum ResultInfo {
2611 Heap { results: u32 },
2612 Stack { result_count: u32 },
2613 }
2614
2615 let result_info = match &caller_info {
2616 CallerInfo::Async {
2617 has_result: true,
2618 params,
2619 } => ResultInfo::Heap {
2620 results: match params.last() {
2621 Some(r) => r.get_u32(),
2622 None => bail_bug!("retptr missing"),
2623 },
2624 },
2625 CallerInfo::Async {
2626 has_result: false, ..
2627 } => ResultInfo::Stack { result_count: 0 },
2628 CallerInfo::Sync {
2629 result_count,
2630 params,
2631 } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
2632 results: match params.last() {
2633 Some(r) => r.get_u32(),
2634 None => bail_bug!("arg ptr missing"),
2635 },
2636 },
2637 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2638 result_count: *result_count,
2639 },
2640 };
2641
2642 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2643
2644 let start = SendSyncPtr::new(start);
2648 let return_ = SendSyncPtr::new(return_);
2649 let token = StoreToken::new(store.as_context_mut());
2650 let state = store.0.concurrent_state_mut();
2651 let old_thread = state.current_guest_thread()?;
2652
2653 debug_assert_eq!(
2654 state.get_mut(old_thread.task)?.instance,
2655 self.runtime_instance(caller_instance)
2656 );
2657
2658 let guest_thread = GuestTask::new(
2659 state,
2660 Box::new(move |store, dst| {
2661 let mut store = token.as_context_mut(store);
2662 assert!(dst.len() <= MAX_FLAT_PARAMS);
2663 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2665 let count = match caller_info {
2666 CallerInfo::Async { params, has_result } => {
2670 let params = ¶ms[..params.len() - usize::from(has_result)];
2671 for (param, src) in params.iter().zip(&mut src) {
2672 src.write(*param);
2673 }
2674 params.len()
2675 }
2676
2677 CallerInfo::Sync { params, .. } => {
2679 for (param, src) in params.iter().zip(&mut src) {
2680 src.write(*param);
2681 }
2682 params.len()
2683 }
2684 };
2685 unsafe {
2692 crate::Func::call_unchecked_raw(
2693 &mut store,
2694 start.as_non_null(),
2695 NonNull::new(
2696 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2697 )
2698 .unwrap(),
2699 )?;
2700 }
2701 dst.copy_from_slice(&src[..dst.len()]);
2702 let state = store.0.concurrent_state_mut();
2703 Waitable::Guest(state.current_guest_thread()?.task).set_event(
2704 state,
2705 Some(Event::Subtask {
2706 status: Status::Started,
2707 }),
2708 )?;
2709 Ok(())
2710 }),
2711 LiftResult {
2712 lift: Box::new(move |store, src| {
2713 let mut store = token.as_context_mut(store);
2716 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2718 my_src.push(ValRaw::u32(*results));
2719 }
2720
2721 let prev = store.0.set_thread(old_thread)?;
2727
2728 unsafe {
2735 crate::Func::call_unchecked_raw(
2736 &mut store,
2737 return_.as_non_null(),
2738 my_src.as_mut_slice().into(),
2739 )?;
2740 }
2741
2742 store.0.set_thread(prev)?;
2745
2746 let state = store.0.concurrent_state_mut();
2747 let thread = state.current_guest_thread()?;
2748 if sync_caller {
2749 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2750 if let ResultInfo::Stack { result_count } = &result_info {
2751 match result_count {
2752 0 => None,
2753 1 => Some(my_src[0]),
2754 _ => unreachable!(),
2755 }
2756 } else {
2757 None
2758 },
2759 );
2760 }
2761 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2762 }),
2763 ty: task_return_type,
2764 memory: NonNull::new(memory).map(SendSyncPtr::new),
2765 string_encoding,
2766 },
2767 Caller::Guest { thread: old_thread },
2768 None,
2769 self.runtime_instance(callee_instance),
2770 callee_async,
2771 )?;
2772
2773 store.0.set_thread(guest_thread)?;
2776 log::trace!("pushed {guest_thread:?} as current thread; old thread was {old_thread:?}");
2777
2778 Ok(())
2779 }
2780
2781 unsafe fn call_callback<T>(
2786 self,
2787 mut store: StoreContextMut<T>,
2788 function: SendSyncPtr<VMFuncRef>,
2789 event: Event,
2790 handle: u32,
2791 ) -> Result<u32> {
2792 let (ordinal, result) = event.parts();
2793 let params = &mut [
2794 ValRaw::u32(ordinal),
2795 ValRaw::u32(handle),
2796 ValRaw::u32(result),
2797 ];
2798 unsafe {
2803 crate::Func::call_unchecked_raw(
2804 &mut store,
2805 function.as_non_null(),
2806 params.as_mut_slice().into(),
2807 )?;
2808 }
2809 Ok(params[0].get_u32())
2810 }
2811
2812 unsafe fn start_call<T: 'static>(
2825 self,
2826 mut store: StoreContextMut<T>,
2827 callback: *mut VMFuncRef,
2828 post_return: *mut VMFuncRef,
2829 callee: NonNull<VMFuncRef>,
2830 param_count: u32,
2831 result_count: u32,
2832 flags: u32,
2833 storage: Option<&mut [MaybeUninit<ValRaw>]>,
2834 ) -> Result<u32> {
2835 let token = StoreToken::new(store.as_context_mut());
2836 let async_caller = storage.is_none();
2837 let state = store.0.concurrent_state_mut();
2838 let guest_thread = state.current_guest_thread()?;
2839 let callee_async = state.get_mut(guest_thread.task)?.async_function;
2840 let callee = SendSyncPtr::new(callee);
2841 let param_count = usize::try_from(param_count)?;
2842 assert!(param_count <= MAX_FLAT_PARAMS);
2843 let result_count = usize::try_from(result_count)?;
2844 assert!(result_count <= MAX_FLAT_RESULTS);
2845
2846 let task = state.get_mut(guest_thread.task)?;
2847 if let Some(callback) = NonNull::new(callback) {
2848 let callback = SendSyncPtr::new(callback);
2852 task.callback = Some(Box::new(move |store, event, handle| {
2853 let store = token.as_context_mut(store);
2854 unsafe { self.call_callback::<T>(store, callback, event, handle) }
2855 }));
2856 }
2857
2858 let Caller::Guest { thread: caller } = &task.caller else {
2859 bail_bug!("start_call unexpectedly invoked for host->guest call");
2862 };
2863 let caller = *caller;
2864 let caller_instance = state.get_mut(caller.task)?.instance;
2865
2866 unsafe {
2868 self.queue_call(
2869 store.as_context_mut(),
2870 guest_thread,
2871 callee,
2872 param_count,
2873 result_count,
2874 (flags & START_FLAG_ASYNC_CALLEE) != 0,
2875 NonNull::new(callback).map(SendSyncPtr::new),
2876 NonNull::new(post_return).map(SendSyncPtr::new),
2877 )?;
2878 }
2879
2880 let state = store.0.concurrent_state_mut();
2881
2882 let guest_waitable = Waitable::Guest(guest_thread.task);
2885 let old_set = guest_waitable.common(state)?.set;
2886 let set = state.get_mut(caller.thread)?.sync_call_set;
2887 guest_waitable.join(state, Some(set))?;
2888
2889 store.0.set_thread(CurrentThread::None)?;
2890
2891 let (status, waitable) = loop {
2907 store.0.suspend(SuspendReason::Waiting {
2908 set,
2909 thread: caller,
2910 skip_may_block_check: async_caller || !callee_async,
2918 })?;
2919
2920 let state = store.0.concurrent_state_mut();
2921
2922 log::trace!("taking event for {:?}", guest_thread.task);
2923 let event = guest_waitable.take_event(state)?;
2924 let Some(Event::Subtask { status }) = event else {
2925 bail_bug!("subtasks should only get subtask events, got {event:?}")
2926 };
2927
2928 log::trace!("status {status:?} for {:?}", guest_thread.task);
2929
2930 if status == Status::Returned {
2931 break (status, None);
2933 } else if async_caller {
2934 let handle = store
2938 .0
2939 .instance_state(caller_instance)
2940 .handle_table()
2941 .subtask_insert_guest(guest_thread.task.rep())?;
2942 store
2943 .0
2944 .concurrent_state_mut()
2945 .get_mut(guest_thread.task)?
2946 .common
2947 .handle = Some(handle);
2948 break (status, Some(handle));
2949 } else {
2950 }
2954 };
2955
2956 guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2957
2958 store.0.set_thread(caller)?;
2960 store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2961 log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2962
2963 if let Some(storage) = storage {
2964 let state = store.0.concurrent_state_mut();
2968 let task = state.get_mut(guest_thread.task)?;
2969 if let Some(result) = task.sync_result.take()? {
2970 if let Some(result) = result {
2971 storage[0] = MaybeUninit::new(result);
2972 }
2973
2974 if task.exited && task.ready_to_delete() {
2975 Waitable::Guest(guest_thread.task).delete_from(state)?;
2976 }
2977 }
2978 }
2979
2980 Ok(status.pack(waitable))
2981 }
2982
2983 pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2996 self,
2997 mut store: StoreContextMut<'_, T>,
2998 future: impl Future<Output = Result<R>> + Send + 'static,
2999 lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
3000 ) -> Result<Option<u32>> {
3001 let token = StoreToken::new(store.as_context_mut());
3002 let state = store.0.concurrent_state_mut();
3003 let task = state.current_host_thread()?;
3004
3005 let (join_handle, future) = JoinHandle::run(future);
3008 {
3009 let state = &mut state.get_mut(task)?.state;
3010 assert!(matches!(state, HostTaskState::CalleeStarted));
3011 *state = HostTaskState::CalleeRunning(join_handle);
3012 }
3013
3014 let mut future = Box::pin(future);
3015
3016 let poll = tls::set(store.0, || {
3021 future
3022 .as_mut()
3023 .poll(&mut Context::from_waker(&Waker::noop()))
3024 });
3025
3026 match poll {
3027 Poll::Ready(result) => {
3029 let result = result.transpose()?;
3030 lower(store.as_context_mut(), result)?;
3031 return Ok(None);
3032 }
3033
3034 Poll::Pending => {}
3036 }
3037
3038 let future = Box::pin(async move {
3046 let result = match future.await {
3047 Some(result) => Some(result?),
3048 None => None,
3049 };
3050 let on_complete = move |store: &mut dyn VMStore| {
3051 let mut store = token.as_context_mut(store);
3055 let old = store.0.set_thread(task)?;
3056
3057 let status = if result.is_some() {
3058 Status::Returned
3059 } else {
3060 Status::ReturnCancelled
3061 };
3062
3063 lower(store.as_context_mut(), result)?;
3064 let state = store.0.concurrent_state_mut();
3065 match &mut state.get_mut(task)?.state {
3066 HostTaskState::CalleeDone { .. } => {}
3069
3070 other => *other = HostTaskState::CalleeDone { cancelled: false },
3072 }
3073 Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;
3074
3075 store.0.set_thread(old)?;
3076 Ok(())
3077 };
3078
3079 tls::get(move |store| {
3084 store
3085 .concurrent_state_mut()
3086 .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
3087 on_complete,
3088 ))));
3089 Ok(())
3090 })
3091 });
3092
3093 let state = store.0.concurrent_state_mut();
3096 state.push_future(future);
3097 let caller = state.get_mut(task)?.caller;
3098 let instance = state.get_mut(caller.task)?.instance;
3099 let handle = store
3100 .0
3101 .instance_state(instance)
3102 .handle_table()
3103 .subtask_insert_host(task.rep())?;
3104 store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
3105 log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");
3106
3107 store.0.set_thread(caller)?;
3111 Ok(Some(handle))
3112 }
3113
3114 pub(crate) fn task_return(
3117 self,
3118 store: &mut dyn VMStore,
3119 ty: TypeTupleIndex,
3120 options: OptionsIndex,
3121 storage: &[ValRaw],
3122 ) -> Result<()> {
3123 let state = store.concurrent_state_mut();
3124 let guest_thread = state.current_guest_thread()?;
3125 let lift = state
3126 .get_mut(guest_thread.task)?
3127 .lift_result
3128 .take()
3129 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3130 if !state.get_mut(guest_thread.task)?.result.is_none() {
3131 bail_bug!("task result unexpectedly already set");
3132 }
3133
3134 let CanonicalOptions {
3135 string_encoding,
3136 data_model,
3137 ..
3138 } = &self.id().get(store).component().env_component().options[options];
3139
3140 let invalid = ty != lift.ty
3141 || string_encoding != &lift.string_encoding
3142 || match data_model {
3143 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
3144 Some(memory) => {
3145 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
3146 let actual = self.id().get(store).runtime_memory(memory);
3147 expected != actual.as_ptr()
3148 }
3149 None => false,
3152 },
3153 CanonicalOptionsDataModel::Gc { .. } => true,
3155 };
3156
3157 if invalid {
3158 bail!(Trap::TaskReturnInvalid);
3159 }
3160
3161 log::trace!("task.return for {guest_thread:?}");
3162
3163 let result = (lift.lift)(store, storage)?;
3164 self.task_complete(store, guest_thread.task, result, Status::Returned)
3165 }
3166
3167 pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
3169 let state = store.concurrent_state_mut();
3170 let guest_thread = state.current_guest_thread()?;
3171 let task = state.get_mut(guest_thread.task)?;
3172 if !task.cancel_sent {
3173 bail!(Trap::TaskCancelNotCancelled);
3174 }
3175 _ = task
3176 .lift_result
3177 .take()
3178 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3179
3180 if !task.result.is_none() {
3181 bail_bug!("task result should not bet set yet");
3182 }
3183
3184 log::trace!("task.cancel for {guest_thread:?}");
3185
3186 self.task_complete(
3187 store,
3188 guest_thread.task,
3189 Box::new(DummyResult),
3190 Status::ReturnCancelled,
3191 )
3192 }
3193
3194 fn task_complete(
3200 self,
3201 store: &mut StoreOpaque,
3202 guest_task: TableId<GuestTask>,
3203 result: Box<dyn Any + Send + Sync>,
3204 status: Status,
3205 ) -> Result<()> {
3206 store
3207 .component_resource_tables(Some(self))
3208 .validate_scope_exit()?;
3209
3210 let state = store.concurrent_state_mut();
3211 let task = state.get_mut(guest_task)?;
3212
3213 if let Caller::Host { tx, .. } = &mut task.caller {
3214 if let Some(tx) = tx.take() {
3215 _ = tx.send(result);
3216 }
3217 } else {
3218 task.result = Some(result);
3219 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3220 }
3221
3222 Ok(())
3223 }
3224
3225 pub(crate) fn waitable_set_new(
3227 self,
3228 store: &mut StoreOpaque,
3229 caller_instance: RuntimeComponentInstanceIndex,
3230 ) -> Result<u32> {
3231 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
3232 let handle = store
3233 .instance_state(self.runtime_instance(caller_instance))
3234 .handle_table()
3235 .waitable_set_insert(set.rep())?;
3236 log::trace!("new waitable set {set:?} (handle {handle})");
3237 Ok(handle)
3238 }
3239
3240 pub(crate) fn waitable_set_drop(
3242 self,
3243 store: &mut StoreOpaque,
3244 caller_instance: RuntimeComponentInstanceIndex,
3245 set: u32,
3246 ) -> Result<()> {
3247 let rep = store
3248 .instance_state(self.runtime_instance(caller_instance))
3249 .handle_table()
3250 .waitable_set_remove(set)?;
3251
3252 log::trace!("drop waitable set {rep} (handle {set})");
3253
3254 if !store
3258 .concurrent_state_mut()
3259 .get_mut(TableId::<WaitableSet>::new(rep))?
3260 .waiting
3261 .is_empty()
3262 {
3263 bail!(Trap::WaitableSetDropHasWaiters);
3264 }
3265
3266 store
3267 .concurrent_state_mut()
3268 .delete(TableId::<WaitableSet>::new(rep))?;
3269
3270 Ok(())
3271 }
3272
3273 pub(crate) fn waitable_join(
3275 self,
3276 store: &mut StoreOpaque,
3277 caller_instance: RuntimeComponentInstanceIndex,
3278 waitable_handle: u32,
3279 set_handle: u32,
3280 ) -> Result<()> {
3281 let mut instance = self.id().get_mut(store);
3282 let waitable =
3283 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3284
3285 let set = if set_handle == 0 {
3286 None
3287 } else {
3288 let set = instance.instance_states().0[caller_instance]
3289 .handle_table()
3290 .waitable_set_rep(set_handle)?;
3291
3292 let state = store.concurrent_state_mut();
3293 if let Some(old) = waitable.common(state)?.set
3294 && state.get_mut(old)?.is_sync_call_set
3295 {
3296 bail!(Trap::WaitableSyncAndAsync);
3297 }
3298
3299 Some(TableId::<WaitableSet>::new(set))
3300 };
3301
3302 log::trace!(
3303 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3304 );
3305
3306 waitable.join(store.concurrent_state_mut(), set)
3307 }
3308
3309 pub(crate) fn subtask_drop(
3311 self,
3312 store: &mut StoreOpaque,
3313 caller_instance: RuntimeComponentInstanceIndex,
3314 task_id: u32,
3315 ) -> Result<()> {
3316 self.waitable_join(store, caller_instance, task_id, 0)?;
3317
3318 let (rep, is_host) = store
3319 .instance_state(self.runtime_instance(caller_instance))
3320 .handle_table()
3321 .subtask_remove(task_id)?;
3322
3323 let concurrent_state = store.concurrent_state_mut();
3324 let (waitable, delete) = if is_host {
3325 let id = TableId::<HostTask>::new(rep);
3326 let task = concurrent_state.get_mut(id)?;
3327 match &task.state {
3328 HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
3329 HostTaskState::CalleeDone { .. } => {}
3330 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3331 bail_bug!("invalid state for callee in `subtask.drop`")
3332 }
3333 }
3334 (Waitable::Host(id), true)
3335 } else {
3336 let id = TableId::<GuestTask>::new(rep);
3337 let task = concurrent_state.get_mut(id)?;
3338 if task.lift_result.is_some() {
3339 bail!(Trap::SubtaskDropNotResolved);
3340 }
3341 (
3342 Waitable::Guest(id),
3343 concurrent_state.get_mut(id)?.ready_to_delete(),
3344 )
3345 };
3346
3347 waitable.common(concurrent_state)?.handle = None;
3348
3349 if waitable.take_event(concurrent_state)?.is_some() {
3352 bail!(Trap::SubtaskDropNotResolved);
3353 }
3354
3355 if delete {
3356 waitable.delete_from(concurrent_state)?;
3357 }
3358
3359 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3360 Ok(())
3361 }
3362
3363 pub(crate) fn waitable_set_wait(
3365 self,
3366 store: &mut StoreOpaque,
3367 options: OptionsIndex,
3368 set: u32,
3369 payload: u32,
3370 ) -> Result<u32> {
3371 if !self.options(store, options).async_ {
3372 store.check_blocking()?;
3376 }
3377
3378 let &CanonicalOptions {
3379 cancellable,
3380 instance: caller_instance,
3381 ..
3382 } = &self.id().get(store).component().env_component().options[options];
3383 let rep = store
3384 .instance_state(self.runtime_instance(caller_instance))
3385 .handle_table()
3386 .waitable_set_rep(set)?;
3387
3388 self.waitable_check(
3389 store,
3390 cancellable,
3391 WaitableCheck::Wait,
3392 WaitableCheckParams {
3393 set: TableId::new(rep),
3394 options,
3395 payload,
3396 },
3397 )
3398 }
3399
3400 pub(crate) fn waitable_set_poll(
3402 self,
3403 store: &mut StoreOpaque,
3404 options: OptionsIndex,
3405 set: u32,
3406 payload: u32,
3407 ) -> Result<u32> {
3408 let &CanonicalOptions {
3409 cancellable,
3410 instance: caller_instance,
3411 ..
3412 } = &self.id().get(store).component().env_component().options[options];
3413 let rep = store
3414 .instance_state(self.runtime_instance(caller_instance))
3415 .handle_table()
3416 .waitable_set_rep(set)?;
3417
3418 self.waitable_check(
3419 store,
3420 cancellable,
3421 WaitableCheck::Poll,
3422 WaitableCheckParams {
3423 set: TableId::new(rep),
3424 options,
3425 payload,
3426 },
3427 )
3428 }
3429
3430 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3432 let thread_id = store.concurrent_state_mut().current_guest_thread()?.thread;
3433 match store
3434 .concurrent_state_mut()
3435 .get_mut(thread_id)?
3436 .instance_rep
3437 {
3438 Some(r) => Ok(r),
3439 None => bail_bug!("thread should have instance_rep by now"),
3440 }
3441 }
3442
3443 pub(crate) fn thread_new_indirect<T: 'static>(
3445 self,
3446 mut store: StoreContextMut<T>,
3447 runtime_instance: RuntimeComponentInstanceIndex,
3448 _func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex,
3450 start_func_idx: u32,
3451 context: i32,
3452 ) -> Result<u32> {
3453 log::trace!("creating new thread");
3454
3455 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3456 let (instance, registry) = self.id().get_mut_and_registry(store.0);
3457 let callee = instance
3458 .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3459 .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
3460 if callee.type_index(store.0) != start_func_ty.type_index() {
3461 bail!(Trap::ThreadNewIndirectInvalidType);
3462 }
3463
3464 let token = StoreToken::new(store.as_context_mut());
3465 let start_func = Box::new(
3466 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3467 let old_thread = store.set_thread(guest_thread)?;
3468 log::trace!(
3469 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3470 );
3471
3472 let mut store = token.as_context_mut(store);
3473 let mut params = [ValRaw::i32(context)];
3474 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3477
3478 store.0.set_thread(old_thread)?;
3479
3480 store.0.cleanup_thread(
3481 guest_thread,
3482 self.runtime_instance(runtime_instance),
3483 CleanupTask::Yes,
3484 )?;
3485 log::trace!("explicit thread {guest_thread:?} completed");
3486 let state = store.0.concurrent_state_mut();
3487 if let Some(t) = old_thread.guest() {
3488 state.get_mut(t.thread)?.state = GuestThreadState::Running;
3489 }
3490 log::trace!("thread start: restored {old_thread:?} as current thread");
3491
3492 Ok(())
3493 },
3494 );
3495
3496 let state = store.0.concurrent_state_mut();
3497 let current_thread = state.current_guest_thread()?;
3498 let parent_task = current_thread.task;
3499
3500 let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
3501 let thread_id = state.push(new_thread)?;
3502 state.get_mut(parent_task)?.threads.insert(thread_id);
3503
3504 log::trace!("new thread with id {thread_id:?} created");
3505
3506 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3507 }
3508
3509 pub(crate) fn resume_thread(
3510 self,
3511 store: &mut StoreOpaque,
3512 runtime_instance: RuntimeComponentInstanceIndex,
3513 thread_idx: u32,
3514 high_priority: bool,
3515 allow_ready: bool,
3516 ) -> Result<()> {
3517 let thread_id =
3518 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3519 let state = store.concurrent_state_mut();
3520 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3521 let thread = state.get_mut(guest_thread.thread)?;
3522
3523 match mem::replace(&mut thread.state, GuestThreadState::Running) {
3524 GuestThreadState::NotStartedExplicit(start_func) => {
3525 log::trace!("starting thread {guest_thread:?}");
3526 let guest_call = WorkItem::GuestCall(
3527 runtime_instance,
3528 GuestCall {
3529 thread: guest_thread,
3530 kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3531 start_func(store, guest_thread)
3532 })),
3533 },
3534 );
3535 store
3536 .concurrent_state_mut()
3537 .push_work_item(guest_call, high_priority);
3538 }
3539 GuestThreadState::Suspended(fiber) => {
3540 log::trace!("resuming thread {thread_id:?} that was suspended");
3541 store
3542 .concurrent_state_mut()
3543 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3544 }
3545 GuestThreadState::Ready { fiber, cancellable } if allow_ready => {
3546 log::trace!("resuming thread {thread_id:?} that was ready");
3547 thread.state = GuestThreadState::Ready { fiber, cancellable };
3548 store
3549 .concurrent_state_mut()
3550 .promote_thread_work_item(guest_thread);
3551 }
3552 other => {
3553 thread.state = other;
3554 bail!(Trap::CannotResumeThread);
3555 }
3556 }
3557 Ok(())
3558 }
3559
3560 fn add_guest_thread_to_instance_table(
3561 self,
3562 thread_id: TableId<GuestThread>,
3563 store: &mut StoreOpaque,
3564 runtime_instance: RuntimeComponentInstanceIndex,
3565 ) -> Result<u32> {
3566 let guest_id = store
3567 .instance_state(self.runtime_instance(runtime_instance))
3568 .thread_handle_table()
3569 .guest_thread_insert(thread_id.rep())?;
3570 store
3571 .concurrent_state_mut()
3572 .get_mut(thread_id)?
3573 .instance_rep = Some(guest_id);
3574 Ok(guest_id)
3575 }
3576
3577 pub(crate) fn suspension_intrinsic(
3580 self,
3581 store: &mut StoreOpaque,
3582 caller: RuntimeComponentInstanceIndex,
3583 cancellable: bool,
3584 yielding: bool,
3585 to_thread: SuspensionTarget,
3586 ) -> Result<WaitResult> {
3587 let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3588 if to_thread.is_none() {
3589 let state = store.concurrent_state_mut();
3590 if yielding {
3591 if !state.may_block(guest_thread.task)? {
3593 if !state.promote_instance_local_thread_work_item(caller) {
3596 return Ok(WaitResult::Completed);
3598 }
3599 }
3600 } else {
3601 store.check_blocking()?;
3605 }
3606 }
3607
3608 if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3610 return Ok(WaitResult::Cancelled);
3611 }
3612
3613 match to_thread {
3614 SuspensionTarget::SomeSuspended(thread) => {
3615 self.resume_thread(store, caller, thread, true, false)?
3616 }
3617 SuspensionTarget::Some(thread) => {
3618 self.resume_thread(store, caller, thread, true, true)?
3619 }
3620 SuspensionTarget::None => { }
3621 }
3622
3623 let reason = if yielding {
3624 SuspendReason::Yielding {
3625 thread: guest_thread,
3626 cancellable,
3627 skip_may_block_check: to_thread.is_some(),
3631 }
3632 } else {
3633 SuspendReason::ExplicitlySuspending {
3634 thread: guest_thread,
3635 skip_may_block_check: to_thread.is_some(),
3639 }
3640 };
3641
3642 store.suspend(reason)?;
3643
3644 if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3645 Ok(WaitResult::Cancelled)
3646 } else {
3647 Ok(WaitResult::Completed)
3648 }
3649 }
3650
3651 fn waitable_check(
3653 self,
3654 store: &mut StoreOpaque,
3655 cancellable: bool,
3656 check: WaitableCheck,
3657 params: WaitableCheckParams,
3658 ) -> Result<u32> {
3659 let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3660
3661 log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3662
3663 let state = store.concurrent_state_mut();
3664 let task = state.get_mut(guest_thread.task)?;
3665
3666 match &check {
3669 WaitableCheck::Wait => {
3670 let set = params.set;
3671
3672 if (task.event.is_none()
3673 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3674 && state.get_mut(set)?.ready.is_empty()
3675 {
3676 if cancellable {
3677 let old = state
3678 .get_mut(guest_thread.thread)?
3679 .wake_on_cancel
3680 .replace(set);
3681 if !old.is_none() {
3682 bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
3683 }
3684 }
3685
3686 store.suspend(SuspendReason::Waiting {
3687 set,
3688 thread: guest_thread,
3689 skip_may_block_check: false,
3690 })?;
3691 }
3692 }
3693 WaitableCheck::Poll => {}
3694 }
3695
3696 log::trace!(
3697 "waitable check for {guest_thread:?}; set {:?}, part two",
3698 params.set
3699 );
3700
3701 let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3703
3704 let (ordinal, handle, result) = match &check {
3705 WaitableCheck::Wait => {
3706 let (event, waitable) = match event {
3707 Some(p) => p,
3708 None => bail_bug!("event expected to be present"),
3709 };
3710 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3711 let (ordinal, result) = event.parts();
3712 (ordinal, handle, result)
3713 }
3714 WaitableCheck::Poll => {
3715 if let Some((event, waitable)) = event {
3716 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3717 let (ordinal, result) = event.parts();
3718 (ordinal, handle, result)
3719 } else {
3720 log::trace!(
3721 "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3722 guest_thread.task,
3723 params.set
3724 );
3725 let (ordinal, result) = Event::None.parts();
3726 (ordinal, 0, result)
3727 }
3728 }
3729 };
3730 let memory = self.options_memory_mut(store, params.options);
3731 let ptr = crate::component::func::validate_inbounds_dynamic(
3732 &CanonicalAbiInfo::POINTER_PAIR,
3733 memory,
3734 &ValRaw::u32(params.payload),
3735 )?;
3736 memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3737 memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3738 Ok(ordinal)
3739 }
3740
3741 pub(crate) fn subtask_cancel(
3743 self,
3744 store: &mut StoreOpaque,
3745 caller_instance: RuntimeComponentInstanceIndex,
3746 async_: bool,
3747 task_id: u32,
3748 ) -> Result<u32> {
3749 if !async_ {
3750 store.check_blocking()?;
3754 }
3755
3756 let (rep, is_host) = store
3757 .instance_state(self.runtime_instance(caller_instance))
3758 .handle_table()
3759 .subtask_rep(task_id)?;
3760 let waitable = if is_host {
3761 Waitable::Host(TableId::<HostTask>::new(rep))
3762 } else {
3763 Waitable::Guest(TableId::<GuestTask>::new(rep))
3764 };
3765 let concurrent_state = store.concurrent_state_mut();
3766
3767 log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3768
3769 let needs_block;
3770 if let Waitable::Host(host_task) = waitable {
3771 let state = &mut concurrent_state.get_mut(host_task)?.state;
3772 match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
3773 HostTaskState::CalleeRunning(handle) => {
3780 handle.abort();
3781 needs_block = true;
3782 }
3783
3784 HostTaskState::CalleeDone { cancelled } => {
3787 if cancelled {
3788 bail!(Trap::SubtaskCancelAfterTerminal);
3789 } else {
3790 needs_block = false;
3793 }
3794 }
3795
3796 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3799 bail_bug!("invalid states for host callee")
3800 }
3801 }
3802 } else {
3803 let guest_task = TableId::<GuestTask>::new(rep);
3804 let task = concurrent_state.get_mut(guest_task)?;
3805 if !task.already_lowered_parameters() {
3806 store.cancel_guest_subtask_without_lowered_parameters(
3807 self.runtime_instance(caller_instance),
3808 guest_task,
3809 )?;
3810 return Ok(Status::StartCancelled as u32);
3811 } else if !task.returned_or_cancelled() {
3812 task.cancel_sent = true;
3815 task.event = Some(Event::Cancelled);
3820 let runtime_instance = task.instance.index;
3821 for thread in task.threads.clone() {
3822 let thread = QualifiedThreadId {
3823 task: guest_task,
3824 thread,
3825 };
3826 let thread_mut = concurrent_state.get_mut(thread.thread)?;
3827 if let Some(set) = thread_mut.wake_on_cancel.take() {
3828 let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
3830 Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
3831 Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
3832 runtime_instance,
3833 GuestCall {
3834 thread,
3835 kind: GuestCallKind::DeliverEvent {
3836 instance,
3837 set: None,
3838 },
3839 },
3840 ),
3841 None => bail_bug!("thread not present in wake_on_cancel set"),
3842 };
3843 concurrent_state.push_high_priority(item);
3844
3845 let caller = concurrent_state.current_guest_thread()?;
3846 store.suspend(SuspendReason::Yielding {
3847 thread: caller,
3848 cancellable: false,
3849 skip_may_block_check: false,
3852 })?;
3853 break;
3854 } else if let GuestThreadState::Ready {
3855 cancellable: true, ..
3856 } = &thread_mut.state
3857 {
3858 let caller = concurrent_state.current_guest_thread()?;
3861 concurrent_state.promote_thread_work_item(thread);
3862 store.suspend(SuspendReason::Yielding {
3863 thread: caller,
3864 cancellable: false,
3865 skip_may_block_check: false,
3866 })?;
3867 break;
3868 }
3869 }
3870
3871 needs_block = !store
3874 .concurrent_state_mut()
3875 .get_mut(guest_task)?
3876 .returned_or_cancelled()
3877 } else {
3878 needs_block = false;
3879 }
3880 };
3881
3882 if needs_block {
3886 if async_ {
3887 return Ok(BLOCKED);
3888 }
3889
3890 store.wait_for_event(waitable)?;
3894
3895 }
3897
3898 let event = waitable.take_event(store.concurrent_state_mut())?;
3899 if let Some(Event::Subtask {
3900 status: status @ (Status::Returned | Status::ReturnCancelled),
3901 }) = event
3902 {
3903 Ok(status as u32)
3904 } else {
3905 bail!(Trap::SubtaskCancelAfterTerminal);
3906 }
3907 }
3908}
3909
3910pub trait VMComponentAsyncStore {
3918 unsafe fn prepare_call(
3924 &mut self,
3925 instance: Instance,
3926 memory: *mut VMMemoryDefinition,
3927 start: NonNull<VMFuncRef>,
3928 return_: NonNull<VMFuncRef>,
3929 caller_instance: RuntimeComponentInstanceIndex,
3930 callee_instance: RuntimeComponentInstanceIndex,
3931 task_return_type: TypeTupleIndex,
3932 callee_async: bool,
3933 string_encoding: StringEncoding,
3934 result_count: u32,
3935 storage: *mut ValRaw,
3936 storage_len: usize,
3937 ) -> Result<()>;
3938
3939 unsafe fn sync_start(
3942 &mut self,
3943 instance: Instance,
3944 callback: *mut VMFuncRef,
3945 callee: NonNull<VMFuncRef>,
3946 param_count: u32,
3947 storage: *mut MaybeUninit<ValRaw>,
3948 storage_len: usize,
3949 ) -> Result<()>;
3950
3951 unsafe fn async_start(
3954 &mut self,
3955 instance: Instance,
3956 callback: *mut VMFuncRef,
3957 post_return: *mut VMFuncRef,
3958 callee: NonNull<VMFuncRef>,
3959 param_count: u32,
3960 result_count: u32,
3961 flags: u32,
3962 ) -> Result<u32>;
3963
3964 fn future_write(
3966 &mut self,
3967 instance: Instance,
3968 caller: RuntimeComponentInstanceIndex,
3969 ty: TypeFutureTableIndex,
3970 options: OptionsIndex,
3971 future: u32,
3972 address: u32,
3973 ) -> Result<u32>;
3974
3975 fn future_read(
3977 &mut self,
3978 instance: Instance,
3979 caller: RuntimeComponentInstanceIndex,
3980 ty: TypeFutureTableIndex,
3981 options: OptionsIndex,
3982 future: u32,
3983 address: u32,
3984 ) -> Result<u32>;
3985
3986 fn future_drop_writable(
3988 &mut self,
3989 instance: Instance,
3990 ty: TypeFutureTableIndex,
3991 writer: u32,
3992 ) -> Result<()>;
3993
3994 fn stream_write(
3996 &mut self,
3997 instance: Instance,
3998 caller: RuntimeComponentInstanceIndex,
3999 ty: TypeStreamTableIndex,
4000 options: OptionsIndex,
4001 stream: u32,
4002 address: u32,
4003 count: u32,
4004 ) -> Result<u32>;
4005
4006 fn stream_read(
4008 &mut self,
4009 instance: Instance,
4010 caller: RuntimeComponentInstanceIndex,
4011 ty: TypeStreamTableIndex,
4012 options: OptionsIndex,
4013 stream: u32,
4014 address: u32,
4015 count: u32,
4016 ) -> Result<u32>;
4017
4018 fn flat_stream_write(
4021 &mut self,
4022 instance: Instance,
4023 caller: RuntimeComponentInstanceIndex,
4024 ty: TypeStreamTableIndex,
4025 options: OptionsIndex,
4026 payload_size: u32,
4027 payload_align: u32,
4028 stream: u32,
4029 address: u32,
4030 count: u32,
4031 ) -> Result<u32>;
4032
4033 fn flat_stream_read(
4036 &mut self,
4037 instance: Instance,
4038 caller: RuntimeComponentInstanceIndex,
4039 ty: TypeStreamTableIndex,
4040 options: OptionsIndex,
4041 payload_size: u32,
4042 payload_align: u32,
4043 stream: u32,
4044 address: u32,
4045 count: u32,
4046 ) -> Result<u32>;
4047
4048 fn stream_drop_writable(
4050 &mut self,
4051 instance: Instance,
4052 ty: TypeStreamTableIndex,
4053 writer: u32,
4054 ) -> Result<()>;
4055
4056 fn error_context_debug_message(
4058 &mut self,
4059 instance: Instance,
4060 ty: TypeComponentLocalErrorContextTableIndex,
4061 options: OptionsIndex,
4062 err_ctx_handle: u32,
4063 debug_msg_address: u32,
4064 ) -> Result<()>;
4065
4066 fn thread_new_indirect(
4068 &mut self,
4069 instance: Instance,
4070 caller: RuntimeComponentInstanceIndex,
4071 func_ty_idx: TypeFuncIndex,
4072 start_func_table_idx: RuntimeTableIndex,
4073 start_func_idx: u32,
4074 context: i32,
4075 ) -> Result<u32>;
4076}
4077
4078impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
4080 unsafe fn prepare_call(
4081 &mut self,
4082 instance: Instance,
4083 memory: *mut VMMemoryDefinition,
4084 start: NonNull<VMFuncRef>,
4085 return_: NonNull<VMFuncRef>,
4086 caller_instance: RuntimeComponentInstanceIndex,
4087 callee_instance: RuntimeComponentInstanceIndex,
4088 task_return_type: TypeTupleIndex,
4089 callee_async: bool,
4090 string_encoding: StringEncoding,
4091 result_count_or_max_if_async: u32,
4092 storage: *mut ValRaw,
4093 storage_len: usize,
4094 ) -> Result<()> {
4095 let params = unsafe { core::slice::from_raw_parts(storage, storage_len) }.to_vec();
4099
4100 unsafe {
4101 instance.prepare_call(
4102 StoreContextMut(self),
4103 start,
4104 return_,
4105 caller_instance,
4106 callee_instance,
4107 task_return_type,
4108 callee_async,
4109 memory,
4110 string_encoding,
4111 match result_count_or_max_if_async {
4112 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
4113 params,
4114 has_result: false,
4115 },
4116 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
4117 params,
4118 has_result: true,
4119 },
4120 result_count => CallerInfo::Sync {
4121 params,
4122 result_count,
4123 },
4124 },
4125 )
4126 }
4127 }
4128
4129 unsafe fn sync_start(
4130 &mut self,
4131 instance: Instance,
4132 callback: *mut VMFuncRef,
4133 callee: NonNull<VMFuncRef>,
4134 param_count: u32,
4135 storage: *mut MaybeUninit<ValRaw>,
4136 storage_len: usize,
4137 ) -> Result<()> {
4138 unsafe {
4139 instance
4140 .start_call(
4141 StoreContextMut(self),
4142 callback,
4143 ptr::null_mut(),
4144 callee,
4145 param_count,
4146 1,
4147 START_FLAG_ASYNC_CALLEE,
4148 Some(core::slice::from_raw_parts_mut(storage, storage_len)),
4152 )
4153 .map(drop)
4154 }
4155 }
4156
4157 unsafe fn async_start(
4158 &mut self,
4159 instance: Instance,
4160 callback: *mut VMFuncRef,
4161 post_return: *mut VMFuncRef,
4162 callee: NonNull<VMFuncRef>,
4163 param_count: u32,
4164 result_count: u32,
4165 flags: u32,
4166 ) -> Result<u32> {
4167 unsafe {
4168 instance.start_call(
4169 StoreContextMut(self),
4170 callback,
4171 post_return,
4172 callee,
4173 param_count,
4174 result_count,
4175 flags,
4176 None,
4177 )
4178 }
4179 }
4180
4181 fn future_write(
4182 &mut self,
4183 instance: Instance,
4184 caller: RuntimeComponentInstanceIndex,
4185 ty: TypeFutureTableIndex,
4186 options: OptionsIndex,
4187 future: u32,
4188 address: u32,
4189 ) -> Result<u32> {
4190 instance
4191 .guest_write(
4192 StoreContextMut(self),
4193 caller,
4194 TransmitIndex::Future(ty),
4195 options,
4196 None,
4197 future,
4198 address,
4199 1,
4200 )
4201 .map(|result| result.encode())
4202 }
4203
4204 fn future_read(
4205 &mut self,
4206 instance: Instance,
4207 caller: RuntimeComponentInstanceIndex,
4208 ty: TypeFutureTableIndex,
4209 options: OptionsIndex,
4210 future: u32,
4211 address: u32,
4212 ) -> Result<u32> {
4213 instance
4214 .guest_read(
4215 StoreContextMut(self),
4216 caller,
4217 TransmitIndex::Future(ty),
4218 options,
4219 None,
4220 future,
4221 address,
4222 1,
4223 )
4224 .map(|result| result.encode())
4225 }
4226
4227 fn stream_write(
4228 &mut self,
4229 instance: Instance,
4230 caller: RuntimeComponentInstanceIndex,
4231 ty: TypeStreamTableIndex,
4232 options: OptionsIndex,
4233 stream: u32,
4234 address: u32,
4235 count: u32,
4236 ) -> Result<u32> {
4237 instance
4238 .guest_write(
4239 StoreContextMut(self),
4240 caller,
4241 TransmitIndex::Stream(ty),
4242 options,
4243 None,
4244 stream,
4245 address,
4246 count,
4247 )
4248 .map(|result| result.encode())
4249 }
4250
4251 fn stream_read(
4252 &mut self,
4253 instance: Instance,
4254 caller: RuntimeComponentInstanceIndex,
4255 ty: TypeStreamTableIndex,
4256 options: OptionsIndex,
4257 stream: u32,
4258 address: u32,
4259 count: u32,
4260 ) -> Result<u32> {
4261 instance
4262 .guest_read(
4263 StoreContextMut(self),
4264 caller,
4265 TransmitIndex::Stream(ty),
4266 options,
4267 None,
4268 stream,
4269 address,
4270 count,
4271 )
4272 .map(|result| result.encode())
4273 }
4274
4275 fn future_drop_writable(
4276 &mut self,
4277 instance: Instance,
4278 ty: TypeFutureTableIndex,
4279 writer: u32,
4280 ) -> Result<()> {
4281 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4282 }
4283
4284 fn flat_stream_write(
4285 &mut self,
4286 instance: Instance,
4287 caller: RuntimeComponentInstanceIndex,
4288 ty: TypeStreamTableIndex,
4289 options: OptionsIndex,
4290 payload_size: u32,
4291 payload_align: u32,
4292 stream: u32,
4293 address: u32,
4294 count: u32,
4295 ) -> Result<u32> {
4296 instance
4297 .guest_write(
4298 StoreContextMut(self),
4299 caller,
4300 TransmitIndex::Stream(ty),
4301 options,
4302 Some(FlatAbi {
4303 size: payload_size,
4304 align: payload_align,
4305 }),
4306 stream,
4307 address,
4308 count,
4309 )
4310 .map(|result| result.encode())
4311 }
4312
4313 fn flat_stream_read(
4314 &mut self,
4315 instance: Instance,
4316 caller: RuntimeComponentInstanceIndex,
4317 ty: TypeStreamTableIndex,
4318 options: OptionsIndex,
4319 payload_size: u32,
4320 payload_align: u32,
4321 stream: u32,
4322 address: u32,
4323 count: u32,
4324 ) -> Result<u32> {
4325 instance
4326 .guest_read(
4327 StoreContextMut(self),
4328 caller,
4329 TransmitIndex::Stream(ty),
4330 options,
4331 Some(FlatAbi {
4332 size: payload_size,
4333 align: payload_align,
4334 }),
4335 stream,
4336 address,
4337 count,
4338 )
4339 .map(|result| result.encode())
4340 }
4341
4342 fn stream_drop_writable(
4343 &mut self,
4344 instance: Instance,
4345 ty: TypeStreamTableIndex,
4346 writer: u32,
4347 ) -> Result<()> {
4348 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4349 }
4350
4351 fn error_context_debug_message(
4352 &mut self,
4353 instance: Instance,
4354 ty: TypeComponentLocalErrorContextTableIndex,
4355 options: OptionsIndex,
4356 err_ctx_handle: u32,
4357 debug_msg_address: u32,
4358 ) -> Result<()> {
4359 instance.error_context_debug_message(
4360 StoreContextMut(self),
4361 ty,
4362 options,
4363 err_ctx_handle,
4364 debug_msg_address,
4365 )
4366 }
4367
4368 fn thread_new_indirect(
4369 &mut self,
4370 instance: Instance,
4371 caller: RuntimeComponentInstanceIndex,
4372 func_ty_idx: TypeFuncIndex,
4373 start_func_table_idx: RuntimeTableIndex,
4374 start_func_idx: u32,
4375 context: i32,
4376 ) -> Result<u32> {
4377 instance.thread_new_indirect(
4378 StoreContextMut(self),
4379 caller,
4380 func_ty_idx,
4381 start_func_table_idx,
4382 start_func_idx,
4383 context,
4384 )
4385 }
4386}
4387
4388type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4389
4390pub(crate) struct HostTask {
4394 common: WaitableCommon,
4395
4396 caller: QualifiedThreadId,
4398
4399 call_context: CallContext,
4402
4403 state: HostTaskState,
4404}
4405
4406enum HostTaskState {
4407 CalleeStarted,
4412
4413 CalleeRunning(JoinHandle),
4418
4419 CalleeFinished(LiftedResult),
4423
4424 CalleeDone { cancelled: bool },
4427}
4428
4429impl HostTask {
4430 fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4431 Self {
4432 common: WaitableCommon::default(),
4433 call_context: CallContext::default(),
4434 caller,
4435 state,
4436 }
4437 }
4438}
4439
4440impl TableDebug for HostTask {
4441 fn type_name() -> &'static str {
4442 "HostTask"
4443 }
4444}
4445
4446type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4447
4448enum Caller {
4450 Host {
4452 tx: Option<oneshot::Sender<LiftedResult>>,
4454 host_future_present: bool,
4457 caller: CurrentThread,
4461 },
4462 Guest {
4464 thread: QualifiedThreadId,
4466 },
4467}
4468
4469struct LiftResult {
4472 lift: RawLift,
4473 ty: TypeTupleIndex,
4474 memory: Option<SendSyncPtr<VMMemoryDefinition>>,
4475 string_encoding: StringEncoding,
4476}
4477
4478#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4483pub(crate) struct QualifiedThreadId {
4484 task: TableId<GuestTask>,
4485 thread: TableId<GuestThread>,
4486}
4487
4488impl QualifiedThreadId {
4489 fn qualify(
4490 state: &mut ConcurrentState,
4491 thread: TableId<GuestThread>,
4492 ) -> Result<QualifiedThreadId> {
4493 Ok(QualifiedThreadId {
4494 task: state.get_mut(thread)?.parent_task,
4495 thread,
4496 })
4497 }
4498}
4499
4500impl fmt::Debug for QualifiedThreadId {
4501 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4502 f.debug_tuple("QualifiedThreadId")
4503 .field(&self.task.rep())
4504 .field(&self.thread.rep())
4505 .finish()
4506 }
4507}
4508
4509enum GuestThreadState {
4510 NotStartedImplicit,
4511 NotStartedExplicit(
4512 Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4513 ),
4514 Running,
4515 Suspended(StoreFiber<'static>),
4516 Ready {
4517 fiber: StoreFiber<'static>,
4518 cancellable: bool,
4519 },
4520 Completed,
4521}
4522pub struct GuestThread {
4523 context: [u32; NUM_COMPONENT_CONTEXT_SLOTS],
4526 parent_task: TableId<GuestTask>,
4528 wake_on_cancel: Option<TableId<WaitableSet>>,
4531 state: GuestThreadState,
4533 instance_rep: Option<u32>,
4536 sync_call_set: TableId<WaitableSet>,
4538}
4539
4540impl GuestThread {
4541 fn from_instance(
4544 state: Pin<&mut ComponentInstance>,
4545 caller_instance: RuntimeComponentInstanceIndex,
4546 guest_thread: u32,
4547 ) -> Result<TableId<Self>> {
4548 let rep = state.instance_states().0[caller_instance]
4549 .thread_handle_table()
4550 .guest_thread_rep(guest_thread)?;
4551 Ok(TableId::new(rep))
4552 }
4553
4554 fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4555 let sync_call_set = state.push(WaitableSet {
4556 is_sync_call_set: true,
4557 ..WaitableSet::default()
4558 })?;
4559 Ok(Self {
4560 context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4561 parent_task,
4562 wake_on_cancel: None,
4563 state: GuestThreadState::NotStartedImplicit,
4564 instance_rep: None,
4565 sync_call_set,
4566 })
4567 }
4568
4569 fn new_explicit(
4570 state: &mut ConcurrentState,
4571 parent_task: TableId<GuestTask>,
4572 start_func: Box<
4573 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4574 >,
4575 ) -> Result<Self> {
4576 let sync_call_set = state.push(WaitableSet {
4577 is_sync_call_set: true,
4578 ..WaitableSet::default()
4579 })?;
4580 Ok(Self {
4581 context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4582 parent_task,
4583 wake_on_cancel: None,
4584 state: GuestThreadState::NotStartedExplicit(start_func),
4585 instance_rep: None,
4586 sync_call_set,
4587 })
4588 }
4589}
4590
4591impl TableDebug for GuestThread {
4592 fn type_name() -> &'static str {
4593 "GuestThread"
4594 }
4595}
4596
4597enum SyncResult {
4598 NotProduced,
4599 Produced(Option<ValRaw>),
4600 Taken,
4601}
4602
4603impl SyncResult {
4604 fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4605 Ok(match mem::replace(self, SyncResult::Taken) {
4606 SyncResult::NotProduced => None,
4607 SyncResult::Produced(val) => Some(val),
4608 SyncResult::Taken => {
4609 bail_bug!("attempted to take a synchronous result that was already taken")
4610 }
4611 })
4612 }
4613}
4614
4615#[derive(Debug)]
4616enum HostFutureState {
4617 NotApplicable,
4618 Live,
4619 Dropped,
4620}
4621
4622pub(crate) struct GuestTask {
4624 common: WaitableCommon,
4626 lower_params: Option<RawLower>,
4628 lift_result: Option<LiftResult>,
4630 result: Option<LiftedResult>,
4633 callback: Option<CallbackFn>,
4636 caller: Caller,
4638 call_context: CallContext,
4643 sync_result: SyncResult,
4646 cancel_sent: bool,
4649 starting_sent: bool,
4652 instance: RuntimeInstance,
4659 event: Option<Event>,
4662 exited: bool,
4664 threads: HashSet<TableId<GuestThread>>,
4666 host_future_state: HostFutureState,
4669 async_function: bool,
4672
4673 decremented_interesting_task_count: bool,
4674}
4675
4676impl GuestTask {
4677 fn already_lowered_parameters(&self) -> bool {
4678 self.lower_params.is_none()
4680 }
4681
4682 fn returned_or_cancelled(&self) -> bool {
4683 self.lift_result.is_none()
4685 }
4686
4687 fn ready_to_delete(&self) -> bool {
4688 let threads_completed = self.threads.is_empty();
4689 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4690 let pending_completion_event = matches!(
4691 self.common.event,
4692 Some(Event::Subtask {
4693 status: Status::Returned | Status::ReturnCancelled
4694 })
4695 );
4696 let ready = threads_completed
4697 && !has_sync_result
4698 && !pending_completion_event
4699 && !matches!(self.host_future_state, HostFutureState::Live);
4700 log::trace!(
4701 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4702 threads_completed,
4703 has_sync_result,
4704 pending_completion_event,
4705 self.host_future_state
4706 );
4707 ready
4708 }
4709
4710 fn new(
4711 state: &mut ConcurrentState,
4712 lower_params: RawLower,
4713 lift_result: LiftResult,
4714 caller: Caller,
4715 callback: Option<CallbackFn>,
4716 instance: RuntimeInstance,
4717 async_function: bool,
4718 ) -> Result<QualifiedThreadId> {
4719 let host_future_state = match &caller {
4720 Caller::Guest { .. } => HostFutureState::NotApplicable,
4721 Caller::Host {
4722 host_future_present,
4723 ..
4724 } => {
4725 if *host_future_present {
4726 HostFutureState::Live
4727 } else {
4728 HostFutureState::NotApplicable
4729 }
4730 }
4731 };
4732 let task = state.push(Self {
4733 common: WaitableCommon::default(),
4734 lower_params: Some(lower_params),
4735 lift_result: Some(lift_result),
4736 result: None,
4737 callback,
4738 caller,
4739 call_context: CallContext::default(),
4740 sync_result: SyncResult::NotProduced,
4741 cancel_sent: false,
4742 starting_sent: false,
4743 instance,
4744 event: None,
4745 exited: false,
4746 threads: HashSet::new(),
4747 host_future_state,
4748 async_function,
4749 decremented_interesting_task_count: false,
4750 })?;
4751 let new_thread = GuestThread::new_implicit(state, task)?;
4752 let thread = state.push(new_thread)?;
4753 state.get_mut(task)?.threads.insert(thread);
4754 state.interesting_tasks += 1;
4755 Ok(QualifiedThreadId { task, thread })
4756 }
4757}
4758
4759impl TableDebug for GuestTask {
4760 fn type_name() -> &'static str {
4761 "GuestTask"
4762 }
4763}
4764
4765#[derive(Default)]
4767struct WaitableCommon {
4768 event: Option<Event>,
4770 set: Option<TableId<WaitableSet>>,
4772 handle: Option<u32>,
4774}
4775
4776#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4778enum Waitable {
4779 Host(TableId<HostTask>),
4781 Guest(TableId<GuestTask>),
4783 Transmit(TableId<TransmitHandle>),
4785}
4786
4787impl Waitable {
4788 fn from_instance(
4791 state: Pin<&mut ComponentInstance>,
4792 caller_instance: RuntimeComponentInstanceIndex,
4793 waitable: u32,
4794 ) -> Result<Self> {
4795 use crate::runtime::vm::component::Waitable;
4796
4797 let (waitable, kind) = state.instance_states().0[caller_instance]
4798 .handle_table()
4799 .waitable_rep(waitable)?;
4800
4801 Ok(match kind {
4802 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4803 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4804 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4805 })
4806 }
4807
4808 fn rep(&self) -> u32 {
4810 match self {
4811 Self::Host(id) => id.rep(),
4812 Self::Guest(id) => id.rep(),
4813 Self::Transmit(id) => id.rep(),
4814 }
4815 }
4816
4817 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4821 log::trace!("waitable {self:?} join set {set:?}");
4822
4823 let old = mem::replace(&mut self.common(state)?.set, set);
4824
4825 if let Some(old) = old {
4826 match *self {
4827 Waitable::Host(id) => state.remove_child(id, old),
4828 Waitable::Guest(id) => state.remove_child(id, old),
4829 Waitable::Transmit(id) => state.remove_child(id, old),
4830 }?;
4831
4832 state.get_mut(old)?.ready.remove(self);
4833 }
4834
4835 if let Some(set) = set {
4836 match *self {
4837 Waitable::Host(id) => state.add_child(id, set),
4838 Waitable::Guest(id) => state.add_child(id, set),
4839 Waitable::Transmit(id) => state.add_child(id, set),
4840 }?;
4841
4842 if self.common(state)?.event.is_some() {
4843 self.mark_ready(state)?;
4844 }
4845 }
4846
4847 Ok(())
4848 }
4849
4850 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4852 Ok(match self {
4853 Self::Host(id) => &mut state.get_mut(*id)?.common,
4854 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4855 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4856 })
4857 }
4858
4859 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4863 log::trace!("set event for {self:?}: {event:?}");
4864 self.common(state)?.event = event;
4865 self.mark_ready(state)
4866 }
4867
4868 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4870 let common = self.common(state)?;
4871 let event = common.event.take();
4872 if let Some(set) = self.common(state)?.set {
4873 state.get_mut(set)?.ready.remove(self);
4874 }
4875
4876 Ok(event)
4877 }
4878
4879 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4883 if let Some(set) = self.common(state)?.set {
4884 state.get_mut(set)?.ready.insert(*self);
4885 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4886 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4887 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4888
4889 let item = match mode {
4890 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4891 WaitMode::Callback(instance) => WorkItem::GuestCall(
4892 state.get_mut(thread.task)?.instance.index,
4893 GuestCall {
4894 thread,
4895 kind: GuestCallKind::DeliverEvent {
4896 instance,
4897 set: Some(set),
4898 },
4899 },
4900 ),
4901 };
4902 state.push_high_priority(item);
4903 }
4904 }
4905 Ok(())
4906 }
4907
4908 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4910 match self {
4911 Self::Host(task) => {
4912 log::trace!("delete host task {task:?}");
4913 state.delete(*task)?;
4914 }
4915 Self::Guest(task) => {
4916 log::trace!("delete guest task {task:?}");
4917 let task = state.delete(*task)?;
4918
4919 debug_assert!(task.decremented_interesting_task_count);
4926 }
4927 Self::Transmit(task) => {
4928 state.delete(*task)?;
4929 }
4930 }
4931
4932 Ok(())
4933 }
4934}
4935
4936impl fmt::Debug for Waitable {
4937 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4938 match self {
4939 Self::Host(id) => write!(f, "{id:?}"),
4940 Self::Guest(id) => write!(f, "{id:?}"),
4941 Self::Transmit(id) => write!(f, "{id:?}"),
4942 }
4943 }
4944}
4945
4946#[derive(Default)]
4948struct WaitableSet {
4949 ready: BTreeSet<Waitable>,
4951 waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4953 is_sync_call_set: bool,
4956}
4957
4958impl TableDebug for WaitableSet {
4959 fn type_name() -> &'static str {
4960 "WaitableSet"
4961 }
4962}
4963
4964type RawLower =
4966 Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4967
4968type RawLift = Box<
4970 dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4971>;
4972
4973type LiftedResult = Box<dyn Any + Send + Sync>;
4977
4978struct DummyResult;
4981
4982#[derive(Default)]
4984pub struct ConcurrentInstanceState {
4985 backpressure: u16,
4987 do_not_enter: bool,
4989 pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4992}
4993
4994impl ConcurrentInstanceState {
4995 pub fn pending_is_empty(&self) -> bool {
4996 self.pending.is_empty()
4997 }
4998}
4999
5000#[derive(Debug, Copy, Clone)]
5001pub(crate) enum CurrentThread {
5002 Guest(QualifiedThreadId),
5003 Host(TableId<HostTask>),
5004 None,
5005}
5006
5007impl CurrentThread {
5008 fn guest(&self) -> Option<&QualifiedThreadId> {
5009 match self {
5010 Self::Guest(id) => Some(id),
5011 _ => None,
5012 }
5013 }
5014
5015 fn host(&self) -> Option<TableId<HostTask>> {
5016 match self {
5017 Self::Host(id) => Some(*id),
5018 _ => None,
5019 }
5020 }
5021
5022 fn is_none(&self) -> bool {
5023 matches!(self, Self::None)
5024 }
5025}
5026
5027impl From<QualifiedThreadId> for CurrentThread {
5028 fn from(id: QualifiedThreadId) -> Self {
5029 Self::Guest(id)
5030 }
5031}
5032
5033impl From<TableId<HostTask>> for CurrentThread {
5034 fn from(id: TableId<HostTask>) -> Self {
5035 Self::Host(id)
5036 }
5037}
5038
5039pub struct ConcurrentState {
5041 current_thread: CurrentThread,
5043
5044 futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
5049 table: AlwaysMut<ResourceTable>,
5051 high_priority: Vec<WorkItem>,
5053 low_priority: VecDeque<WorkItem>,
5055 suspend_reason: Option<SuspendReason>,
5059 worker: Option<StoreFiber<'static>>,
5063 worker_item: Option<WorkerItem>,
5065
5066 global_error_context_ref_counts:
5079 BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
5080
5081 interesting_tasks: usize,
5094
5095 interesting_tasks_empty_waker: Option<Waker>,
5099}
5100
5101impl Default for ConcurrentState {
5102 fn default() -> Self {
5103 Self {
5104 current_thread: CurrentThread::None,
5105 table: AlwaysMut::new(ResourceTable::new()),
5106 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
5107 high_priority: Vec::new(),
5108 low_priority: VecDeque::new(),
5109 suspend_reason: None,
5110 worker: None,
5111 worker_item: None,
5112 global_error_context_ref_counts: BTreeMap::new(),
5113 interesting_tasks: 0,
5114 interesting_tasks_empty_waker: None,
5115 }
5116 }
5117}
5118
5119impl ConcurrentState {
5120 pub(crate) fn take_fibers_and_futures(
5137 &mut self,
5138 fibers: &mut Vec<StoreFiber<'static>>,
5139 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
5140 ) {
5141 for entry in self.table.get_mut().iter_mut() {
5142 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
5143 for mode in mem::take(&mut set.waiting).into_values() {
5144 if let WaitMode::Fiber(fiber) = mode {
5145 fibers.push(fiber);
5146 }
5147 }
5148 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
5149 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready { fiber, .. } =
5150 mem::replace(&mut thread.state, GuestThreadState::Completed)
5151 {
5152 fibers.push(fiber);
5153 }
5154 }
5155 }
5156
5157 if let Some(fiber) = self.worker.take() {
5158 fibers.push(fiber);
5159 }
5160
5161 let mut handle_item = |item| match item {
5162 WorkItem::ResumeFiber(fiber) => {
5163 fibers.push(fiber);
5164 }
5165 WorkItem::PushFuture(future) => {
5166 self.futures
5167 .get_mut()
5168 .as_mut()
5169 .unwrap()
5170 .push(future.into_inner());
5171 }
5172 WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
5173 }
5174 };
5175
5176 for item in mem::take(&mut self.high_priority) {
5177 handle_item(item);
5178 }
5179 for item in mem::take(&mut self.low_priority) {
5180 handle_item(item);
5181 }
5182
5183 if let Some(them) = self.futures.get_mut().take() {
5184 futures.push(them);
5185 }
5186 }
5187
5188 fn push<V: Send + Sync + 'static>(
5189 &mut self,
5190 value: V,
5191 ) -> Result<TableId<V>, ResourceTableError> {
5192 self.table.get_mut().push(value).map(TableId::from)
5193 }
5194
5195 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
5196 self.table.get_mut().get_mut(&Resource::from(id))
5197 }
5198
5199 pub fn add_child<T: 'static, U: 'static>(
5200 &mut self,
5201 child: TableId<T>,
5202 parent: TableId<U>,
5203 ) -> Result<(), ResourceTableError> {
5204 self.table
5205 .get_mut()
5206 .add_child(Resource::from(child), Resource::from(parent))
5207 }
5208
5209 pub fn remove_child<T: 'static, U: 'static>(
5210 &mut self,
5211 child: TableId<T>,
5212 parent: TableId<U>,
5213 ) -> Result<(), ResourceTableError> {
5214 self.table
5215 .get_mut()
5216 .remove_child(Resource::from(child), Resource::from(parent))
5217 }
5218
5219 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
5220 self.table.get_mut().delete(Resource::from(id))
5221 }
5222
5223 fn push_future(&mut self, future: HostTaskFuture) {
5224 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
5231 }
5232
5233 fn push_high_priority(&mut self, item: WorkItem) {
5234 log::trace!("push high priority: {item:?}");
5235 self.high_priority.push(item);
5236 }
5237
5238 fn push_low_priority(&mut self, item: WorkItem) {
5239 log::trace!("push low priority: {item:?}");
5240 self.low_priority.push_front(item);
5241 }
5242
5243 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5244 if high_priority {
5245 self.push_high_priority(item);
5246 } else {
5247 self.push_low_priority(item);
5248 }
5249 }
5250
5251 fn promote_instance_local_thread_work_item(
5252 &mut self,
5253 current_instance: RuntimeComponentInstanceIndex,
5254 ) -> bool {
5255 self.promote_work_items_matching(|item: &WorkItem| match item {
5256 WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5257 *instance == current_instance
5258 }
5259 _ => false,
5260 })
5261 }
5262
5263 fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5264 self.promote_work_items_matching(|item: &WorkItem| match item {
5265 WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5266 *t == thread
5267 }
5268 _ => false,
5269 })
5270 }
5271
5272 fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5273 where
5274 F: FnMut(&WorkItem) -> bool,
5275 {
5276 if self.high_priority.iter().any(&mut predicate) {
5280 true
5281 }
5282 else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5285 let item = self.low_priority.remove(idx).unwrap();
5286 self.push_high_priority(item);
5287 true
5288 } else {
5289 false
5290 }
5291 }
5292
5293 fn take_pending_cancellation(&mut self) -> Result<bool> {
5296 let thread = self.current_guest_thread()?;
5297 if let Some(event) = self.get_mut(thread.task)?.event.take() {
5298 assert!(matches!(event, Event::Cancelled));
5299 Ok(true)
5300 } else {
5301 Ok(false)
5302 }
5303 }
5304
5305 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5306 if self.may_block(task)? {
5307 Ok(())
5308 } else {
5309 Err(Trap::CannotBlockSyncTask.into())
5310 }
5311 }
5312
5313 fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
5314 let task = self.get_mut(task)?;
5315 Ok(task.async_function || task.returned_or_cancelled())
5316 }
5317
5318 pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
5324 let (task, is_host) = (task >> 1, task & 1 == 1);
5325 if is_host {
5326 let task: TableId<HostTask> = TableId::new(task);
5327 Ok(&mut self.get_mut(task)?.call_context)
5328 } else {
5329 let task: TableId<GuestTask> = TableId::new(task);
5330 Ok(&mut self.get_mut(task)?.call_context)
5331 }
5332 }
5333
5334 pub fn current_call_context_scope_id(&self) -> Result<u32> {
5337 let (bits, is_host) = match self.current_thread {
5338 CurrentThread::Guest(id) => (id.task.rep(), false),
5339 CurrentThread::Host(id) => (id.rep(), true),
5340 CurrentThread::None => bail_bug!("current thread is not set"),
5341 };
5342 assert_eq!((bits << 1) >> 1, bits);
5343 Ok((bits << 1) | u32::from(is_host))
5344 }
5345
5346 fn current_guest_thread(&self) -> Result<QualifiedThreadId> {
5347 match self.current_thread.guest() {
5348 Some(id) => Ok(*id),
5349 None => bail_bug!("current thread is not a guest thread"),
5350 }
5351 }
5352
5353 fn current_host_thread(&self) -> Result<TableId<HostTask>> {
5354 match self.current_thread.host() {
5355 Some(id) => Ok(id),
5356 None => bail_bug!("current thread is not a host thread"),
5357 }
5358 }
5359
5360 fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
5361 match self.futures.get_mut().as_mut() {
5362 Some(f) => Ok(f),
5363 None => bail_bug!("futures field of concurrent state is currently taken"),
5364 }
5365 }
5366
5367 pub(crate) fn table(&mut self) -> &mut ResourceTable {
5368 self.table.get_mut()
5369 }
5370
5371 fn parent(&mut self, cur: CurrentThread) -> Option<CurrentThread> {
5373 match cur {
5374 CurrentThread::Guest(thread) => {
5375 let task = self.get_mut(thread.task).ok()?;
5376 Some(match task.caller {
5377 Caller::Host { caller, .. } => caller,
5378 Caller::Guest { thread } => thread.into(),
5379 })
5380 }
5381 CurrentThread::Host(id) => Some(self.get_mut(id).ok()?.caller.into()),
5382 CurrentThread::None => None,
5383 }
5384 }
5385}
5386
5387fn for_any_lower<
5390 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5391>(
5392 fun: F,
5393) -> F {
5394 fun
5395}
5396
5397fn for_any_lift<
5399 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5400>(
5401 fun: F,
5402) -> F {
5403 fun
5404}
5405
5406fn check_ambient_store(id: StoreId) {
5407 let message = "\
5408 `Future`s which depend on asynchronous component tasks, streams, or \
5409 futures to complete may only be polled from the event loop of the \
5410 store to which they belong. Please use \
5411 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5412 ";
5413 tls::try_get(|store| {
5414 let matched = match store {
5415 tls::TryGet::Some(store) => store.id() == id,
5416 tls::TryGet::Taken | tls::TryGet::None => false,
5417 };
5418
5419 if !matched {
5420 panic!("{message}")
5421 }
5422 });
5423}
5424
5425fn check_recursive_run() {
5428 tls::try_get(|store| {
5429 if !matches!(store, tls::TryGet::None) {
5430 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5431 }
5432 });
5433}
5434
5435fn unpack_callback_code(code: u32) -> (u32, u32) {
5436 (code & 0xF, code >> 4)
5437}
5438
5439struct WaitableCheckParams {
5443 set: TableId<WaitableSet>,
5444 options: OptionsIndex,
5445 payload: u32,
5446}
5447
5448enum WaitableCheck {
5451 Wait,
5452 Poll,
5453}
5454
5455#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
5464pub struct GuestTaskId(TableId<GuestTask>);
5465
5466pub(crate) struct PreparedCall<R> {
5468 handle: Func,
5470 thread: QualifiedThreadId,
5472 param_count: usize,
5474 rx: oneshot::Receiver<LiftedResult>,
5477 runtime_instance: RuntimeInstance,
5479 _phantom: PhantomData<R>,
5480}
5481
5482impl<R> PreparedCall<R> {
5483 pub(crate) fn task_id(&self) -> TaskId {
5485 TaskId {
5486 task: self.thread.task,
5487 runtime_instance: self.runtime_instance,
5488 }
5489 }
5490}
5491
5492pub(crate) struct TaskId {
5494 task: TableId<GuestTask>,
5495 runtime_instance: RuntimeInstance,
5496}
5497
5498impl TaskId {
5499 pub(crate) fn host_future_dropped(&self, store: &mut StoreOpaque) -> Result<()> {
5505 let task = store.concurrent_state_mut().get_mut(self.task)?;
5506 let delete = if !task.already_lowered_parameters() {
5507 store.cancel_guest_subtask_without_lowered_parameters(
5508 self.runtime_instance,
5509 self.task,
5510 )?;
5511 true
5512 } else {
5513 task.host_future_state = HostFutureState::Dropped;
5514 task.ready_to_delete()
5515 };
5516 if delete {
5517 Waitable::Guest(self.task).delete_from(store.concurrent_state_mut())?
5518 }
5519 Ok(())
5520 }
5521}
5522
5523pub(crate) fn prepare_call<T, R>(
5529 mut store: StoreContextMut<T>,
5530 handle: Func,
5531 param_count: usize,
5532 host_future_present: bool,
5533 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5534 + Send
5535 + Sync
5536 + 'static,
5537 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5538 + Send
5539 + Sync
5540 + 'static,
5541) -> Result<PreparedCall<R>> {
5542 let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5543
5544 let instance = handle.instance().id().get(store.0);
5545 let options = &instance.component().env_component().options[options];
5546 let ty = &instance.component().types()[ty];
5547 let async_function = ty.async_;
5548 let task_return_type = ty.results;
5549 let component_instance = raw_options.instance;
5550 let callback = options.callback.map(|i| instance.runtime_callback(i));
5551 let memory = options
5552 .memory()
5553 .map(|i| instance.runtime_memory(i))
5554 .map(SendSyncPtr::new);
5555 let string_encoding = options.string_encoding;
5556 let token = StoreToken::new(store.as_context_mut());
5557 let state = store.0.concurrent_state_mut();
5558
5559 let (tx, rx) = oneshot::channel();
5560
5561 let instance = handle.instance().runtime_instance(component_instance);
5562 let caller = state.current_thread;
5563 let thread = GuestTask::new(
5564 state,
5565 Box::new(for_any_lower(move |store, params| {
5566 lower_params(handle, token.as_context_mut(store), params)
5567 })),
5568 LiftResult {
5569 lift: Box::new(for_any_lift(move |store, result| {
5570 lift_result(handle, store, result)
5571 })),
5572 ty: task_return_type,
5573 memory,
5574 string_encoding,
5575 },
5576 Caller::Host {
5577 tx: Some(tx),
5578 host_future_present,
5579 caller,
5580 },
5581 callback.map(|callback| {
5582 let callback = SendSyncPtr::new(callback);
5583 let instance = handle.instance();
5584 Box::new(move |store: &mut dyn VMStore, event, handle| {
5585 let store = token.as_context_mut(store);
5586 unsafe { instance.call_callback(store, callback, event, handle) }
5589 }) as CallbackFn
5590 }),
5591 instance,
5592 async_function,
5593 )?;
5594
5595 if !store.0.may_enter(instance)? {
5596 bail!(Trap::CannotEnterComponent);
5597 }
5598
5599 Ok(PreparedCall {
5600 handle,
5601 thread,
5602 param_count,
5603 runtime_instance: instance,
5604 rx,
5605 _phantom: PhantomData,
5606 })
5607}
5608
5609pub(crate) struct QueuedCall<R> {
5610 store: StoreId,
5611 task: TableId<GuestTask>,
5612 rx: oneshot::Receiver<LiftedResult>,
5613 _marker: PhantomData<fn() -> R>,
5614}
5615
5616impl<R> QueuedCall<R> {
5617 pub(crate) fn new<T: 'static>(
5624 mut store: StoreContextMut<T>,
5625 prepared: PreparedCall<R>,
5626 ) -> Result<QueuedCall<R>> {
5627 let PreparedCall {
5628 handle,
5629 thread,
5630 param_count,
5631 rx,
5632 ..
5633 } = prepared;
5634
5635 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5636
5637 Ok(QueuedCall {
5638 store: store.0.id(),
5639 task: thread.task,
5640 rx,
5641 _marker: PhantomData,
5642 })
5643 }
5644
5645 fn task(&self) -> GuestTaskId {
5646 GuestTaskId(self.task)
5647 }
5648}
5649
5650impl<R> Future for QueuedCall<R>
5651where
5652 R: 'static,
5653{
5654 type Output = Result<R>;
5655
5656 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
5657 check_ambient_store(self.store);
5658 Pin::new(&mut self.rx).poll(cx).map(|result| match result {
5659 Ok(r) => match r.downcast() {
5660 Ok(r) => Ok(*r),
5661 Err(_) => bail_bug!("wrong type of value produced"),
5662 },
5663 Err(oneshot::Canceled) => bail_bug!("channel erroneously dropped"),
5664 })
5665 }
5666}
5667
5668fn queue_call0<T: 'static>(
5671 store: StoreContextMut<T>,
5672 handle: Func,
5673 guest_thread: QualifiedThreadId,
5674 param_count: usize,
5675) -> Result<()> {
5676 let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5677 let is_concurrent = raw_options.async_;
5678 let callback = raw_options.callback;
5679 let instance = handle.instance();
5680 let callee = handle.lifted_core_func(store.0);
5681 let post_return = handle.post_return_core_func(store.0);
5682 let callback = callback.map(|i| {
5683 let instance = instance.id().get(store.0);
5684 SendSyncPtr::new(instance.runtime_callback(i))
5685 });
5686
5687 log::trace!("queueing call {guest_thread:?}");
5688
5689 unsafe {
5693 instance.queue_call(
5694 store,
5695 guest_thread,
5696 SendSyncPtr::new(callee),
5697 param_count,
5698 1,
5699 is_concurrent,
5700 callback,
5701 post_return.map(SendSyncPtr::new),
5702 )
5703 }
5704}