1use self::error_contexts::GlobalErrorContextRefCount;
54use crate::bail_bug;
55use crate::component::func::{Func, call_post_return};
56use crate::component::{
57 HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
58};
59use crate::fiber::{self, StoreFiber, StoreFiberYield};
60use crate::hash_set::HashSet;
61#[cfg(feature = "gc")]
62use crate::module::ModuleRegistry;
63use crate::prelude::*;
64use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
65#[cfg(feature = "gc")]
66use crate::vm::GcRootsList;
67use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
68use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
69use crate::{
70 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
71};
72use alloc::borrow::ToOwned;
73use alloc::collections::{BTreeMap, BTreeSet, VecDeque};
74use core::any::Any;
75use core::cell::UnsafeCell;
76use core::fmt;
77use core::future;
78use core::future::Future;
79use core::marker::PhantomData;
80use core::mem::{self, ManuallyDrop, MaybeUninit};
81use core::ops::DerefMut;
82use core::pin::{Pin, pin};
83use core::ptr::{self, NonNull};
84use core::task::{Context, Poll, Waker};
85use futures::channel::oneshot;
86use futures::stream::{FuturesUnordered, StreamExt};
87use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
88use table::{TableDebug, TableId};
89use wasmtime_environ::component::{
90 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
91 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
92 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
93 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
94 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
95};
96use wasmtime_environ::packed_option::ReservedValue;
97use wasmtime_environ::{NUM_COMPONENT_CONTEXT_SLOTS, Trap};
98#[cfg(feature = "gc")]
99use wasmtime_unwinder::Unwind;
100
101pub use abort::JoinHandle;
102pub use func::{FuncCallConcurrent, TypedFuncCallConcurrent};
103pub use future_stream_any::{FutureAny, StreamAny};
104pub use futures_and_streams::{
105 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
106 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
107 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
108};
109pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
110
111mod abort;
112mod error_contexts;
113mod func;
114mod future_stream_any;
115mod futures_and_streams;
116pub(crate) mod table;
117pub(crate) mod tls;
118
/// Sentinel `u32` with every bit set.
///
/// NOTE(review): no use of this constant is visible in this part of the file;
/// presumably it is the code handed back to a guest when an operation would
/// block -- confirm against its users.
const BLOCKED: u32 = 0xffff_ffff;
122
/// Lifecycle state of a subtask, with the numeric value used when the state
/// is encoded into a `u32`.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    /// Encoded as `0`.
    Starting = 0,
    /// Encoded as `1`.
    Started = 1,
    /// Encoded as `2`; the only state that is packed without a waitable.
    Returned = 2,
    /// Encoded as `3`.
    StartCancelled = 3,
    /// Encoded as `4`.
    ReturnCancelled = 4,
}

impl Status {
    /// Combines this status and an optional waitable index into one `u32`.
    ///
    /// The status occupies the low 4 bits and the waitable index the
    /// remaining upper 28 bits.
    ///
    /// # Panics
    ///
    /// Panics if `waitable` is present for [`Status::Returned`], if it is
    /// absent for any other status, or if the index does not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        // A waitable accompanies every status except `Returned`.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = match waitable {
            Some(index) => index,
            None => 0,
        };
        assert!(waitable < (1 << 28));
        let status_bits = self as u32;
        status_bits | (waitable << 4)
    }
}
146
/// An event that can be delivered to a task.
///
/// `Event::parts` turns each variant into an `(event code, payload)` pair.
/// The `pending` fields are not part of that encoding.
#[derive(Clone, Copy, Debug)]
enum Event {
    /// No event.
    None,
    /// A subtask changed state; the payload is the new status.
    Subtask {
        /// The status reported for the subtask.
        status: Status,
    },
    /// A stream read produced a result.
    StreamRead {
        /// Result code, encoded as the event payload.
        code: ReturnCode,
        /// NOTE(review): presumably the stream type and handle the event
        /// refers to -- confirm against the code that consumes it.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A stream write produced a result.
    StreamWrite {
        /// Result code, encoded as the event payload.
        code: ReturnCode,
        /// NOTE(review): presumably the stream type and handle the event
        /// refers to -- confirm against the code that consumes it.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A future read produced a result.
    FutureRead {
        /// Result code, encoded as the event payload.
        code: ReturnCode,
        /// NOTE(review): presumably the future type and handle the event
        /// refers to -- confirm against the code that consumes it.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// A future write produced a result.
    FutureWrite {
        /// Result code, encoded as the event payload.
        code: ReturnCode,
        /// NOTE(review): presumably the future type and handle the event
        /// refers to -- confirm against the code that consumes it.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// The task was cancelled.
    Cancelled,
}
173
174impl Event {
175 fn parts(self) -> (u32, u32) {
180 const EVENT_NONE: u32 = 0;
181 const EVENT_SUBTASK: u32 = 1;
182 const EVENT_STREAM_READ: u32 = 2;
183 const EVENT_STREAM_WRITE: u32 = 3;
184 const EVENT_FUTURE_READ: u32 = 4;
185 const EVENT_FUTURE_WRITE: u32 = 5;
186 const EVENT_CANCELLED: u32 = 6;
187 match self {
188 Event::None => (EVENT_NONE, 0),
189 Event::Cancelled => (EVENT_CANCELLED, 0),
190 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
191 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
192 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
193 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
194 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
195 }
196 }
197}
198
/// Numeric codes grouped under one namespace.
///
/// NOTE(review): presumably these are the values a guest callback returns
/// and that `handle_callback_code` interprets -- the consumer is not visible
/// in this part of the file, confirm there.
mod callback_code {
    /// Code `0`.
    pub const EXIT: u32 = 0;
    /// Code `1`.
    pub const YIELD: u32 = 1;
    /// Code `2`.
    pub const WAIT: u32 = 2;
}
205
/// `wasmtime_environ::component::START_FLAG_ASYNC_CALLEE`, converted once to
/// `u32` so it can be used directly with 32-bit flag words.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
210
/// Mutable access to a store paired with a projection of the store's data.
///
/// `T` is the store's data type and `D` selects, through [`HasData`], the
/// view of `T` that [`Access::get`] returns. By default `D` is `HasSelf<T>`.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    /// The store this access operates on.
    store: StoreContextMut<'a, T>,
    /// Projects the store's `T` into the `D::Data` view.
    get_data: fn(&mut T) -> D::Data<'_>,
}
220
221impl<'a, T, D> Access<'a, T, D>
222where
223 D: HasData + ?Sized,
224 T: 'static,
225{
226 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
228 Self { store, get_data }
229 }
230
231 pub fn data_mut(&mut self) -> &mut T {
233 self.store.data_mut()
234 }
235
236 pub fn get(&mut self) -> D::Data<'_> {
238 (self.get_data)(self.data_mut())
239 }
240
241 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
245 where
246 T: 'static,
247 {
248 let accessor = Accessor {
249 get_data: self.get_data,
250 token: StoreToken::new(self.store.as_context_mut()),
251 };
252 self.store
253 .as_context_mut()
254 .spawn_with_accessor(accessor, task)
255 }
256
257 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
260 self.get_data
261 }
262}
263
264impl<'a, T, D> AsContext for Access<'a, T, D>
265where
266 D: HasData + ?Sized,
267 T: 'static,
268{
269 type Data = T;
270
271 fn as_context(&self) -> StoreContext<'_, T> {
272 self.store.as_context()
273 }
274}
275
276impl<'a, T, D> AsContextMut for Access<'a, T, D>
277where
278 D: HasData + ?Sized,
279 T: 'static,
280{
281 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
282 self.store.as_context_mut()
283 }
284}
285
/// A handle that can reach a store without holding a borrow of it.
///
/// Instead of a store reference it keeps a [`StoreToken`]; [`Accessor::with`]
/// exchanges that token for an [`Access`] using the store obtained from
/// `tls::get`.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    /// Token converted back into a store context inside `with`.
    token: StoreToken<T>,
    /// Projects the store's `T` into the `D::Data` view.
    get_data: fn(&mut T) -> D::Data<'_>,
}
352
/// Types from which an [`Accessor`] can be borrowed.
pub trait AsAccessor {
    /// The store data type (`T`) of the underlying accessor.
    type Data: 'static;

    /// The data projection (`D`) of the underlying accessor.
    type AccessorData: HasData + ?Sized;

    /// Returns the underlying accessor.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
380
/// Forwards [`AsAccessor`] through shared references.
impl<T: AsAccessor + ?Sized> AsAccessor for &T {
    type Data = T::Data;
    type AccessorData = T::AccessorData;

    /// Delegates to the referenced value's implementation.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
        T::as_accessor(self)
    }
}
389
/// An [`Accessor`] is trivially its own accessor.
impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;

    /// Returns `self`.
    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}
398
// Compile-time check that `Accessor<T>` is `Send + Sync` even when `T` is
// not `Sync`: `UnsafeCell<u32>` is not `Sync`, so this only compiles if the
// accessor's auto traits do not depend on `T` being `Sync`.
const _: () = {
    const fn assert<T: Send + Sync>() {}
    assert::<Accessor<UnsafeCell<u32>>>();
};
425
426impl<T> Accessor<T> {
427 pub(crate) fn new(token: StoreToken<T>) -> Self {
436 Self {
437 token,
438 get_data: |x| x,
439 }
440 }
441}
442
443impl<T, D> Accessor<T, D>
444where
445 D: HasData + ?Sized,
446{
447 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
465 tls::get(|vmstore| {
466 fun(Access {
467 store: self.token.as_context_mut(vmstore),
468 get_data: self.get_data,
469 })
470 })
471 }
472
473 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
476 self.get_data
477 }
478
479 pub fn with_getter<D2: HasData>(
496 &self,
497 get_data: fn(&mut T) -> D2::Data<'_>,
498 ) -> Accessor<T, D2> {
499 Accessor {
500 token: self.token,
501 get_data,
502 }
503 }
504
505 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
521 where
522 T: 'static,
523 {
524 let accessor = self.clone_for_spawn();
525 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
526 }
527
528 fn clone_for_spawn(&self) -> Self {
529 Self {
530 token: self.token,
531 get_data: self.get_data,
532 }
533 }
534
535 pub fn poll_no_interesting_tasks(&self, cx: &mut Context<'_>) -> Poll<()> {
571 self.with(|mut access| {
572 let store = access.as_context_mut().0;
573 let state = store.concurrent_state_mut();
574 if state.interesting_tasks == 0 {
575 Poll::Ready(())
576 } else {
577 state.interesting_tasks_empty_waker = Some(cx.waker().clone());
578 Poll::Pending
579 }
580 })
581 }
582}
583
/// A unit of work that can be spawned on a store and run with an
/// [`Accessor`].
///
/// Implementors must be `Send + 'static`.
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Consumes the task and runs it with `accessor`, returning a `Send`
    /// future that resolves to the task's result.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
602
/// Caller-supplied information for a call, split by whether the caller is
/// async or sync.
///
/// NOTE(review): the consumers of this type are outside the visible part of
/// the file; the field notes below describe only their types.
enum CallerInfo {
    /// The caller is async.
    Async {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Whether the call has a result.
        has_result: bool,
    },
    /// The caller is sync.
    Sync {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Number of results.
        result_count: u32,
    },
}
617
/// How a waiter is represented: a suspended fiber or a callback on an
/// instance.
///
/// NOTE(review): usage is outside the visible part of the file -- confirm
/// the exact resume semantics there.
enum WaitMode {
    /// A fiber associated with the wait.
    Fiber(StoreFiber<'static>),
    /// The instance associated with a callback-based wait.
    Callback(Instance),
}
626
/// The reason passed to `suspend` when a fiber gives up control.
///
/// NOTE(review): the `skip_may_block_check` fields are interpreted by the
/// suspend implementation, which is not visible here; presumably they bypass
/// the "task may block" check -- confirm there.
#[derive(Debug)]
enum SuspendReason {
    /// `thread` is waiting on the waitable set `set`.
    Waiting {
        set: TableId<WaitableSet>,
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
    /// The worker fiber finished its current item and wants another; this is
    /// what the worker loop in `run_on_worker` suspends with.
    NeedWork,
    /// `thread` is yielding.
    Yielding {
        thread: QualifiedThreadId,
        cancellable: bool,
        skip_may_block_check: bool,
    },
    /// `thread` is explicitly suspending itself.
    ExplicitlySuspending {
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
}
653
/// The kinds of call into a guest that `handle_guest_call` can perform.
enum GuestCallKind {
    /// Deliver an event to the task by invoking its callback.
    DeliverEvent {
        /// Instance used to look up the event and to interpret the code the
        /// callback returns.
        instance: Instance,
        /// Optional waitable set passed through to the event lookup.
        set: Option<TableId<WaitableSet>>,
    },
    /// Start a call whose closure may return a follow-up `GuestCall` to run
    /// next. `GuestCall::is_ready` holds these back while the instance has
    /// `do_not_enter` set or non-zero backpressure.
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
    /// Start a call whose closure yields no follow-up. `GuestCall::is_ready`
    /// always reports these as ready.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
675
676impl fmt::Debug for GuestCallKind {
677 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
678 match self {
679 Self::DeliverEvent { instance, set } => f
680 .debug_struct("DeliverEvent")
681 .field("instance", instance)
682 .field("set", set)
683 .finish(),
684 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
685 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
686 }
687 }
688}
689
/// An optional `u32` target with two distinct "present" flavours.
#[derive(Copy, Clone, Debug)]
pub enum SuspensionTarget {
    /// A target carrying a `u32`, in the "suspended" flavour.
    SomeSuspended(u32),
    /// A target carrying a `u32`.
    Some(u32),
    /// No target.
    None,
}

impl SuspensionTarget {
    /// Returns `true` only for [`SuspensionTarget::None`].
    fn is_none(&self) -> bool {
        match self {
            SuspensionTarget::None => true,
            SuspensionTarget::SomeSuspended(_) | SuspensionTarget::Some(_) => false,
        }
    }

    /// Returns `true` for both target-carrying variants.
    fn is_some(&self) -> bool {
        match self {
            SuspensionTarget::SomeSuspended(_) | SuspensionTarget::Some(_) => true,
            SuspensionTarget::None => false,
        }
    }
}
706
/// A pending call into a guest: which thread it runs as and what it does.
#[derive(Debug)]
struct GuestCall {
    /// The thread (and, through it, the task) the call is made on behalf of.
    thread: QualifiedThreadId,
    /// What the call does when run.
    kind: GuestCallKind,
}
713
714impl GuestCall {
715 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
725 let instance = store
726 .concurrent_state_mut()
727 .get_mut(self.thread.task)?
728 .instance;
729 let state = store.instance_state(instance).concurrent_state();
730
731 let ready = match &self.kind {
732 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
733 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
734 GuestCallKind::StartExplicit(_) => true,
735 };
736 log::trace!(
737 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
738 state.do_not_enter,
739 state.backpressure
740 );
741 Ok(ready)
742 }
743}
744
/// A job handed to the worker fiber through `worker_item` (see
/// `run_on_worker`).
enum WorkerItem {
    /// Run a guest call via `handle_guest_call`.
    GuestCall(GuestCall),
    /// Run an arbitrary closure against the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
750
/// A queued unit of work processed by `handle_work_item`.
enum WorkItem {
    /// Add a future to the store's set of host task futures.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// Resume the given fiber.
    ResumeFiber(StoreFiber<'static>),
    /// Resume the fiber of a guest thread, which must be in the `Ready`
    /// state.
    ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
    /// Run a guest call on the worker fiber if it is ready, otherwise park
    /// it in its instance's `pending` map.
    GuestCall(RuntimeComponentInstanceIndex, GuestCall),
    /// Run a closure on the worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
765
766impl fmt::Debug for WorkItem {
767 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
768 match self {
769 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
770 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
771 Self::ResumeThread(instance, thread) => f
772 .debug_tuple("ResumeThread")
773 .field(instance)
774 .field(thread)
775 .finish(),
776 Self::GuestCall(instance, call) => f
777 .debug_tuple("GuestCall")
778 .field(instance)
779 .field(call)
780 .finish(),
781 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
782 }
783 }
784}
785
/// Outcome of a wait.
///
/// With the derived ordering, `Cancelled` sorts before `Completed`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    /// The wait ended because of cancellation.
    Cancelled,
    /// The wait ran to completion.
    Completed,
}
792
/// Drives `future` to completion on behalf of the current host task and
/// returns its result, suspending the caller while the future is pending.
///
/// The future is polled once immediately with a no-op waker. If it is not
/// ready yet, it is handed to the concurrent state with `push_future`, the
/// host task is joined to the caller's `sync_call_set`, and the caller is
/// suspended with `SuspendReason::Waiting` on that set.
///
/// # Errors
///
/// Propagates errors from `future`, from table lookups and from suspension.
/// Reports a bug if, afterwards, the host task is not in the
/// `CalleeFinished` state or holds a value that is not an `R`.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
) -> Result<R> {
    let state = store.concurrent_state_mut();
    let task = state.current_host_thread()?;

    // Wrap the future so that, on success, it records its (type-erased)
    // result in the host task and raises a `Returned` subtask event.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            let host_state = &mut state.get_mut(task)?.state;
            assert!(matches!(host_state, HostTaskState::CalleeStarted));
            *host_state = HostTaskState::CalleeFinished(Box::new(result));

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // First poll, with the store installed so `tls::get` works inside the
    // future.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        // Finished on the first poll; only an error needs propagating.
        Poll::Ready(result) => result?,

        Poll::Pending => {
            // Let the event loop keep polling the future.
            let state = store.concurrent_state_mut();
            state.push_future(future);

            // Wait on the caller's sync-call set until the host task
            // reports an event.
            let caller = state.get_mut(task)?.caller;
            let set = state.get_mut(caller.thread)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                skip_may_block_check: false,
            })?;

            // Resumed: detach the host task from the set again.
            Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
        }
    }

    // Take the stored result out, leaving the task marked as done.
    let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
    match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
        HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
            Ok(result) => *result,
            Err(_) => bail_bug!("host task finished with wrong type of result"),
        }),
        _ => bail_bug!("unexpected host task state after completion"),
    }
}
877
/// Executes `call`, then any follow-up calls it produces, until none remain.
///
/// * `DeliverEvent` fetches the pending event, invokes the task's callback
///   with it, and hands the returned code to `handle_callback_code`, which
///   may yield another call to run.
/// * `StartImplicit` runs its closure, which may yield another call to run.
/// * `StartExplicit` runs its closure.
///
/// # Errors
///
/// Propagates any error from the callback or closures, and reports a bug if
/// an event to deliver or the task's callback is missing.
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
    let mut next = Some(call);
    while let Some(call) = next.take() {
        match call.kind {
            GuestCallKind::DeliverEvent { instance, set } => {
                let (event, waitable) =
                    match instance.get_event(store, call.thread.task, set, true)? {
                        Some(pair) => pair,
                        None => bail_bug!("delivering non-present event"),
                    };
                let state = store.concurrent_state_mut();
                let task = state.get_mut(call.thread.task)?;
                let runtime_instance = task.instance;
                // Events without an associated waitable are delivered with
                // handle 0.
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);

                log::trace!(
                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
                    call.thread,
                );

                // Run the callback as the call's thread, remembering the
                // previous current thread so it can be restored.
                let old_thread = store.set_thread(call.thread)?;
                log::trace!(
                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
                    call.thread
                );

                store.enter_instance(runtime_instance);

                // Take the callback out of the task while it runs; it is put
                // back right after the call returns successfully.
                let Some(callback) = store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback
                    .take()
                else {
                    bail_bug!("guest task callback field not present")
                };

                let code = callback(store, event, handle)?;

                store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback = Some(callback);

                store.exit_instance(runtime_instance)?;

                store.set_thread(old_thread)?;

                next = instance.handle_callback_code(
                    store,
                    call.thread,
                    runtime_instance.index,
                    code,
                )?;

                log::trace!(
                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
                );
            }
            GuestCallKind::StartImplicit(fun) => {
                next = fun(store)?;
            }
            GuestCallKind::StartExplicit(fun) => {
                fun(store)?;
            }
        }
    }

    Ok(())
}
949
950impl<T> Store<T> {
951 pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
953 where
954 T: Send + 'static,
955 {
956 ensure!(
957 self.as_context().0.concurrency_support(),
958 "cannot use `run_concurrent` when Config::concurrency_support disabled",
959 );
960 self.as_context_mut().run_concurrent(fun).await
961 }
962
963 #[doc(hidden)]
964 pub fn assert_concurrent_state_empty(&mut self) {
965 self.as_context_mut().assert_concurrent_state_empty();
966 }
967
968 #[doc(hidden)]
969 pub fn concurrent_state_table_size(&mut self) -> usize {
970 self.as_context_mut().concurrent_state_table_size()
971 }
972
973 pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
975 where
976 T: 'static,
977 {
978 self.as_context_mut().spawn(task)
979 }
980}
981
982impl<T> StoreContextMut<'_, T> {
    /// Asserts that this store holds no leftover concurrent state.
    ///
    /// # Panics
    ///
    /// Panics if any component instance state is non-empty, the concurrent
    /// state table has entries, either work queue has items, a current
    /// thread is set, host futures are still registered (or the futures
    /// collection is unavailable), or global error-context reference counts
    /// remain.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(self) {
        let store = self.0;
        store
            .store_data_mut()
            .components
            .assert_instance_states_empty();
        let state = store.concurrent_state_mut();
        assert!(
            state.table.get_mut().is_empty(),
            "non-empty table: {:?}",
            state.table.get_mut()
        );
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        assert!(state.current_thread.is_none());
        assert!(state.futures_mut().unwrap().is_empty());
        assert!(state.global_error_context_ref_counts.is_empty());
    }
1012
1013 #[doc(hidden)]
1018 pub fn concurrent_state_table_size(&mut self) -> usize {
1019 self.0
1020 .concurrent_state_mut()
1021 .table
1022 .get_mut()
1023 .iter_mut()
1024 .count()
1025 }
1026
1027 pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
1037 where
1038 T: 'static,
1039 {
1040 let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
1041 self.spawn_with_accessor(accessor, task)
1042 }
1043
    /// Spawns `task`, running it with the supplied `accessor`.
    ///
    /// The task's future is wrapped by `JoinHandle::run` and then registered
    /// with the concurrent state via `push_future`. Returns the handle from
    /// `JoinHandle::run`.
    fn spawn_with_accessor<D>(
        self,
        accessor: Accessor<T, D>,
        task: impl AccessorTask<T, D>,
    ) -> JoinHandle
    where
        T: 'static,
        D: HasData + ?Sized,
    {
        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
        // When the wrapped future yields no task result, treat that as a
        // successful completion.
        // NOTE(review): presumably this is the aborted-through-the-handle
        // case -- confirm in `JoinHandle::run`.
        self.0
            .concurrent_state_mut()
            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
        handle
    }
1064
    /// Runs `fun` with an [`Accessor`] for this store and drives it, along
    /// with the store's concurrent work, until it completes.
    ///
    /// When the event loop becomes idle while `fun`'s future is still
    /// pending, the returned future stays pending rather than trapping.
    ///
    /// # Errors
    ///
    /// Fails if concurrency support is disabled for this store, or if the
    /// event loop reports an error.
    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        ensure!(
            self.0.concurrency_support(),
            "cannot use `run_concurrent` when Config::concurrency_support disabled",
        );
        self.do_run_concurrent(fun, false).await
    }
1158
    /// Like `run_concurrent`, but fails with `Trap::AsyncDeadlock` when the
    /// event loop runs out of work while `fun`'s future is still pending.
    ///
    /// Unlike `run_concurrent`, this does not return an error when
    /// concurrency support is disabled; `do_run_concurrent` only
    /// debug-asserts it.
    pub(super) async fn run_concurrent_trap_on_idle<R>(
        self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, true).await
    }
1168
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Creates the future for `fun`, pins it in place and drives it with
    /// `poll_until`. `trap_on_idle` is forwarded to `poll_until`.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        debug_assert!(self.0.concurrency_support());
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Owns the store handle and `fun`'s future so that the future is
        // dropped with the store installed via `tls::set`.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: this is the only place `value` is dropped, and
                    // it cannot be used afterwards because `self` is itself
                    // being dropped.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: `dropper` is only reachable through this `&mut` binding
        // and is never moved in this function, so `value` stays at the same
        // address until `Dropper::drop` destroys it.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1212
    /// Event loop: polls `future` together with the store's host futures and
    /// queued work items until `future` completes.
    ///
    /// Each iteration polls `future`, then the next of the store's host
    /// futures, then gathers work: all high-priority items, or a single
    /// low-priority item if there are none. Gathered items are processed
    /// with `handle_work_item`.
    ///
    /// If there are no host futures left and no work, and `future` is still
    /// pending, the loop either fails with `Trap::AsyncDeadlock` (when
    /// `trap_on_idle` is set) or stays pending.
    ///
    /// # Errors
    ///
    /// Propagates errors from host futures and work items, and reports a
    /// bug if the concurrent state's `futures` field is missing.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Puts the host futures back into the concurrent state on drop.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take the host futures out of the concurrent state for the
            // duration of this poll; `reset` restores them.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = match reset.futures.as_mut() {
                Some(f) => pin!(f.next()),
                None => bail_bug!("concurrent state missing futures field"),
            };

            enum PollResult<R> {
                /// `future` finished with this value.
                Complete(R),
                /// Work items to process before polling again.
                ProcessWork {
                    ready: Vec<WorkItem>,
                    low_priority: bool,
                },
            }

            let result = future::poll_fn(|cx| {
                // Give the caller's future the first chance to finish.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(PollResult::Complete(value)));
                }

                // Then poll the host futures: `Ready(true)` means one of
                // them completed, `Ready(false)` means none are left.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Gather work: every high-priority item, or failing that a
                // single low-priority item.
                let state = reset.store.0.concurrent_state_mut();
                let mut ready = mem::take(&mut state.high_priority);
                let mut low_priority = false;
                if ready.is_empty() {
                    if let Some(item) = state.low_priority.pop_back() {
                        ready.push(item);
                        low_priority = true;
                    }
                }
                if !ready.is_empty() {
                    return Poll::Ready(Ok(PollResult::ProcessWork {
                        ready,
                        low_priority,
                    }));
                }

                return match next {
                    // A host future completed: go around the outer loop
                    // again with no work items.
                    Poll::Ready(true) => {
                        Poll::Ready(Ok(PollResult::ProcessWork {
                            ready: Vec::new(),
                            low_priority: false,
                        }))
                    }
                    // No host futures remain and no work is queued.
                    Poll::Ready(false) => {
                        // Poll the caller's future once more before
                        // deciding the loop is idle.
                        if let Poll::Ready(value) =
                            tls::set(reset.store.0, || future.as_mut().poll(cx))
                        {
                            Poll::Ready(Ok(PollResult::Complete(value)))
                        } else {
                            if trap_on_idle {
                                Poll::Ready(Err(Trap::AsyncDeadlock.into()))
                            } else {
                                Poll::Pending
                            }
                        }
                    }
                    Poll::Pending => Poll::Pending,
                };
            })
            .await;

            // Restore the host futures before processing any work.
            drop(reset);

            match result? {
                PollResult::Complete(value) => break Ok(value),
                PollResult::ProcessWork {
                    ready,
                    low_priority,
                } => {
                    // Disposes of unprocessed items if this future is
                    // dropped, or a work item fails, part-way through the
                    // batch.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    // Drop the future with the store
                                    // installed via `tls::set`.
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    // Yield before running low-priority work.
                    if low_priority {
                        dispose.store.0.yield_now().await
                    }

                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1427
    /// Processes a single queued work item.
    ///
    /// # Errors
    ///
    /// Propagates errors from the resumed fiber or worker, and reports a bug
    /// if a thread to resume is not in the `Ready` state.
    async fn handle_work_item(self, item: WorkItem) -> Result<()>
    where
        T: Send,
    {
        log::trace!("handle work item {item:?}");
        match item {
            WorkItem::PushFuture(future) => {
                self.0
                    .concurrent_state_mut()
                    .futures_mut()?
                    .push(future.into_inner());
            }
            WorkItem::ResumeFiber(fiber) => {
                self.0.resume_fiber(fiber).await?;
            }
            WorkItem::ResumeThread(_, thread) => {
                // Mark the thread as running and take its fiber; any state
                // other than `Ready` is a bug.
                if let GuestThreadState::Ready { fiber, .. } = mem::replace(
                    &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
                    GuestThreadState::Running,
                ) {
                    self.0.resume_fiber(fiber).await?;
                } else {
                    bail_bug!("cannot resume non-pending thread {thread:?}");
                }
            }
            WorkItem::GuestCall(_, call) => {
                if call.is_ready(self.0)? {
                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
                } else {
                    // The call cannot run yet. The first time this happens
                    // for the task, mark it and, for implicit starts, raise
                    // a `Starting` subtask event.
                    let state = self.0.concurrent_state_mut();
                    let task = state.get_mut(call.thread.task)?;
                    if !task.starting_sent {
                        task.starting_sent = true;
                        if let GuestCallKind::StartImplicit(_) = &call.kind {
                            Waitable::Guest(call.thread.task).set_event(
                                state,
                                Some(Event::Subtask {
                                    status: Status::Starting,
                                }),
                            )?;
                        }
                    }

                    // Park the call in its instance's `pending` map.
                    let instance = state.get_mut(call.thread.task)?.instance;
                    self.0
                        .instance_state(instance)
                        .concurrent_state()
                        .pending
                        .insert(call.thread, call.kind);
                }
            }
            WorkItem::WorkerFunction(fun) => {
                self.run_on_worker(WorkerItem::Function(fun)).await?;
            }
        }

        Ok(())
    }
1487
    /// Runs `item` on the worker fiber.
    ///
    /// Reuses the fiber stored in the concurrent state's `worker` slot if
    /// there is one; otherwise creates a new fiber. The item is handed over
    /// through the `worker_item` slot before the fiber is resumed.
    ///
    /// # Panics
    ///
    /// Panics if `worker_item` is already occupied.
    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
    where
        T: Send,
    {
        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
            fiber
        } else {
            fiber::make_fiber(self.0, move |store| {
                // Worker loop: take the handed-over item, run it, then
                // suspend until more work is provided.
                loop {
                    let Some(item) = store.concurrent_state_mut().worker_item.take() else {
                        bail_bug!("worker_item not present when resuming fiber")
                    };
                    match item {
                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
                    }

                    store.suspend(SuspendReason::NeedWork)?;
                }
            })?
        };

        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
        assert!(worker_item.is_none());
        *worker_item = Some(item);

        self.0.resume_fiber(worker).await
    }
1517
1518 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1523 where
1524 T: 'static,
1525 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1526 + Send
1527 + Sync
1528 + 'static,
1529 R: Send + Sync + 'static,
1530 {
1531 let token = StoreToken::new(self);
1532 async move {
1533 let mut accessor = Accessor::new(token);
1534 closure(&mut accessor).await
1535 }
1536 }
1537
    /// Iterates over the guest tasks on the current async call stack.
    ///
    /// Starts at the store's current thread and follows `parent` links,
    /// yielding a `GuestTaskId` for each guest thread encountered; entries
    /// that are not guest threads are skipped.
    pub fn async_call_stack(&mut self) -> impl Iterator<Item = GuestTaskId> {
        let state = self.0.concurrent_state_mut();
        let mut cur = Some(state.current_thread);
        core::iter::from_fn(move || {
            while let Some(t) = cur {
                // Advance first so the walk continues after returning.
                cur = state.parent(t);
                if let Some(thread) = t.guest() {
                    return Some(GuestTaskId(thread.task));
                }
            }

            None
        })
    }
1561}
1562
1563impl StoreOpaque {
    /// Sets up the bookkeeping for a synchronous call into `callee`.
    ///
    /// Without concurrency support this defers to
    /// `enter_call_not_concurrent`. Otherwise it creates a new guest task
    /// whose caller is the current thread, registers its thread in the
    /// callee instance's table and makes it the current thread.
    ///
    /// In debug builds, when `guest_caller` is given it is checked against
    /// the instance of the current guest thread.
    pub(crate) fn enter_guest_sync_call(
        &mut self,
        guest_caller: Option<RuntimeInstance>,
        callee_async: bool,
        callee: RuntimeInstance,
    ) -> Result<()> {
        log::trace!("enter sync call {callee:?}");
        if !self.concurrency_support() {
            return self.enter_call_not_concurrent();
        }

        let state = self.concurrent_state_mut();
        let thread = state.current_thread;
        let instance = if let Some(thread) = thread.guest() {
            Some(state.get_mut(thread.task)?.instance)
        } else {
            None
        };
        if guest_caller.is_some() {
            debug_assert_eq!(instance, guest_caller);
        }
        // The lower/lift closures are placeholders that report a bug if they
        // are ever invoked for this task.
        let guest_thread = GuestTask::new(
            state,
            Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
            LiftResult {
                lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
                ty: TypeTupleIndex::reserved_value(),
                memory: None,
                string_encoding: StringEncoding::Utf8,
            },
            if let Some(thread) = thread.guest() {
                Caller::Guest { thread: *thread }
            } else {
                Caller::Host {
                    tx: None,
                    host_future_present: false,
                    caller: thread,
                }
            },
            None,
            callee,
            callee_async,
        )?;

        Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
            guest_thread.thread,
            self,
            callee.index,
        )?;
        self.set_thread(guest_thread)?;

        Ok(())
    }
1623
    /// Tears down the bookkeeping created by `enter_guest_sync_call`.
    ///
    /// Without concurrency support this defers to
    /// `exit_call_not_concurrent`. Otherwise the current guest thread's task
    /// is marked as exited, its caller becomes the current thread again, and
    /// the thread and task are cleaned up.
    ///
    /// # Errors
    ///
    /// Reports a bug if the current thread is not a guest thread.
    pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
        if !self.concurrency_support() {
            return Ok(self.exit_call_not_concurrent());
        }
        let thread = match self.set_thread(CurrentThread::None)?.guest() {
            Some(t) => *t,
            None => bail_bug!("expected task when exiting"),
        };
        let task = self.concurrent_state_mut().get_mut(thread.task)?;
        let instance = task.instance;
        let caller = match &task.caller {
            &Caller::Guest { thread } => thread.into(),
            &Caller::Host { caller, .. } => caller,
        };
        task.lift_result = None;
        task.exited = true;
        self.set_thread(caller)?;

        log::trace!("exit sync call {instance:?}");
        self.cleanup_thread(thread, instance, CleanupTask::Yes)?;

        Ok(())
    }
1648
    /// Creates a host task called by the current guest thread and makes it
    /// the current thread.
    ///
    /// Returns `Ok(None)` when concurrency support is disabled; in that case
    /// only `enter_call_not_concurrent` is performed. Otherwise returns the
    /// id of the new task.
    pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
        if !self.concurrency_support() {
            self.enter_call_not_concurrent()?;
            return Ok(None);
        }
        let state = self.concurrent_state_mut();
        let caller = state.current_guest_thread()?;
        let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
        log::trace!("new host task {task:?}");
        self.set_thread(task)?;
        Ok(Some(task))
    }
1668
1669 pub fn host_task_reenter_caller(&mut self) -> Result<()> {
1675 if !self.concurrency_support() {
1676 return Ok(());
1677 }
1678 let task = self.concurrent_state_mut().current_host_thread()?;
1679 let caller = self.concurrent_state_mut().get_mut(task)?.caller;
1680 self.set_thread(caller)?;
1681 Ok(())
1682 }
1683
1684 pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1691 match task {
1692 Some(task) => {
1693 log::trace!("delete host task {task:?}");
1694 self.concurrent_state_mut().delete(task)?;
1695 }
1696 None => {
1697 self.exit_call_not_concurrent();
1698 }
1699 }
1700 Ok(())
1701 }
1702
1703 pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
1711 if self.trapped() {
1712 return Ok(false);
1713 }
1714 if !self.concurrency_support() {
1715 return Ok(true);
1716 }
1717 let state = self.concurrent_state_mut();
1718 let mut cur = Some(state.current_thread);
1719 while let Some(t) = cur {
1720 if let Some(thread) = t.guest() {
1721 let task = state.get_mut(thread.task)?;
1722 if task.instance.instance == instance.instance {
1729 return Ok(false);
1730 }
1731 }
1732 cur = state.parent(t);
1733 }
1734 Ok(true)
1735 }
1736
    /// Returns the state of the runtime instance `instance`: looks up the
    /// component instance by `instance.instance`, then the state at
    /// `instance.index` within it.
    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
        self.component_instance_mut(instance.instance)
            .instance_state(instance.index)
    }
1743
    /// Switches the current thread to `thread` and returns the previous one.
    ///
    /// The component context slots are saved into the outgoing guest thread
    /// (if any) and loaded from the incoming guest thread (if any). The
    /// outgoing thread's component instance has its "task may block" flag
    /// cleared, and the incoming one has it recomputed.
    fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
        let thread = thread.into();
        let state = self.concurrent_state_mut();
        let old_thread = mem::replace(&mut state.current_thread, thread);

        // Save the live context slots into the thread being switched away
        // from.
        if let Some(old_thread) = old_thread.guest() {
            let old_context = self.vm_store_context().component_context;
            self.concurrent_state_mut()
                .get_mut(old_thread.thread)?
                .context = old_context;
        }
        // In debug builds, overwrite the live slots with `u32::MAX` so they
        // do not silently keep the previous thread's values.
        if cfg!(debug_assertions) {
            self.vm_store_context_mut().component_context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
        }
        // Load the incoming thread's saved slots; in debug builds the saved
        // copy is overwritten with `u32::MAX` as well.
        if let Some(thread) = thread.guest() {
            let thread = self.concurrent_state_mut().get_mut(thread.thread)?;
            let context = thread.context;
            if cfg!(debug_assertions) {
                thread.context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
            }
            self.vm_store_context_mut().component_context = context;
        }

        // Clear "task may block" on the instance being switched away from.
        let state = self.concurrent_state_mut();
        if let Some(old_thread) = old_thread.guest() {
            let instance = state.get_mut(old_thread.task)?.instance.instance;
            self.component_instance_mut(instance)
                .set_task_may_block(false)
        }

        // Recompute "task may block" for the thread being switched to.
        if thread.guest().is_some() {
            self.set_task_may_block()?;
        }

        Ok(old_thread)
    }
1799
1800 fn set_task_may_block(&mut self) -> Result<()> {
1803 let state = self.concurrent_state_mut();
1804 let guest_thread = state.current_guest_thread()?;
1805 let instance = state.get_mut(guest_thread.task)?.instance.instance;
1806 let may_block = self.concurrent_state_mut().may_block(guest_thread.task)?;
1807 self.component_instance_mut(instance)
1808 .set_task_may_block(may_block);
1809 Ok(())
1810 }
1811
1812 pub(crate) fn check_blocking(&mut self) -> Result<()> {
1813 if !self.concurrency_support() {
1814 return Ok(());
1815 }
1816 let state = self.concurrent_state_mut();
1817 let task = state.current_guest_thread()?.task;
1818 let instance = state.get_mut(task)?.instance.instance;
1819 let task_may_block = self.component_instance(instance).get_task_may_block();
1820
1821 if task_may_block {
1822 Ok(())
1823 } else {
1824 Err(Trap::CannotBlockSyncTask.into())
1825 }
1826 }
1827
1828 fn enter_instance(&mut self, instance: RuntimeInstance) {
1832 log::trace!("enter {instance:?}");
1833 self.instance_state(instance)
1834 .concurrent_state()
1835 .do_not_enter = true;
1836 }
1837
1838 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1842 log::trace!("exit {instance:?}");
1843 self.instance_state(instance)
1844 .concurrent_state()
1845 .do_not_enter = false;
1846 self.partition_pending(instance)
1847 }
1848
1849 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1854 for (thread, kind) in
1855 mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1856 {
1857 let call = GuestCall { thread, kind };
1858 if call.is_ready(self)? {
1859 self.concurrent_state_mut()
1860 .push_high_priority(WorkItem::GuestCall(instance.index, call));
1861 } else {
1862 self.instance_state(instance)
1863 .concurrent_state()
1864 .pending
1865 .insert(call.thread, call.kind);
1866 }
1867 }
1868
1869 Ok(())
1870 }
1871
1872 pub(crate) fn backpressure_modify(
1874 &mut self,
1875 caller_instance: RuntimeInstance,
1876 modify: impl FnOnce(u16) -> Option<u16>,
1877 ) -> Result<()> {
1878 let state = self.instance_state(caller_instance).concurrent_state();
1879 let old = state.backpressure;
1880 let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
1881 state.backpressure = new;
1882
1883 if old > 0 && new == 0 {
1884 self.partition_pending(caller_instance)?;
1887 }
1888
1889 Ok(())
1890 }
1891
/// Resumes `fiber` and, once it yields control back, files it according to
/// the reason it suspended for.
///
/// The thread that was current before the call is restored afterwards (and
/// marked `Running` if it is a guest thread). If the fiber is still alive,
/// `suspend_reason` must have been set by the fiber before it suspended.
///
/// # Errors
///
/// Propagates errors from running the fiber and from table lookups, and
/// reports a bug if a suspended fiber left no `suspend_reason` behind.
async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
    let old_thread = self.concurrent_state_mut().current_thread;
    log::trace!("resume_fiber: save current thread {old_thread:?}");

    // `Some(fiber)` means the fiber suspended again; `None` means it ran to
    // completion (see the trace message in the `else` branch below).
    let fiber = fiber::resolve_or_release(self, fiber).await?;

    self.set_thread(old_thread)?;

    let state = self.concurrent_state_mut();

    if let Some(ot) = old_thread.guest() {
        state.get_mut(ot.thread)?.state = GuestThreadState::Running;
    }
    log::trace!("resume_fiber: restore current thread {old_thread:?}");

    if let Some(mut fiber) = fiber {
        log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
        let reason = match state.suspend_reason.take() {
            Some(r) => r,
            None => bail_bug!("suspend reason missing when resuming fiber"),
        };
        match reason {
            // Keep at most one idle worker fiber around; dispose of extras.
            SuspendReason::NeedWork => {
                if state.worker.is_none() {
                    state.worker = Some(fiber);
                } else {
                    fiber.dispose(self);
                }
            }
            // A yielding thread is immediately ready again, but is queued at
            // low priority.
            SuspendReason::Yielding {
                thread,
                cancellable,
                ..
            } => {
                state.get_mut(thread.thread)?.state =
                    GuestThreadState::Ready { fiber, cancellable };
                let instance = state.get_mut(thread.task)?.instance.index;
                state.push_low_priority(WorkItem::ResumeThread(instance, thread));
            }
            // Park the fiber on the thread; nothing is queued here.
            SuspendReason::ExplicitlySuspending { thread, .. } => {
                state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
            }
            // Register the fiber as a waiter on the waitable set; a thread
            // must not already be waiting on the same set.
            SuspendReason::Waiting { set, thread, .. } => {
                let old = state
                    .get_mut(set)?
                    .waiting
                    .insert(thread, WaitMode::Fiber(fiber));
                assert!(old.is_none());
            }
        };
    } else {
        log::trace!("resume_fiber: fiber has exited");
    }

    Ok(())
}
1951
/// Suspends the current fiber, recording `reason` in `suspend_reason`
/// before handing control back.
///
/// When `reason` names a thread, the thread that was current at the time of
/// the call is made current again after the fiber is resumed.
///
/// # Panics
///
/// Panics if a suspend reason is already recorded. Debug builds also panic
/// if the reason does not set `skip_may_block_check` and the current guest
/// thread's task may not block.
fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
    log::trace!("suspend fiber: {reason:?}");

    // Every reason except `NeedWork` is tied to a specific guest task.
    let task = match &reason {
        SuspendReason::Yielding { thread, .. }
        | SuspendReason::Waiting { thread, .. }
        | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
        SuspendReason::NeedWork => None,
    };

    // Only remember the current thread when it will be restored below.
    let old_guest_thread = if task.is_some() {
        self.concurrent_state_mut().current_thread
    } else {
        CurrentThread::None
    };

    // Either the reason explicitly opts out of the check, or the current
    // guest thread's task must be allowed to block. With no current guest
    // thread the check passes.
    debug_assert!(
        matches!(
            reason,
            SuspendReason::ExplicitlySuspending {
                skip_may_block_check: true,
                ..
            } | SuspendReason::Waiting {
                skip_may_block_check: true,
                ..
            } | SuspendReason::Yielding {
                skip_may_block_check: true,
                ..
            }
        ) || old_guest_thread
            .guest()
            .map(|thread| self.concurrent_state_mut().may_block(thread.task))
            .transpose()?
            .unwrap_or(true)
    );

    let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
    assert!(suspend_reason.is_none());
    *suspend_reason = Some(reason);

    // Control returns here only once the fiber has been resumed.
    self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;

    if task.is_some() {
        self.set_thread(old_guest_thread)?;
    }

    Ok(())
}
2013
2014 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
2015 let state = self.concurrent_state_mut();
2016
2017 if waitable.common(state)?.set.is_some() {
2018 bail!(Trap::WaitableSyncAndAsync);
2019 }
2020
2021 let caller = state.current_guest_thread()?;
2022 let set = state.get_mut(caller.thread)?.sync_call_set;
2023 waitable.join(state, Some(set))?;
2024 self.suspend(SuspendReason::Waiting {
2025 set,
2026 thread: caller,
2027 skip_may_block_check: false,
2028 })?;
2029 let state = self.concurrent_state_mut();
2030 waitable.join(state, None)
2031 }
2032
/// Tears down the bookkeeping for `guest_thread`.
///
/// This removes the thread's handle (if any) from the thread handle table
/// of `runtime_instance`, deletes finished subtasks still sitting in the
/// thread's `sync_call_set`, deletes the thread and that set, and detaches
/// the thread from its task. When `cleanup_task` is `CleanupTask::Yes` and
/// the task reports `ready_to_delete`, the task's waitable is deleted too.
///
/// # Errors
///
/// Returns `Trap::NoAsyncResult` if this was the task's last thread and the
/// task has neither returned nor been cancelled.
fn cleanup_thread(
    &mut self,
    guest_thread: QualifiedThreadId,
    runtime_instance: RuntimeInstance,
    cleanup_task: CleanupTask,
) -> Result<()> {
    let state = self.concurrent_state_mut();
    let thread_data = state.get_mut(guest_thread.thread)?;
    let sync_call_set = thread_data.sync_call_set;
    if let Some(guest_id) = thread_data.instance_rep {
        self.instance_state(runtime_instance)
            .thread_handle_table()
            .guest_thread_remove(guest_id)?;
    }
    let state = self.concurrent_state_mut();

    // Drop subtasks in the sync-call set whose pending event says they have
    // already returned or been cancelled.
    for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
        if let Some(Event::Subtask {
            status: Status::Returned | Status::ReturnCancelled,
        }) = waitable.common(state)?.event
        {
            waitable.delete_from(state)?;
        }
    }

    state.delete(guest_thread.thread)?;
    state.delete(sync_call_set)?;
    let task = state.get_mut(guest_thread.task)?;
    task.threads.remove(&guest_thread.thread);

    // A task whose last thread is gone must have produced (or cancelled) a
    // result by now.
    if task.threads.is_empty() && !task.returned_or_cancelled() {
        bail!(Trap::NoAsyncResult);
    }
    let ready_to_delete = task.ready_to_delete();

    // Decrement the "interesting tasks" counter at most once per task, and
    // wake whoever is waiting for it to reach zero.
    if !task.decremented_interesting_task_count && task.exited && task.returned_or_cancelled() {
        task.decremented_interesting_task_count = true;

        debug_assert!(state.interesting_tasks > 0);
        state.interesting_tasks -= 1;
        if state.interesting_tasks == 0
            && let Some(waker) = state.interesting_tasks_empty_waker.take()
        {
            waker.wake();
        }
    }

    match cleanup_task {
        CleanupTask::Yes => {
            if ready_to_delete {
                Waitable::Guest(guest_thread.task).delete_from(state)?;
            }
        }
        CleanupTask::No => {}
    }

    Ok(())
}
2113
2114 fn cancel_guest_subtask_without_lowered_parameters(
2127 &mut self,
2128 caller_instance: RuntimeInstance,
2129 guest_task: TableId<GuestTask>,
2130 ) -> Result<()> {
2131 let concurrent_state = self.concurrent_state_mut();
2132 let task = concurrent_state.get_mut(guest_task)?;
2133 assert!(!task.already_lowered_parameters());
2134 task.lower_params = None;
2138 task.lift_result = None;
2139 task.exited = true;
2140 let instance = task.instance;
2141
2142 assert_eq!(1, task.threads.len());
2145 let thread = *task.threads.iter().next().unwrap();
2146 self.cleanup_thread(
2147 QualifiedThreadId {
2148 task: guest_task,
2149 thread,
2150 },
2151 caller_instance,
2152 CleanupTask::No,
2153 )?;
2154
2155 let pending = &mut self.instance_state(instance).concurrent_state().pending;
2157 let pending_count = pending.len();
2158 pending.retain(|thread, _| thread.task != guest_task);
2159 if pending.len() == pending_count {
2161 bail!(Trap::SubtaskCancelAfterTerminal);
2162 }
2163 Ok(())
2164 }
2165}
2166
/// Tells `cleanup_thread` whether it may also delete the owning task's
/// waitable once the task reports that it is ready to delete.
enum CleanupTask {
    /// Delete the task's waitable if the task is ready to delete.
    Yes,
    /// Leave the task in place; only the thread is cleaned up.
    No,
}
2171
2172impl Instance {
2173 fn get_event(
2176 self,
2177 store: &mut StoreOpaque,
2178 guest_task: TableId<GuestTask>,
2179 set: Option<TableId<WaitableSet>>,
2180 cancellable: bool,
2181 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
2182 let state = store.concurrent_state_mut();
2183
2184 let event = &mut state.get_mut(guest_task)?.event;
2185 if let Some(ev) = event
2186 && (cancellable || !matches!(ev, Event::Cancelled))
2187 {
2188 log::trace!("deliver event {ev:?} to {guest_task:?}");
2189 let ev = *ev;
2190 *event = None;
2191 return Ok(Some((ev, None)));
2192 }
2193
2194 let set = match set {
2195 Some(set) => set,
2196 None => return Ok(None),
2197 };
2198 let waitable = match state.get_mut(set)?.ready.pop_first() {
2199 Some(v) => v,
2200 None => return Ok(None),
2201 };
2202
2203 let common = waitable.common(state)?;
2204 let handle = match common.handle {
2205 Some(h) => h,
2206 None => bail_bug!("handle not set when delivering event"),
2207 };
2208 let event = match common.event.take() {
2209 Some(e) => e,
2210 None => bail_bug!("event not set when delivering event"),
2211 };
2212
2213 log::trace!(
2214 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
2215 );
2216
2217 waitable.on_delivery(store, self, event)?;
2218
2219 Ok(Some((event, Some((waitable, handle)))))
2220 }
2221
/// Acts on the packed `code` a guest callback function returned for
/// `guest_thread`.
///
/// `code` is split by `unpack_callback_code` into the action and a waitable
/// set handle:
///
/// * `EXIT` marks the task as exited, drops its callback and cleans up the
///   thread.
/// * `YIELD` schedules delivery of an `Event::None` (or of an already
///   pending event).
/// * `WAIT` either schedules delivery of an available event or registers
///   the thread as a callback waiter on the set.
///
/// Returns `Ok(Some(call))` when the caller must run `call` itself
/// (only the `YIELD` case for a task that may not block); `Ok(None)`
/// otherwise.
///
/// # Errors
///
/// Returns `Trap::UnsupportedCallbackCode` for any other code, and
/// propagates lookup and blocking-check failures.
fn handle_callback_code(
    self,
    store: &mut StoreOpaque,
    guest_thread: QualifiedThreadId,
    runtime_instance: RuntimeComponentInstanceIndex,
    code: u32,
) -> Result<Option<GuestCall>> {
    let (code, set) = unpack_callback_code(code);

    log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");

    let state = store.concurrent_state_mut();

    // Resolves a guest-visible waitable set handle to its table id using the
    // handle table of `runtime_instance`.
    let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
        let set = store
            .instance_state(self.runtime_instance(runtime_instance))
            .handle_table()
            .waitable_set_rep(handle)?;

        Ok(TableId::<WaitableSet>::new(set))
    };

    Ok(match code {
        callback_code::EXIT => {
            log::trace!("implicit thread {guest_thread:?} completed");
            let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
            task.exited = true;
            task.callback = None;
            store.cleanup_thread(
                guest_thread,
                self.runtime_instance(runtime_instance),
                CleanupTask::Yes,
            )?;
            None
        }
        callback_code::YIELD => {
            let task = state.get_mut(guest_thread.task)?;
            // Deliver whatever is already pending; the assertion documents
            // that only `None`/`Cancelled` are expected here. Otherwise
            // deliver `Event::None`.
            if let Some(event) = task.event {
                assert!(matches!(event, Event::None | Event::Cancelled));
            } else {
                task.event = Some(Event::None);
            }
            let call = GuestCall {
                thread: guest_thread,
                kind: GuestCallKind::DeliverEvent {
                    instance: self,
                    set: None,
                },
            };
            if state.may_block(guest_thread.task)? {
                // Queue at low priority and let the scheduler pick it up.
                state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
                None
            } else {
                // The task may not block, so hand the call back to the
                // caller instead of queueing it.
                Some(call)
            }
        }
        callback_code::WAIT => {
            // Waiting is only permitted for tasks that may block.
            state.check_blocking_for(guest_thread.task)?;

            let set = get_set(store, set)?;
            let state = store.concurrent_state_mut();

            if state.get_mut(guest_thread.task)?.event.is_some()
                || !state.get_mut(set)?.ready.is_empty()
            {
                // Something is already available: deliver it promptly.
                state.push_high_priority(WorkItem::GuestCall(
                    runtime_instance,
                    GuestCall {
                        thread: guest_thread,
                        kind: GuestCallKind::DeliverEvent {
                            instance: self,
                            set: Some(set),
                        },
                    },
                ));
            } else {
                // Nothing available yet: remember which set the thread is
                // waiting on and register it as a callback-style waiter.
                let old = state
                    .get_mut(guest_thread.thread)?
                    .wake_on_cancel
                    .replace(set);
                if !old.is_none() {
                    bail_bug!("thread unexpectedly had wake_on_cancel set");
                }
                let old = state
                    .get_mut(set)?
                    .waiting
                    .insert(guest_thread, WaitMode::Callback(self));
                if !old.is_none() {
                    bail_bug!("set's waiting set already had this thread registered");
                }
            }
            None
        }
        _ => bail!(Trap::UnsupportedCallbackCode),
    })
}
2342
/// Queues the first invocation of `callee` for `guest_thread` as a
/// high-priority `GuestCallKind::StartImplicit` work item.
///
/// Two flavours of start closure are built:
///
/// * with a `callback` (requires `async_`): the callee is invoked once, its
///   first result is read as a callback code and passed to
///   `handle_callback_code`;
/// * without a `callback`: for a non-async callee the result is lifted,
///   `post_return` is called and the task is completed; in both the async
///   and non-async cases the task is then marked exited and the thread is
///   cleaned up.
///
/// # Safety
///
/// NOTE(review): the exact contract is not visible in this chunk. The
/// function pointers are invoked through `Func::call_unchecked_raw` and
/// `call_post_return`, so they presumably must be valid for this store and
/// agree with `param_count`/`result_count` — confirm against callers.
unsafe fn queue_call<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    guest_thread: QualifiedThreadId,
    callee: SendSyncPtr<VMFuncRef>,
    param_count: usize,
    result_count: usize,
    async_: bool,
    callback: Option<SendSyncPtr<VMFuncRef>>,
    post_return: Option<SendSyncPtr<VMFuncRef>>,
) -> Result<()> {
    /// Builds the closure that lowers the parameters into flat storage and
    /// invokes `callee`, returning the storage (which then holds the
    /// results).
    unsafe fn make_call<T: 'static>(
        store: StoreContextMut<T>,
        guest_thread: QualifiedThreadId,
        callee: SendSyncPtr<VMFuncRef>,
        param_count: usize,
        result_count: usize,
    ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
    + Send
    + Sync
    + 'static
    + use<T> {
        let token = StoreToken::new(store);
        move |store: &mut dyn VMStore| {
            let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];

            store
                .concurrent_state_mut()
                .get_mut(guest_thread.thread)?
                .state = GuestThreadState::Running;
            let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
            // The lowering closure is one-shot; taking it also records that
            // it has been used.
            let lower = match task.lower_params.take() {
                Some(l) => l,
                None => bail_bug!("lower_params missing"),
            };

            lower(store, &mut storage[..param_count])?;

            let mut store = token.as_context_mut(store);

            // The same buffer carries parameters in and results out, hence
            // the `max` of both counts.
            unsafe {
                crate::Func::call_unchecked_raw(
                    &mut store,
                    callee.as_non_null(),
                    NonNull::new(
                        &mut storage[..param_count.max(result_count)]
                            as *mut [MaybeUninit<ValRaw>] as _,
                    )
                    .unwrap(),
                )?;
            }

            Ok(storage)
        }
    }

    let call = unsafe {
        make_call(
            store.as_context_mut(),
            guest_thread,
            callee,
            param_count,
            result_count,
        )
    };

    let callee_instance = store
        .0
        .concurrent_state_mut()
        .get_mut(guest_thread.task)?
        .instance;

    let fun = if callback.is_some() {
        // Callback-based ("stackless") async callee.
        assert!(async_);

        Box::new(move |store: &mut dyn VMStore| {
            self.add_guest_thread_to_instance_table(
                guest_thread.thread,
                store,
                callee_instance.index,
            )?;
            let old_thread = store.set_thread(guest_thread)?;
            log::trace!(
                "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
            );

            store.enter_instance(callee_instance);

            let storage = call(store)?;

            store.exit_instance(callee_instance)?;

            store.set_thread(old_thread)?;
            let state = store.concurrent_state_mut();
            if let Some(t) = old_thread.guest() {
                state.get_mut(t.thread)?.state = GuestThreadState::Running;
            }
            log::trace!("stackless call: restored {old_thread:?} as current thread");

            // The callee's single result is the packed callback code.
            let code = unsafe { storage[0].assume_init() }.get_i32() as u32;

            self.handle_callback_code(store, guest_thread, callee_instance.index, code)
        })
            as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
    } else {
        // Sync callee, or async callee without a callback ("stackful").
        let token = StoreToken::new(store.as_context_mut());
        Box::new(move |store: &mut dyn VMStore| {
            self.add_guest_thread_to_instance_table(
                guest_thread.thread,
                store,
                callee_instance.index,
            )?;
            let old_thread = store.set_thread(guest_thread)?;
            log::trace!(
                "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
            );
            let flags = self.id().get(store).instance_flags(callee_instance.index);

            // Only sync callees are bracketed by enter/exit here.
            if !async_ {
                store.enter_instance(callee_instance);
            }

            let storage = call(store)?;

            if !async_ {
                let lift = {
                    store.exit_instance(callee_instance)?;

                    let state = store.concurrent_state_mut();
                    if !state.get_mut(guest_thread.task)?.result.is_none() {
                        bail_bug!("task has already produced a result");
                    }

                    match state.get_mut(guest_thread.task)?.lift_result.take() {
                        Some(lift) => lift,
                        None => bail_bug!("lift_result field is missing"),
                    }
                };

                // The first `result_count` slots are read as initialized
                // values (they were written by the call above).
                let result = (lift.lift)(store, unsafe {
                    mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
                        &storage[..result_count],
                    )
                })?;

                // `post_return` receives the callee's single flat result, or
                // a dummy zero when there is none.
                let post_return_arg = match result_count {
                    0 => ValRaw::i32(0),
                    1 => unsafe { storage[0].assume_init() },
                    _ => unreachable!(),
                };

                unsafe {
                    call_post_return(
                        token.as_context_mut(store),
                        post_return.map(|v| v.as_non_null()),
                        post_return_arg,
                        flags,
                    )?;
                }

                self.task_complete(store, guest_thread.task, result, Status::Returned)?;
            }

            store.set_thread(old_thread)?;

            store
                .concurrent_state_mut()
                .get_mut(guest_thread.task)?
                .exited = true;

            store.cleanup_thread(guest_thread, callee_instance, CleanupTask::Yes)?;
            Ok(None)
        })
    };

    store
        .0
        .concurrent_state_mut()
        .push_high_priority(WorkItem::GuestCall(
            callee_instance.index,
            GuestCall {
                thread: guest_thread,
                kind: GuestCallKind::StartImplicit(fun),
            },
        ));

    Ok(())
}
2583
2584 unsafe fn prepare_call<T: 'static>(
2597 self,
2598 mut store: StoreContextMut<T>,
2599 start: NonNull<VMFuncRef>,
2600 return_: NonNull<VMFuncRef>,
2601 caller_instance: RuntimeComponentInstanceIndex,
2602 callee_instance: RuntimeComponentInstanceIndex,
2603 task_return_type: TypeTupleIndex,
2604 callee_async: bool,
2605 memory: *mut VMMemoryDefinition,
2606 string_encoding: StringEncoding,
2607 caller_info: CallerInfo,
2608 ) -> Result<()> {
2609 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2610 store.0.check_blocking()?;
2614 }
2615
2616 enum ResultInfo {
2617 Heap { results: u32 },
2618 Stack { result_count: u32 },
2619 }
2620
2621 let result_info = match &caller_info {
2622 CallerInfo::Async {
2623 has_result: true,
2624 params,
2625 } => ResultInfo::Heap {
2626 results: match params.last() {
2627 Some(r) => r.get_u32(),
2628 None => bail_bug!("retptr missing"),
2629 },
2630 },
2631 CallerInfo::Async {
2632 has_result: false, ..
2633 } => ResultInfo::Stack { result_count: 0 },
2634 CallerInfo::Sync {
2635 result_count,
2636 params,
2637 } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
2638 results: match params.last() {
2639 Some(r) => r.get_u32(),
2640 None => bail_bug!("arg ptr missing"),
2641 },
2642 },
2643 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2644 result_count: *result_count,
2645 },
2646 };
2647
2648 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2649
2650 let start = SendSyncPtr::new(start);
2654 let return_ = SendSyncPtr::new(return_);
2655 let token = StoreToken::new(store.as_context_mut());
2656 let state = store.0.concurrent_state_mut();
2657 let old_thread = state.current_guest_thread()?;
2658
2659 debug_assert_eq!(
2660 state.get_mut(old_thread.task)?.instance,
2661 self.runtime_instance(caller_instance)
2662 );
2663
2664 let guest_thread = GuestTask::new(
2665 state,
2666 Box::new(move |store, dst| {
2667 let mut store = token.as_context_mut(store);
2668 assert!(dst.len() <= MAX_FLAT_PARAMS);
2669 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2671 let count = match caller_info {
2672 CallerInfo::Async { params, has_result } => {
2676 let params = ¶ms[..params.len() - usize::from(has_result)];
2677 for (param, src) in params.iter().zip(&mut src) {
2678 src.write(*param);
2679 }
2680 params.len()
2681 }
2682
2683 CallerInfo::Sync { params, .. } => {
2685 for (param, src) in params.iter().zip(&mut src) {
2686 src.write(*param);
2687 }
2688 params.len()
2689 }
2690 };
2691 unsafe {
2698 crate::Func::call_unchecked_raw(
2699 &mut store,
2700 start.as_non_null(),
2701 NonNull::new(
2702 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2703 )
2704 .unwrap(),
2705 )?;
2706 }
2707 dst.copy_from_slice(&src[..dst.len()]);
2708 let state = store.0.concurrent_state_mut();
2709 Waitable::Guest(state.current_guest_thread()?.task).set_event(
2710 state,
2711 Some(Event::Subtask {
2712 status: Status::Started,
2713 }),
2714 )?;
2715 Ok(())
2716 }),
2717 LiftResult {
2718 lift: Box::new(move |store, src| {
2719 let mut store = token.as_context_mut(store);
2722 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2724 my_src.push(ValRaw::u32(*results));
2725 }
2726
2727 let prev = store.0.set_thread(old_thread)?;
2733
2734 unsafe {
2741 crate::Func::call_unchecked_raw(
2742 &mut store,
2743 return_.as_non_null(),
2744 my_src.as_mut_slice().into(),
2745 )?;
2746 }
2747
2748 store.0.set_thread(prev)?;
2751
2752 let state = store.0.concurrent_state_mut();
2753 let thread = state.current_guest_thread()?;
2754 if sync_caller {
2755 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2756 if let ResultInfo::Stack { result_count } = &result_info {
2757 match result_count {
2758 0 => None,
2759 1 => Some(my_src[0]),
2760 _ => unreachable!(),
2761 }
2762 } else {
2763 None
2764 },
2765 );
2766 }
2767 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2768 }),
2769 ty: task_return_type,
2770 memory: NonNull::new(memory).map(SendSyncPtr::new),
2771 string_encoding,
2772 },
2773 Caller::Guest { thread: old_thread },
2774 None,
2775 self.runtime_instance(callee_instance),
2776 callee_async,
2777 )?;
2778
2779 store.0.set_thread(guest_thread)?;
2782 log::trace!("pushed {guest_thread:?} as current thread; old thread was {old_thread:?}");
2783
2784 Ok(())
2785 }
2786
2787 unsafe fn call_callback<T>(
2792 self,
2793 mut store: StoreContextMut<T>,
2794 function: SendSyncPtr<VMFuncRef>,
2795 event: Event,
2796 handle: u32,
2797 ) -> Result<u32> {
2798 let (ordinal, result) = event.parts();
2799 let params = &mut [
2800 ValRaw::u32(ordinal),
2801 ValRaw::u32(handle),
2802 ValRaw::u32(result),
2803 ];
2804 unsafe {
2809 crate::Func::call_unchecked_raw(
2810 &mut store,
2811 function.as_non_null(),
2812 params.as_mut_slice().into(),
2813 )?;
2814 }
2815 Ok(params[0].get_u32())
2816 }
2817
/// Starts the guest-to-guest call for the current guest thread's task and
/// waits for its first status.
///
/// The call is queued with `queue_call`, the callee's waitable is
/// temporarily joined to the caller's `sync_call_set`, and the caller
/// suspends until a subtask event arrives. `storage` being `None` marks an
/// async caller; a sync caller keeps waiting until the callee has returned
/// and receives its flat result through `storage`.
///
/// Returns the status packed together with the subtask handle, if one was
/// created.
///
/// # Panics
///
/// Panics if `param_count` exceeds `MAX_FLAT_PARAMS` or `result_count`
/// exceeds `MAX_FLAT_RESULTS`.
///
/// # Safety
///
/// NOTE(review): the exact contract is not visible in this chunk.
/// `callback`, `post_return` and `callee` are forwarded to `queue_call` and
/// eventually invoked, so they presumably must be null or valid function
/// references for this store — confirm against callers.
unsafe fn start_call<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    callback: *mut VMFuncRef,
    post_return: *mut VMFuncRef,
    callee: NonNull<VMFuncRef>,
    param_count: u32,
    result_count: u32,
    flags: u32,
    storage: Option<&mut [MaybeUninit<ValRaw>]>,
) -> Result<u32> {
    let token = StoreToken::new(store.as_context_mut());
    // No result storage means the caller is async.
    let async_caller = storage.is_none();
    let state = store.0.concurrent_state_mut();
    let guest_thread = state.current_guest_thread()?;
    let callee_async = state.get_mut(guest_thread.task)?.async_function;
    let callee = SendSyncPtr::new(callee);
    let param_count = usize::try_from(param_count)?;
    assert!(param_count <= MAX_FLAT_PARAMS);
    let result_count = usize::try_from(result_count)?;
    assert!(result_count <= MAX_FLAT_RESULTS);

    let task = state.get_mut(guest_thread.task)?;
    // Install the callback on the task so events can be delivered to it
    // later.
    if let Some(callback) = NonNull::new(callback) {
        let callback = SendSyncPtr::new(callback);
        task.callback = Some(Box::new(move |store, event, handle| {
            let store = token.as_context_mut(store);
            unsafe { self.call_callback::<T>(store, callback, event, handle) }
        }));
    }

    let Caller::Guest { thread: caller } = &task.caller else {
        bail_bug!("start_call unexpectedly invoked for host->guest call");
    };
    let caller = *caller;
    let caller_instance = state.get_mut(caller.task)?.instance;

    unsafe {
        self.queue_call(
            store.as_context_mut(),
            guest_thread,
            callee,
            param_count,
            result_count,
            (flags & START_FLAG_ASYNC_CALLEE) != 0,
            NonNull::new(callback).map(SendSyncPtr::new),
            NonNull::new(post_return).map(SendSyncPtr::new),
        )?;
    }

    let state = store.0.concurrent_state_mut();

    // Temporarily move the callee's waitable into the caller's sync-call
    // set, remembering the set it was in so it can be put back.
    let guest_waitable = Waitable::Guest(guest_thread.task);
    let old_set = guest_waitable.common(state)?.set;
    let set = state.get_mut(caller.thread)?.sync_call_set;
    guest_waitable.join(state, Some(set))?;

    store.0.set_thread(CurrentThread::None)?;

    let (status, waitable) = loop {
        store.0.suspend(SuspendReason::Waiting {
            set,
            thread: caller,
            // The may-block check is skipped for async callers and for
            // sync callees.
            skip_may_block_check: async_caller || !callee_async,
        })?;

        let state = store.0.concurrent_state_mut();

        log::trace!("taking event for {:?}", guest_thread.task);
        let event = guest_waitable.take_event(state)?;
        let Some(Event::Subtask { status }) = event else {
            bail_bug!("subtasks should only get subtask events, got {event:?}")
        };

        log::trace!("status {status:?} for {:?}", guest_thread.task);

        if status == Status::Returned {
            // Done; no handle is needed.
            break (status, None);
        } else if async_caller {
            // Not finished yet: give the async caller a subtask handle to
            // track the callee with.
            let handle = store
                .0
                .instance_state(caller_instance)
                .handle_table()
                .subtask_insert_guest(guest_thread.task.rep())?;
            store
                .0
                .concurrent_state_mut()
                .get_mut(guest_thread.task)?
                .common
                .handle = Some(handle);
            break (status, Some(handle));
        } else {
            // Sync caller and the callee has not returned: keep waiting.
        }
    };

    guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;

    store.0.set_thread(caller)?;
    store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
    log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");

    if let Some(storage) = storage {
        let state = store.0.concurrent_state_mut();
        let task = state.get_mut(guest_thread.task)?;
        // Hand the stashed flat result (if any) to the sync caller, and
        // delete the finished task once nothing else needs it.
        if let Some(result) = task.sync_result.take()? {
            if let Some(result) = result {
                storage[0] = MaybeUninit::new(result);
            }

            if task.exited && task.ready_to_delete() {
                Waitable::Guest(guest_thread.task).delete_from(state)?;
            }
        }
    }

    Ok(status.pack(waitable))
}
2988
/// Polls the host task's `future` once and either completes the call
/// immediately or registers it as a subtask.
///
/// If the future is ready on the first poll, `lower` is invoked with its
/// result and `Ok(None)` is returned. Otherwise the future is pushed onto
/// the concurrent state; when it finishes, a worker function is queued that
/// lowers the result, marks the host task as done and posts a subtask
/// event. In that case a subtask handle is inserted into the caller
/// instance's handle table and returned as `Ok(Some(handle))`.
///
/// `lower` receives `None` when the future resolves without a value, which
/// is reported as `Status::ReturnCancelled`.
///
/// # Panics
///
/// Panics if the current host task is not in the `CalleeStarted` state.
pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
    self,
    mut store: StoreContextMut<'_, T>,
    future: impl Future<Output = Result<R>> + Send + 'static,
    lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
) -> Result<Option<u32>> {
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();
    let task = state.current_host_thread()?;

    // Wrap the future and keep the join handle on the host task.
    let (join_handle, future) = JoinHandle::run(future);
    {
        let state = &mut state.get_mut(task)?.state;
        assert!(matches!(state, HostTaskState::CalleeStarted));
        *state = HostTaskState::CalleeRunning(join_handle);
    }

    let mut future = Box::pin(future);

    // Poll once with a no-op waker, with the store installed via `tls::set`
    // for the duration of the poll.
    let poll = tls::set(store.0, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        // Finished synchronously: lower right away, no subtask needed.
        Poll::Ready(result) => {
            let result = result.transpose()?;
            lower(store.as_context_mut(), result)?;
            return Ok(None);
        }

        Poll::Pending => {}
    }

    let future = Box::pin(async move {
        let result = match future.await {
            Some(result) => Some(result?),
            None => None,
        };
        // Runs later as a worker function with access to the store.
        let on_complete = move |store: &mut dyn VMStore| {
            let mut store = token.as_context_mut(store);
            // Lower with the host task as the current thread.
            let old = store.0.set_thread(task)?;

            let status = if result.is_some() {
                Status::Returned
            } else {
                Status::ReturnCancelled
            };

            lower(store.as_context_mut(), result)?;
            let state = store.0.concurrent_state_mut();
            match &mut state.get_mut(task)?.state {
                // Already marked done; leave it untouched.
                HostTaskState::CalleeDone { .. } => {}

                other => *other = HostTaskState::CalleeDone { cancelled: false },
            }
            Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;

            store.0.set_thread(old)?;
            Ok(())
        };

        // Queue the completion handler on the store obtained via `tls::get`.
        tls::get(move |store| {
            store
                .concurrent_state_mut()
                .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
                    on_complete,
                ))));
            Ok(())
        })
    });

    let state = store.0.concurrent_state_mut();
    state.push_future(future);
    // Register the host task as a subtask of the calling guest task.
    let caller = state.get_mut(task)?.caller;
    let instance = state.get_mut(caller.task)?.instance;
    let handle = store
        .0
        .instance_state(instance)
        .handle_table()
        .subtask_insert_host(task.rep())?;
    store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
    log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");

    // Make the caller the current thread again before returning to it.
    store.0.set_thread(caller)?;
    Ok(Some(handle))
}
3119
3120 pub(crate) fn task_return(
3123 self,
3124 store: &mut dyn VMStore,
3125 ty: TypeTupleIndex,
3126 options: OptionsIndex,
3127 storage: &[ValRaw],
3128 ) -> Result<()> {
3129 let state = store.concurrent_state_mut();
3130 let guest_thread = state.current_guest_thread()?;
3131 let lift = state
3132 .get_mut(guest_thread.task)?
3133 .lift_result
3134 .take()
3135 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3136 if !state.get_mut(guest_thread.task)?.result.is_none() {
3137 bail_bug!("task result unexpectedly already set");
3138 }
3139
3140 let CanonicalOptions {
3141 string_encoding,
3142 data_model,
3143 ..
3144 } = &self.id().get(store).component().env_component().options[options];
3145
3146 let invalid = ty != lift.ty
3147 || string_encoding != &lift.string_encoding
3148 || match data_model {
3149 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
3150 Some(memory) => {
3151 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
3152 let actual = self.id().get(store).runtime_memory(memory);
3153 expected != actual.as_ptr()
3154 }
3155 None => false,
3158 },
3159 CanonicalOptionsDataModel::Gc { .. } => true,
3161 };
3162
3163 if invalid {
3164 bail!(Trap::TaskReturnInvalid);
3165 }
3166
3167 log::trace!("task.return for {guest_thread:?}");
3168
3169 let result = (lift.lift)(store, storage)?;
3170 self.task_complete(store, guest_thread.task, result, Status::Returned)
3171 }
3172
3173 pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
3175 let state = store.concurrent_state_mut();
3176 let guest_thread = state.current_guest_thread()?;
3177 let task = state.get_mut(guest_thread.task)?;
3178 if !task.cancel_sent {
3179 bail!(Trap::TaskCancelNotCancelled);
3180 }
3181 _ = task
3182 .lift_result
3183 .take()
3184 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3185
3186 if !task.result.is_none() {
3187 bail_bug!("task result should not bet set yet");
3188 }
3189
3190 log::trace!("task.cancel for {guest_thread:?}");
3191
3192 self.task_complete(
3193 store,
3194 guest_thread.task,
3195 Box::new(DummyResult),
3196 Status::ReturnCancelled,
3197 )
3198 }
3199
3200 fn task_complete(
3206 self,
3207 store: &mut StoreOpaque,
3208 guest_task: TableId<GuestTask>,
3209 result: Box<dyn Any + Send + Sync>,
3210 status: Status,
3211 ) -> Result<()> {
3212 store
3213 .component_resource_tables(Some(self))
3214 .validate_scope_exit()?;
3215
3216 let state = store.concurrent_state_mut();
3217 let task = state.get_mut(guest_task)?;
3218
3219 if let Caller::Host { tx, .. } = &mut task.caller {
3220 if let Some(tx) = tx.take() {
3221 _ = tx.send(result);
3222 }
3223 } else {
3224 task.result = Some(result);
3225 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3226 }
3227
3228 Ok(())
3229 }
3230
3231 pub(crate) fn waitable_set_new(
3233 self,
3234 store: &mut StoreOpaque,
3235 caller_instance: RuntimeComponentInstanceIndex,
3236 ) -> Result<u32> {
3237 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
3238 let handle = store
3239 .instance_state(self.runtime_instance(caller_instance))
3240 .handle_table()
3241 .waitable_set_insert(set.rep())?;
3242 log::trace!("new waitable set {set:?} (handle {handle})");
3243 Ok(handle)
3244 }
3245
3246 pub(crate) fn waitable_set_drop(
3248 self,
3249 store: &mut StoreOpaque,
3250 caller_instance: RuntimeComponentInstanceIndex,
3251 set: u32,
3252 ) -> Result<()> {
3253 let rep = store
3254 .instance_state(self.runtime_instance(caller_instance))
3255 .handle_table()
3256 .waitable_set_remove(set)?;
3257
3258 log::trace!("drop waitable set {rep} (handle {set})");
3259
3260 if !store
3264 .concurrent_state_mut()
3265 .get_mut(TableId::<WaitableSet>::new(rep))?
3266 .waiting
3267 .is_empty()
3268 {
3269 bail!(Trap::WaitableSetDropHasWaiters);
3270 }
3271
3272 store
3273 .concurrent_state_mut()
3274 .delete(TableId::<WaitableSet>::new(rep))?;
3275
3276 Ok(())
3277 }
3278
    /// Joins the waitable named by `waitable_handle` to the waitable set named
    /// by `set_handle`, both resolved in `caller_instance`.
    ///
    /// A `set_handle` of `0` is not looked up; the waitable is joined with
    /// `None` instead (presumably detaching it from any set — confirm against
    /// `Waitable::join`).
    ///
    /// # Errors
    ///
    /// * [`Trap::WaitableSyncAndAsync`] if a non-zero `set_handle` is given
    ///   while the waitable currently belongs to a set flagged
    ///   `is_sync_call_set`.
    /// * Any error from resolving either handle or from the join itself.
    pub(crate) fn waitable_join(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        waitable_handle: u32,
        set_handle: u32,
    ) -> Result<()> {
        let mut instance = self.id().get_mut(store);
        let waitable =
            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;

        let set = if set_handle == 0 {
            None
        } else {
            // Resolve the set handle to its table representation first, while
            // the pinned instance borrow is still alive.
            let set = instance.instance_states().0[caller_instance]
                .handle_table()
                .waitable_set_rep(set_handle)?;

            // A waitable that currently sits in a sync-call set may not be
            // moved into another set.
            let state = store.concurrent_state_mut();
            if let Some(old) = waitable.common(state)?.set
                && state.get_mut(old)?.is_sync_call_set
            {
                bail!(Trap::WaitableSyncAndAsync);
            }

            Some(TableId::<WaitableSet>::new(set))
        };

        log::trace!(
            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
        );

        waitable.join(store.concurrent_state_mut(), set)
    }
3314
3315 pub(crate) fn subtask_drop(
3317 self,
3318 store: &mut StoreOpaque,
3319 caller_instance: RuntimeComponentInstanceIndex,
3320 task_id: u32,
3321 ) -> Result<()> {
3322 self.waitable_join(store, caller_instance, task_id, 0)?;
3323
3324 let (rep, is_host) = store
3325 .instance_state(self.runtime_instance(caller_instance))
3326 .handle_table()
3327 .subtask_remove(task_id)?;
3328
3329 let concurrent_state = store.concurrent_state_mut();
3330 let (waitable, delete) = if is_host {
3331 let id = TableId::<HostTask>::new(rep);
3332 let task = concurrent_state.get_mut(id)?;
3333 match &task.state {
3334 HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
3335 HostTaskState::CalleeDone { .. } => {}
3336 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3337 bail_bug!("invalid state for callee in `subtask.drop`")
3338 }
3339 }
3340 (Waitable::Host(id), true)
3341 } else {
3342 let id = TableId::<GuestTask>::new(rep);
3343 let task = concurrent_state.get_mut(id)?;
3344 if task.lift_result.is_some() {
3345 bail!(Trap::SubtaskDropNotResolved);
3346 }
3347 (
3348 Waitable::Guest(id),
3349 concurrent_state.get_mut(id)?.ready_to_delete(),
3350 )
3351 };
3352
3353 waitable.common(concurrent_state)?.handle = None;
3354
3355 if waitable.take_event(concurrent_state)?.is_some() {
3358 bail!(Trap::SubtaskDropNotResolved);
3359 }
3360
3361 if delete {
3362 waitable.delete_from(concurrent_state)?;
3363 }
3364
3365 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3366 Ok(())
3367 }
3368
3369 pub(crate) fn waitable_set_wait(
3371 self,
3372 store: &mut StoreOpaque,
3373 options: OptionsIndex,
3374 set: u32,
3375 payload: u32,
3376 ) -> Result<u32> {
3377 if !self.options(store, options).async_ {
3378 store.check_blocking()?;
3382 }
3383
3384 let &CanonicalOptions {
3385 cancellable,
3386 instance: caller_instance,
3387 ..
3388 } = &self.id().get(store).component().env_component().options[options];
3389 let rep = store
3390 .instance_state(self.runtime_instance(caller_instance))
3391 .handle_table()
3392 .waitable_set_rep(set)?;
3393
3394 self.waitable_check(
3395 store,
3396 cancellable,
3397 WaitableCheck::Wait,
3398 WaitableCheckParams {
3399 set: TableId::new(rep),
3400 options,
3401 payload,
3402 },
3403 )
3404 }
3405
3406 pub(crate) fn waitable_set_poll(
3408 self,
3409 store: &mut StoreOpaque,
3410 options: OptionsIndex,
3411 set: u32,
3412 payload: u32,
3413 ) -> Result<u32> {
3414 let &CanonicalOptions {
3415 cancellable,
3416 instance: caller_instance,
3417 ..
3418 } = &self.id().get(store).component().env_component().options[options];
3419 let rep = store
3420 .instance_state(self.runtime_instance(caller_instance))
3421 .handle_table()
3422 .waitable_set_rep(set)?;
3423
3424 self.waitable_check(
3425 store,
3426 cancellable,
3427 WaitableCheck::Poll,
3428 WaitableCheckParams {
3429 set: TableId::new(rep),
3430 options,
3431 payload,
3432 },
3433 )
3434 }
3435
3436 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3438 let thread_id = store.concurrent_state_mut().current_guest_thread()?.thread;
3439 match store
3440 .concurrent_state_mut()
3441 .get_mut(thread_id)?
3442 .instance_rep
3443 {
3444 Some(r) => Ok(r),
3445 None => bail_bug!("thread should have instance_rep by now"),
3446 }
3447 }
3448
    /// Creates a new, not-yet-started guest thread whose entry point is taken
    /// from a function table.
    ///
    /// The entry point is the element `start_func_idx` of the runtime table
    /// `start_func_table_idx`; it must have the type `(i32) -> ()`. When the
    /// thread is eventually started, the entry point is called with `context`
    /// as its only argument. The new thread is attached to the task of the
    /// currently running guest thread and registered in the thread handle
    /// table of `runtime_instance`.
    ///
    /// Returns the handle of the new thread in that instance's table.
    ///
    /// # Errors
    ///
    /// * [`Trap::ThreadNewIndirectUninitialized`] if the table slot is empty.
    /// * [`Trap::ThreadNewIndirectInvalidType`] if the function's type is not
    ///   `(i32) -> ()`.
    pub(crate) fn thread_new_indirect<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        runtime_instance: RuntimeComponentInstanceIndex,
        _func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32> {
        log::trace!("creating new thread");

        // The only accepted signature for a thread entry point.
        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
        let (instance, registry) = self.id().get_mut_and_registry(store.0);
        let callee = instance
            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
            .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
        if callee.type_index(store.0) != start_func_ty.type_index() {
            bail!(Trap::ThreadNewIndirectInvalidType);
        }

        // The token lets the type-erased closure below recover a typed store
        // context from a `&mut dyn VMStore`.
        let token = StoreToken::new(store.as_context_mut());
        let start_func = Box::new(
            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
                // Make the new thread current for the duration of the call,
                // remembering what was current before.
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
                );

                let mut store = token.as_context_mut(store);
                let mut params = [ValRaw::i32(context)];
                // SAFETY (review): `callee` was checked above to have type
                // `(i32) -> ()` and `params` holds exactly one `i32`; any
                // further requirements of `call_unchecked` should be confirmed
                // against its documentation.
                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };

                store.0.set_thread(old_thread)?;

                store.0.cleanup_thread(
                    guest_thread,
                    self.runtime_instance(runtime_instance),
                    CleanupTask::Yes,
                )?;
                log::trace!("explicit thread {guest_thread:?} completed");
                // If the previously current thread was a guest thread, mark it
                // as running again.
                let state = store.0.concurrent_state_mut();
                if let Some(t) = old_thread.guest() {
                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
                }
                log::trace!("thread start: restored {old_thread:?} as current thread");

                Ok(())
            },
        );

        // The new thread belongs to the task of the thread that created it.
        let state = store.0.concurrent_state_mut();
        let current_thread = state.current_guest_thread()?;
        let parent_task = current_thread.task;

        let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
        let thread_id = state.push(new_thread)?;
        state.get_mut(parent_task)?.threads.insert(thread_id);

        log::trace!("new thread with id {thread_id:?} created");

        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
    }
3514
    /// Schedules the guest thread with handle `thread_idx` (resolved in
    /// `runtime_instance`) to run.
    ///
    /// What happens depends on the thread's current state:
    ///
    /// * `NotStartedExplicit`: a `GuestCall` work item that invokes the start
    ///   function is queued.
    /// * `Suspended`: a `ResumeFiber` work item for its fiber is queued.
    /// * `Ready` (only when `allow_ready` is `true`): the state is left as
    ///   `Ready` and the thread's existing work item is promoted.
    /// * anything else: the state is restored and the call fails.
    ///
    /// `high_priority` is forwarded to `push_work_item` for the first two
    /// cases.
    ///
    /// # Errors
    ///
    /// Fails with [`Trap::CannotResumeThread`] if the thread is in a state
    /// that cannot be resumed, and propagates handle/table lookup errors.
    pub(crate) fn resume_thread(
        self,
        store: &mut StoreOpaque,
        runtime_instance: RuntimeComponentInstanceIndex,
        thread_idx: u32,
        high_priority: bool,
        allow_ready: bool,
    ) -> Result<()> {
        let thread_id =
            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
        let state = store.concurrent_state_mut();
        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
        let thread = state.get_mut(guest_thread.thread)?;

        // Optimistically mark the thread as running while taking ownership of
        // the previous state; arms that do not actually start the thread put
        // the previous state back.
        match mem::replace(&mut thread.state, GuestThreadState::Running) {
            GuestThreadState::NotStartedExplicit(start_func) => {
                log::trace!("starting thread {guest_thread:?}");
                let guest_call = WorkItem::GuestCall(
                    runtime_instance,
                    GuestCall {
                        thread: guest_thread,
                        kind: GuestCallKind::StartExplicit(Box::new(move |store| {
                            start_func(store, guest_thread)
                        })),
                    },
                );
                store
                    .concurrent_state_mut()
                    .push_work_item(guest_call, high_priority);
            }
            GuestThreadState::Suspended(fiber) => {
                log::trace!("resuming thread {thread_id:?} that was suspended");
                store
                    .concurrent_state_mut()
                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
            }
            GuestThreadState::Ready { fiber, cancellable } if allow_ready => {
                log::trace!("resuming thread {thread_id:?} that was ready");
                // The thread stays `Ready`; only its queued work item moves.
                thread.state = GuestThreadState::Ready { fiber, cancellable };
                store
                    .concurrent_state_mut()
                    .promote_thread_work_item(guest_thread);
            }
            other => {
                // Not resumable: undo the optimistic state change.
                thread.state = other;
                bail!(Trap::CannotResumeThread);
            }
        }
        Ok(())
    }
3565
3566 fn add_guest_thread_to_instance_table(
3567 self,
3568 thread_id: TableId<GuestThread>,
3569 store: &mut StoreOpaque,
3570 runtime_instance: RuntimeComponentInstanceIndex,
3571 ) -> Result<u32> {
3572 let guest_id = store
3573 .instance_state(self.runtime_instance(runtime_instance))
3574 .thread_handle_table()
3575 .guest_thread_insert(thread_id.rep())?;
3576 store
3577 .concurrent_state_mut()
3578 .get_mut(thread_id)?
3579 .instance_rep = Some(guest_id);
3580 Ok(guest_id)
3581 }
3582
    /// Shared implementation of the intrinsics that suspend or yield the
    /// current guest thread, optionally handing control to another thread.
    ///
    /// * `caller` is the component instance in which `to_thread` handles are
    ///   resolved.
    /// * `cancellable` allows a pending cancellation to be consumed and
    ///   reported as [`WaitResult::Cancelled`], both before and after
    ///   suspending.
    /// * `yielding` selects `SuspendReason::Yielding` over
    ///   `SuspendReason::ExplicitlySuspending`.
    /// * `to_thread` optionally names a thread to resume before the current
    ///   one is suspended.
    ///
    /// Returns [`WaitResult::Completed`] unless a cancellation was consumed.
    pub(crate) fn suspension_intrinsic(
        self,
        store: &mut StoreOpaque,
        caller: RuntimeComponentInstanceIndex,
        cancellable: bool,
        yielding: bool,
        to_thread: SuspensionTarget,
    ) -> Result<WaitResult> {
        let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
        if to_thread.is_none() {
            let state = store.concurrent_state_mut();
            if yielding {
                // A yield from a task that may not block only proceeds if
                // another thread of the same instance could be promoted;
                // otherwise it completes immediately without suspending.
                if !state.may_block(guest_thread.task)? {
                    if !state.promote_instance_local_thread_work_item(caller) {
                        return Ok(WaitResult::Completed);
                    }
                }
            } else {
                // A plain suspend with no target must be allowed to block.
                store.check_blocking()?;
            }
        }

        // Report a cancellation that is already pending without suspending.
        if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
            return Ok(WaitResult::Cancelled);
        }

        // Queue the target thread (if any) at high priority. The last argument
        // is `allow_ready`: only `SuspensionTarget::Some` accepts a thread
        // that is already in the `Ready` state.
        match to_thread {
            SuspensionTarget::SomeSuspended(thread) => {
                self.resume_thread(store, caller, thread, true, false)?
            }
            SuspensionTarget::Some(thread) => {
                self.resume_thread(store, caller, thread, true, true)?
            }
            SuspensionTarget::None => {}
        }

        let reason = if yielding {
            SuspendReason::Yielding {
                thread: guest_thread,
                cancellable,
                skip_may_block_check: to_thread.is_some(),
            }
        } else {
            SuspendReason::ExplicitlySuspending {
                thread: guest_thread,
                skip_may_block_check: to_thread.is_some(),
            }
        };

        store.suspend(reason)?;

        // A cancellation may have arrived while this thread was suspended.
        if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
            Ok(WaitResult::Cancelled)
        } else {
            Ok(WaitResult::Completed)
        }
    }
3656
    /// Shared implementation of waiting on and polling a waitable set.
    ///
    /// In [`WaitableCheck::Wait`] mode the current thread is suspended first
    /// when nothing is deliverable yet. In both modes the next event (if any)
    /// is then fetched and written to guest memory at `params.payload` as two
    /// little-endian 32-bit values: the waitable handle (or `0`) followed by
    /// the event's result value.
    ///
    /// Returns the event's ordinal as produced by `Event::parts`; a poll with
    /// nothing to deliver reports `Event::None`.
    ///
    /// # Errors
    ///
    /// Propagates suspension and lookup errors, reports an internal bug if a
    /// wait finishes without an event, and fails if `params.payload` is not a
    /// valid in-bounds location for a pointer pair.
    fn waitable_check(
        self,
        store: &mut StoreOpaque,
        cancellable: bool,
        check: WaitableCheck,
        params: WaitableCheckParams,
    ) -> Result<u32> {
        let guest_thread = store.concurrent_state_mut().current_guest_thread()?;

        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);

        let state = store.concurrent_state_mut();
        let task = state.get_mut(guest_thread.task)?;

        match &check {
            WaitableCheck::Wait => {
                let set = params.set;

                // Suspend only when there is nothing to deliver: the task has
                // no pending event (a pending `Cancelled` event does not count
                // for a non-cancellable wait) and the set has no ready
                // waitables.
                if (task.event.is_none()
                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
                    && state.get_mut(set)?.ready.is_empty()
                {
                    if cancellable {
                        // Remember which set this thread waits on so that a
                        // cancellation can wake it.
                        let old = state
                            .get_mut(guest_thread.thread)?
                            .wake_on_cancel
                            .replace(set);
                        if !old.is_none() {
                            bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
                        }
                    }

                    store.suspend(SuspendReason::Waiting {
                        set,
                        thread: guest_thread,
                        skip_may_block_check: false,
                    })?;
                }
            }
            // Polling never suspends.
            WaitableCheck::Poll => {}
        }

        log::trace!(
            "waitable check for {guest_thread:?}; set {:?}, part two",
            params.set
        );

        let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;

        let (ordinal, handle, result) = match &check {
            WaitableCheck::Wait => {
                // After a wait there must be something to deliver.
                let (event, waitable) = match event {
                    Some(p) => p,
                    None => bail_bug!("event expected to be present"),
                };
                // Events that are not tied to a waitable report handle 0.
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                let (ordinal, result) = event.parts();
                (ordinal, handle, result)
            }
            WaitableCheck::Poll => {
                if let Some((event, waitable)) = event {
                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                    let (ordinal, result) = event.parts();
                    (ordinal, handle, result)
                } else {
                    log::trace!(
                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
                        guest_thread.task,
                        params.set
                    );
                    let (ordinal, result) = Event::None.parts();
                    (ordinal, 0, result)
                }
            }
        };
        // Write `(handle, result)` to the guest-provided payload address after
        // validating that a pointer pair fits there.
        let memory = self.options_memory_mut(store, params.options);
        let ptr = crate::component::func::validate_inbounds_dynamic(
            &CanonicalAbiInfo::POINTER_PAIR,
            memory,
            &ValRaw::u32(params.payload),
        )?;
        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
        Ok(ordinal)
    }
3746
    /// Requests cancellation of the subtask with handle `task_id` owned by
    /// `caller_instance`.
    ///
    /// When `async_` is `false` the call may block until the subtask reaches a
    /// terminal state; when it is `true` and blocking would be required,
    /// `BLOCKED` is returned instead.
    ///
    /// Returns the subtask's resulting status as a `u32`
    /// (`Status::StartCancelled`, `Status::Returned` or
    /// `Status::ReturnCancelled`), or `BLOCKED`.
    ///
    /// # Errors
    ///
    /// * [`Trap::SubtaskCancelAfterTerminal`] if the subtask was already
    ///   cancelled or has no terminal event to report.
    /// * An internal-bug error for host task states or wait-set bookkeeping
    ///   that are not expected here.
    /// * Any error from `check_blocking`, suspension or table lookups.
    pub(crate) fn subtask_cancel(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        async_: bool,
        task_id: u32,
    ) -> Result<u32> {
        if !async_ {
            // A synchronous cancel may have to block.
            store.check_blocking()?;
        }

        let (rep, is_host) = store
            .instance_state(self.runtime_instance(caller_instance))
            .handle_table()
            .subtask_rep(task_id)?;
        let waitable = if is_host {
            Waitable::Host(TableId::<HostTask>::new(rep))
        } else {
            Waitable::Guest(TableId::<GuestTask>::new(rep))
        };
        let concurrent_state = store.concurrent_state_mut();

        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

        // Whether the caller still has to wait for the subtask to reach a
        // terminal state after the cancellation request was delivered.
        let needs_block;
        if let Waitable::Host(host_task) = waitable {
            // The state is unconditionally replaced with "done and cancelled";
            // the previous state decides what else has to happen.
            let state = &mut concurrent_state.get_mut(host_task)?.state;
            match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
                HostTaskState::CalleeRunning(handle) => {
                    // Still running: abort it and wait for the outcome.
                    handle.abort();
                    needs_block = true;
                }

                HostTaskState::CalleeDone { cancelled } => {
                    if cancelled {
                        // Cancelling an already-cancelled subtask is a trap.
                        bail!(Trap::SubtaskCancelAfterTerminal);
                    } else {
                        // Already finished; nothing to wait for.
                        needs_block = false;
                    }
                }

                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
                    bail_bug!("invalid states for host callee")
                }
            }
        } else {
            let guest_task = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.already_lowered_parameters() {
                // The callee has not received its parameters yet, so it can be
                // cancelled before it starts.
                store.cancel_guest_subtask_without_lowered_parameters(
                    self.runtime_instance(caller_instance),
                    guest_task,
                )?;
                return Ok(Status::StartCancelled as u32);
            } else if !task.returned_or_cancelled() {
                // Started but not finished: record the request and leave a
                // `Cancelled` event for the task to observe.
                task.cancel_sent = true;
                task.event = Some(Event::Cancelled);
                let runtime_instance = task.instance.index;
                // Give at most one thread of the task the chance to react
                // right away (note the `break` in both branches).
                for thread in task.threads.clone() {
                    let thread = QualifiedThreadId {
                        task: guest_task,
                        thread,
                    };
                    let thread_mut = concurrent_state.get_mut(thread.thread)?;
                    if let Some(set) = thread_mut.wake_on_cancel.take() {
                        // The thread is in a cancellable wait: remove it from
                        // the set's waiters and schedule it at high priority.
                        let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
                            Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
                            Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
                                runtime_instance,
                                GuestCall {
                                    thread,
                                    kind: GuestCallKind::DeliverEvent {
                                        instance,
                                        set: None,
                                    },
                                },
                            ),
                            None => bail_bug!("thread not present in wake_on_cancel set"),
                        };
                        concurrent_state.push_high_priority(item);

                        // Yield so that the woken thread gets to run.
                        let caller = concurrent_state.current_guest_thread()?;
                        store.suspend(SuspendReason::Yielding {
                            thread: caller,
                            cancellable: false,
                            skip_may_block_check: false,
                        })?;
                        break;
                    } else if let GuestThreadState::Ready {
                        cancellable: true, ..
                    } = &thread_mut.state
                    {
                        // The thread is ready and cancellable: promote its
                        // work item and yield to it.
                        let caller = concurrent_state.current_guest_thread()?;
                        concurrent_state.promote_thread_work_item(thread);
                        store.suspend(SuspendReason::Yielding {
                            thread: caller,
                            cancellable: false,
                            skip_may_block_check: false,
                        })?;
                        break;
                    }
                }

                // The callee may have returned or cancelled while this thread
                // was yielding; only block if it has not.
                needs_block = !store
                    .concurrent_state_mut()
                    .get_mut(guest_task)?
                    .returned_or_cancelled()
            } else {
                // Already returned or cancelled.
                needs_block = false;
            }
        };

        if needs_block {
            if async_ {
                return Ok(BLOCKED);
            }

            store.wait_for_event(waitable)?;
        }

        // Only a terminal subtask event is an acceptable outcome at this
        // point.
        let event = waitable.take_event(store.concurrent_state_mut())?;
        if let Some(Event::Subtask {
            status: status @ (Status::Returned | Status::ReturnCancelled),
        }) = event
        {
            Ok(status as u32)
        } else {
            bail!(Trap::SubtaskCancelAfterTerminal);
        }
    }
3914}
3915
/// Store-level entry points for the component-model async intrinsics.
///
/// Each method receives the [`Instance`] it operates on together with the
/// raw arguments of the intrinsic. The implementation for `StoreInner<T>`
/// forwards every method to the corresponding method on `Instance`, wrapping
/// the store in a `StoreContextMut` where needed.
pub trait VMComponentAsyncStore {
    /// Prepares a call from `caller_instance` to `callee_instance`.
    ///
    /// `result_count` is either the number of results of a synchronous
    /// caller or one of the sentinels `PREPARE_ASYNC_NO_RESULT` /
    /// `PREPARE_ASYNC_WITH_RESULT` for an asynchronous caller.
    ///
    /// # Safety
    ///
    /// `storage` must point to `storage_len` readable, initialized `ValRaw`
    /// values (they are copied out as the call parameters). Requirements on
    /// the other raw pointers are not visible here — confirm against
    /// `Instance::prepare_call`.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: StringEncoding,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of a synchronous caller.
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads and writes of `storage_len`
    /// `MaybeUninit<ValRaw>` elements for the duration of the call.
    /// Requirements on `callback` and `callee` are not visible here —
    /// confirm against `Instance::start_call`.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of an asynchronous caller
    /// and returns the status code produced by `Instance::start_call`.
    ///
    /// # Safety
    ///
    /// The function pointers are forwarded unchanged to
    /// `Instance::start_call`; the caller must uphold that method's
    /// requirements (not visible here).
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// Writes to the future with handle `future`, taking the value from guest
    /// memory at `address`. Returns the encoded result code.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Reads from the future with handle `future` into guest memory at
    /// `address`. Returns the encoded result code.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a future of type `ty`.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Writes up to `count` items from guest memory at `address` to the
    /// stream with handle `stream`. Returns the encoded result code.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Reads up to `count` items from the stream with handle `stream` into
    /// guest memory at `address`. Returns the encoded result code.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like [`Self::stream_write`], but additionally passes the payload's
    /// size and alignment as a flat ABI description.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like [`Self::stream_read`], but additionally passes the payload's
    /// size and alignment as a flat ABI description.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a stream of type `ty`.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Handles a request for the debug message of the error context with
    /// handle `err_ctx_handle`; `debug_msg_address` is the guest address
    /// associated with the message (presumably where it is written —
    /// confirm against `Instance::error_context_debug_message`).
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// Creates a new guest thread whose entry point is element
    /// `start_func_idx` of table `start_func_table_idx`, to be called with
    /// `context`. Returns the new thread's handle.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
4083
4084impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
4086 unsafe fn prepare_call(
4087 &mut self,
4088 instance: Instance,
4089 memory: *mut VMMemoryDefinition,
4090 start: NonNull<VMFuncRef>,
4091 return_: NonNull<VMFuncRef>,
4092 caller_instance: RuntimeComponentInstanceIndex,
4093 callee_instance: RuntimeComponentInstanceIndex,
4094 task_return_type: TypeTupleIndex,
4095 callee_async: bool,
4096 string_encoding: StringEncoding,
4097 result_count_or_max_if_async: u32,
4098 storage: *mut ValRaw,
4099 storage_len: usize,
4100 ) -> Result<()> {
4101 let params = unsafe { core::slice::from_raw_parts(storage, storage_len) }.to_vec();
4105
4106 unsafe {
4107 instance.prepare_call(
4108 StoreContextMut(self),
4109 start,
4110 return_,
4111 caller_instance,
4112 callee_instance,
4113 task_return_type,
4114 callee_async,
4115 memory,
4116 string_encoding,
4117 match result_count_or_max_if_async {
4118 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
4119 params,
4120 has_result: false,
4121 },
4122 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
4123 params,
4124 has_result: true,
4125 },
4126 result_count => CallerInfo::Sync {
4127 params,
4128 result_count,
4129 },
4130 },
4131 )
4132 }
4133 }
4134
4135 unsafe fn sync_start(
4136 &mut self,
4137 instance: Instance,
4138 callback: *mut VMFuncRef,
4139 callee: NonNull<VMFuncRef>,
4140 param_count: u32,
4141 storage: *mut MaybeUninit<ValRaw>,
4142 storage_len: usize,
4143 ) -> Result<()> {
4144 unsafe {
4145 instance
4146 .start_call(
4147 StoreContextMut(self),
4148 callback,
4149 ptr::null_mut(),
4150 callee,
4151 param_count,
4152 1,
4153 START_FLAG_ASYNC_CALLEE,
4154 Some(core::slice::from_raw_parts_mut(storage, storage_len)),
4158 )
4159 .map(drop)
4160 }
4161 }
4162
4163 unsafe fn async_start(
4164 &mut self,
4165 instance: Instance,
4166 callback: *mut VMFuncRef,
4167 post_return: *mut VMFuncRef,
4168 callee: NonNull<VMFuncRef>,
4169 param_count: u32,
4170 result_count: u32,
4171 flags: u32,
4172 ) -> Result<u32> {
4173 unsafe {
4174 instance.start_call(
4175 StoreContextMut(self),
4176 callback,
4177 post_return,
4178 callee,
4179 param_count,
4180 result_count,
4181 flags,
4182 None,
4183 )
4184 }
4185 }
4186
4187 fn future_write(
4188 &mut self,
4189 instance: Instance,
4190 caller: RuntimeComponentInstanceIndex,
4191 ty: TypeFutureTableIndex,
4192 options: OptionsIndex,
4193 future: u32,
4194 address: u32,
4195 ) -> Result<u32> {
4196 instance
4197 .guest_write(
4198 StoreContextMut(self),
4199 caller,
4200 TransmitIndex::Future(ty),
4201 options,
4202 None,
4203 future,
4204 address,
4205 1,
4206 )
4207 .map(|result| result.encode())
4208 }
4209
4210 fn future_read(
4211 &mut self,
4212 instance: Instance,
4213 caller: RuntimeComponentInstanceIndex,
4214 ty: TypeFutureTableIndex,
4215 options: OptionsIndex,
4216 future: u32,
4217 address: u32,
4218 ) -> Result<u32> {
4219 instance
4220 .guest_read(
4221 StoreContextMut(self),
4222 caller,
4223 TransmitIndex::Future(ty),
4224 options,
4225 None,
4226 future,
4227 address,
4228 1,
4229 )
4230 .map(|result| result.encode())
4231 }
4232
4233 fn stream_write(
4234 &mut self,
4235 instance: Instance,
4236 caller: RuntimeComponentInstanceIndex,
4237 ty: TypeStreamTableIndex,
4238 options: OptionsIndex,
4239 stream: u32,
4240 address: u32,
4241 count: u32,
4242 ) -> Result<u32> {
4243 instance
4244 .guest_write(
4245 StoreContextMut(self),
4246 caller,
4247 TransmitIndex::Stream(ty),
4248 options,
4249 None,
4250 stream,
4251 address,
4252 count,
4253 )
4254 .map(|result| result.encode())
4255 }
4256
4257 fn stream_read(
4258 &mut self,
4259 instance: Instance,
4260 caller: RuntimeComponentInstanceIndex,
4261 ty: TypeStreamTableIndex,
4262 options: OptionsIndex,
4263 stream: u32,
4264 address: u32,
4265 count: u32,
4266 ) -> Result<u32> {
4267 instance
4268 .guest_read(
4269 StoreContextMut(self),
4270 caller,
4271 TransmitIndex::Stream(ty),
4272 options,
4273 None,
4274 stream,
4275 address,
4276 count,
4277 )
4278 .map(|result| result.encode())
4279 }
4280
4281 fn future_drop_writable(
4282 &mut self,
4283 instance: Instance,
4284 ty: TypeFutureTableIndex,
4285 writer: u32,
4286 ) -> Result<()> {
4287 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4288 }
4289
4290 fn flat_stream_write(
4291 &mut self,
4292 instance: Instance,
4293 caller: RuntimeComponentInstanceIndex,
4294 ty: TypeStreamTableIndex,
4295 options: OptionsIndex,
4296 payload_size: u32,
4297 payload_align: u32,
4298 stream: u32,
4299 address: u32,
4300 count: u32,
4301 ) -> Result<u32> {
4302 instance
4303 .guest_write(
4304 StoreContextMut(self),
4305 caller,
4306 TransmitIndex::Stream(ty),
4307 options,
4308 Some(FlatAbi {
4309 size: payload_size,
4310 align: payload_align,
4311 }),
4312 stream,
4313 address,
4314 count,
4315 )
4316 .map(|result| result.encode())
4317 }
4318
4319 fn flat_stream_read(
4320 &mut self,
4321 instance: Instance,
4322 caller: RuntimeComponentInstanceIndex,
4323 ty: TypeStreamTableIndex,
4324 options: OptionsIndex,
4325 payload_size: u32,
4326 payload_align: u32,
4327 stream: u32,
4328 address: u32,
4329 count: u32,
4330 ) -> Result<u32> {
4331 instance
4332 .guest_read(
4333 StoreContextMut(self),
4334 caller,
4335 TransmitIndex::Stream(ty),
4336 options,
4337 Some(FlatAbi {
4338 size: payload_size,
4339 align: payload_align,
4340 }),
4341 stream,
4342 address,
4343 count,
4344 )
4345 .map(|result| result.encode())
4346 }
4347
4348 fn stream_drop_writable(
4349 &mut self,
4350 instance: Instance,
4351 ty: TypeStreamTableIndex,
4352 writer: u32,
4353 ) -> Result<()> {
4354 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4355 }
4356
4357 fn error_context_debug_message(
4358 &mut self,
4359 instance: Instance,
4360 ty: TypeComponentLocalErrorContextTableIndex,
4361 options: OptionsIndex,
4362 err_ctx_handle: u32,
4363 debug_msg_address: u32,
4364 ) -> Result<()> {
4365 instance.error_context_debug_message(
4366 StoreContextMut(self),
4367 ty,
4368 options,
4369 err_ctx_handle,
4370 debug_msg_address,
4371 )
4372 }
4373
4374 fn thread_new_indirect(
4375 &mut self,
4376 instance: Instance,
4377 caller: RuntimeComponentInstanceIndex,
4378 func_ty_idx: TypeFuncIndex,
4379 start_func_table_idx: RuntimeTableIndex,
4380 start_func_idx: u32,
4381 context: i32,
4382 ) -> Result<u32> {
4383 instance.thread_new_indirect(
4384 StoreContextMut(self),
4385 caller,
4386 func_ty_idx,
4387 start_func_table_idx,
4388 start_func_idx,
4389 context,
4390 )
4391 }
4392}
4393
/// A boxed, pinned, `Send + 'static` future that resolves to `Result<()>`.
///
/// NOTE(review): by its name this is the future driving a host task; its uses
/// are outside this section of the file.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4395
/// Bookkeeping for a subtask whose callee is a host function.
pub(crate) struct HostTask {
    /// Waitable state (presumably shared by all waitable kinds — see
    /// `WaitableCommon`).
    common: WaitableCommon,

    /// The guest thread recorded as this task's caller.
    caller: QualifiedThreadId,

    /// Call context associated with this task; starts out as
    /// `CallContext::default()`.
    call_context: CallContext,

    /// Current lifecycle state; inspected and updated by `subtask.drop` and
    /// `subtask.cancel`.
    state: HostTaskState,
}
4411
/// Lifecycle state of a [`HostTask`].
enum HostTaskState {
    /// NOTE(review): presumably the state before the host future has been
    /// spawned; `subtask.drop` and `subtask.cancel` treat observing it as an
    /// internal bug.
    CalleeStarted,

    /// The callee is still running; the handle can be used to abort it, as
    /// `subtask.cancel` does.
    CalleeRunning(JoinHandle),

    /// NOTE(review): presumably the callee has produced a lifted result that
    /// has not been consumed yet; `subtask.drop` and `subtask.cancel` treat
    /// observing it as an internal bug.
    CalleeFinished(LiftedResult),

    /// The callee is done. `cancelled` is set to `true` by `subtask.cancel`;
    /// cancelling again afterwards traps.
    CalleeDone { cancelled: bool },
}
4434
4435impl HostTask {
4436 fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4437 Self {
4438 common: WaitableCommon::default(),
4439 call_context: CallContext::default(),
4440 caller,
4441 state,
4442 }
4443 }
4444}
4445
impl TableDebug for HostTask {
    /// Name reported for this table entry type.
    fn type_name() -> &'static str {
        "HostTask"
    }
}
4451
/// A boxed, thread-safe closure taking the store, an [`Event`] and a `u32`,
/// and returning a `u32` on success.
///
/// NOTE(review): presumably this wraps a guest callback function, with the
/// `u32` argument being a handle and the returned `u32` a callback code —
/// confirm where the closure is constructed.
type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4453
/// Who called a guest task.
enum Caller {
    /// The task was called by the host.
    Host {
        /// One-shot sender used to deliver the task's result to the host; it
        /// is taken (left as `None`) once the result has been sent.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// NOTE(review): presumably whether the host still holds a future
        /// for this call — not used in this section of the file.
        host_future_present: bool,
        /// NOTE(review): presumably the thread that was current when the
        /// host made the call — not used in this section of the file.
        caller: CurrentThread,
    },
    /// The task was called by another guest task.
    Guest {
        /// The calling guest thread.
        thread: QualifiedThreadId,
    },
}
4474
/// How to lift a task's result, plus the canonical options it was declared
/// with.
///
/// `task.return` checks its own type, string encoding and memory against
/// `ty`, `string_encoding` and `memory` before invoking `lift`.
struct LiftResult {
    /// Function that lifts the result out of raw storage.
    lift: RawLift,
    /// Expected result type.
    ty: TypeTupleIndex,
    /// Expected linear memory, if any.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// Expected string encoding.
    string_encoding: StringEncoding,
}
4483
/// Identifies a guest thread together with the task it belongs to.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) struct QualifiedThreadId {
    /// The task that owns the thread.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
4493
4494impl QualifiedThreadId {
4495 fn qualify(
4496 state: &mut ConcurrentState,
4497 thread: TableId<GuestThread>,
4498 ) -> Result<QualifiedThreadId> {
4499 Ok(QualifiedThreadId {
4500 task: state.get_mut(thread)?.parent_task,
4501 thread,
4502 })
4503 }
4504}
4505
4506impl fmt::Debug for QualifiedThreadId {
4507 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4508 f.debug_tuple("QualifiedThreadId")
4509 .field(&self.task.rep())
4510 .field(&self.thread.rep())
4511 .finish()
4512 }
4513}
4514
/// Execution state of a [`GuestThread`].
enum GuestThreadState {
    /// Initial state of a thread created with `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state of a thread created with `GuestThread::new_explicit`;
    /// holds the function that is run when the thread is first resumed.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// The thread is running (or has been queued to run).
    Running,
    /// The thread is suspended on the given fiber until it is resumed.
    Suspended(StoreFiber<'static>),
    /// The thread is ready to run on the given fiber. `cancellable` records
    /// whether a cancellation request may promote it to run early.
    Ready {
        fiber: StoreFiber<'static>,
        cancellable: bool,
    },
    /// NOTE(review): presumably the thread has finished — not set in this
    /// section of the file.
    Completed,
}
/// State of a single thread of a guest task.
pub struct GuestThread {
    /// Per-thread context slots, zero-initialized on creation.
    context: [u32; NUM_COMPONENT_CONTEXT_SLOTS],
    /// The task this thread belongs to.
    parent_task: TableId<GuestTask>,
    /// The waitable set this thread is waiting on in a cancellable wait, if
    /// any; `subtask.cancel` takes it to wake the thread.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// Current execution state.
    state: GuestThreadState,
    /// Handle of this thread in its instance's thread handle table, once it
    /// has been registered there.
    instance_rep: Option<u32>,
    /// A waitable set created together with the thread and flagged
    /// `is_sync_call_set`.
    sync_call_set: TableId<WaitableSet>,
}
4545
4546impl GuestThread {
4547 fn from_instance(
4550 state: Pin<&mut ComponentInstance>,
4551 caller_instance: RuntimeComponentInstanceIndex,
4552 guest_thread: u32,
4553 ) -> Result<TableId<Self>> {
4554 let rep = state.instance_states().0[caller_instance]
4555 .thread_handle_table()
4556 .guest_thread_rep(guest_thread)?;
4557 Ok(TableId::new(rep))
4558 }
4559
4560 fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4561 let sync_call_set = state.push(WaitableSet {
4562 is_sync_call_set: true,
4563 ..WaitableSet::default()
4564 })?;
4565 Ok(Self {
4566 context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4567 parent_task,
4568 wake_on_cancel: None,
4569 state: GuestThreadState::NotStartedImplicit,
4570 instance_rep: None,
4571 sync_call_set,
4572 })
4573 }
4574
4575 fn new_explicit(
4576 state: &mut ConcurrentState,
4577 parent_task: TableId<GuestTask>,
4578 start_func: Box<
4579 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4580 >,
4581 ) -> Result<Self> {
4582 let sync_call_set = state.push(WaitableSet {
4583 is_sync_call_set: true,
4584 ..WaitableSet::default()
4585 })?;
4586 Ok(Self {
4587 context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4588 parent_task,
4589 wake_on_cancel: None,
4590 state: GuestThreadState::NotStartedExplicit(start_func),
4591 instance_rep: None,
4592 sync_call_set,
4593 })
4594 }
4595}
4596
impl TableDebug for GuestThread {
    /// Returns the fixed label `"GuestThread"` for this table entry type.
    fn type_name() -> &'static str {
        "GuestThread"
    }
}
4602
/// Tracks a task's synchronous result through three stages.
enum SyncResult {
    /// No result has been stored yet.
    NotProduced,
    /// A result is stored and waiting to be taken; the payload may itself be
    /// absent (`None`).
    Produced(Option<ValRaw>),
    /// `SyncResult::take` has already been called.
    Taken,
}
4608
4609impl SyncResult {
4610 fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4611 Ok(match mem::replace(self, SyncResult::Taken) {
4612 SyncResult::NotProduced => None,
4613 SyncResult::Produced(val) => Some(val),
4614 SyncResult::Taken => {
4615 bail_bug!("attempted to take a synchronous result that was already taken")
4616 }
4617 })
4618 }
4619}
4620
/// Tracks whether a host-side future is associated with a [`GuestTask`].
#[derive(Debug)]
enum HostFutureState {
    /// Used for guest callers and for host callers created with
    /// `host_future_present == false`.
    NotApplicable,
    /// A host future is present; `GuestTask::ready_to_delete` returns `false`
    /// while in this state.
    Live,
    /// Set by `TaskId::host_future_dropped` once parameters have already been
    /// lowered.
    Dropped,
}
4627
/// Table entry describing one guest task (a call into a guest function).
pub(crate) struct GuestTask {
    /// Waitable bookkeeping (pending event, joined set, handle) shared with
    /// the other `Waitable` kinds.
    common: WaitableCommon,
    /// Closure that lowers the call parameters. `Some` at creation;
    /// `already_lowered_parameters` treats `None` as "already lowered".
    lower_params: Option<RawLower>,
    /// State needed to lift the result. `Some` at creation;
    /// `returned_or_cancelled` treats `None` as "returned or cancelled".
    lift_result: Option<LiftResult>,
    /// Lifted result, if any; starts out as `None`.
    result: Option<LiftedResult>,
    /// Optional callback function supplied at creation.
    callback: Option<CallbackFn>,
    /// Who initiated this task (host or guest).
    caller: Caller,
    /// Call context handed out by `ConcurrentState::call_context`.
    call_context: CallContext,
    /// Synchronous result tracking; a `Produced` value blocks deletion.
    sync_result: SyncResult,
    /// Starts out `false`.
    /// NOTE(review): presumably records that a cancel request was sent -- confirm.
    cancel_sent: bool,
    /// Starts out `false`.
    /// NOTE(review): presumably records that a "starting" status was sent -- confirm.
    starting_sent: bool,
    /// Runtime instance this task runs in.
    instance: RuntimeInstance,
    /// Task-level pending event, distinct from `common.event`;
    /// `ConcurrentState::take_pending_cancellation` consumes an
    /// `Event::Cancelled` stored here.
    event: Option<Event>,
    /// Starts out `false`.
    /// NOTE(review): the code that sets this is not visible here -- confirm meaning.
    exited: bool,
    /// Threads belonging to this task; `ready_to_delete` requires this set to
    /// be empty.
    threads: HashSet<TableId<GuestThread>>,
    /// Whether a host future is attached to this task and still live.
    host_future_state: HostFutureState,
    /// Whether the callee is an async function; such tasks are always allowed
    /// to block (see `ConcurrentState::may_block`).
    async_function: bool,

    /// Expected to be `true` by the time the task is deleted
    /// (`Waitable::delete_from` debug-asserts it). Starts out `false`.
    decremented_interesting_task_count: bool,
}
4681
4682impl GuestTask {
4683 fn already_lowered_parameters(&self) -> bool {
4684 self.lower_params.is_none()
4686 }
4687
4688 fn returned_or_cancelled(&self) -> bool {
4689 self.lift_result.is_none()
4691 }
4692
4693 fn ready_to_delete(&self) -> bool {
4694 let threads_completed = self.threads.is_empty();
4695 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4696 let pending_completion_event = matches!(
4697 self.common.event,
4698 Some(Event::Subtask {
4699 status: Status::Returned | Status::ReturnCancelled
4700 })
4701 );
4702 let ready = threads_completed
4703 && !has_sync_result
4704 && !pending_completion_event
4705 && !matches!(self.host_future_state, HostFutureState::Live);
4706 log::trace!(
4707 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4708 threads_completed,
4709 has_sync_result,
4710 pending_completion_event,
4711 self.host_future_state
4712 );
4713 ready
4714 }
4715
4716 fn new(
4717 state: &mut ConcurrentState,
4718 lower_params: RawLower,
4719 lift_result: LiftResult,
4720 caller: Caller,
4721 callback: Option<CallbackFn>,
4722 instance: RuntimeInstance,
4723 async_function: bool,
4724 ) -> Result<QualifiedThreadId> {
4725 let host_future_state = match &caller {
4726 Caller::Guest { .. } => HostFutureState::NotApplicable,
4727 Caller::Host {
4728 host_future_present,
4729 ..
4730 } => {
4731 if *host_future_present {
4732 HostFutureState::Live
4733 } else {
4734 HostFutureState::NotApplicable
4735 }
4736 }
4737 };
4738 let task = state.push(Self {
4739 common: WaitableCommon::default(),
4740 lower_params: Some(lower_params),
4741 lift_result: Some(lift_result),
4742 result: None,
4743 callback,
4744 caller,
4745 call_context: CallContext::default(),
4746 sync_result: SyncResult::NotProduced,
4747 cancel_sent: false,
4748 starting_sent: false,
4749 instance,
4750 event: None,
4751 exited: false,
4752 threads: HashSet::new(),
4753 host_future_state,
4754 async_function,
4755 decremented_interesting_task_count: false,
4756 })?;
4757 let new_thread = GuestThread::new_implicit(state, task)?;
4758 let thread = state.push(new_thread)?;
4759 state.get_mut(task)?.threads.insert(thread);
4760 state.interesting_tasks += 1;
4761 Ok(QualifiedThreadId { task, thread })
4762 }
4763}
4764
impl TableDebug for GuestTask {
    /// Returns the fixed label `"GuestTask"` for this table entry type.
    fn type_name() -> &'static str {
        "GuestTask"
    }
}
4770
/// State shared by every kind of `Waitable`; reached through
/// `Waitable::common`.
#[derive(Default)]
struct WaitableCommon {
    /// Pending event, if any; written by `Waitable::set_event` and consumed
    /// by `Waitable::take_event`.
    event: Option<Event>,
    /// The waitable set this waitable currently belongs to, if any; updated
    /// by `Waitable::join`.
    set: Option<TableId<WaitableSet>>,
    /// NOTE(review): presumably the guest-visible handle for this waitable --
    /// its users are not visible in this part of the file.
    handle: Option<u32>,
}
4781
/// A typed reference to one table entry that can deliver events.
///
/// The derived ordering is what orders entries in `WaitableSet::ready`
/// (a `BTreeSet<Waitable>`).
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task.
    Host(TableId<HostTask>),
    /// A guest task.
    Guest(TableId<GuestTask>),
    /// A transmit handle; `from_instance` produces this for both stream and
    /// future handles.
    Transmit(TableId<TransmitHandle>),
}
4792
4793impl Waitable {
4794 fn from_instance(
4797 state: Pin<&mut ComponentInstance>,
4798 caller_instance: RuntimeComponentInstanceIndex,
4799 waitable: u32,
4800 ) -> Result<Self> {
4801 use crate::runtime::vm::component::Waitable;
4802
4803 let (waitable, kind) = state.instance_states().0[caller_instance]
4804 .handle_table()
4805 .waitable_rep(waitable)?;
4806
4807 Ok(match kind {
4808 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4809 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4810 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4811 })
4812 }
4813
4814 fn rep(&self) -> u32 {
4816 match self {
4817 Self::Host(id) => id.rep(),
4818 Self::Guest(id) => id.rep(),
4819 Self::Transmit(id) => id.rep(),
4820 }
4821 }
4822
4823 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4827 log::trace!("waitable {self:?} join set {set:?}");
4828
4829 let old = mem::replace(&mut self.common(state)?.set, set);
4830
4831 if let Some(old) = old {
4832 match *self {
4833 Waitable::Host(id) => state.remove_child(id, old),
4834 Waitable::Guest(id) => state.remove_child(id, old),
4835 Waitable::Transmit(id) => state.remove_child(id, old),
4836 }?;
4837
4838 state.get_mut(old)?.ready.remove(self);
4839 }
4840
4841 if let Some(set) = set {
4842 match *self {
4843 Waitable::Host(id) => state.add_child(id, set),
4844 Waitable::Guest(id) => state.add_child(id, set),
4845 Waitable::Transmit(id) => state.add_child(id, set),
4846 }?;
4847
4848 if self.common(state)?.event.is_some() {
4849 self.mark_ready(state)?;
4850 }
4851 }
4852
4853 Ok(())
4854 }
4855
4856 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4858 Ok(match self {
4859 Self::Host(id) => &mut state.get_mut(*id)?.common,
4860 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4861 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4862 })
4863 }
4864
4865 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4869 log::trace!("set event for {self:?}: {event:?}");
4870 self.common(state)?.event = event;
4871 self.mark_ready(state)
4872 }
4873
4874 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4876 let common = self.common(state)?;
4877 let event = common.event.take();
4878 if let Some(set) = self.common(state)?.set {
4879 state.get_mut(set)?.ready.remove(self);
4880 }
4881
4882 Ok(event)
4883 }
4884
4885 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4889 if let Some(set) = self.common(state)?.set {
4890 state.get_mut(set)?.ready.insert(*self);
4891 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4892 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4893 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4894
4895 let item = match mode {
4896 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4897 WaitMode::Callback(instance) => WorkItem::GuestCall(
4898 state.get_mut(thread.task)?.instance.index,
4899 GuestCall {
4900 thread,
4901 kind: GuestCallKind::DeliverEvent {
4902 instance,
4903 set: Some(set),
4904 },
4905 },
4906 ),
4907 };
4908 state.push_high_priority(item);
4909 }
4910 }
4911 Ok(())
4912 }
4913
4914 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4916 match self {
4917 Self::Host(task) => {
4918 log::trace!("delete host task {task:?}");
4919 state.delete(*task)?;
4920 }
4921 Self::Guest(task) => {
4922 log::trace!("delete guest task {task:?}");
4923 let task = state.delete(*task)?;
4924
4925 debug_assert!(task.decremented_interesting_task_count);
4932 }
4933 Self::Transmit(task) => {
4934 state.delete(*task)?;
4935 }
4936 }
4937
4938 Ok(())
4939 }
4940}
4941
4942impl fmt::Debug for Waitable {
4943 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4944 match self {
4945 Self::Host(id) => write!(f, "{id:?}"),
4946 Self::Guest(id) => write!(f, "{id:?}"),
4947 Self::Transmit(id) => write!(f, "{id:?}"),
4948 }
4949 }
4950}
4951
/// Table entry grouping waitables together with the threads waiting on them.
#[derive(Default)]
struct WaitableSet {
    /// Waitables in this set that `Waitable::mark_ready` has recorded as
    /// ready.
    ready: BTreeSet<Waitable>,
    /// Threads waiting on this set and how each should be woken;
    /// `Waitable::mark_ready` pops the first entry.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
    /// `true` for the per-thread sets created by the `GuestThread`
    /// constructors; `false` by default.
    is_sync_call_set: bool,
}
4963
impl TableDebug for WaitableSet {
    /// Returns the fixed label `"WaitableSet"` for this table entry type.
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
4969
/// Boxed one-shot closure that, given the store, writes lowered parameters
/// into a caller-provided slice of (possibly uninitialized) `ValRaw` slots.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4973
/// Boxed one-shot closure that, given the store and raw result values,
/// produces a type-erased lifted result.
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;
4978
/// Type-erased lifted result; `QueuedCall::poll` downcasts it back to the
/// concrete result type.
type LiftedResult = Box<dyn Any + Send + Sync>;
4983
/// Unit marker type.
/// NOTE(review): presumably a placeholder `LiftedResult` payload -- its users
/// are not visible in this part of the file; confirm.
struct DummyResult;
4987
/// Per-instance concurrency state; every field starts at its default value.
#[derive(Default)]
pub struct ConcurrentInstanceState {
    /// NOTE(review): presumably a backpressure counter -- the code that
    /// updates it is not visible here; confirm.
    backpressure: u16,
    /// NOTE(review): presumably blocks entry into the instance while set --
    /// confirm against its users.
    do_not_enter: bool,
    /// Guest calls keyed by thread; emptiness is reported by
    /// `pending_is_empty`.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
4999
impl ConcurrentInstanceState {
    /// Returns `true` when no guest calls are recorded in `pending`.
    pub fn pending_is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}
5005
/// Identifies which thread, if any, is considered current.
#[derive(Debug, Copy, Clone)]
pub(crate) enum CurrentThread {
    /// A guest thread, identified by task and thread ids.
    Guest(QualifiedThreadId),
    /// A host task.
    Host(TableId<HostTask>),
    /// No current thread; this is the default for a new `ConcurrentState`.
    None,
}
5012
5013impl CurrentThread {
5014 fn guest(&self) -> Option<&QualifiedThreadId> {
5015 match self {
5016 Self::Guest(id) => Some(id),
5017 _ => None,
5018 }
5019 }
5020
5021 fn host(&self) -> Option<TableId<HostTask>> {
5022 match self {
5023 Self::Host(id) => Some(*id),
5024 _ => None,
5025 }
5026 }
5027
5028 fn is_none(&self) -> bool {
5029 matches!(self, Self::None)
5030 }
5031}
5032
impl From<QualifiedThreadId> for CurrentThread {
    /// Wraps a guest thread id as `CurrentThread::Guest`.
    fn from(id: QualifiedThreadId) -> Self {
        Self::Guest(id)
    }
}
5038
impl From<TableId<HostTask>> for CurrentThread {
    /// Wraps a host task id as `CurrentThread::Host`.
    fn from(id: TableId<HostTask>) -> Self {
        Self::Host(id)
    }
}
5044
/// Store-wide state for concurrent execution: the table of tasks, threads and
/// waitable sets, the work queues, and related bookkeeping.
pub struct ConcurrentState {
    /// The thread currently considered to be running, if any.
    current_thread: CurrentThread,

    /// Host task futures. `None` after `take_fibers_and_futures` has moved
    /// them out; `futures_mut` reports a bug error in that case.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// Table holding the entries addressed by `TableId` (tasks, threads,
    /// waitable sets, ...).
    table: AlwaysMut<ResourceTable>,
    /// Work items pushed by `push_high_priority`.
    high_priority: Vec<WorkItem>,
    /// Work items pushed (at the front) by `push_low_priority`.
    low_priority: VecDeque<WorkItem>,
    /// NOTE(review): users are not visible in this part of the file.
    suspend_reason: Option<SuspendReason>,
    /// Optional worker fiber; collected by `take_fibers_and_futures` and
    /// traced by `trace_fiber_roots`.
    worker: Option<StoreFiber<'static>>,
    /// NOTE(review): presumably the item handed to the worker fiber -- confirm.
    worker_item: Option<WorkerItem>,

    /// Reference counts keyed by global error-context table index.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,

    /// Counter incremented by `GuestTask::new`.
    /// NOTE(review): the matching decrement is not visible here.
    interesting_tasks: usize,

    /// NOTE(review): presumably woken when `interesting_tasks` reaches zero --
    /// confirm against its users.
    interesting_tasks_empty_waker: Option<Waker>,
}
5106
5107impl Default for ConcurrentState {
5108 fn default() -> Self {
5109 Self {
5110 current_thread: CurrentThread::None,
5111 table: AlwaysMut::new(ResourceTable::new()),
5112 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
5113 high_priority: Vec::new(),
5114 low_priority: VecDeque::new(),
5115 suspend_reason: None,
5116 worker: None,
5117 worker_item: None,
5118 global_error_context_ref_counts: BTreeMap::new(),
5119 interesting_tasks: 0,
5120 interesting_tasks_empty_waker: None,
5121 }
5122 }
5123}
5124
impl ConcurrentState {
    /// Moves every fiber and host-task future owned by this state into the
    /// caller-provided vectors.
    ///
    /// Fibers are collected from waitable-set waiters, from guest threads,
    /// from the worker slot, and from queued `ResumeFiber` work items. Queued
    /// `PushFuture` items are first folded into `self.futures`, which is then
    /// taken as a whole. Both work queues are emptied; other kinds of work
    /// items are dropped.
    ///
    /// # Panics
    ///
    /// Panics if a `PushFuture` item is queued while `self.futures` has
    /// already been taken.
    pub(crate) fn take_fibers_and_futures(
        &mut self,
        fibers: &mut Vec<StoreFiber<'static>>,
        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
    ) {
        for entry in self.table.get_mut().iter_mut() {
            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
                // Empties the waiting map; only fiber waiters hold a fiber.
                for mode in mem::take(&mut set.waiting).into_values() {
                    if let WaitMode::Fiber(fiber) = mode {
                        fibers.push(fiber);
                    }
                }
            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
                // Every thread is left in `Completed`, whatever state it was in.
                if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready { fiber, .. } =
                    mem::replace(&mut thread.state, GuestThreadState::Completed)
                {
                    fibers.push(fiber);
                }
            }
        }

        if let Some(fiber) = self.worker.take() {
            fibers.push(fiber);
        }

        let mut handle_item = |item| match item {
            WorkItem::ResumeFiber(fiber) => {
                fibers.push(fiber);
            }
            WorkItem::PushFuture(future) => {
                self.futures
                    .get_mut()
                    .as_mut()
                    .unwrap()
                    .push(future.into_inner());
            }
            // These items carry neither a fiber nor a future to collect here.
            WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
            }
        };

        for item in mem::take(&mut self.high_priority) {
            handle_item(item);
        }
        for item in mem::take(&mut self.low_priority) {
            handle_item(item);
        }

        // Taken last so that futures from queued `PushFuture` items are included.
        if let Some(them) = self.futures.get_mut().take() {
            futures.push(them);
        }
    }

    /// Calls `trace_gc_roots` on every fiber this state holds: waitable-set
    /// fiber waiters, suspended/ready guest threads, the worker fiber, and
    /// queued `ResumeFiber` work items.
    ///
    /// The exhaustive destructuring of `self` forces this function to be
    /// revisited whenever a field is added.
    #[cfg(feature = "gc")]
    pub(crate) fn trace_fiber_roots(
        &mut self,
        modules: &ModuleRegistry,
        unwind: &dyn Unwind,
        gc_roots_list: &mut GcRootsList,
    ) {
        let ConcurrentState {
            table,
            worker,
            high_priority,
            low_priority,

            // Host futures are not traced here.
            futures: _,

            worker_item: _,
            current_thread: _,
            suspend_reason: _,
            global_error_context_ref_counts: _,
            interesting_tasks: _,
            interesting_tasks_empty_waker: _,
        } = self;

        for entry in table.get_mut().iter_mut() {
            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
                for mode in set.waiting.values_mut() {
                    if let WaitMode::Fiber(fiber) = mode {
                        fiber.trace_gc_roots(modules, unwind, gc_roots_list);
                    }
                }
            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
                if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready { fiber, .. } =
                    &mut thread.state
                {
                    fiber.trace_gc_roots(modules, unwind, gc_roots_list);
                }
            }
        }

        if let Some(fiber) = worker {
            fiber.trace_gc_roots(modules, unwind, gc_roots_list);
        }

        let mut handle_item = |item: &mut WorkItem| match item {
            WorkItem::ResumeFiber(fiber) => {
                fiber.trace_gc_roots(modules, unwind, gc_roots_list);
            }
            // Nothing is traced for queued futures.
            WorkItem::PushFuture(_future) => {
            }
            WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
            }
        };

        for item in high_priority {
            handle_item(item);
        }
        for item in low_priority {
            handle_item(item);
        }
    }

    /// Inserts `value` into the table and returns its typed id.
    fn push<V: Send + Sync + 'static>(
        &mut self,
        value: V,
    ) -> Result<TableId<V>, ResourceTableError> {
        self.table.get_mut().push(value).map(TableId::from)
    }

    /// Mutably borrows the table entry for `id`.
    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
        self.table.get_mut().get_mut(&Resource::from(id))
    }

    /// Registers `child` as a child of `parent` in the table.
    pub fn add_child<T: 'static, U: 'static>(
        &mut self,
        child: TableId<T>,
        parent: TableId<U>,
    ) -> Result<(), ResourceTableError> {
        self.table
            .get_mut()
            .add_child(Resource::from(child), Resource::from(parent))
    }

    /// Removes the child link between `child` and `parent` in the table.
    pub fn remove_child<T: 'static, U: 'static>(
        &mut self,
        child: TableId<T>,
        parent: TableId<U>,
    ) -> Result<(), ResourceTableError> {
        self.table
            .get_mut()
            .remove_child(Resource::from(child), Resource::from(parent))
    }

    /// Removes the entry for `id` from the table and returns it by value.
    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
        self.table.get_mut().delete(Resource::from(id))
    }

    /// Queues `future` as a high-priority `PushFuture` work item rather than
    /// inserting it into `self.futures` directly.
    fn push_future(&mut self, future: HostTaskFuture) {
        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
    }

    /// Appends `item` to the high-priority queue (and traces it).
    fn push_high_priority(&mut self, item: WorkItem) {
        log::trace!("push high priority: {item:?}");
        self.high_priority.push(item);
    }

    /// Inserts `item` at the front of the low-priority deque (and traces it).
    fn push_low_priority(&mut self, item: WorkItem) {
        log::trace!("push low priority: {item:?}");
        self.low_priority.push_front(item);
    }

    /// Dispatches to `push_high_priority` or `push_low_priority` according to
    /// `high_priority`.
    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
        if high_priority {
            self.push_high_priority(item);
        } else {
            self.push_low_priority(item);
        }
    }

    /// Promotes a `ResumeThread` or `GuestCall` work item whose instance
    /// equals `current_instance`; see `promote_work_items_matching` for the
    /// return value.
    fn promote_instance_local_thread_work_item(
        &mut self,
        current_instance: RuntimeComponentInstanceIndex,
    ) -> bool {
        self.promote_work_items_matching(|item: &WorkItem| match item {
            WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
                *instance == current_instance
            }
            _ => false,
        })
    }

    /// Promotes a `ResumeThread` or `GuestCall` work item that targets
    /// `thread`; see `promote_work_items_matching` for the return value.
    fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
        self.promote_work_items_matching(|item: &WorkItem| match item {
            WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
                *t == thread
            }
            _ => false,
        })
    }

    /// Ensures a work item matching `predicate` is in the high-priority
    /// queue.
    ///
    /// Returns `true` if a match is already high priority (nothing is moved)
    /// or if the first match found in the low-priority deque was moved to the
    /// high-priority queue; returns `false` if neither queue has a match.
    fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
    where
        F: FnMut(&WorkItem) -> bool,
    {
        if self.high_priority.iter().any(&mut predicate) {
            true
        }
        else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
            // `idx` came from `position` on the same deque, so `remove`
            // returns `Some`.
            let item = self.low_priority.remove(idx).unwrap();
            self.push_high_priority(item);
            true
        } else {
            false
        }
    }

    /// Consumes a pending `Event::Cancelled` on the current guest thread's
    /// task.
    ///
    /// Returns `Ok(true)` if one was present (and clears it), `Ok(false)`
    /// otherwise; any other pending event is left untouched. Fails if the
    /// current thread is not a guest thread.
    fn take_pending_cancellation(&mut self) -> Result<bool> {
        let thread = self.current_guest_thread()?;
        let task = self.get_mut(thread.task)?;
        if let Some(Event::Cancelled) = task.event {
            task.event.take();
            return Ok(true);
        }
        Ok(false)
    }

    /// Returns `Ok(())` if `task` may block, otherwise the
    /// `Trap::CannotBlockSyncTask` error.
    fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
        if self.may_block(task)? {
            Ok(())
        } else {
            Err(Trap::CannotBlockSyncTask.into())
        }
    }

    /// A task may block if it runs an async function or has already returned
    /// or been cancelled.
    fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
        let task = self.get_mut(task)?;
        Ok(task.async_function || task.returned_or_cancelled())
    }

    /// Looks up the call context for an encoded scope id.
    ///
    /// `task` uses the encoding produced by `current_call_context_scope_id`:
    /// the lowest bit selects a host task (1) or guest task (0) and the
    /// remaining bits are the table rep.
    pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
        let (task, is_host) = (task >> 1, task & 1 == 1);
        if is_host {
            let task: TableId<HostTask> = TableId::new(task);
            Ok(&mut self.get_mut(task)?.call_context)
        } else {
            let task: TableId<GuestTask> = TableId::new(task);
            Ok(&mut self.get_mut(task)?.call_context)
        }
    }

    /// Encodes the current thread's task as a scope id: the task's table rep
    /// shifted left by one, with the lowest bit set for host tasks.
    ///
    /// Fails with a bug error if there is no current thread.
    ///
    /// # Panics
    ///
    /// Panics if the rep does not survive the one-bit shift (top bit set).
    pub fn current_call_context_scope_id(&self) -> Result<u32> {
        let (bits, is_host) = match self.current_thread {
            CurrentThread::Guest(id) => (id.task.rep(), false),
            CurrentThread::Host(id) => (id.rep(), true),
            CurrentThread::None => bail_bug!("current thread is not set"),
        };
        // The rep must fit in 31 bits so the tag bit can be appended losslessly.
        assert_eq!((bits << 1) >> 1, bits);
        Ok((bits << 1) | u32::from(is_host))
    }

    /// Returns the current guest thread id, or a bug error if the current
    /// thread is not a guest thread.
    fn current_guest_thread(&self) -> Result<QualifiedThreadId> {
        match self.current_thread.guest() {
            Some(id) => Ok(*id),
            None => bail_bug!("current thread is not a guest thread"),
        }
    }

    /// Returns the current host task id, or a bug error if the current thread
    /// is not a host thread.
    fn current_host_thread(&self) -> Result<TableId<HostTask>> {
        match self.current_thread.host() {
            Some(id) => Ok(id),
            None => bail_bug!("current thread is not a host thread"),
        }
    }

    /// Borrows the host-task futures set, or reports a bug error if it has
    /// been taken.
    fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
        match self.futures.get_mut().as_mut() {
            Some(f) => Ok(f),
            None => bail_bug!("futures field of concurrent state is currently taken"),
        }
    }

    /// Mutable access to the underlying resource table.
    pub(crate) fn table(&mut self) -> &mut ResourceTable {
        self.table.get_mut()
    }

    /// Returns the caller of `cur`: for a guest thread, the caller recorded on
    /// its task; for a host task, that task's recorded caller.
    ///
    /// Returns `None` when `cur` is `CurrentThread::None` or when the table
    /// lookup fails.
    fn parent(&mut self, cur: CurrentThread) -> Option<CurrentThread> {
        match cur {
            CurrentThread::Guest(thread) => {
                let task = self.get_mut(thread.task).ok()?;
                Some(match task.caller {
                    Caller::Host { caller, .. } => caller,
                    Caller::Guest { thread } => thread.into(),
                })
            }
            CurrentThread::Host(id) => Some(self.get_mut(id).ok()?.caller.into()),
            CurrentThread::None => None,
        }
    }
}
5459
5460fn for_any_lower<
5463 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5464>(
5465 fun: F,
5466) -> F {
5467 fun
5468}
5469
5470fn for_any_lift<
5472 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5473>(
5474 fun: F,
5475) -> F {
5476 fun
5477}
5478
5479fn check_ambient_store(id: StoreId) {
5480 let message = "\
5481 `Future`s which depend on asynchronous component tasks, streams, or \
5482 futures to complete may only be polled from the event loop of the \
5483 store to which they belong. Please use \
5484 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5485 ";
5486 tls::try_get(|store| {
5487 let matched = match store {
5488 tls::TryGet::Some(store) => store.id() == id,
5489 tls::TryGet::Taken | tls::TryGet::None => false,
5490 };
5491
5492 if !matched {
5493 panic!("{message}")
5494 }
5495 });
5496}
5497
5498fn check_recursive_run() {
5501 tls::try_get(|store| {
5502 if !matches!(store, tls::TryGet::None) {
5503 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5504 }
5505 });
5506}
5507
/// Splits a packed callback return code into `(low 4 bits, remaining upper
/// 28 bits)`.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    const LOW_BITS: u32 = 4;
    const LOW_MASK: u32 = 0xF;

    let low = code & LOW_MASK;
    let high = code >> LOW_BITS;
    (low, high)
}
5511
/// Parameters bundled for a waitable-set check.
/// NOTE(review): the code consuming this struct is not visible in this part
/// of the file; field meanings below are inferred from their types.
struct WaitableCheckParams {
    /// The waitable set to check.
    set: TableId<WaitableSet>,
    /// Index of the canonical options to use.
    options: OptionsIndex,
    /// NOTE(review): presumably a guest address for the event payload -- confirm.
    payload: u32,
}
5520
/// Selects the kind of waitable-set check.
/// NOTE(review): presumably `Wait` blocks until an event is available while
/// `Poll` does not -- the consuming code is not visible here; confirm.
enum WaitableCheck {
    Wait,
    Poll,
}
5527
/// Public newtype over a guest task's table id; returned by
/// `QueuedCall::task`.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct GuestTaskId(TableId<GuestTask>);
5538
/// A guest call that has been set up by `prepare_call` but not yet queued.
pub(crate) struct PreparedCall<R> {
    /// The function to call.
    handle: Func,
    /// Task and thread created for the call.
    thread: QualifiedThreadId,
    /// Parameter count forwarded to `queue_call0`.
    param_count: usize,
    /// Receiving end of the channel whose sender is stored in the task's
    /// `Caller::Host`.
    rx: oneshot::Receiver<LiftedResult>,
    /// Runtime instance the call targets.
    runtime_instance: RuntimeInstance,
    /// Ties the value to the result type `R` without storing one.
    _phantom: PhantomData<R>,
}
5554
5555impl<R> PreparedCall<R> {
5556 pub(crate) fn task_id(&self) -> TaskId {
5558 TaskId {
5559 task: self.thread.task,
5560 runtime_instance: self.runtime_instance,
5561 }
5562 }
5563}
5564
/// Identifies a guest task together with the runtime instance it targets.
pub(crate) struct TaskId {
    /// Table id of the guest task.
    task: TableId<GuestTask>,
    /// Runtime instance the task was prepared for.
    runtime_instance: RuntimeInstance,
}
5570
5571impl TaskId {
5572 pub(crate) fn host_future_dropped(&self, store: &mut StoreOpaque) -> Result<()> {
5578 let task = store.concurrent_state_mut().get_mut(self.task)?;
5579 let delete = if !task.already_lowered_parameters() {
5580 store.cancel_guest_subtask_without_lowered_parameters(
5581 self.runtime_instance,
5582 self.task,
5583 )?;
5584 true
5585 } else {
5586 task.host_future_state = HostFutureState::Dropped;
5587 task.ready_to_delete()
5588 };
5589 if delete {
5590 Waitable::Guest(self.task).delete_from(store.concurrent_state_mut())?
5591 }
5592 Ok(())
5593 }
5594}
5595
/// Creates the guest task and implicit thread for a host-initiated call to
/// `handle`, without queueing it.
///
/// * `param_count` is stored in the returned `PreparedCall` unchanged.
/// * `host_future_present` is recorded in the task's `Caller::Host`.
/// * `lower_params` / `lift_result` are wrapped into the task's type-erased
///   lowering and lifting closures.
///
/// # Errors
///
/// Fails if creating the task fails, or with `Trap::CannotEnterComponent` if
/// the target instance may not be entered. Note that the entry check runs
/// after the task has been created.
pub(crate) fn prepare_call<T, R>(
    mut store: StoreContextMut<T>,
    handle: Func,
    param_count: usize,
    host_future_present: bool,
    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
    + Send
    + Sync
    + 'static,
) -> Result<PreparedCall<R>> {
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);

    // Read everything needed from the component metadata before the
    // concurrent state is borrowed mutably below.
    let instance = handle.instance().id().get(store.0);
    let options = &instance.component().env_component().options[options];
    let ty = &instance.component().types()[ty];
    let async_function = ty.async_;
    let task_return_type = ty.results;
    let component_instance = raw_options.instance;
    let callback = options.callback.map(|i| instance.runtime_callback(i));
    let memory = options
        .memory()
        .map(|i| instance.runtime_memory(i))
        .map(SendSyncPtr::new);
    let string_encoding = options.string_encoding;
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();

    // The sender is stored in the task; the receiver is returned to the caller.
    let (tx, rx) = oneshot::channel();

    let instance = handle.instance().runtime_instance(component_instance);
    // Whatever thread is current at this point is recorded as the caller.
    let caller = state.current_thread;
    let thread = GuestTask::new(
        state,
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            host_future_present,
            caller,
        },
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(move |store: &mut dyn VMStore, event, handle| {
                let store = token.as_context_mut(store);
                // SAFETY: NOTE(review): the justification for this call is not
                // visible in this part of the file; presumably `callback`
                // belongs to `instance` and `store` -- confirm against
                // `call_callback`'s contract.
                unsafe { instance.call_callback(store, callback, event, handle) }
            }) as CallbackFn
        }),
        instance,
        async_function,
    )?;

    if !store.0.may_enter(instance)? {
        bail!(Trap::CannotEnterComponent);
    }

    Ok(PreparedCall {
        handle,
        thread,
        param_count,
        runtime_instance: instance,
        rx,
        _phantom: PhantomData,
    })
}
5681
/// A guest call that has been queued; a future resolving to the lifted
/// result of type `R`.
pub(crate) struct QueuedCall<R> {
    /// Store the call belongs to; checked against the ambient store on every
    /// poll.
    store: StoreId,
    /// The guest task created for the call.
    task: TableId<GuestTask>,
    /// Channel on which the type-erased result arrives.
    rx: oneshot::Receiver<LiftedResult>,
    /// Ties the value to the result type `R` without storing one.
    _marker: PhantomData<fn() -> R>,
}
5688
5689impl<R> QueuedCall<R> {
5690 pub(crate) fn new<T: 'static>(
5697 mut store: StoreContextMut<T>,
5698 prepared: PreparedCall<R>,
5699 ) -> Result<QueuedCall<R>> {
5700 let PreparedCall {
5701 handle,
5702 thread,
5703 param_count,
5704 rx,
5705 ..
5706 } = prepared;
5707
5708 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5709
5710 Ok(QueuedCall {
5711 store: store.0.id(),
5712 task: thread.task,
5713 rx,
5714 _marker: PhantomData,
5715 })
5716 }
5717
5718 fn task(&self) -> GuestTaskId {
5719 GuestTaskId(self.task)
5720 }
5721}
5722
5723impl<R> Future for QueuedCall<R>
5724where
5725 R: 'static,
5726{
5727 type Output = Result<R>;
5728
5729 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
5730 check_ambient_store(self.store);
5731 Pin::new(&mut self.rx).poll(cx).map(|result| match result {
5732 Ok(r) => match r.downcast() {
5733 Ok(r) => Ok(*r),
5734 Err(_) => bail_bug!("wrong type of value produced"),
5735 },
5736 Err(oneshot::Canceled) => bail_bug!("channel erroneously dropped"),
5737 })
5738 }
5739}
5740
5741fn queue_call0<T: 'static>(
5744 store: StoreContextMut<T>,
5745 handle: Func,
5746 guest_thread: QualifiedThreadId,
5747 param_count: usize,
5748) -> Result<()> {
5749 let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5750 let is_concurrent = raw_options.async_;
5751 let callback = raw_options.callback;
5752 let instance = handle.instance();
5753 let callee = handle.lifted_core_func(store.0);
5754 let post_return = handle.post_return_core_func(store.0);
5755 let callback = callback.map(|i| {
5756 let instance = instance.id().get(store.0);
5757 SendSyncPtr::new(instance.runtime_callback(i))
5758 });
5759
5760 log::trace!("queueing call {guest_thread:?}");
5761
5762 unsafe {
5766 instance.queue_call(
5767 store,
5768 guest_thread,
5769 SendSyncPtr::new(callee),
5770 param_count,
5771 1,
5772 is_concurrent,
5773 callback,
5774 post_return.map(SendSyncPtr::new),
5775 )
5776 }
5777}