1use self::error_contexts::GlobalErrorContextRefCount;
54use crate::bail_bug;
55use crate::component::func::{Func, call_post_return};
56use crate::component::{
57 HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
58};
59use crate::fiber::{self, StoreFiber, StoreFiberYield};
60use crate::hash_set::HashSet;
61#[cfg(feature = "gc")]
62use crate::module::ModuleRegistry;
63use crate::prelude::*;
64use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
65#[cfg(feature = "gc")]
66use crate::vm::GcRootsList;
67use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
68use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMLazyThread, VMMemoryDefinition, VMStore};
69use crate::{
70 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
71};
72use alloc::borrow::ToOwned;
73use alloc::collections::{BTreeMap, BTreeSet, VecDeque};
74use core::any::Any;
75use core::cell::UnsafeCell;
76use core::fmt;
77use core::future;
78use core::future::Future;
79use core::marker::PhantomData;
80use core::mem::{self, ManuallyDrop, MaybeUninit};
81use core::ops::DerefMut;
82use core::pin::{Pin, pin};
83use core::ptr::{self, NonNull};
84use core::task::{Context, Poll, Waker};
85use futures::channel::oneshot;
86use futures::stream::{FuturesUnordered, StreamExt};
87use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
88use table::{TableDebug, TableId};
89use wasmtime_environ::component::{
90 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
91 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
92 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
93 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
94 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
95};
96use wasmtime_environ::packed_option::ReservedValue;
97use wasmtime_environ::{NUM_COMPONENT_CONTEXT_SLOTS, Trap};
98#[cfg(feature = "gc")]
99use wasmtime_unwinder::Unwind;
100
101pub use abort::JoinHandle;
102pub use func::{FuncCallConcurrent, TypedFuncCallConcurrent};
103pub use future_stream_any::{FutureAny, StreamAny};
104pub use futures_and_streams::{
105 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
106 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
107 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
108};
109pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
110
111mod abort;
112mod error_contexts;
113mod func;
114mod future_stream_any;
115mod futures_and_streams;
116pub(crate) mod table;
117pub(crate) mod tls;
118
119const BLOCKED: u32 = 0xffff_ffff;
122
123#[derive(Clone, Copy, Eq, PartialEq, Debug)]
125pub enum Status {
126 Starting = 0,
127 Started = 1,
128 Returned = 2,
129 StartCancelled = 3,
130 ReturnCancelled = 4,
131}
132
133impl Status {
134 pub fn pack(self, waitable: Option<u32>) -> u32 {
140 assert!(matches!(self, Status::Returned) == waitable.is_none());
141 let waitable = waitable.unwrap_or(0);
142 assert!(waitable < (1 << 28));
143 (waitable << 4) | (self as u32)
144 }
145}
146
147#[derive(Clone, Copy, Debug)]
150enum Event {
151 None,
152 Subtask {
153 status: Status,
154 },
155 StreamRead {
156 code: ReturnCode,
157 pending: Option<(TypeStreamTableIndex, u32)>,
158 },
159 StreamWrite {
160 code: ReturnCode,
161 pending: Option<(TypeStreamTableIndex, u32)>,
162 },
163 FutureRead {
164 code: ReturnCode,
165 pending: Option<(TypeFutureTableIndex, u32)>,
166 },
167 FutureWrite {
168 code: ReturnCode,
169 pending: Option<(TypeFutureTableIndex, u32)>,
170 },
171 Cancelled,
172}
173
174impl Event {
175 fn parts(self) -> (u32, u32) {
180 const EVENT_NONE: u32 = 0;
181 const EVENT_SUBTASK: u32 = 1;
182 const EVENT_STREAM_READ: u32 = 2;
183 const EVENT_STREAM_WRITE: u32 = 3;
184 const EVENT_FUTURE_READ: u32 = 4;
185 const EVENT_FUTURE_WRITE: u32 = 5;
186 const EVENT_CANCELLED: u32 = 6;
187 match self {
188 Event::None => (EVENT_NONE, 0),
189 Event::Cancelled => (EVENT_CANCELLED, 0),
190 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
191 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
192 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
193 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
194 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
195 }
196 }
197}
198
199mod callback_code {
201 pub const EXIT: u32 = 0;
202 pub const YIELD: u32 = 1;
203 pub const WAIT: u32 = 2;
204}
205
206const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
210
211pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
217 store: StoreContextMut<'a, T>,
218 get_data: fn(&mut T) -> D::Data<'_>,
219}
220
221impl<'a, T, D> Access<'a, T, D>
222where
223 D: HasData + ?Sized,
224 T: 'static,
225{
226 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
228 Self { store, get_data }
229 }
230
231 pub fn data_mut(&mut self) -> &mut T {
233 self.store.data_mut()
234 }
235
236 pub fn get(&mut self) -> D::Data<'_> {
238 (self.get_data)(self.data_mut())
239 }
240
241 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> Result<JoinHandle>
245 where
246 T: 'static,
247 {
248 let accessor = Accessor {
249 get_data: self.get_data,
250 token: StoreToken::new(self.store.as_context_mut()),
251 };
252 self.store
253 .as_context_mut()
254 .spawn_with_accessor(accessor, task)
255 }
256
257 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
260 self.get_data
261 }
262}
263
264impl<'a, T, D> AsContext for Access<'a, T, D>
265where
266 D: HasData + ?Sized,
267 T: 'static,
268{
269 type Data = T;
270
271 fn as_context(&self) -> StoreContext<'_, T> {
272 self.store.as_context()
273 }
274}
275
276impl<'a, T, D> AsContextMut for Access<'a, T, D>
277where
278 D: HasData + ?Sized,
279 T: 'static,
280{
281 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
282 self.store.as_context_mut()
283 }
284}
285
286pub struct Accessor<T: 'static, D = HasSelf<T>>
346where
347 D: HasData + ?Sized,
348{
349 token: StoreToken<T>,
350 get_data: fn(&mut T) -> D::Data<'_>,
351}
352
353pub trait AsAccessor {
370 type Data: 'static;
372
373 type AccessorData: HasData + ?Sized;
376
377 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
379}
380
381impl<T: AsAccessor + ?Sized> AsAccessor for &T {
382 type Data = T::Data;
383 type AccessorData = T::AccessorData;
384
385 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
386 T::as_accessor(self)
387 }
388}
389
390impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
391 type Data = T;
392 type AccessorData = D;
393
394 fn as_accessor(&self) -> &Accessor<T, D> {
395 self
396 }
397}
398
399const _: () = {
422 const fn assert<T: Send + Sync>() {}
423 assert::<Accessor<UnsafeCell<u32>>>();
424};
425
426impl<T> Accessor<T> {
427 pub(crate) fn new(token: StoreToken<T>) -> Self {
436 Self {
437 token,
438 get_data: |x| x,
439 }
440 }
441}
442
443impl<T, D> Accessor<T, D>
444where
445 D: HasData + ?Sized,
446{
447 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
465 tls::get(|vmstore| {
466 fun(Access {
467 store: self.token.as_context_mut(vmstore),
468 get_data: self.get_data,
469 })
470 })
471 }
472
473 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
476 self.get_data
477 }
478
479 pub fn with_getter<D2: HasData>(
496 &self,
497 get_data: fn(&mut T) -> D2::Data<'_>,
498 ) -> Accessor<T, D2> {
499 Accessor {
500 token: self.token,
501 get_data,
502 }
503 }
504
505 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> Result<JoinHandle>
521 where
522 T: 'static,
523 {
524 let accessor = self.clone_for_spawn();
525 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
526 }
527
528 fn clone_for_spawn(&self) -> Self {
529 Self {
530 token: self.token,
531 get_data: self.get_data,
532 }
533 }
534
535 pub fn poll_no_interesting_tasks(&self, cx: &mut Context<'_>) -> Poll<()> {
571 self.with(|mut access| {
572 let store = access.as_context_mut().0;
573 let state = store.concurrent_state_mut_without_forcing_current_thread();
574 if state.interesting_tasks == 0 {
575 Poll::Ready(())
576 } else {
577 state.interesting_tasks_empty_waker = Some(cx.waker().clone());
578 Poll::Pending
579 }
580 })
581 }
582
583 pub fn poll_ready_for_concurrent_call(&self, func: Func, cx: &mut Context<'_>) -> Poll<()> {
600 self.with(|mut access| {
601 let store = access.as_context_mut().0;
602 let (_, _, _, raw_options) = func.abi_info(store);
603 let instance = func.instance().runtime_instance(raw_options.instance);
604 let state = store.instance_state(instance).concurrent_state();
605 if state.backpressure == 0 {
606 Poll::Ready(())
607 } else {
608 store
609 .concurrent_state_mut_without_forcing_current_thread()
610 .ready_for_concurrent_call_waker = Some(cx.waker().clone());
611 Poll::Pending
612 }
613 })
614 }
615}
616
617pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
629where
630 D: HasData + ?Sized,
631{
632 fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
634}
635
636enum CallerInfo {
639 Async {
641 params: Vec<ValRaw>,
642 has_result: bool,
643 },
644 Sync {
646 params: Vec<ValRaw>,
647 result_count: u32,
648 },
649}
650
651enum WaitMode {
653 Fiber(StoreFiber<'static>),
655 Callback(Instance),
658}
659
660#[derive(Debug)]
662enum SuspendReason {
663 Waiting {
666 set: TableId<WaitableSet>,
667 thread: QualifiedThreadId,
668 skip_may_block_check: bool,
669 },
670 NeedWork,
673 Yielding {
676 thread: QualifiedThreadId,
677 cancellable: bool,
678 skip_may_block_check: bool,
679 },
680 ExplicitlySuspending {
682 thread: QualifiedThreadId,
683 skip_may_block_check: bool,
684 },
685}
686
687enum GuestCallKind {
689 DeliverEvent {
692 instance: Instance,
694 set: Option<TableId<WaitableSet>>,
699 },
700 StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
706 StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
707}
708
709impl fmt::Debug for GuestCallKind {
710 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
711 match self {
712 Self::DeliverEvent { instance, set } => f
713 .debug_struct("DeliverEvent")
714 .field("instance", instance)
715 .field("set", set)
716 .finish(),
717 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
718 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
719 }
720 }
721}
722
723#[derive(Copy, Clone, Debug)]
725pub enum SuspensionTarget {
726 SomeSuspended(u32),
727 Some(u32),
728 None,
729}
730
731impl SuspensionTarget {
732 fn is_none(&self) -> bool {
733 matches!(self, SuspensionTarget::None)
734 }
735 fn is_some(&self) -> bool {
736 !self.is_none()
737 }
738}
739
740#[derive(Debug)]
742struct GuestCall {
743 thread: QualifiedThreadId,
744 kind: GuestCallKind,
745}
746
747impl GuestCall {
748 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
758 let instance = store
759 .concurrent_state_mut()?
760 .get_mut(self.thread.task)?
761 .instance;
762 let state = store.instance_state(instance).concurrent_state();
763
764 let ready = match &self.kind {
765 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
766 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
767 GuestCallKind::StartExplicit(_) => true,
768 };
769 log::trace!(
770 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
771 state.do_not_enter,
772 state.backpressure
773 );
774 Ok(ready)
775 }
776}
777
778enum WorkerItem {
780 GuestCall(GuestCall),
781 Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
782}
783
784enum WorkItem {
787 PushFuture(AlwaysMut<HostTaskFuture>),
789 ResumeFiber(StoreFiber<'static>),
791 ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
793 GuestCall(RuntimeComponentInstanceIndex, GuestCall),
795 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
797}
798
799impl fmt::Debug for WorkItem {
800 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
801 match self {
802 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
803 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
804 Self::ResumeThread(instance, thread) => f
805 .debug_tuple("ResumeThread")
806 .field(instance)
807 .field(thread)
808 .finish(),
809 Self::GuestCall(instance, call) => f
810 .debug_tuple("GuestCall")
811 .field(instance)
812 .field(call)
813 .finish(),
814 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
815 }
816 }
817}
818
819#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
821pub(crate) enum WaitResult {
822 Cancelled,
823 Completed,
824}
825
826pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
834 store: &mut dyn VMStore,
835 future: impl Future<Output = Result<R>> + Send + 'static,
836) -> Result<R> {
837 let task = store.current_host_thread()?;
838
839 let mut future = Box::pin(async move {
843 let result = future.await?;
844 tls::get(move |store| {
845 let state = store.concurrent_state_mut()?;
846 let host_state = &mut state.get_mut(task)?.state;
847 assert!(matches!(host_state, HostTaskState::CalleeStarted));
848 *host_state = HostTaskState::CalleeFinished(Box::new(result));
849
850 Waitable::Host(task).set_event(
851 state,
852 Some(Event::Subtask {
853 status: Status::Returned,
854 }),
855 )?;
856
857 Ok(())
858 })
859 }) as HostTaskFuture;
860
861 let poll = tls::set(store, || {
865 future
866 .as_mut()
867 .poll(&mut Context::from_waker(&Waker::noop()))
868 });
869
870 match poll {
871 Poll::Ready(result) => result?,
873
874 Poll::Pending => {
879 let state = store.concurrent_state_mut()?;
880 state.push_future(future);
881
882 let caller = state.get_mut(task)?.caller;
883 let set = state.get_mut(caller.thread)?.sync_call_set;
884 Waitable::Host(task).join(state, Some(set))?;
885
886 store.suspend(SuspendReason::Waiting {
887 set,
888 thread: caller,
889 skip_may_block_check: false,
890 })?;
891
892 Waitable::Host(task).join(store.concurrent_state_mut()?, None)?;
896 }
897 }
898
899 let host_state = &mut store.concurrent_state_mut()?.get_mut(task)?.state;
901 match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
902 HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
903 Ok(result) => *result,
904 Err(_) => bail_bug!("host task finished with wrong type of result"),
905 }),
906 _ => bail_bug!("unexpected host task state after completion"),
907 }
908}
909
910fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
912 let mut next = Some(call);
913 while let Some(call) = next.take() {
914 match call.kind {
915 GuestCallKind::DeliverEvent { instance, set } => {
916 let (event, waitable) =
917 match instance.get_event(store, call.thread.task, set, true)? {
918 Some(pair) => pair,
919 None => bail_bug!("delivering non-present event"),
920 };
921 let state = store.concurrent_state_mut()?;
922 let task = state.get_mut(call.thread.task)?;
923 let runtime_instance = task.instance;
924 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
925
926 log::trace!(
927 "use callback to deliver event {event:?} to {:?} for {waitable:?}",
928 call.thread,
929 );
930
931 let old_thread = store.set_thread(call.thread)?;
932 log::trace!(
933 "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
934 call.thread
935 );
936
937 store.enter_instance(runtime_instance);
938
939 let Some(callback) = store
940 .concurrent_state_mut()?
941 .get_mut(call.thread.task)?
942 .callback
943 .take()
944 else {
945 bail_bug!("guest task callback field not present")
946 };
947
948 let code = callback(store, event, handle)?;
949
950 store
951 .concurrent_state_mut()?
952 .get_mut(call.thread.task)?
953 .callback = Some(callback);
954
955 store.exit_instance(runtime_instance)?;
956
957 store.set_thread(old_thread)?;
958
959 next = instance.handle_callback_code(
960 store,
961 call.thread,
962 runtime_instance.index,
963 code,
964 )?;
965
966 log::trace!(
967 "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
968 );
969 }
970 GuestCallKind::StartImplicit(fun) => {
971 next = fun(store)?;
972 }
973 GuestCallKind::StartExplicit(fun) => {
974 fun(store)?;
975 }
976 }
977 }
978
979 Ok(())
980}
981
982impl<T> Store<T> {
983 pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
985 where
986 T: Send + 'static,
987 {
988 ensure!(
989 self.as_context().0.concurrency_support(),
990 "cannot use `run_concurrent` when Config::concurrency_support disabled",
991 );
992 self.as_context_mut().run_concurrent(fun).await
993 }
994
995 #[doc(hidden)]
996 pub fn assert_concurrent_state_empty(&mut self) {
997 self.as_context_mut().assert_concurrent_state_empty();
998 }
999
1000 #[doc(hidden)]
1001 pub fn concurrent_state_table_size(&mut self) -> usize {
1002 self.as_context_mut().concurrent_state_table_size()
1003 }
1004
1005 pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> Result<JoinHandle>
1007 where
1008 T: 'static,
1009 {
1010 self.as_context_mut().spawn(task)
1011 }
1012}
1013
1014impl<T> StoreContextMut<'_, T> {
1015 #[doc(hidden)]
1026 pub fn assert_concurrent_state_empty(self) {
1027 let store = self.0;
1028 store
1029 .store_data_mut()
1030 .components
1031 .assert_instance_states_empty();
1032 let state = store.concurrent_state_mut().unwrap();
1033 assert!(
1034 state.table.get_mut().is_empty(),
1035 "non-empty table: {:?}",
1036 state.table.get_mut()
1037 );
1038 assert!(state.high_priority.is_empty());
1039 assert!(state.low_priority.is_empty());
1040 assert!(state.unforced_current_thread.is_none());
1041 assert!(state.futures_mut().unwrap().is_empty());
1042 assert!(state.global_error_context_ref_counts.is_empty());
1043 }
1044
1045 #[doc(hidden)]
1050 pub fn concurrent_state_table_size(&mut self) -> usize {
1051 self.0
1052 .concurrent_state_mut()
1053 .unwrap()
1054 .table
1055 .get_mut()
1056 .iter_mut()
1057 .count()
1058 }
1059
1060 pub fn spawn(mut self, task: impl AccessorTask<T>) -> Result<JoinHandle>
1070 where
1071 T: 'static,
1072 {
1073 let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
1074 self.spawn_with_accessor(accessor, task)
1075 }
1076
1077 fn spawn_with_accessor<D>(
1080 self,
1081 accessor: Accessor<T, D>,
1082 task: impl AccessorTask<T, D>,
1083 ) -> Result<JoinHandle>
1084 where
1085 T: 'static,
1086 D: HasData + ?Sized,
1087 {
1088 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
1092 self.0
1093 .concurrent_state_mut()?
1094 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1095 Ok(handle)
1096 }
1097
1098 pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1182 where
1183 T: Send + 'static,
1184 {
1185 ensure!(
1186 self.0.concurrency_support(),
1187 "cannot use `run_concurrent` when Config::concurrency_support disabled",
1188 );
1189 self.do_run_concurrent(fun, false).await
1190 }
1191
1192 pub(super) async fn run_concurrent_trap_on_idle<R>(
1193 self,
1194 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1195 ) -> Result<R>
1196 where
1197 T: Send + 'static,
1198 {
1199 self.do_run_concurrent(fun, true).await
1200 }
1201
1202 async fn do_run_concurrent<R>(
1203 mut self,
1204 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1205 trap_on_idle: bool,
1206 ) -> Result<R>
1207 where
1208 T: Send + 'static,
1209 {
1210 debug_assert!(self.0.concurrency_support());
1211 check_recursive_run();
1212 let token = StoreToken::new(self.as_context_mut());
1213
1214 struct Dropper<'a, T: 'static, V> {
1215 store: StoreContextMut<'a, T>,
1216 value: ManuallyDrop<V>,
1217 }
1218
1219 impl<'a, T, V> Drop for Dropper<'a, T, V> {
1220 fn drop(&mut self) {
1221 tls::set(self.store.0, || {
1222 unsafe { ManuallyDrop::drop(&mut self.value) }
1227 });
1228 }
1229 }
1230
1231 let accessor = &Accessor::new(token);
1232 let dropper = &mut Dropper {
1233 store: self,
1234 value: ManuallyDrop::new(fun(accessor)),
1235 };
1236 let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1238
1239 dropper
1240 .store
1241 .as_context_mut()
1242 .poll_until(future, trap_on_idle)
1243 .await
1244 }
1245
1246 async fn poll_until<R>(
1252 mut self,
1253 mut future: Pin<&mut impl Future<Output = R>>,
1254 trap_on_idle: bool,
1255 ) -> Result<R>
1256 where
1257 T: Send + 'static,
1258 {
1259 struct Reset<'a, T: 'static> {
1260 store: StoreContextMut<'a, T>,
1261 futures: Option<FuturesUnordered<HostTaskFuture>>,
1262 }
1263
1264 impl<'a, T> Drop for Reset<'a, T> {
1265 fn drop(&mut self) {
1266 if let Some(futures) = self.futures.take() {
1267 *self
1268 .store
1269 .0
1270 .concurrent_state_mut_already_forced_current_thread()
1271 .futures
1272 .get_mut() = Some(futures);
1273 }
1274 }
1275 }
1276
1277 loop {
1278 let futures = self.0.concurrent_state_mut()?.futures.get_mut().take();
1282 let mut reset = Reset {
1283 store: self.as_context_mut(),
1284 futures,
1285 };
1286 let mut next = match reset.futures.as_mut() {
1287 Some(f) => pin!(f.next()),
1288 None => bail_bug!("concurrent state missing futures field"),
1289 };
1290
1291 enum PollResult<R> {
1292 Complete(R),
1293 ProcessWork {
1294 ready: Vec<WorkItem>,
1295 low_priority: bool,
1296 },
1297 }
1298
1299 let result = future::poll_fn(|cx| {
1300 if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1303 return Poll::Ready(Ok(PollResult::Complete(value)));
1304 }
1305
1306 let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1310 Poll::Ready(Some(output)) => {
1311 match output {
1312 Err(e) => return Poll::Ready(Err(e)),
1313 Ok(()) => {}
1314 }
1315 Poll::Ready(true)
1316 }
1317 Poll::Ready(None) => Poll::Ready(false),
1318 Poll::Pending => Poll::Pending,
1319 };
1320
1321 let state = reset.store.0.concurrent_state_mut()?;
1325 let mut ready = mem::take(&mut state.high_priority);
1326 let mut low_priority = false;
1327 if ready.is_empty() {
1328 if let Some(item) = state.low_priority.pop_back() {
1329 ready.push(item);
1330 low_priority = true;
1331 }
1332 }
1333 if !ready.is_empty() {
1334 return Poll::Ready(Ok(PollResult::ProcessWork {
1335 ready,
1336 low_priority,
1337 }));
1338 }
1339
1340 return match next {
1344 Poll::Ready(true) => {
1345 Poll::Ready(Ok(PollResult::ProcessWork {
1351 ready: Vec::new(),
1352 low_priority: false,
1353 }))
1354 }
1355 Poll::Ready(false) => {
1356 if let Poll::Ready(value) =
1360 tls::set(reset.store.0, || future.as_mut().poll(cx))
1361 {
1362 Poll::Ready(Ok(PollResult::Complete(value)))
1363 } else {
1364 if trap_on_idle {
1370 Poll::Ready(Err(Trap::AsyncDeadlock.into()))
1373 } else {
1374 Poll::Pending
1378 }
1379 }
1380 }
1381 Poll::Pending => Poll::Pending,
1386 };
1387 })
1388 .await;
1389
1390 drop(reset);
1394
1395 match result? {
1396 PollResult::Complete(value) => break Ok(value),
1399 PollResult::ProcessWork {
1402 ready,
1403 low_priority,
1404 } => {
1405 struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1406 store: StoreContextMut<'a, T>,
1407 ready: I,
1408 }
1409
1410 impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1411 fn drop(&mut self) {
1412 while let Some(item) = self.ready.next() {
1413 match item {
1414 WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1415 WorkItem::PushFuture(future) => {
1416 tls::set(self.store.0, move || drop(future))
1417 }
1418 _ => {}
1419 }
1420 }
1421 }
1422 }
1423
1424 let mut dispose = Dispose {
1425 store: self.as_context_mut(),
1426 ready: ready.into_iter(),
1427 };
1428
1429 if low_priority {
1451 dispose.store.0.yield_now().await
1452 }
1453
1454 while let Some(item) = dispose.ready.next() {
1455 dispose
1456 .store
1457 .as_context_mut()
1458 .handle_work_item(item)
1459 .await?;
1460 }
1461 }
1462 }
1463 }
1464 }
1465
1466 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1468 where
1469 T: Send,
1470 {
1471 log::trace!("handle work item {item:?}");
1472 match item {
1473 WorkItem::PushFuture(future) => {
1474 self.0
1475 .concurrent_state_mut()?
1476 .futures_mut()?
1477 .push(future.into_inner());
1478 }
1479 WorkItem::ResumeFiber(fiber) => {
1480 self.0.resume_fiber(fiber).await?;
1481 }
1482 WorkItem::ResumeThread(_, thread) => {
1483 if let GuestThreadState::Ready { fiber, .. } = mem::replace(
1484 &mut self.0.concurrent_state_mut()?.get_mut(thread.thread)?.state,
1485 GuestThreadState::Running,
1486 ) {
1487 self.0.resume_fiber(fiber).await?;
1488 } else {
1489 bail_bug!("cannot resume non-pending thread {thread:?}");
1490 }
1491 }
1492 WorkItem::GuestCall(_, call) => {
1493 if call.is_ready(self.0)? {
1494 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1495 } else {
1496 let state = self.0.concurrent_state_mut()?;
1497 let task = state.get_mut(call.thread.task)?;
1498 if !task.starting_sent {
1499 task.starting_sent = true;
1500 if let GuestCallKind::StartImplicit(_) = &call.kind {
1501 Waitable::Guest(call.thread.task).set_event(
1502 state,
1503 Some(Event::Subtask {
1504 status: Status::Starting,
1505 }),
1506 )?;
1507 }
1508 }
1509
1510 let instance = state.get_mut(call.thread.task)?.instance;
1511 self.0
1512 .instance_state(instance)
1513 .concurrent_state()
1514 .pending
1515 .insert(call.thread, call.kind);
1516 }
1517 }
1518 WorkItem::WorkerFunction(fun) => {
1519 self.run_on_worker(WorkerItem::Function(fun)).await?;
1520 }
1521 }
1522
1523 Ok(())
1524 }
1525
1526 async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1528 where
1529 T: Send,
1530 {
1531 let worker = if let Some(fiber) = self.0.concurrent_state_mut()?.worker.take() {
1532 fiber
1533 } else {
1534 fiber::make_fiber(self.0, move |store| {
1535 loop {
1536 let Some(item) = store.concurrent_state_mut()?.worker_item.take() else {
1537 bail_bug!("worker_item not present when resuming fiber")
1538 };
1539 match item {
1540 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1541 WorkerItem::Function(fun) => fun.into_inner()(store)?,
1542 }
1543
1544 store.suspend(SuspendReason::NeedWork)?;
1545 }
1546 })?
1547 };
1548
1549 let worker_item = &mut self.0.concurrent_state_mut()?.worker_item;
1550 assert!(worker_item.is_none());
1551 *worker_item = Some(item);
1552
1553 self.0.resume_fiber(worker).await
1554 }
1555
1556 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1561 where
1562 T: 'static,
1563 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1564 + Send
1565 + Sync
1566 + 'static,
1567 R: Send + Sync + 'static,
1568 {
1569 let token = StoreToken::new(self);
1570 async move {
1571 let mut accessor = Accessor::new(token);
1572 closure(&mut accessor).await
1573 }
1574 }
1575
1576 pub fn async_call_stack(&mut self) -> Result<impl Iterator<Item = GuestTaskId>> {
1586 let mut cur = Some(self.0.current_thread()?);
1587 let state = self.0.concurrent_state_mut()?;
1588 Ok(core::iter::from_fn(move || {
1589 while let Some(t) = cur {
1590 cur = state.parent(t);
1591 if let Some(thread) = t.guest() {
1592 return Some(GuestTaskId(thread.task));
1593 }
1594 }
1595
1596 None
1597 }))
1598 }
1599}
1600
1601impl StoreOpaque {
1602 pub(crate) fn current_thread(&mut self) -> Result<CurrentThread> {
1605 if !self.concurrency_support() {
1607 return Ok(CurrentThread::None);
1608 }
1609
1610 if !self
1613 .vm_store_context_mut()
1614 .current_thread_mut()
1615 .is_deferred()
1616 {
1617 return Ok(self
1618 .concurrent_state_mut_already_forced_current_thread()
1619 .unforced_current_thread);
1620 }
1621
1622 let state = self.concurrent_state_mut_without_forcing_current_thread();
1631 let id = match state.unforced_current_thread.guest().copied() {
1632 Some(thread) => state.get_mut(thread.task)?.instance.instance,
1633 None => bail_bug!("deferred component-model thread with non-guest base"),
1634 };
1635
1636 let mut frames = Vec::new();
1639 let mut cur = *self.vm_store_context_mut().current_thread_mut();
1640 while let Some(ptr) = cur.as_deferred() {
1641 let deferred = unsafe { ptr.as_non_null().as_ref() };
1646 frames.push((
1647 deferred.callee_async != 0,
1648 deferred.callee_instance,
1649 deferred.saved_context,
1650 ));
1651 cur = deferred.parent;
1652 }
1653
1654 *self.vm_store_context_mut().current_thread_mut() = VMLazyThread::forced();
1658
1659 let current_context = *self.vm_store_context_mut().component_context_mut();
1662
1663 for (callee_async, callee_instance, saved_context) in frames.into_iter().rev() {
1667 *self.vm_store_context_mut().component_context_mut() = saved_context;
1671 let callee = RuntimeInstance {
1672 instance: id,
1673 index: RuntimeComponentInstanceIndex::from_u32(callee_instance),
1674 };
1675 self.enter_guest_sync_call(None, callee_async, callee)?;
1676 }
1677
1678 *self.vm_store_context_mut().component_context_mut() = current_context;
1680
1681 Ok(self
1682 .concurrent_state_mut_without_forcing_current_thread()
1683 .unforced_current_thread)
1684 }
1685
1686 fn current_guest_thread(&mut self) -> Result<QualifiedThreadId> {
1687 match self.current_thread()?.guest() {
1688 Some(id) => Ok(*id),
1689 None => bail_bug!("current thread is not a guest thread"),
1690 }
1691 }
1692
1693 fn current_host_thread(&mut self) -> Result<TableId<HostTask>> {
1694 match self.current_thread()?.host() {
1695 Some(id) => Ok(id),
1696 None => bail_bug!("current thread is not a host thread"),
1697 }
1698 }
1699
1700 fn take_pending_cancellation(&mut self) -> Result<bool> {
1703 let thread = self.current_guest_thread()?;
1704 let task = self.concurrent_state_mut()?.get_mut(thread.task)?;
1705 if let Some(Event::Cancelled) = task.event {
1706 task.event.take();
1707 return Ok(true);
1708 }
1709 Ok(false)
1710 }
1711
1712 pub(crate) fn enter_guest_sync_call(
1724 &mut self,
1725 guest_caller: Option<RuntimeInstance>,
1726 callee_async: bool,
1727 callee: RuntimeInstance,
1728 ) -> Result<()> {
1729 log::trace!("enter sync call {callee:?}");
1730 if !self.concurrency_support() {
1731 return self.enter_call_not_concurrent();
1732 }
1733
1734 let thread = self.current_thread()?;
1735 let state = self.concurrent_state_mut()?;
1736 let instance = if let Some(thread) = thread.guest() {
1737 Some(state.get_mut(thread.task)?.instance)
1738 } else {
1739 None
1740 };
1741 if guest_caller.is_some() {
1742 debug_assert_eq!(instance, guest_caller);
1743 }
1744 let guest_thread = GuestTask::new(
1745 state,
1746 Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
1747 LiftResult {
1748 lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
1749 ty: TypeTupleIndex::reserved_value(),
1750 memory: None,
1751 string_encoding: StringEncoding::Utf8,
1752 },
1753 if let Some(thread) = thread.guest() {
1754 Caller::Guest { thread: *thread }
1755 } else {
1756 Caller::Host {
1757 tx: None,
1758 host_future_present: false,
1759 caller: thread,
1760 }
1761 },
1762 None,
1763 callee,
1764 callee_async,
1765 )?;
1766
1767 Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1768 guest_thread.thread,
1769 self,
1770 callee.index,
1771 )?;
1772 self.set_thread(guest_thread)?;
1773
1774 Ok(())
1775 }
1776
1777 pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
1785 if !self.concurrency_support() {
1786 return Ok(self.exit_call_not_concurrent());
1787 }
1788 let thread = match self.set_thread(CurrentThread::None)?.guest() {
1789 Some(t) => *t,
1790 None => bail_bug!("expected task when exiting"),
1791 };
1792 let task = self.concurrent_state_mut()?.get_mut(thread.task)?;
1793 let instance = task.instance;
1794 let caller = match &task.caller {
1795 &Caller::Guest { thread } => thread.into(),
1796 &Caller::Host { caller, .. } => caller,
1797 };
1798 task.lift_result = None;
1799 task.exited = true;
1800 self.set_thread(caller)?;
1801
1802 log::trace!("exit sync call {instance:?}");
1803 self.cleanup_thread(thread, instance, CleanupTask::Yes)?;
1804
1805 Ok(())
1806 }
1807
1808 pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
1816 if !self.concurrency_support() {
1817 self.enter_call_not_concurrent()?;
1818 return Ok(None);
1819 }
1820 let caller = self.current_guest_thread()?;
1821 let state = self.concurrent_state_mut()?;
1822 let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
1823 log::trace!("new host task {task:?}");
1824 self.set_thread(task)?;
1825 Ok(Some(task))
1826 }
1827
1828 pub fn host_task_reenter_caller(&mut self) -> Result<()> {
1834 if !self.concurrency_support() {
1835 return Ok(());
1836 }
1837 let task = self.current_host_thread()?;
1838 let caller = self.concurrent_state_mut()?.get_mut(task)?.caller;
1839 self.set_thread(caller)?;
1840 Ok(())
1841 }
1842
1843 pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1850 match task {
1851 Some(task) => {
1852 log::trace!("delete host task {task:?}");
1853 self.concurrent_state_mut()?.delete(task)?;
1854 }
1855 None => {
1856 self.exit_call_not_concurrent();
1857 }
1858 }
1859 Ok(())
1860 }
1861
1862 pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
1870 if self.trapped() {
1871 return Ok(false);
1872 }
1873 if !self.concurrency_support() {
1874 return Ok(true);
1875 }
1876 let mut cur = Some(self.current_thread()?);
1877 let state = self.concurrent_state_mut()?;
1878 while let Some(t) = cur {
1879 if let Some(thread) = t.guest() {
1880 let task = state.get_mut(thread.task)?;
1881 if task.instance.instance == instance.instance {
1888 return Ok(false);
1889 }
1890 }
1891 cur = state.parent(t);
1892 }
1893 Ok(true)
1894 }
1895
1896 fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1899 self.component_instance_mut(instance.instance)
1900 .instance_state(instance.index)
1901 }
1902
1903 fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
1909 let thread = thread.into();
1910 let state = self.concurrent_state_mut()?;
1911 let old_thread = mem::replace(&mut state.unforced_current_thread, thread);
1912
1913 if let Some(old_thread) = old_thread.guest() {
1921 let old_context = *self.vm_store_context_mut().component_context_mut();
1922 self.concurrent_state_mut()?
1923 .get_mut(old_thread.thread)?
1924 .context = old_context;
1925 }
1926 if cfg!(debug_assertions) {
1927 *self.vm_store_context_mut().component_context_mut() =
1928 [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
1929 }
1930 if let Some(thread) = thread.guest() {
1931 let thread = self.concurrent_state_mut()?.get_mut(thread.thread)?;
1932 let context = thread.context;
1933 if cfg!(debug_assertions) {
1934 thread.context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
1935 }
1936 *self.vm_store_context_mut().component_context_mut() = context;
1937 }
1938
1939 let state = self.concurrent_state_mut()?;
1947 if let Some(old_thread) = old_thread.guest() {
1948 let instance = state.get_mut(old_thread.task)?.instance.instance;
1949 self.component_instance_mut(instance)
1950 .set_task_may_block(false)
1951 }
1952
1953 if thread.guest().is_some() {
1954 self.set_task_may_block()?;
1955 }
1956
1957 *self.vm_store_context_mut().current_thread_mut() = if thread.is_none() {
1959 VMLazyThread::none()
1960 } else {
1961 VMLazyThread::forced()
1962 };
1963
1964 Ok(old_thread)
1965 }
1966
1967 fn set_task_may_block(&mut self) -> Result<()> {
1970 let guest_thread = self.current_guest_thread()?;
1971 let state = self.concurrent_state_mut()?;
1972 let instance = state.get_mut(guest_thread.task)?.instance.instance;
1973 let may_block = self.concurrent_state_mut()?.may_block(guest_thread.task)?;
1974 self.component_instance_mut(instance)
1975 .set_task_may_block(may_block);
1976 Ok(())
1977 }
1978
1979 pub(crate) fn check_blocking(&mut self) -> Result<()> {
1980 if !self.concurrency_support() {
1981 return Ok(());
1982 }
1983 let task = self.current_guest_thread()?.task;
1984 let state = self.concurrent_state_mut()?;
1985 let instance = state.get_mut(task)?.instance.instance;
1986 let task_may_block = self.component_instance(instance).get_task_may_block();
1987
1988 if task_may_block {
1989 Ok(())
1990 } else {
1991 Err(Trap::CannotBlockSyncTask.into())
1992 }
1993 }
1994
1995 fn enter_instance(&mut self, instance: RuntimeInstance) {
1999 log::trace!("enter {instance:?}");
2000 self.instance_state(instance)
2001 .concurrent_state()
2002 .do_not_enter = true;
2003 }
2004
2005 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
2009 log::trace!("exit {instance:?}");
2010 self.instance_state(instance)
2011 .concurrent_state()
2012 .do_not_enter = false;
2013 self.partition_pending(instance)
2014 }
2015
2016 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
2024 for (thread, kind) in
2025 mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
2026 {
2027 let call = GuestCall { thread, kind };
2028 if call.is_ready(self)? {
2029 self.concurrent_state_mut()?
2030 .push_high_priority(WorkItem::GuestCall(instance.index, call));
2031 } else {
2032 self.instance_state(instance)
2033 .concurrent_state()
2034 .pending
2035 .insert(call.thread, call.kind);
2036 }
2037 }
2038
2039 if let Some(waker) = self
2040 .concurrent_state_mut()?
2041 .ready_for_concurrent_call_waker
2042 .take()
2043 {
2044 waker.wake();
2045 }
2046
2047 Ok(())
2048 }
2049
2050 pub(crate) fn backpressure_modify(
2052 &mut self,
2053 caller_instance: RuntimeInstance,
2054 modify: impl FnOnce(u16) -> Option<u16>,
2055 ) -> Result<()> {
2056 let state = self.instance_state(caller_instance).concurrent_state();
2057 let old = state.backpressure;
2058 let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
2059 state.backpressure = new;
2060
2061 if old > 0 && new == 0 {
2062 self.partition_pending(caller_instance)?;
2065 }
2066
2067 Ok(())
2068 }
2069
2070 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
2073 let old_thread = self.current_thread()?;
2074 log::trace!("resume_fiber: save current thread {old_thread:?}");
2075
2076 let fiber = fiber::resolve_or_release(self, fiber).await?;
2077
2078 self.set_thread(old_thread)?;
2079
2080 let state = self.concurrent_state_mut()?;
2081
2082 if let Some(ot) = old_thread.guest() {
2083 state.get_mut(ot.thread)?.state = GuestThreadState::Running;
2084 }
2085 log::trace!("resume_fiber: restore current thread {old_thread:?}");
2086
2087 if let Some(mut fiber) = fiber {
2088 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
2089 let reason = match state.suspend_reason.take() {
2091 Some(r) => r,
2092 None => bail_bug!("suspend reason missing when resuming fiber"),
2093 };
2094 match reason {
2095 SuspendReason::NeedWork => {
2096 if state.worker.is_none() {
2097 state.worker = Some(fiber);
2098 } else {
2099 fiber.dispose(self);
2100 }
2101 }
2102 SuspendReason::Yielding {
2103 thread,
2104 cancellable,
2105 ..
2106 } => {
2107 state.get_mut(thread.thread)?.state =
2108 GuestThreadState::Ready { fiber, cancellable };
2109 let instance = state.get_mut(thread.task)?.instance.index;
2110 state.push_low_priority(WorkItem::ResumeThread(instance, thread));
2111 }
2112 SuspendReason::ExplicitlySuspending { thread, .. } => {
2113 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
2114 }
2115 SuspendReason::Waiting { set, thread, .. } => {
2116 let old = state
2117 .get_mut(set)?
2118 .waiting
2119 .insert(thread, WaitMode::Fiber(fiber));
2120 assert!(old.is_none());
2121 }
2122 };
2123 } else {
2124 log::trace!("resume_fiber: fiber has exited");
2125 }
2126
2127 Ok(())
2128 }
2129
2130 fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
2136 log::trace!("suspend fiber: {reason:?}");
2137
2138 let task = match &reason {
2142 SuspendReason::Yielding { thread, .. }
2143 | SuspendReason::Waiting { thread, .. }
2144 | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
2145 SuspendReason::NeedWork => None,
2146 };
2147
2148 let old_guest_thread = if task.is_some() {
2149 self.current_thread()?
2150 } else {
2151 CurrentThread::None
2152 };
2153
2154 debug_assert!(
2160 matches!(
2161 reason,
2162 SuspendReason::ExplicitlySuspending {
2163 skip_may_block_check: true,
2164 ..
2165 } | SuspendReason::Waiting {
2166 skip_may_block_check: true,
2167 ..
2168 } | SuspendReason::Yielding {
2169 skip_may_block_check: true,
2170 ..
2171 }
2172 ) || old_guest_thread
2173 .guest()
2174 .map(|thread| self.concurrent_state_mut()?.may_block(thread.task))
2175 .transpose()?
2176 .unwrap_or(true)
2177 );
2178
2179 let suspend_reason = &mut self.concurrent_state_mut()?.suspend_reason;
2180 assert!(suspend_reason.is_none());
2181 *suspend_reason = Some(reason);
2182
2183 self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
2184
2185 if task.is_some() {
2186 self.set_thread(old_guest_thread)?;
2187 }
2188
2189 Ok(())
2190 }
2191
2192 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
2193 let caller = self.current_guest_thread()?;
2194 let state = self.concurrent_state_mut()?;
2195
2196 if waitable.common(state)?.set.is_some() {
2197 bail!(Trap::WaitableSyncAndAsync);
2198 }
2199
2200 let set = state.get_mut(caller.thread)?.sync_call_set;
2201 waitable.join(state, Some(set))?;
2202 self.suspend(SuspendReason::Waiting {
2203 set,
2204 thread: caller,
2205 skip_may_block_check: false,
2206 })?;
2207 let state = self.concurrent_state_mut()?;
2208 waitable.join(state, None)
2209 }
2210
2211 fn cleanup_thread(
2233 &mut self,
2234 guest_thread: QualifiedThreadId,
2235 runtime_instance: RuntimeInstance,
2236 cleanup_task: CleanupTask,
2237 ) -> Result<()> {
2238 let state = self.concurrent_state_mut()?;
2239 let thread_data = state.get_mut(guest_thread.thread)?;
2240 let sync_call_set = thread_data.sync_call_set;
2241 if let Some(guest_id) = thread_data.instance_rep {
2242 self.instance_state(runtime_instance)
2243 .thread_handle_table()
2244 .guest_thread_remove(guest_id)?;
2245 }
2246 let state = self.concurrent_state_mut()?;
2247
2248 for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
2250 if let Some(Event::Subtask {
2251 status: Status::Returned | Status::ReturnCancelled,
2252 }) = waitable.common(state)?.event
2253 {
2254 waitable.delete_from(state)?;
2255 }
2256 }
2257
2258 state.delete(guest_thread.thread)?;
2259 state.delete(sync_call_set)?;
2260 let task = state.get_mut(guest_thread.task)?;
2261 task.threads.remove(&guest_thread.thread);
2262
2263 if task.threads.is_empty() && !task.returned_or_cancelled() {
2264 bail!(Trap::NoAsyncResult);
2265 }
2266 let ready_to_delete = task.ready_to_delete();
2267
2268 if !task.decremented_interesting_task_count && task.exited && task.returned_or_cancelled() {
2269 task.decremented_interesting_task_count = true;
2270
2271 debug_assert!(state.interesting_tasks > 0);
2272 state.interesting_tasks -= 1;
2273 if state.interesting_tasks == 0
2274 && let Some(waker) = state.interesting_tasks_empty_waker.take()
2275 {
2276 waker.wake();
2277 }
2278 }
2279
2280 match cleanup_task {
2281 CleanupTask::Yes => {
2282 if ready_to_delete {
2283 Waitable::Guest(guest_thread.task).delete_from(state)?;
2284 }
2285 }
2286 CleanupTask::No => {}
2287 }
2288
2289 Ok(())
2290 }
2291
2292 fn cancel_guest_subtask_without_lowered_parameters(
2305 &mut self,
2306 caller_instance: RuntimeInstance,
2307 guest_task: TableId<GuestTask>,
2308 ) -> Result<()> {
2309 let concurrent_state = self.concurrent_state_mut()?;
2310 let task = concurrent_state.get_mut(guest_task)?;
2311 assert!(!task.already_lowered_parameters());
2312 task.lower_params = None;
2316 task.lift_result = None;
2317 task.exited = true;
2318 let instance = task.instance;
2319
2320 assert_eq!(1, task.threads.len());
2323 let thread = *task.threads.iter().next().unwrap();
2324 self.cleanup_thread(
2325 QualifiedThreadId {
2326 task: guest_task,
2327 thread,
2328 },
2329 caller_instance,
2330 CleanupTask::No,
2331 )?;
2332
2333 let pending = &mut self.instance_state(instance).concurrent_state().pending;
2335 let pending_count = pending.len();
2336 pending.retain(|thread, _| thread.task != guest_task);
2337 if pending.len() == pending_count {
2339 bail!(Trap::SubtaskCancelAfterTerminal);
2340 }
2341 Ok(())
2342 }
2343
2344 pub(crate) fn current_scope_id(&mut self) -> Result<Option<u32>> {
2347 if !self.concurrency_support() {
2348 return self.current_scope_id_not_concurrent();
2349 }
2350 let (bits, is_host) = match self.current_thread()? {
2351 CurrentThread::Guest(id) => (id.task.rep(), false),
2352 CurrentThread::Host(id) => (id.rep(), true),
2353 CurrentThread::None => return Ok(None),
2354 };
2355 assert_eq!((bits << 1) >> 1, bits);
2356 Ok(Some((bits << 1) | u32::from(is_host)))
2357 }
2358}
2359
2360enum CleanupTask {
2361 Yes,
2362 No,
2363}
2364
2365impl Instance {
2366 fn get_event(
2369 self,
2370 store: &mut StoreOpaque,
2371 guest_task: TableId<GuestTask>,
2372 set: Option<TableId<WaitableSet>>,
2373 cancellable: bool,
2374 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
2375 let state = store.concurrent_state_mut()?;
2376
2377 let event = &mut state.get_mut(guest_task)?.event;
2378 if let Some(ev) = event
2379 && (cancellable || !matches!(ev, Event::Cancelled))
2380 {
2381 log::trace!("deliver event {ev:?} to {guest_task:?}");
2382 let ev = *ev;
2383 *event = None;
2384 return Ok(Some((ev, None)));
2385 }
2386
2387 let set = match set {
2388 Some(set) => set,
2389 None => return Ok(None),
2390 };
2391 let waitable = match state.get_mut(set)?.ready.pop_first() {
2392 Some(v) => v,
2393 None => return Ok(None),
2394 };
2395
2396 let common = waitable.common(state)?;
2397 let handle = match common.handle {
2398 Some(h) => h,
2399 None => bail_bug!("handle not set when delivering event"),
2400 };
2401 let event = match common.event.take() {
2402 Some(e) => e,
2403 None => bail_bug!("event not set when delivering event"),
2404 };
2405
2406 log::trace!(
2407 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
2408 );
2409
2410 waitable.on_delivery(store, self, event)?;
2411
2412 Ok(Some((event, Some((waitable, handle)))))
2413 }
2414
2415 fn handle_callback_code(
2421 self,
2422 store: &mut StoreOpaque,
2423 guest_thread: QualifiedThreadId,
2424 runtime_instance: RuntimeComponentInstanceIndex,
2425 code: u32,
2426 ) -> Result<Option<GuestCall>> {
2427 let (code, set) = unpack_callback_code(code);
2428
2429 log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
2430
2431 let state = store.concurrent_state_mut()?;
2432
2433 let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
2434 let set = store
2435 .instance_state(self.runtime_instance(runtime_instance))
2436 .handle_table()
2437 .waitable_set_rep(handle)?;
2438
2439 Ok(TableId::<WaitableSet>::new(set))
2440 };
2441
2442 Ok(match code {
2443 callback_code::EXIT => {
2444 log::trace!("implicit thread {guest_thread:?} completed");
2445 let task = store.concurrent_state_mut()?.get_mut(guest_thread.task)?;
2446 task.exited = true;
2447 task.callback = None;
2448 store.cleanup_thread(
2449 guest_thread,
2450 self.runtime_instance(runtime_instance),
2451 CleanupTask::Yes,
2452 )?;
2453 None
2454 }
2455 callback_code::YIELD => {
2456 let task = state.get_mut(guest_thread.task)?;
2457 if let Some(event) = task.event {
2462 assert!(matches!(event, Event::None | Event::Cancelled));
2463 } else {
2464 task.event = Some(Event::None);
2465 }
2466 let call = GuestCall {
2467 thread: guest_thread,
2468 kind: GuestCallKind::DeliverEvent {
2469 instance: self,
2470 set: None,
2471 },
2472 };
2473 if state.may_block(guest_thread.task)? {
2474 state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
2477 None
2478 } else {
2479 Some(call)
2483 }
2484 }
2485 callback_code::WAIT => {
2486 state.check_blocking_for(guest_thread.task)?;
2489
2490 let set = get_set(store, set)?;
2491 let state = store.concurrent_state_mut()?;
2492
2493 if state.get_mut(guest_thread.task)?.event.is_some()
2494 || !state.get_mut(set)?.ready.is_empty()
2495 {
2496 state.push_high_priority(WorkItem::GuestCall(
2498 runtime_instance,
2499 GuestCall {
2500 thread: guest_thread,
2501 kind: GuestCallKind::DeliverEvent {
2502 instance: self,
2503 set: Some(set),
2504 },
2505 },
2506 ));
2507 } else {
2508 let old = state
2516 .get_mut(guest_thread.thread)?
2517 .wake_on_cancel
2518 .replace(set);
2519 if !old.is_none() {
2520 bail_bug!("thread unexpectedly had wake_on_cancel set");
2521 }
2522 let old = state
2523 .get_mut(set)?
2524 .waiting
2525 .insert(guest_thread, WaitMode::Callback(self));
2526 if !old.is_none() {
2527 bail_bug!("set's waiting set already had this thread registered");
2528 }
2529 }
2530 None
2531 }
2532 _ => bail!(Trap::UnsupportedCallbackCode),
2533 })
2534 }
2535
2536 unsafe fn queue_call<T: 'static>(
2543 self,
2544 mut store: StoreContextMut<T>,
2545 guest_thread: QualifiedThreadId,
2546 callee: SendSyncPtr<VMFuncRef>,
2547 param_count: usize,
2548 result_count: usize,
2549 async_: bool,
2550 callback: Option<SendSyncPtr<VMFuncRef>>,
2551 post_return: Option<SendSyncPtr<VMFuncRef>>,
2552 ) -> Result<()> {
2553 unsafe fn make_call<T: 'static>(
2568 store: StoreContextMut<T>,
2569 guest_thread: QualifiedThreadId,
2570 callee: SendSyncPtr<VMFuncRef>,
2571 param_count: usize,
2572 result_count: usize,
2573 ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2574 + Send
2575 + Sync
2576 + 'static
2577 + use<T> {
2578 let token = StoreToken::new(store);
2579 move |store: &mut dyn VMStore| {
2580 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2581
2582 store
2583 .concurrent_state_mut()?
2584 .get_mut(guest_thread.thread)?
2585 .state = GuestThreadState::Running;
2586 let task = store.concurrent_state_mut()?.get_mut(guest_thread.task)?;
2587 let lower = match task.lower_params.take() {
2588 Some(l) => l,
2589 None => bail_bug!("lower_params missing"),
2590 };
2591
2592 lower(store, &mut storage[..param_count])?;
2593
2594 let mut store = token.as_context_mut(store);
2595
2596 unsafe {
2599 crate::Func::call_unchecked_raw(
2600 &mut store,
2601 callee.as_non_null(),
2602 NonNull::new(
2603 &mut storage[..param_count.max(result_count)]
2604 as *mut [MaybeUninit<ValRaw>] as _,
2605 )
2606 .unwrap(),
2607 )?;
2608 }
2609
2610 Ok(storage)
2611 }
2612 }
2613
2614 let call = unsafe {
2618 make_call(
2619 store.as_context_mut(),
2620 guest_thread,
2621 callee,
2622 param_count,
2623 result_count,
2624 )
2625 };
2626
2627 let callee_instance = store
2628 .0
2629 .concurrent_state_mut()?
2630 .get_mut(guest_thread.task)?
2631 .instance;
2632
2633 let fun = if callback.is_some() {
2634 assert!(async_);
2635
2636 Box::new(move |store: &mut dyn VMStore| {
2637 self.add_guest_thread_to_instance_table(
2638 guest_thread.thread,
2639 store,
2640 callee_instance.index,
2641 )?;
2642 let old_thread = store.set_thread(guest_thread)?;
2643 log::trace!(
2644 "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2645 );
2646
2647 store.enter_instance(callee_instance);
2648
2649 let storage = call(store)?;
2656
2657 store.exit_instance(callee_instance)?;
2658
2659 store.set_thread(old_thread)?;
2660 let state = store.concurrent_state_mut()?;
2661 if let Some(t) = old_thread.guest() {
2662 state.get_mut(t.thread)?.state = GuestThreadState::Running;
2663 }
2664 log::trace!("stackless call: restored {old_thread:?} as current thread");
2665
2666 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2669
2670 self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2671 })
2672 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2673 } else {
2674 let token = StoreToken::new(store.as_context_mut());
2675 Box::new(move |store: &mut dyn VMStore| {
2676 self.add_guest_thread_to_instance_table(
2677 guest_thread.thread,
2678 store,
2679 callee_instance.index,
2680 )?;
2681 let old_thread = store.set_thread(guest_thread)?;
2682 log::trace!(
2683 "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2684 );
2685 let flags = self.id().get(store).instance_flags(callee_instance.index);
2686
2687 if !async_ {
2691 store.enter_instance(callee_instance);
2692 }
2693
2694 let storage = call(store)?;
2701
2702 if !async_ {
2703 let lift = {
2709 store.exit_instance(callee_instance)?;
2710
2711 let state = store.concurrent_state_mut()?;
2712 if !state.get_mut(guest_thread.task)?.result.is_none() {
2713 bail_bug!("task has already produced a result");
2714 }
2715
2716 match state.get_mut(guest_thread.task)?.lift_result.take() {
2717 Some(lift) => lift,
2718 None => bail_bug!("lift_result field is missing"),
2719 }
2720 };
2721
2722 let result = (lift.lift)(store, unsafe {
2725 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2726 &storage[..result_count],
2727 )
2728 })?;
2729
2730 let post_return_arg = match result_count {
2731 0 => ValRaw::i32(0),
2732 1 => unsafe { storage[0].assume_init() },
2735 _ => unreachable!(),
2736 };
2737
2738 unsafe {
2739 call_post_return(
2740 token.as_context_mut(store),
2741 post_return.map(|v| v.as_non_null()),
2742 post_return_arg,
2743 flags,
2744 )?;
2745 }
2746
2747 self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2748 }
2749
2750 store.set_thread(old_thread)?;
2751
2752 store
2753 .concurrent_state_mut()?
2754 .get_mut(guest_thread.task)?
2755 .exited = true;
2756
2757 store.cleanup_thread(guest_thread, callee_instance, CleanupTask::Yes)?;
2759 Ok(None)
2760 })
2761 };
2762
2763 store
2764 .0
2765 .concurrent_state_mut()?
2766 .push_high_priority(WorkItem::GuestCall(
2767 callee_instance.index,
2768 GuestCall {
2769 thread: guest_thread,
2770 kind: GuestCallKind::StartImplicit(fun),
2771 },
2772 ));
2773
2774 Ok(())
2775 }
2776
2777 unsafe fn prepare_call<T: 'static>(
2790 self,
2791 mut store: StoreContextMut<T>,
2792 start: NonNull<VMFuncRef>,
2793 return_: NonNull<VMFuncRef>,
2794 caller_instance: RuntimeComponentInstanceIndex,
2795 callee_instance: RuntimeComponentInstanceIndex,
2796 task_return_type: TypeTupleIndex,
2797 callee_async: bool,
2798 memory: *mut VMMemoryDefinition,
2799 string_encoding: StringEncoding,
2800 caller_info: CallerInfo,
2801 ) -> Result<()> {
2802 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2803 store.0.check_blocking()?;
2807 }
2808
2809 enum ResultInfo {
2810 Heap { results: u32 },
2811 Stack { result_count: u32 },
2812 }
2813
2814 let result_info = match &caller_info {
2815 CallerInfo::Async {
2816 has_result: true,
2817 params,
2818 } => ResultInfo::Heap {
2819 results: match params.last() {
2820 Some(r) => r.get_u32(),
2821 None => bail_bug!("retptr missing"),
2822 },
2823 },
2824 CallerInfo::Async {
2825 has_result: false, ..
2826 } => ResultInfo::Stack { result_count: 0 },
2827 CallerInfo::Sync {
2828 result_count,
2829 params,
2830 } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
2831 results: match params.last() {
2832 Some(r) => r.get_u32(),
2833 None => bail_bug!("arg ptr missing"),
2834 },
2835 },
2836 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2837 result_count: *result_count,
2838 },
2839 };
2840
2841 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2842
2843 let start = SendSyncPtr::new(start);
2847 let return_ = SendSyncPtr::new(return_);
2848 let token = StoreToken::new(store.as_context_mut());
2849 let old_thread = store.0.current_guest_thread()?;
2850 let state = store.0.concurrent_state_mut()?;
2851
2852 debug_assert_eq!(
2853 state.get_mut(old_thread.task)?.instance,
2854 self.runtime_instance(caller_instance)
2855 );
2856
2857 let guest_thread = GuestTask::new(
2858 state,
2859 Box::new(move |store, dst| {
2860 let mut store = token.as_context_mut(store);
2861 assert!(dst.len() <= MAX_FLAT_PARAMS);
2862 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2864 let count = match caller_info {
2865 CallerInfo::Async { params, has_result } => {
2869 let params = ¶ms[..params.len() - usize::from(has_result)];
2870 for (param, src) in params.iter().zip(&mut src) {
2871 src.write(*param);
2872 }
2873 params.len()
2874 }
2875
2876 CallerInfo::Sync { params, .. } => {
2878 for (param, src) in params.iter().zip(&mut src) {
2879 src.write(*param);
2880 }
2881 params.len()
2882 }
2883 };
2884 unsafe {
2891 crate::Func::call_unchecked_raw(
2892 &mut store,
2893 start.as_non_null(),
2894 NonNull::new(
2895 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2896 )
2897 .unwrap(),
2898 )?;
2899 }
2900 dst.copy_from_slice(&src[..dst.len()]);
2901 let task = store.0.current_guest_thread()?.task;
2902 let state = store.0.concurrent_state_mut()?;
2903 Waitable::Guest(task).set_event(
2904 state,
2905 Some(Event::Subtask {
2906 status: Status::Started,
2907 }),
2908 )?;
2909 Ok(())
2910 }),
2911 LiftResult {
2912 lift: Box::new(move |store, src| {
2913 let mut store = token.as_context_mut(store);
2916 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2918 my_src.push(ValRaw::u32(*results));
2919 }
2920
2921 let prev = store.0.set_thread(old_thread)?;
2927
2928 unsafe {
2935 crate::Func::call_unchecked_raw(
2936 &mut store,
2937 return_.as_non_null(),
2938 my_src.as_mut_slice().into(),
2939 )?;
2940 }
2941
2942 store.0.set_thread(prev)?;
2945
2946 let thread = store.0.current_guest_thread()?;
2947 let state = store.0.concurrent_state_mut()?;
2948 if sync_caller {
2949 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2950 if let ResultInfo::Stack { result_count } = &result_info {
2951 match result_count {
2952 0 => None,
2953 1 => Some(my_src[0]),
2954 _ => unreachable!(),
2955 }
2956 } else {
2957 None
2958 },
2959 );
2960 }
2961 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2962 }),
2963 ty: task_return_type,
2964 memory: NonNull::new(memory).map(SendSyncPtr::new),
2965 string_encoding,
2966 },
2967 Caller::Guest { thread: old_thread },
2968 None,
2969 self.runtime_instance(callee_instance),
2970 callee_async,
2971 )?;
2972
2973 store.0.set_thread(guest_thread)?;
2976 log::trace!("pushed {guest_thread:?} as current thread; old thread was {old_thread:?}");
2977
2978 Ok(())
2979 }
2980
2981 unsafe fn call_callback<T>(
2986 self,
2987 mut store: StoreContextMut<T>,
2988 function: SendSyncPtr<VMFuncRef>,
2989 event: Event,
2990 handle: u32,
2991 ) -> Result<u32> {
2992 let (ordinal, result) = event.parts();
2993 let params = &mut [
2994 ValRaw::u32(ordinal),
2995 ValRaw::u32(handle),
2996 ValRaw::u32(result),
2997 ];
2998 unsafe {
3003 crate::Func::call_unchecked_raw(
3004 &mut store,
3005 function.as_non_null(),
3006 params.as_mut_slice().into(),
3007 )?;
3008 }
3009 Ok(params[0].get_u32())
3010 }
3011
3012 unsafe fn start_call<T: 'static>(
3025 self,
3026 mut store: StoreContextMut<T>,
3027 callback: *mut VMFuncRef,
3028 post_return: *mut VMFuncRef,
3029 callee: NonNull<VMFuncRef>,
3030 param_count: u32,
3031 result_count: u32,
3032 flags: u32,
3033 storage: Option<&mut [MaybeUninit<ValRaw>]>,
3034 ) -> Result<u32> {
3035 let token = StoreToken::new(store.as_context_mut());
3036 let async_caller = storage.is_none();
3037 let guest_thread = store.0.current_guest_thread()?;
3038 let state = store.0.concurrent_state_mut()?;
3039 let callee_async = state.get_mut(guest_thread.task)?.async_function;
3040 let callee = SendSyncPtr::new(callee);
3041 let param_count = usize::try_from(param_count)?;
3042 assert!(param_count <= MAX_FLAT_PARAMS);
3043 let result_count = usize::try_from(result_count)?;
3044 assert!(result_count <= MAX_FLAT_RESULTS);
3045
3046 let task = state.get_mut(guest_thread.task)?;
3047 if let Some(callback) = NonNull::new(callback) {
3048 let callback = SendSyncPtr::new(callback);
3052 task.callback = Some(Box::new(move |store, event, handle| {
3053 let store = token.as_context_mut(store);
3054 unsafe { self.call_callback::<T>(store, callback, event, handle) }
3055 }));
3056 }
3057
3058 let Caller::Guest { thread: caller } = &task.caller else {
3059 bail_bug!("start_call unexpectedly invoked for host->guest call");
3062 };
3063 let caller = *caller;
3064 let caller_instance = state.get_mut(caller.task)?.instance;
3065
3066 unsafe {
3068 self.queue_call(
3069 store.as_context_mut(),
3070 guest_thread,
3071 callee,
3072 param_count,
3073 result_count,
3074 (flags & START_FLAG_ASYNC_CALLEE) != 0,
3075 NonNull::new(callback).map(SendSyncPtr::new),
3076 NonNull::new(post_return).map(SendSyncPtr::new),
3077 )?;
3078 }
3079
3080 let state = store.0.concurrent_state_mut()?;
3081
3082 let guest_waitable = Waitable::Guest(guest_thread.task);
3085 let old_set = guest_waitable.common(state)?.set;
3086 let set = state.get_mut(caller.thread)?.sync_call_set;
3087 guest_waitable.join(state, Some(set))?;
3088
3089 store.0.set_thread(CurrentThread::None)?;
3090
3091 let (status, waitable) = loop {
3107 store.0.suspend(SuspendReason::Waiting {
3108 set,
3109 thread: caller,
3110 skip_may_block_check: async_caller || !callee_async,
3118 })?;
3119
3120 let state = store.0.concurrent_state_mut()?;
3121
3122 log::trace!("taking event for {:?}", guest_thread.task);
3123 let event = guest_waitable.take_event(state)?;
3124 let Some(Event::Subtask { status }) = event else {
3125 bail_bug!("subtasks should only get subtask events, got {event:?}")
3126 };
3127
3128 log::trace!("status {status:?} for {:?}", guest_thread.task);
3129
3130 if status == Status::Returned {
3131 break (status, None);
3133 } else if async_caller {
3134 let handle = store
3138 .0
3139 .instance_state(caller_instance)
3140 .handle_table()
3141 .subtask_insert_guest(guest_thread.task.rep())?;
3142 store
3143 .0
3144 .concurrent_state_mut()?
3145 .get_mut(guest_thread.task)?
3146 .common
3147 .handle = Some(handle);
3148 break (status, Some(handle));
3149 } else {
3150 }
3154 };
3155
3156 guest_waitable.join(store.0.concurrent_state_mut()?, old_set)?;
3157
3158 store.0.set_thread(caller)?;
3160 store
3161 .0
3162 .concurrent_state_mut()?
3163 .get_mut(caller.thread)?
3164 .state = GuestThreadState::Running;
3165 log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
3166
3167 if let Some(storage) = storage {
3168 let state = store.0.concurrent_state_mut()?;
3172 let task = state.get_mut(guest_thread.task)?;
3173 if let Some(result) = task.sync_result.take()? {
3174 if let Some(result) = result {
3175 storage[0] = MaybeUninit::new(result);
3176 }
3177
3178 if task.exited && task.ready_to_delete() {
3179 Waitable::Guest(guest_thread.task).delete_from(state)?;
3180 }
3181 }
3182 }
3183
3184 Ok(status.pack(waitable))
3185 }
3186
3187 pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
3200 self,
3201 mut store: StoreContextMut<'_, T>,
3202 future: impl Future<Output = Result<R>> + Send + 'static,
3203 lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
3204 ) -> Result<Option<u32>> {
3205 let token = StoreToken::new(store.as_context_mut());
3206 let task = store.0.current_host_thread()?;
3207 let state = store.0.concurrent_state_mut()?;
3208
3209 let (join_handle, future) = JoinHandle::run(future);
3212 {
3213 let state = &mut state.get_mut(task)?.state;
3214 assert!(matches!(state, HostTaskState::CalleeStarted));
3215 *state = HostTaskState::CalleeRunning(join_handle);
3216 }
3217
3218 let mut future = Box::pin(future);
3219
3220 let poll = tls::set(store.0, || {
3225 future
3226 .as_mut()
3227 .poll(&mut Context::from_waker(&Waker::noop()))
3228 });
3229
3230 match poll {
3231 Poll::Ready(result) => {
3233 let result = result.transpose()?;
3234 lower(store.as_context_mut(), result)?;
3235 return Ok(None);
3236 }
3237
3238 Poll::Pending => {}
3240 }
3241
3242 let future = Box::pin(async move {
3250 let result = match future.await {
3251 Some(result) => Some(result?),
3252 None => None,
3253 };
3254 let on_complete = move |store: &mut dyn VMStore| {
3255 let mut store = token.as_context_mut(store);
3259 let old = store.0.set_thread(task)?;
3260
3261 let status = if result.is_some() {
3262 Status::Returned
3263 } else {
3264 Status::ReturnCancelled
3265 };
3266
3267 lower(store.as_context_mut(), result)?;
3268 let state = store.0.concurrent_state_mut()?;
3269 match &mut state.get_mut(task)?.state {
3270 HostTaskState::CalleeDone { .. } => {}
3273
3274 other => *other = HostTaskState::CalleeDone { cancelled: false },
3276 }
3277 Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;
3278
3279 store.0.set_thread(old)?;
3280 Ok(())
3281 };
3282
3283 tls::get(move |store| {
3288 store
3289 .concurrent_state_mut()?
3290 .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
3291 on_complete,
3292 ))));
3293 Ok(())
3294 })
3295 });
3296
3297 let state = store.0.concurrent_state_mut()?;
3300 state.push_future(future);
3301 let caller = state.get_mut(task)?.caller;
3302 let instance = state.get_mut(caller.task)?.instance;
3303 let handle = store
3304 .0
3305 .instance_state(instance)
3306 .handle_table()
3307 .subtask_insert_host(task.rep())?;
3308 store.0.concurrent_state_mut()?.get_mut(task)?.common.handle = Some(handle);
3309 log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");
3310
3311 store.0.set_thread(caller)?;
3315 Ok(Some(handle))
3316 }
3317
3318 pub(crate) fn task_return(
3321 self,
3322 store: &mut dyn VMStore,
3323 ty: TypeTupleIndex,
3324 options: OptionsIndex,
3325 storage: &[ValRaw],
3326 ) -> Result<()> {
3327 let guest_thread = store.current_guest_thread()?;
3328 let state = store.concurrent_state_mut()?;
3329 let lift = state
3330 .get_mut(guest_thread.task)?
3331 .lift_result
3332 .take()
3333 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3334 if !state.get_mut(guest_thread.task)?.result.is_none() {
3335 bail_bug!("task result unexpectedly already set");
3336 }
3337
3338 let CanonicalOptions {
3339 string_encoding,
3340 data_model,
3341 ..
3342 } = &self.id().get(store).component().env_component().options[options];
3343
3344 let invalid = ty != lift.ty
3345 || string_encoding != &lift.string_encoding
3346 || match data_model {
3347 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
3348 Some(memory) => {
3349 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
3350 let actual = self.id().get(store).runtime_memory(memory);
3351 expected != actual.as_ptr()
3352 }
3353 None => false,
3356 },
3357 CanonicalOptionsDataModel::Gc { .. } => true,
3359 };
3360
3361 if invalid {
3362 bail!(Trap::TaskReturnInvalid);
3363 }
3364
3365 log::trace!("task.return for {guest_thread:?}");
3366
3367 let result = (lift.lift)(store, storage)?;
3368 self.task_complete(store, guest_thread.task, result, Status::Returned)
3369 }
3370
3371 pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
3373 let guest_thread = store.current_guest_thread()?;
3374 let state = store.concurrent_state_mut()?;
3375 let task = state.get_mut(guest_thread.task)?;
3376 if !task.cancel_sent {
3377 bail!(Trap::TaskCancelNotCancelled);
3378 }
3379 _ = task
3380 .lift_result
3381 .take()
3382 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3383
3384 if !task.result.is_none() {
3385 bail_bug!("task result should not bet set yet");
3386 }
3387
3388 log::trace!("task.cancel for {guest_thread:?}");
3389
3390 self.task_complete(
3391 store,
3392 guest_thread.task,
3393 Box::new(DummyResult),
3394 Status::ReturnCancelled,
3395 )
3396 }
3397
3398 fn task_complete(
3404 self,
3405 store: &mut StoreOpaque,
3406 guest_task: TableId<GuestTask>,
3407 result: Box<dyn Any + Send + Sync>,
3408 status: Status,
3409 ) -> Result<()> {
3410 store
3411 .component_resource_tables(Some(self))?
3412 .validate_scope_exit()?;
3413
3414 let state = store.concurrent_state_mut()?;
3415 let task = state.get_mut(guest_task)?;
3416
3417 if let Caller::Host { tx, .. } = &mut task.caller {
3418 if let Some(tx) = tx.take() {
3419 _ = tx.send(result);
3420 }
3421 } else {
3422 task.result = Some(result);
3423 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3424 }
3425
3426 Ok(())
3427 }
3428
3429 pub(crate) fn waitable_set_new(
3431 self,
3432 store: &mut StoreOpaque,
3433 caller_instance: RuntimeComponentInstanceIndex,
3434 ) -> Result<u32> {
3435 let set = store.concurrent_state_mut()?.push(WaitableSet::default())?;
3436 let handle = store
3437 .instance_state(self.runtime_instance(caller_instance))
3438 .handle_table()
3439 .waitable_set_insert(set.rep())?;
3440 log::trace!("new waitable set {set:?} (handle {handle})");
3441 Ok(handle)
3442 }
3443
3444 pub(crate) fn waitable_set_drop(
3446 self,
3447 store: &mut StoreOpaque,
3448 caller_instance: RuntimeComponentInstanceIndex,
3449 set: u32,
3450 ) -> Result<()> {
3451 let rep = store
3452 .instance_state(self.runtime_instance(caller_instance))
3453 .handle_table()
3454 .waitable_set_remove(set)?;
3455
3456 log::trace!("drop waitable set {rep} (handle {set})");
3457
3458 if !store
3462 .concurrent_state_mut()?
3463 .get_mut(TableId::<WaitableSet>::new(rep))?
3464 .waiting
3465 .is_empty()
3466 {
3467 bail!(Trap::WaitableSetDropHasWaiters);
3468 }
3469
3470 store
3471 .concurrent_state_mut()?
3472 .delete(TableId::<WaitableSet>::new(rep))?;
3473
3474 Ok(())
3475 }
3476
3477 pub(crate) fn waitable_join(
3479 self,
3480 store: &mut StoreOpaque,
3481 caller_instance: RuntimeComponentInstanceIndex,
3482 waitable_handle: u32,
3483 set_handle: u32,
3484 ) -> Result<()> {
3485 let mut instance = self.id().get_mut(store);
3486 let waitable =
3487 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3488
3489 let set = if set_handle == 0 {
3490 None
3491 } else {
3492 let set = instance.instance_states().0[caller_instance]
3493 .handle_table()
3494 .waitable_set_rep(set_handle)?;
3495
3496 let state = store.concurrent_state_mut()?;
3497 if let Some(old) = waitable.common(state)?.set
3498 && state.get_mut(old)?.is_sync_call_set
3499 {
3500 bail!(Trap::WaitableSyncAndAsync);
3501 }
3502
3503 Some(TableId::<WaitableSet>::new(set))
3504 };
3505
3506 log::trace!(
3507 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3508 );
3509
3510 waitable.join(store.concurrent_state_mut()?, set)
3511 }
3512
3513 pub(crate) fn subtask_drop(
3515 self,
3516 store: &mut StoreOpaque,
3517 caller_instance: RuntimeComponentInstanceIndex,
3518 task_id: u32,
3519 ) -> Result<()> {
3520 self.waitable_join(store, caller_instance, task_id, 0)?;
3521
3522 let (rep, is_host) = store
3523 .instance_state(self.runtime_instance(caller_instance))
3524 .handle_table()
3525 .subtask_remove(task_id)?;
3526
3527 let concurrent_state = store.concurrent_state_mut()?;
3528 let (waitable, delete) = if is_host {
3529 let id = TableId::<HostTask>::new(rep);
3530 let task = concurrent_state.get_mut(id)?;
3531 match &task.state {
3532 HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
3533 HostTaskState::CalleeDone { .. } => {}
3534 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3535 bail_bug!("invalid state for callee in `subtask.drop`")
3536 }
3537 }
3538 (Waitable::Host(id), true)
3539 } else {
3540 let id = TableId::<GuestTask>::new(rep);
3541 let task = concurrent_state.get_mut(id)?;
3542 if task.lift_result.is_some() {
3543 bail!(Trap::SubtaskDropNotResolved);
3544 }
3545 (
3546 Waitable::Guest(id),
3547 concurrent_state.get_mut(id)?.ready_to_delete(),
3548 )
3549 };
3550
3551 waitable.common(concurrent_state)?.handle = None;
3552
3553 if waitable.take_event(concurrent_state)?.is_some() {
3556 bail!(Trap::SubtaskDropNotResolved);
3557 }
3558
3559 if delete {
3560 waitable.delete_from(concurrent_state)?;
3561 }
3562
3563 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3564 Ok(())
3565 }
3566
3567 pub(crate) fn waitable_set_wait(
3569 self,
3570 store: &mut StoreOpaque,
3571 options: OptionsIndex,
3572 set: u32,
3573 payload: u32,
3574 ) -> Result<u32> {
3575 if !self.options(store, options).async_ {
3576 store.check_blocking()?;
3580 }
3581
3582 let &CanonicalOptions {
3583 cancellable,
3584 instance: caller_instance,
3585 ..
3586 } = &self.id().get(store).component().env_component().options[options];
3587 let rep = store
3588 .instance_state(self.runtime_instance(caller_instance))
3589 .handle_table()
3590 .waitable_set_rep(set)?;
3591
3592 self.waitable_check(
3593 store,
3594 cancellable,
3595 WaitableCheck::Wait,
3596 WaitableCheckParams {
3597 set: TableId::new(rep),
3598 options,
3599 payload,
3600 },
3601 )
3602 }
3603
3604 pub(crate) fn waitable_set_poll(
3606 self,
3607 store: &mut StoreOpaque,
3608 options: OptionsIndex,
3609 set: u32,
3610 payload: u32,
3611 ) -> Result<u32> {
3612 let &CanonicalOptions {
3613 cancellable,
3614 instance: caller_instance,
3615 ..
3616 } = &self.id().get(store).component().env_component().options[options];
3617 let rep = store
3618 .instance_state(self.runtime_instance(caller_instance))
3619 .handle_table()
3620 .waitable_set_rep(set)?;
3621
3622 self.waitable_check(
3623 store,
3624 cancellable,
3625 WaitableCheck::Poll,
3626 WaitableCheckParams {
3627 set: TableId::new(rep),
3628 options,
3629 payload,
3630 },
3631 )
3632 }
3633
3634 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3636 let thread_id = store.current_guest_thread()?.thread;
3637 match store
3638 .concurrent_state_mut()?
3639 .get_mut(thread_id)?
3640 .instance_rep
3641 {
3642 Some(r) => Ok(r),
3643 None => bail_bug!("thread should have instance_rep by now"),
3644 }
3645 }
3646
3647 pub(crate) fn thread_new_indirect<T: 'static>(
3649 self,
3650 mut store: StoreContextMut<T>,
3651 runtime_instance: RuntimeComponentInstanceIndex,
3652 _func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex,
3654 start_func_idx: u32,
3655 context: i32,
3656 ) -> Result<u32> {
3657 log::trace!("creating new thread");
3658
3659 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3660 let (instance, registry) = self.id().get_mut_and_registry(store.0);
3661 let callee = instance
3662 .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3663 .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
3664 if callee.type_index(store.0) != start_func_ty.type_index() {
3665 bail!(Trap::ThreadNewIndirectInvalidType);
3666 }
3667
3668 let token = StoreToken::new(store.as_context_mut());
3669 let start_func = Box::new(
3670 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3671 let old_thread = store.set_thread(guest_thread)?;
3672 log::trace!(
3673 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3674 );
3675
3676 let mut store = token.as_context_mut(store);
3677 let mut params = [ValRaw::i32(context)];
3678 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3681
3682 store.0.set_thread(old_thread)?;
3683
3684 store.0.cleanup_thread(
3685 guest_thread,
3686 self.runtime_instance(runtime_instance),
3687 CleanupTask::Yes,
3688 )?;
3689 log::trace!("explicit thread {guest_thread:?} completed");
3690 let state = store.0.concurrent_state_mut()?;
3691 if let Some(t) = old_thread.guest() {
3692 state.get_mut(t.thread)?.state = GuestThreadState::Running;
3693 }
3694 log::trace!("thread start: restored {old_thread:?} as current thread");
3695
3696 Ok(())
3697 },
3698 );
3699
3700 let current_thread = store.0.current_guest_thread()?;
3701 let state = store.0.concurrent_state_mut()?;
3702 let parent_task = current_thread.task;
3703
3704 let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
3705 let thread_id = state.push(new_thread)?;
3706 state.get_mut(parent_task)?.threads.insert(thread_id);
3707
3708 log::trace!("new thread with id {thread_id:?} created");
3709
3710 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3711 }
3712
3713 pub(crate) fn resume_thread(
3714 self,
3715 store: &mut StoreOpaque,
3716 runtime_instance: RuntimeComponentInstanceIndex,
3717 thread_idx: u32,
3718 high_priority: bool,
3719 allow_ready: bool,
3720 ) -> Result<()> {
3721 let thread_id =
3722 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3723 let state = store.concurrent_state_mut()?;
3724 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3725 let thread = state.get_mut(guest_thread.thread)?;
3726
3727 match mem::replace(&mut thread.state, GuestThreadState::Running) {
3728 GuestThreadState::NotStartedExplicit(start_func) => {
3729 log::trace!("starting thread {guest_thread:?}");
3730 let guest_call = WorkItem::GuestCall(
3731 runtime_instance,
3732 GuestCall {
3733 thread: guest_thread,
3734 kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3735 start_func(store, guest_thread)
3736 })),
3737 },
3738 );
3739 store
3740 .concurrent_state_mut()?
3741 .push_work_item(guest_call, high_priority);
3742 }
3743 GuestThreadState::Suspended(fiber) => {
3744 log::trace!("resuming thread {thread_id:?} that was suspended");
3745 store
3746 .concurrent_state_mut()?
3747 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3748 }
3749 GuestThreadState::Ready { fiber, cancellable } if allow_ready => {
3750 log::trace!("resuming thread {thread_id:?} that was ready");
3751 thread.state = GuestThreadState::Ready { fiber, cancellable };
3752 store
3753 .concurrent_state_mut()?
3754 .promote_thread_work_item(guest_thread);
3755 }
3756 other => {
3757 thread.state = other;
3758 bail!(Trap::CannotResumeThread);
3759 }
3760 }
3761 Ok(())
3762 }
3763
3764 fn add_guest_thread_to_instance_table(
3765 self,
3766 thread_id: TableId<GuestThread>,
3767 store: &mut StoreOpaque,
3768 runtime_instance: RuntimeComponentInstanceIndex,
3769 ) -> Result<u32> {
3770 let guest_id = store
3771 .instance_state(self.runtime_instance(runtime_instance))
3772 .thread_handle_table()
3773 .guest_thread_insert(thread_id.rep())?;
3774 store
3775 .concurrent_state_mut()?
3776 .get_mut(thread_id)?
3777 .instance_rep = Some(guest_id);
3778 Ok(guest_id)
3779 }
3780
3781 pub(crate) fn suspension_intrinsic(
3784 self,
3785 store: &mut StoreOpaque,
3786 caller: RuntimeComponentInstanceIndex,
3787 cancellable: bool,
3788 yielding: bool,
3789 to_thread: SuspensionTarget,
3790 ) -> Result<WaitResult> {
3791 let guest_thread = store.current_guest_thread()?;
3792 if to_thread.is_none() {
3793 let state = store.concurrent_state_mut()?;
3794 if yielding {
3795 if !state.may_block(guest_thread.task)? {
3797 if !state.promote_instance_local_thread_work_item(caller) {
3800 return Ok(WaitResult::Completed);
3802 }
3803 }
3804 } else {
3805 store.check_blocking()?;
3809 }
3810 }
3811
3812 if cancellable && store.take_pending_cancellation()? {
3814 return Ok(WaitResult::Cancelled);
3815 }
3816
3817 match to_thread {
3818 SuspensionTarget::SomeSuspended(thread) => {
3819 self.resume_thread(store, caller, thread, true, false)?
3820 }
3821 SuspensionTarget::Some(thread) => {
3822 self.resume_thread(store, caller, thread, true, true)?
3823 }
3824 SuspensionTarget::None => { }
3825 }
3826
3827 let reason = if yielding {
3828 SuspendReason::Yielding {
3829 thread: guest_thread,
3830 cancellable,
3831 skip_may_block_check: to_thread.is_some(),
3835 }
3836 } else {
3837 SuspendReason::ExplicitlySuspending {
3838 thread: guest_thread,
3839 skip_may_block_check: to_thread.is_some(),
3843 }
3844 };
3845
3846 store.suspend(reason)?;
3847
3848 if cancellable && store.take_pending_cancellation()? {
3849 Ok(WaitResult::Cancelled)
3850 } else {
3851 Ok(WaitResult::Completed)
3852 }
3853 }
3854
3855 fn waitable_check(
3857 self,
3858 store: &mut StoreOpaque,
3859 cancellable: bool,
3860 check: WaitableCheck,
3861 params: WaitableCheckParams,
3862 ) -> Result<u32> {
3863 let guest_thread = store.current_guest_thread()?;
3864
3865 log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3866
3867 let state = store.concurrent_state_mut()?;
3868 let task = state.get_mut(guest_thread.task)?;
3869
3870 match &check {
3873 WaitableCheck::Wait => {
3874 let set = params.set;
3875
3876 if (task.event.is_none()
3877 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3878 && state.get_mut(set)?.ready.is_empty()
3879 {
3880 if cancellable {
3881 let old = state
3882 .get_mut(guest_thread.thread)?
3883 .wake_on_cancel
3884 .replace(set);
3885 if !old.is_none() {
3886 bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
3887 }
3888 }
3889
3890 store.suspend(SuspendReason::Waiting {
3891 set,
3892 thread: guest_thread,
3893 skip_may_block_check: false,
3894 })?;
3895 }
3896 }
3897 WaitableCheck::Poll => {}
3898 }
3899
3900 log::trace!(
3901 "waitable check for {guest_thread:?}; set {:?}, part two",
3902 params.set
3903 );
3904
3905 let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3907
3908 let (ordinal, handle, result) = match &check {
3909 WaitableCheck::Wait => {
3910 let (event, waitable) = match event {
3911 Some(p) => p,
3912 None => bail_bug!("event expected to be present"),
3913 };
3914 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3915 let (ordinal, result) = event.parts();
3916 (ordinal, handle, result)
3917 }
3918 WaitableCheck::Poll => {
3919 if let Some((event, waitable)) = event {
3920 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3921 let (ordinal, result) = event.parts();
3922 (ordinal, handle, result)
3923 } else {
3924 log::trace!(
3925 "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3926 guest_thread.task,
3927 params.set
3928 );
3929 let (ordinal, result) = Event::None.parts();
3930 (ordinal, 0, result)
3931 }
3932 }
3933 };
3934 let memory = self.options_memory_mut(store, params.options);
3935 let ptr = crate::component::func::validate_inbounds_dynamic(
3936 &CanonicalAbiInfo::POINTER_PAIR,
3937 memory,
3938 &ValRaw::u32(params.payload),
3939 )?;
3940 memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3941 memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3942 Ok(ordinal)
3943 }
3944
3945 pub(crate) fn subtask_cancel(
3947 self,
3948 store: &mut StoreOpaque,
3949 caller_instance: RuntimeComponentInstanceIndex,
3950 async_: bool,
3951 task_id: u32,
3952 ) -> Result<u32> {
3953 if !async_ {
3954 store.check_blocking()?;
3958 }
3959
3960 let (rep, is_host) = store
3961 .instance_state(self.runtime_instance(caller_instance))
3962 .handle_table()
3963 .subtask_rep(task_id)?;
3964 let waitable = if is_host {
3965 Waitable::Host(TableId::<HostTask>::new(rep))
3966 } else {
3967 Waitable::Guest(TableId::<GuestTask>::new(rep))
3968 };
3969 let concurrent_state = store.concurrent_state_mut()?;
3970
3971 log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3972
3973 let needs_block;
3974 if let Waitable::Host(host_task) = waitable {
3975 let state = &mut concurrent_state.get_mut(host_task)?.state;
3976 match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
3977 HostTaskState::CalleeRunning(handle) => {
3984 handle.abort();
3985 needs_block = true;
3986 }
3987
3988 HostTaskState::CalleeDone { cancelled } => {
3991 if cancelled {
3992 bail!(Trap::SubtaskCancelAfterTerminal);
3993 } else {
3994 needs_block = false;
3997 }
3998 }
3999
4000 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
4003 bail_bug!("invalid states for host callee")
4004 }
4005 }
4006 } else {
4007 let guest_task = TableId::<GuestTask>::new(rep);
4008 let task = concurrent_state.get_mut(guest_task)?;
4009 if !task.already_lowered_parameters() {
4010 store.cancel_guest_subtask_without_lowered_parameters(
4011 self.runtime_instance(caller_instance),
4012 guest_task,
4013 )?;
4014 return Ok(Status::StartCancelled as u32);
4015 } else if !task.returned_or_cancelled() {
4016 task.cancel_sent = true;
4019 task.event = Some(Event::Cancelled);
4024 let runtime_instance = task.instance.index;
4025 for thread in task.threads.clone() {
4026 let thread = QualifiedThreadId {
4027 task: guest_task,
4028 thread,
4029 };
4030 let thread_mut = concurrent_state.get_mut(thread.thread)?;
4031 if let Some(set) = thread_mut.wake_on_cancel.take() {
4032 let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
4034 Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
4035 Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
4036 runtime_instance,
4037 GuestCall {
4038 thread,
4039 kind: GuestCallKind::DeliverEvent {
4040 instance,
4041 set: None,
4042 },
4043 },
4044 ),
4045 None => bail_bug!("thread not present in wake_on_cancel set"),
4046 };
4047 concurrent_state.push_high_priority(item);
4048
4049 let caller = store.current_guest_thread()?;
4050 store.suspend(SuspendReason::Yielding {
4051 thread: caller,
4052 cancellable: false,
4053 skip_may_block_check: false,
4056 })?;
4057 break;
4058 } else if let GuestThreadState::Ready {
4059 cancellable: true, ..
4060 } = &thread_mut.state
4061 {
4062 concurrent_state.promote_thread_work_item(thread);
4065 let caller = store.current_guest_thread()?;
4066 store.suspend(SuspendReason::Yielding {
4067 thread: caller,
4068 cancellable: false,
4069 skip_may_block_check: false,
4070 })?;
4071 break;
4072 }
4073 }
4074
4075 needs_block = !store
4078 .concurrent_state_mut()?
4079 .get_mut(guest_task)?
4080 .returned_or_cancelled()
4081 } else {
4082 needs_block = false;
4083 }
4084 };
4085
4086 if needs_block {
4090 if async_ {
4091 return Ok(BLOCKED);
4092 }
4093
4094 store.wait_for_event(waitable)?;
4098
4099 }
4101
4102 let event = waitable.take_event(store.concurrent_state_mut()?)?;
4103 if let Some(Event::Subtask {
4104 status: status @ (Status::Returned | Status::ReturnCancelled),
4105 }) = event
4106 {
4107 Ok(status as u32)
4108 } else {
4109 bail!(Trap::SubtaskCancelAfterTerminal);
4110 }
4111 }
4112}
4113
4114pub trait VMComponentAsyncStore {
4122 unsafe fn prepare_call(
4128 &mut self,
4129 instance: Instance,
4130 memory: *mut VMMemoryDefinition,
4131 start: NonNull<VMFuncRef>,
4132 return_: NonNull<VMFuncRef>,
4133 caller_instance: RuntimeComponentInstanceIndex,
4134 callee_instance: RuntimeComponentInstanceIndex,
4135 task_return_type: TypeTupleIndex,
4136 callee_async: bool,
4137 string_encoding: StringEncoding,
4138 result_count: u32,
4139 storage: *mut ValRaw,
4140 storage_len: usize,
4141 ) -> Result<()>;
4142
4143 unsafe fn sync_start(
4146 &mut self,
4147 instance: Instance,
4148 callback: *mut VMFuncRef,
4149 callee: NonNull<VMFuncRef>,
4150 param_count: u32,
4151 storage: *mut MaybeUninit<ValRaw>,
4152 storage_len: usize,
4153 ) -> Result<()>;
4154
4155 unsafe fn async_start(
4158 &mut self,
4159 instance: Instance,
4160 callback: *mut VMFuncRef,
4161 post_return: *mut VMFuncRef,
4162 callee: NonNull<VMFuncRef>,
4163 param_count: u32,
4164 result_count: u32,
4165 flags: u32,
4166 ) -> Result<u32>;
4167
4168 fn future_write(
4170 &mut self,
4171 instance: Instance,
4172 caller: RuntimeComponentInstanceIndex,
4173 ty: TypeFutureTableIndex,
4174 options: OptionsIndex,
4175 future: u32,
4176 address: u32,
4177 ) -> Result<u32>;
4178
4179 fn future_read(
4181 &mut self,
4182 instance: Instance,
4183 caller: RuntimeComponentInstanceIndex,
4184 ty: TypeFutureTableIndex,
4185 options: OptionsIndex,
4186 future: u32,
4187 address: u32,
4188 ) -> Result<u32>;
4189
4190 fn future_drop_writable(
4192 &mut self,
4193 instance: Instance,
4194 ty: TypeFutureTableIndex,
4195 writer: u32,
4196 ) -> Result<()>;
4197
4198 fn stream_write(
4200 &mut self,
4201 instance: Instance,
4202 caller: RuntimeComponentInstanceIndex,
4203 ty: TypeStreamTableIndex,
4204 options: OptionsIndex,
4205 stream: u32,
4206 address: u32,
4207 count: u32,
4208 ) -> Result<u32>;
4209
4210 fn stream_read(
4212 &mut self,
4213 instance: Instance,
4214 caller: RuntimeComponentInstanceIndex,
4215 ty: TypeStreamTableIndex,
4216 options: OptionsIndex,
4217 stream: u32,
4218 address: u32,
4219 count: u32,
4220 ) -> Result<u32>;
4221
4222 fn flat_stream_write(
4225 &mut self,
4226 instance: Instance,
4227 caller: RuntimeComponentInstanceIndex,
4228 ty: TypeStreamTableIndex,
4229 options: OptionsIndex,
4230 payload_size: u32,
4231 payload_align: u32,
4232 stream: u32,
4233 address: u32,
4234 count: u32,
4235 ) -> Result<u32>;
4236
4237 fn flat_stream_read(
4240 &mut self,
4241 instance: Instance,
4242 caller: RuntimeComponentInstanceIndex,
4243 ty: TypeStreamTableIndex,
4244 options: OptionsIndex,
4245 payload_size: u32,
4246 payload_align: u32,
4247 stream: u32,
4248 address: u32,
4249 count: u32,
4250 ) -> Result<u32>;
4251
4252 fn stream_drop_writable(
4254 &mut self,
4255 instance: Instance,
4256 ty: TypeStreamTableIndex,
4257 writer: u32,
4258 ) -> Result<()>;
4259
4260 fn error_context_debug_message(
4262 &mut self,
4263 instance: Instance,
4264 ty: TypeComponentLocalErrorContextTableIndex,
4265 options: OptionsIndex,
4266 err_ctx_handle: u32,
4267 debug_msg_address: u32,
4268 ) -> Result<()>;
4269
4270 fn thread_new_indirect(
4272 &mut self,
4273 instance: Instance,
4274 caller: RuntimeComponentInstanceIndex,
4275 func_ty_idx: TypeFuncIndex,
4276 start_func_table_idx: RuntimeTableIndex,
4277 start_func_idx: u32,
4278 context: i32,
4279 ) -> Result<u32>;
4280}
4281
4282impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
4284 unsafe fn prepare_call(
4285 &mut self,
4286 instance: Instance,
4287 memory: *mut VMMemoryDefinition,
4288 start: NonNull<VMFuncRef>,
4289 return_: NonNull<VMFuncRef>,
4290 caller_instance: RuntimeComponentInstanceIndex,
4291 callee_instance: RuntimeComponentInstanceIndex,
4292 task_return_type: TypeTupleIndex,
4293 callee_async: bool,
4294 string_encoding: StringEncoding,
4295 result_count_or_max_if_async: u32,
4296 storage: *mut ValRaw,
4297 storage_len: usize,
4298 ) -> Result<()> {
4299 let params = unsafe { core::slice::from_raw_parts(storage, storage_len) }.to_vec();
4303
4304 unsafe {
4305 instance.prepare_call(
4306 StoreContextMut(self),
4307 start,
4308 return_,
4309 caller_instance,
4310 callee_instance,
4311 task_return_type,
4312 callee_async,
4313 memory,
4314 string_encoding,
4315 match result_count_or_max_if_async {
4316 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
4317 params,
4318 has_result: false,
4319 },
4320 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
4321 params,
4322 has_result: true,
4323 },
4324 result_count => CallerInfo::Sync {
4325 params,
4326 result_count,
4327 },
4328 },
4329 )
4330 }
4331 }
4332
4333 unsafe fn sync_start(
4334 &mut self,
4335 instance: Instance,
4336 callback: *mut VMFuncRef,
4337 callee: NonNull<VMFuncRef>,
4338 param_count: u32,
4339 storage: *mut MaybeUninit<ValRaw>,
4340 storage_len: usize,
4341 ) -> Result<()> {
4342 unsafe {
4343 instance
4344 .start_call(
4345 StoreContextMut(self),
4346 callback,
4347 ptr::null_mut(),
4348 callee,
4349 param_count,
4350 1,
4351 START_FLAG_ASYNC_CALLEE,
4352 Some(core::slice::from_raw_parts_mut(storage, storage_len)),
4356 )
4357 .map(drop)
4358 }
4359 }
4360
4361 unsafe fn async_start(
4362 &mut self,
4363 instance: Instance,
4364 callback: *mut VMFuncRef,
4365 post_return: *mut VMFuncRef,
4366 callee: NonNull<VMFuncRef>,
4367 param_count: u32,
4368 result_count: u32,
4369 flags: u32,
4370 ) -> Result<u32> {
4371 unsafe {
4372 instance.start_call(
4373 StoreContextMut(self),
4374 callback,
4375 post_return,
4376 callee,
4377 param_count,
4378 result_count,
4379 flags,
4380 None,
4381 )
4382 }
4383 }
4384
4385 fn future_write(
4386 &mut self,
4387 instance: Instance,
4388 caller: RuntimeComponentInstanceIndex,
4389 ty: TypeFutureTableIndex,
4390 options: OptionsIndex,
4391 future: u32,
4392 address: u32,
4393 ) -> Result<u32> {
4394 instance
4395 .guest_write(
4396 StoreContextMut(self),
4397 caller,
4398 TransmitIndex::Future(ty),
4399 options,
4400 None,
4401 future,
4402 address,
4403 1,
4404 )
4405 .map(|result| result.encode())
4406 }
4407
4408 fn future_read(
4409 &mut self,
4410 instance: Instance,
4411 caller: RuntimeComponentInstanceIndex,
4412 ty: TypeFutureTableIndex,
4413 options: OptionsIndex,
4414 future: u32,
4415 address: u32,
4416 ) -> Result<u32> {
4417 instance
4418 .guest_read(
4419 StoreContextMut(self),
4420 caller,
4421 TransmitIndex::Future(ty),
4422 options,
4423 None,
4424 future,
4425 address,
4426 1,
4427 )
4428 .map(|result| result.encode())
4429 }
4430
4431 fn stream_write(
4432 &mut self,
4433 instance: Instance,
4434 caller: RuntimeComponentInstanceIndex,
4435 ty: TypeStreamTableIndex,
4436 options: OptionsIndex,
4437 stream: u32,
4438 address: u32,
4439 count: u32,
4440 ) -> Result<u32> {
4441 instance
4442 .guest_write(
4443 StoreContextMut(self),
4444 caller,
4445 TransmitIndex::Stream(ty),
4446 options,
4447 None,
4448 stream,
4449 address,
4450 count,
4451 )
4452 .map(|result| result.encode())
4453 }
4454
4455 fn stream_read(
4456 &mut self,
4457 instance: Instance,
4458 caller: RuntimeComponentInstanceIndex,
4459 ty: TypeStreamTableIndex,
4460 options: OptionsIndex,
4461 stream: u32,
4462 address: u32,
4463 count: u32,
4464 ) -> Result<u32> {
4465 instance
4466 .guest_read(
4467 StoreContextMut(self),
4468 caller,
4469 TransmitIndex::Stream(ty),
4470 options,
4471 None,
4472 stream,
4473 address,
4474 count,
4475 )
4476 .map(|result| result.encode())
4477 }
4478
4479 fn future_drop_writable(
4480 &mut self,
4481 instance: Instance,
4482 ty: TypeFutureTableIndex,
4483 writer: u32,
4484 ) -> Result<()> {
4485 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4486 }
4487
4488 fn flat_stream_write(
4489 &mut self,
4490 instance: Instance,
4491 caller: RuntimeComponentInstanceIndex,
4492 ty: TypeStreamTableIndex,
4493 options: OptionsIndex,
4494 payload_size: u32,
4495 payload_align: u32,
4496 stream: u32,
4497 address: u32,
4498 count: u32,
4499 ) -> Result<u32> {
4500 instance
4501 .guest_write(
4502 StoreContextMut(self),
4503 caller,
4504 TransmitIndex::Stream(ty),
4505 options,
4506 Some(FlatAbi {
4507 size: payload_size,
4508 align: payload_align,
4509 }),
4510 stream,
4511 address,
4512 count,
4513 )
4514 .map(|result| result.encode())
4515 }
4516
4517 fn flat_stream_read(
4518 &mut self,
4519 instance: Instance,
4520 caller: RuntimeComponentInstanceIndex,
4521 ty: TypeStreamTableIndex,
4522 options: OptionsIndex,
4523 payload_size: u32,
4524 payload_align: u32,
4525 stream: u32,
4526 address: u32,
4527 count: u32,
4528 ) -> Result<u32> {
4529 instance
4530 .guest_read(
4531 StoreContextMut(self),
4532 caller,
4533 TransmitIndex::Stream(ty),
4534 options,
4535 Some(FlatAbi {
4536 size: payload_size,
4537 align: payload_align,
4538 }),
4539 stream,
4540 address,
4541 count,
4542 )
4543 .map(|result| result.encode())
4544 }
4545
4546 fn stream_drop_writable(
4547 &mut self,
4548 instance: Instance,
4549 ty: TypeStreamTableIndex,
4550 writer: u32,
4551 ) -> Result<()> {
4552 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4553 }
4554
4555 fn error_context_debug_message(
4556 &mut self,
4557 instance: Instance,
4558 ty: TypeComponentLocalErrorContextTableIndex,
4559 options: OptionsIndex,
4560 err_ctx_handle: u32,
4561 debug_msg_address: u32,
4562 ) -> Result<()> {
4563 instance.error_context_debug_message(
4564 StoreContextMut(self),
4565 ty,
4566 options,
4567 err_ctx_handle,
4568 debug_msg_address,
4569 )
4570 }
4571
4572 fn thread_new_indirect(
4573 &mut self,
4574 instance: Instance,
4575 caller: RuntimeComponentInstanceIndex,
4576 func_ty_idx: TypeFuncIndex,
4577 start_func_table_idx: RuntimeTableIndex,
4578 start_func_idx: u32,
4579 context: i32,
4580 ) -> Result<u32> {
4581 instance.thread_new_indirect(
4582 StoreContextMut(self),
4583 caller,
4584 func_ty_idx,
4585 start_func_table_idx,
4586 start_func_idx,
4587 context,
4588 )
4589 }
4590}
4591
4592type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4593
4594pub(crate) struct HostTask {
4598 common: WaitableCommon,
4599
4600 caller: QualifiedThreadId,
4602
4603 call_context: CallContext,
4606
4607 state: HostTaskState,
4608}
4609
4610enum HostTaskState {
4611 CalleeStarted,
4616
4617 CalleeRunning(JoinHandle),
4622
4623 CalleeFinished(LiftedResult),
4627
4628 CalleeDone { cancelled: bool },
4631}
4632
4633impl HostTask {
4634 fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4635 Self {
4636 common: WaitableCommon::default(),
4637 call_context: CallContext::default(),
4638 caller,
4639 state,
4640 }
4641 }
4642}
4643
4644impl TableDebug for HostTask {
4645 fn type_name() -> &'static str {
4646 "HostTask"
4647 }
4648}
4649
4650type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4651
4652enum Caller {
4654 Host {
4656 tx: Option<oneshot::Sender<LiftedResult>>,
4658 host_future_present: bool,
4661 caller: CurrentThread,
4665 },
4666 Guest {
4668 thread: QualifiedThreadId,
4670 },
4671}
4672
4673struct LiftResult {
4676 lift: RawLift,
4677 ty: TypeTupleIndex,
4678 memory: Option<SendSyncPtr<VMMemoryDefinition>>,
4679 string_encoding: StringEncoding,
4680}
4681
4682#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4687pub(crate) struct QualifiedThreadId {
4688 task: TableId<GuestTask>,
4689 thread: TableId<GuestThread>,
4690}
4691
4692impl QualifiedThreadId {
4693 fn qualify(
4694 state: &mut ConcurrentState,
4695 thread: TableId<GuestThread>,
4696 ) -> Result<QualifiedThreadId> {
4697 Ok(QualifiedThreadId {
4698 task: state.get_mut(thread)?.parent_task,
4699 thread,
4700 })
4701 }
4702}
4703
4704impl fmt::Debug for QualifiedThreadId {
4705 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4706 f.debug_tuple("QualifiedThreadId")
4707 .field(&self.task.rep())
4708 .field(&self.thread.rep())
4709 .finish()
4710 }
4711}
4712
4713enum GuestThreadState {
4714 NotStartedImplicit,
4715 NotStartedExplicit(
4716 Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4717 ),
4718 Running,
4719 Suspended(StoreFiber<'static>),
4720 Ready {
4721 fiber: StoreFiber<'static>,
4722 cancellable: bool,
4723 },
4724 Completed,
4725}
4726pub struct GuestThread {
4727 context: [u32; NUM_COMPONENT_CONTEXT_SLOTS],
4730 parent_task: TableId<GuestTask>,
4732 wake_on_cancel: Option<TableId<WaitableSet>>,
4735 state: GuestThreadState,
4737 instance_rep: Option<u32>,
4740 sync_call_set: TableId<WaitableSet>,
4742}
4743
4744impl GuestThread {
4745 fn from_instance(
4748 state: Pin<&mut ComponentInstance>,
4749 caller_instance: RuntimeComponentInstanceIndex,
4750 guest_thread: u32,
4751 ) -> Result<TableId<Self>> {
4752 let rep = state.instance_states().0[caller_instance]
4753 .thread_handle_table()
4754 .guest_thread_rep(guest_thread)?;
4755 Ok(TableId::new(rep))
4756 }
4757
4758 fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4759 let sync_call_set = state.push(WaitableSet {
4760 is_sync_call_set: true,
4761 ..WaitableSet::default()
4762 })?;
4763 Ok(Self {
4764 context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4765 parent_task,
4766 wake_on_cancel: None,
4767 state: GuestThreadState::NotStartedImplicit,
4768 instance_rep: None,
4769 sync_call_set,
4770 })
4771 }
4772
4773 fn new_explicit(
4774 state: &mut ConcurrentState,
4775 parent_task: TableId<GuestTask>,
4776 start_func: Box<
4777 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4778 >,
4779 ) -> Result<Self> {
4780 let sync_call_set = state.push(WaitableSet {
4781 is_sync_call_set: true,
4782 ..WaitableSet::default()
4783 })?;
4784 Ok(Self {
4785 context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4786 parent_task,
4787 wake_on_cancel: None,
4788 state: GuestThreadState::NotStartedExplicit(start_func),
4789 instance_rep: None,
4790 sync_call_set,
4791 })
4792 }
4793}
4794
4795impl TableDebug for GuestThread {
4796 fn type_name() -> &'static str {
4797 "GuestThread"
4798 }
4799}
4800
4801enum SyncResult {
4802 NotProduced,
4803 Produced(Option<ValRaw>),
4804 Taken,
4805}
4806
4807impl SyncResult {
4808 fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4809 Ok(match mem::replace(self, SyncResult::Taken) {
4810 SyncResult::NotProduced => None,
4811 SyncResult::Produced(val) => Some(val),
4812 SyncResult::Taken => {
4813 bail_bug!("attempted to take a synchronous result that was already taken")
4814 }
4815 })
4816 }
4817}
4818
4819#[derive(Debug)]
4820enum HostFutureState {
4821 NotApplicable,
4822 Live,
4823 Dropped,
4824}
4825
4826pub(crate) struct GuestTask {
4828 common: WaitableCommon,
4830 lower_params: Option<RawLower>,
4832 lift_result: Option<LiftResult>,
4834 result: Option<LiftedResult>,
4837 callback: Option<CallbackFn>,
4840 caller: Caller,
4842 call_context: CallContext,
4847 sync_result: SyncResult,
4850 cancel_sent: bool,
4853 starting_sent: bool,
4856 instance: RuntimeInstance,
4863 event: Option<Event>,
4866 exited: bool,
4868 threads: HashSet<TableId<GuestThread>>,
4870 host_future_state: HostFutureState,
4873 async_function: bool,
4876
4877 decremented_interesting_task_count: bool,
4878}
4879
4880impl GuestTask {
4881 fn already_lowered_parameters(&self) -> bool {
4882 self.lower_params.is_none()
4884 }
4885
4886 fn returned_or_cancelled(&self) -> bool {
4887 self.lift_result.is_none()
4889 }
4890
4891 fn ready_to_delete(&self) -> bool {
4892 let threads_completed = self.threads.is_empty();
4893 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4894 let pending_completion_event = matches!(
4895 self.common.event,
4896 Some(Event::Subtask {
4897 status: Status::Returned | Status::ReturnCancelled
4898 })
4899 );
4900 let ready = threads_completed
4901 && !has_sync_result
4902 && !pending_completion_event
4903 && !matches!(self.host_future_state, HostFutureState::Live);
4904 log::trace!(
4905 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4906 threads_completed,
4907 has_sync_result,
4908 pending_completion_event,
4909 self.host_future_state
4910 );
4911 ready
4912 }
4913
4914 fn new(
4915 state: &mut ConcurrentState,
4916 lower_params: RawLower,
4917 lift_result: LiftResult,
4918 caller: Caller,
4919 callback: Option<CallbackFn>,
4920 instance: RuntimeInstance,
4921 async_function: bool,
4922 ) -> Result<QualifiedThreadId> {
4923 let host_future_state = match &caller {
4924 Caller::Guest { .. } => HostFutureState::NotApplicable,
4925 Caller::Host {
4926 host_future_present,
4927 ..
4928 } => {
4929 if *host_future_present {
4930 HostFutureState::Live
4931 } else {
4932 HostFutureState::NotApplicable
4933 }
4934 }
4935 };
4936 let task = state.push(Self {
4937 common: WaitableCommon::default(),
4938 lower_params: Some(lower_params),
4939 lift_result: Some(lift_result),
4940 result: None,
4941 callback,
4942 caller,
4943 call_context: CallContext::default(),
4944 sync_result: SyncResult::NotProduced,
4945 cancel_sent: false,
4946 starting_sent: false,
4947 instance,
4948 event: None,
4949 exited: false,
4950 threads: HashSet::new(),
4951 host_future_state,
4952 async_function,
4953 decremented_interesting_task_count: false,
4954 })?;
4955 let new_thread = GuestThread::new_implicit(state, task)?;
4956 let thread = state.push(new_thread)?;
4957 state.get_mut(task)?.threads.insert(thread);
4958 state.interesting_tasks += 1;
4959 Ok(QualifiedThreadId { task, thread })
4960 }
4961}
4962
4963impl TableDebug for GuestTask {
4964 fn type_name() -> &'static str {
4965 "GuestTask"
4966 }
4967}
4968
4969#[derive(Default)]
4971struct WaitableCommon {
4972 event: Option<Event>,
4974 set: Option<TableId<WaitableSet>>,
4976 handle: Option<u32>,
4978}
4979
4980#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4982enum Waitable {
4983 Host(TableId<HostTask>),
4985 Guest(TableId<GuestTask>),
4987 Transmit(TableId<TransmitHandle>),
4989}
4990
4991impl Waitable {
4992 fn from_instance(
4995 state: Pin<&mut ComponentInstance>,
4996 caller_instance: RuntimeComponentInstanceIndex,
4997 waitable: u32,
4998 ) -> Result<Self> {
4999 use crate::runtime::vm::component::Waitable;
5000
5001 let (waitable, kind) = state.instance_states().0[caller_instance]
5002 .handle_table()
5003 .waitable_rep(waitable)?;
5004
5005 Ok(match kind {
5006 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
5007 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
5008 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
5009 })
5010 }
5011
5012 fn rep(&self) -> u32 {
5014 match self {
5015 Self::Host(id) => id.rep(),
5016 Self::Guest(id) => id.rep(),
5017 Self::Transmit(id) => id.rep(),
5018 }
5019 }
5020
5021 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
5025 log::trace!("waitable {self:?} join set {set:?}");
5026
5027 let old = mem::replace(&mut self.common(state)?.set, set);
5028
5029 if let Some(old) = old {
5030 match *self {
5031 Waitable::Host(id) => state.remove_child(id, old),
5032 Waitable::Guest(id) => state.remove_child(id, old),
5033 Waitable::Transmit(id) => state.remove_child(id, old),
5034 }?;
5035
5036 state.get_mut(old)?.ready.remove(self);
5037 }
5038
5039 if let Some(set) = set {
5040 match *self {
5041 Waitable::Host(id) => state.add_child(id, set),
5042 Waitable::Guest(id) => state.add_child(id, set),
5043 Waitable::Transmit(id) => state.add_child(id, set),
5044 }?;
5045
5046 if self.common(state)?.event.is_some() {
5047 self.mark_ready(state)?;
5048 }
5049 }
5050
5051 Ok(())
5052 }
5053
5054 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
5056 Ok(match self {
5057 Self::Host(id) => &mut state.get_mut(*id)?.common,
5058 Self::Guest(id) => &mut state.get_mut(*id)?.common,
5059 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
5060 })
5061 }
5062
5063 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
5067 log::trace!("set event for {self:?}: {event:?}");
5068 self.common(state)?.event = event;
5069 self.mark_ready(state)
5070 }
5071
5072 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
5074 let common = self.common(state)?;
5075 let event = common.event.take();
5076 if let Some(set) = self.common(state)?.set {
5077 state.get_mut(set)?.ready.remove(self);
5078 }
5079
5080 Ok(event)
5081 }
5082
5083 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
5087 if let Some(set) = self.common(state)?.set {
5088 state.get_mut(set)?.ready.insert(*self);
5089 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
5090 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
5091 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
5092
5093 let item = match mode {
5094 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
5095 WaitMode::Callback(instance) => WorkItem::GuestCall(
5096 state.get_mut(thread.task)?.instance.index,
5097 GuestCall {
5098 thread,
5099 kind: GuestCallKind::DeliverEvent {
5100 instance,
5101 set: Some(set),
5102 },
5103 },
5104 ),
5105 };
5106 state.push_high_priority(item);
5107 }
5108 }
5109 Ok(())
5110 }
5111
5112 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
5114 match self {
5115 Self::Host(task) => {
5116 log::trace!("delete host task {task:?}");
5117 state.delete(*task)?;
5118 }
5119 Self::Guest(task) => {
5120 log::trace!("delete guest task {task:?}");
5121 let task = state.delete(*task)?;
5122
5123 debug_assert!(task.decremented_interesting_task_count);
5130 }
5131 Self::Transmit(task) => {
5132 state.delete(*task)?;
5133 }
5134 }
5135
5136 Ok(())
5137 }
5138}
5139
5140impl fmt::Debug for Waitable {
5141 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5142 match self {
5143 Self::Host(id) => write!(f, "{id:?}"),
5144 Self::Guest(id) => write!(f, "{id:?}"),
5145 Self::Transmit(id) => write!(f, "{id:?}"),
5146 }
5147 }
5148}
5149
5150#[derive(Default)]
5152struct WaitableSet {
5153 ready: BTreeSet<Waitable>,
5155 waiting: BTreeMap<QualifiedThreadId, WaitMode>,
5157 is_sync_call_set: bool,
5160}
5161
5162impl TableDebug for WaitableSet {
5163 fn type_name() -> &'static str {
5164 "WaitableSet"
5165 }
5166}
5167
5168type RawLower =
5170 Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
5171
5172type RawLift = Box<
5174 dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5175>;
5176
5177type LiftedResult = Box<dyn Any + Send + Sync>;
5181
5182struct DummyResult;
5185
5186#[derive(Default)]
5188pub struct ConcurrentInstanceState {
5189 backpressure: u16,
5191 do_not_enter: bool,
5193 pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
5196}
5197
5198impl ConcurrentInstanceState {
5199 pub fn pending_is_empty(&self) -> bool {
5200 self.pending.is_empty()
5201 }
5202}
5203
5204#[derive(Debug, Copy, Clone)]
5205pub(crate) enum CurrentThread {
5206 Guest(QualifiedThreadId),
5207 Host(TableId<HostTask>),
5208 None,
5209}
5210
5211impl CurrentThread {
5212 fn guest(&self) -> Option<&QualifiedThreadId> {
5213 match self {
5214 Self::Guest(id) => Some(id),
5215 _ => None,
5216 }
5217 }
5218
5219 fn host(&self) -> Option<TableId<HostTask>> {
5220 match self {
5221 Self::Host(id) => Some(*id),
5222 _ => None,
5223 }
5224 }
5225
5226 fn is_none(&self) -> bool {
5227 matches!(self, Self::None)
5228 }
5229}
5230
5231impl From<QualifiedThreadId> for CurrentThread {
5232 fn from(id: QualifiedThreadId) -> Self {
5233 Self::Guest(id)
5234 }
5235}
5236
5237impl From<TableId<HostTask>> for CurrentThread {
5238 fn from(id: TableId<HostTask>) -> Self {
5239 Self::Host(id)
5240 }
5241}
5242
5243pub struct ConcurrentState {
5245 unforced_current_thread: CurrentThread,
5251
5252 futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
5257 table: AlwaysMut<ResourceTable>,
5259 high_priority: Vec<WorkItem>,
5261 low_priority: VecDeque<WorkItem>,
5263 suspend_reason: Option<SuspendReason>,
5267 worker: Option<StoreFiber<'static>>,
5271 worker_item: Option<WorkerItem>,
5273
5274 global_error_context_ref_counts:
5287 BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
5288
5289 interesting_tasks: usize,
5302
5303 interesting_tasks_empty_waker: Option<Waker>,
5307
5308 ready_for_concurrent_call_waker: Option<Waker>,
5313}
5314
5315impl Default for ConcurrentState {
5316 fn default() -> Self {
5317 Self {
5318 unforced_current_thread: CurrentThread::None,
5319 table: AlwaysMut::new(ResourceTable::new()),
5320 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
5321 high_priority: Vec::new(),
5322 low_priority: VecDeque::new(),
5323 suspend_reason: None,
5324 worker: None,
5325 worker_item: None,
5326 global_error_context_ref_counts: BTreeMap::new(),
5327 interesting_tasks: 0,
5328 interesting_tasks_empty_waker: None,
5329 ready_for_concurrent_call_waker: None,
5330 }
5331 }
5332}
5333
5334impl ConcurrentState {
5335 pub(crate) fn take_fibers_and_futures(
5352 &mut self,
5353 fibers: &mut Vec<StoreFiber<'static>>,
5354 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
5355 ) {
5356 for entry in self.table.get_mut().iter_mut() {
5357 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
5358 for mode in mem::take(&mut set.waiting).into_values() {
5359 if let WaitMode::Fiber(fiber) = mode {
5360 fibers.push(fiber);
5361 }
5362 }
5363 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
5364 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready { fiber, .. } =
5365 mem::replace(&mut thread.state, GuestThreadState::Completed)
5366 {
5367 fibers.push(fiber);
5368 }
5369 }
5370 }
5371
5372 if let Some(fiber) = self.worker.take() {
5373 fibers.push(fiber);
5374 }
5375
5376 let mut handle_item = |item| match item {
5377 WorkItem::ResumeFiber(fiber) => {
5378 fibers.push(fiber);
5379 }
5380 WorkItem::PushFuture(future) => {
5381 self.futures
5382 .get_mut()
5383 .as_mut()
5384 .unwrap()
5385 .push(future.into_inner());
5386 }
5387 WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
5388 }
5389 };
5390
5391 for item in mem::take(&mut self.high_priority) {
5392 handle_item(item);
5393 }
5394 for item in mem::take(&mut self.low_priority) {
5395 handle_item(item);
5396 }
5397
5398 if let Some(them) = self.futures.get_mut().take() {
5399 futures.push(them);
5400 }
5401 }
5402
5403 #[cfg(feature = "gc")]
5404 pub(crate) fn trace_fiber_roots(
5405 &mut self,
5406 modules: &ModuleRegistry,
5407 unwind: &dyn Unwind,
5408 gc_roots_list: &mut GcRootsList,
5409 ) {
5410 let ConcurrentState {
5411 table,
5412 worker,
5413 high_priority,
5414 low_priority,
5415
5416 futures: _,
5420
5421 worker_item: _,
5423 unforced_current_thread: _,
5424 suspend_reason: _,
5425 global_error_context_ref_counts: _,
5426 interesting_tasks: _,
5427 interesting_tasks_empty_waker: _,
5428 ready_for_concurrent_call_waker: _,
5429 } = self;
5430
5431 for entry in table.get_mut().iter_mut() {
5432 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
5433 for mode in set.waiting.values_mut() {
5434 if let WaitMode::Fiber(fiber) = mode {
5435 fiber.trace_gc_roots(modules, unwind, gc_roots_list);
5436 }
5437 }
5438 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
5439 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready { fiber, .. } =
5440 &mut thread.state
5441 {
5442 fiber.trace_gc_roots(modules, unwind, gc_roots_list);
5443 }
5444 }
5445 }
5446
5447 if let Some(fiber) = worker {
5448 fiber.trace_gc_roots(modules, unwind, gc_roots_list);
5449 }
5450
5451 let mut handle_item = |item: &mut WorkItem| match item {
5452 WorkItem::ResumeFiber(fiber) => {
5453 fiber.trace_gc_roots(modules, unwind, gc_roots_list);
5454 }
5455 WorkItem::PushFuture(_future) => {
5456 }
5459 WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
5460 }
5461 };
5462
5463 for item in high_priority {
5464 handle_item(item);
5465 }
5466 for item in low_priority {
5467 handle_item(item);
5468 }
5469 }
5470
5471 fn push<V: Send + Sync + 'static>(
5472 &mut self,
5473 value: V,
5474 ) -> Result<TableId<V>, ResourceTableError> {
5475 self.table.get_mut().push(value).map(TableId::from)
5476 }
5477
5478 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
5479 self.table.get_mut().get_mut(&Resource::from(id))
5480 }
5481
5482 pub fn add_child<T: 'static, U: 'static>(
5483 &mut self,
5484 child: TableId<T>,
5485 parent: TableId<U>,
5486 ) -> Result<(), ResourceTableError> {
5487 self.table
5488 .get_mut()
5489 .add_child(Resource::from(child), Resource::from(parent))
5490 }
5491
5492 pub fn remove_child<T: 'static, U: 'static>(
5493 &mut self,
5494 child: TableId<T>,
5495 parent: TableId<U>,
5496 ) -> Result<(), ResourceTableError> {
5497 self.table
5498 .get_mut()
5499 .remove_child(Resource::from(child), Resource::from(parent))
5500 }
5501
5502 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
5503 self.table.get_mut().delete(Resource::from(id))
5504 }
5505
5506 fn push_future(&mut self, future: HostTaskFuture) {
5507 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
5514 }
5515
5516 fn push_high_priority(&mut self, item: WorkItem) {
5517 log::trace!("push high priority: {item:?}");
5518 self.high_priority.push(item);
5519 }
5520
5521 fn push_low_priority(&mut self, item: WorkItem) {
5522 log::trace!("push low priority: {item:?}");
5523 self.low_priority.push_front(item);
5524 }
5525
5526 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5527 if high_priority {
5528 self.push_high_priority(item);
5529 } else {
5530 self.push_low_priority(item);
5531 }
5532 }
5533
5534 fn promote_instance_local_thread_work_item(
5535 &mut self,
5536 current_instance: RuntimeComponentInstanceIndex,
5537 ) -> bool {
5538 self.promote_work_items_matching(|item: &WorkItem| match item {
5539 WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5540 *instance == current_instance
5541 }
5542 _ => false,
5543 })
5544 }
5545
5546 fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5547 self.promote_work_items_matching(|item: &WorkItem| match item {
5548 WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5549 *t == thread
5550 }
5551 _ => false,
5552 })
5553 }
5554
5555 fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5556 where
5557 F: FnMut(&WorkItem) -> bool,
5558 {
5559 if self.high_priority.iter().any(&mut predicate) {
5563 true
5564 }
5565 else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5568 let item = self.low_priority.remove(idx).unwrap();
5569 self.push_high_priority(item);
5570 true
5571 } else {
5572 false
5573 }
5574 }
5575
5576 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5577 if self.may_block(task)? {
5578 Ok(())
5579 } else {
5580 Err(Trap::CannotBlockSyncTask.into())
5581 }
5582 }
5583
5584 fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
5585 let task = self.get_mut(task)?;
5586 Ok(task.async_function || task.returned_or_cancelled())
5587 }
5588
5589 pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
5595 let (task, is_host) = (task >> 1, task & 1 == 1);
5596 if is_host {
5597 let task: TableId<HostTask> = TableId::new(task);
5598 Ok(&mut self.get_mut(task)?.call_context)
5599 } else {
5600 let task: TableId<GuestTask> = TableId::new(task);
5601 Ok(&mut self.get_mut(task)?.call_context)
5602 }
5603 }
5604
5605 fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
5606 match self.futures.get_mut().as_mut() {
5607 Some(f) => Ok(f),
5608 None => bail_bug!("futures field of concurrent state is currently taken"),
5609 }
5610 }
5611
5612 pub(crate) fn table(&mut self) -> &mut ResourceTable {
5613 self.table.get_mut()
5614 }
5615
5616 fn parent(&mut self, cur: CurrentThread) -> Option<CurrentThread> {
5618 match cur {
5619 CurrentThread::Guest(thread) => {
5620 let task = self.get_mut(thread.task).ok()?;
5621 Some(match task.caller {
5622 Caller::Host { caller, .. } => caller,
5623 Caller::Guest { thread } => thread.into(),
5624 })
5625 }
5626 CurrentThread::Host(id) => Some(self.get_mut(id).ok()?.caller.into()),
5627 CurrentThread::None => None,
5628 }
5629 }
5630}
5631
5632fn for_any_lower<
5635 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5636>(
5637 fun: F,
5638) -> F {
5639 fun
5640}
5641
5642fn for_any_lift<
5644 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5645>(
5646 fun: F,
5647) -> F {
5648 fun
5649}
5650
5651fn check_ambient_store(id: StoreId) {
5652 let message = "\
5653 `Future`s which depend on asynchronous component tasks, streams, or \
5654 futures to complete may only be polled from the event loop of the \
5655 store to which they belong. Please use \
5656 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5657 ";
5658 tls::try_get(|store| {
5659 let matched = match store {
5660 tls::TryGet::Some(store) => store.id() == id,
5661 tls::TryGet::Taken | tls::TryGet::None => false,
5662 };
5663
5664 if !matched {
5665 panic!("{message}")
5666 }
5667 });
5668}
5669
5670fn check_recursive_run() {
5673 tls::try_get(|store| {
5674 if !matches!(store, tls::TryGet::None) {
5675 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5676 }
5677 });
5678}
5679
5680fn unpack_callback_code(code: u32) -> (u32, u32) {
5681 (code & 0xF, code >> 4)
5682}
5683
5684struct WaitableCheckParams {
5688 set: TableId<WaitableSet>,
5689 options: OptionsIndex,
5690 payload: u32,
5691}
5692
5693enum WaitableCheck {
5696 Wait,
5697 Poll,
5698}
5699
5700#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
5709pub struct GuestTaskId(TableId<GuestTask>);
5710
5711pub(crate) struct PreparedCall<R> {
5713 handle: Func,
5715 thread: QualifiedThreadId,
5717 param_count: usize,
5719 rx: oneshot::Receiver<LiftedResult>,
5722 runtime_instance: RuntimeInstance,
5724 _phantom: PhantomData<R>,
5725}
5726
5727impl<R> PreparedCall<R> {
5728 pub(crate) fn task_id(&self) -> TaskId {
5730 TaskId {
5731 task: self.thread.task,
5732 runtime_instance: self.runtime_instance,
5733 }
5734 }
5735}
5736
5737pub(crate) struct TaskId {
5739 task: TableId<GuestTask>,
5740 runtime_instance: RuntimeInstance,
5741}
5742
5743impl TaskId {
5744 pub(crate) fn host_future_dropped(&self, store: &mut StoreOpaque) -> Result<()> {
5750 let task = store.concurrent_state_mut()?.get_mut(self.task)?;
5751 let delete = if !task.already_lowered_parameters() {
5752 store.cancel_guest_subtask_without_lowered_parameters(
5753 self.runtime_instance,
5754 self.task,
5755 )?;
5756 true
5757 } else {
5758 task.host_future_state = HostFutureState::Dropped;
5759 task.ready_to_delete()
5760 };
5761 if delete {
5762 Waitable::Guest(self.task).delete_from(store.concurrent_state_mut()?)?
5763 }
5764 Ok(())
5765 }
5766}
5767
5768pub(crate) fn prepare_call<T, R>(
5774 mut store: StoreContextMut<T>,
5775 handle: Func,
5776 param_count: usize,
5777 host_future_present: bool,
5778 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5779 + Send
5780 + Sync
5781 + 'static,
5782 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5783 + Send
5784 + Sync
5785 + 'static,
5786) -> Result<PreparedCall<R>> {
5787 let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5788
5789 let instance = handle.instance().id().get(store.0);
5790 let options = &instance.component().env_component().options[options];
5791 let ty = &instance.component().types()[ty];
5792 let async_function = ty.async_;
5793 let task_return_type = ty.results;
5794 let component_instance = raw_options.instance;
5795 let callback = options.callback.map(|i| instance.runtime_callback(i));
5796 let memory = options
5797 .memory()
5798 .map(|i| instance.runtime_memory(i))
5799 .map(SendSyncPtr::new);
5800 let string_encoding = options.string_encoding;
5801 let token = StoreToken::new(store.as_context_mut());
5802 let caller = store.0.current_thread()?;
5803 let state = store.0.concurrent_state_mut()?;
5804
5805 let (tx, rx) = oneshot::channel();
5806
5807 let instance = handle.instance().runtime_instance(component_instance);
5808 let thread = GuestTask::new(
5809 state,
5810 Box::new(for_any_lower(move |store, params| {
5811 lower_params(handle, token.as_context_mut(store), params)
5812 })),
5813 LiftResult {
5814 lift: Box::new(for_any_lift(move |store, result| {
5815 lift_result(handle, store, result)
5816 })),
5817 ty: task_return_type,
5818 memory,
5819 string_encoding,
5820 },
5821 Caller::Host {
5822 tx: Some(tx),
5823 host_future_present,
5824 caller,
5825 },
5826 callback.map(|callback| {
5827 let callback = SendSyncPtr::new(callback);
5828 let instance = handle.instance();
5829 Box::new(move |store: &mut dyn VMStore, event, handle| {
5830 let store = token.as_context_mut(store);
5831 unsafe { instance.call_callback(store, callback, event, handle) }
5834 }) as CallbackFn
5835 }),
5836 instance,
5837 async_function,
5838 )?;
5839
5840 if !store.0.may_enter(instance)? {
5841 bail!(Trap::CannotEnterComponent);
5842 }
5843
5844 Ok(PreparedCall {
5845 handle,
5846 thread,
5847 param_count,
5848 runtime_instance: instance,
5849 rx,
5850 _phantom: PhantomData,
5851 })
5852}
5853
5854pub(crate) struct QueuedCall<R> {
5855 store: StoreId,
5856 task: TableId<GuestTask>,
5857 rx: oneshot::Receiver<LiftedResult>,
5858 _marker: PhantomData<fn() -> R>,
5859}
5860
5861impl<R> QueuedCall<R> {
5862 pub(crate) fn new<T: 'static>(
5869 mut store: StoreContextMut<T>,
5870 prepared: PreparedCall<R>,
5871 ) -> Result<QueuedCall<R>> {
5872 let PreparedCall {
5873 handle,
5874 thread,
5875 param_count,
5876 rx,
5877 ..
5878 } = prepared;
5879
5880 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5881
5882 Ok(QueuedCall {
5883 store: store.0.id(),
5884 task: thread.task,
5885 rx,
5886 _marker: PhantomData,
5887 })
5888 }
5889
5890 fn task(&self) -> GuestTaskId {
5891 GuestTaskId(self.task)
5892 }
5893}
5894
5895impl<R> Future for QueuedCall<R>
5896where
5897 R: 'static,
5898{
5899 type Output = Result<R>;
5900
5901 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
5902 check_ambient_store(self.store);
5903 Pin::new(&mut self.rx).poll(cx).map(|result| match result {
5904 Ok(r) => match r.downcast() {
5905 Ok(r) => Ok(*r),
5906 Err(_) => bail_bug!("wrong type of value produced"),
5907 },
5908 Err(oneshot::Canceled) => bail_bug!("channel erroneously dropped"),
5909 })
5910 }
5911}
5912
5913fn queue_call0<T: 'static>(
5916 store: StoreContextMut<T>,
5917 handle: Func,
5918 guest_thread: QualifiedThreadId,
5919 param_count: usize,
5920) -> Result<()> {
5921 let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5922 let is_concurrent = raw_options.async_;
5923 let callback = raw_options.callback;
5924 let instance = handle.instance();
5925 let callee = handle.lifted_core_func(store.0);
5926 let post_return = handle.post_return_core_func(store.0);
5927 let callback = callback.map(|i| {
5928 let instance = instance.id().get(store.0);
5929 SendSyncPtr::new(instance.runtime_callback(i))
5930 });
5931
5932 log::trace!("queueing call {guest_thread:?}");
5933
5934 unsafe {
5938 instance.queue_call(
5939 store,
5940 guest_thread,
5941 SendSyncPtr::new(callee),
5942 param_count,
5943 1,
5944 is_concurrent,
5945 callback,
5946 post_return.map(SendSyncPtr::new),
5947 )
5948 }
5949}