1use self::error_contexts::GlobalErrorContextRefCount;
54use crate::bail_bug;
55use crate::component::func::{Func, call_post_return};
56use crate::component::{
57 HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
58};
59use crate::fiber::{self, StoreFiber, StoreFiberYield};
60use crate::hash_set::HashSet;
61#[cfg(feature = "gc")]
62use crate::module::ModuleRegistry;
63use crate::prelude::*;
64use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
65#[cfg(feature = "gc")]
66use crate::vm::GcRootsList;
67use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
68use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMLazyThread, VMMemoryDefinition, VMStore};
69use crate::{
70 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
71};
72use alloc::borrow::ToOwned;
73use alloc::collections::{BTreeMap, BTreeSet, VecDeque};
74use core::any::Any;
75use core::cell::UnsafeCell;
76use core::fmt;
77use core::future;
78use core::future::Future;
79use core::marker::PhantomData;
80use core::mem::{self, ManuallyDrop, MaybeUninit};
81use core::ops::DerefMut;
82use core::pin::{Pin, pin};
83use core::ptr::{self, NonNull};
84use core::task::{Context, Poll, Waker};
85use futures::channel::oneshot;
86use futures::stream::{FuturesUnordered, StreamExt};
87use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
88use table::{TableDebug, TableId};
89use wasmtime_environ::component::{
90 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
91 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
92 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
93 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
94 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
95};
96use wasmtime_environ::packed_option::ReservedValue;
97use wasmtime_environ::{NUM_COMPONENT_CONTEXT_SLOTS, Trap};
98#[cfg(feature = "gc")]
99use wasmtime_unwinder::Unwind;
100
101pub use abort::JoinHandle;
102pub use func::{FuncCallConcurrent, TypedFuncCallConcurrent};
103pub use future_stream_any::{FutureAny, StreamAny};
104pub use futures_and_streams::{
105 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
106 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
107 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
108};
109pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
110
111mod abort;
112mod error_contexts;
113mod func;
114mod future_stream_any;
115mod futures_and_streams;
116pub(crate) mod table;
117pub(crate) mod tls;
118
119const BLOCKED: u32 = 0xffff_ffff;
122
123#[derive(Clone, Copy, Eq, PartialEq, Debug)]
125pub enum Status {
126 Starting = 0,
127 Started = 1,
128 Returned = 2,
129 StartCancelled = 3,
130 ReturnCancelled = 4,
131}
132
133impl Status {
134 pub fn pack(self, waitable: Option<u32>) -> u32 {
140 assert!(matches!(self, Status::Returned) == waitable.is_none());
141 let waitable = waitable.unwrap_or(0);
142 assert!(waitable < (1 << 28));
143 (waitable << 4) | (self as u32)
144 }
145}
146
147#[derive(Clone, Copy, Debug)]
150enum Event {
151 None,
152 Subtask {
153 status: Status,
154 },
155 StreamRead {
156 code: ReturnCode,
157 pending: Option<(TypeStreamTableIndex, u32)>,
158 },
159 StreamWrite {
160 code: ReturnCode,
161 pending: Option<(TypeStreamTableIndex, u32)>,
162 },
163 FutureRead {
164 code: ReturnCode,
165 pending: Option<(TypeFutureTableIndex, u32)>,
166 },
167 FutureWrite {
168 code: ReturnCode,
169 pending: Option<(TypeFutureTableIndex, u32)>,
170 },
171 Cancelled,
172}
173
174impl Event {
175 fn parts(self) -> (u32, u32) {
180 const EVENT_NONE: u32 = 0;
181 const EVENT_SUBTASK: u32 = 1;
182 const EVENT_STREAM_READ: u32 = 2;
183 const EVENT_STREAM_WRITE: u32 = 3;
184 const EVENT_FUTURE_READ: u32 = 4;
185 const EVENT_FUTURE_WRITE: u32 = 5;
186 const EVENT_CANCELLED: u32 = 6;
187 match self {
188 Event::None => (EVENT_NONE, 0),
189 Event::Cancelled => (EVENT_CANCELLED, 0),
190 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
191 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
192 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
193 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
194 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
195 }
196 }
197}
198
199mod callback_code {
201 pub const EXIT: u32 = 0;
202 pub const YIELD: u32 = 1;
203 pub const WAIT: u32 = 2;
204}
205
206const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
210
211pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
217 store: StoreContextMut<'a, T>,
218 get_data: fn(&mut T) -> D::Data<'_>,
219}
220
221impl<'a, T, D> Access<'a, T, D>
222where
223 D: HasData + ?Sized,
224 T: 'static,
225{
226 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
228 Self { store, get_data }
229 }
230
231 pub fn data_mut(&mut self) -> &mut T {
233 self.store.data_mut()
234 }
235
236 pub fn get(&mut self) -> D::Data<'_> {
238 (self.get_data)(self.data_mut())
239 }
240
241 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> Result<JoinHandle>
245 where
246 T: 'static,
247 {
248 let accessor = Accessor {
249 get_data: self.get_data,
250 token: StoreToken::new(self.store.as_context_mut()),
251 };
252 self.store
253 .as_context_mut()
254 .spawn_with_accessor(accessor, task)
255 }
256
257 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
260 self.get_data
261 }
262}
263
264impl<'a, T, D> AsContext for Access<'a, T, D>
265where
266 D: HasData + ?Sized,
267 T: 'static,
268{
269 type Data = T;
270
271 fn as_context(&self) -> StoreContext<'_, T> {
272 self.store.as_context()
273 }
274}
275
276impl<'a, T, D> AsContextMut for Access<'a, T, D>
277where
278 D: HasData + ?Sized,
279 T: 'static,
280{
281 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
282 self.store.as_context_mut()
283 }
284}
285
286pub struct Accessor<T: 'static, D = HasSelf<T>>
346where
347 D: HasData + ?Sized,
348{
349 token: StoreToken<T>,
350 get_data: fn(&mut T) -> D::Data<'_>,
351}
352
353pub trait AsAccessor {
370 type Data: 'static;
372
373 type AccessorData: HasData + ?Sized;
376
377 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
379}
380
381impl<T: AsAccessor + ?Sized> AsAccessor for &T {
382 type Data = T::Data;
383 type AccessorData = T::AccessorData;
384
385 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
386 T::as_accessor(self)
387 }
388}
389
390impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
391 type Data = T;
392 type AccessorData = D;
393
394 fn as_accessor(&self) -> &Accessor<T, D> {
395 self
396 }
397}
398
399const _: () = {
422 const fn assert<T: Send + Sync>() {}
423 assert::<Accessor<UnsafeCell<u32>>>();
424};
425
426impl<T> Accessor<T> {
427 pub(crate) fn new(token: StoreToken<T>) -> Self {
436 Self {
437 token,
438 get_data: |x| x,
439 }
440 }
441}
442
443impl<T, D> Accessor<T, D>
444where
445 D: HasData + ?Sized,
446{
447 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
465 tls::get(|vmstore| {
466 fun(Access {
467 store: self.token.as_context_mut(vmstore),
468 get_data: self.get_data,
469 })
470 })
471 }
472
473 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
476 self.get_data
477 }
478
479 pub fn with_getter<D2: HasData>(
496 &self,
497 get_data: fn(&mut T) -> D2::Data<'_>,
498 ) -> Accessor<T, D2> {
499 Accessor {
500 token: self.token,
501 get_data,
502 }
503 }
504
505 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> Result<JoinHandle>
521 where
522 T: 'static,
523 {
524 let accessor = self.clone_for_spawn();
525 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
526 }
527
528 fn clone_for_spawn(&self) -> Self {
529 Self {
530 token: self.token,
531 get_data: self.get_data,
532 }
533 }
534
535 pub fn poll_no_interesting_tasks(&self, cx: &mut Context<'_>) -> Poll<()> {
571 self.with(|mut access| {
572 let store = access.as_context_mut().0;
573 let state = store.concurrent_state_mut_without_forcing_current_thread();
574 if state.interesting_tasks == 0 {
575 Poll::Ready(())
576 } else {
577 state.interesting_tasks_empty_waker = Some(cx.waker().clone());
578 Poll::Pending
579 }
580 })
581 }
582}
583
584pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
596where
597 D: HasData + ?Sized,
598{
599 fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
601}
602
603enum CallerInfo {
606 Async {
608 params: Vec<ValRaw>,
609 has_result: bool,
610 },
611 Sync {
613 params: Vec<ValRaw>,
614 result_count: u32,
615 },
616}
617
618enum WaitMode {
620 Fiber(StoreFiber<'static>),
622 Callback(Instance),
625}
626
627#[derive(Debug)]
629enum SuspendReason {
630 Waiting {
633 set: TableId<WaitableSet>,
634 thread: QualifiedThreadId,
635 skip_may_block_check: bool,
636 },
637 NeedWork,
640 Yielding {
643 thread: QualifiedThreadId,
644 cancellable: bool,
645 skip_may_block_check: bool,
646 },
647 ExplicitlySuspending {
649 thread: QualifiedThreadId,
650 skip_may_block_check: bool,
651 },
652}
653
654enum GuestCallKind {
656 DeliverEvent {
659 instance: Instance,
661 set: Option<TableId<WaitableSet>>,
666 },
667 StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
673 StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
674}
675
676impl fmt::Debug for GuestCallKind {
677 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
678 match self {
679 Self::DeliverEvent { instance, set } => f
680 .debug_struct("DeliverEvent")
681 .field("instance", instance)
682 .field("set", set)
683 .finish(),
684 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
685 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
686 }
687 }
688}
689
690#[derive(Copy, Clone, Debug)]
692pub enum SuspensionTarget {
693 SomeSuspended(u32),
694 Some(u32),
695 None,
696}
697
698impl SuspensionTarget {
699 fn is_none(&self) -> bool {
700 matches!(self, SuspensionTarget::None)
701 }
702 fn is_some(&self) -> bool {
703 !self.is_none()
704 }
705}
706
707#[derive(Debug)]
709struct GuestCall {
710 thread: QualifiedThreadId,
711 kind: GuestCallKind,
712}
713
714impl GuestCall {
715 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
725 let instance = store
726 .concurrent_state_mut()?
727 .get_mut(self.thread.task)?
728 .instance;
729 let state = store.instance_state(instance).concurrent_state();
730
731 let ready = match &self.kind {
732 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
733 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
734 GuestCallKind::StartExplicit(_) => true,
735 };
736 log::trace!(
737 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
738 state.do_not_enter,
739 state.backpressure
740 );
741 Ok(ready)
742 }
743}
744
745enum WorkerItem {
747 GuestCall(GuestCall),
748 Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
749}
750
751enum WorkItem {
754 PushFuture(AlwaysMut<HostTaskFuture>),
756 ResumeFiber(StoreFiber<'static>),
758 ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
760 GuestCall(RuntimeComponentInstanceIndex, GuestCall),
762 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
764}
765
766impl fmt::Debug for WorkItem {
767 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
768 match self {
769 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
770 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
771 Self::ResumeThread(instance, thread) => f
772 .debug_tuple("ResumeThread")
773 .field(instance)
774 .field(thread)
775 .finish(),
776 Self::GuestCall(instance, call) => f
777 .debug_tuple("GuestCall")
778 .field(instance)
779 .field(call)
780 .finish(),
781 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
782 }
783 }
784}
785
786#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
788pub(crate) enum WaitResult {
789 Cancelled,
790 Completed,
791}
792
793pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
801 store: &mut dyn VMStore,
802 future: impl Future<Output = Result<R>> + Send + 'static,
803) -> Result<R> {
804 let task = store.current_host_thread()?;
805
806 let mut future = Box::pin(async move {
810 let result = future.await?;
811 tls::get(move |store| {
812 let state = store.concurrent_state_mut()?;
813 let host_state = &mut state.get_mut(task)?.state;
814 assert!(matches!(host_state, HostTaskState::CalleeStarted));
815 *host_state = HostTaskState::CalleeFinished(Box::new(result));
816
817 Waitable::Host(task).set_event(
818 state,
819 Some(Event::Subtask {
820 status: Status::Returned,
821 }),
822 )?;
823
824 Ok(())
825 })
826 }) as HostTaskFuture;
827
828 let poll = tls::set(store, || {
832 future
833 .as_mut()
834 .poll(&mut Context::from_waker(&Waker::noop()))
835 });
836
837 match poll {
838 Poll::Ready(result) => result?,
840
841 Poll::Pending => {
846 let state = store.concurrent_state_mut()?;
847 state.push_future(future);
848
849 let caller = state.get_mut(task)?.caller;
850 let set = state.get_mut(caller.thread)?.sync_call_set;
851 Waitable::Host(task).join(state, Some(set))?;
852
853 store.suspend(SuspendReason::Waiting {
854 set,
855 thread: caller,
856 skip_may_block_check: false,
857 })?;
858
859 Waitable::Host(task).join(store.concurrent_state_mut()?, None)?;
863 }
864 }
865
866 let host_state = &mut store.concurrent_state_mut()?.get_mut(task)?.state;
868 match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
869 HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
870 Ok(result) => *result,
871 Err(_) => bail_bug!("host task finished with wrong type of result"),
872 }),
873 _ => bail_bug!("unexpected host task state after completion"),
874 }
875}
876
877fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
879 let mut next = Some(call);
880 while let Some(call) = next.take() {
881 match call.kind {
882 GuestCallKind::DeliverEvent { instance, set } => {
883 let (event, waitable) =
884 match instance.get_event(store, call.thread.task, set, true)? {
885 Some(pair) => pair,
886 None => bail_bug!("delivering non-present event"),
887 };
888 let state = store.concurrent_state_mut()?;
889 let task = state.get_mut(call.thread.task)?;
890 let runtime_instance = task.instance;
891 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
892
893 log::trace!(
894 "use callback to deliver event {event:?} to {:?} for {waitable:?}",
895 call.thread,
896 );
897
898 let old_thread = store.set_thread(call.thread)?;
899 log::trace!(
900 "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
901 call.thread
902 );
903
904 store.enter_instance(runtime_instance);
905
906 let Some(callback) = store
907 .concurrent_state_mut()?
908 .get_mut(call.thread.task)?
909 .callback
910 .take()
911 else {
912 bail_bug!("guest task callback field not present")
913 };
914
915 let code = callback(store, event, handle)?;
916
917 store
918 .concurrent_state_mut()?
919 .get_mut(call.thread.task)?
920 .callback = Some(callback);
921
922 store.exit_instance(runtime_instance)?;
923
924 store.set_thread(old_thread)?;
925
926 next = instance.handle_callback_code(
927 store,
928 call.thread,
929 runtime_instance.index,
930 code,
931 )?;
932
933 log::trace!(
934 "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
935 );
936 }
937 GuestCallKind::StartImplicit(fun) => {
938 next = fun(store)?;
939 }
940 GuestCallKind::StartExplicit(fun) => {
941 fun(store)?;
942 }
943 }
944 }
945
946 Ok(())
947}
948
949impl<T> Store<T> {
950 pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
952 where
953 T: Send + 'static,
954 {
955 ensure!(
956 self.as_context().0.concurrency_support(),
957 "cannot use `run_concurrent` when Config::concurrency_support disabled",
958 );
959 self.as_context_mut().run_concurrent(fun).await
960 }
961
962 #[doc(hidden)]
963 pub fn assert_concurrent_state_empty(&mut self) {
964 self.as_context_mut().assert_concurrent_state_empty();
965 }
966
967 #[doc(hidden)]
968 pub fn concurrent_state_table_size(&mut self) -> usize {
969 self.as_context_mut().concurrent_state_table_size()
970 }
971
972 pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> Result<JoinHandle>
974 where
975 T: 'static,
976 {
977 self.as_context_mut().spawn(task)
978 }
979}
980
981impl<T> StoreContextMut<'_, T> {
982 #[doc(hidden)]
993 pub fn assert_concurrent_state_empty(self) {
994 let store = self.0;
995 store
996 .store_data_mut()
997 .components
998 .assert_instance_states_empty();
999 let state = store.concurrent_state_mut().unwrap();
1000 assert!(
1001 state.table.get_mut().is_empty(),
1002 "non-empty table: {:?}",
1003 state.table.get_mut()
1004 );
1005 assert!(state.high_priority.is_empty());
1006 assert!(state.low_priority.is_empty());
1007 assert!(state.unforced_current_thread.is_none());
1008 assert!(state.futures_mut().unwrap().is_empty());
1009 assert!(state.global_error_context_ref_counts.is_empty());
1010 }
1011
1012 #[doc(hidden)]
1017 pub fn concurrent_state_table_size(&mut self) -> usize {
1018 self.0
1019 .concurrent_state_mut()
1020 .unwrap()
1021 .table
1022 .get_mut()
1023 .iter_mut()
1024 .count()
1025 }
1026
1027 pub fn spawn(mut self, task: impl AccessorTask<T>) -> Result<JoinHandle>
1037 where
1038 T: 'static,
1039 {
1040 let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
1041 self.spawn_with_accessor(accessor, task)
1042 }
1043
1044 fn spawn_with_accessor<D>(
1047 self,
1048 accessor: Accessor<T, D>,
1049 task: impl AccessorTask<T, D>,
1050 ) -> Result<JoinHandle>
1051 where
1052 T: 'static,
1053 D: HasData + ?Sized,
1054 {
1055 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
1059 self.0
1060 .concurrent_state_mut()?
1061 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1062 Ok(handle)
1063 }
1064
1065 pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1149 where
1150 T: Send + 'static,
1151 {
1152 ensure!(
1153 self.0.concurrency_support(),
1154 "cannot use `run_concurrent` when Config::concurrency_support disabled",
1155 );
1156 self.do_run_concurrent(fun, false).await
1157 }
1158
1159 pub(super) async fn run_concurrent_trap_on_idle<R>(
1160 self,
1161 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1162 ) -> Result<R>
1163 where
1164 T: Send + 'static,
1165 {
1166 self.do_run_concurrent(fun, true).await
1167 }
1168
1169 async fn do_run_concurrent<R>(
1170 mut self,
1171 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1172 trap_on_idle: bool,
1173 ) -> Result<R>
1174 where
1175 T: Send + 'static,
1176 {
1177 debug_assert!(self.0.concurrency_support());
1178 check_recursive_run();
1179 let token = StoreToken::new(self.as_context_mut());
1180
1181 struct Dropper<'a, T: 'static, V> {
1182 store: StoreContextMut<'a, T>,
1183 value: ManuallyDrop<V>,
1184 }
1185
1186 impl<'a, T, V> Drop for Dropper<'a, T, V> {
1187 fn drop(&mut self) {
1188 tls::set(self.store.0, || {
1189 unsafe { ManuallyDrop::drop(&mut self.value) }
1194 });
1195 }
1196 }
1197
1198 let accessor = &Accessor::new(token);
1199 let dropper = &mut Dropper {
1200 store: self,
1201 value: ManuallyDrop::new(fun(accessor)),
1202 };
1203 let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1205
1206 dropper
1207 .store
1208 .as_context_mut()
1209 .poll_until(future, trap_on_idle)
1210 .await
1211 }
1212
1213 async fn poll_until<R>(
1219 mut self,
1220 mut future: Pin<&mut impl Future<Output = R>>,
1221 trap_on_idle: bool,
1222 ) -> Result<R>
1223 where
1224 T: Send + 'static,
1225 {
1226 struct Reset<'a, T: 'static> {
1227 store: StoreContextMut<'a, T>,
1228 futures: Option<FuturesUnordered<HostTaskFuture>>,
1229 }
1230
1231 impl<'a, T> Drop for Reset<'a, T> {
1232 fn drop(&mut self) {
1233 if let Some(futures) = self.futures.take() {
1234 *self
1235 .store
1236 .0
1237 .concurrent_state_mut_already_forced_current_thread()
1238 .futures
1239 .get_mut() = Some(futures);
1240 }
1241 }
1242 }
1243
1244 loop {
1245 let futures = self.0.concurrent_state_mut()?.futures.get_mut().take();
1249 let mut reset = Reset {
1250 store: self.as_context_mut(),
1251 futures,
1252 };
1253 let mut next = match reset.futures.as_mut() {
1254 Some(f) => pin!(f.next()),
1255 None => bail_bug!("concurrent state missing futures field"),
1256 };
1257
1258 enum PollResult<R> {
1259 Complete(R),
1260 ProcessWork {
1261 ready: Vec<WorkItem>,
1262 low_priority: bool,
1263 },
1264 }
1265
1266 let result = future::poll_fn(|cx| {
1267 if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1270 return Poll::Ready(Ok(PollResult::Complete(value)));
1271 }
1272
1273 let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1277 Poll::Ready(Some(output)) => {
1278 match output {
1279 Err(e) => return Poll::Ready(Err(e)),
1280 Ok(()) => {}
1281 }
1282 Poll::Ready(true)
1283 }
1284 Poll::Ready(None) => Poll::Ready(false),
1285 Poll::Pending => Poll::Pending,
1286 };
1287
1288 let state = reset.store.0.concurrent_state_mut()?;
1292 let mut ready = mem::take(&mut state.high_priority);
1293 let mut low_priority = false;
1294 if ready.is_empty() {
1295 if let Some(item) = state.low_priority.pop_back() {
1296 ready.push(item);
1297 low_priority = true;
1298 }
1299 }
1300 if !ready.is_empty() {
1301 return Poll::Ready(Ok(PollResult::ProcessWork {
1302 ready,
1303 low_priority,
1304 }));
1305 }
1306
1307 return match next {
1311 Poll::Ready(true) => {
1312 Poll::Ready(Ok(PollResult::ProcessWork {
1318 ready: Vec::new(),
1319 low_priority: false,
1320 }))
1321 }
1322 Poll::Ready(false) => {
1323 if let Poll::Ready(value) =
1327 tls::set(reset.store.0, || future.as_mut().poll(cx))
1328 {
1329 Poll::Ready(Ok(PollResult::Complete(value)))
1330 } else {
1331 if trap_on_idle {
1337 Poll::Ready(Err(Trap::AsyncDeadlock.into()))
1340 } else {
1341 Poll::Pending
1345 }
1346 }
1347 }
1348 Poll::Pending => Poll::Pending,
1353 };
1354 })
1355 .await;
1356
1357 drop(reset);
1361
1362 match result? {
1363 PollResult::Complete(value) => break Ok(value),
1366 PollResult::ProcessWork {
1369 ready,
1370 low_priority,
1371 } => {
1372 struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1373 store: StoreContextMut<'a, T>,
1374 ready: I,
1375 }
1376
1377 impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1378 fn drop(&mut self) {
1379 while let Some(item) = self.ready.next() {
1380 match item {
1381 WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1382 WorkItem::PushFuture(future) => {
1383 tls::set(self.store.0, move || drop(future))
1384 }
1385 _ => {}
1386 }
1387 }
1388 }
1389 }
1390
1391 let mut dispose = Dispose {
1392 store: self.as_context_mut(),
1393 ready: ready.into_iter(),
1394 };
1395
1396 if low_priority {
1418 dispose.store.0.yield_now().await
1419 }
1420
1421 while let Some(item) = dispose.ready.next() {
1422 dispose
1423 .store
1424 .as_context_mut()
1425 .handle_work_item(item)
1426 .await?;
1427 }
1428 }
1429 }
1430 }
1431 }
1432
1433 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1435 where
1436 T: Send,
1437 {
1438 log::trace!("handle work item {item:?}");
1439 match item {
1440 WorkItem::PushFuture(future) => {
1441 self.0
1442 .concurrent_state_mut()?
1443 .futures_mut()?
1444 .push(future.into_inner());
1445 }
1446 WorkItem::ResumeFiber(fiber) => {
1447 self.0.resume_fiber(fiber).await?;
1448 }
1449 WorkItem::ResumeThread(_, thread) => {
1450 if let GuestThreadState::Ready { fiber, .. } = mem::replace(
1451 &mut self.0.concurrent_state_mut()?.get_mut(thread.thread)?.state,
1452 GuestThreadState::Running,
1453 ) {
1454 self.0.resume_fiber(fiber).await?;
1455 } else {
1456 bail_bug!("cannot resume non-pending thread {thread:?}");
1457 }
1458 }
1459 WorkItem::GuestCall(_, call) => {
1460 if call.is_ready(self.0)? {
1461 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1462 } else {
1463 let state = self.0.concurrent_state_mut()?;
1464 let task = state.get_mut(call.thread.task)?;
1465 if !task.starting_sent {
1466 task.starting_sent = true;
1467 if let GuestCallKind::StartImplicit(_) = &call.kind {
1468 Waitable::Guest(call.thread.task).set_event(
1469 state,
1470 Some(Event::Subtask {
1471 status: Status::Starting,
1472 }),
1473 )?;
1474 }
1475 }
1476
1477 let instance = state.get_mut(call.thread.task)?.instance;
1478 self.0
1479 .instance_state(instance)
1480 .concurrent_state()
1481 .pending
1482 .insert(call.thread, call.kind);
1483 }
1484 }
1485 WorkItem::WorkerFunction(fun) => {
1486 self.run_on_worker(WorkerItem::Function(fun)).await?;
1487 }
1488 }
1489
1490 Ok(())
1491 }
1492
1493 async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1495 where
1496 T: Send,
1497 {
1498 let worker = if let Some(fiber) = self.0.concurrent_state_mut()?.worker.take() {
1499 fiber
1500 } else {
1501 fiber::make_fiber(self.0, move |store| {
1502 loop {
1503 let Some(item) = store.concurrent_state_mut()?.worker_item.take() else {
1504 bail_bug!("worker_item not present when resuming fiber")
1505 };
1506 match item {
1507 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1508 WorkerItem::Function(fun) => fun.into_inner()(store)?,
1509 }
1510
1511 store.suspend(SuspendReason::NeedWork)?;
1512 }
1513 })?
1514 };
1515
1516 let worker_item = &mut self.0.concurrent_state_mut()?.worker_item;
1517 assert!(worker_item.is_none());
1518 *worker_item = Some(item);
1519
1520 self.0.resume_fiber(worker).await
1521 }
1522
1523 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1528 where
1529 T: 'static,
1530 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1531 + Send
1532 + Sync
1533 + 'static,
1534 R: Send + Sync + 'static,
1535 {
1536 let token = StoreToken::new(self);
1537 async move {
1538 let mut accessor = Accessor::new(token);
1539 closure(&mut accessor).await
1540 }
1541 }
1542
1543 pub fn async_call_stack(&mut self) -> Result<impl Iterator<Item = GuestTaskId>> {
1553 let mut cur = Some(self.0.current_thread()?);
1554 let state = self.0.concurrent_state_mut()?;
1555 Ok(core::iter::from_fn(move || {
1556 while let Some(t) = cur {
1557 cur = state.parent(t);
1558 if let Some(thread) = t.guest() {
1559 return Some(GuestTaskId(thread.task));
1560 }
1561 }
1562
1563 None
1564 }))
1565 }
1566}
1567
1568impl StoreOpaque {
1569 pub(crate) fn current_thread(&mut self) -> Result<CurrentThread> {
1572 if !self.concurrency_support() {
1574 return Ok(CurrentThread::None);
1575 }
1576
1577 if !self
1580 .vm_store_context_mut()
1581 .current_thread_mut()
1582 .is_deferred()
1583 {
1584 return Ok(self
1585 .concurrent_state_mut_already_forced_current_thread()
1586 .unforced_current_thread);
1587 }
1588
1589 let state = self.concurrent_state_mut_without_forcing_current_thread();
1598 let id = match state.unforced_current_thread.guest().copied() {
1599 Some(thread) => state.get_mut(thread.task)?.instance.instance,
1600 None => bail_bug!("deferred component-model thread with non-guest base"),
1601 };
1602
1603 let mut frames = Vec::new();
1606 let mut cur = *self.vm_store_context_mut().current_thread_mut();
1607 while let Some(ptr) = cur.as_deferred() {
1608 let deferred = unsafe { ptr.as_non_null().as_ref() };
1613 frames.push((
1614 deferred.callee_async != 0,
1615 deferred.callee_instance,
1616 deferred.saved_context,
1617 ));
1618 cur = deferred.parent;
1619 }
1620
1621 *self.vm_store_context_mut().current_thread_mut() = VMLazyThread::forced();
1625
1626 let current_context = *self.vm_store_context_mut().component_context_mut();
1629
1630 for (callee_async, callee_instance, saved_context) in frames.into_iter().rev() {
1634 *self.vm_store_context_mut().component_context_mut() = saved_context;
1638 let callee = RuntimeInstance {
1639 instance: id,
1640 index: RuntimeComponentInstanceIndex::from_u32(callee_instance),
1641 };
1642 self.enter_guest_sync_call(None, callee_async, callee)?;
1643 }
1644
1645 *self.vm_store_context_mut().component_context_mut() = current_context;
1647
1648 Ok(self
1649 .concurrent_state_mut_without_forcing_current_thread()
1650 .unforced_current_thread)
1651 }
1652
1653 fn current_guest_thread(&mut self) -> Result<QualifiedThreadId> {
1654 match self.current_thread()?.guest() {
1655 Some(id) => Ok(*id),
1656 None => bail_bug!("current thread is not a guest thread"),
1657 }
1658 }
1659
1660 fn current_host_thread(&mut self) -> Result<TableId<HostTask>> {
1661 match self.current_thread()?.host() {
1662 Some(id) => Ok(id),
1663 None => bail_bug!("current thread is not a host thread"),
1664 }
1665 }
1666
1667 fn take_pending_cancellation(&mut self) -> Result<bool> {
1670 let thread = self.current_guest_thread()?;
1671 let task = self.concurrent_state_mut()?.get_mut(thread.task)?;
1672 if let Some(Event::Cancelled) = task.event {
1673 task.event.take();
1674 return Ok(true);
1675 }
1676 Ok(false)
1677 }
1678
1679 pub(crate) fn enter_guest_sync_call(
1691 &mut self,
1692 guest_caller: Option<RuntimeInstance>,
1693 callee_async: bool,
1694 callee: RuntimeInstance,
1695 ) -> Result<()> {
1696 log::trace!("enter sync call {callee:?}");
1697 if !self.concurrency_support() {
1698 return self.enter_call_not_concurrent();
1699 }
1700
1701 let thread = self.current_thread()?;
1702 let state = self.concurrent_state_mut()?;
1703 let instance = if let Some(thread) = thread.guest() {
1704 Some(state.get_mut(thread.task)?.instance)
1705 } else {
1706 None
1707 };
1708 if guest_caller.is_some() {
1709 debug_assert_eq!(instance, guest_caller);
1710 }
1711 let guest_thread = GuestTask::new(
1712 state,
1713 Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
1714 LiftResult {
1715 lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
1716 ty: TypeTupleIndex::reserved_value(),
1717 memory: None,
1718 string_encoding: StringEncoding::Utf8,
1719 },
1720 if let Some(thread) = thread.guest() {
1721 Caller::Guest { thread: *thread }
1722 } else {
1723 Caller::Host {
1724 tx: None,
1725 host_future_present: false,
1726 caller: thread,
1727 }
1728 },
1729 None,
1730 callee,
1731 callee_async,
1732 )?;
1733
1734 Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1735 guest_thread.thread,
1736 self,
1737 callee.index,
1738 )?;
1739 self.set_thread(guest_thread)?;
1740
1741 Ok(())
1742 }
1743
1744 pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
1752 if !self.concurrency_support() {
1753 return Ok(self.exit_call_not_concurrent());
1754 }
1755 let thread = match self.set_thread(CurrentThread::None)?.guest() {
1756 Some(t) => *t,
1757 None => bail_bug!("expected task when exiting"),
1758 };
1759 let task = self.concurrent_state_mut()?.get_mut(thread.task)?;
1760 let instance = task.instance;
1761 let caller = match &task.caller {
1762 &Caller::Guest { thread } => thread.into(),
1763 &Caller::Host { caller, .. } => caller,
1764 };
1765 task.lift_result = None;
1766 task.exited = true;
1767 self.set_thread(caller)?;
1768
1769 log::trace!("exit sync call {instance:?}");
1770 self.cleanup_thread(thread, instance, CleanupTask::Yes)?;
1771
1772 Ok(())
1773 }
1774
1775 pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
1783 if !self.concurrency_support() {
1784 self.enter_call_not_concurrent()?;
1785 return Ok(None);
1786 }
1787 let caller = self.current_guest_thread()?;
1788 let state = self.concurrent_state_mut()?;
1789 let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
1790 log::trace!("new host task {task:?}");
1791 self.set_thread(task)?;
1792 Ok(Some(task))
1793 }
1794
1795 pub fn host_task_reenter_caller(&mut self) -> Result<()> {
1801 if !self.concurrency_support() {
1802 return Ok(());
1803 }
1804 let task = self.current_host_thread()?;
1805 let caller = self.concurrent_state_mut()?.get_mut(task)?.caller;
1806 self.set_thread(caller)?;
1807 Ok(())
1808 }
1809
1810 pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1817 match task {
1818 Some(task) => {
1819 log::trace!("delete host task {task:?}");
1820 self.concurrent_state_mut()?.delete(task)?;
1821 }
1822 None => {
1823 self.exit_call_not_concurrent();
1824 }
1825 }
1826 Ok(())
1827 }
1828
1829 pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
1837 if self.trapped() {
1838 return Ok(false);
1839 }
1840 if !self.concurrency_support() {
1841 return Ok(true);
1842 }
1843 let mut cur = Some(self.current_thread()?);
1844 let state = self.concurrent_state_mut()?;
1845 while let Some(t) = cur {
1846 if let Some(thread) = t.guest() {
1847 let task = state.get_mut(thread.task)?;
1848 if task.instance.instance == instance.instance {
1855 return Ok(false);
1856 }
1857 }
1858 cur = state.parent(t);
1859 }
1860 Ok(true)
1861 }
1862
1863 fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1866 self.component_instance_mut(instance.instance)
1867 .instance_state(instance.index)
1868 }
1869
1870 fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
1876 let thread = thread.into();
1877 let state = self.concurrent_state_mut()?;
1878 let old_thread = mem::replace(&mut state.unforced_current_thread, thread);
1879
1880 if let Some(old_thread) = old_thread.guest() {
1888 let old_context = *self.vm_store_context_mut().component_context_mut();
1889 self.concurrent_state_mut()?
1890 .get_mut(old_thread.thread)?
1891 .context = old_context;
1892 }
1893 if cfg!(debug_assertions) {
1894 *self.vm_store_context_mut().component_context_mut() =
1895 [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
1896 }
1897 if let Some(thread) = thread.guest() {
1898 let thread = self.concurrent_state_mut()?.get_mut(thread.thread)?;
1899 let context = thread.context;
1900 if cfg!(debug_assertions) {
1901 thread.context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
1902 }
1903 *self.vm_store_context_mut().component_context_mut() = context;
1904 }
1905
1906 let state = self.concurrent_state_mut()?;
1914 if let Some(old_thread) = old_thread.guest() {
1915 let instance = state.get_mut(old_thread.task)?.instance.instance;
1916 self.component_instance_mut(instance)
1917 .set_task_may_block(false)
1918 }
1919
1920 if thread.guest().is_some() {
1921 self.set_task_may_block()?;
1922 }
1923
1924 *self.vm_store_context_mut().current_thread_mut() = if thread.is_none() {
1926 VMLazyThread::none()
1927 } else {
1928 VMLazyThread::forced()
1929 };
1930
1931 Ok(old_thread)
1932 }
1933
1934 fn set_task_may_block(&mut self) -> Result<()> {
1937 let guest_thread = self.current_guest_thread()?;
1938 let state = self.concurrent_state_mut()?;
1939 let instance = state.get_mut(guest_thread.task)?.instance.instance;
1940 let may_block = self.concurrent_state_mut()?.may_block(guest_thread.task)?;
1941 self.component_instance_mut(instance)
1942 .set_task_may_block(may_block);
1943 Ok(())
1944 }
1945
1946 pub(crate) fn check_blocking(&mut self) -> Result<()> {
1947 if !self.concurrency_support() {
1948 return Ok(());
1949 }
1950 let task = self.current_guest_thread()?.task;
1951 let state = self.concurrent_state_mut()?;
1952 let instance = state.get_mut(task)?.instance.instance;
1953 let task_may_block = self.component_instance(instance).get_task_may_block();
1954
1955 if task_may_block {
1956 Ok(())
1957 } else {
1958 Err(Trap::CannotBlockSyncTask.into())
1959 }
1960 }
1961
1962 fn enter_instance(&mut self, instance: RuntimeInstance) {
1966 log::trace!("enter {instance:?}");
1967 self.instance_state(instance)
1968 .concurrent_state()
1969 .do_not_enter = true;
1970 }
1971
1972 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1976 log::trace!("exit {instance:?}");
1977 self.instance_state(instance)
1978 .concurrent_state()
1979 .do_not_enter = false;
1980 self.partition_pending(instance)
1981 }
1982
1983 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1988 for (thread, kind) in
1989 mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1990 {
1991 let call = GuestCall { thread, kind };
1992 if call.is_ready(self)? {
1993 self.concurrent_state_mut()?
1994 .push_high_priority(WorkItem::GuestCall(instance.index, call));
1995 } else {
1996 self.instance_state(instance)
1997 .concurrent_state()
1998 .pending
1999 .insert(call.thread, call.kind);
2000 }
2001 }
2002
2003 Ok(())
2004 }
2005
2006 pub(crate) fn backpressure_modify(
2008 &mut self,
2009 caller_instance: RuntimeInstance,
2010 modify: impl FnOnce(u16) -> Option<u16>,
2011 ) -> Result<()> {
2012 let state = self.instance_state(caller_instance).concurrent_state();
2013 let old = state.backpressure;
2014 let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
2015 state.backpressure = new;
2016
2017 if old > 0 && new == 0 {
2018 self.partition_pending(caller_instance)?;
2021 }
2022
2023 Ok(())
2024 }
2025
2026 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
2029 let old_thread = self.current_thread()?;
2030 log::trace!("resume_fiber: save current thread {old_thread:?}");
2031
2032 let fiber = fiber::resolve_or_release(self, fiber).await?;
2033
2034 self.set_thread(old_thread)?;
2035
2036 let state = self.concurrent_state_mut()?;
2037
2038 if let Some(ot) = old_thread.guest() {
2039 state.get_mut(ot.thread)?.state = GuestThreadState::Running;
2040 }
2041 log::trace!("resume_fiber: restore current thread {old_thread:?}");
2042
2043 if let Some(mut fiber) = fiber {
2044 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
2045 let reason = match state.suspend_reason.take() {
2047 Some(r) => r,
2048 None => bail_bug!("suspend reason missing when resuming fiber"),
2049 };
2050 match reason {
2051 SuspendReason::NeedWork => {
2052 if state.worker.is_none() {
2053 state.worker = Some(fiber);
2054 } else {
2055 fiber.dispose(self);
2056 }
2057 }
2058 SuspendReason::Yielding {
2059 thread,
2060 cancellable,
2061 ..
2062 } => {
2063 state.get_mut(thread.thread)?.state =
2064 GuestThreadState::Ready { fiber, cancellable };
2065 let instance = state.get_mut(thread.task)?.instance.index;
2066 state.push_low_priority(WorkItem::ResumeThread(instance, thread));
2067 }
2068 SuspendReason::ExplicitlySuspending { thread, .. } => {
2069 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
2070 }
2071 SuspendReason::Waiting { set, thread, .. } => {
2072 let old = state
2073 .get_mut(set)?
2074 .waiting
2075 .insert(thread, WaitMode::Fiber(fiber));
2076 assert!(old.is_none());
2077 }
2078 };
2079 } else {
2080 log::trace!("resume_fiber: fiber has exited");
2081 }
2082
2083 Ok(())
2084 }
2085
2086 fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
2092 log::trace!("suspend fiber: {reason:?}");
2093
2094 let task = match &reason {
2098 SuspendReason::Yielding { thread, .. }
2099 | SuspendReason::Waiting { thread, .. }
2100 | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
2101 SuspendReason::NeedWork => None,
2102 };
2103
2104 let old_guest_thread = if task.is_some() {
2105 self.current_thread()?
2106 } else {
2107 CurrentThread::None
2108 };
2109
2110 debug_assert!(
2116 matches!(
2117 reason,
2118 SuspendReason::ExplicitlySuspending {
2119 skip_may_block_check: true,
2120 ..
2121 } | SuspendReason::Waiting {
2122 skip_may_block_check: true,
2123 ..
2124 } | SuspendReason::Yielding {
2125 skip_may_block_check: true,
2126 ..
2127 }
2128 ) || old_guest_thread
2129 .guest()
2130 .map(|thread| self.concurrent_state_mut()?.may_block(thread.task))
2131 .transpose()?
2132 .unwrap_or(true)
2133 );
2134
2135 let suspend_reason = &mut self.concurrent_state_mut()?.suspend_reason;
2136 assert!(suspend_reason.is_none());
2137 *suspend_reason = Some(reason);
2138
2139 self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
2140
2141 if task.is_some() {
2142 self.set_thread(old_guest_thread)?;
2143 }
2144
2145 Ok(())
2146 }
2147
2148 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
2149 let caller = self.current_guest_thread()?;
2150 let state = self.concurrent_state_mut()?;
2151
2152 if waitable.common(state)?.set.is_some() {
2153 bail!(Trap::WaitableSyncAndAsync);
2154 }
2155
2156 let set = state.get_mut(caller.thread)?.sync_call_set;
2157 waitable.join(state, Some(set))?;
2158 self.suspend(SuspendReason::Waiting {
2159 set,
2160 thread: caller,
2161 skip_may_block_check: false,
2162 })?;
2163 let state = self.concurrent_state_mut()?;
2164 waitable.join(state, None)
2165 }
2166
2167 fn cleanup_thread(
2189 &mut self,
2190 guest_thread: QualifiedThreadId,
2191 runtime_instance: RuntimeInstance,
2192 cleanup_task: CleanupTask,
2193 ) -> Result<()> {
2194 let state = self.concurrent_state_mut()?;
2195 let thread_data = state.get_mut(guest_thread.thread)?;
2196 let sync_call_set = thread_data.sync_call_set;
2197 if let Some(guest_id) = thread_data.instance_rep {
2198 self.instance_state(runtime_instance)
2199 .thread_handle_table()
2200 .guest_thread_remove(guest_id)?;
2201 }
2202 let state = self.concurrent_state_mut()?;
2203
2204 for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
2206 if let Some(Event::Subtask {
2207 status: Status::Returned | Status::ReturnCancelled,
2208 }) = waitable.common(state)?.event
2209 {
2210 waitable.delete_from(state)?;
2211 }
2212 }
2213
2214 state.delete(guest_thread.thread)?;
2215 state.delete(sync_call_set)?;
2216 let task = state.get_mut(guest_thread.task)?;
2217 task.threads.remove(&guest_thread.thread);
2218
2219 if task.threads.is_empty() && !task.returned_or_cancelled() {
2220 bail!(Trap::NoAsyncResult);
2221 }
2222 let ready_to_delete = task.ready_to_delete();
2223
2224 if !task.decremented_interesting_task_count && task.exited && task.returned_or_cancelled() {
2225 task.decremented_interesting_task_count = true;
2226
2227 debug_assert!(state.interesting_tasks > 0);
2228 state.interesting_tasks -= 1;
2229 if state.interesting_tasks == 0
2230 && let Some(waker) = state.interesting_tasks_empty_waker.take()
2231 {
2232 waker.wake();
2233 }
2234 }
2235
2236 match cleanup_task {
2237 CleanupTask::Yes => {
2238 if ready_to_delete {
2239 Waitable::Guest(guest_thread.task).delete_from(state)?;
2240 }
2241 }
2242 CleanupTask::No => {}
2243 }
2244
2245 Ok(())
2246 }
2247
2248 fn cancel_guest_subtask_without_lowered_parameters(
2261 &mut self,
2262 caller_instance: RuntimeInstance,
2263 guest_task: TableId<GuestTask>,
2264 ) -> Result<()> {
2265 let concurrent_state = self.concurrent_state_mut()?;
2266 let task = concurrent_state.get_mut(guest_task)?;
2267 assert!(!task.already_lowered_parameters());
2268 task.lower_params = None;
2272 task.lift_result = None;
2273 task.exited = true;
2274 let instance = task.instance;
2275
2276 assert_eq!(1, task.threads.len());
2279 let thread = *task.threads.iter().next().unwrap();
2280 self.cleanup_thread(
2281 QualifiedThreadId {
2282 task: guest_task,
2283 thread,
2284 },
2285 caller_instance,
2286 CleanupTask::No,
2287 )?;
2288
2289 let pending = &mut self.instance_state(instance).concurrent_state().pending;
2291 let pending_count = pending.len();
2292 pending.retain(|thread, _| thread.task != guest_task);
2293 if pending.len() == pending_count {
2295 bail!(Trap::SubtaskCancelAfterTerminal);
2296 }
2297 Ok(())
2298 }
2299
2300 pub(crate) fn current_scope_id(&mut self) -> Result<Option<u32>> {
2303 if !self.concurrency_support() {
2304 return self.current_scope_id_not_concurrent();
2305 }
2306 let (bits, is_host) = match self.current_thread()? {
2307 CurrentThread::Guest(id) => (id.task.rep(), false),
2308 CurrentThread::Host(id) => (id.rep(), true),
2309 CurrentThread::None => return Ok(None),
2310 };
2311 assert_eq!((bits << 1) >> 1, bits);
2312 Ok(Some((bits << 1) | u32::from(is_host)))
2313 }
2314}
2315
2316enum CleanupTask {
2317 Yes,
2318 No,
2319}
2320
2321impl Instance {
2322 fn get_event(
2325 self,
2326 store: &mut StoreOpaque,
2327 guest_task: TableId<GuestTask>,
2328 set: Option<TableId<WaitableSet>>,
2329 cancellable: bool,
2330 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
2331 let state = store.concurrent_state_mut()?;
2332
2333 let event = &mut state.get_mut(guest_task)?.event;
2334 if let Some(ev) = event
2335 && (cancellable || !matches!(ev, Event::Cancelled))
2336 {
2337 log::trace!("deliver event {ev:?} to {guest_task:?}");
2338 let ev = *ev;
2339 *event = None;
2340 return Ok(Some((ev, None)));
2341 }
2342
2343 let set = match set {
2344 Some(set) => set,
2345 None => return Ok(None),
2346 };
2347 let waitable = match state.get_mut(set)?.ready.pop_first() {
2348 Some(v) => v,
2349 None => return Ok(None),
2350 };
2351
2352 let common = waitable.common(state)?;
2353 let handle = match common.handle {
2354 Some(h) => h,
2355 None => bail_bug!("handle not set when delivering event"),
2356 };
2357 let event = match common.event.take() {
2358 Some(e) => e,
2359 None => bail_bug!("event not set when delivering event"),
2360 };
2361
2362 log::trace!(
2363 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
2364 );
2365
2366 waitable.on_delivery(store, self, event)?;
2367
2368 Ok(Some((event, Some((waitable, handle)))))
2369 }
2370
2371 fn handle_callback_code(
2377 self,
2378 store: &mut StoreOpaque,
2379 guest_thread: QualifiedThreadId,
2380 runtime_instance: RuntimeComponentInstanceIndex,
2381 code: u32,
2382 ) -> Result<Option<GuestCall>> {
2383 let (code, set) = unpack_callback_code(code);
2384
2385 log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
2386
2387 let state = store.concurrent_state_mut()?;
2388
2389 let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
2390 let set = store
2391 .instance_state(self.runtime_instance(runtime_instance))
2392 .handle_table()
2393 .waitable_set_rep(handle)?;
2394
2395 Ok(TableId::<WaitableSet>::new(set))
2396 };
2397
2398 Ok(match code {
2399 callback_code::EXIT => {
2400 log::trace!("implicit thread {guest_thread:?} completed");
2401 let task = store.concurrent_state_mut()?.get_mut(guest_thread.task)?;
2402 task.exited = true;
2403 task.callback = None;
2404 store.cleanup_thread(
2405 guest_thread,
2406 self.runtime_instance(runtime_instance),
2407 CleanupTask::Yes,
2408 )?;
2409 None
2410 }
2411 callback_code::YIELD => {
2412 let task = state.get_mut(guest_thread.task)?;
2413 if let Some(event) = task.event {
2418 assert!(matches!(event, Event::None | Event::Cancelled));
2419 } else {
2420 task.event = Some(Event::None);
2421 }
2422 let call = GuestCall {
2423 thread: guest_thread,
2424 kind: GuestCallKind::DeliverEvent {
2425 instance: self,
2426 set: None,
2427 },
2428 };
2429 if state.may_block(guest_thread.task)? {
2430 state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
2433 None
2434 } else {
2435 Some(call)
2439 }
2440 }
2441 callback_code::WAIT => {
2442 state.check_blocking_for(guest_thread.task)?;
2445
2446 let set = get_set(store, set)?;
2447 let state = store.concurrent_state_mut()?;
2448
2449 if state.get_mut(guest_thread.task)?.event.is_some()
2450 || !state.get_mut(set)?.ready.is_empty()
2451 {
2452 state.push_high_priority(WorkItem::GuestCall(
2454 runtime_instance,
2455 GuestCall {
2456 thread: guest_thread,
2457 kind: GuestCallKind::DeliverEvent {
2458 instance: self,
2459 set: Some(set),
2460 },
2461 },
2462 ));
2463 } else {
2464 let old = state
2472 .get_mut(guest_thread.thread)?
2473 .wake_on_cancel
2474 .replace(set);
2475 if !old.is_none() {
2476 bail_bug!("thread unexpectedly had wake_on_cancel set");
2477 }
2478 let old = state
2479 .get_mut(set)?
2480 .waiting
2481 .insert(guest_thread, WaitMode::Callback(self));
2482 if !old.is_none() {
2483 bail_bug!("set's waiting set already had this thread registered");
2484 }
2485 }
2486 None
2487 }
2488 _ => bail!(Trap::UnsupportedCallbackCode),
2489 })
2490 }
2491
2492 unsafe fn queue_call<T: 'static>(
2499 self,
2500 mut store: StoreContextMut<T>,
2501 guest_thread: QualifiedThreadId,
2502 callee: SendSyncPtr<VMFuncRef>,
2503 param_count: usize,
2504 result_count: usize,
2505 async_: bool,
2506 callback: Option<SendSyncPtr<VMFuncRef>>,
2507 post_return: Option<SendSyncPtr<VMFuncRef>>,
2508 ) -> Result<()> {
2509 unsafe fn make_call<T: 'static>(
2524 store: StoreContextMut<T>,
2525 guest_thread: QualifiedThreadId,
2526 callee: SendSyncPtr<VMFuncRef>,
2527 param_count: usize,
2528 result_count: usize,
2529 ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2530 + Send
2531 + Sync
2532 + 'static
2533 + use<T> {
2534 let token = StoreToken::new(store);
2535 move |store: &mut dyn VMStore| {
2536 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2537
2538 store
2539 .concurrent_state_mut()?
2540 .get_mut(guest_thread.thread)?
2541 .state = GuestThreadState::Running;
2542 let task = store.concurrent_state_mut()?.get_mut(guest_thread.task)?;
2543 let lower = match task.lower_params.take() {
2544 Some(l) => l,
2545 None => bail_bug!("lower_params missing"),
2546 };
2547
2548 lower(store, &mut storage[..param_count])?;
2549
2550 let mut store = token.as_context_mut(store);
2551
2552 unsafe {
2555 crate::Func::call_unchecked_raw(
2556 &mut store,
2557 callee.as_non_null(),
2558 NonNull::new(
2559 &mut storage[..param_count.max(result_count)]
2560 as *mut [MaybeUninit<ValRaw>] as _,
2561 )
2562 .unwrap(),
2563 )?;
2564 }
2565
2566 Ok(storage)
2567 }
2568 }
2569
2570 let call = unsafe {
2574 make_call(
2575 store.as_context_mut(),
2576 guest_thread,
2577 callee,
2578 param_count,
2579 result_count,
2580 )
2581 };
2582
2583 let callee_instance = store
2584 .0
2585 .concurrent_state_mut()?
2586 .get_mut(guest_thread.task)?
2587 .instance;
2588
2589 let fun = if callback.is_some() {
2590 assert!(async_);
2591
2592 Box::new(move |store: &mut dyn VMStore| {
2593 self.add_guest_thread_to_instance_table(
2594 guest_thread.thread,
2595 store,
2596 callee_instance.index,
2597 )?;
2598 let old_thread = store.set_thread(guest_thread)?;
2599 log::trace!(
2600 "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2601 );
2602
2603 store.enter_instance(callee_instance);
2604
2605 let storage = call(store)?;
2612
2613 store.exit_instance(callee_instance)?;
2614
2615 store.set_thread(old_thread)?;
2616 let state = store.concurrent_state_mut()?;
2617 if let Some(t) = old_thread.guest() {
2618 state.get_mut(t.thread)?.state = GuestThreadState::Running;
2619 }
2620 log::trace!("stackless call: restored {old_thread:?} as current thread");
2621
2622 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2625
2626 self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2627 })
2628 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2629 } else {
2630 let token = StoreToken::new(store.as_context_mut());
2631 Box::new(move |store: &mut dyn VMStore| {
2632 self.add_guest_thread_to_instance_table(
2633 guest_thread.thread,
2634 store,
2635 callee_instance.index,
2636 )?;
2637 let old_thread = store.set_thread(guest_thread)?;
2638 log::trace!(
2639 "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2640 );
2641 let flags = self.id().get(store).instance_flags(callee_instance.index);
2642
2643 if !async_ {
2647 store.enter_instance(callee_instance);
2648 }
2649
2650 let storage = call(store)?;
2657
2658 if !async_ {
2659 let lift = {
2665 store.exit_instance(callee_instance)?;
2666
2667 let state = store.concurrent_state_mut()?;
2668 if !state.get_mut(guest_thread.task)?.result.is_none() {
2669 bail_bug!("task has already produced a result");
2670 }
2671
2672 match state.get_mut(guest_thread.task)?.lift_result.take() {
2673 Some(lift) => lift,
2674 None => bail_bug!("lift_result field is missing"),
2675 }
2676 };
2677
2678 let result = (lift.lift)(store, unsafe {
2681 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2682 &storage[..result_count],
2683 )
2684 })?;
2685
2686 let post_return_arg = match result_count {
2687 0 => ValRaw::i32(0),
2688 1 => unsafe { storage[0].assume_init() },
2691 _ => unreachable!(),
2692 };
2693
2694 unsafe {
2695 call_post_return(
2696 token.as_context_mut(store),
2697 post_return.map(|v| v.as_non_null()),
2698 post_return_arg,
2699 flags,
2700 )?;
2701 }
2702
2703 self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2704 }
2705
2706 store.set_thread(old_thread)?;
2707
2708 store
2709 .concurrent_state_mut()?
2710 .get_mut(guest_thread.task)?
2711 .exited = true;
2712
2713 store.cleanup_thread(guest_thread, callee_instance, CleanupTask::Yes)?;
2715 Ok(None)
2716 })
2717 };
2718
2719 store
2720 .0
2721 .concurrent_state_mut()?
2722 .push_high_priority(WorkItem::GuestCall(
2723 callee_instance.index,
2724 GuestCall {
2725 thread: guest_thread,
2726 kind: GuestCallKind::StartImplicit(fun),
2727 },
2728 ));
2729
2730 Ok(())
2731 }
2732
2733 unsafe fn prepare_call<T: 'static>(
2746 self,
2747 mut store: StoreContextMut<T>,
2748 start: NonNull<VMFuncRef>,
2749 return_: NonNull<VMFuncRef>,
2750 caller_instance: RuntimeComponentInstanceIndex,
2751 callee_instance: RuntimeComponentInstanceIndex,
2752 task_return_type: TypeTupleIndex,
2753 callee_async: bool,
2754 memory: *mut VMMemoryDefinition,
2755 string_encoding: StringEncoding,
2756 caller_info: CallerInfo,
2757 ) -> Result<()> {
2758 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2759 store.0.check_blocking()?;
2763 }
2764
2765 enum ResultInfo {
2766 Heap { results: u32 },
2767 Stack { result_count: u32 },
2768 }
2769
2770 let result_info = match &caller_info {
2771 CallerInfo::Async {
2772 has_result: true,
2773 params,
2774 } => ResultInfo::Heap {
2775 results: match params.last() {
2776 Some(r) => r.get_u32(),
2777 None => bail_bug!("retptr missing"),
2778 },
2779 },
2780 CallerInfo::Async {
2781 has_result: false, ..
2782 } => ResultInfo::Stack { result_count: 0 },
2783 CallerInfo::Sync {
2784 result_count,
2785 params,
2786 } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
2787 results: match params.last() {
2788 Some(r) => r.get_u32(),
2789 None => bail_bug!("arg ptr missing"),
2790 },
2791 },
2792 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2793 result_count: *result_count,
2794 },
2795 };
2796
2797 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2798
2799 let start = SendSyncPtr::new(start);
2803 let return_ = SendSyncPtr::new(return_);
2804 let token = StoreToken::new(store.as_context_mut());
2805 let old_thread = store.0.current_guest_thread()?;
2806 let state = store.0.concurrent_state_mut()?;
2807
2808 debug_assert_eq!(
2809 state.get_mut(old_thread.task)?.instance,
2810 self.runtime_instance(caller_instance)
2811 );
2812
2813 let guest_thread = GuestTask::new(
2814 state,
2815 Box::new(move |store, dst| {
2816 let mut store = token.as_context_mut(store);
2817 assert!(dst.len() <= MAX_FLAT_PARAMS);
2818 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2820 let count = match caller_info {
2821 CallerInfo::Async { params, has_result } => {
2825 let params = ¶ms[..params.len() - usize::from(has_result)];
2826 for (param, src) in params.iter().zip(&mut src) {
2827 src.write(*param);
2828 }
2829 params.len()
2830 }
2831
2832 CallerInfo::Sync { params, .. } => {
2834 for (param, src) in params.iter().zip(&mut src) {
2835 src.write(*param);
2836 }
2837 params.len()
2838 }
2839 };
2840 unsafe {
2847 crate::Func::call_unchecked_raw(
2848 &mut store,
2849 start.as_non_null(),
2850 NonNull::new(
2851 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2852 )
2853 .unwrap(),
2854 )?;
2855 }
2856 dst.copy_from_slice(&src[..dst.len()]);
2857 let task = store.0.current_guest_thread()?.task;
2858 let state = store.0.concurrent_state_mut()?;
2859 Waitable::Guest(task).set_event(
2860 state,
2861 Some(Event::Subtask {
2862 status: Status::Started,
2863 }),
2864 )?;
2865 Ok(())
2866 }),
2867 LiftResult {
2868 lift: Box::new(move |store, src| {
2869 let mut store = token.as_context_mut(store);
2872 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2874 my_src.push(ValRaw::u32(*results));
2875 }
2876
2877 let prev = store.0.set_thread(old_thread)?;
2883
2884 unsafe {
2891 crate::Func::call_unchecked_raw(
2892 &mut store,
2893 return_.as_non_null(),
2894 my_src.as_mut_slice().into(),
2895 )?;
2896 }
2897
2898 store.0.set_thread(prev)?;
2901
2902 let thread = store.0.current_guest_thread()?;
2903 let state = store.0.concurrent_state_mut()?;
2904 if sync_caller {
2905 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2906 if let ResultInfo::Stack { result_count } = &result_info {
2907 match result_count {
2908 0 => None,
2909 1 => Some(my_src[0]),
2910 _ => unreachable!(),
2911 }
2912 } else {
2913 None
2914 },
2915 );
2916 }
2917 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2918 }),
2919 ty: task_return_type,
2920 memory: NonNull::new(memory).map(SendSyncPtr::new),
2921 string_encoding,
2922 },
2923 Caller::Guest { thread: old_thread },
2924 None,
2925 self.runtime_instance(callee_instance),
2926 callee_async,
2927 )?;
2928
2929 store.0.set_thread(guest_thread)?;
2932 log::trace!("pushed {guest_thread:?} as current thread; old thread was {old_thread:?}");
2933
2934 Ok(())
2935 }
2936
2937 unsafe fn call_callback<T>(
2942 self,
2943 mut store: StoreContextMut<T>,
2944 function: SendSyncPtr<VMFuncRef>,
2945 event: Event,
2946 handle: u32,
2947 ) -> Result<u32> {
2948 let (ordinal, result) = event.parts();
2949 let params = &mut [
2950 ValRaw::u32(ordinal),
2951 ValRaw::u32(handle),
2952 ValRaw::u32(result),
2953 ];
2954 unsafe {
2959 crate::Func::call_unchecked_raw(
2960 &mut store,
2961 function.as_non_null(),
2962 params.as_mut_slice().into(),
2963 )?;
2964 }
2965 Ok(params[0].get_u32())
2966 }
2967
2968 unsafe fn start_call<T: 'static>(
2981 self,
2982 mut store: StoreContextMut<T>,
2983 callback: *mut VMFuncRef,
2984 post_return: *mut VMFuncRef,
2985 callee: NonNull<VMFuncRef>,
2986 param_count: u32,
2987 result_count: u32,
2988 flags: u32,
2989 storage: Option<&mut [MaybeUninit<ValRaw>]>,
2990 ) -> Result<u32> {
2991 let token = StoreToken::new(store.as_context_mut());
2992 let async_caller = storage.is_none();
2993 let guest_thread = store.0.current_guest_thread()?;
2994 let state = store.0.concurrent_state_mut()?;
2995 let callee_async = state.get_mut(guest_thread.task)?.async_function;
2996 let callee = SendSyncPtr::new(callee);
2997 let param_count = usize::try_from(param_count)?;
2998 assert!(param_count <= MAX_FLAT_PARAMS);
2999 let result_count = usize::try_from(result_count)?;
3000 assert!(result_count <= MAX_FLAT_RESULTS);
3001
3002 let task = state.get_mut(guest_thread.task)?;
3003 if let Some(callback) = NonNull::new(callback) {
3004 let callback = SendSyncPtr::new(callback);
3008 task.callback = Some(Box::new(move |store, event, handle| {
3009 let store = token.as_context_mut(store);
3010 unsafe { self.call_callback::<T>(store, callback, event, handle) }
3011 }));
3012 }
3013
3014 let Caller::Guest { thread: caller } = &task.caller else {
3015 bail_bug!("start_call unexpectedly invoked for host->guest call");
3018 };
3019 let caller = *caller;
3020 let caller_instance = state.get_mut(caller.task)?.instance;
3021
3022 unsafe {
3024 self.queue_call(
3025 store.as_context_mut(),
3026 guest_thread,
3027 callee,
3028 param_count,
3029 result_count,
3030 (flags & START_FLAG_ASYNC_CALLEE) != 0,
3031 NonNull::new(callback).map(SendSyncPtr::new),
3032 NonNull::new(post_return).map(SendSyncPtr::new),
3033 )?;
3034 }
3035
3036 let state = store.0.concurrent_state_mut()?;
3037
3038 let guest_waitable = Waitable::Guest(guest_thread.task);
3041 let old_set = guest_waitable.common(state)?.set;
3042 let set = state.get_mut(caller.thread)?.sync_call_set;
3043 guest_waitable.join(state, Some(set))?;
3044
3045 store.0.set_thread(CurrentThread::None)?;
3046
3047 let (status, waitable) = loop {
3063 store.0.suspend(SuspendReason::Waiting {
3064 set,
3065 thread: caller,
3066 skip_may_block_check: async_caller || !callee_async,
3074 })?;
3075
3076 let state = store.0.concurrent_state_mut()?;
3077
3078 log::trace!("taking event for {:?}", guest_thread.task);
3079 let event = guest_waitable.take_event(state)?;
3080 let Some(Event::Subtask { status }) = event else {
3081 bail_bug!("subtasks should only get subtask events, got {event:?}")
3082 };
3083
3084 log::trace!("status {status:?} for {:?}", guest_thread.task);
3085
3086 if status == Status::Returned {
3087 break (status, None);
3089 } else if async_caller {
3090 let handle = store
3094 .0
3095 .instance_state(caller_instance)
3096 .handle_table()
3097 .subtask_insert_guest(guest_thread.task.rep())?;
3098 store
3099 .0
3100 .concurrent_state_mut()?
3101 .get_mut(guest_thread.task)?
3102 .common
3103 .handle = Some(handle);
3104 break (status, Some(handle));
3105 } else {
3106 }
3110 };
3111
3112 guest_waitable.join(store.0.concurrent_state_mut()?, old_set)?;
3113
3114 store.0.set_thread(caller)?;
3116 store
3117 .0
3118 .concurrent_state_mut()?
3119 .get_mut(caller.thread)?
3120 .state = GuestThreadState::Running;
3121 log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
3122
3123 if let Some(storage) = storage {
3124 let state = store.0.concurrent_state_mut()?;
3128 let task = state.get_mut(guest_thread.task)?;
3129 if let Some(result) = task.sync_result.take()? {
3130 if let Some(result) = result {
3131 storage[0] = MaybeUninit::new(result);
3132 }
3133
3134 if task.exited && task.ready_to_delete() {
3135 Waitable::Guest(guest_thread.task).delete_from(state)?;
3136 }
3137 }
3138 }
3139
3140 Ok(status.pack(waitable))
3141 }
3142
3143 pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
3156 self,
3157 mut store: StoreContextMut<'_, T>,
3158 future: impl Future<Output = Result<R>> + Send + 'static,
3159 lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
3160 ) -> Result<Option<u32>> {
3161 let token = StoreToken::new(store.as_context_mut());
3162 let task = store.0.current_host_thread()?;
3163 let state = store.0.concurrent_state_mut()?;
3164
3165 let (join_handle, future) = JoinHandle::run(future);
3168 {
3169 let state = &mut state.get_mut(task)?.state;
3170 assert!(matches!(state, HostTaskState::CalleeStarted));
3171 *state = HostTaskState::CalleeRunning(join_handle);
3172 }
3173
3174 let mut future = Box::pin(future);
3175
3176 let poll = tls::set(store.0, || {
3181 future
3182 .as_mut()
3183 .poll(&mut Context::from_waker(&Waker::noop()))
3184 });
3185
3186 match poll {
3187 Poll::Ready(result) => {
3189 let result = result.transpose()?;
3190 lower(store.as_context_mut(), result)?;
3191 return Ok(None);
3192 }
3193
3194 Poll::Pending => {}
3196 }
3197
3198 let future = Box::pin(async move {
3206 let result = match future.await {
3207 Some(result) => Some(result?),
3208 None => None,
3209 };
3210 let on_complete = move |store: &mut dyn VMStore| {
3211 let mut store = token.as_context_mut(store);
3215 let old = store.0.set_thread(task)?;
3216
3217 let status = if result.is_some() {
3218 Status::Returned
3219 } else {
3220 Status::ReturnCancelled
3221 };
3222
3223 lower(store.as_context_mut(), result)?;
3224 let state = store.0.concurrent_state_mut()?;
3225 match &mut state.get_mut(task)?.state {
3226 HostTaskState::CalleeDone { .. } => {}
3229
3230 other => *other = HostTaskState::CalleeDone { cancelled: false },
3232 }
3233 Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;
3234
3235 store.0.set_thread(old)?;
3236 Ok(())
3237 };
3238
3239 tls::get(move |store| {
3244 store
3245 .concurrent_state_mut()?
3246 .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
3247 on_complete,
3248 ))));
3249 Ok(())
3250 })
3251 });
3252
3253 let state = store.0.concurrent_state_mut()?;
3256 state.push_future(future);
3257 let caller = state.get_mut(task)?.caller;
3258 let instance = state.get_mut(caller.task)?.instance;
3259 let handle = store
3260 .0
3261 .instance_state(instance)
3262 .handle_table()
3263 .subtask_insert_host(task.rep())?;
3264 store.0.concurrent_state_mut()?.get_mut(task)?.common.handle = Some(handle);
3265 log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");
3266
3267 store.0.set_thread(caller)?;
3271 Ok(Some(handle))
3272 }
3273
3274 pub(crate) fn task_return(
3277 self,
3278 store: &mut dyn VMStore,
3279 ty: TypeTupleIndex,
3280 options: OptionsIndex,
3281 storage: &[ValRaw],
3282 ) -> Result<()> {
3283 let guest_thread = store.current_guest_thread()?;
3284 let state = store.concurrent_state_mut()?;
3285 let lift = state
3286 .get_mut(guest_thread.task)?
3287 .lift_result
3288 .take()
3289 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3290 if !state.get_mut(guest_thread.task)?.result.is_none() {
3291 bail_bug!("task result unexpectedly already set");
3292 }
3293
3294 let CanonicalOptions {
3295 string_encoding,
3296 data_model,
3297 ..
3298 } = &self.id().get(store).component().env_component().options[options];
3299
3300 let invalid = ty != lift.ty
3301 || string_encoding != &lift.string_encoding
3302 || match data_model {
3303 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
3304 Some(memory) => {
3305 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
3306 let actual = self.id().get(store).runtime_memory(memory);
3307 expected != actual.as_ptr()
3308 }
3309 None => false,
3312 },
3313 CanonicalOptionsDataModel::Gc { .. } => true,
3315 };
3316
3317 if invalid {
3318 bail!(Trap::TaskReturnInvalid);
3319 }
3320
3321 log::trace!("task.return for {guest_thread:?}");
3322
3323 let result = (lift.lift)(store, storage)?;
3324 self.task_complete(store, guest_thread.task, result, Status::Returned)
3325 }
3326
3327 pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
3329 let guest_thread = store.current_guest_thread()?;
3330 let state = store.concurrent_state_mut()?;
3331 let task = state.get_mut(guest_thread.task)?;
3332 if !task.cancel_sent {
3333 bail!(Trap::TaskCancelNotCancelled);
3334 }
3335 _ = task
3336 .lift_result
3337 .take()
3338 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3339
3340 if !task.result.is_none() {
3341 bail_bug!("task result should not bet set yet");
3342 }
3343
3344 log::trace!("task.cancel for {guest_thread:?}");
3345
3346 self.task_complete(
3347 store,
3348 guest_thread.task,
3349 Box::new(DummyResult),
3350 Status::ReturnCancelled,
3351 )
3352 }
3353
3354 fn task_complete(
3360 self,
3361 store: &mut StoreOpaque,
3362 guest_task: TableId<GuestTask>,
3363 result: Box<dyn Any + Send + Sync>,
3364 status: Status,
3365 ) -> Result<()> {
3366 store
3367 .component_resource_tables(Some(self))?
3368 .validate_scope_exit()?;
3369
3370 let state = store.concurrent_state_mut()?;
3371 let task = state.get_mut(guest_task)?;
3372
3373 if let Caller::Host { tx, .. } = &mut task.caller {
3374 if let Some(tx) = tx.take() {
3375 _ = tx.send(result);
3376 }
3377 } else {
3378 task.result = Some(result);
3379 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3380 }
3381
3382 Ok(())
3383 }
3384
3385 pub(crate) fn waitable_set_new(
3387 self,
3388 store: &mut StoreOpaque,
3389 caller_instance: RuntimeComponentInstanceIndex,
3390 ) -> Result<u32> {
3391 let set = store.concurrent_state_mut()?.push(WaitableSet::default())?;
3392 let handle = store
3393 .instance_state(self.runtime_instance(caller_instance))
3394 .handle_table()
3395 .waitable_set_insert(set.rep())?;
3396 log::trace!("new waitable set {set:?} (handle {handle})");
3397 Ok(handle)
3398 }
3399
3400 pub(crate) fn waitable_set_drop(
3402 self,
3403 store: &mut StoreOpaque,
3404 caller_instance: RuntimeComponentInstanceIndex,
3405 set: u32,
3406 ) -> Result<()> {
3407 let rep = store
3408 .instance_state(self.runtime_instance(caller_instance))
3409 .handle_table()
3410 .waitable_set_remove(set)?;
3411
3412 log::trace!("drop waitable set {rep} (handle {set})");
3413
3414 if !store
3418 .concurrent_state_mut()?
3419 .get_mut(TableId::<WaitableSet>::new(rep))?
3420 .waiting
3421 .is_empty()
3422 {
3423 bail!(Trap::WaitableSetDropHasWaiters);
3424 }
3425
3426 store
3427 .concurrent_state_mut()?
3428 .delete(TableId::<WaitableSet>::new(rep))?;
3429
3430 Ok(())
3431 }
3432
3433 pub(crate) fn waitable_join(
3435 self,
3436 store: &mut StoreOpaque,
3437 caller_instance: RuntimeComponentInstanceIndex,
3438 waitable_handle: u32,
3439 set_handle: u32,
3440 ) -> Result<()> {
3441 let mut instance = self.id().get_mut(store);
3442 let waitable =
3443 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3444
3445 let set = if set_handle == 0 {
3446 None
3447 } else {
3448 let set = instance.instance_states().0[caller_instance]
3449 .handle_table()
3450 .waitable_set_rep(set_handle)?;
3451
3452 let state = store.concurrent_state_mut()?;
3453 if let Some(old) = waitable.common(state)?.set
3454 && state.get_mut(old)?.is_sync_call_set
3455 {
3456 bail!(Trap::WaitableSyncAndAsync);
3457 }
3458
3459 Some(TableId::<WaitableSet>::new(set))
3460 };
3461
3462 log::trace!(
3463 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3464 );
3465
3466 waitable.join(store.concurrent_state_mut()?, set)
3467 }
3468
3469 pub(crate) fn subtask_drop(
3471 self,
3472 store: &mut StoreOpaque,
3473 caller_instance: RuntimeComponentInstanceIndex,
3474 task_id: u32,
3475 ) -> Result<()> {
3476 self.waitable_join(store, caller_instance, task_id, 0)?;
3477
3478 let (rep, is_host) = store
3479 .instance_state(self.runtime_instance(caller_instance))
3480 .handle_table()
3481 .subtask_remove(task_id)?;
3482
3483 let concurrent_state = store.concurrent_state_mut()?;
3484 let (waitable, delete) = if is_host {
3485 let id = TableId::<HostTask>::new(rep);
3486 let task = concurrent_state.get_mut(id)?;
3487 match &task.state {
3488 HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
3489 HostTaskState::CalleeDone { .. } => {}
3490 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3491 bail_bug!("invalid state for callee in `subtask.drop`")
3492 }
3493 }
3494 (Waitable::Host(id), true)
3495 } else {
3496 let id = TableId::<GuestTask>::new(rep);
3497 let task = concurrent_state.get_mut(id)?;
3498 if task.lift_result.is_some() {
3499 bail!(Trap::SubtaskDropNotResolved);
3500 }
3501 (
3502 Waitable::Guest(id),
3503 concurrent_state.get_mut(id)?.ready_to_delete(),
3504 )
3505 };
3506
3507 waitable.common(concurrent_state)?.handle = None;
3508
3509 if waitable.take_event(concurrent_state)?.is_some() {
3512 bail!(Trap::SubtaskDropNotResolved);
3513 }
3514
3515 if delete {
3516 waitable.delete_from(concurrent_state)?;
3517 }
3518
3519 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3520 Ok(())
3521 }
3522
3523 pub(crate) fn waitable_set_wait(
3525 self,
3526 store: &mut StoreOpaque,
3527 options: OptionsIndex,
3528 set: u32,
3529 payload: u32,
3530 ) -> Result<u32> {
3531 if !self.options(store, options).async_ {
3532 store.check_blocking()?;
3536 }
3537
3538 let &CanonicalOptions {
3539 cancellable,
3540 instance: caller_instance,
3541 ..
3542 } = &self.id().get(store).component().env_component().options[options];
3543 let rep = store
3544 .instance_state(self.runtime_instance(caller_instance))
3545 .handle_table()
3546 .waitable_set_rep(set)?;
3547
3548 self.waitable_check(
3549 store,
3550 cancellable,
3551 WaitableCheck::Wait,
3552 WaitableCheckParams {
3553 set: TableId::new(rep),
3554 options,
3555 payload,
3556 },
3557 )
3558 }
3559
3560 pub(crate) fn waitable_set_poll(
3562 self,
3563 store: &mut StoreOpaque,
3564 options: OptionsIndex,
3565 set: u32,
3566 payload: u32,
3567 ) -> Result<u32> {
3568 let &CanonicalOptions {
3569 cancellable,
3570 instance: caller_instance,
3571 ..
3572 } = &self.id().get(store).component().env_component().options[options];
3573 let rep = store
3574 .instance_state(self.runtime_instance(caller_instance))
3575 .handle_table()
3576 .waitable_set_rep(set)?;
3577
3578 self.waitable_check(
3579 store,
3580 cancellable,
3581 WaitableCheck::Poll,
3582 WaitableCheckParams {
3583 set: TableId::new(rep),
3584 options,
3585 payload,
3586 },
3587 )
3588 }
3589
3590 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3592 let thread_id = store.current_guest_thread()?.thread;
3593 match store
3594 .concurrent_state_mut()?
3595 .get_mut(thread_id)?
3596 .instance_rep
3597 {
3598 Some(r) => Ok(r),
3599 None => bail_bug!("thread should have instance_rep by now"),
3600 }
3601 }
3602
3603 pub(crate) fn thread_new_indirect<T: 'static>(
3605 self,
3606 mut store: StoreContextMut<T>,
3607 runtime_instance: RuntimeComponentInstanceIndex,
3608 _func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex,
3610 start_func_idx: u32,
3611 context: i32,
3612 ) -> Result<u32> {
3613 log::trace!("creating new thread");
3614
3615 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3616 let (instance, registry) = self.id().get_mut_and_registry(store.0);
3617 let callee = instance
3618 .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3619 .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
3620 if callee.type_index(store.0) != start_func_ty.type_index() {
3621 bail!(Trap::ThreadNewIndirectInvalidType);
3622 }
3623
3624 let token = StoreToken::new(store.as_context_mut());
3625 let start_func = Box::new(
3626 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3627 let old_thread = store.set_thread(guest_thread)?;
3628 log::trace!(
3629 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3630 );
3631
3632 let mut store = token.as_context_mut(store);
3633 let mut params = [ValRaw::i32(context)];
3634 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3637
3638 store.0.set_thread(old_thread)?;
3639
3640 store.0.cleanup_thread(
3641 guest_thread,
3642 self.runtime_instance(runtime_instance),
3643 CleanupTask::Yes,
3644 )?;
3645 log::trace!("explicit thread {guest_thread:?} completed");
3646 let state = store.0.concurrent_state_mut()?;
3647 if let Some(t) = old_thread.guest() {
3648 state.get_mut(t.thread)?.state = GuestThreadState::Running;
3649 }
3650 log::trace!("thread start: restored {old_thread:?} as current thread");
3651
3652 Ok(())
3653 },
3654 );
3655
3656 let current_thread = store.0.current_guest_thread()?;
3657 let state = store.0.concurrent_state_mut()?;
3658 let parent_task = current_thread.task;
3659
3660 let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
3661 let thread_id = state.push(new_thread)?;
3662 state.get_mut(parent_task)?.threads.insert(thread_id);
3663
3664 log::trace!("new thread with id {thread_id:?} created");
3665
3666 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3667 }
3668
3669 pub(crate) fn resume_thread(
3670 self,
3671 store: &mut StoreOpaque,
3672 runtime_instance: RuntimeComponentInstanceIndex,
3673 thread_idx: u32,
3674 high_priority: bool,
3675 allow_ready: bool,
3676 ) -> Result<()> {
3677 let thread_id =
3678 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3679 let state = store.concurrent_state_mut()?;
3680 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3681 let thread = state.get_mut(guest_thread.thread)?;
3682
3683 match mem::replace(&mut thread.state, GuestThreadState::Running) {
3684 GuestThreadState::NotStartedExplicit(start_func) => {
3685 log::trace!("starting thread {guest_thread:?}");
3686 let guest_call = WorkItem::GuestCall(
3687 runtime_instance,
3688 GuestCall {
3689 thread: guest_thread,
3690 kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3691 start_func(store, guest_thread)
3692 })),
3693 },
3694 );
3695 store
3696 .concurrent_state_mut()?
3697 .push_work_item(guest_call, high_priority);
3698 }
3699 GuestThreadState::Suspended(fiber) => {
3700 log::trace!("resuming thread {thread_id:?} that was suspended");
3701 store
3702 .concurrent_state_mut()?
3703 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3704 }
3705 GuestThreadState::Ready { fiber, cancellable } if allow_ready => {
3706 log::trace!("resuming thread {thread_id:?} that was ready");
3707 thread.state = GuestThreadState::Ready { fiber, cancellable };
3708 store
3709 .concurrent_state_mut()?
3710 .promote_thread_work_item(guest_thread);
3711 }
3712 other => {
3713 thread.state = other;
3714 bail!(Trap::CannotResumeThread);
3715 }
3716 }
3717 Ok(())
3718 }
3719
3720 fn add_guest_thread_to_instance_table(
3721 self,
3722 thread_id: TableId<GuestThread>,
3723 store: &mut StoreOpaque,
3724 runtime_instance: RuntimeComponentInstanceIndex,
3725 ) -> Result<u32> {
3726 let guest_id = store
3727 .instance_state(self.runtime_instance(runtime_instance))
3728 .thread_handle_table()
3729 .guest_thread_insert(thread_id.rep())?;
3730 store
3731 .concurrent_state_mut()?
3732 .get_mut(thread_id)?
3733 .instance_rep = Some(guest_id);
3734 Ok(guest_id)
3735 }
3736
3737 pub(crate) fn suspension_intrinsic(
3740 self,
3741 store: &mut StoreOpaque,
3742 caller: RuntimeComponentInstanceIndex,
3743 cancellable: bool,
3744 yielding: bool,
3745 to_thread: SuspensionTarget,
3746 ) -> Result<WaitResult> {
3747 let guest_thread = store.current_guest_thread()?;
3748 if to_thread.is_none() {
3749 let state = store.concurrent_state_mut()?;
3750 if yielding {
3751 if !state.may_block(guest_thread.task)? {
3753 if !state.promote_instance_local_thread_work_item(caller) {
3756 return Ok(WaitResult::Completed);
3758 }
3759 }
3760 } else {
3761 store.check_blocking()?;
3765 }
3766 }
3767
3768 if cancellable && store.take_pending_cancellation()? {
3770 return Ok(WaitResult::Cancelled);
3771 }
3772
3773 match to_thread {
3774 SuspensionTarget::SomeSuspended(thread) => {
3775 self.resume_thread(store, caller, thread, true, false)?
3776 }
3777 SuspensionTarget::Some(thread) => {
3778 self.resume_thread(store, caller, thread, true, true)?
3779 }
3780 SuspensionTarget::None => { }
3781 }
3782
3783 let reason = if yielding {
3784 SuspendReason::Yielding {
3785 thread: guest_thread,
3786 cancellable,
3787 skip_may_block_check: to_thread.is_some(),
3791 }
3792 } else {
3793 SuspendReason::ExplicitlySuspending {
3794 thread: guest_thread,
3795 skip_may_block_check: to_thread.is_some(),
3799 }
3800 };
3801
3802 store.suspend(reason)?;
3803
3804 if cancellable && store.take_pending_cancellation()? {
3805 Ok(WaitResult::Cancelled)
3806 } else {
3807 Ok(WaitResult::Completed)
3808 }
3809 }
3810
3811 fn waitable_check(
3813 self,
3814 store: &mut StoreOpaque,
3815 cancellable: bool,
3816 check: WaitableCheck,
3817 params: WaitableCheckParams,
3818 ) -> Result<u32> {
3819 let guest_thread = store.current_guest_thread()?;
3820
3821 log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3822
3823 let state = store.concurrent_state_mut()?;
3824 let task = state.get_mut(guest_thread.task)?;
3825
3826 match &check {
3829 WaitableCheck::Wait => {
3830 let set = params.set;
3831
3832 if (task.event.is_none()
3833 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3834 && state.get_mut(set)?.ready.is_empty()
3835 {
3836 if cancellable {
3837 let old = state
3838 .get_mut(guest_thread.thread)?
3839 .wake_on_cancel
3840 .replace(set);
3841 if !old.is_none() {
3842 bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
3843 }
3844 }
3845
3846 store.suspend(SuspendReason::Waiting {
3847 set,
3848 thread: guest_thread,
3849 skip_may_block_check: false,
3850 })?;
3851 }
3852 }
3853 WaitableCheck::Poll => {}
3854 }
3855
3856 log::trace!(
3857 "waitable check for {guest_thread:?}; set {:?}, part two",
3858 params.set
3859 );
3860
3861 let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3863
3864 let (ordinal, handle, result) = match &check {
3865 WaitableCheck::Wait => {
3866 let (event, waitable) = match event {
3867 Some(p) => p,
3868 None => bail_bug!("event expected to be present"),
3869 };
3870 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3871 let (ordinal, result) = event.parts();
3872 (ordinal, handle, result)
3873 }
3874 WaitableCheck::Poll => {
3875 if let Some((event, waitable)) = event {
3876 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3877 let (ordinal, result) = event.parts();
3878 (ordinal, handle, result)
3879 } else {
3880 log::trace!(
3881 "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3882 guest_thread.task,
3883 params.set
3884 );
3885 let (ordinal, result) = Event::None.parts();
3886 (ordinal, 0, result)
3887 }
3888 }
3889 };
3890 let memory = self.options_memory_mut(store, params.options);
3891 let ptr = crate::component::func::validate_inbounds_dynamic(
3892 &CanonicalAbiInfo::POINTER_PAIR,
3893 memory,
3894 &ValRaw::u32(params.payload),
3895 )?;
3896 memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3897 memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3898 Ok(ordinal)
3899 }
3900
3901 pub(crate) fn subtask_cancel(
3903 self,
3904 store: &mut StoreOpaque,
3905 caller_instance: RuntimeComponentInstanceIndex,
3906 async_: bool,
3907 task_id: u32,
3908 ) -> Result<u32> {
3909 if !async_ {
3910 store.check_blocking()?;
3914 }
3915
3916 let (rep, is_host) = store
3917 .instance_state(self.runtime_instance(caller_instance))
3918 .handle_table()
3919 .subtask_rep(task_id)?;
3920 let waitable = if is_host {
3921 Waitable::Host(TableId::<HostTask>::new(rep))
3922 } else {
3923 Waitable::Guest(TableId::<GuestTask>::new(rep))
3924 };
3925 let concurrent_state = store.concurrent_state_mut()?;
3926
3927 log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3928
3929 let needs_block;
3930 if let Waitable::Host(host_task) = waitable {
3931 let state = &mut concurrent_state.get_mut(host_task)?.state;
3932 match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
3933 HostTaskState::CalleeRunning(handle) => {
3940 handle.abort();
3941 needs_block = true;
3942 }
3943
3944 HostTaskState::CalleeDone { cancelled } => {
3947 if cancelled {
3948 bail!(Trap::SubtaskCancelAfterTerminal);
3949 } else {
3950 needs_block = false;
3953 }
3954 }
3955
3956 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3959 bail_bug!("invalid states for host callee")
3960 }
3961 }
3962 } else {
3963 let guest_task = TableId::<GuestTask>::new(rep);
3964 let task = concurrent_state.get_mut(guest_task)?;
3965 if !task.already_lowered_parameters() {
3966 store.cancel_guest_subtask_without_lowered_parameters(
3967 self.runtime_instance(caller_instance),
3968 guest_task,
3969 )?;
3970 return Ok(Status::StartCancelled as u32);
3971 } else if !task.returned_or_cancelled() {
3972 task.cancel_sent = true;
3975 task.event = Some(Event::Cancelled);
3980 let runtime_instance = task.instance.index;
3981 for thread in task.threads.clone() {
3982 let thread = QualifiedThreadId {
3983 task: guest_task,
3984 thread,
3985 };
3986 let thread_mut = concurrent_state.get_mut(thread.thread)?;
3987 if let Some(set) = thread_mut.wake_on_cancel.take() {
3988 let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
3990 Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
3991 Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
3992 runtime_instance,
3993 GuestCall {
3994 thread,
3995 kind: GuestCallKind::DeliverEvent {
3996 instance,
3997 set: None,
3998 },
3999 },
4000 ),
4001 None => bail_bug!("thread not present in wake_on_cancel set"),
4002 };
4003 concurrent_state.push_high_priority(item);
4004
4005 let caller = store.current_guest_thread()?;
4006 store.suspend(SuspendReason::Yielding {
4007 thread: caller,
4008 cancellable: false,
4009 skip_may_block_check: false,
4012 })?;
4013 break;
4014 } else if let GuestThreadState::Ready {
4015 cancellable: true, ..
4016 } = &thread_mut.state
4017 {
4018 concurrent_state.promote_thread_work_item(thread);
4021 let caller = store.current_guest_thread()?;
4022 store.suspend(SuspendReason::Yielding {
4023 thread: caller,
4024 cancellable: false,
4025 skip_may_block_check: false,
4026 })?;
4027 break;
4028 }
4029 }
4030
4031 needs_block = !store
4034 .concurrent_state_mut()?
4035 .get_mut(guest_task)?
4036 .returned_or_cancelled()
4037 } else {
4038 needs_block = false;
4039 }
4040 };
4041
4042 if needs_block {
4046 if async_ {
4047 return Ok(BLOCKED);
4048 }
4049
4050 store.wait_for_event(waitable)?;
4054
4055 }
4057
4058 let event = waitable.take_event(store.concurrent_state_mut()?)?;
4059 if let Some(Event::Subtask {
4060 status: status @ (Status::Returned | Status::ReturnCancelled),
4061 }) = event
4062 {
4063 Ok(status as u32)
4064 } else {
4065 bail!(Trap::SubtaskCancelAfterTerminal);
4066 }
4067 }
4068}
4069
4070pub trait VMComponentAsyncStore {
4078 unsafe fn prepare_call(
4084 &mut self,
4085 instance: Instance,
4086 memory: *mut VMMemoryDefinition,
4087 start: NonNull<VMFuncRef>,
4088 return_: NonNull<VMFuncRef>,
4089 caller_instance: RuntimeComponentInstanceIndex,
4090 callee_instance: RuntimeComponentInstanceIndex,
4091 task_return_type: TypeTupleIndex,
4092 callee_async: bool,
4093 string_encoding: StringEncoding,
4094 result_count: u32,
4095 storage: *mut ValRaw,
4096 storage_len: usize,
4097 ) -> Result<()>;
4098
4099 unsafe fn sync_start(
4102 &mut self,
4103 instance: Instance,
4104 callback: *mut VMFuncRef,
4105 callee: NonNull<VMFuncRef>,
4106 param_count: u32,
4107 storage: *mut MaybeUninit<ValRaw>,
4108 storage_len: usize,
4109 ) -> Result<()>;
4110
4111 unsafe fn async_start(
4114 &mut self,
4115 instance: Instance,
4116 callback: *mut VMFuncRef,
4117 post_return: *mut VMFuncRef,
4118 callee: NonNull<VMFuncRef>,
4119 param_count: u32,
4120 result_count: u32,
4121 flags: u32,
4122 ) -> Result<u32>;
4123
4124 fn future_write(
4126 &mut self,
4127 instance: Instance,
4128 caller: RuntimeComponentInstanceIndex,
4129 ty: TypeFutureTableIndex,
4130 options: OptionsIndex,
4131 future: u32,
4132 address: u32,
4133 ) -> Result<u32>;
4134
4135 fn future_read(
4137 &mut self,
4138 instance: Instance,
4139 caller: RuntimeComponentInstanceIndex,
4140 ty: TypeFutureTableIndex,
4141 options: OptionsIndex,
4142 future: u32,
4143 address: u32,
4144 ) -> Result<u32>;
4145
4146 fn future_drop_writable(
4148 &mut self,
4149 instance: Instance,
4150 ty: TypeFutureTableIndex,
4151 writer: u32,
4152 ) -> Result<()>;
4153
4154 fn stream_write(
4156 &mut self,
4157 instance: Instance,
4158 caller: RuntimeComponentInstanceIndex,
4159 ty: TypeStreamTableIndex,
4160 options: OptionsIndex,
4161 stream: u32,
4162 address: u32,
4163 count: u32,
4164 ) -> Result<u32>;
4165
4166 fn stream_read(
4168 &mut self,
4169 instance: Instance,
4170 caller: RuntimeComponentInstanceIndex,
4171 ty: TypeStreamTableIndex,
4172 options: OptionsIndex,
4173 stream: u32,
4174 address: u32,
4175 count: u32,
4176 ) -> Result<u32>;
4177
4178 fn flat_stream_write(
4181 &mut self,
4182 instance: Instance,
4183 caller: RuntimeComponentInstanceIndex,
4184 ty: TypeStreamTableIndex,
4185 options: OptionsIndex,
4186 payload_size: u32,
4187 payload_align: u32,
4188 stream: u32,
4189 address: u32,
4190 count: u32,
4191 ) -> Result<u32>;
4192
4193 fn flat_stream_read(
4196 &mut self,
4197 instance: Instance,
4198 caller: RuntimeComponentInstanceIndex,
4199 ty: TypeStreamTableIndex,
4200 options: OptionsIndex,
4201 payload_size: u32,
4202 payload_align: u32,
4203 stream: u32,
4204 address: u32,
4205 count: u32,
4206 ) -> Result<u32>;
4207
4208 fn stream_drop_writable(
4210 &mut self,
4211 instance: Instance,
4212 ty: TypeStreamTableIndex,
4213 writer: u32,
4214 ) -> Result<()>;
4215
4216 fn error_context_debug_message(
4218 &mut self,
4219 instance: Instance,
4220 ty: TypeComponentLocalErrorContextTableIndex,
4221 options: OptionsIndex,
4222 err_ctx_handle: u32,
4223 debug_msg_address: u32,
4224 ) -> Result<()>;
4225
4226 fn thread_new_indirect(
4228 &mut self,
4229 instance: Instance,
4230 caller: RuntimeComponentInstanceIndex,
4231 func_ty_idx: TypeFuncIndex,
4232 start_func_table_idx: RuntimeTableIndex,
4233 start_func_idx: u32,
4234 context: i32,
4235 ) -> Result<u32>;
4236}
4237
4238impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
4240 unsafe fn prepare_call(
4241 &mut self,
4242 instance: Instance,
4243 memory: *mut VMMemoryDefinition,
4244 start: NonNull<VMFuncRef>,
4245 return_: NonNull<VMFuncRef>,
4246 caller_instance: RuntimeComponentInstanceIndex,
4247 callee_instance: RuntimeComponentInstanceIndex,
4248 task_return_type: TypeTupleIndex,
4249 callee_async: bool,
4250 string_encoding: StringEncoding,
4251 result_count_or_max_if_async: u32,
4252 storage: *mut ValRaw,
4253 storage_len: usize,
4254 ) -> Result<()> {
4255 let params = unsafe { core::slice::from_raw_parts(storage, storage_len) }.to_vec();
4259
4260 unsafe {
4261 instance.prepare_call(
4262 StoreContextMut(self),
4263 start,
4264 return_,
4265 caller_instance,
4266 callee_instance,
4267 task_return_type,
4268 callee_async,
4269 memory,
4270 string_encoding,
4271 match result_count_or_max_if_async {
4272 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
4273 params,
4274 has_result: false,
4275 },
4276 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
4277 params,
4278 has_result: true,
4279 },
4280 result_count => CallerInfo::Sync {
4281 params,
4282 result_count,
4283 },
4284 },
4285 )
4286 }
4287 }
4288
4289 unsafe fn sync_start(
4290 &mut self,
4291 instance: Instance,
4292 callback: *mut VMFuncRef,
4293 callee: NonNull<VMFuncRef>,
4294 param_count: u32,
4295 storage: *mut MaybeUninit<ValRaw>,
4296 storage_len: usize,
4297 ) -> Result<()> {
4298 unsafe {
4299 instance
4300 .start_call(
4301 StoreContextMut(self),
4302 callback,
4303 ptr::null_mut(),
4304 callee,
4305 param_count,
4306 1,
4307 START_FLAG_ASYNC_CALLEE,
4308 Some(core::slice::from_raw_parts_mut(storage, storage_len)),
4312 )
4313 .map(drop)
4314 }
4315 }
4316
4317 unsafe fn async_start(
4318 &mut self,
4319 instance: Instance,
4320 callback: *mut VMFuncRef,
4321 post_return: *mut VMFuncRef,
4322 callee: NonNull<VMFuncRef>,
4323 param_count: u32,
4324 result_count: u32,
4325 flags: u32,
4326 ) -> Result<u32> {
4327 unsafe {
4328 instance.start_call(
4329 StoreContextMut(self),
4330 callback,
4331 post_return,
4332 callee,
4333 param_count,
4334 result_count,
4335 flags,
4336 None,
4337 )
4338 }
4339 }
4340
4341 fn future_write(
4342 &mut self,
4343 instance: Instance,
4344 caller: RuntimeComponentInstanceIndex,
4345 ty: TypeFutureTableIndex,
4346 options: OptionsIndex,
4347 future: u32,
4348 address: u32,
4349 ) -> Result<u32> {
4350 instance
4351 .guest_write(
4352 StoreContextMut(self),
4353 caller,
4354 TransmitIndex::Future(ty),
4355 options,
4356 None,
4357 future,
4358 address,
4359 1,
4360 )
4361 .map(|result| result.encode())
4362 }
4363
4364 fn future_read(
4365 &mut self,
4366 instance: Instance,
4367 caller: RuntimeComponentInstanceIndex,
4368 ty: TypeFutureTableIndex,
4369 options: OptionsIndex,
4370 future: u32,
4371 address: u32,
4372 ) -> Result<u32> {
4373 instance
4374 .guest_read(
4375 StoreContextMut(self),
4376 caller,
4377 TransmitIndex::Future(ty),
4378 options,
4379 None,
4380 future,
4381 address,
4382 1,
4383 )
4384 .map(|result| result.encode())
4385 }
4386
4387 fn stream_write(
4388 &mut self,
4389 instance: Instance,
4390 caller: RuntimeComponentInstanceIndex,
4391 ty: TypeStreamTableIndex,
4392 options: OptionsIndex,
4393 stream: u32,
4394 address: u32,
4395 count: u32,
4396 ) -> Result<u32> {
4397 instance
4398 .guest_write(
4399 StoreContextMut(self),
4400 caller,
4401 TransmitIndex::Stream(ty),
4402 options,
4403 None,
4404 stream,
4405 address,
4406 count,
4407 )
4408 .map(|result| result.encode())
4409 }
4410
4411 fn stream_read(
4412 &mut self,
4413 instance: Instance,
4414 caller: RuntimeComponentInstanceIndex,
4415 ty: TypeStreamTableIndex,
4416 options: OptionsIndex,
4417 stream: u32,
4418 address: u32,
4419 count: u32,
4420 ) -> Result<u32> {
4421 instance
4422 .guest_read(
4423 StoreContextMut(self),
4424 caller,
4425 TransmitIndex::Stream(ty),
4426 options,
4427 None,
4428 stream,
4429 address,
4430 count,
4431 )
4432 .map(|result| result.encode())
4433 }
4434
4435 fn future_drop_writable(
4436 &mut self,
4437 instance: Instance,
4438 ty: TypeFutureTableIndex,
4439 writer: u32,
4440 ) -> Result<()> {
4441 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4442 }
4443
4444 fn flat_stream_write(
4445 &mut self,
4446 instance: Instance,
4447 caller: RuntimeComponentInstanceIndex,
4448 ty: TypeStreamTableIndex,
4449 options: OptionsIndex,
4450 payload_size: u32,
4451 payload_align: u32,
4452 stream: u32,
4453 address: u32,
4454 count: u32,
4455 ) -> Result<u32> {
4456 instance
4457 .guest_write(
4458 StoreContextMut(self),
4459 caller,
4460 TransmitIndex::Stream(ty),
4461 options,
4462 Some(FlatAbi {
4463 size: payload_size,
4464 align: payload_align,
4465 }),
4466 stream,
4467 address,
4468 count,
4469 )
4470 .map(|result| result.encode())
4471 }
4472
4473 fn flat_stream_read(
4474 &mut self,
4475 instance: Instance,
4476 caller: RuntimeComponentInstanceIndex,
4477 ty: TypeStreamTableIndex,
4478 options: OptionsIndex,
4479 payload_size: u32,
4480 payload_align: u32,
4481 stream: u32,
4482 address: u32,
4483 count: u32,
4484 ) -> Result<u32> {
4485 instance
4486 .guest_read(
4487 StoreContextMut(self),
4488 caller,
4489 TransmitIndex::Stream(ty),
4490 options,
4491 Some(FlatAbi {
4492 size: payload_size,
4493 align: payload_align,
4494 }),
4495 stream,
4496 address,
4497 count,
4498 )
4499 .map(|result| result.encode())
4500 }
4501
4502 fn stream_drop_writable(
4503 &mut self,
4504 instance: Instance,
4505 ty: TypeStreamTableIndex,
4506 writer: u32,
4507 ) -> Result<()> {
4508 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4509 }
4510
4511 fn error_context_debug_message(
4512 &mut self,
4513 instance: Instance,
4514 ty: TypeComponentLocalErrorContextTableIndex,
4515 options: OptionsIndex,
4516 err_ctx_handle: u32,
4517 debug_msg_address: u32,
4518 ) -> Result<()> {
4519 instance.error_context_debug_message(
4520 StoreContextMut(self),
4521 ty,
4522 options,
4523 err_ctx_handle,
4524 debug_msg_address,
4525 )
4526 }
4527
4528 fn thread_new_indirect(
4529 &mut self,
4530 instance: Instance,
4531 caller: RuntimeComponentInstanceIndex,
4532 func_ty_idx: TypeFuncIndex,
4533 start_func_table_idx: RuntimeTableIndex,
4534 start_func_idx: u32,
4535 context: i32,
4536 ) -> Result<u32> {
4537 instance.thread_new_indirect(
4538 StoreContextMut(self),
4539 caller,
4540 func_ty_idx,
4541 start_func_table_idx,
4542 start_func_idx,
4543 context,
4544 )
4545 }
4546}
4547
4548type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4549
4550pub(crate) struct HostTask {
4554 common: WaitableCommon,
4555
4556 caller: QualifiedThreadId,
4558
4559 call_context: CallContext,
4562
4563 state: HostTaskState,
4564}
4565
4566enum HostTaskState {
4567 CalleeStarted,
4572
4573 CalleeRunning(JoinHandle),
4578
4579 CalleeFinished(LiftedResult),
4583
4584 CalleeDone { cancelled: bool },
4587}
4588
4589impl HostTask {
4590 fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4591 Self {
4592 common: WaitableCommon::default(),
4593 call_context: CallContext::default(),
4594 caller,
4595 state,
4596 }
4597 }
4598}
4599
4600impl TableDebug for HostTask {
4601 fn type_name() -> &'static str {
4602 "HostTask"
4603 }
4604}
4605
4606type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4607
4608enum Caller {
4610 Host {
4612 tx: Option<oneshot::Sender<LiftedResult>>,
4614 host_future_present: bool,
4617 caller: CurrentThread,
4621 },
4622 Guest {
4624 thread: QualifiedThreadId,
4626 },
4627}
4628
4629struct LiftResult {
4632 lift: RawLift,
4633 ty: TypeTupleIndex,
4634 memory: Option<SendSyncPtr<VMMemoryDefinition>>,
4635 string_encoding: StringEncoding,
4636}
4637
4638#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4643pub(crate) struct QualifiedThreadId {
4644 task: TableId<GuestTask>,
4645 thread: TableId<GuestThread>,
4646}
4647
4648impl QualifiedThreadId {
4649 fn qualify(
4650 state: &mut ConcurrentState,
4651 thread: TableId<GuestThread>,
4652 ) -> Result<QualifiedThreadId> {
4653 Ok(QualifiedThreadId {
4654 task: state.get_mut(thread)?.parent_task,
4655 thread,
4656 })
4657 }
4658}
4659
4660impl fmt::Debug for QualifiedThreadId {
4661 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4662 f.debug_tuple("QualifiedThreadId")
4663 .field(&self.task.rep())
4664 .field(&self.thread.rep())
4665 .finish()
4666 }
4667}
4668
4669enum GuestThreadState {
4670 NotStartedImplicit,
4671 NotStartedExplicit(
4672 Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4673 ),
4674 Running,
4675 Suspended(StoreFiber<'static>),
4676 Ready {
4677 fiber: StoreFiber<'static>,
4678 cancellable: bool,
4679 },
4680 Completed,
4681}
4682pub struct GuestThread {
4683 context: [u32; NUM_COMPONENT_CONTEXT_SLOTS],
4686 parent_task: TableId<GuestTask>,
4688 wake_on_cancel: Option<TableId<WaitableSet>>,
4691 state: GuestThreadState,
4693 instance_rep: Option<u32>,
4696 sync_call_set: TableId<WaitableSet>,
4698}
4699
4700impl GuestThread {
4701 fn from_instance(
4704 state: Pin<&mut ComponentInstance>,
4705 caller_instance: RuntimeComponentInstanceIndex,
4706 guest_thread: u32,
4707 ) -> Result<TableId<Self>> {
4708 let rep = state.instance_states().0[caller_instance]
4709 .thread_handle_table()
4710 .guest_thread_rep(guest_thread)?;
4711 Ok(TableId::new(rep))
4712 }
4713
4714 fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4715 let sync_call_set = state.push(WaitableSet {
4716 is_sync_call_set: true,
4717 ..WaitableSet::default()
4718 })?;
4719 Ok(Self {
4720 context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4721 parent_task,
4722 wake_on_cancel: None,
4723 state: GuestThreadState::NotStartedImplicit,
4724 instance_rep: None,
4725 sync_call_set,
4726 })
4727 }
4728
4729 fn new_explicit(
4730 state: &mut ConcurrentState,
4731 parent_task: TableId<GuestTask>,
4732 start_func: Box<
4733 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4734 >,
4735 ) -> Result<Self> {
4736 let sync_call_set = state.push(WaitableSet {
4737 is_sync_call_set: true,
4738 ..WaitableSet::default()
4739 })?;
4740 Ok(Self {
4741 context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4742 parent_task,
4743 wake_on_cancel: None,
4744 state: GuestThreadState::NotStartedExplicit(start_func),
4745 instance_rep: None,
4746 sync_call_set,
4747 })
4748 }
4749}
4750
4751impl TableDebug for GuestThread {
4752 fn type_name() -> &'static str {
4753 "GuestThread"
4754 }
4755}
4756
4757enum SyncResult {
4758 NotProduced,
4759 Produced(Option<ValRaw>),
4760 Taken,
4761}
4762
4763impl SyncResult {
4764 fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4765 Ok(match mem::replace(self, SyncResult::Taken) {
4766 SyncResult::NotProduced => None,
4767 SyncResult::Produced(val) => Some(val),
4768 SyncResult::Taken => {
4769 bail_bug!("attempted to take a synchronous result that was already taken")
4770 }
4771 })
4772 }
4773}
4774
4775#[derive(Debug)]
4776enum HostFutureState {
4777 NotApplicable,
4778 Live,
4779 Dropped,
4780}
4781
4782pub(crate) struct GuestTask {
4784 common: WaitableCommon,
4786 lower_params: Option<RawLower>,
4788 lift_result: Option<LiftResult>,
4790 result: Option<LiftedResult>,
4793 callback: Option<CallbackFn>,
4796 caller: Caller,
4798 call_context: CallContext,
4803 sync_result: SyncResult,
4806 cancel_sent: bool,
4809 starting_sent: bool,
4812 instance: RuntimeInstance,
4819 event: Option<Event>,
4822 exited: bool,
4824 threads: HashSet<TableId<GuestThread>>,
4826 host_future_state: HostFutureState,
4829 async_function: bool,
4832
4833 decremented_interesting_task_count: bool,
4834}
4835
4836impl GuestTask {
4837 fn already_lowered_parameters(&self) -> bool {
4838 self.lower_params.is_none()
4840 }
4841
4842 fn returned_or_cancelled(&self) -> bool {
4843 self.lift_result.is_none()
4845 }
4846
4847 fn ready_to_delete(&self) -> bool {
4848 let threads_completed = self.threads.is_empty();
4849 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4850 let pending_completion_event = matches!(
4851 self.common.event,
4852 Some(Event::Subtask {
4853 status: Status::Returned | Status::ReturnCancelled
4854 })
4855 );
4856 let ready = threads_completed
4857 && !has_sync_result
4858 && !pending_completion_event
4859 && !matches!(self.host_future_state, HostFutureState::Live);
4860 log::trace!(
4861 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4862 threads_completed,
4863 has_sync_result,
4864 pending_completion_event,
4865 self.host_future_state
4866 );
4867 ready
4868 }
4869
4870 fn new(
4871 state: &mut ConcurrentState,
4872 lower_params: RawLower,
4873 lift_result: LiftResult,
4874 caller: Caller,
4875 callback: Option<CallbackFn>,
4876 instance: RuntimeInstance,
4877 async_function: bool,
4878 ) -> Result<QualifiedThreadId> {
4879 let host_future_state = match &caller {
4880 Caller::Guest { .. } => HostFutureState::NotApplicable,
4881 Caller::Host {
4882 host_future_present,
4883 ..
4884 } => {
4885 if *host_future_present {
4886 HostFutureState::Live
4887 } else {
4888 HostFutureState::NotApplicable
4889 }
4890 }
4891 };
4892 let task = state.push(Self {
4893 common: WaitableCommon::default(),
4894 lower_params: Some(lower_params),
4895 lift_result: Some(lift_result),
4896 result: None,
4897 callback,
4898 caller,
4899 call_context: CallContext::default(),
4900 sync_result: SyncResult::NotProduced,
4901 cancel_sent: false,
4902 starting_sent: false,
4903 instance,
4904 event: None,
4905 exited: false,
4906 threads: HashSet::new(),
4907 host_future_state,
4908 async_function,
4909 decremented_interesting_task_count: false,
4910 })?;
4911 let new_thread = GuestThread::new_implicit(state, task)?;
4912 let thread = state.push(new_thread)?;
4913 state.get_mut(task)?.threads.insert(thread);
4914 state.interesting_tasks += 1;
4915 Ok(QualifiedThreadId { task, thread })
4916 }
4917}
4918
4919impl TableDebug for GuestTask {
4920 fn type_name() -> &'static str {
4921 "GuestTask"
4922 }
4923}
4924
4925#[derive(Default)]
4927struct WaitableCommon {
4928 event: Option<Event>,
4930 set: Option<TableId<WaitableSet>>,
4932 handle: Option<u32>,
4934}
4935
4936#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4938enum Waitable {
4939 Host(TableId<HostTask>),
4941 Guest(TableId<GuestTask>),
4943 Transmit(TableId<TransmitHandle>),
4945}
4946
4947impl Waitable {
4948 fn from_instance(
4951 state: Pin<&mut ComponentInstance>,
4952 caller_instance: RuntimeComponentInstanceIndex,
4953 waitable: u32,
4954 ) -> Result<Self> {
4955 use crate::runtime::vm::component::Waitable;
4956
4957 let (waitable, kind) = state.instance_states().0[caller_instance]
4958 .handle_table()
4959 .waitable_rep(waitable)?;
4960
4961 Ok(match kind {
4962 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4963 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4964 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4965 })
4966 }
4967
4968 fn rep(&self) -> u32 {
4970 match self {
4971 Self::Host(id) => id.rep(),
4972 Self::Guest(id) => id.rep(),
4973 Self::Transmit(id) => id.rep(),
4974 }
4975 }
4976
4977 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4981 log::trace!("waitable {self:?} join set {set:?}");
4982
4983 let old = mem::replace(&mut self.common(state)?.set, set);
4984
4985 if let Some(old) = old {
4986 match *self {
4987 Waitable::Host(id) => state.remove_child(id, old),
4988 Waitable::Guest(id) => state.remove_child(id, old),
4989 Waitable::Transmit(id) => state.remove_child(id, old),
4990 }?;
4991
4992 state.get_mut(old)?.ready.remove(self);
4993 }
4994
4995 if let Some(set) = set {
4996 match *self {
4997 Waitable::Host(id) => state.add_child(id, set),
4998 Waitable::Guest(id) => state.add_child(id, set),
4999 Waitable::Transmit(id) => state.add_child(id, set),
5000 }?;
5001
5002 if self.common(state)?.event.is_some() {
5003 self.mark_ready(state)?;
5004 }
5005 }
5006
5007 Ok(())
5008 }
5009
5010 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
5012 Ok(match self {
5013 Self::Host(id) => &mut state.get_mut(*id)?.common,
5014 Self::Guest(id) => &mut state.get_mut(*id)?.common,
5015 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
5016 })
5017 }
5018
5019 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
5023 log::trace!("set event for {self:?}: {event:?}");
5024 self.common(state)?.event = event;
5025 self.mark_ready(state)
5026 }
5027
5028 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
5030 let common = self.common(state)?;
5031 let event = common.event.take();
5032 if let Some(set) = self.common(state)?.set {
5033 state.get_mut(set)?.ready.remove(self);
5034 }
5035
5036 Ok(event)
5037 }
5038
5039 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
5043 if let Some(set) = self.common(state)?.set {
5044 state.get_mut(set)?.ready.insert(*self);
5045 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
5046 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
5047 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
5048
5049 let item = match mode {
5050 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
5051 WaitMode::Callback(instance) => WorkItem::GuestCall(
5052 state.get_mut(thread.task)?.instance.index,
5053 GuestCall {
5054 thread,
5055 kind: GuestCallKind::DeliverEvent {
5056 instance,
5057 set: Some(set),
5058 },
5059 },
5060 ),
5061 };
5062 state.push_high_priority(item);
5063 }
5064 }
5065 Ok(())
5066 }
5067
5068 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
5070 match self {
5071 Self::Host(task) => {
5072 log::trace!("delete host task {task:?}");
5073 state.delete(*task)?;
5074 }
5075 Self::Guest(task) => {
5076 log::trace!("delete guest task {task:?}");
5077 let task = state.delete(*task)?;
5078
5079 debug_assert!(task.decremented_interesting_task_count);
5086 }
5087 Self::Transmit(task) => {
5088 state.delete(*task)?;
5089 }
5090 }
5091
5092 Ok(())
5093 }
5094}
5095
5096impl fmt::Debug for Waitable {
5097 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5098 match self {
5099 Self::Host(id) => write!(f, "{id:?}"),
5100 Self::Guest(id) => write!(f, "{id:?}"),
5101 Self::Transmit(id) => write!(f, "{id:?}"),
5102 }
5103 }
5104}
5105
5106#[derive(Default)]
5108struct WaitableSet {
5109 ready: BTreeSet<Waitable>,
5111 waiting: BTreeMap<QualifiedThreadId, WaitMode>,
5113 is_sync_call_set: bool,
5116}
5117
5118impl TableDebug for WaitableSet {
5119 fn type_name() -> &'static str {
5120 "WaitableSet"
5121 }
5122}
5123
5124type RawLower =
5126 Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
5127
5128type RawLift = Box<
5130 dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5131>;
5132
5133type LiftedResult = Box<dyn Any + Send + Sync>;
5137
5138struct DummyResult;
5141
5142#[derive(Default)]
5144pub struct ConcurrentInstanceState {
5145 backpressure: u16,
5147 do_not_enter: bool,
5149 pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
5152}
5153
5154impl ConcurrentInstanceState {
5155 pub fn pending_is_empty(&self) -> bool {
5156 self.pending.is_empty()
5157 }
5158}
5159
5160#[derive(Debug, Copy, Clone)]
5161pub(crate) enum CurrentThread {
5162 Guest(QualifiedThreadId),
5163 Host(TableId<HostTask>),
5164 None,
5165}
5166
5167impl CurrentThread {
5168 fn guest(&self) -> Option<&QualifiedThreadId> {
5169 match self {
5170 Self::Guest(id) => Some(id),
5171 _ => None,
5172 }
5173 }
5174
5175 fn host(&self) -> Option<TableId<HostTask>> {
5176 match self {
5177 Self::Host(id) => Some(*id),
5178 _ => None,
5179 }
5180 }
5181
5182 fn is_none(&self) -> bool {
5183 matches!(self, Self::None)
5184 }
5185}
5186
5187impl From<QualifiedThreadId> for CurrentThread {
5188 fn from(id: QualifiedThreadId) -> Self {
5189 Self::Guest(id)
5190 }
5191}
5192
5193impl From<TableId<HostTask>> for CurrentThread {
5194 fn from(id: TableId<HostTask>) -> Self {
5195 Self::Host(id)
5196 }
5197}
5198
5199pub struct ConcurrentState {
5201 unforced_current_thread: CurrentThread,
5207
5208 futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
5213 table: AlwaysMut<ResourceTable>,
5215 high_priority: Vec<WorkItem>,
5217 low_priority: VecDeque<WorkItem>,
5219 suspend_reason: Option<SuspendReason>,
5223 worker: Option<StoreFiber<'static>>,
5227 worker_item: Option<WorkerItem>,
5229
5230 global_error_context_ref_counts:
5243 BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
5244
5245 interesting_tasks: usize,
5258
5259 interesting_tasks_empty_waker: Option<Waker>,
5263}
5264
5265impl Default for ConcurrentState {
5266 fn default() -> Self {
5267 Self {
5268 unforced_current_thread: CurrentThread::None,
5269 table: AlwaysMut::new(ResourceTable::new()),
5270 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
5271 high_priority: Vec::new(),
5272 low_priority: VecDeque::new(),
5273 suspend_reason: None,
5274 worker: None,
5275 worker_item: None,
5276 global_error_context_ref_counts: BTreeMap::new(),
5277 interesting_tasks: 0,
5278 interesting_tasks_empty_waker: None,
5279 }
5280 }
5281}
5282
5283impl ConcurrentState {
5284 pub(crate) fn take_fibers_and_futures(
5301 &mut self,
5302 fibers: &mut Vec<StoreFiber<'static>>,
5303 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
5304 ) {
5305 for entry in self.table.get_mut().iter_mut() {
5306 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
5307 for mode in mem::take(&mut set.waiting).into_values() {
5308 if let WaitMode::Fiber(fiber) = mode {
5309 fibers.push(fiber);
5310 }
5311 }
5312 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
5313 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready { fiber, .. } =
5314 mem::replace(&mut thread.state, GuestThreadState::Completed)
5315 {
5316 fibers.push(fiber);
5317 }
5318 }
5319 }
5320
5321 if let Some(fiber) = self.worker.take() {
5322 fibers.push(fiber);
5323 }
5324
5325 let mut handle_item = |item| match item {
5326 WorkItem::ResumeFiber(fiber) => {
5327 fibers.push(fiber);
5328 }
5329 WorkItem::PushFuture(future) => {
5330 self.futures
5331 .get_mut()
5332 .as_mut()
5333 .unwrap()
5334 .push(future.into_inner());
5335 }
5336 WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
5337 }
5338 };
5339
5340 for item in mem::take(&mut self.high_priority) {
5341 handle_item(item);
5342 }
5343 for item in mem::take(&mut self.low_priority) {
5344 handle_item(item);
5345 }
5346
5347 if let Some(them) = self.futures.get_mut().take() {
5348 futures.push(them);
5349 }
5350 }
5351
5352 #[cfg(feature = "gc")]
5353 pub(crate) fn trace_fiber_roots(
5354 &mut self,
5355 modules: &ModuleRegistry,
5356 unwind: &dyn Unwind,
5357 gc_roots_list: &mut GcRootsList,
5358 ) {
5359 let ConcurrentState {
5360 table,
5361 worker,
5362 high_priority,
5363 low_priority,
5364
5365 futures: _,
5369
5370 worker_item: _,
5372 unforced_current_thread: _,
5373 suspend_reason: _,
5374 global_error_context_ref_counts: _,
5375 interesting_tasks: _,
5376 interesting_tasks_empty_waker: _,
5377 } = self;
5378
5379 for entry in table.get_mut().iter_mut() {
5380 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
5381 for mode in set.waiting.values_mut() {
5382 if let WaitMode::Fiber(fiber) = mode {
5383 fiber.trace_gc_roots(modules, unwind, gc_roots_list);
5384 }
5385 }
5386 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
5387 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready { fiber, .. } =
5388 &mut thread.state
5389 {
5390 fiber.trace_gc_roots(modules, unwind, gc_roots_list);
5391 }
5392 }
5393 }
5394
5395 if let Some(fiber) = worker {
5396 fiber.trace_gc_roots(modules, unwind, gc_roots_list);
5397 }
5398
5399 let mut handle_item = |item: &mut WorkItem| match item {
5400 WorkItem::ResumeFiber(fiber) => {
5401 fiber.trace_gc_roots(modules, unwind, gc_roots_list);
5402 }
5403 WorkItem::PushFuture(_future) => {
5404 }
5407 WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
5408 }
5409 };
5410
5411 for item in high_priority {
5412 handle_item(item);
5413 }
5414 for item in low_priority {
5415 handle_item(item);
5416 }
5417 }
5418
5419 fn push<V: Send + Sync + 'static>(
5420 &mut self,
5421 value: V,
5422 ) -> Result<TableId<V>, ResourceTableError> {
5423 self.table.get_mut().push(value).map(TableId::from)
5424 }
5425
5426 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
5427 self.table.get_mut().get_mut(&Resource::from(id))
5428 }
5429
5430 pub fn add_child<T: 'static, U: 'static>(
5431 &mut self,
5432 child: TableId<T>,
5433 parent: TableId<U>,
5434 ) -> Result<(), ResourceTableError> {
5435 self.table
5436 .get_mut()
5437 .add_child(Resource::from(child), Resource::from(parent))
5438 }
5439
5440 pub fn remove_child<T: 'static, U: 'static>(
5441 &mut self,
5442 child: TableId<T>,
5443 parent: TableId<U>,
5444 ) -> Result<(), ResourceTableError> {
5445 self.table
5446 .get_mut()
5447 .remove_child(Resource::from(child), Resource::from(parent))
5448 }
5449
5450 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
5451 self.table.get_mut().delete(Resource::from(id))
5452 }
5453
5454 fn push_future(&mut self, future: HostTaskFuture) {
5455 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
5462 }
5463
5464 fn push_high_priority(&mut self, item: WorkItem) {
5465 log::trace!("push high priority: {item:?}");
5466 self.high_priority.push(item);
5467 }
5468
5469 fn push_low_priority(&mut self, item: WorkItem) {
5470 log::trace!("push low priority: {item:?}");
5471 self.low_priority.push_front(item);
5472 }
5473
5474 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5475 if high_priority {
5476 self.push_high_priority(item);
5477 } else {
5478 self.push_low_priority(item);
5479 }
5480 }
5481
5482 fn promote_instance_local_thread_work_item(
5483 &mut self,
5484 current_instance: RuntimeComponentInstanceIndex,
5485 ) -> bool {
5486 self.promote_work_items_matching(|item: &WorkItem| match item {
5487 WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5488 *instance == current_instance
5489 }
5490 _ => false,
5491 })
5492 }
5493
5494 fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5495 self.promote_work_items_matching(|item: &WorkItem| match item {
5496 WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5497 *t == thread
5498 }
5499 _ => false,
5500 })
5501 }
5502
5503 fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5504 where
5505 F: FnMut(&WorkItem) -> bool,
5506 {
5507 if self.high_priority.iter().any(&mut predicate) {
5511 true
5512 }
5513 else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5516 let item = self.low_priority.remove(idx).unwrap();
5517 self.push_high_priority(item);
5518 true
5519 } else {
5520 false
5521 }
5522 }
5523
5524 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5525 if self.may_block(task)? {
5526 Ok(())
5527 } else {
5528 Err(Trap::CannotBlockSyncTask.into())
5529 }
5530 }
5531
5532 fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
5533 let task = self.get_mut(task)?;
5534 Ok(task.async_function || task.returned_or_cancelled())
5535 }
5536
5537 pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
5543 let (task, is_host) = (task >> 1, task & 1 == 1);
5544 if is_host {
5545 let task: TableId<HostTask> = TableId::new(task);
5546 Ok(&mut self.get_mut(task)?.call_context)
5547 } else {
5548 let task: TableId<GuestTask> = TableId::new(task);
5549 Ok(&mut self.get_mut(task)?.call_context)
5550 }
5551 }
5552
5553 fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
5554 match self.futures.get_mut().as_mut() {
5555 Some(f) => Ok(f),
5556 None => bail_bug!("futures field of concurrent state is currently taken"),
5557 }
5558 }
5559
5560 pub(crate) fn table(&mut self) -> &mut ResourceTable {
5561 self.table.get_mut()
5562 }
5563
5564 fn parent(&mut self, cur: CurrentThread) -> Option<CurrentThread> {
5566 match cur {
5567 CurrentThread::Guest(thread) => {
5568 let task = self.get_mut(thread.task).ok()?;
5569 Some(match task.caller {
5570 Caller::Host { caller, .. } => caller,
5571 Caller::Guest { thread } => thread.into(),
5572 })
5573 }
5574 CurrentThread::Host(id) => Some(self.get_mut(id).ok()?.caller.into()),
5575 CurrentThread::None => None,
5576 }
5577 }
5578}
5579
5580fn for_any_lower<
5583 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5584>(
5585 fun: F,
5586) -> F {
5587 fun
5588}
5589
5590fn for_any_lift<
5592 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5593>(
5594 fun: F,
5595) -> F {
5596 fun
5597}
5598
5599fn check_ambient_store(id: StoreId) {
5600 let message = "\
5601 `Future`s which depend on asynchronous component tasks, streams, or \
5602 futures to complete may only be polled from the event loop of the \
5603 store to which they belong. Please use \
5604 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5605 ";
5606 tls::try_get(|store| {
5607 let matched = match store {
5608 tls::TryGet::Some(store) => store.id() == id,
5609 tls::TryGet::Taken | tls::TryGet::None => false,
5610 };
5611
5612 if !matched {
5613 panic!("{message}")
5614 }
5615 });
5616}
5617
5618fn check_recursive_run() {
5621 tls::try_get(|store| {
5622 if !matches!(store, tls::TryGet::None) {
5623 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5624 }
5625 });
5626}
5627
5628fn unpack_callback_code(code: u32) -> (u32, u32) {
5629 (code & 0xF, code >> 4)
5630}
5631
5632struct WaitableCheckParams {
5636 set: TableId<WaitableSet>,
5637 options: OptionsIndex,
5638 payload: u32,
5639}
5640
5641enum WaitableCheck {
5644 Wait,
5645 Poll,
5646}
5647
5648#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
5657pub struct GuestTaskId(TableId<GuestTask>);
5658
5659pub(crate) struct PreparedCall<R> {
5661 handle: Func,
5663 thread: QualifiedThreadId,
5665 param_count: usize,
5667 rx: oneshot::Receiver<LiftedResult>,
5670 runtime_instance: RuntimeInstance,
5672 _phantom: PhantomData<R>,
5673}
5674
5675impl<R> PreparedCall<R> {
5676 pub(crate) fn task_id(&self) -> TaskId {
5678 TaskId {
5679 task: self.thread.task,
5680 runtime_instance: self.runtime_instance,
5681 }
5682 }
5683}
5684
5685pub(crate) struct TaskId {
5687 task: TableId<GuestTask>,
5688 runtime_instance: RuntimeInstance,
5689}
5690
5691impl TaskId {
5692 pub(crate) fn host_future_dropped(&self, store: &mut StoreOpaque) -> Result<()> {
5698 let task = store.concurrent_state_mut()?.get_mut(self.task)?;
5699 let delete = if !task.already_lowered_parameters() {
5700 store.cancel_guest_subtask_without_lowered_parameters(
5701 self.runtime_instance,
5702 self.task,
5703 )?;
5704 true
5705 } else {
5706 task.host_future_state = HostFutureState::Dropped;
5707 task.ready_to_delete()
5708 };
5709 if delete {
5710 Waitable::Guest(self.task).delete_from(store.concurrent_state_mut()?)?
5711 }
5712 Ok(())
5713 }
5714}
5715
5716pub(crate) fn prepare_call<T, R>(
5722 mut store: StoreContextMut<T>,
5723 handle: Func,
5724 param_count: usize,
5725 host_future_present: bool,
5726 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5727 + Send
5728 + Sync
5729 + 'static,
5730 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5731 + Send
5732 + Sync
5733 + 'static,
5734) -> Result<PreparedCall<R>> {
5735 let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5736
5737 let instance = handle.instance().id().get(store.0);
5738 let options = &instance.component().env_component().options[options];
5739 let ty = &instance.component().types()[ty];
5740 let async_function = ty.async_;
5741 let task_return_type = ty.results;
5742 let component_instance = raw_options.instance;
5743 let callback = options.callback.map(|i| instance.runtime_callback(i));
5744 let memory = options
5745 .memory()
5746 .map(|i| instance.runtime_memory(i))
5747 .map(SendSyncPtr::new);
5748 let string_encoding = options.string_encoding;
5749 let token = StoreToken::new(store.as_context_mut());
5750 let caller = store.0.current_thread()?;
5751 let state = store.0.concurrent_state_mut()?;
5752
5753 let (tx, rx) = oneshot::channel();
5754
5755 let instance = handle.instance().runtime_instance(component_instance);
5756 let thread = GuestTask::new(
5757 state,
5758 Box::new(for_any_lower(move |store, params| {
5759 lower_params(handle, token.as_context_mut(store), params)
5760 })),
5761 LiftResult {
5762 lift: Box::new(for_any_lift(move |store, result| {
5763 lift_result(handle, store, result)
5764 })),
5765 ty: task_return_type,
5766 memory,
5767 string_encoding,
5768 },
5769 Caller::Host {
5770 tx: Some(tx),
5771 host_future_present,
5772 caller,
5773 },
5774 callback.map(|callback| {
5775 let callback = SendSyncPtr::new(callback);
5776 let instance = handle.instance();
5777 Box::new(move |store: &mut dyn VMStore, event, handle| {
5778 let store = token.as_context_mut(store);
5779 unsafe { instance.call_callback(store, callback, event, handle) }
5782 }) as CallbackFn
5783 }),
5784 instance,
5785 async_function,
5786 )?;
5787
5788 if !store.0.may_enter(instance)? {
5789 bail!(Trap::CannotEnterComponent);
5790 }
5791
5792 Ok(PreparedCall {
5793 handle,
5794 thread,
5795 param_count,
5796 runtime_instance: instance,
5797 rx,
5798 _phantom: PhantomData,
5799 })
5800}
5801
5802pub(crate) struct QueuedCall<R> {
5803 store: StoreId,
5804 task: TableId<GuestTask>,
5805 rx: oneshot::Receiver<LiftedResult>,
5806 _marker: PhantomData<fn() -> R>,
5807}
5808
5809impl<R> QueuedCall<R> {
5810 pub(crate) fn new<T: 'static>(
5817 mut store: StoreContextMut<T>,
5818 prepared: PreparedCall<R>,
5819 ) -> Result<QueuedCall<R>> {
5820 let PreparedCall {
5821 handle,
5822 thread,
5823 param_count,
5824 rx,
5825 ..
5826 } = prepared;
5827
5828 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5829
5830 Ok(QueuedCall {
5831 store: store.0.id(),
5832 task: thread.task,
5833 rx,
5834 _marker: PhantomData,
5835 })
5836 }
5837
5838 fn task(&self) -> GuestTaskId {
5839 GuestTaskId(self.task)
5840 }
5841}
5842
5843impl<R> Future for QueuedCall<R>
5844where
5845 R: 'static,
5846{
5847 type Output = Result<R>;
5848
5849 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
5850 check_ambient_store(self.store);
5851 Pin::new(&mut self.rx).poll(cx).map(|result| match result {
5852 Ok(r) => match r.downcast() {
5853 Ok(r) => Ok(*r),
5854 Err(_) => bail_bug!("wrong type of value produced"),
5855 },
5856 Err(oneshot::Canceled) => bail_bug!("channel erroneously dropped"),
5857 })
5858 }
5859}
5860
5861fn queue_call0<T: 'static>(
5864 store: StoreContextMut<T>,
5865 handle: Func,
5866 guest_thread: QualifiedThreadId,
5867 param_count: usize,
5868) -> Result<()> {
5869 let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5870 let is_concurrent = raw_options.async_;
5871 let callback = raw_options.callback;
5872 let instance = handle.instance();
5873 let callee = handle.lifted_core_func(store.0);
5874 let post_return = handle.post_return_core_func(store.0);
5875 let callback = callback.map(|i| {
5876 let instance = instance.id().get(store.0);
5877 SendSyncPtr::new(instance.runtime_callback(i))
5878 });
5879
5880 log::trace!("queueing call {guest_thread:?}");
5881
5882 unsafe {
5886 instance.queue_call(
5887 store,
5888 guest_thread,
5889 SendSyncPtr::new(callee),
5890 param_count,
5891 1,
5892 is_concurrent,
5893 callback,
5894 post_return.map(SendSyncPtr::new),
5895 )
5896 }
5897}