1use self::error_contexts::GlobalErrorContextRefCount;
54use crate::bail_bug;
55use crate::component::func::{self, Func, call_post_return};
56use crate::component::{
57 HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
58};
59use crate::fiber::{self, StoreFiber, StoreFiberYield};
60use crate::hash_set::HashSet;
61use crate::prelude::*;
62use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
63use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
64use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
65use crate::{
66 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
67};
68use alloc::borrow::ToOwned;
69use alloc::collections::{BTreeMap, BTreeSet, VecDeque};
70use core::any::Any;
71use core::cell::UnsafeCell;
72use core::fmt;
73use core::future::Future;
74use core::marker::PhantomData;
75use core::mem::{self, ManuallyDrop, MaybeUninit};
76use core::ops::DerefMut;
77use core::pin::{Pin, pin};
78use core::ptr::{self, NonNull};
79use core::task::{Context, Poll, Waker};
80use futures::channel::oneshot;
81use futures::future::{self, FutureExt};
82use futures::stream::{FuturesUnordered, StreamExt};
83use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
84use table::{TableDebug, TableId};
85use wasmtime_environ::component::{
86 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
87 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
88 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
89 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
90 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
91};
92use wasmtime_environ::packed_option::ReservedValue;
93use wasmtime_environ::{NUM_COMPONENT_CONTEXT_SLOTS, Trap};
94
95pub use abort::JoinHandle;
96pub use future_stream_any::{FutureAny, StreamAny};
97pub use futures_and_streams::{
98 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
99 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
100 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
101};
102pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
103
104mod abort;
105mod error_contexts;
106mod future_stream_any;
107mod futures_and_streams;
108pub(crate) mod table;
109pub(crate) mod tls;
110
/// Sentinel `u32` with every bit set (`0xffff_ffff`).
///
/// NOTE(review): the name suggests this is the code handed to a guest when an
/// operation would block; no use site is visible in this part of the file —
/// confirm against the callers.
const BLOCKED: u32 = 0xffff_ffff;
114
/// Status of a call, with the numeric code of each state given by its
/// explicit discriminant.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Packs this status together with an optional waitable handle into a
    /// single `u32`.
    ///
    /// The status code occupies the low four bits and the handle the 28 bits
    /// above them; a missing handle is encoded as zero.
    ///
    /// # Panics
    ///
    /// Panics unless `waitable` is `None` exactly when `self` is
    /// [`Status::Returned`], and panics if the handle does not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        // A handle accompanies every status except `Returned`.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = match waitable {
            Some(handle) => handle,
            None => 0,
        };
        // The handle must leave room for the four status bits.
        assert!(waitable < (1 << 28));
        let code = self as u32;
        code | (waitable << 4)
    }
}
138
/// An event that can be reported to a guest task.
///
/// `Event::parts` converts a value of this type into the numeric
/// `(event code, payload)` pair.
#[derive(Clone, Copy, Debug)]
enum Event {
    /// No event; encoded with a zero payload.
    None,
    /// A subtask status change; the payload is the numeric value of `status`.
    Subtask {
        status: Status,
    },
    /// Result of a stream read; the payload is `code.encode()`.
    StreamRead {
        code: ReturnCode,
        // NOTE(review): `pending` is ignored by `Event::parts`; presumably it
        // names the stream table and handle the event refers to — confirm.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// Result of a stream write; the payload is `code.encode()`.
    StreamWrite {
        code: ReturnCode,
        // NOTE(review): ignored by `Event::parts`; see `StreamRead::pending`.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// Result of a future read; the payload is `code.encode()`.
    FutureRead {
        code: ReturnCode,
        // NOTE(review): ignored by `Event::parts`; presumably the future table
        // and handle the event refers to — confirm.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// Result of a future write; the payload is `code.encode()`.
    FutureWrite {
        code: ReturnCode,
        // NOTE(review): ignored by `Event::parts`; see `FutureRead::pending`.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// Cancellation notice; encoded with a zero payload.
    Cancelled,
}
165
166impl Event {
167 fn parts(self) -> (u32, u32) {
172 const EVENT_NONE: u32 = 0;
173 const EVENT_SUBTASK: u32 = 1;
174 const EVENT_STREAM_READ: u32 = 2;
175 const EVENT_STREAM_WRITE: u32 = 3;
176 const EVENT_FUTURE_READ: u32 = 4;
177 const EVENT_FUTURE_WRITE: u32 = 5;
178 const EVENT_CANCELLED: u32 = 6;
179 match self {
180 Event::None => (EVENT_NONE, 0),
181 Event::Cancelled => (EVENT_CANCELLED, 0),
182 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
183 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
184 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
185 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
186 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
187 }
188 }
189}
190
/// Named `u32` codes.
///
/// NOTE(review): going by the module name these are presumably the values a
/// guest callback returns to request exit, yield or wait; the code that
/// interprets them is not visible in this part of the file — confirm.
mod callback_code {
    pub const EXIT: u32 = 0;
    pub const YIELD: u32 = 1;
    pub const WAIT: u32 = 2;
}
197
/// `wasmtime_environ::component::START_FLAG_ASYNC_CALLEE` converted to `u32`
/// once, so users in this module do not have to repeat the cast.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
202
/// Pairs a mutable store context with a function that projects the store's
/// data `T` to the view `D::Data`.
///
/// `D` defaults to `HasSelf<T>`.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    /// The store context this access operates on.
    store: StoreContextMut<'a, T>,
    /// Projection from the store data to `D::Data`, used by `Access::get`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
212
213impl<'a, T, D> Access<'a, T, D>
214where
215 D: HasData + ?Sized,
216 T: 'static,
217{
218 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
220 Self { store, get_data }
221 }
222
223 pub fn data_mut(&mut self) -> &mut T {
225 self.store.data_mut()
226 }
227
228 pub fn get(&mut self) -> D::Data<'_> {
230 (self.get_data)(self.data_mut())
231 }
232
233 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
237 where
238 T: 'static,
239 {
240 let accessor = Accessor {
241 get_data: self.get_data,
242 token: StoreToken::new(self.store.as_context_mut()),
243 };
244 self.store
245 .as_context_mut()
246 .spawn_with_accessor(accessor, task)
247 }
248
249 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
252 self.get_data
253 }
254}
255
/// Lets an [`Access`] be used wherever a shared store context is expected.
impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    type Data = T;

    /// Borrows the wrapped store context immutably.
    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}
267
/// Lets an [`Access`] be used wherever a mutable store context is expected.
impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    /// Reborrows the wrapped store context mutably.
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}
277
/// A handle that can obtain an [`Access`] on demand without borrowing the
/// store itself.
///
/// It holds a [`StoreToken`] rather than a store reference; `Accessor::with`
/// uses the token together with the store made available through `tls::get`
/// to build the `Access` passed to the caller's closure.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    /// Token used to turn the thread-local store into a `StoreContextMut<T>`.
    token: StoreToken<T>,
    /// Projection from the store data to `D::Data`, copied into every
    /// `Access` this accessor creates.
    get_data: fn(&mut T) -> D::Data<'_>,
}
344
/// Types from which a borrowed [`Accessor`] can be obtained.
pub trait AsAccessor {
    /// The store data type of the underlying accessor.
    type Data: 'static;

    /// The `HasData` projection type of the underlying accessor.
    type AccessorData: HasData + ?Sized;

    /// Returns the underlying accessor.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
372
/// Forwards [`AsAccessor`] through shared references, so `&T` can be passed
/// wherever a `T: AsAccessor` is accepted.
impl<T: AsAccessor + ?Sized> AsAccessor for &T {
    type Data = T::Data;
    type AccessorData = T::AccessorData;

    /// Delegates to the referenced value.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
        T::as_accessor(self)
    }
}
381
/// An [`Accessor`] is trivially its own accessor.
impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;

    /// Returns `self`.
    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}
390
// Compile-time check that `Accessor<T>` is `Send + Sync` even when `T` itself
// is not `Sync` (`UnsafeCell<u32>` is used as such a `T`). The build fails
// here if a future change to `Accessor` breaks that property.
const _: () = {
    const fn assert<T: Send + Sync>() {}
    assert::<Accessor<UnsafeCell<u32>>>();
};
417
impl<T> Accessor<T> {
    /// Creates an accessor for the store identified by `token` whose
    /// projection is the identity function, i.e. `get` yields the store data
    /// itself.
    pub(crate) fn new(token: StoreToken<T>) -> Self {
        Self {
            token,
            get_data: |x| x,
        }
    }
}
434
435impl<T, D> Accessor<T, D>
436where
437 D: HasData + ?Sized,
438{
439 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
457 tls::get(|vmstore| {
458 fun(Access {
459 store: self.token.as_context_mut(vmstore),
460 get_data: self.get_data,
461 })
462 })
463 }
464
465 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
468 self.get_data
469 }
470
471 pub fn with_getter<D2: HasData>(
488 &self,
489 get_data: fn(&mut T) -> D2::Data<'_>,
490 ) -> Accessor<T, D2> {
491 Accessor {
492 token: self.token,
493 get_data,
494 }
495 }
496
497 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
513 where
514 T: 'static,
515 {
516 let accessor = self.clone_for_spawn();
517 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
518 }
519
520 fn clone_for_spawn(&self) -> Self {
521 Self {
522 token: self.token,
523 get_data: self.get_data,
524 }
525 }
526
527 pub fn poll_no_interesting_tasks(&self, cx: &mut Context<'_>) -> Poll<()> {
563 self.with(|mut access| {
564 let store = access.as_context_mut().0;
565 let state = store.concurrent_state_mut();
566 if state.interesting_tasks == 0 {
567 Poll::Ready(())
568 } else {
569 state.interesting_tasks_empty_waker = Some(cx.waker().clone());
570 Poll::Pending
571 }
572 })
573 }
574}
575
/// A unit of work that can be spawned on a store and is given an
/// [`Accessor`] to reach the store while it runs.
///
/// Implementors must be `Send + 'static`.
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Consumes the task and returns the future that performs it; the future
    /// must be `Send` and resolves to `Ok(())` or an error.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
594
/// Per-call data recorded about the caller, split by calling convention.
///
/// NOTE(review): no use of this type is visible in this part of the file; the
/// field descriptions below are inferred from names and types — confirm.
enum CallerInfo {
    /// Caller used the async convention.
    Async {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Presumably whether the call produces a result — confirm.
        has_result: bool,
    },
    /// Caller used the sync convention.
    Sync {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Presumably the number of raw result values — confirm.
        result_count: u32,
    },
}
609
/// The two ways a wait can be represented.
///
/// NOTE(review): no use of this type is visible in this part of the file; the
/// variant descriptions are inferred from payload types — confirm.
enum WaitMode {
    /// Carries a suspended `StoreFiber`, presumably resumed when the wait ends.
    Fiber(StoreFiber<'static>),
    /// Carries the `Instance`, presumably the one whose callback is invoked
    /// when the wait ends.
    Callback(Instance),
}
618
/// Reason passed to `store.suspend(..)` when execution is suspended.
///
/// NOTE(review): the handling of each reason lives outside this part of the
/// file; field meanings other than those noted are inferred from names.
#[derive(Debug)]
enum SuspendReason {
    /// Waiting on a waitable set (used by `poll_and_block` while a host
    /// future is pending).
    Waiting {
        /// The set being waited on.
        set: TableId<WaitableSet>,
        /// The thread doing the waiting.
        thread: QualifiedThreadId,
        /// Presumably skips the "may this task block" check — confirm.
        skip_may_block_check: bool,
    },
    /// Used by the worker fiber in `run_on_worker` after it has finished an
    /// item and has nothing further to do.
    NeedWork,
    /// NOTE(review): presumably a voluntary yield by `thread` — confirm.
    Yielding {
        thread: QualifiedThreadId,
        /// Presumably whether the yield may be cancelled — confirm.
        cancellable: bool,
        skip_may_block_check: bool,
    },
    /// NOTE(review): presumably an explicit suspend request by `thread` —
    /// confirm.
    ExplicitlySuspending {
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
}
645
/// What a [`GuestCall`] should do when it is run by `handle_guest_call`.
enum GuestCallKind {
    /// Fetch an event via `instance.get_event(..)` and deliver it to the
    /// task's callback.
    DeliverEvent {
        /// Instance used to look up the event and to interpret the callback's
        /// return code.
        instance: Instance,
        /// Optional waitable set passed through to `get_event`.
        set: Option<TableId<WaitableSet>>,
    },
    /// A start closure that may return a follow-up call, which
    /// `handle_guest_call` then runs next. `GuestCall::is_ready` holds these
    /// back while the instance has `do_not_enter` set or non-zero
    /// backpressure.
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
    /// A start closure with no follow-up; `GuestCall::is_ready` always
    /// reports these as ready.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
667
668impl fmt::Debug for GuestCallKind {
669 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
670 match self {
671 Self::DeliverEvent { instance, set } => f
672 .debug_struct("DeliverEvent")
673 .field("instance", instance)
674 .field("set", set)
675 .finish(),
676 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
677 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
678 }
679 }
680}
681
/// Target of a suspension: either nothing, or a `u32` in one of two flavours.
///
/// NOTE(review): the meaning of the `u32` payloads and the difference between
/// `SomeSuspended` and `Some` are not visible in this part of the file.
#[derive(Copy, Clone, Debug)]
pub enum SuspensionTarget {
    SomeSuspended(u32),
    Some(u32),
    None,
}

impl SuspensionTarget {
    /// Returns `true` only for [`SuspensionTarget::None`].
    fn is_none(&self) -> bool {
        match self {
            SuspensionTarget::None => true,
            SuspensionTarget::SomeSuspended(_) | SuspensionTarget::Some(_) => false,
        }
    }

    /// Returns `true` for every variant except [`SuspensionTarget::None`].
    fn is_some(&self) -> bool {
        match self {
            SuspensionTarget::SomeSuspended(_) | SuspensionTarget::Some(_) => true,
            SuspensionTarget::None => false,
        }
    }
}
698
/// A pending call into a guest: which thread it is for and what to do.
#[derive(Debug)]
struct GuestCall {
    /// The guest thread (and, through it, the task) the call belongs to.
    thread: QualifiedThreadId,
    /// The action to perform when the call is run.
    kind: GuestCallKind,
}
705
706impl GuestCall {
707 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
717 let instance = store
718 .concurrent_state_mut()
719 .get_mut(self.thread.task)?
720 .instance;
721 let state = store.instance_state(instance).concurrent_state();
722
723 let ready = match &self.kind {
724 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
725 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
726 GuestCallKind::StartExplicit(_) => true,
727 };
728 log::trace!(
729 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
730 state.do_not_enter,
731 state.backpressure
732 );
733 Ok(ready)
734 }
735}
736
/// An item handed to the worker fiber by `run_on_worker`.
enum WorkerItem {
    /// Run a guest call via `handle_guest_call`.
    GuestCall(GuestCall),
    /// Run an arbitrary closure with the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
742
/// A unit of work processed by `handle_work_item` from the event loop.
enum WorkItem {
    /// Add a host future to the store's set of futures being polled.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// Resume the given fiber.
    ResumeFiber(StoreFiber<'static>),
    /// Resume the given guest thread, which must be in the `Ready` state.
    /// (The instance index is not used by `handle_work_item`.)
    ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
    /// Run the guest call if it is ready, otherwise park it in the instance's
    /// pending set. (The instance index is not used by `handle_work_item`.)
    GuestCall(RuntimeComponentInstanceIndex, GuestCall),
    /// Run a closure on the worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
757
758impl fmt::Debug for WorkItem {
759 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
760 match self {
761 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
762 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
763 Self::ResumeThread(instance, thread) => f
764 .debug_tuple("ResumeThread")
765 .field(instance)
766 .field(thread)
767 .finish(),
768 Self::GuestCall(instance, call) => f
769 .debug_tuple("GuestCall")
770 .field(instance)
771 .field(call)
772 .finish(),
773 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
774 }
775 }
776}
777
/// Outcome of a wait: it was either cancelled or ran to completion.
///
/// The derived ordering places `Cancelled` before `Completed`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    Cancelled,
    Completed,
}
784
/// Drives `future` to completion on behalf of the current host task and
/// returns its result.
///
/// The future is polled once immediately. If it is not ready, it is handed to
/// the store's set of futures and the caller is suspended (waiting on its
/// `sync_call_set`) until the future has recorded its result.
///
/// # Errors
///
/// Propagates the future's own error, errors from state lookups and from
/// `store.suspend`, and reports a bug if the host task is not in the
/// `CalleeFinished` state afterwards or holds a result of the wrong type.
///
/// # Panics
///
/// The wrapped future asserts that the host task is still in the
/// `CalleeStarted` state when it stores the result.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
) -> Result<R> {
    let state = store.concurrent_state_mut();
    let task = state.current_host_thread()?;

    // Wrap `future` so that, once it resolves successfully, the result is
    // stored in the host task's state as `CalleeFinished` and a
    // `Subtask { Returned }` event is set on the task's waitable.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            let host_state = &mut state.get_mut(task)?.state;
            assert!(matches!(host_state, HostTaskState::CalleeStarted));
            *host_state = HostTaskState::CalleeFinished(Box::new(result));

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // Poll once with a no-op waker; if the future is pending it is pushed
    // onto the store's futures below rather than being woken through this
    // waker.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        // Finished on the first poll: just surface any error.
        Poll::Ready(result) => result?,

        // Not finished: register the future, wait on the caller's
        // `sync_call_set`, and suspend until resumed.
        Poll::Pending => {
            let state = store.concurrent_state_mut();
            state.push_future(future);

            let caller = state.get_mut(task)?.caller;
            let set = state.get_mut(caller.thread)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                skip_may_block_check: false,
            })?;

            // Detach the host task's waitable from the set again now that the
            // wait is over.
            Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
        }
    }

    // Take the stored result out of the task, leaving it marked as done
    // (not cancelled), and recover the concrete type.
    let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
    match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
        HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
            Ok(result) => *result,
            Err(_) => bail_bug!("host task finished with wrong type of result"),
        }),
        _ => bail_bug!("unexpected host task state after completion"),
    }
}
869
/// Runs `call`, then keeps running any follow-up call it produces until there
/// is none left.
///
/// * `DeliverEvent` fetches the event, switches the current thread to the
///   call's thread, marks the instance as entered, invokes the task's
///   callback, undoes both, and asks the instance to interpret the callback's
///   return code (which may yield the next call).
/// * `StartImplicit` runs its closure, which may return the next call.
/// * `StartExplicit` runs its closure and ends the chain.
///
/// # Errors
///
/// Propagates errors from the callback/closures and from store bookkeeping,
/// and reports a bug if no event is present or the task has no callback.
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
    let mut next = Some(call);
    while let Some(call) = next.take() {
        match call.kind {
            GuestCallKind::DeliverEvent { instance, set } => {
                let (event, waitable) =
                    match instance.get_event(store, call.thread.task, set, true)? {
                        Some(pair) => pair,
                        None => bail_bug!("delivering non-present event"),
                    };
                let state = store.concurrent_state_mut();
                let task = state.get_mut(call.thread.task)?;
                let runtime_instance = task.instance;
                // The callback receives handle 0 when the event has no
                // associated waitable.
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);

                log::trace!(
                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
                    call.thread,
                );

                // Make the call's thread current for the duration of the
                // callback; the previous thread is restored below.
                let old_thread = store.set_thread(call.thread)?;
                log::trace!(
                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
                    call.thread
                );

                store.enter_instance(runtime_instance);

                // Temporarily take the callback out of the task so it can be
                // invoked with `store`; it is put back right after the call.
                let Some(callback) = store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback
                    .take()
                else {
                    bail_bug!("guest task callback field not present")
                };

                let code = callback(store, event, handle)?;

                store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback = Some(callback);

                store.exit_instance(runtime_instance)?;

                store.set_thread(old_thread)?;

                // The return code may schedule another call for this loop.
                next = instance.handle_callback_code(
                    store,
                    call.thread,
                    runtime_instance.index,
                    code,
                )?;

                log::trace!(
                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
                );
            }
            GuestCallKind::StartImplicit(fun) => {
                next = fun(store)?;
            }
            GuestCallKind::StartExplicit(fun) => {
                fun(store)?;
            }
        }
    }

    Ok(())
}
941
/// Convenience methods that forward to the equivalents on
/// [`StoreContextMut`].
impl<T> Store<T> {
    /// Runs `fun` with an [`Accessor`] for this store; see
    /// [`StoreContextMut::run_concurrent`].
    ///
    /// # Errors
    ///
    /// Fails immediately if the store was configured without concurrency
    /// support; otherwise returns whatever the forwarded call returns.
    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        ensure!(
            self.as_context().0.concurrency_support(),
            "cannot use `run_concurrent` when Config::concurrency_support disabled",
        );
        self.as_context_mut().run_concurrent(fun).await
    }

    /// Forwards to [`StoreContextMut::assert_concurrent_state_empty`], which
    /// panics if any concurrent state is left over.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(&mut self) {
        self.as_context_mut().assert_concurrent_state_empty();
    }

    /// Forwards to [`StoreContextMut::concurrent_state_table_size`].
    #[doc(hidden)]
    pub fn concurrent_state_table_size(&mut self) -> usize {
        self.as_context_mut().concurrent_state_table_size()
    }

    /// Spawns `task` on this store; forwards to [`StoreContextMut::spawn`].
    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
    where
        T: 'static,
    {
        self.as_context_mut().spawn(task)
    }
}
973
974impl<T> StoreContextMut<'_, T> {
    /// Asserts that no concurrent state is left in the store.
    ///
    /// # Panics
    ///
    /// Panics if the state table, either work queue, the set of futures or
    /// the global error-context reference counts are non-empty, if a current
    /// thread is still set, or if the futures set is unavailable. The
    /// per-instance check is delegated to `assert_instance_states_empty`.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(self) {
        let store = self.0;
        store
            .store_data_mut()
            .components
            .assert_instance_states_empty();
        let state = store.concurrent_state_mut();
        assert!(
            state.table.get_mut().is_empty(),
            "non-empty table: {:?}",
            state.table.get_mut()
        );
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        assert!(state.current_thread.is_none());
        assert!(state.futures_mut().unwrap().is_empty());
        assert!(state.global_error_context_ref_counts.is_empty());
    }
1004
1005 #[doc(hidden)]
1010 pub fn concurrent_state_table_size(&mut self) -> usize {
1011 self.0
1012 .concurrent_state_mut()
1013 .table
1014 .get_mut()
1015 .iter_mut()
1016 .count()
1017 }
1018
1019 pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
1029 where
1030 T: 'static,
1031 {
1032 let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
1033 self.spawn_with_accessor(accessor, task)
1034 }
1035
1036 fn spawn_with_accessor<D>(
1039 self,
1040 accessor: Accessor<T, D>,
1041 task: impl AccessorTask<T, D>,
1042 ) -> JoinHandle
1043 where
1044 T: 'static,
1045 D: HasData + ?Sized,
1046 {
1047 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
1051 self.0
1052 .concurrent_state_mut()
1053 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1054 handle
1055 }
1056
    /// Runs `fun` with an [`Accessor`] for this store, driving the store's
    /// event loop until the future returned by `fun` completes, and returns
    /// its output.
    ///
    /// # Errors
    ///
    /// Fails immediately if the store was configured without concurrency
    /// support; otherwise propagates errors raised while driving the event
    /// loop.
    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        ensure!(
            self.0.concurrency_support(),
            "cannot use `run_concurrent` when Config::concurrency_support disabled",
        );
        // `false`: stay pending rather than trap when the loop goes idle.
        self.do_run_concurrent(fun, false).await
    }
1150
    /// Like `run_concurrent`, but asks the event loop to fail with
    /// `Trap::AsyncDeadlock` instead of staying pending when no work and no
    /// futures remain while `fun`'s future is still unfinished.
    ///
    /// Unlike `run_concurrent`, this does not check for concurrency support
    /// up front; `do_run_concurrent` only debug-asserts it.
    pub(super) async fn run_concurrent_trap_on_idle<R>(
        self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, true).await
    }
1160
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Creates the accessor, calls `fun` to obtain its future, and drives that
    /// future with `poll_until`. The future is kept in a guard whose `Drop`
    /// drops it inside `tls::set`, so the store is available through the
    /// thread-local while the future's destructor runs.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        debug_assert!(self.0.concurrency_support());
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Guard owning both the store context and the user's future.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY (review note): `value` is wrapped in
                    // `ManuallyDrop` at construction and this is the only
                    // place in this function that drops it, so it is dropped
                    // exactly once.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY (review note): the future lives inside `dropper`, which is
        // only reached through this reference for the rest of the function
        // and is not moved before being dropped in place — confirm this
        // satisfies the pinning contract.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1204
    /// Event loop: polls `future` and the store's host futures, and processes
    /// queued work items, until `future` completes.
    ///
    /// Each iteration temporarily takes the store's `futures` set, polls
    /// `future` and then the set, and collects ready work: all high-priority
    /// items, or else a single low-priority item. Collected items are then
    /// handled one by one before looping again.
    ///
    /// When the futures set is exhausted and there is no work but `future` is
    /// still pending, the loop either fails with `Trap::AsyncDeadlock` (if
    /// `trap_on_idle`) or stays pending.
    ///
    /// # Errors
    ///
    /// Propagates errors from host futures and work items, and reports a bug
    /// if the store's `futures` field is missing.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Puts the borrowed `futures` set back into the store when dropped,
        // including on early exit.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take the futures set out of the store for the duration of this
            // poll; `reset` restores it.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = match reset.futures.as_mut() {
                Some(f) => pin!(f.next()),
                None => bail_bug!("concurrent state missing futures field"),
            };

            enum PollResult<R> {
                /// The caller's future finished with this value.
                Complete(R),
                /// Work items to run before polling again.
                ProcessWork {
                    ready: Vec<WorkItem>,
                    low_priority: bool,
                },
            }

            let result = future::poll_fn(|cx| {
                // The caller's future gets the first chance to finish.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(PollResult::Complete(value)));
                }

                // Then poll the host futures: `Ready(true)` means one
                // finished, `Ready(false)` means the set is empty.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Gather work: every high-priority item, or failing that one
                // low-priority item.
                let state = reset.store.0.concurrent_state_mut();
                let mut ready = mem::take(&mut state.high_priority);
                let mut low_priority = false;
                if ready.is_empty() {
                    if let Some(item) = state.low_priority.pop_back() {
                        ready.push(item);
                        low_priority = true;
                    }
                }
                if !ready.is_empty() {
                    return Poll::Ready(Ok(PollResult::ProcessWork {
                        ready,
                        low_priority,
                    }));
                }

                return match next {
                    // A host future finished but queued no work: return with
                    // an empty batch so the outer loop starts a fresh round.
                    Poll::Ready(true) => {
                        Poll::Ready(Ok(PollResult::ProcessWork {
                            ready: Vec::new(),
                            low_priority: false,
                        }))
                    }
                    // No host futures and no work left: give the caller's
                    // future one more poll before declaring the loop idle.
                    Poll::Ready(false) => {
                        if let Poll::Ready(value) =
                            tls::set(reset.store.0, || future.as_mut().poll(cx))
                        {
                            Poll::Ready(Ok(PollResult::Complete(value)))
                        } else {
                            if trap_on_idle {
                                Poll::Ready(Err(Trap::AsyncDeadlock.into()))
                            } else {
                                Poll::Pending
                            }
                        }
                    }
                    Poll::Pending => Poll::Pending,
                };
            })
            .await;

            // Return the futures set to the store before handling work items.
            drop(reset);

            match result? {
                PollResult::Complete(value) => break Ok(value),
                PollResult::ProcessWork {
                    ready,
                    low_priority,
                } => {
                    // If handling stops early (error or this future being
                    // dropped), dispose of the unprocessed items: fibers are
                    // disposed and pushed futures are dropped inside
                    // `tls::set`.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    // Yield once before running a low-priority item.
                    if low_priority {
                        dispose.store.0.yield_now().await
                    }

                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1419
1420 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1422 where
1423 T: Send,
1424 {
1425 log::trace!("handle work item {item:?}");
1426 match item {
1427 WorkItem::PushFuture(future) => {
1428 self.0
1429 .concurrent_state_mut()
1430 .futures_mut()?
1431 .push(future.into_inner());
1432 }
1433 WorkItem::ResumeFiber(fiber) => {
1434 self.0.resume_fiber(fiber).await?;
1435 }
1436 WorkItem::ResumeThread(_, thread) => {
1437 if let GuestThreadState::Ready { fiber, .. } = mem::replace(
1438 &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
1439 GuestThreadState::Running,
1440 ) {
1441 self.0.resume_fiber(fiber).await?;
1442 } else {
1443 bail_bug!("cannot resume non-pending thread {thread:?}");
1444 }
1445 }
1446 WorkItem::GuestCall(_, call) => {
1447 if call.is_ready(self.0)? {
1448 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1449 } else {
1450 let state = self.0.concurrent_state_mut();
1451 let task = state.get_mut(call.thread.task)?;
1452 if !task.starting_sent {
1453 task.starting_sent = true;
1454 if let GuestCallKind::StartImplicit(_) = &call.kind {
1455 Waitable::Guest(call.thread.task).set_event(
1456 state,
1457 Some(Event::Subtask {
1458 status: Status::Starting,
1459 }),
1460 )?;
1461 }
1462 }
1463
1464 let instance = state.get_mut(call.thread.task)?.instance;
1465 self.0
1466 .instance_state(instance)
1467 .concurrent_state()
1468 .pending
1469 .insert(call.thread, call.kind);
1470 }
1471 }
1472 WorkItem::WorkerFunction(fun) => {
1473 self.run_on_worker(WorkerItem::Function(fun)).await?;
1474 }
1475 }
1476
1477 Ok(())
1478 }
1479
1480 async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1482 where
1483 T: Send,
1484 {
1485 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1486 fiber
1487 } else {
1488 fiber::make_fiber(self.0, move |store| {
1489 loop {
1490 let Some(item) = store.concurrent_state_mut().worker_item.take() else {
1491 bail_bug!("worker_item not present when resuming fiber")
1492 };
1493 match item {
1494 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1495 WorkerItem::Function(fun) => fun.into_inner()(store)?,
1496 }
1497
1498 store.suspend(SuspendReason::NeedWork)?;
1499 }
1500 })?
1501 };
1502
1503 let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1504 assert!(worker_item.is_none());
1505 *worker_item = Some(item);
1506
1507 self.0.resume_fiber(worker).await
1508 }
1509
1510 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1515 where
1516 T: 'static,
1517 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1518 + Send
1519 + Sync
1520 + 'static,
1521 R: Send + Sync + 'static,
1522 {
1523 let token = StoreToken::new(self);
1524 async move {
1525 let mut accessor = Accessor::new(token);
1526 closure(&mut accessor).await
1527 }
1528 }
1529}
1530
1531impl StoreOpaque {
1532 pub(crate) fn enter_guest_sync_call(
1539 &mut self,
1540 guest_caller: Option<RuntimeInstance>,
1541 callee_async: bool,
1542 callee: RuntimeInstance,
1543 ) -> Result<()> {
1544 log::trace!("enter sync call {callee:?}");
1545 if !self.concurrency_support() {
1546 return self.enter_call_not_concurrent();
1547 }
1548
1549 let state = self.concurrent_state_mut();
1550 let thread = state.current_thread;
1551 let instance = if let Some(thread) = thread.guest() {
1552 Some(state.get_mut(thread.task)?.instance)
1553 } else {
1554 None
1555 };
1556 if guest_caller.is_some() {
1557 debug_assert_eq!(instance, guest_caller);
1558 }
1559 let guest_thread = GuestTask::new(
1560 state,
1561 Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
1562 LiftResult {
1563 lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
1564 ty: TypeTupleIndex::reserved_value(),
1565 memory: None,
1566 string_encoding: StringEncoding::Utf8,
1567 },
1568 if let Some(thread) = thread.guest() {
1569 Caller::Guest { thread: *thread }
1570 } else {
1571 Caller::Host {
1572 tx: None,
1573 host_future_present: false,
1574 caller: thread,
1575 }
1576 },
1577 None,
1578 callee,
1579 callee_async,
1580 )?;
1581
1582 Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1583 guest_thread.thread,
1584 self,
1585 callee.index,
1586 )?;
1587 self.set_thread(guest_thread)?;
1588
1589 Ok(())
1590 }
1591
1592 pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
1594 if !self.concurrency_support() {
1595 return Ok(self.exit_call_not_concurrent());
1596 }
1597 let thread = match self.set_thread(CurrentThread::None)?.guest() {
1598 Some(t) => *t,
1599 None => bail_bug!("expected task when exiting"),
1600 };
1601 let task = self.concurrent_state_mut().get_mut(thread.task)?;
1602 let instance = task.instance;
1603 let caller = match &task.caller {
1604 &Caller::Guest { thread } => thread.into(),
1605 &Caller::Host { caller, .. } => caller,
1606 };
1607 task.lift_result = None;
1608 task.exited = true;
1609 self.set_thread(caller)?;
1610
1611 log::trace!("exit sync call {instance:?}");
1612 self.cleanup_thread(thread, instance, CleanupTask::Yes)?;
1613
1614 Ok(())
1615 }
1616
1617 pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
1625 if !self.concurrency_support() {
1626 self.enter_call_not_concurrent()?;
1627 return Ok(None);
1628 }
1629 let state = self.concurrent_state_mut();
1630 let caller = state.current_guest_thread()?;
1631 let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
1632 log::trace!("new host task {task:?}");
1633 self.set_thread(task)?;
1634 Ok(Some(task))
1635 }
1636
1637 pub fn host_task_reenter_caller(&mut self) -> Result<()> {
1643 if !self.concurrency_support() {
1644 return Ok(());
1645 }
1646 let task = self.concurrent_state_mut().current_host_thread()?;
1647 let caller = self.concurrent_state_mut().get_mut(task)?.caller;
1648 self.set_thread(caller)?;
1649 Ok(())
1650 }
1651
1652 pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1659 match task {
1660 Some(task) => {
1661 log::trace!("delete host task {task:?}");
1662 self.concurrent_state_mut().delete(task)?;
1663 }
1664 None => {
1665 self.exit_call_not_concurrent();
1666 }
1667 }
1668 Ok(())
1669 }
1670
1671 pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
1679 if self.trapped() {
1680 return Ok(false);
1681 }
1682 if !self.concurrency_support() {
1683 return Ok(true);
1684 }
1685 let state = self.concurrent_state_mut();
1686 let mut cur = state.current_thread;
1687 loop {
1688 match cur {
1689 CurrentThread::None => break Ok(true),
1690 CurrentThread::Guest(thread) => {
1691 let task = state.get_mut(thread.task)?;
1692
1693 if task.instance.instance == instance.instance {
1700 break Ok(false);
1701 }
1702 cur = match task.caller {
1703 Caller::Host { caller, .. } => caller,
1704 Caller::Guest { thread } => thread.into(),
1705 };
1706 }
1707 CurrentThread::Host(id) => {
1708 cur = state.get_mut(id)?.caller.into();
1709 }
1710 }
1711 }
1712 }
1713
    /// Returns the mutable `InstanceState` for `instance`, looked up by its
    /// component instance and then by its index within that component.
    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
        self.component_instance_mut(instance.instance)
            .instance_state(instance.index)
    }
1720
    /// Makes `thread` the current thread and returns the previous one.
    ///
    /// Besides updating `current_thread`, this swaps the component context
    /// slots kept in the VM store context (saving them into the outgoing
    /// guest thread and loading them from the incoming one) and refreshes the
    /// "task may block" flag of the instances involved.
    fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
        let thread = thread.into();
        let state = self.concurrent_state_mut();
        let old_thread = mem::replace(&mut state.current_thread, thread);

        // Save the live context slots into the outgoing guest thread.
        if let Some(old_thread) = old_thread.guest() {
            let old_context = self.vm_store_context().component_context;
            self.concurrent_state_mut()
                .get_mut(old_thread.thread)?
                .context = old_context;
        }
        // Debug builds overwrite the live slots with `u32::MAX` so that a
        // stale context is easy to recognise.
        if cfg!(debug_assertions) {
            self.vm_store_context_mut().component_context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
        }
        // Load the incoming guest thread's saved slots; debug builds also
        // overwrite the saved copy with `u32::MAX`.
        if let Some(thread) = thread.guest() {
            let thread = self.concurrent_state_mut().get_mut(thread.thread)?;
            let context = thread.context;
            if cfg!(debug_assertions) {
                thread.context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
            }
            self.vm_store_context_mut().component_context = context;
        }

        // Clear the "may block" flag on the outgoing thread's instance.
        let state = self.concurrent_state_mut();
        if let Some(old_thread) = old_thread.guest() {
            let instance = state.get_mut(old_thread.task)?.instance.instance;
            self.component_instance_mut(instance)
                .set_task_may_block(false)
        }

        // Recompute the flag for the incoming guest thread.
        if thread.guest().is_some() {
            self.set_task_may_block()?;
        }

        Ok(old_thread)
    }
1776
1777 fn set_task_may_block(&mut self) -> Result<()> {
1780 let state = self.concurrent_state_mut();
1781 let guest_thread = state.current_guest_thread()?;
1782 let instance = state.get_mut(guest_thread.task)?.instance.instance;
1783 let may_block = self.concurrent_state_mut().may_block(guest_thread.task)?;
1784 self.component_instance_mut(instance)
1785 .set_task_may_block(may_block);
1786 Ok(())
1787 }
1788
1789 pub(crate) fn check_blocking(&mut self) -> Result<()> {
1790 if !self.concurrency_support() {
1791 return Ok(());
1792 }
1793 let state = self.concurrent_state_mut();
1794 let task = state.current_guest_thread()?.task;
1795 let instance = state.get_mut(task)?.instance.instance;
1796 let task_may_block = self.component_instance(instance).get_task_may_block();
1797
1798 if task_may_block {
1799 Ok(())
1800 } else {
1801 Err(Trap::CannotBlockSyncTask.into())
1802 }
1803 }
1804
1805 fn enter_instance(&mut self, instance: RuntimeInstance) {
1809 log::trace!("enter {instance:?}");
1810 self.instance_state(instance)
1811 .concurrent_state()
1812 .do_not_enter = true;
1813 }
1814
1815 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1819 log::trace!("exit {instance:?}");
1820 self.instance_state(instance)
1821 .concurrent_state()
1822 .do_not_enter = false;
1823 self.partition_pending(instance)
1824 }
1825
1826 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1831 for (thread, kind) in
1832 mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1833 {
1834 let call = GuestCall { thread, kind };
1835 if call.is_ready(self)? {
1836 self.concurrent_state_mut()
1837 .push_high_priority(WorkItem::GuestCall(instance.index, call));
1838 } else {
1839 self.instance_state(instance)
1840 .concurrent_state()
1841 .pending
1842 .insert(call.thread, call.kind);
1843 }
1844 }
1845
1846 Ok(())
1847 }
1848
1849 pub(crate) fn backpressure_modify(
1851 &mut self,
1852 caller_instance: RuntimeInstance,
1853 modify: impl FnOnce(u16) -> Option<u16>,
1854 ) -> Result<()> {
1855 let state = self.instance_state(caller_instance).concurrent_state();
1856 let old = state.backpressure;
1857 let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
1858 state.backpressure = new;
1859
1860 if old > 0 && new == 0 {
1861 self.partition_pending(caller_instance)?;
1864 }
1865
1866 Ok(())
1867 }
1868
1869 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1872 let old_thread = self.concurrent_state_mut().current_thread;
1873 log::trace!("resume_fiber: save current thread {old_thread:?}");
1874
1875 let fiber = fiber::resolve_or_release(self, fiber).await?;
1876
1877 self.set_thread(old_thread)?;
1878
1879 let state = self.concurrent_state_mut();
1880
1881 if let Some(ot) = old_thread.guest() {
1882 state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1883 }
1884 log::trace!("resume_fiber: restore current thread {old_thread:?}");
1885
1886 if let Some(mut fiber) = fiber {
1887 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1888 let reason = match state.suspend_reason.take() {
1890 Some(r) => r,
1891 None => bail_bug!("suspend reason missing when resuming fiber"),
1892 };
1893 match reason {
1894 SuspendReason::NeedWork => {
1895 if state.worker.is_none() {
1896 state.worker = Some(fiber);
1897 } else {
1898 fiber.dispose(self);
1899 }
1900 }
1901 SuspendReason::Yielding {
1902 thread,
1903 cancellable,
1904 ..
1905 } => {
1906 state.get_mut(thread.thread)?.state =
1907 GuestThreadState::Ready { fiber, cancellable };
1908 let instance = state.get_mut(thread.task)?.instance.index;
1909 state.push_low_priority(WorkItem::ResumeThread(instance, thread));
1910 }
1911 SuspendReason::ExplicitlySuspending { thread, .. } => {
1912 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1913 }
1914 SuspendReason::Waiting { set, thread, .. } => {
1915 let old = state
1916 .get_mut(set)?
1917 .waiting
1918 .insert(thread, WaitMode::Fiber(fiber));
1919 assert!(old.is_none());
1920 }
1921 };
1922 } else {
1923 log::trace!("resume_fiber: fiber has exited");
1924 }
1925
1926 Ok(())
1927 }
1928
1929 fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1935 log::trace!("suspend fiber: {reason:?}");
1936
1937 let task = match &reason {
1941 SuspendReason::Yielding { thread, .. }
1942 | SuspendReason::Waiting { thread, .. }
1943 | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1944 SuspendReason::NeedWork => None,
1945 };
1946
1947 let old_guest_thread = if task.is_some() {
1948 self.concurrent_state_mut().current_thread
1949 } else {
1950 CurrentThread::None
1951 };
1952
1953 debug_assert!(
1959 matches!(
1960 reason,
1961 SuspendReason::ExplicitlySuspending {
1962 skip_may_block_check: true,
1963 ..
1964 } | SuspendReason::Waiting {
1965 skip_may_block_check: true,
1966 ..
1967 } | SuspendReason::Yielding {
1968 skip_may_block_check: true,
1969 ..
1970 }
1971 ) || old_guest_thread
1972 .guest()
1973 .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1974 .transpose()?
1975 .unwrap_or(true)
1976 );
1977
1978 let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1979 assert!(suspend_reason.is_none());
1980 *suspend_reason = Some(reason);
1981
1982 self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1983
1984 if task.is_some() {
1985 self.set_thread(old_guest_thread)?;
1986 }
1987
1988 Ok(())
1989 }
1990
1991 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1992 let state = self.concurrent_state_mut();
1993
1994 if waitable.common(state)?.set.is_some() {
1995 bail!(Trap::WaitableSyncAndAsync);
1996 }
1997
1998 let caller = state.current_guest_thread()?;
1999 let set = state.get_mut(caller.thread)?.sync_call_set;
2000 waitable.join(state, Some(set))?;
2001 self.suspend(SuspendReason::Waiting {
2002 set,
2003 thread: caller,
2004 skip_may_block_check: false,
2005 })?;
2006 let state = self.concurrent_state_mut();
2007 waitable.join(state, None)
2008 }
2009
2010 fn cleanup_thread(
2032 &mut self,
2033 guest_thread: QualifiedThreadId,
2034 runtime_instance: RuntimeInstance,
2035 cleanup_task: CleanupTask,
2036 ) -> Result<()> {
2037 let state = self.concurrent_state_mut();
2038 let thread_data = state.get_mut(guest_thread.thread)?;
2039 let sync_call_set = thread_data.sync_call_set;
2040 if let Some(guest_id) = thread_data.instance_rep {
2041 self.instance_state(runtime_instance)
2042 .thread_handle_table()
2043 .guest_thread_remove(guest_id)?;
2044 }
2045 let state = self.concurrent_state_mut();
2046
2047 for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
2049 if let Some(Event::Subtask {
2050 status: Status::Returned | Status::ReturnCancelled,
2051 }) = waitable.common(state)?.event
2052 {
2053 waitable.delete_from(state)?;
2054 }
2055 }
2056
2057 state.delete(guest_thread.thread)?;
2058 state.delete(sync_call_set)?;
2059 let task = state.get_mut(guest_thread.task)?;
2060 task.threads.remove(&guest_thread.thread);
2061
2062 if task.threads.is_empty() && !task.returned_or_cancelled() {
2063 bail!(Trap::NoAsyncResult);
2064 }
2065 let ready_to_delete = task.ready_to_delete();
2066
2067 if !task.decremented_interesting_task_count && task.exited && task.returned_or_cancelled() {
2068 task.decremented_interesting_task_count = true;
2069
2070 debug_assert!(state.interesting_tasks > 0);
2071 state.interesting_tasks -= 1;
2072 if state.interesting_tasks == 0
2073 && let Some(waker) = state.interesting_tasks_empty_waker.take()
2074 {
2075 waker.wake();
2076 }
2077 }
2078
2079 match cleanup_task {
2080 CleanupTask::Yes => {
2081 if ready_to_delete {
2082 Waitable::Guest(guest_thread.task).delete_from(state)?;
2083 }
2084 }
2085 CleanupTask::No => {}
2086 }
2087
2088 Ok(())
2089 }
2090
2091 fn cancel_guest_subtask_without_lowered_parameters(
2104 &mut self,
2105 caller_instance: RuntimeInstance,
2106 guest_task: TableId<GuestTask>,
2107 ) -> Result<()> {
2108 let concurrent_state = self.concurrent_state_mut();
2109 let task = concurrent_state.get_mut(guest_task)?;
2110 assert!(!task.already_lowered_parameters());
2111 task.lower_params = None;
2115 task.lift_result = None;
2116 task.exited = true;
2117 let instance = task.instance;
2118
2119 assert_eq!(1, task.threads.len());
2122 let thread = *task.threads.iter().next().unwrap();
2123 self.cleanup_thread(
2124 QualifiedThreadId {
2125 task: guest_task,
2126 thread,
2127 },
2128 caller_instance,
2129 CleanupTask::No,
2130 )?;
2131
2132 let pending = &mut self.instance_state(instance).concurrent_state().pending;
2134 let pending_count = pending.len();
2135 pending.retain(|thread, _| thread.task != guest_task);
2136 if pending.len() == pending_count {
2138 bail!(Trap::SubtaskCancelAfterTerminal);
2139 }
2140 Ok(())
2141 }
2142}
2143
/// Tells `cleanup_thread` whether it may also delete the thread's task.
enum CleanupTask {
    /// Delete the task as well, provided it reports `ready_to_delete`.
    Yes,
    /// Leave the task in place; only the thread is cleaned up.
    No,
}
2148
2149impl Instance {
2150 fn get_event(
2153 self,
2154 store: &mut StoreOpaque,
2155 guest_task: TableId<GuestTask>,
2156 set: Option<TableId<WaitableSet>>,
2157 cancellable: bool,
2158 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
2159 let state = store.concurrent_state_mut();
2160
2161 let event = &mut state.get_mut(guest_task)?.event;
2162 if let Some(ev) = event
2163 && (cancellable || !matches!(ev, Event::Cancelled))
2164 {
2165 log::trace!("deliver event {ev:?} to {guest_task:?}");
2166 let ev = *ev;
2167 *event = None;
2168 return Ok(Some((ev, None)));
2169 }
2170
2171 let set = match set {
2172 Some(set) => set,
2173 None => return Ok(None),
2174 };
2175 let waitable = match state.get_mut(set)?.ready.pop_first() {
2176 Some(v) => v,
2177 None => return Ok(None),
2178 };
2179
2180 let common = waitable.common(state)?;
2181 let handle = match common.handle {
2182 Some(h) => h,
2183 None => bail_bug!("handle not set when delivering event"),
2184 };
2185 let event = match common.event.take() {
2186 Some(e) => e,
2187 None => bail_bug!("event not set when delivering event"),
2188 };
2189
2190 log::trace!(
2191 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
2192 );
2193
2194 waitable.on_delivery(store, self, event)?;
2195
2196 Ok(Some((event, Some((waitable, handle)))))
2197 }
2198
2199 fn handle_callback_code(
2205 self,
2206 store: &mut StoreOpaque,
2207 guest_thread: QualifiedThreadId,
2208 runtime_instance: RuntimeComponentInstanceIndex,
2209 code: u32,
2210 ) -> Result<Option<GuestCall>> {
2211 let (code, set) = unpack_callback_code(code);
2212
2213 log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
2214
2215 let state = store.concurrent_state_mut();
2216
2217 let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
2218 let set = store
2219 .instance_state(self.runtime_instance(runtime_instance))
2220 .handle_table()
2221 .waitable_set_rep(handle)?;
2222
2223 Ok(TableId::<WaitableSet>::new(set))
2224 };
2225
2226 Ok(match code {
2227 callback_code::EXIT => {
2228 log::trace!("implicit thread {guest_thread:?} completed");
2229 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2230 task.exited = true;
2231 task.callback = None;
2232 store.cleanup_thread(
2233 guest_thread,
2234 self.runtime_instance(runtime_instance),
2235 CleanupTask::Yes,
2236 )?;
2237 None
2238 }
2239 callback_code::YIELD => {
2240 let task = state.get_mut(guest_thread.task)?;
2241 if let Some(event) = task.event {
2246 assert!(matches!(event, Event::None | Event::Cancelled));
2247 } else {
2248 task.event = Some(Event::None);
2249 }
2250 let call = GuestCall {
2251 thread: guest_thread,
2252 kind: GuestCallKind::DeliverEvent {
2253 instance: self,
2254 set: None,
2255 },
2256 };
2257 if state.may_block(guest_thread.task)? {
2258 state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
2261 None
2262 } else {
2263 Some(call)
2267 }
2268 }
2269 callback_code::WAIT => {
2270 state.check_blocking_for(guest_thread.task)?;
2273
2274 let set = get_set(store, set)?;
2275 let state = store.concurrent_state_mut();
2276
2277 if state.get_mut(guest_thread.task)?.event.is_some()
2278 || !state.get_mut(set)?.ready.is_empty()
2279 {
2280 state.push_high_priority(WorkItem::GuestCall(
2282 runtime_instance,
2283 GuestCall {
2284 thread: guest_thread,
2285 kind: GuestCallKind::DeliverEvent {
2286 instance: self,
2287 set: Some(set),
2288 },
2289 },
2290 ));
2291 } else {
2292 let old = state
2300 .get_mut(guest_thread.thread)?
2301 .wake_on_cancel
2302 .replace(set);
2303 if !old.is_none() {
2304 bail_bug!("thread unexpectedly had wake_on_cancel set");
2305 }
2306 let old = state
2307 .get_mut(set)?
2308 .waiting
2309 .insert(guest_thread, WaitMode::Callback(self));
2310 if !old.is_none() {
2311 bail_bug!("set's waiting set already had this thread registered");
2312 }
2313 }
2314 None
2315 }
2316 _ => bail!(Trap::UnsupportedCallbackCode),
2317 })
2318 }
2319
2320 unsafe fn queue_call<T: 'static>(
2327 self,
2328 mut store: StoreContextMut<T>,
2329 guest_thread: QualifiedThreadId,
2330 callee: SendSyncPtr<VMFuncRef>,
2331 param_count: usize,
2332 result_count: usize,
2333 async_: bool,
2334 callback: Option<SendSyncPtr<VMFuncRef>>,
2335 post_return: Option<SendSyncPtr<VMFuncRef>>,
2336 ) -> Result<()> {
2337 unsafe fn make_call<T: 'static>(
2352 store: StoreContextMut<T>,
2353 guest_thread: QualifiedThreadId,
2354 callee: SendSyncPtr<VMFuncRef>,
2355 param_count: usize,
2356 result_count: usize,
2357 ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2358 + Send
2359 + Sync
2360 + 'static
2361 + use<T> {
2362 let token = StoreToken::new(store);
2363 move |store: &mut dyn VMStore| {
2364 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2365
2366 store
2367 .concurrent_state_mut()
2368 .get_mut(guest_thread.thread)?
2369 .state = GuestThreadState::Running;
2370 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2371 let lower = match task.lower_params.take() {
2372 Some(l) => l,
2373 None => bail_bug!("lower_params missing"),
2374 };
2375
2376 lower(store, &mut storage[..param_count])?;
2377
2378 let mut store = token.as_context_mut(store);
2379
2380 unsafe {
2383 crate::Func::call_unchecked_raw(
2384 &mut store,
2385 callee.as_non_null(),
2386 NonNull::new(
2387 &mut storage[..param_count.max(result_count)]
2388 as *mut [MaybeUninit<ValRaw>] as _,
2389 )
2390 .unwrap(),
2391 )?;
2392 }
2393
2394 Ok(storage)
2395 }
2396 }
2397
2398 let call = unsafe {
2402 make_call(
2403 store.as_context_mut(),
2404 guest_thread,
2405 callee,
2406 param_count,
2407 result_count,
2408 )
2409 };
2410
2411 let callee_instance = store
2412 .0
2413 .concurrent_state_mut()
2414 .get_mut(guest_thread.task)?
2415 .instance;
2416
2417 let fun = if callback.is_some() {
2418 assert!(async_);
2419
2420 Box::new(move |store: &mut dyn VMStore| {
2421 self.add_guest_thread_to_instance_table(
2422 guest_thread.thread,
2423 store,
2424 callee_instance.index,
2425 )?;
2426 let old_thread = store.set_thread(guest_thread)?;
2427 log::trace!(
2428 "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2429 );
2430
2431 store.enter_instance(callee_instance);
2432
2433 let storage = call(store)?;
2440
2441 store.exit_instance(callee_instance)?;
2442
2443 store.set_thread(old_thread)?;
2444 let state = store.concurrent_state_mut();
2445 if let Some(t) = old_thread.guest() {
2446 state.get_mut(t.thread)?.state = GuestThreadState::Running;
2447 }
2448 log::trace!("stackless call: restored {old_thread:?} as current thread");
2449
2450 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2453
2454 self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2455 })
2456 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2457 } else {
2458 let token = StoreToken::new(store.as_context_mut());
2459 Box::new(move |store: &mut dyn VMStore| {
2460 self.add_guest_thread_to_instance_table(
2461 guest_thread.thread,
2462 store,
2463 callee_instance.index,
2464 )?;
2465 let old_thread = store.set_thread(guest_thread)?;
2466 log::trace!(
2467 "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2468 );
2469 let flags = self.id().get(store).instance_flags(callee_instance.index);
2470
2471 if !async_ {
2475 store.enter_instance(callee_instance);
2476 }
2477
2478 let storage = call(store)?;
2485
2486 if !async_ {
2487 let lift = {
2493 store.exit_instance(callee_instance)?;
2494
2495 let state = store.concurrent_state_mut();
2496 if !state.get_mut(guest_thread.task)?.result.is_none() {
2497 bail_bug!("task has already produced a result");
2498 }
2499
2500 match state.get_mut(guest_thread.task)?.lift_result.take() {
2501 Some(lift) => lift,
2502 None => bail_bug!("lift_result field is missing"),
2503 }
2504 };
2505
2506 let result = (lift.lift)(store, unsafe {
2509 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2510 &storage[..result_count],
2511 )
2512 })?;
2513
2514 let post_return_arg = match result_count {
2515 0 => ValRaw::i32(0),
2516 1 => unsafe { storage[0].assume_init() },
2519 _ => unreachable!(),
2520 };
2521
2522 unsafe {
2523 call_post_return(
2524 token.as_context_mut(store),
2525 post_return.map(|v| v.as_non_null()),
2526 post_return_arg,
2527 flags,
2528 )?;
2529 }
2530
2531 self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2532 }
2533
2534 store.set_thread(old_thread)?;
2535
2536 store
2537 .concurrent_state_mut()
2538 .get_mut(guest_thread.task)?
2539 .exited = true;
2540
2541 store.cleanup_thread(guest_thread, callee_instance, CleanupTask::Yes)?;
2543 Ok(None)
2544 })
2545 };
2546
2547 store
2548 .0
2549 .concurrent_state_mut()
2550 .push_high_priority(WorkItem::GuestCall(
2551 callee_instance.index,
2552 GuestCall {
2553 thread: guest_thread,
2554 kind: GuestCallKind::StartImplicit(fun),
2555 },
2556 ));
2557
2558 Ok(())
2559 }
2560
2561 unsafe fn prepare_call<T: 'static>(
2574 self,
2575 mut store: StoreContextMut<T>,
2576 start: NonNull<VMFuncRef>,
2577 return_: NonNull<VMFuncRef>,
2578 caller_instance: RuntimeComponentInstanceIndex,
2579 callee_instance: RuntimeComponentInstanceIndex,
2580 task_return_type: TypeTupleIndex,
2581 callee_async: bool,
2582 memory: *mut VMMemoryDefinition,
2583 string_encoding: StringEncoding,
2584 caller_info: CallerInfo,
2585 ) -> Result<()> {
2586 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2587 store.0.check_blocking()?;
2591 }
2592
2593 enum ResultInfo {
2594 Heap { results: u32 },
2595 Stack { result_count: u32 },
2596 }
2597
2598 let result_info = match &caller_info {
2599 CallerInfo::Async {
2600 has_result: true,
2601 params,
2602 } => ResultInfo::Heap {
2603 results: match params.last() {
2604 Some(r) => r.get_u32(),
2605 None => bail_bug!("retptr missing"),
2606 },
2607 },
2608 CallerInfo::Async {
2609 has_result: false, ..
2610 } => ResultInfo::Stack { result_count: 0 },
2611 CallerInfo::Sync {
2612 result_count,
2613 params,
2614 } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
2615 results: match params.last() {
2616 Some(r) => r.get_u32(),
2617 None => bail_bug!("arg ptr missing"),
2618 },
2619 },
2620 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2621 result_count: *result_count,
2622 },
2623 };
2624
2625 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2626
2627 let start = SendSyncPtr::new(start);
2631 let return_ = SendSyncPtr::new(return_);
2632 let token = StoreToken::new(store.as_context_mut());
2633 let state = store.0.concurrent_state_mut();
2634 let old_thread = state.current_guest_thread()?;
2635
2636 debug_assert_eq!(
2637 state.get_mut(old_thread.task)?.instance,
2638 self.runtime_instance(caller_instance)
2639 );
2640
2641 let guest_thread = GuestTask::new(
2642 state,
2643 Box::new(move |store, dst| {
2644 let mut store = token.as_context_mut(store);
2645 assert!(dst.len() <= MAX_FLAT_PARAMS);
2646 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2648 let count = match caller_info {
2649 CallerInfo::Async { params, has_result } => {
2653 let params = ¶ms[..params.len() - usize::from(has_result)];
2654 for (param, src) in params.iter().zip(&mut src) {
2655 src.write(*param);
2656 }
2657 params.len()
2658 }
2659
2660 CallerInfo::Sync { params, .. } => {
2662 for (param, src) in params.iter().zip(&mut src) {
2663 src.write(*param);
2664 }
2665 params.len()
2666 }
2667 };
2668 unsafe {
2675 crate::Func::call_unchecked_raw(
2676 &mut store,
2677 start.as_non_null(),
2678 NonNull::new(
2679 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2680 )
2681 .unwrap(),
2682 )?;
2683 }
2684 dst.copy_from_slice(&src[..dst.len()]);
2685 let state = store.0.concurrent_state_mut();
2686 Waitable::Guest(state.current_guest_thread()?.task).set_event(
2687 state,
2688 Some(Event::Subtask {
2689 status: Status::Started,
2690 }),
2691 )?;
2692 Ok(())
2693 }),
2694 LiftResult {
2695 lift: Box::new(move |store, src| {
2696 let mut store = token.as_context_mut(store);
2699 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2701 my_src.push(ValRaw::u32(*results));
2702 }
2703
2704 let prev = store.0.set_thread(old_thread)?;
2710
2711 unsafe {
2718 crate::Func::call_unchecked_raw(
2719 &mut store,
2720 return_.as_non_null(),
2721 my_src.as_mut_slice().into(),
2722 )?;
2723 }
2724
2725 store.0.set_thread(prev)?;
2728
2729 let state = store.0.concurrent_state_mut();
2730 let thread = state.current_guest_thread()?;
2731 if sync_caller {
2732 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2733 if let ResultInfo::Stack { result_count } = &result_info {
2734 match result_count {
2735 0 => None,
2736 1 => Some(my_src[0]),
2737 _ => unreachable!(),
2738 }
2739 } else {
2740 None
2741 },
2742 );
2743 }
2744 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2745 }),
2746 ty: task_return_type,
2747 memory: NonNull::new(memory).map(SendSyncPtr::new),
2748 string_encoding,
2749 },
2750 Caller::Guest { thread: old_thread },
2751 None,
2752 self.runtime_instance(callee_instance),
2753 callee_async,
2754 )?;
2755
2756 store.0.set_thread(guest_thread)?;
2759 log::trace!("pushed {guest_thread:?} as current thread; old thread was {old_thread:?}");
2760
2761 Ok(())
2762 }
2763
2764 unsafe fn call_callback<T>(
2769 self,
2770 mut store: StoreContextMut<T>,
2771 function: SendSyncPtr<VMFuncRef>,
2772 event: Event,
2773 handle: u32,
2774 ) -> Result<u32> {
2775 let (ordinal, result) = event.parts();
2776 let params = &mut [
2777 ValRaw::u32(ordinal),
2778 ValRaw::u32(handle),
2779 ValRaw::u32(result),
2780 ];
2781 unsafe {
2786 crate::Func::call_unchecked_raw(
2787 &mut store,
2788 function.as_non_null(),
2789 params.as_mut_slice().into(),
2790 )?;
2791 }
2792 Ok(params[0].get_u32())
2793 }
2794
2795 unsafe fn start_call<T: 'static>(
2808 self,
2809 mut store: StoreContextMut<T>,
2810 callback: *mut VMFuncRef,
2811 post_return: *mut VMFuncRef,
2812 callee: NonNull<VMFuncRef>,
2813 param_count: u32,
2814 result_count: u32,
2815 flags: u32,
2816 storage: Option<&mut [MaybeUninit<ValRaw>]>,
2817 ) -> Result<u32> {
2818 let token = StoreToken::new(store.as_context_mut());
2819 let async_caller = storage.is_none();
2820 let state = store.0.concurrent_state_mut();
2821 let guest_thread = state.current_guest_thread()?;
2822 let callee_async = state.get_mut(guest_thread.task)?.async_function;
2823 let callee = SendSyncPtr::new(callee);
2824 let param_count = usize::try_from(param_count)?;
2825 assert!(param_count <= MAX_FLAT_PARAMS);
2826 let result_count = usize::try_from(result_count)?;
2827 assert!(result_count <= MAX_FLAT_RESULTS);
2828
2829 let task = state.get_mut(guest_thread.task)?;
2830 if let Some(callback) = NonNull::new(callback) {
2831 let callback = SendSyncPtr::new(callback);
2835 task.callback = Some(Box::new(move |store, event, handle| {
2836 let store = token.as_context_mut(store);
2837 unsafe { self.call_callback::<T>(store, callback, event, handle) }
2838 }));
2839 }
2840
2841 let Caller::Guest { thread: caller } = &task.caller else {
2842 bail_bug!("start_call unexpectedly invoked for host->guest call");
2845 };
2846 let caller = *caller;
2847 let caller_instance = state.get_mut(caller.task)?.instance;
2848
2849 unsafe {
2851 self.queue_call(
2852 store.as_context_mut(),
2853 guest_thread,
2854 callee,
2855 param_count,
2856 result_count,
2857 (flags & START_FLAG_ASYNC_CALLEE) != 0,
2858 NonNull::new(callback).map(SendSyncPtr::new),
2859 NonNull::new(post_return).map(SendSyncPtr::new),
2860 )?;
2861 }
2862
2863 let state = store.0.concurrent_state_mut();
2864
2865 let guest_waitable = Waitable::Guest(guest_thread.task);
2868 let old_set = guest_waitable.common(state)?.set;
2869 let set = state.get_mut(caller.thread)?.sync_call_set;
2870 guest_waitable.join(state, Some(set))?;
2871
2872 store.0.set_thread(CurrentThread::None)?;
2873
2874 let (status, waitable) = loop {
2890 store.0.suspend(SuspendReason::Waiting {
2891 set,
2892 thread: caller,
2893 skip_may_block_check: async_caller || !callee_async,
2901 })?;
2902
2903 let state = store.0.concurrent_state_mut();
2904
2905 log::trace!("taking event for {:?}", guest_thread.task);
2906 let event = guest_waitable.take_event(state)?;
2907 let Some(Event::Subtask { status }) = event else {
2908 bail_bug!("subtasks should only get subtask events, got {event:?}")
2909 };
2910
2911 log::trace!("status {status:?} for {:?}", guest_thread.task);
2912
2913 if status == Status::Returned {
2914 break (status, None);
2916 } else if async_caller {
2917 let handle = store
2921 .0
2922 .instance_state(caller_instance)
2923 .handle_table()
2924 .subtask_insert_guest(guest_thread.task.rep())?;
2925 store
2926 .0
2927 .concurrent_state_mut()
2928 .get_mut(guest_thread.task)?
2929 .common
2930 .handle = Some(handle);
2931 break (status, Some(handle));
2932 } else {
2933 }
2937 };
2938
2939 guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2940
2941 store.0.set_thread(caller)?;
2943 store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2944 log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2945
2946 if let Some(storage) = storage {
2947 let state = store.0.concurrent_state_mut();
2951 let task = state.get_mut(guest_thread.task)?;
2952 if let Some(result) = task.sync_result.take()? {
2953 if let Some(result) = result {
2954 storage[0] = MaybeUninit::new(result);
2955 }
2956
2957 if task.exited && task.ready_to_delete() {
2958 Waitable::Guest(guest_thread.task).delete_from(state)?;
2959 }
2960 }
2961 }
2962
2963 Ok(status.pack(waitable))
2964 }
2965
2966 pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2979 self,
2980 mut store: StoreContextMut<'_, T>,
2981 future: impl Future<Output = Result<R>> + Send + 'static,
2982 lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
2983 ) -> Result<Option<u32>> {
2984 let token = StoreToken::new(store.as_context_mut());
2985 let state = store.0.concurrent_state_mut();
2986 let task = state.current_host_thread()?;
2987
2988 let (join_handle, future) = JoinHandle::run(future);
2991 {
2992 let state = &mut state.get_mut(task)?.state;
2993 assert!(matches!(state, HostTaskState::CalleeStarted));
2994 *state = HostTaskState::CalleeRunning(join_handle);
2995 }
2996
2997 let mut future = Box::pin(future);
2998
2999 let poll = tls::set(store.0, || {
3004 future
3005 .as_mut()
3006 .poll(&mut Context::from_waker(&Waker::noop()))
3007 });
3008
3009 match poll {
3010 Poll::Ready(result) => {
3012 let result = result.transpose()?;
3013 lower(store.as_context_mut(), result)?;
3014 return Ok(None);
3015 }
3016
3017 Poll::Pending => {}
3019 }
3020
3021 let future = Box::pin(async move {
3029 let result = match future.await {
3030 Some(result) => Some(result?),
3031 None => None,
3032 };
3033 let on_complete = move |store: &mut dyn VMStore| {
3034 let mut store = token.as_context_mut(store);
3038 let old = store.0.set_thread(task)?;
3039
3040 let status = if result.is_some() {
3041 Status::Returned
3042 } else {
3043 Status::ReturnCancelled
3044 };
3045
3046 lower(store.as_context_mut(), result)?;
3047 let state = store.0.concurrent_state_mut();
3048 match &mut state.get_mut(task)?.state {
3049 HostTaskState::CalleeDone { .. } => {}
3052
3053 other => *other = HostTaskState::CalleeDone { cancelled: false },
3055 }
3056 Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;
3057
3058 store.0.set_thread(old)?;
3059 Ok(())
3060 };
3061
3062 tls::get(move |store| {
3067 store
3068 .concurrent_state_mut()
3069 .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
3070 on_complete,
3071 ))));
3072 Ok(())
3073 })
3074 });
3075
3076 let state = store.0.concurrent_state_mut();
3079 state.push_future(future);
3080 let caller = state.get_mut(task)?.caller;
3081 let instance = state.get_mut(caller.task)?.instance;
3082 let handle = store
3083 .0
3084 .instance_state(instance)
3085 .handle_table()
3086 .subtask_insert_host(task.rep())?;
3087 store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
3088 log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");
3089
3090 store.0.set_thread(caller)?;
3094 Ok(Some(handle))
3095 }
3096
3097 pub(crate) fn task_return(
3100 self,
3101 store: &mut dyn VMStore,
3102 ty: TypeTupleIndex,
3103 options: OptionsIndex,
3104 storage: &[ValRaw],
3105 ) -> Result<()> {
3106 let state = store.concurrent_state_mut();
3107 let guest_thread = state.current_guest_thread()?;
3108 let lift = state
3109 .get_mut(guest_thread.task)?
3110 .lift_result
3111 .take()
3112 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3113 if !state.get_mut(guest_thread.task)?.result.is_none() {
3114 bail_bug!("task result unexpectedly already set");
3115 }
3116
3117 let CanonicalOptions {
3118 string_encoding,
3119 data_model,
3120 ..
3121 } = &self.id().get(store).component().env_component().options[options];
3122
3123 let invalid = ty != lift.ty
3124 || string_encoding != &lift.string_encoding
3125 || match data_model {
3126 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
3127 Some(memory) => {
3128 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
3129 let actual = self.id().get(store).runtime_memory(memory);
3130 expected != actual.as_ptr()
3131 }
3132 None => false,
3135 },
3136 CanonicalOptionsDataModel::Gc { .. } => true,
3138 };
3139
3140 if invalid {
3141 bail!(Trap::TaskReturnInvalid);
3142 }
3143
3144 log::trace!("task.return for {guest_thread:?}");
3145
3146 let result = (lift.lift)(store, storage)?;
3147 self.task_complete(store, guest_thread.task, result, Status::Returned)
3148 }
3149
3150 pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
3152 let state = store.concurrent_state_mut();
3153 let guest_thread = state.current_guest_thread()?;
3154 let task = state.get_mut(guest_thread.task)?;
3155 if !task.cancel_sent {
3156 bail!(Trap::TaskCancelNotCancelled);
3157 }
3158 _ = task
3159 .lift_result
3160 .take()
3161 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3162
3163 if !task.result.is_none() {
3164 bail_bug!("task result should not bet set yet");
3165 }
3166
3167 log::trace!("task.cancel for {guest_thread:?}");
3168
3169 self.task_complete(
3170 store,
3171 guest_thread.task,
3172 Box::new(DummyResult),
3173 Status::ReturnCancelled,
3174 )
3175 }
3176
3177 fn task_complete(
3183 self,
3184 store: &mut StoreOpaque,
3185 guest_task: TableId<GuestTask>,
3186 result: Box<dyn Any + Send + Sync>,
3187 status: Status,
3188 ) -> Result<()> {
3189 store
3190 .component_resource_tables(Some(self))
3191 .validate_scope_exit()?;
3192
3193 let state = store.concurrent_state_mut();
3194 let task = state.get_mut(guest_task)?;
3195
3196 if let Caller::Host { tx, .. } = &mut task.caller {
3197 if let Some(tx) = tx.take() {
3198 _ = tx.send(result);
3199 }
3200 } else {
3201 task.result = Some(result);
3202 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3203 }
3204
3205 Ok(())
3206 }
3207
3208 pub(crate) fn waitable_set_new(
3210 self,
3211 store: &mut StoreOpaque,
3212 caller_instance: RuntimeComponentInstanceIndex,
3213 ) -> Result<u32> {
3214 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
3215 let handle = store
3216 .instance_state(self.runtime_instance(caller_instance))
3217 .handle_table()
3218 .waitable_set_insert(set.rep())?;
3219 log::trace!("new waitable set {set:?} (handle {handle})");
3220 Ok(handle)
3221 }
3222
3223 pub(crate) fn waitable_set_drop(
3225 self,
3226 store: &mut StoreOpaque,
3227 caller_instance: RuntimeComponentInstanceIndex,
3228 set: u32,
3229 ) -> Result<()> {
3230 let rep = store
3231 .instance_state(self.runtime_instance(caller_instance))
3232 .handle_table()
3233 .waitable_set_remove(set)?;
3234
3235 log::trace!("drop waitable set {rep} (handle {set})");
3236
3237 if !store
3241 .concurrent_state_mut()
3242 .get_mut(TableId::<WaitableSet>::new(rep))?
3243 .waiting
3244 .is_empty()
3245 {
3246 bail!(Trap::WaitableSetDropHasWaiters);
3247 }
3248
3249 store
3250 .concurrent_state_mut()
3251 .delete(TableId::<WaitableSet>::new(rep))?;
3252
3253 Ok(())
3254 }
3255
3256 pub(crate) fn waitable_join(
3258 self,
3259 store: &mut StoreOpaque,
3260 caller_instance: RuntimeComponentInstanceIndex,
3261 waitable_handle: u32,
3262 set_handle: u32,
3263 ) -> Result<()> {
3264 let mut instance = self.id().get_mut(store);
3265 let waitable =
3266 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3267
3268 let set = if set_handle == 0 {
3269 None
3270 } else {
3271 let set = instance.instance_states().0[caller_instance]
3272 .handle_table()
3273 .waitable_set_rep(set_handle)?;
3274
3275 let state = store.concurrent_state_mut();
3276 if let Some(old) = waitable.common(state)?.set
3277 && state.get_mut(old)?.is_sync_call_set
3278 {
3279 bail!(Trap::WaitableSyncAndAsync);
3280 }
3281
3282 Some(TableId::<WaitableSet>::new(set))
3283 };
3284
3285 log::trace!(
3286 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3287 );
3288
3289 waitable.join(store.concurrent_state_mut(), set)
3290 }
3291
3292 pub(crate) fn subtask_drop(
3294 self,
3295 store: &mut StoreOpaque,
3296 caller_instance: RuntimeComponentInstanceIndex,
3297 task_id: u32,
3298 ) -> Result<()> {
3299 self.waitable_join(store, caller_instance, task_id, 0)?;
3300
3301 let (rep, is_host) = store
3302 .instance_state(self.runtime_instance(caller_instance))
3303 .handle_table()
3304 .subtask_remove(task_id)?;
3305
3306 let concurrent_state = store.concurrent_state_mut();
3307 let (waitable, delete) = if is_host {
3308 let id = TableId::<HostTask>::new(rep);
3309 let task = concurrent_state.get_mut(id)?;
3310 match &task.state {
3311 HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
3312 HostTaskState::CalleeDone { .. } => {}
3313 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3314 bail_bug!("invalid state for callee in `subtask.drop`")
3315 }
3316 }
3317 (Waitable::Host(id), true)
3318 } else {
3319 let id = TableId::<GuestTask>::new(rep);
3320 let task = concurrent_state.get_mut(id)?;
3321 if task.lift_result.is_some() {
3322 bail!(Trap::SubtaskDropNotResolved);
3323 }
3324 (
3325 Waitable::Guest(id),
3326 concurrent_state.get_mut(id)?.ready_to_delete(),
3327 )
3328 };
3329
3330 waitable.common(concurrent_state)?.handle = None;
3331
3332 if waitable.take_event(concurrent_state)?.is_some() {
3335 bail!(Trap::SubtaskDropNotResolved);
3336 }
3337
3338 if delete {
3339 waitable.delete_from(concurrent_state)?;
3340 }
3341
3342 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3343 Ok(())
3344 }
3345
3346 pub(crate) fn waitable_set_wait(
3348 self,
3349 store: &mut StoreOpaque,
3350 options: OptionsIndex,
3351 set: u32,
3352 payload: u32,
3353 ) -> Result<u32> {
3354 if !self.options(store, options).async_ {
3355 store.check_blocking()?;
3359 }
3360
3361 let &CanonicalOptions {
3362 cancellable,
3363 instance: caller_instance,
3364 ..
3365 } = &self.id().get(store).component().env_component().options[options];
3366 let rep = store
3367 .instance_state(self.runtime_instance(caller_instance))
3368 .handle_table()
3369 .waitable_set_rep(set)?;
3370
3371 self.waitable_check(
3372 store,
3373 cancellable,
3374 WaitableCheck::Wait,
3375 WaitableCheckParams {
3376 set: TableId::new(rep),
3377 options,
3378 payload,
3379 },
3380 )
3381 }
3382
3383 pub(crate) fn waitable_set_poll(
3385 self,
3386 store: &mut StoreOpaque,
3387 options: OptionsIndex,
3388 set: u32,
3389 payload: u32,
3390 ) -> Result<u32> {
3391 let &CanonicalOptions {
3392 cancellable,
3393 instance: caller_instance,
3394 ..
3395 } = &self.id().get(store).component().env_component().options[options];
3396 let rep = store
3397 .instance_state(self.runtime_instance(caller_instance))
3398 .handle_table()
3399 .waitable_set_rep(set)?;
3400
3401 self.waitable_check(
3402 store,
3403 cancellable,
3404 WaitableCheck::Poll,
3405 WaitableCheckParams {
3406 set: TableId::new(rep),
3407 options,
3408 payload,
3409 },
3410 )
3411 }
3412
3413 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3415 let thread_id = store.concurrent_state_mut().current_guest_thread()?.thread;
3416 match store
3417 .concurrent_state_mut()
3418 .get_mut(thread_id)?
3419 .instance_rep
3420 {
3421 Some(r) => Ok(r),
3422 None => bail_bug!("thread should have instance_rep by now"),
3423 }
3424 }
3425
3426 pub(crate) fn thread_new_indirect<T: 'static>(
3428 self,
3429 mut store: StoreContextMut<T>,
3430 runtime_instance: RuntimeComponentInstanceIndex,
3431 _func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex,
3433 start_func_idx: u32,
3434 context: i32,
3435 ) -> Result<u32> {
3436 log::trace!("creating new thread");
3437
3438 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3439 let (instance, registry) = self.id().get_mut_and_registry(store.0);
3440 let callee = instance
3441 .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3442 .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
3443 if callee.type_index(store.0) != start_func_ty.type_index() {
3444 bail!(Trap::ThreadNewIndirectInvalidType);
3445 }
3446
3447 let token = StoreToken::new(store.as_context_mut());
3448 let start_func = Box::new(
3449 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3450 let old_thread = store.set_thread(guest_thread)?;
3451 log::trace!(
3452 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3453 );
3454
3455 let mut store = token.as_context_mut(store);
3456 let mut params = [ValRaw::i32(context)];
3457 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3460
3461 store.0.set_thread(old_thread)?;
3462
3463 store.0.cleanup_thread(
3464 guest_thread,
3465 self.runtime_instance(runtime_instance),
3466 CleanupTask::Yes,
3467 )?;
3468 log::trace!("explicit thread {guest_thread:?} completed");
3469 let state = store.0.concurrent_state_mut();
3470 if let Some(t) = old_thread.guest() {
3471 state.get_mut(t.thread)?.state = GuestThreadState::Running;
3472 }
3473 log::trace!("thread start: restored {old_thread:?} as current thread");
3474
3475 Ok(())
3476 },
3477 );
3478
3479 let state = store.0.concurrent_state_mut();
3480 let current_thread = state.current_guest_thread()?;
3481 let parent_task = current_thread.task;
3482
3483 let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
3484 let thread_id = state.push(new_thread)?;
3485 state.get_mut(parent_task)?.threads.insert(thread_id);
3486
3487 log::trace!("new thread with id {thread_id:?} created");
3488
3489 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3490 }
3491
3492 pub(crate) fn resume_thread(
3493 self,
3494 store: &mut StoreOpaque,
3495 runtime_instance: RuntimeComponentInstanceIndex,
3496 thread_idx: u32,
3497 high_priority: bool,
3498 allow_ready: bool,
3499 ) -> Result<()> {
3500 let thread_id =
3501 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3502 let state = store.concurrent_state_mut();
3503 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3504 let thread = state.get_mut(guest_thread.thread)?;
3505
3506 match mem::replace(&mut thread.state, GuestThreadState::Running) {
3507 GuestThreadState::NotStartedExplicit(start_func) => {
3508 log::trace!("starting thread {guest_thread:?}");
3509 let guest_call = WorkItem::GuestCall(
3510 runtime_instance,
3511 GuestCall {
3512 thread: guest_thread,
3513 kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3514 start_func(store, guest_thread)
3515 })),
3516 },
3517 );
3518 store
3519 .concurrent_state_mut()
3520 .push_work_item(guest_call, high_priority);
3521 }
3522 GuestThreadState::Suspended(fiber) => {
3523 log::trace!("resuming thread {thread_id:?} that was suspended");
3524 store
3525 .concurrent_state_mut()
3526 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3527 }
3528 GuestThreadState::Ready { fiber, cancellable } if allow_ready => {
3529 log::trace!("resuming thread {thread_id:?} that was ready");
3530 thread.state = GuestThreadState::Ready { fiber, cancellable };
3531 store
3532 .concurrent_state_mut()
3533 .promote_thread_work_item(guest_thread);
3534 }
3535 other => {
3536 thread.state = other;
3537 bail!(Trap::CannotResumeThread);
3538 }
3539 }
3540 Ok(())
3541 }
3542
3543 fn add_guest_thread_to_instance_table(
3544 self,
3545 thread_id: TableId<GuestThread>,
3546 store: &mut StoreOpaque,
3547 runtime_instance: RuntimeComponentInstanceIndex,
3548 ) -> Result<u32> {
3549 let guest_id = store
3550 .instance_state(self.runtime_instance(runtime_instance))
3551 .thread_handle_table()
3552 .guest_thread_insert(thread_id.rep())?;
3553 store
3554 .concurrent_state_mut()
3555 .get_mut(thread_id)?
3556 .instance_rep = Some(guest_id);
3557 Ok(guest_id)
3558 }
3559
3560 pub(crate) fn suspension_intrinsic(
3563 self,
3564 store: &mut StoreOpaque,
3565 caller: RuntimeComponentInstanceIndex,
3566 cancellable: bool,
3567 yielding: bool,
3568 to_thread: SuspensionTarget,
3569 ) -> Result<WaitResult> {
3570 let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3571 if to_thread.is_none() {
3572 let state = store.concurrent_state_mut();
3573 if yielding {
3574 if !state.may_block(guest_thread.task)? {
3576 if !state.promote_instance_local_thread_work_item(caller) {
3579 return Ok(WaitResult::Completed);
3581 }
3582 }
3583 } else {
3584 store.check_blocking()?;
3588 }
3589 }
3590
3591 if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3593 return Ok(WaitResult::Cancelled);
3594 }
3595
3596 match to_thread {
3597 SuspensionTarget::SomeSuspended(thread) => {
3598 self.resume_thread(store, caller, thread, true, false)?
3599 }
3600 SuspensionTarget::Some(thread) => {
3601 self.resume_thread(store, caller, thread, true, true)?
3602 }
3603 SuspensionTarget::None => { }
3604 }
3605
3606 let reason = if yielding {
3607 SuspendReason::Yielding {
3608 thread: guest_thread,
3609 cancellable,
3610 skip_may_block_check: to_thread.is_some(),
3614 }
3615 } else {
3616 SuspendReason::ExplicitlySuspending {
3617 thread: guest_thread,
3618 skip_may_block_check: to_thread.is_some(),
3622 }
3623 };
3624
3625 store.suspend(reason)?;
3626
3627 if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3628 Ok(WaitResult::Cancelled)
3629 } else {
3630 Ok(WaitResult::Completed)
3631 }
3632 }
3633
3634 fn waitable_check(
3636 self,
3637 store: &mut StoreOpaque,
3638 cancellable: bool,
3639 check: WaitableCheck,
3640 params: WaitableCheckParams,
3641 ) -> Result<u32> {
3642 let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3643
3644 log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3645
3646 let state = store.concurrent_state_mut();
3647 let task = state.get_mut(guest_thread.task)?;
3648
3649 match &check {
3652 WaitableCheck::Wait => {
3653 let set = params.set;
3654
3655 if (task.event.is_none()
3656 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3657 && state.get_mut(set)?.ready.is_empty()
3658 {
3659 if cancellable {
3660 let old = state
3661 .get_mut(guest_thread.thread)?
3662 .wake_on_cancel
3663 .replace(set);
3664 if !old.is_none() {
3665 bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
3666 }
3667 }
3668
3669 store.suspend(SuspendReason::Waiting {
3670 set,
3671 thread: guest_thread,
3672 skip_may_block_check: false,
3673 })?;
3674 }
3675 }
3676 WaitableCheck::Poll => {}
3677 }
3678
3679 log::trace!(
3680 "waitable check for {guest_thread:?}; set {:?}, part two",
3681 params.set
3682 );
3683
3684 let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3686
3687 let (ordinal, handle, result) = match &check {
3688 WaitableCheck::Wait => {
3689 let (event, waitable) = match event {
3690 Some(p) => p,
3691 None => bail_bug!("event expected to be present"),
3692 };
3693 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3694 let (ordinal, result) = event.parts();
3695 (ordinal, handle, result)
3696 }
3697 WaitableCheck::Poll => {
3698 if let Some((event, waitable)) = event {
3699 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3700 let (ordinal, result) = event.parts();
3701 (ordinal, handle, result)
3702 } else {
3703 log::trace!(
3704 "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3705 guest_thread.task,
3706 params.set
3707 );
3708 let (ordinal, result) = Event::None.parts();
3709 (ordinal, 0, result)
3710 }
3711 }
3712 };
3713 let memory = self.options_memory_mut(store, params.options);
3714 let ptr = func::validate_inbounds_dynamic(
3715 &CanonicalAbiInfo::POINTER_PAIR,
3716 memory,
3717 &ValRaw::u32(params.payload),
3718 )?;
3719 memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3720 memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3721 Ok(ordinal)
3722 }
3723
/// Implements `subtask.cancel` for handle `task_id` of `caller_instance`.
///
/// Returns one of:
///
/// * `Status::StartCancelled` if the guest subtask had not yet lowered its
///   parameters,
/// * `BLOCKED` if `async_` is set and the cancellation has not completed yet,
/// * the terminal status (`Status::Returned` or `Status::ReturnCancelled`)
///   taken from the subtask's pending event.
///
/// Traps with `Trap::SubtaskCancelAfterTerminal` if the subtask was already
/// cancelled or has no terminal event left to deliver. When `async_` is
/// false the call goes through `check_blocking` first.
pub(crate) fn subtask_cancel(
    self,
    store: &mut StoreOpaque,
    caller_instance: RuntimeComponentInstanceIndex,
    async_: bool,
    task_id: u32,
) -> Result<u32> {
    if !async_ {
        store.check_blocking()?;
    }

    let (rep, is_host) = store
        .instance_state(self.runtime_instance(caller_instance))
        .handle_table()
        .subtask_rep(task_id)?;
    let waitable = if is_host {
        Waitable::Host(TableId::<HostTask>::new(rep))
    } else {
        Waitable::Guest(TableId::<GuestTask>::new(rep))
    };
    let concurrent_state = store.concurrent_state_mut();

    log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

    // Whether the caller has to wait for the callee to acknowledge the
    // cancellation before a terminal status is available.
    let needs_block;
    if let Waitable::Host(host_task) = waitable {
        let state = &mut concurrent_state.get_mut(host_task)?.state;
        // The state unconditionally becomes `CalleeDone { cancelled: true }`,
        // so a second cancel of the same host task hits the trap below.
        match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
            HostTaskState::CalleeRunning(handle) => {
                // Abort the still-running host future.
                handle.abort();
                needs_block = true;
            }

            HostTaskState::CalleeDone { cancelled } => {
                if cancelled {
                    bail!(Trap::SubtaskCancelAfterTerminal);
                } else {
                    // Already finished on its own; nothing to wait for.
                    needs_block = false;
                }
            }

            HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
                bail_bug!("invalid states for host callee")
            }
        }
    } else {
        let guest_task = TableId::<GuestTask>::new(rep);
        let task = concurrent_state.get_mut(guest_task)?;
        if !task.already_lowered_parameters() {
            // The callee never received its parameters, so it can be
            // cancelled outright.
            store.cancel_guest_subtask_without_lowered_parameters(
                self.runtime_instance(caller_instance),
                guest_task,
            )?;
            return Ok(Status::StartCancelled as u32);
        } else if !task.returned_or_cancelled() {
            // Record the request; `task.cancel` checks `cancel_sent`.
            task.cancel_sent = true;
            task.event = Some(Event::Cancelled);
            let runtime_instance = task.instance.index;
            // The thread set is cloned so `concurrent_state` can be borrowed
            // again inside the loop.
            for thread in task.threads.clone() {
                let thread = QualifiedThreadId {
                    task: guest_task,
                    thread,
                };
                let thread_mut = concurrent_state.get_mut(thread.thread)?;
                if let Some(set) = thread_mut.wake_on_cancel.take() {
                    // The thread is parked in a cancellable wait on `set`:
                    // pull it out of the set's waiters and queue it to run.
                    let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
                        Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
                        Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
                            runtime_instance,
                            GuestCall {
                                thread,
                                kind: GuestCallKind::DeliverEvent {
                                    instance,
                                    set: None,
                                },
                            },
                        ),
                        None => bail_bug!("thread not present in wake_on_cancel set"),
                    };
                    concurrent_state.push_high_priority(item);

                    // Yield so the woken thread can run; only the first
                    // eligible thread is woken.
                    let caller = concurrent_state.current_guest_thread()?;
                    store.suspend(SuspendReason::Yielding {
                        thread: caller,
                        cancellable: false,
                        skip_may_block_check: false,
                    })?;
                    break;
                } else if let GuestThreadState::Ready {
                    cancellable: true, ..
                } = &thread_mut.state
                {
                    // The thread is ready and cancellable: promote its
                    // queued work item and yield to it.
                    let caller = concurrent_state.current_guest_thread()?;
                    concurrent_state.promote_thread_work_item(thread);
                    store.suspend(SuspendReason::Yielding {
                        thread: caller,
                        cancellable: false,
                        skip_may_block_check: false,
                    })?;
                    break;
                }
            }

            // Re-read the task: it may have returned or cancelled while this
            // thread was suspended above.
            needs_block = !store
                .concurrent_state_mut()
                .get_mut(guest_task)?
                .returned_or_cancelled()
        } else {
            needs_block = false;
        }
    };

    if needs_block {
        if async_ {
            return Ok(BLOCKED);
        }

        store.wait_for_event(waitable)?;
    }

    // Only a terminal subtask event is an acceptable outcome here.
    let event = waitable.take_event(store.concurrent_state_mut())?;
    if let Some(Event::Subtask {
        status: status @ (Status::Returned | Status::ReturnCancelled),
    }) = event
    {
        Ok(status as u32)
    } else {
        bail!(Trap::SubtaskCancelAfterTerminal);
    }
}
3891}
3892
/// Store-level entry points for component-model async operations.
///
/// Every method receives the `Instance` the operation applies to together
/// with the operation's raw operands. The `StoreInner<T>` implementation in
/// this file forwards each method to a corresponding method on `Instance`.
pub trait VMComponentAsyncStore {
    /// Prepares a call from `caller_instance` to `callee_instance`.
    ///
    /// `result_count` is either the number of results for a synchronous
    /// caller or one of the `PREPARE_ASYNC_NO_RESULT` /
    /// `PREPARE_ASYNC_WITH_RESULT` sentinels for an asynchronous caller.
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads of `storage_len` `ValRaw` values;
    /// the `StoreInner<T>` implementation copies them out as the call's
    /// parameters. NOTE(review): the requirements on `memory`, `start` and
    /// `return_` are imposed by `Instance::prepare_call` and are not visible
    /// here.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: StringEncoding,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of a synchronous caller.
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads and writes of `storage_len`
    /// elements; the `StoreInner<T>` implementation turns it into a mutable
    /// slice. NOTE(review): the requirements on `callback` and `callee` come
    /// from `Instance::start_call` and are not visible here.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of an asynchronous caller,
    /// returning the `u32` produced by `Instance::start_call`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the pointer requirements on `callback`, `post_return`
    /// and `callee` come from `Instance::start_call` and are not visible
    /// here.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// Writes the value at guest `address` to `future`, returning the encoded
    /// result code.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Reads from `future` into guest memory at `address`, returning the
    /// encoded result code.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a future of type `ty`.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Writes `count` items starting at guest `address` to `stream`,
    /// returning the encoded result code.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Reads up to `count` items from `stream` into guest memory at
    /// `address`, returning the encoded result code.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_write`, but additionally passes the payload's size and
    /// alignment (as a `FlatAbi`) to the underlying write.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_read`, but additionally passes the payload's size and
    /// alignment (as a `FlatAbi`) to the underlying read.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a stream of type `ty`.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Retrieves the debug message of the error context `err_ctx_handle`,
    /// using guest address `debug_msg_address` as the destination.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// Creates a new guest thread whose start function is entry
    /// `start_func_idx` of table `start_func_table_idx`, passing it
    /// `context`. Returns the new thread's handle.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
4060
/// Forwards every `VMComponentAsyncStore` operation to the corresponding
/// `Instance` method, handing the store over as a `StoreContextMut` (or as
/// `self` for the `*_drop_writable` operations).
impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: StringEncoding,
        result_count_or_max_if_async: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()> {
        // SAFETY: the caller of this `unsafe fn` must guarantee that
        // `storage` is valid for reads of `storage_len` values. The values
        // are copied out immediately so nothing borrows `storage` afterwards.
        let params = unsafe { core::slice::from_raw_parts(storage, storage_len) }.to_vec();

        // SAFETY: NOTE(review): relies on the caller upholding whatever
        // `Instance::prepare_call` requires of `memory`, `start` and
        // `return_`; those requirements are not visible here.
        unsafe {
            instance.prepare_call(
                StoreContextMut(self),
                start,
                return_,
                caller_instance,
                callee_instance,
                task_return_type,
                callee_async,
                memory,
                string_encoding,
                // The two sentinel values mark an async caller; any other
                // value is the result count of a sync caller.
                match result_count_or_max_if_async {
                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
                        params,
                        has_result: false,
                    },
                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
                        params,
                        has_result: true,
                    },
                    result_count => CallerInfo::Sync {
                        params,
                        result_count,
                    },
                },
            )
        }
    }

    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()> {
        // SAFETY: the caller of this `unsafe fn` must guarantee that
        // `storage` is valid for reads and writes of `storage_len` elements.
        // NOTE(review): the remaining pointer requirements are those of
        // `Instance::start_call`, not visible here.
        unsafe {
            instance
                .start_call(
                    StoreContextMut(self),
                    callback,
                    // No post-return function is passed for sync starts.
                    ptr::null_mut(),
                    callee,
                    param_count,
                    // NOTE(review): result count is fixed at 1 and the flags
                    // at `START_FLAG_ASYNC_CALLEE`; the reason is not visible
                    // in this block.
                    1,
                    START_FLAG_ASYNC_CALLEE,
                    Some(core::slice::from_raw_parts_mut(storage, storage_len)),
                )
                // The value returned by `start_call` is not needed here.
                .map(drop)
        }
    }

    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32> {
        // SAFETY: NOTE(review): relies on the caller upholding whatever
        // `Instance::start_call` requires of `callback`, `post_return` and
        // `callee`; those requirements are not visible here.
        unsafe {
            instance.start_call(
                StoreContextMut(self),
                callback,
                post_return,
                callee,
                param_count,
                result_count,
                flags,
                // No caller-provided storage for async starts.
                None,
            )
        }
    }

    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                // A future transmits exactly one item.
                1,
            )
            .map(|result| result.encode())
    }

    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                // A future transmits exactly one item.
                1,
            )
            .map(|result| result.encode())
    }

    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
    }

    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                // Same as `stream_write`, plus the payload's flat layout.
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                // Same as `stream_read`, plus the payload's flat layout.
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
    }

    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()> {
        instance.error_context_debug_message(
            StoreContextMut(self),
            ty,
            options,
            err_ctx_handle,
            debug_msg_address,
        )
    }

    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32> {
        instance.thread_new_indirect(
            StoreContextMut(self),
            caller,
            func_ty_idx,
            start_func_table_idx,
            start_func_idx,
            context,
        )
    }
}
4370
/// A pinned, boxed, `Send + 'static` future that resolves to `Result<()>`.
// NOTE(review): presumably the future driving a host task; its users are not
// visible in this part of the file.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4372
/// A task whose callee is implemented by the host.
pub(crate) struct HostTask {
    /// State shared by all waitables (starts out as
    /// `WaitableCommon::default()`).
    common: WaitableCommon,

    /// The guest thread on whose behalf this host task was created.
    caller: QualifiedThreadId,

    /// Call context associated with this task (starts out as
    /// `CallContext::default()`).
    call_context: CallContext,

    /// Where the host callee is in its lifecycle.
    state: HostTaskState,
}
4388
/// Lifecycle of the callee of a `HostTask`.
enum HostTaskState {
    /// NOTE(review): presumably the callee was started but has no join
    /// handle yet. `subtask.drop` and `subtask.cancel` treat seeing this
    /// state as an internal bug.
    CalleeStarted,

    /// The callee is running; the handle lets `subtask.cancel` abort it.
    /// Dropping the subtask in this state traps.
    CalleeRunning(JoinHandle),

    /// NOTE(review): presumably the callee finished and its lifted result is
    /// stored here until consumed. `subtask.drop` and `subtask.cancel` treat
    /// seeing this state as an internal bug.
    CalleeFinished(LiftedResult),

    /// The callee is done. `cancelled` is set by `subtask.cancel`; cancelling
    /// a task that is already `cancelled` traps.
    CalleeDone { cancelled: bool },
}
4411
4412impl HostTask {
4413 fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4414 Self {
4415 common: WaitableCommon::default(),
4416 call_context: CallContext::default(),
4417 caller,
4418 state,
4419 }
4420 }
4421}
4422
impl TableDebug for HostTask {
    /// Name reported for `HostTask` table entries.
    fn type_name() -> &'static str {
        "HostTask"
    }
}
4428
/// Boxed callback taking the store, an `Event` and a `u32`, and returning a
/// `u32` code.
// NOTE(review): presumably the `u32` argument is the waitable handle the
// event refers to; confirm at the call site.
type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4430
/// Who called a guest task.
enum Caller {
    /// The task was called by the host.
    Host {
        /// One-shot channel on which `task_complete` sends the task's result;
        /// taken (left as `None`) once the result has been sent.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// NOTE(review): presumably whether the host still holds a future for
        /// this call; not read in this part of the file.
        host_future_present: bool,
        /// NOTE(review): presumably the thread that was current when the host
        /// made the call; not read in this part of the file.
        caller: CurrentThread,
    },
    /// The task was called by another guest task.
    Guest {
        /// The calling guest thread.
        thread: QualifiedThreadId,
    },
}
4451
/// Everything needed to lift a task's result when it calls `task.return`.
struct LiftResult {
    /// The function that performs the lifting; `task.return` invokes it as
    /// `(lift.lift)(store, storage)`.
    lift: RawLift,
    /// NOTE(review): presumably the tuple type of the task's results.
    ty: TypeTupleIndex,
    /// Linear memory expected by the lift, if any; `task.return` compares it
    /// against the memory named by the canonical options.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// NOTE(review): presumably the string encoding the result was declared
    /// with.
    string_encoding: StringEncoding,
}
4460
/// Identifies a guest thread together with the task it belongs to.
///
/// The derived ordering compares `task` first, then `thread`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) struct QualifiedThreadId {
    /// The task that owns the thread.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
4470
4471impl QualifiedThreadId {
4472 fn qualify(
4473 state: &mut ConcurrentState,
4474 thread: TableId<GuestThread>,
4475 ) -> Result<QualifiedThreadId> {
4476 Ok(QualifiedThreadId {
4477 task: state.get_mut(thread)?.parent_task,
4478 thread,
4479 })
4480 }
4481}
4482
4483impl fmt::Debug for QualifiedThreadId {
4484 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4485 f.debug_tuple("QualifiedThreadId")
4486 .field(&self.task.rep())
4487 .field(&self.thread.rep())
4488 .finish()
4489 }
4490}
4491
/// Execution state of a `GuestThread`.
enum GuestThreadState {
    /// Initial state of a thread created with `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state of a thread created with `GuestThread::new_explicit`.
    /// Holds the start function, which `resume_thread` consumes when it
    /// queues the thread to start.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// The thread is running (or has been queued to run by `resume_thread`).
    Running,
    /// The thread is suspended on the given fiber until `resume_thread`
    /// queues it for resumption.
    Suspended(StoreFiber<'static>),
    /// The thread is ready to run on the given fiber; `cancellable` records
    /// whether `subtask.cancel` may promote it.
    Ready {
        fiber: StoreFiber<'static>,
        cancellable: bool,
    },
    /// NOTE(review): presumably the thread has finished; not set in this
    /// part of the file.
    Completed,
}
/// A thread of execution belonging to a guest task.
pub struct GuestThread {
    /// Per-thread context slots, all zero on creation.
    context: [u32; NUM_COMPONENT_CONTEXT_SLOTS],
    /// The task this thread belongs to.
    parent_task: TableId<GuestTask>,
    /// Set while the thread is in a cancellable wait on a waitable set;
    /// `subtask.cancel` takes it to find and wake the thread.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// Current execution state.
    state: GuestThreadState,
    /// Handle of this thread in its instance's thread handle table, once it
    /// has been registered there.
    instance_rep: Option<u32>,
    /// A waitable set created together with the thread and flagged
    /// `is_sync_call_set`.
    sync_call_set: TableId<WaitableSet>,
}
4522
4523impl GuestThread {
4524 fn from_instance(
4527 state: Pin<&mut ComponentInstance>,
4528 caller_instance: RuntimeComponentInstanceIndex,
4529 guest_thread: u32,
4530 ) -> Result<TableId<Self>> {
4531 let rep = state.instance_states().0[caller_instance]
4532 .thread_handle_table()
4533 .guest_thread_rep(guest_thread)?;
4534 Ok(TableId::new(rep))
4535 }
4536
4537 fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4538 let sync_call_set = state.push(WaitableSet {
4539 is_sync_call_set: true,
4540 ..WaitableSet::default()
4541 })?;
4542 Ok(Self {
4543 context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4544 parent_task,
4545 wake_on_cancel: None,
4546 state: GuestThreadState::NotStartedImplicit,
4547 instance_rep: None,
4548 sync_call_set,
4549 })
4550 }
4551
4552 fn new_explicit(
4553 state: &mut ConcurrentState,
4554 parent_task: TableId<GuestTask>,
4555 start_func: Box<
4556 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4557 >,
4558 ) -> Result<Self> {
4559 let sync_call_set = state.push(WaitableSet {
4560 is_sync_call_set: true,
4561 ..WaitableSet::default()
4562 })?;
4563 Ok(Self {
4564 context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4565 parent_task,
4566 wake_on_cancel: None,
4567 state: GuestThreadState::NotStartedExplicit(start_func),
4568 instance_rep: None,
4569 sync_call_set,
4570 })
4571 }
4572}
4573
impl TableDebug for GuestThread {
    /// Name reported for `GuestThread` table entries.
    fn type_name() -> &'static str {
        "GuestThread"
    }
}
4579
/// Tracks the result of a synchronous call.
enum SyncResult {
    /// No result has been produced yet.
    NotProduced,
    /// A result has been produced and not yet taken. The inner `Option` is
    /// the raw value, if any.
    Produced(Option<ValRaw>),
    /// `SyncResult::take` has been called; taking again is an internal bug.
    Taken,
}
4585
4586impl SyncResult {
4587 fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4588 Ok(match mem::replace(self, SyncResult::Taken) {
4589 SyncResult::NotProduced => None,
4590 SyncResult::Produced(val) => Some(val),
4591 SyncResult::Taken => {
4592 bail_bug!("attempted to take a synchronous result that was already taken")
4593 }
4594 })
4595 }
4596}
4597
/// Whether a host-side future is associated with a guest task, and if so
/// whether it is still alive.
#[derive(Debug)]
enum HostFutureState {
    /// No host future is tracked: the caller is a guest, or the host caller
    /// reported `host_future_present == false`.
    NotApplicable,
    /// A host future exists; `GuestTask::ready_to_delete` refuses deletion
    /// while in this state.
    Live,
    /// Set by `TaskId::host_future_dropped` once the host future is gone.
    Dropped,
}
4604
/// Per-task state for a guest task stored in the `ConcurrentState` table.
pub(crate) struct GuestTask {
    /// Event/set/handle bookkeeping reached through `Waitable::common`.
    common: WaitableCommon,
    /// Closure that lowers the call parameters. `None` is interpreted by
    /// `already_lowered_parameters` as "parameters were already lowered".
    lower_params: Option<RawLower>,
    /// Information needed to lift the result. `None` is interpreted by
    /// `returned_or_cancelled` as "the task returned or was cancelled".
    lift_result: Option<LiftResult>,
    /// Lifted result, if any; starts out as `None`.
    result: Option<LiftedResult>,
    /// Optional callback function associated with the task.
    callback: Option<CallbackFn>,
    /// Who called this task (guest or host).
    caller: Caller,
    /// Call context handed out by `ConcurrentState::call_context`.
    call_context: CallContext,
    /// Raw result of a synchronous call; see `SyncResult`.
    sync_result: SyncResult,
    /// Starts out `false`. NOTE(review): presumably records that a cancel
    /// notification was delivered — confirm against the code that sets it.
    cancel_sent: bool,
    /// Starts out `false`. NOTE(review): presumably records that a "starting"
    /// status was delivered — confirm against the code that sets it.
    starting_sent: bool,
    /// Runtime instance this task runs in.
    instance: RuntimeInstance,
    /// Pending task-level event; `take_pending_cancellation` takes it and
    /// asserts that it is `Event::Cancelled`.
    event: Option<Event>,
    /// Starts out `false`. NOTE(review): semantics are defined outside this
    /// chunk — confirm.
    exited: bool,
    /// Threads belonging to this task; must be empty before the task is
    /// considered ready to delete.
    threads: HashSet<TableId<GuestThread>>,
    /// State of the host future associated with this task, if any.
    host_future_state: HostFutureState,
    /// Whether the called function is async; such tasks may always block
    /// (see `ConcurrentState::may_block`).
    async_function: bool,

    /// Debug-asserted to be `true` when the task is deleted via
    /// `Waitable::delete_from`; pairs with the `interesting_tasks` increment
    /// performed in `GuestTask::new`.
    decremented_interesting_task_count: bool,
}
4658
4659impl GuestTask {
4660 fn already_lowered_parameters(&self) -> bool {
4661 self.lower_params.is_none()
4663 }
4664
4665 fn returned_or_cancelled(&self) -> bool {
4666 self.lift_result.is_none()
4668 }
4669
4670 fn ready_to_delete(&self) -> bool {
4671 let threads_completed = self.threads.is_empty();
4672 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4673 let pending_completion_event = matches!(
4674 self.common.event,
4675 Some(Event::Subtask {
4676 status: Status::Returned | Status::ReturnCancelled
4677 })
4678 );
4679 let ready = threads_completed
4680 && !has_sync_result
4681 && !pending_completion_event
4682 && !matches!(self.host_future_state, HostFutureState::Live);
4683 log::trace!(
4684 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4685 threads_completed,
4686 has_sync_result,
4687 pending_completion_event,
4688 self.host_future_state
4689 );
4690 ready
4691 }
4692
4693 fn new(
4694 state: &mut ConcurrentState,
4695 lower_params: RawLower,
4696 lift_result: LiftResult,
4697 caller: Caller,
4698 callback: Option<CallbackFn>,
4699 instance: RuntimeInstance,
4700 async_function: bool,
4701 ) -> Result<QualifiedThreadId> {
4702 let host_future_state = match &caller {
4703 Caller::Guest { .. } => HostFutureState::NotApplicable,
4704 Caller::Host {
4705 host_future_present,
4706 ..
4707 } => {
4708 if *host_future_present {
4709 HostFutureState::Live
4710 } else {
4711 HostFutureState::NotApplicable
4712 }
4713 }
4714 };
4715 let task = state.push(Self {
4716 common: WaitableCommon::default(),
4717 lower_params: Some(lower_params),
4718 lift_result: Some(lift_result),
4719 result: None,
4720 callback,
4721 caller,
4722 call_context: CallContext::default(),
4723 sync_result: SyncResult::NotProduced,
4724 cancel_sent: false,
4725 starting_sent: false,
4726 instance,
4727 event: None,
4728 exited: false,
4729 threads: HashSet::new(),
4730 host_future_state,
4731 async_function,
4732 decremented_interesting_task_count: false,
4733 })?;
4734 let new_thread = GuestThread::new_implicit(state, task)?;
4735 let thread = state.push(new_thread)?;
4736 state.get_mut(task)?.threads.insert(thread);
4737 state.interesting_tasks += 1;
4738 Ok(QualifiedThreadId { task, thread })
4739 }
4740}
4741
impl TableDebug for GuestTask {
    /// Name reported for `GuestTask` table entries.
    fn type_name() -> &'static str {
        "GuestTask"
    }
}
4747
/// State shared by every kind of waitable (host task, guest task, transmit
/// handle); accessed through `Waitable::common`.
#[derive(Default)]
struct WaitableCommon {
    /// Event currently pending for this waitable, if any.
    event: Option<Event>,
    /// Waitable set this waitable currently belongs to, if any.
    set: Option<TableId<WaitableSet>>,
    /// NOTE(review): presumably the guest-visible handle for this waitable —
    /// it is not read or written in this chunk; confirm.
    handle: Option<u32>,
}
4758
/// Typed reference to something that can be waited on.
///
/// Variant order matters: it determines the derived `Ord`, which in turn
/// orders entries of `WaitableSet::ready`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task.
    Host(TableId<HostTask>),
    /// A guest task.
    Guest(TableId<GuestTask>),
    /// A transmit handle (used for both stream and future handles, see
    /// `Waitable::from_instance`).
    Transmit(TableId<TransmitHandle>),
}
4769
4770impl Waitable {
4771 fn from_instance(
4774 state: Pin<&mut ComponentInstance>,
4775 caller_instance: RuntimeComponentInstanceIndex,
4776 waitable: u32,
4777 ) -> Result<Self> {
4778 use crate::runtime::vm::component::Waitable;
4779
4780 let (waitable, kind) = state.instance_states().0[caller_instance]
4781 .handle_table()
4782 .waitable_rep(waitable)?;
4783
4784 Ok(match kind {
4785 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4786 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4787 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4788 })
4789 }
4790
4791 fn rep(&self) -> u32 {
4793 match self {
4794 Self::Host(id) => id.rep(),
4795 Self::Guest(id) => id.rep(),
4796 Self::Transmit(id) => id.rep(),
4797 }
4798 }
4799
4800 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4804 log::trace!("waitable {self:?} join set {set:?}");
4805
4806 let old = mem::replace(&mut self.common(state)?.set, set);
4807
4808 if let Some(old) = old {
4809 match *self {
4810 Waitable::Host(id) => state.remove_child(id, old),
4811 Waitable::Guest(id) => state.remove_child(id, old),
4812 Waitable::Transmit(id) => state.remove_child(id, old),
4813 }?;
4814
4815 state.get_mut(old)?.ready.remove(self);
4816 }
4817
4818 if let Some(set) = set {
4819 match *self {
4820 Waitable::Host(id) => state.add_child(id, set),
4821 Waitable::Guest(id) => state.add_child(id, set),
4822 Waitable::Transmit(id) => state.add_child(id, set),
4823 }?;
4824
4825 if self.common(state)?.event.is_some() {
4826 self.mark_ready(state)?;
4827 }
4828 }
4829
4830 Ok(())
4831 }
4832
4833 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4835 Ok(match self {
4836 Self::Host(id) => &mut state.get_mut(*id)?.common,
4837 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4838 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4839 })
4840 }
4841
4842 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4846 log::trace!("set event for {self:?}: {event:?}");
4847 self.common(state)?.event = event;
4848 self.mark_ready(state)
4849 }
4850
4851 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4853 let common = self.common(state)?;
4854 let event = common.event.take();
4855 if let Some(set) = self.common(state)?.set {
4856 state.get_mut(set)?.ready.remove(self);
4857 }
4858
4859 Ok(event)
4860 }
4861
4862 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4866 if let Some(set) = self.common(state)?.set {
4867 state.get_mut(set)?.ready.insert(*self);
4868 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4869 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4870 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4871
4872 let item = match mode {
4873 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4874 WaitMode::Callback(instance) => WorkItem::GuestCall(
4875 state.get_mut(thread.task)?.instance.index,
4876 GuestCall {
4877 thread,
4878 kind: GuestCallKind::DeliverEvent {
4879 instance,
4880 set: Some(set),
4881 },
4882 },
4883 ),
4884 };
4885 state.push_high_priority(item);
4886 }
4887 }
4888 Ok(())
4889 }
4890
4891 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4893 match self {
4894 Self::Host(task) => {
4895 log::trace!("delete host task {task:?}");
4896 state.delete(*task)?;
4897 }
4898 Self::Guest(task) => {
4899 log::trace!("delete guest task {task:?}");
4900 let task = state.delete(*task)?;
4901
4902 debug_assert!(task.decremented_interesting_task_count);
4909 }
4910 Self::Transmit(task) => {
4911 state.delete(*task)?;
4912 }
4913 }
4914
4915 Ok(())
4916 }
4917}
4918
/// Debug-formats a `Waitable` as the `Debug` rendering of the table id it
/// wraps, without naming the variant.
impl fmt::Debug for Waitable {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::Host(id) => write!(f, "{id:?}"),
            Self::Guest(id) => write!(f, "{id:?}"),
            Self::Transmit(id) => write!(f, "{id:?}"),
        }
    }
}
4928
/// A set of waitables together with the threads waiting on it.
#[derive(Default)]
struct WaitableSet {
    /// Members that have been marked ready (see `Waitable::mark_ready`);
    /// entries are removed when the event is taken or the member leaves.
    ready: BTreeSet<Waitable>,
    /// Threads waiting on this set and how each should be woken. The first
    /// entry in key order is woken when a member becomes ready.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
    /// `true` for the per-thread set created by `GuestThread::new_implicit`
    /// and `GuestThread::new_explicit`; `false` by default.
    is_sync_call_set: bool,
}
4940
impl TableDebug for WaitableSet {
    /// Name reported for `WaitableSet` table entries.
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
4946
/// Boxed one-shot closure that lowers a call's parameters into a
/// caller-provided, possibly uninitialized `ValRaw` buffer.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4950
/// Boxed one-shot closure that lifts raw result values into a type-erased
/// boxed value.
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;
4955
/// Type-erased lifted result; `queue_call` downcasts it back to the concrete
/// result type.
type LiftedResult = Box<dyn Any + Send + Sync>;
4960
/// Unit marker type.
///
/// NOTE(review): presumably used as a placeholder `LiftedResult` when no real
/// value is available — its uses are outside this chunk; confirm.
struct DummyResult;
4964
/// Concurrency-related state kept per component instance.
#[derive(Default)]
pub struct ConcurrentInstanceState {
    /// NOTE(review): presumably a backpressure counter for this instance —
    /// not read or written in this chunk; confirm.
    backpressure: u16,
    /// NOTE(review): presumably blocks entry into the instance while set —
    /// not read or written in this chunk; confirm.
    do_not_enter: bool,
    /// Guest calls recorded per thread; `pending_is_empty` reports whether
    /// any remain.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
4976
impl ConcurrentInstanceState {
    /// Returns `true` when the `pending` map holds no entries.
    pub fn pending_is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}
4982
/// Identifies which thread, if any, is currently running.
#[derive(Debug, Copy, Clone)]
pub(crate) enum CurrentThread {
    /// A guest thread, identified by its task and thread ids.
    Guest(QualifiedThreadId),
    /// A host task.
    Host(TableId<HostTask>),
    /// No current thread is set.
    None,
}
4989
4990impl CurrentThread {
4991 fn guest(&self) -> Option<&QualifiedThreadId> {
4992 match self {
4993 Self::Guest(id) => Some(id),
4994 _ => None,
4995 }
4996 }
4997
4998 fn host(&self) -> Option<TableId<HostTask>> {
4999 match self {
5000 Self::Host(id) => Some(*id),
5001 _ => None,
5002 }
5003 }
5004
5005 fn is_none(&self) -> bool {
5006 matches!(self, Self::None)
5007 }
5008}
5009
5010impl From<QualifiedThreadId> for CurrentThread {
5011 fn from(id: QualifiedThreadId) -> Self {
5012 Self::Guest(id)
5013 }
5014}
5015
5016impl From<TableId<HostTask>> for CurrentThread {
5017 fn from(id: TableId<HostTask>) -> Self {
5018 Self::Host(id)
5019 }
5020}
5021
/// Store-wide state for concurrent component execution.
pub struct ConcurrentState {
    /// The thread currently running, or `CurrentThread::None`.
    current_thread: CurrentThread,

    /// Host task futures. The `Option` is `None` once the set has been taken
    /// by `take_fibers_and_futures`; `futures_mut` reports a bug in that case.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// Table holding the tasks, threads, waitable sets and other entries
    /// addressed by `TableId`.
    table: AlwaysMut<ResourceTable>,
    /// Work items queued via `push_high_priority`.
    high_priority: Vec<WorkItem>,
    /// Work items queued via `push_low_priority` (pushed at the front).
    low_priority: VecDeque<WorkItem>,
    /// Starts out `None`. NOTE(review): semantics are defined outside this
    /// chunk — confirm.
    suspend_reason: Option<SuspendReason>,
    /// Worker fiber, if one exists; handed out by `take_fibers_and_futures`.
    worker: Option<StoreFiber<'static>>,
    /// Starts out `None`. NOTE(review): presumably the item the worker fiber
    /// should process next — confirm.
    worker_item: Option<WorkerItem>,

    /// Reference counts keyed by global error-context table index.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,

    /// Counter incremented by `GuestTask::new` for every task created.
    /// NOTE(review): the matching decrement happens outside this chunk.
    interesting_tasks: usize,

    /// Starts out `None`. NOTE(review): presumably woken once
    /// `interesting_tasks` drops to zero — confirm.
    interesting_tasks_empty_waker: Option<Waker>,
}
5083
5084impl Default for ConcurrentState {
5085 fn default() -> Self {
5086 Self {
5087 current_thread: CurrentThread::None,
5088 table: AlwaysMut::new(ResourceTable::new()),
5089 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
5090 high_priority: Vec::new(),
5091 low_priority: VecDeque::new(),
5092 suspend_reason: None,
5093 worker: None,
5094 worker_item: None,
5095 global_error_context_ref_counts: BTreeMap::new(),
5096 interesting_tasks: 0,
5097 interesting_tasks_empty_waker: None,
5098 }
5099 }
5100}
5101
5102impl ConcurrentState {
5103 pub(crate) fn take_fibers_and_futures(
5120 &mut self,
5121 fibers: &mut Vec<StoreFiber<'static>>,
5122 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
5123 ) {
5124 for entry in self.table.get_mut().iter_mut() {
5125 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
5126 for mode in mem::take(&mut set.waiting).into_values() {
5127 if let WaitMode::Fiber(fiber) = mode {
5128 fibers.push(fiber);
5129 }
5130 }
5131 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
5132 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready { fiber, .. } =
5133 mem::replace(&mut thread.state, GuestThreadState::Completed)
5134 {
5135 fibers.push(fiber);
5136 }
5137 }
5138 }
5139
5140 if let Some(fiber) = self.worker.take() {
5141 fibers.push(fiber);
5142 }
5143
5144 let mut handle_item = |item| match item {
5145 WorkItem::ResumeFiber(fiber) => {
5146 fibers.push(fiber);
5147 }
5148 WorkItem::PushFuture(future) => {
5149 self.futures
5150 .get_mut()
5151 .as_mut()
5152 .unwrap()
5153 .push(future.into_inner());
5154 }
5155 WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
5156 }
5157 };
5158
5159 for item in mem::take(&mut self.high_priority) {
5160 handle_item(item);
5161 }
5162 for item in mem::take(&mut self.low_priority) {
5163 handle_item(item);
5164 }
5165
5166 if let Some(them) = self.futures.get_mut().take() {
5167 futures.push(them);
5168 }
5169 }
5170
5171 fn push<V: Send + Sync + 'static>(
5172 &mut self,
5173 value: V,
5174 ) -> Result<TableId<V>, ResourceTableError> {
5175 self.table.get_mut().push(value).map(TableId::from)
5176 }
5177
5178 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
5179 self.table.get_mut().get_mut(&Resource::from(id))
5180 }
5181
5182 pub fn add_child<T: 'static, U: 'static>(
5183 &mut self,
5184 child: TableId<T>,
5185 parent: TableId<U>,
5186 ) -> Result<(), ResourceTableError> {
5187 self.table
5188 .get_mut()
5189 .add_child(Resource::from(child), Resource::from(parent))
5190 }
5191
5192 pub fn remove_child<T: 'static, U: 'static>(
5193 &mut self,
5194 child: TableId<T>,
5195 parent: TableId<U>,
5196 ) -> Result<(), ResourceTableError> {
5197 self.table
5198 .get_mut()
5199 .remove_child(Resource::from(child), Resource::from(parent))
5200 }
5201
5202 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
5203 self.table.get_mut().delete(Resource::from(id))
5204 }
5205
5206 fn push_future(&mut self, future: HostTaskFuture) {
5207 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
5214 }
5215
5216 fn push_high_priority(&mut self, item: WorkItem) {
5217 log::trace!("push high priority: {item:?}");
5218 self.high_priority.push(item);
5219 }
5220
5221 fn push_low_priority(&mut self, item: WorkItem) {
5222 log::trace!("push low priority: {item:?}");
5223 self.low_priority.push_front(item);
5224 }
5225
5226 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5227 if high_priority {
5228 self.push_high_priority(item);
5229 } else {
5230 self.push_low_priority(item);
5231 }
5232 }
5233
5234 fn promote_instance_local_thread_work_item(
5235 &mut self,
5236 current_instance: RuntimeComponentInstanceIndex,
5237 ) -> bool {
5238 self.promote_work_items_matching(|item: &WorkItem| match item {
5239 WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5240 *instance == current_instance
5241 }
5242 _ => false,
5243 })
5244 }
5245
5246 fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5247 self.promote_work_items_matching(|item: &WorkItem| match item {
5248 WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5249 *t == thread
5250 }
5251 _ => false,
5252 })
5253 }
5254
5255 fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5256 where
5257 F: FnMut(&WorkItem) -> bool,
5258 {
5259 if self.high_priority.iter().any(&mut predicate) {
5263 true
5264 }
5265 else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5268 let item = self.low_priority.remove(idx).unwrap();
5269 self.push_high_priority(item);
5270 true
5271 } else {
5272 false
5273 }
5274 }
5275
5276 fn take_pending_cancellation(&mut self) -> Result<bool> {
5279 let thread = self.current_guest_thread()?;
5280 if let Some(event) = self.get_mut(thread.task)?.event.take() {
5281 assert!(matches!(event, Event::Cancelled));
5282 Ok(true)
5283 } else {
5284 Ok(false)
5285 }
5286 }
5287
5288 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5289 if self.may_block(task)? {
5290 Ok(())
5291 } else {
5292 Err(Trap::CannotBlockSyncTask.into())
5293 }
5294 }
5295
5296 fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
5297 let task = self.get_mut(task)?;
5298 Ok(task.async_function || task.returned_or_cancelled())
5299 }
5300
5301 pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
5307 let (task, is_host) = (task >> 1, task & 1 == 1);
5308 if is_host {
5309 let task: TableId<HostTask> = TableId::new(task);
5310 Ok(&mut self.get_mut(task)?.call_context)
5311 } else {
5312 let task: TableId<GuestTask> = TableId::new(task);
5313 Ok(&mut self.get_mut(task)?.call_context)
5314 }
5315 }
5316
5317 pub fn current_call_context_scope_id(&self) -> Result<u32> {
5320 let (bits, is_host) = match self.current_thread {
5321 CurrentThread::Guest(id) => (id.task.rep(), false),
5322 CurrentThread::Host(id) => (id.rep(), true),
5323 CurrentThread::None => bail_bug!("current thread is not set"),
5324 };
5325 assert_eq!((bits << 1) >> 1, bits);
5326 Ok((bits << 1) | u32::from(is_host))
5327 }
5328
5329 fn current_guest_thread(&self) -> Result<QualifiedThreadId> {
5330 match self.current_thread.guest() {
5331 Some(id) => Ok(*id),
5332 None => bail_bug!("current thread is not a guest thread"),
5333 }
5334 }
5335
5336 fn current_host_thread(&self) -> Result<TableId<HostTask>> {
5337 match self.current_thread.host() {
5338 Some(id) => Ok(id),
5339 None => bail_bug!("current thread is not a host thread"),
5340 }
5341 }
5342
5343 fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
5344 match self.futures.get_mut().as_mut() {
5345 Some(f) => Ok(f),
5346 None => bail_bug!("futures field of concurrent state is currently taken"),
5347 }
5348 }
5349
5350 pub(crate) fn table(&mut self) -> &mut ResourceTable {
5351 self.table.get_mut()
5352 }
5353}
5354
5355fn for_any_lower<
5358 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5359>(
5360 fun: F,
5361) -> F {
5362 fun
5363}
5364
5365fn for_any_lift<
5367 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5368>(
5369 fun: F,
5370) -> F {
5371 fun
5372}
5373
5374fn checked<F: Future + Send + 'static>(
5379 id: StoreId,
5380 fut: F,
5381) -> impl Future<Output = F::Output> + Send + 'static {
5382 async move {
5383 let mut fut = pin!(fut);
5384 future::poll_fn(move |cx| {
5385 let message = "\
5386 `Future`s which depend on asynchronous component tasks, streams, or \
5387 futures to complete may only be polled from the event loop of the \
5388 store to which they belong. Please use \
5389 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5390 ";
5391 tls::try_get(|store| {
5392 let matched = match store {
5393 tls::TryGet::Some(store) => store.id() == id,
5394 tls::TryGet::Taken | tls::TryGet::None => false,
5395 };
5396
5397 if !matched {
5398 panic!("{message}")
5399 }
5400 });
5401 fut.as_mut().poll(cx)
5402 })
5403 .await
5404 }
5405}
5406
5407fn check_recursive_run() {
5410 tls::try_get(|store| {
5411 if !matches!(store, tls::TryGet::None) {
5412 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5413 }
5414 });
5415}
5416
/// Splits a packed callback return value into `(low 4 bits, remaining upper
/// 28 bits shifted down)`.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    let low_nibble = code & 0xF;
    let upper_bits = code >> 4;
    (low_nibble, upper_bits)
}
5420
/// Parameters bundled for a waitable-set check.
///
/// NOTE(review): the consumer of this struct is outside this chunk; field
/// notes below are based on names and types only — confirm.
struct WaitableCheckParams {
    /// The waitable set being checked.
    set: TableId<WaitableSet>,
    /// Canonical options index associated with the check.
    options: OptionsIndex,
    /// Presumably the guest address where the event payload is written — confirm.
    payload: u32,
}
5429
/// Kind of waitable-set check.
///
/// NOTE(review): presumably `Wait` blocks until an event is available while
/// `Poll` returns immediately — the consumer is outside this chunk; confirm.
enum WaitableCheck {
    Wait,
    Poll,
}
5436
/// A guest call that `prepare_call` has set up but that has not yet been
/// queued; consumed by `queue_call`.
pub(crate) struct PreparedCall<R> {
    /// The function to call.
    handle: Func,
    /// Task and implicit thread created for the call.
    thread: QualifiedThreadId,
    /// Parameter count forwarded to `queue_call0`.
    param_count: usize,
    /// Receiving end of the channel on which the lifted result arrives.
    rx: oneshot::Receiver<LiftedResult>,
    /// Runtime instance the call targets.
    runtime_instance: RuntimeInstance,
    /// Ties the prepared call to its result type `R` without storing one.
    _phantom: PhantomData<R>,
}
5452
5453impl<R> PreparedCall<R> {
5454 pub(crate) fn task_id(&self) -> TaskId {
5456 TaskId {
5457 task: self.thread.task,
5458 runtime_instance: self.runtime_instance,
5459 }
5460 }
5461}
5462
/// Identifies a guest task together with the runtime instance it targets;
/// obtained from `PreparedCall::task_id`.
pub(crate) struct TaskId {
    /// Table id of the guest task.
    task: TableId<GuestTask>,
    /// Runtime instance the task targets.
    runtime_instance: RuntimeInstance,
}
5468
5469impl TaskId {
5470 pub(crate) fn host_future_dropped(&self, store: &mut StoreOpaque) -> Result<()> {
5476 let task = store.concurrent_state_mut().get_mut(self.task)?;
5477 let delete = if !task.already_lowered_parameters() {
5478 store.cancel_guest_subtask_without_lowered_parameters(
5479 self.runtime_instance,
5480 self.task,
5481 )?;
5482 true
5483 } else {
5484 task.host_future_state = HostFutureState::Dropped;
5485 task.ready_to_delete()
5486 };
5487 if delete {
5488 Waitable::Guest(self.task).delete_from(store.concurrent_state_mut())?
5489 }
5490 Ok(())
5491 }
5492}
5493
/// Sets up a host-to-guest call of `handle` without queueing it.
///
/// Creates a new guest task (with its implicit thread) whose caller is the
/// host, wiring in the given parameter-lowering and result-lifting closures
/// and, if the function's options name one, its callback. The returned
/// `PreparedCall` carries the receiver on which the lifted result arrives.
///
/// `host_future_present` is recorded in the task's `Caller::Host` and decides
/// whether the task starts with a live host future state.
///
/// # Errors
///
/// Fails if the task cannot be created or with `Trap::CannotEnterComponent`
/// if the target instance may not be entered.
pub(crate) fn prepare_call<T, R>(
    mut store: StoreContextMut<T>,
    handle: Func,
    param_count: usize,
    host_future_present: bool,
    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
    + Send
    + Sync
    + 'static,
) -> Result<PreparedCall<R>> {
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);

    // Gather everything needed from the instance's metadata up front, while
    // `store` is only borrowed immutably through `instance`.
    let instance = handle.instance().id().get(store.0);
    let options = &instance.component().env_component().options[options];
    let ty = &instance.component().types()[ty];
    let async_function = ty.async_;
    let task_return_type = ty.results;
    let component_instance = raw_options.instance;
    let callback = options.callback.map(|i| instance.runtime_callback(i));
    let memory = options
        .memory()
        .map(|i| instance.runtime_memory(i))
        .map(SendSyncPtr::new);
    let string_encoding = options.string_encoding;
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();

    // The sender goes into the task's `Caller::Host`; the receiver is handed
    // back to the caller inside the `PreparedCall`.
    let (tx, rx) = oneshot::channel();

    let instance = handle.instance().runtime_instance(component_instance);
    let caller = state.current_thread;
    let thread = GuestTask::new(
        state,
        // The stored closures only receive a `&mut dyn VMStore`; `token` is
        // used to get back a typed `StoreContextMut<T>` for `lower_params`.
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            host_future_present,
            caller,
        },
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(move |store: &mut dyn VMStore, event, handle| {
                let store = token.as_context_mut(store);
                // NOTE(review): the safety contract of `call_callback` is not
                // visible in this chunk — confirm the invariants it requires.
                unsafe { instance.call_callback(store, callback, event, handle) }
            }) as CallbackFn
        }),
        instance,
        async_function,
    )?;

    // NOTE(review): by this point the task and its implicit thread are
    // already in the table and `interesting_tasks` has been incremented;
    // confirm that bailing here does not require cleaning them up.
    if !store.0.may_enter(instance)? {
        bail!(Trap::CannotEnterComponent);
    }

    Ok(PreparedCall {
        handle,
        thread,
        param_count,
        runtime_instance: instance,
        rx,
        _phantom: PhantomData,
    })
}
5579
5580pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5587 mut store: StoreContextMut<T>,
5588 prepared: PreparedCall<R>,
5589) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> {
5590 let PreparedCall {
5591 handle,
5592 thread,
5593 param_count,
5594 rx,
5595 ..
5596 } = prepared;
5597
5598 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5599
5600 Ok(checked(
5601 store.0.id(),
5602 rx.map(move |result| match result {
5603 Ok(r) => match r.downcast() {
5604 Ok(r) => Ok(*r),
5605 Err(_) => bail_bug!("wrong type of value produced"),
5606 },
5607 Err(oneshot::Canceled) => bail_bug!("channel erroneously dropped"),
5608 }),
5609 ))
5610}
5611
/// Non-generic-result half of `queue_call`: looks up the core functions and
/// options for `handle` and queues the call on the instance for
/// `guest_thread`.
fn queue_call0<T: 'static>(
    store: StoreContextMut<T>,
    handle: Func,
    guest_thread: QualifiedThreadId,
    param_count: usize,
) -> Result<()> {
    let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
    let is_concurrent = raw_options.async_;
    let callback = raw_options.callback;
    let instance = handle.instance();
    let callee = handle.lifted_core_func(store.0);
    let post_return = handle.post_return_core_func(store.0);
    // Resolve the callback index (if any) to a sendable function pointer.
    let callback = callback.map(|i| {
        let instance = instance.id().get(store.0);
        SendSyncPtr::new(instance.runtime_callback(i))
    });

    log::trace!("queueing call {guest_thread:?}");

    // NOTE(review): the safety contract of `Instance::queue_call` is not
    // visible in this chunk — presumably the function pointers must belong to
    // this instance and match the stated parameter/result counts; confirm.
    unsafe {
        instance.queue_call(
            store,
            guest_thread,
            SendSyncPtr::new(callee),
            param_count,
            // NOTE(review): presumably the result count — confirm against the
            // signature of `Instance::queue_call`.
            1,
            is_concurrent,
            callback,
            post_return.map(SendSyncPtr::new),
        )
    }
}