1use crate::bail_bug;
54use crate::component::func::{self, Func, call_post_return};
55use crate::component::{
56 HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
57};
58use crate::fiber::{self, StoreFiber, StoreFiberYield};
59use crate::prelude::*;
60use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
61use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
62use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
63use crate::{
64 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
65};
66use error_contexts::GlobalErrorContextRefCount;
67use futures::channel::oneshot;
68use futures::future::{self, FutureExt};
69use futures::stream::{FuturesUnordered, StreamExt};
70use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71use std::any::Any;
72use std::borrow::ToOwned;
73use std::boxed::Box;
74use std::cell::UnsafeCell;
75use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
76use std::fmt;
77use std::future::Future;
78use std::marker::PhantomData;
79use std::mem::{self, ManuallyDrop, MaybeUninit};
80use std::ops::DerefMut;
81use std::pin::{Pin, pin};
82use std::ptr::{self, NonNull};
83use std::task::{Context, Poll, Waker};
84use std::vec::Vec;
85use table::{TableDebug, TableId};
86use wasmtime_environ::component::{
87 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
88 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
89 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
90 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
91 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
92};
93use wasmtime_environ::packed_option::ReservedValue;
94use wasmtime_environ::{NUM_COMPONENT_CONTEXT_SLOTS, Trap};
95
96pub use abort::JoinHandle;
97pub use future_stream_any::{FutureAny, StreamAny};
98pub use futures_and_streams::{
99 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
100 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
101 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
102};
103pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
104
105mod abort;
106mod error_contexts;
107mod future_stream_any;
108mod futures_and_streams;
109pub(crate) mod table;
110pub(crate) mod tls;
111
/// Sentinel value with every bit of a `u32` set (`0xffff_ffff`).
///
/// NOTE(review): presumably reported to guest code when an operation cannot
/// complete immediately — confirm against the call sites outside this chunk.
const BLOCKED: u32 = u32::MAX;
115
/// Lifecycle state of a subtask, represented by a small integer discriminant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Packs this status together with an optional waitable handle into a
    /// single `u32`.
    ///
    /// The status discriminant occupies the low four bits and the handle is
    /// stored in the bits above it.
    ///
    /// # Panics
    ///
    /// Panics unless `waitable` is `None` exactly when `self` is
    /// [`Status::Returned`], and panics if the handle does not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        // A handle accompanies every status except `Returned`.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = waitable.unwrap_or(0);
        // The handle must leave room for the four status bits.
        assert!(waitable < (1 << 28));
        let shifted_handle = waitable << 4;
        let discriminant = self as u32;
        shifted_handle | discriminant
    }
}
139
140#[derive(Clone, Copy, Debug)]
143enum Event {
144 None,
145 Subtask {
146 status: Status,
147 },
148 StreamRead {
149 code: ReturnCode,
150 pending: Option<(TypeStreamTableIndex, u32)>,
151 },
152 StreamWrite {
153 code: ReturnCode,
154 pending: Option<(TypeStreamTableIndex, u32)>,
155 },
156 FutureRead {
157 code: ReturnCode,
158 pending: Option<(TypeFutureTableIndex, u32)>,
159 },
160 FutureWrite {
161 code: ReturnCode,
162 pending: Option<(TypeFutureTableIndex, u32)>,
163 },
164 Cancelled,
165}
166
167impl Event {
168 fn parts(self) -> (u32, u32) {
173 const EVENT_NONE: u32 = 0;
174 const EVENT_SUBTASK: u32 = 1;
175 const EVENT_STREAM_READ: u32 = 2;
176 const EVENT_STREAM_WRITE: u32 = 3;
177 const EVENT_FUTURE_READ: u32 = 4;
178 const EVENT_FUTURE_WRITE: u32 = 5;
179 const EVENT_CANCELLED: u32 = 6;
180 match self {
181 Event::None => (EVENT_NONE, 0),
182 Event::Cancelled => (EVENT_CANCELLED, 0),
183 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
184 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
185 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
186 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
187 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
188 }
189 }
190}
191
/// Numeric codes naming the actions "exit", "yield" and "wait".
///
/// NOTE(review): presumably these are the values a guest callback returns and
/// that `handle_callback_code` interprets — confirm against that function.
mod callback_code {
    /// Code `0`: exit.
    pub const EXIT: u32 = 0;
    /// Code `1`: yield.
    pub const YIELD: u32 = 1;
    /// Code `2`: wait.
    pub const WAIT: u32 = 2;
}
198
/// The `START_FLAG_ASYNC_CALLEE` constant from `wasmtime_environ::component`,
/// converted to `u32` once so that uses in this module need no cast.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
203
204pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
210 store: StoreContextMut<'a, T>,
211 get_data: fn(&mut T) -> D::Data<'_>,
212}
213
214impl<'a, T, D> Access<'a, T, D>
215where
216 D: HasData + ?Sized,
217 T: 'static,
218{
219 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
221 Self { store, get_data }
222 }
223
224 pub fn data_mut(&mut self) -> &mut T {
226 self.store.data_mut()
227 }
228
229 pub fn get(&mut self) -> D::Data<'_> {
231 (self.get_data)(self.data_mut())
232 }
233
234 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
238 where
239 T: 'static,
240 {
241 let accessor = Accessor {
242 get_data: self.get_data,
243 token: StoreToken::new(self.store.as_context_mut()),
244 };
245 self.store
246 .as_context_mut()
247 .spawn_with_accessor(accessor, task)
248 }
249
250 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
253 self.get_data
254 }
255}
256
257impl<'a, T, D> AsContext for Access<'a, T, D>
258where
259 D: HasData + ?Sized,
260 T: 'static,
261{
262 type Data = T;
263
264 fn as_context(&self) -> StoreContext<'_, T> {
265 self.store.as_context()
266 }
267}
268
269impl<'a, T, D> AsContextMut for Access<'a, T, D>
270where
271 D: HasData + ?Sized,
272 T: 'static,
273{
274 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
275 self.store.as_context_mut()
276 }
277}
278
279pub struct Accessor<T: 'static, D = HasSelf<T>>
339where
340 D: HasData + ?Sized,
341{
342 token: StoreToken<T>,
343 get_data: fn(&mut T) -> D::Data<'_>,
344}
345
/// Types that can hand out a reference to an [`Accessor`].
pub trait AsAccessor {
    /// The store data type `T` of the underlying accessor.
    type Data: 'static;

    /// The `D` parameter of the underlying accessor.
    type AccessorData: HasData + ?Sized;

    /// Returns the underlying accessor.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
373
374impl<T: AsAccessor + ?Sized> AsAccessor for &T {
375 type Data = T::Data;
376 type AccessorData = T::AccessorData;
377
378 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
379 T::as_accessor(self)
380 }
381}
382
383impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
384 type Data = T;
385 type AccessorData = D;
386
387 fn as_accessor(&self) -> &Accessor<T, D> {
388 self
389 }
390}
391
392const _: () = {
415 const fn assert<T: Send + Sync>() {}
416 assert::<Accessor<UnsafeCell<u32>>>();
417};
418
419impl<T> Accessor<T> {
420 pub(crate) fn new(token: StoreToken<T>) -> Self {
429 Self {
430 token,
431 get_data: |x| x,
432 }
433 }
434}
435
436impl<T, D> Accessor<T, D>
437where
438 D: HasData + ?Sized,
439{
440 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
458 tls::get(|vmstore| {
459 fun(Access {
460 store: self.token.as_context_mut(vmstore),
461 get_data: self.get_data,
462 })
463 })
464 }
465
466 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
469 self.get_data
470 }
471
472 pub fn with_getter<D2: HasData>(
489 &self,
490 get_data: fn(&mut T) -> D2::Data<'_>,
491 ) -> Accessor<T, D2> {
492 Accessor {
493 token: self.token,
494 get_data,
495 }
496 }
497
498 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
514 where
515 T: 'static,
516 {
517 let accessor = self.clone_for_spawn();
518 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
519 }
520
521 fn clone_for_spawn(&self) -> Self {
522 Self {
523 token: self.token,
524 get_data: self.get_data,
525 }
526 }
527}
528
/// A task that can be spawned with `spawn` and is run with access to an
/// [`Accessor`].
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Runs the task to completion using `accessor`, yielding `Ok(())` on
    /// success or an error on failure.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
547
/// Parameters and result information supplied by a caller, split by whether
/// the call is async or sync.
///
/// NOTE(review): the exact role of each field is not visible in this part of
/// the file — confirm against the code that constructs these values.
enum CallerInfo {
    /// An async call.
    Async {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Whether a result is present.
        has_result: bool,
    },
    /// A sync call.
    Sync {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Number of results.
        result_count: u32,
    },
}
562
/// How a waiter is represented: as a suspended fiber or as a callback tied to
/// an instance.
///
/// NOTE(review): usage is outside this chunk — confirm the description.
enum WaitMode {
    /// Waiting on a fiber.
    Fiber(StoreFiber<'static>),
    /// Waiting via a callback associated with the given instance.
    Callback(Instance),
}
571
/// The reason passed to `suspend` when a fiber gives up control.
#[derive(Debug)]
enum SuspendReason {
    /// `thread` is waiting on the waitable set `set`.
    Waiting {
        set: TableId<WaitableSet>,
        thread: QualifiedThreadId,
        // NOTE(review): presumably disables the "may this task block?" check
        // when suspending — confirm in `suspend`.
        skip_may_block_check: bool,
    },
    /// The suspending fiber has nothing left to do and needs more work; the
    /// worker fiber created in `run_on_worker` suspends with this after each
    /// item.
    NeedWork,
    /// `thread` is yielding.
    Yielding {
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
    /// `thread` is explicitly suspending itself.
    ExplicitlySuspending {
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
}
597
598enum GuestCallKind {
600 DeliverEvent {
603 instance: Instance,
605 set: Option<TableId<WaitableSet>>,
610 },
611 StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
617 StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
618}
619
620impl fmt::Debug for GuestCallKind {
621 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
622 match self {
623 Self::DeliverEvent { instance, set } => f
624 .debug_struct("DeliverEvent")
625 .field("instance", instance)
626 .field("set", set)
627 .finish(),
628 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
629 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
630 }
631 }
632}
633
/// Optional target of a suspension, identified by a `u32` when present.
///
/// NOTE(review): the difference between `SomeSuspended` and `Some` is not
/// visible in this part of the file.
#[derive(Debug, Clone, Copy)]
pub enum SuspensionTarget {
    SomeSuspended(u32),
    Some(u32),
    None,
}

impl SuspensionTarget {
    /// Returns `true` only for [`SuspensionTarget::None`].
    fn is_none(&self) -> bool {
        match self {
            Self::None => true,
            Self::Some(_) | Self::SomeSuspended(_) => false,
        }
    }

    /// Returns `true` for every variant that carries a target.
    fn is_some(&self) -> bool {
        !matches!(self, Self::None)
    }
}
650
651#[derive(Debug)]
653struct GuestCall {
654 thread: QualifiedThreadId,
655 kind: GuestCallKind,
656}
657
658impl GuestCall {
659 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
669 let instance = store
670 .concurrent_state_mut()
671 .get_mut(self.thread.task)?
672 .instance;
673 let state = store.instance_state(instance).concurrent_state();
674
675 let ready = match &self.kind {
676 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
677 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
678 GuestCallKind::StartExplicit(_) => true,
679 };
680 log::trace!(
681 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
682 state.do_not_enter,
683 state.backpressure
684 );
685 Ok(ready)
686 }
687}
688
/// A unit of work executed on the worker fiber (see `run_on_worker`).
enum WorkerItem {
    /// Run a guest call via `handle_guest_call`.
    GuestCall(GuestCall),
    /// Run an arbitrary closure against the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
694
695enum WorkItem {
698 PushFuture(AlwaysMut<HostTaskFuture>),
700 ResumeFiber(StoreFiber<'static>),
702 ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
704 GuestCall(RuntimeComponentInstanceIndex, GuestCall),
706 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
708}
709
710impl fmt::Debug for WorkItem {
711 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
712 match self {
713 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
714 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
715 Self::ResumeThread(instance, thread) => f
716 .debug_tuple("ResumeThread")
717 .field(instance)
718 .field(thread)
719 .finish(),
720 Self::GuestCall(instance, call) => f
721 .debug_tuple("GuestCall")
722 .field(instance)
723 .field(call)
724 .finish(),
725 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
726 }
727 }
728}
729
/// Outcome of a wait: it was either cancelled or it completed.
///
/// The derived ordering places `Cancelled` before `Completed`.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) enum WaitResult {
    Cancelled,
    Completed,
}
736
737pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
745 store: &mut dyn VMStore,
746 future: impl Future<Output = Result<R>> + Send + 'static,
747) -> Result<R> {
748 let state = store.concurrent_state_mut();
749 let task = state.current_host_thread()?;
750
751 let mut future = Box::pin(async move {
755 let result = future.await?;
756 tls::get(move |store| {
757 let state = store.concurrent_state_mut();
758 let host_state = &mut state.get_mut(task)?.state;
759 assert!(matches!(host_state, HostTaskState::CalleeStarted));
760 *host_state = HostTaskState::CalleeFinished(Box::new(result));
761
762 Waitable::Host(task).set_event(
763 state,
764 Some(Event::Subtask {
765 status: Status::Returned,
766 }),
767 )?;
768
769 Ok(())
770 })
771 }) as HostTaskFuture;
772
773 let poll = tls::set(store, || {
777 future
778 .as_mut()
779 .poll(&mut Context::from_waker(&Waker::noop()))
780 });
781
782 match poll {
783 Poll::Ready(result) => result?,
785
786 Poll::Pending => {
791 let state = store.concurrent_state_mut();
792 state.push_future(future);
793
794 let caller = state.get_mut(task)?.caller;
795 let set = state.get_mut(caller.thread)?.sync_call_set;
796 Waitable::Host(task).join(state, Some(set))?;
797
798 store.suspend(SuspendReason::Waiting {
799 set,
800 thread: caller,
801 skip_may_block_check: false,
802 })?;
803
804 Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
808 }
809 }
810
811 let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
813 match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
814 HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
815 Ok(result) => *result,
816 Err(_) => bail_bug!("host task finished with wrong type of result"),
817 }),
818 _ => bail_bug!("unexpected host task state after completion"),
819 }
820}
821
/// Executes `call` and then any follow-up calls it produces, one after
/// another, until none remains.
///
/// * `DeliverEvent` fetches the event, invokes the task's callback with it
///   while the call's thread is current and the task's instance is marked as
///   entered, and passes the returned code to `handle_callback_code`, which
///   may yield the next call.
/// * `StartImplicit` runs its closure, which may yield the next call.
/// * `StartExplicit` runs its closure and yields nothing further.
///
/// Returns the first error encountered.
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
    let mut next = Some(call);
    while let Some(call) = next.take() {
        match call.kind {
            GuestCallKind::DeliverEvent { instance, set } => {
                // An event must be available; its absence is an internal bug.
                let (event, waitable) =
                    match instance.get_event(store, call.thread.task, set, true)? {
                        Some(pair) => pair,
                        None => bail_bug!("delivering non-present event"),
                    };
                let state = store.concurrent_state_mut();
                let task = state.get_mut(call.thread.task)?;
                let runtime_instance = task.instance;
                // A handle of 0 is passed when the event has no waitable.
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);

                log::trace!(
                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
                    call.thread,
                );

                // Make the call's thread current for the duration of the
                // callback, remembering the previous one.
                let old_thread = store.set_thread(call.thread)?;
                log::trace!(
                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
                    call.thread
                );

                store.enter_instance(runtime_instance);

                // Temporarily take the callback out of the task so it can be
                // invoked with mutable access to the store.
                let Some(callback) = store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback
                    .take()
                else {
                    bail_bug!("guest task callback field not present")
                };

                let code = callback(store, event, handle)?;

                // Put the callback back for the next event.
                store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback = Some(callback);

                store.exit_instance(runtime_instance)?;

                store.set_thread(old_thread)?;

                // Interpret the callback's return code; this may produce a
                // follow-up call for the next loop iteration.
                next = instance.handle_callback_code(
                    store,
                    call.thread,
                    runtime_instance.index,
                    code,
                )?;

                log::trace!(
                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
                );
            }
            GuestCallKind::StartImplicit(fun) => {
                next = fun(store)?;
            }
            GuestCallKind::StartExplicit(fun) => {
                fun(store)?;
            }
        }
    }

    Ok(())
}
893
894impl<T> Store<T> {
895 pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
897 where
898 T: Send + 'static,
899 {
900 ensure!(
901 self.as_context().0.concurrency_support(),
902 "cannot use `run_concurrent` when Config::concurrency_support disabled",
903 );
904 self.as_context_mut().run_concurrent(fun).await
905 }
906
907 #[doc(hidden)]
908 pub fn assert_concurrent_state_empty(&mut self) {
909 self.as_context_mut().assert_concurrent_state_empty();
910 }
911
912 #[doc(hidden)]
913 pub fn concurrent_state_table_size(&mut self) -> usize {
914 self.as_context_mut().concurrent_state_table_size()
915 }
916
917 pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
919 where
920 T: 'static,
921 {
922 self.as_context_mut().spawn(task)
923 }
924}
925
926impl<T> StoreContextMut<'_, T> {
927 #[doc(hidden)]
938 pub fn assert_concurrent_state_empty(self) {
939 let store = self.0;
940 store
941 .store_data_mut()
942 .components
943 .assert_instance_states_empty();
944 let state = store.concurrent_state_mut();
945 assert!(
946 state.table.get_mut().is_empty(),
947 "non-empty table: {:?}",
948 state.table.get_mut()
949 );
950 assert!(state.high_priority.is_empty());
951 assert!(state.low_priority.is_empty());
952 assert!(state.current_thread.is_none());
953 assert!(state.futures_mut().unwrap().is_empty());
954 assert!(state.global_error_context_ref_counts.is_empty());
955 }
956
957 #[doc(hidden)]
962 pub fn concurrent_state_table_size(&mut self) -> usize {
963 self.0
964 .concurrent_state_mut()
965 .table
966 .get_mut()
967 .iter_mut()
968 .count()
969 }
970
971 pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
981 where
982 T: 'static,
983 {
984 let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
985 self.spawn_with_accessor(accessor, task)
986 }
987
988 fn spawn_with_accessor<D>(
991 self,
992 accessor: Accessor<T, D>,
993 task: impl AccessorTask<T, D>,
994 ) -> JoinHandle
995 where
996 T: 'static,
997 D: HasData + ?Sized,
998 {
999 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
1003 self.0
1004 .concurrent_state_mut()
1005 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1006 handle
1007 }
1008
1009 pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1093 where
1094 T: Send + 'static,
1095 {
1096 ensure!(
1097 self.0.concurrency_support(),
1098 "cannot use `run_concurrent` when Config::concurrency_support disabled",
1099 );
1100 self.do_run_concurrent(fun, false).await
1101 }
1102
1103 pub(super) async fn run_concurrent_trap_on_idle<R>(
1104 self,
1105 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1106 ) -> Result<R>
1107 where
1108 T: Send + 'static,
1109 {
1110 self.do_run_concurrent(fun, true).await
1111 }
1112
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Calls `fun` with a fresh [`Accessor`] to obtain a future, then drives
    /// that future with `poll_until`. `trap_on_idle` is forwarded to
    /// `poll_until`.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        debug_assert!(self.0.concurrency_support());
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Owns the future returned by `fun` together with the store, so that
        // the future is dropped inside `tls::set` (i.e. with the store made
        // available through thread-local state) rather than implicitly.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: `value` is dropped only here, exactly once, and
                    // is not accessed again afterwards.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: `dropper` is only ever reached through this `&mut` to a
        // temporary that lives for the rest of the function; `value` is never
        // moved out of it and is dropped in place by `Dropper::drop`.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1156
    /// Event loop: drives `future` to completion while also polling the
    /// store's host futures and processing queued work items.
    ///
    /// Returns `Ok(value)` once `future` completes. Returns an error if a host
    /// future or work item fails, or — when `trap_on_idle` is set — a
    /// `Trap::AsyncDeadlock` if `future` is still pending while there are no
    /// host futures and no queued work left.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Holds the store's `futures` set while it is being polled and puts
        // it back into the concurrent state when dropped.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Temporarily move the futures set out of the concurrent state so
            // it can be polled while the store is otherwise borrowed.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = match reset.futures.as_mut() {
                Some(f) => pin!(f.next()),
                None => bail_bug!("concurrent state missing futures field"),
            };

            // Outcome of one round of polling.
            enum PollResult<R> {
                // The caller's future finished with this value.
                Complete(R),
                // There are work items to process before polling again.
                ProcessWork {
                    ready: Vec<WorkItem>,
                    low_priority: bool,
                },
            }

            let result = future::poll_fn(|cx| {
                // First give the caller's future a chance to finish.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(PollResult::Complete(value)));
                }

                // Then poll the host futures. `Ready(true)` means one of them
                // completed, `Ready(false)` means the set is empty.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Collect queued work: all high-priority items, or failing
                // that a single low-priority item.
                let state = reset.store.0.concurrent_state_mut();
                let mut ready = mem::take(&mut state.high_priority);
                let mut low_priority = false;
                if ready.is_empty() {
                    if let Some(item) = state.low_priority.pop_back() {
                        ready.push(item);
                        low_priority = true;
                    }
                }
                if !ready.is_empty() {
                    return Poll::Ready(Ok(PollResult::ProcessWork {
                        ready,
                        low_priority,
                    }));
                }

                // No queued work: decide based on the host futures.
                return match next {
                    Poll::Ready(true) => {
                        // A host future completed; go around the loop again
                        // with nothing to process.
                        Poll::Ready(Ok(PollResult::ProcessWork {
                            ready: Vec::new(),
                            low_priority: false,
                        }))
                    }
                    Poll::Ready(false) => {
                        // No host futures remain; poll the caller's future
                        // one more time before deciding the store is idle.
                        if let Poll::Ready(value) =
                            tls::set(reset.store.0, || future.as_mut().poll(cx))
                        {
                            Poll::Ready(Ok(PollResult::Complete(value)))
                        } else {
                            if trap_on_idle {
                                Poll::Ready(Err(Trap::AsyncDeadlock.into()))
                            } else {
                                Poll::Pending
                            }
                        }
                    }
                    Poll::Pending => Poll::Pending,
                };
            })
            .await;

            // Put the futures set back before processing any work items.
            drop(reset);

            match result? {
                PollResult::Complete(value) => break Ok(value),
                PollResult::ProcessWork {
                    ready,
                    low_priority,
                } => {
                    // Disposes of work items that were collected but never
                    // handled (e.g. because handling an earlier one failed or
                    // this future was dropped).
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    // Yield once before running a low-priority item.
                    if low_priority {
                        dispose.store.0.yield_now().await
                    }

                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1371
1372 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1374 where
1375 T: Send,
1376 {
1377 log::trace!("handle work item {item:?}");
1378 match item {
1379 WorkItem::PushFuture(future) => {
1380 self.0
1381 .concurrent_state_mut()
1382 .futures_mut()?
1383 .push(future.into_inner());
1384 }
1385 WorkItem::ResumeFiber(fiber) => {
1386 self.0.resume_fiber(fiber).await?;
1387 }
1388 WorkItem::ResumeThread(_, thread) => {
1389 if let GuestThreadState::Ready(fiber) = mem::replace(
1390 &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
1391 GuestThreadState::Running,
1392 ) {
1393 self.0.resume_fiber(fiber).await?;
1394 } else {
1395 bail_bug!("cannot resume non-pending thread {thread:?}");
1396 }
1397 }
1398 WorkItem::GuestCall(_, call) => {
1399 if call.is_ready(self.0)? {
1400 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1401 } else {
1402 let state = self.0.concurrent_state_mut();
1403 let task = state.get_mut(call.thread.task)?;
1404 if !task.starting_sent {
1405 task.starting_sent = true;
1406 if let GuestCallKind::StartImplicit(_) = &call.kind {
1407 Waitable::Guest(call.thread.task).set_event(
1408 state,
1409 Some(Event::Subtask {
1410 status: Status::Starting,
1411 }),
1412 )?;
1413 }
1414 }
1415
1416 let instance = state.get_mut(call.thread.task)?.instance;
1417 self.0
1418 .instance_state(instance)
1419 .concurrent_state()
1420 .pending
1421 .insert(call.thread, call.kind);
1422 }
1423 }
1424 WorkItem::WorkerFunction(fun) => {
1425 self.run_on_worker(WorkerItem::Function(fun)).await?;
1426 }
1427 }
1428
1429 Ok(())
1430 }
1431
1432 async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1434 where
1435 T: Send,
1436 {
1437 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1438 fiber
1439 } else {
1440 fiber::make_fiber(self.0, move |store| {
1441 loop {
1442 let Some(item) = store.concurrent_state_mut().worker_item.take() else {
1443 bail_bug!("worker_item not present when resuming fiber")
1444 };
1445 match item {
1446 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1447 WorkerItem::Function(fun) => fun.into_inner()(store)?,
1448 }
1449
1450 store.suspend(SuspendReason::NeedWork)?;
1451 }
1452 })?
1453 };
1454
1455 let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1456 assert!(worker_item.is_none());
1457 *worker_item = Some(item);
1458
1459 self.0.resume_fiber(worker).await
1460 }
1461
1462 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1467 where
1468 T: 'static,
1469 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1470 + Send
1471 + Sync
1472 + 'static,
1473 R: Send + Sync + 'static,
1474 {
1475 let token = StoreToken::new(self);
1476 async move {
1477 let mut accessor = Accessor::new(token);
1478 closure(&mut accessor).await
1479 }
1480 }
1481}
1482
1483impl StoreOpaque {
1484 pub(crate) fn enter_guest_sync_call(
1491 &mut self,
1492 guest_caller: Option<RuntimeInstance>,
1493 callee_async: bool,
1494 callee: RuntimeInstance,
1495 ) -> Result<()> {
1496 log::trace!("enter sync call {callee:?}");
1497 if !self.concurrency_support() {
1498 return self.enter_call_not_concurrent();
1499 }
1500
1501 let state = self.concurrent_state_mut();
1502 let thread = state.current_thread;
1503 let instance = if let Some(thread) = thread.guest() {
1504 Some(state.get_mut(thread.task)?.instance)
1505 } else {
1506 None
1507 };
1508 if guest_caller.is_some() {
1509 debug_assert_eq!(instance, guest_caller);
1510 }
1511 let task = GuestTask::new(
1512 Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
1513 LiftResult {
1514 lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
1515 ty: TypeTupleIndex::reserved_value(),
1516 memory: None,
1517 string_encoding: StringEncoding::Utf8,
1518 },
1519 if let Some(thread) = thread.guest() {
1520 Caller::Guest { thread: *thread }
1521 } else {
1522 Caller::Host {
1523 tx: None,
1524 host_future_present: false,
1525 caller: thread,
1526 }
1527 },
1528 None,
1529 callee,
1530 callee_async,
1531 )?;
1532
1533 let guest_task = state.push(task)?;
1534 let new_thread = GuestThread::new_implicit(state, guest_task)?;
1535 let guest_thread = state.push(new_thread)?;
1536 Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1537 guest_thread,
1538 self,
1539 callee.index,
1540 )?;
1541
1542 let state = self.concurrent_state_mut();
1543 state.get_mut(guest_task)?.threads.insert(guest_thread);
1544
1545 self.set_thread(QualifiedThreadId {
1546 task: guest_task,
1547 thread: guest_thread,
1548 })?;
1549
1550 Ok(())
1551 }
1552
1553 pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
1555 if !self.concurrency_support() {
1556 return Ok(self.exit_call_not_concurrent());
1557 }
1558 let thread = match self.set_thread(CurrentThread::None)?.guest() {
1559 Some(t) => *t,
1560 None => bail_bug!("expected task when exiting"),
1561 };
1562 let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance;
1563 log::trace!("exit sync call {instance:?}");
1564 Instance::from_wasmtime(self, instance.instance).cleanup_thread(
1565 self,
1566 thread,
1567 instance.index,
1568 )?;
1569
1570 let state = self.concurrent_state_mut();
1571 let task = state.get_mut(thread.task)?;
1572 let caller = match &task.caller {
1573 &Caller::Guest { thread } => thread.into(),
1574 &Caller::Host { caller, .. } => caller,
1575 };
1576 self.set_thread(caller)?;
1577
1578 let state = self.concurrent_state_mut();
1579 let task = state.get_mut(thread.task)?;
1580 if task.ready_to_delete() {
1581 state.delete(thread.task)?;
1582 }
1583
1584 Ok(())
1585 }
1586
1587 pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
1595 if !self.concurrency_support() {
1596 self.enter_call_not_concurrent()?;
1597 return Ok(None);
1598 }
1599 let state = self.concurrent_state_mut();
1600 let caller = state.current_guest_thread()?;
1601 let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
1602 log::trace!("new host task {task:?}");
1603 self.set_thread(task)?;
1604 Ok(Some(task))
1605 }
1606
1607 pub fn host_task_reenter_caller(&mut self) -> Result<()> {
1613 if !self.concurrency_support() {
1614 return Ok(());
1615 }
1616 let task = self.concurrent_state_mut().current_host_thread()?;
1617 let caller = self.concurrent_state_mut().get_mut(task)?.caller;
1618 self.set_thread(caller)?;
1619 Ok(())
1620 }
1621
1622 pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1629 match task {
1630 Some(task) => {
1631 log::trace!("delete host task {task:?}");
1632 self.concurrent_state_mut().delete(task)?;
1633 }
1634 None => {
1635 self.exit_call_not_concurrent();
1636 }
1637 }
1638 Ok(())
1639 }
1640
1641 pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
1649 if self.trapped() {
1650 return Ok(false);
1651 }
1652 if !self.concurrency_support() {
1653 return Ok(true);
1654 }
1655 let state = self.concurrent_state_mut();
1656 let mut cur = state.current_thread;
1657 loop {
1658 match cur {
1659 CurrentThread::None => break Ok(true),
1660 CurrentThread::Guest(thread) => {
1661 let task = state.get_mut(thread.task)?;
1662
1663 if task.instance.instance == instance.instance {
1670 break Ok(false);
1671 }
1672 cur = match task.caller {
1673 Caller::Host { caller, .. } => caller,
1674 Caller::Guest { thread } => thread.into(),
1675 };
1676 }
1677 CurrentThread::Host(id) => {
1678 cur = state.get_mut(id)?.caller.into();
1679 }
1680 }
1681 }
1682 }
1683
1684 fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1687 self.component_instance_mut(instance.instance)
1688 .instance_state(instance.index)
1689 }
1690
    /// Makes `thread` the current thread and returns the previously current
    /// one.
    ///
    /// When switching away from a guest thread, the store's component context
    /// is saved into that thread and its instance's `task_may_block` flag is
    /// cleared. When switching to a guest thread, its saved context is loaded
    /// into the store and `task_may_block` is recomputed for it.
    fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
        let thread = thread.into();
        let state = self.concurrent_state_mut();
        let old_thread = mem::replace(&mut state.current_thread, thread);

        // Save the outgoing guest thread's context.
        if let Some(old_thread) = old_thread.guest() {
            let old_context = self.vm_store_context().component_context;
            self.concurrent_state_mut()
                .get_mut(old_thread.thread)?
                .context = old_context;
        }
        // In debug builds, overwrite the live context with a recognizable
        // placeholder value.
        if cfg!(debug_assertions) {
            self.vm_store_context_mut().component_context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
        }
        // Load the incoming guest thread's context.
        if let Some(thread) = thread.guest() {
            let thread = self.concurrent_state_mut().get_mut(thread.thread)?;
            let context = thread.context;
            // In debug builds, also overwrite the saved copy with the
            // placeholder now that it has been read.
            if cfg!(debug_assertions) {
                thread.context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
            }
            self.vm_store_context_mut().component_context = context;
        }

        let state = self.concurrent_state_mut();
        // Clear `task_may_block` on the instance being switched away from.
        if let Some(old_thread) = old_thread.guest() {
            let instance = state.get_mut(old_thread.task)?.instance.instance;
            self.component_instance_mut(instance)
                .set_task_may_block(false)
        }

        // Recompute `task_may_block` for the thread being switched to.
        if thread.guest().is_some() {
            self.set_task_may_block()?;
        }

        Ok(old_thread)
    }
1746
1747 fn set_task_may_block(&mut self) -> Result<()> {
1750 let state = self.concurrent_state_mut();
1751 let guest_thread = state.current_guest_thread()?;
1752 let instance = state.get_mut(guest_thread.task)?.instance.instance;
1753 let may_block = self.concurrent_state_mut().may_block(guest_thread.task)?;
1754 self.component_instance_mut(instance)
1755 .set_task_may_block(may_block);
1756 Ok(())
1757 }
1758
1759 pub(crate) fn check_blocking(&mut self) -> Result<()> {
1760 if !self.concurrency_support() {
1761 return Ok(());
1762 }
1763 let state = self.concurrent_state_mut();
1764 let task = state.current_guest_thread()?.task;
1765 let instance = state.get_mut(task)?.instance.instance;
1766 let task_may_block = self.component_instance(instance).get_task_may_block();
1767
1768 if task_may_block {
1769 Ok(())
1770 } else {
1771 Err(Trap::CannotBlockSyncTask.into())
1772 }
1773 }
1774
1775 fn enter_instance(&mut self, instance: RuntimeInstance) {
1779 log::trace!("enter {instance:?}");
1780 self.instance_state(instance)
1781 .concurrent_state()
1782 .do_not_enter = true;
1783 }
1784
1785 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1789 log::trace!("exit {instance:?}");
1790 self.instance_state(instance)
1791 .concurrent_state()
1792 .do_not_enter = false;
1793 self.partition_pending(instance)
1794 }
1795
1796 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1801 for (thread, kind) in
1802 mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1803 {
1804 let call = GuestCall { thread, kind };
1805 if call.is_ready(self)? {
1806 self.concurrent_state_mut()
1807 .push_high_priority(WorkItem::GuestCall(instance.index, call));
1808 } else {
1809 self.instance_state(instance)
1810 .concurrent_state()
1811 .pending
1812 .insert(call.thread, call.kind);
1813 }
1814 }
1815
1816 Ok(())
1817 }
1818
1819 pub(crate) fn backpressure_modify(
1821 &mut self,
1822 caller_instance: RuntimeInstance,
1823 modify: impl FnOnce(u16) -> Option<u16>,
1824 ) -> Result<()> {
1825 let state = self.instance_state(caller_instance).concurrent_state();
1826 let old = state.backpressure;
1827 let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
1828 state.backpressure = new;
1829
1830 if old > 0 && new == 0 {
1831 self.partition_pending(caller_instance)?;
1834 }
1835
1836 Ok(())
1837 }
1838
    /// Resumes `fiber` and, once it suspends again or exits, restores the
    /// thread that was current beforehand.
    ///
    /// If the fiber suspended rather than exited, it is parked according to
    /// the `SuspendReason` it left behind in the concurrent state.
    ///
    /// # Errors
    ///
    /// Propagates errors from running the fiber and from table lookups, and
    /// reports a bug if the fiber suspended without recording a reason.
    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
        // Remember who was running so it can be reinstated afterwards.
        let old_thread = self.concurrent_state_mut().current_thread;
        log::trace!("resume_fiber: save current thread {old_thread:?}");

        // `Some(fiber)` means the fiber suspended; `None` means it exited.
        let fiber = fiber::resolve_or_release(self, fiber).await?;

        self.set_thread(old_thread)?;

        let state = self.concurrent_state_mut();

        if let Some(ot) = old_thread.guest() {
            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
        }
        log::trace!("resume_fiber: restore current thread {old_thread:?}");

        if let Some(mut fiber) = fiber {
            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
            let reason = match state.suspend_reason.take() {
                Some(r) => r,
                None => bail_bug!("suspend reason missing when resuming fiber"),
            };
            match reason {
                // Keep the fiber as the worker if there is none yet;
                // otherwise it is surplus and gets disposed of.
                SuspendReason::NeedWork => {
                    if state.worker.is_none() {
                        state.worker = Some(fiber);
                    } else {
                        fiber.dispose(self);
                    }
                }
                // The thread is immediately runnable again: mark it ready and
                // queue a low-priority resumption.
                SuspendReason::Yielding { thread, .. } => {
                    state.get_mut(thread.thread)?.state = GuestThreadState::Ready(fiber);
                    let instance = state.get_mut(thread.task)?.instance.index;
                    state.push_low_priority(WorkItem::ResumeThread(instance, thread));
                }
                // Park the fiber on the thread itself; nothing is queued here.
                SuspendReason::ExplicitlySuspending { thread, .. } => {
                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
                }
                // Register the fiber as a waiter on the waitable set.
                SuspendReason::Waiting { set, thread, .. } => {
                    let old = state
                        .get_mut(set)?
                        .waiting
                        .insert(thread, WaitMode::Fiber(fiber));
                    assert!(old.is_none());
                }
            };
        } else {
            log::trace!("resume_fiber: fiber has exited");
        }

        Ok(())
    }
1893
    /// Suspends the currently running fiber for the given `reason`.
    ///
    /// The reason is stored in the concurrent state's `suspend_reason` slot
    /// before the fiber yields. When the fiber is resumed, the thread that was
    /// current at the time of suspension is reinstated (unless the reason was
    /// `SuspendReason::NeedWork`, which is not tied to a thread).
    ///
    /// # Panics
    ///
    /// Panics if a suspend reason is already recorded. In debug builds it also
    /// panics if the current guest task may not block and the reason does not
    /// set `skip_may_block_check`.
    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
        log::trace!("suspend fiber: {reason:?}");

        // Every reason except `NeedWork` names the thread being suspended.
        let task = match &reason {
            SuspendReason::Yielding { thread, .. }
            | SuspendReason::Waiting { thread, .. }
            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
            SuspendReason::NeedWork => None,
        };

        // Only remember the current thread when there is one to restore.
        let old_guest_thread = if task.is_some() {
            self.concurrent_state_mut().current_thread
        } else {
            CurrentThread::None
        };

        // Sanity check: either the caller opted out of the may-block check,
        // or the current guest task (if any) is allowed to block.
        debug_assert!(
            matches!(
                reason,
                SuspendReason::ExplicitlySuspending {
                    skip_may_block_check: true,
                    ..
                } | SuspendReason::Waiting {
                    skip_may_block_check: true,
                    ..
                } | SuspendReason::Yielding {
                    skip_may_block_check: true,
                    ..
                }
            ) || old_guest_thread
                .guest()
                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
                .transpose()?
                .unwrap_or(true)
        );

        // Record the reason; `resume_fiber` takes it back out of this slot
        // after the fiber has yielded.
        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
        assert!(suspend_reason.is_none());
        *suspend_reason = Some(reason);

        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;

        // Back from suspension: reinstate the thread that was running.
        if task.is_some() {
            self.set_thread(old_guest_thread)?;
        }

        Ok(())
    }
1955
1956 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1957 let state = self.concurrent_state_mut();
1958 let caller = state.current_guest_thread()?;
1959 let old_set = waitable.common(state)?.set;
1960 let set = state.get_mut(caller.thread)?.sync_call_set;
1961 waitable.join(state, Some(set))?;
1962 self.suspend(SuspendReason::Waiting {
1963 set,
1964 thread: caller,
1965 skip_may_block_check: false,
1966 })?;
1967 let state = self.concurrent_state_mut();
1968 waitable.join(state, old_set)
1969 }
1970}
1971
1972impl Instance {
1973 fn get_event(
1976 self,
1977 store: &mut StoreOpaque,
1978 guest_task: TableId<GuestTask>,
1979 set: Option<TableId<WaitableSet>>,
1980 cancellable: bool,
1981 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1982 let state = store.concurrent_state_mut();
1983
1984 let event = &mut state.get_mut(guest_task)?.event;
1985 if let Some(ev) = event
1986 && (cancellable || !matches!(ev, Event::Cancelled))
1987 {
1988 log::trace!("deliver event {ev:?} to {guest_task:?}");
1989 let ev = *ev;
1990 *event = None;
1991 return Ok(Some((ev, None)));
1992 }
1993
1994 let set = match set {
1995 Some(set) => set,
1996 None => return Ok(None),
1997 };
1998 let waitable = match state.get_mut(set)?.ready.pop_first() {
1999 Some(v) => v,
2000 None => return Ok(None),
2001 };
2002
2003 let common = waitable.common(state)?;
2004 let handle = match common.handle {
2005 Some(h) => h,
2006 None => bail_bug!("handle not set when delivering event"),
2007 };
2008 let event = match common.event.take() {
2009 Some(e) => e,
2010 None => bail_bug!("event not set when delivering event"),
2011 };
2012
2013 log::trace!(
2014 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
2015 );
2016
2017 waitable.on_delivery(store, self, event)?;
2018
2019 Ok(Some((event, Some((waitable, handle)))))
2020 }
2021
    /// Acts on the code returned by a guest callback (or by the initial call
    /// of a callback-based export).
    ///
    /// `code` is unpacked into a callback code and a waitable-set handle.
    /// Depending on the code the thread is cleaned up (`EXIT`), rescheduled
    /// (`YIELD`), or registered to wait on the set (`WAIT`).
    ///
    /// Returns `Some(call)` only when a yielding thread must be run again
    /// immediately by the caller; otherwise `None`.
    ///
    /// # Errors
    ///
    /// * `Trap::NoAsyncResult` if the last thread exits without the task
    ///   having returned or been cancelled.
    /// * `Trap::UnsupportedCallbackCode` for an unknown code.
    /// * Whatever `check_blocking_for` reports for a `WAIT` code.
    fn handle_callback_code(
        self,
        store: &mut StoreOpaque,
        guest_thread: QualifiedThreadId,
        runtime_instance: RuntimeComponentInstanceIndex,
        code: u32,
    ) -> Result<Option<GuestCall>> {
        let (code, set) = unpack_callback_code(code);

        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");

        let state = store.concurrent_state_mut();

        // Resolves a waitable-set handle from the guest's handle table into a
        // table id.
        let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
            let set = store
                .instance_state(RuntimeInstance {
                    instance: self.id().instance(),
                    index: runtime_instance,
                })
                .handle_table()
                .waitable_set_rep(handle)?;

            Ok(TableId::<WaitableSet>::new(set))
        };

        Ok(match code {
            callback_code::EXIT => {
                log::trace!("implicit thread {guest_thread:?} completed");
                self.cleanup_thread(store, guest_thread, runtime_instance)?;
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                // With no threads left the task can no longer produce a
                // result, so it must already have returned or been cancelled.
                if task.threads.is_empty() && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
                if let Caller::Guest { .. } = task.caller {
                    task.exited = true;
                    task.callback = None;
                }
                if task.ready_to_delete() {
                    Waitable::Guest(guest_thread.task).delete_from(store.concurrent_state_mut())?;
                }
                None
            }
            callback_code::YIELD => {
                let task = state.get_mut(guest_thread.task)?;
                // Deliver whatever event is already pending (asserted to be
                // `None` or `Cancelled`); otherwise deliver `Event::None`.
                if let Some(event) = task.event {
                    assert!(matches!(event, Event::None | Event::Cancelled));
                } else {
                    task.event = Some(Event::None);
                }
                let call = GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::DeliverEvent {
                        instance: self,
                        set: None,
                    },
                };
                if state.may_block(guest_thread.task)? {
                    // Queue at low priority rather than running it right away.
                    state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
                    None
                } else {
                    // The task may not block, so hand the call back to the
                    // caller instead of queueing it.
                    Some(call)
                }
            }
            callback_code::WAIT => {
                // Waiting is a blocking operation; fail if this task is not
                // permitted to block.
                state.check_blocking_for(guest_thread.task)?;

                let set = get_set(store, set)?;
                let state = store.concurrent_state_mut();

                if state.get_mut(guest_thread.task)?.event.is_some()
                    || !state.get_mut(set)?.ready.is_empty()
                {
                    // Something is already available: schedule delivery.
                    state.push_high_priority(WorkItem::GuestCall(
                        runtime_instance,
                        GuestCall {
                            thread: guest_thread,
                            kind: GuestCallKind::DeliverEvent {
                                instance: self,
                                set: Some(set),
                            },
                        },
                    ));
                } else {
                    // Nothing available yet: remember which set to wake on
                    // cancellation and register as a callback-mode waiter.
                    let old = state
                        .get_mut(guest_thread.thread)?
                        .wake_on_cancel
                        .replace(set);
                    if !old.is_none() {
                        bail_bug!("thread unexpectedly had wake_on_cancel set");
                    }
                    let old = state
                        .get_mut(set)?
                        .waiting
                        .insert(guest_thread, WaitMode::Callback(self));
                    if !old.is_none() {
                        bail_bug!("set's waiting set already had this thread registered");
                    }
                }
                None
            }
            _ => bail!(Trap::UnsupportedCallbackCode),
        })
    }
2149
2150 fn cleanup_thread(
2151 self,
2152 store: &mut StoreOpaque,
2153 guest_thread: QualifiedThreadId,
2154 runtime_instance: RuntimeComponentInstanceIndex,
2155 ) -> Result<()> {
2156 let state = store.concurrent_state_mut();
2157 let thread_data = state.get_mut(guest_thread.thread)?;
2158 let sync_call_set = thread_data.sync_call_set;
2159 if let Some(guest_id) = thread_data.instance_rep {
2160 store
2161 .instance_state(RuntimeInstance {
2162 instance: self.id().instance(),
2163 index: runtime_instance,
2164 })
2165 .thread_handle_table()
2166 .guest_thread_remove(guest_id)?;
2167 }
2168 let state = store.concurrent_state_mut();
2169
2170 for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
2172 if let Some(Event::Subtask {
2173 status: Status::Returned | Status::ReturnCancelled,
2174 }) = waitable.common(state)?.event
2175 {
2176 waitable.delete_from(state)?;
2177 }
2178 }
2179
2180 state.delete(guest_thread.thread)?;
2181 state.delete(sync_call_set)?;
2182 let task = state.get_mut(guest_thread.task)?;
2183 task.threads.remove(&guest_thread.thread);
2184 Ok(())
2185 }
2186
    /// Queues a high-priority work item that starts the implicit thread
    /// `guest_thread` by calling `callee`.
    ///
    /// Two flavours of start closure are built:
    ///
    /// * With a `callback`: the callee is invoked once and the `i32` it
    ///   leaves in the first storage slot is interpreted as a callback code.
    /// * Without a `callback`: the callee runs to completion. For a
    ///   non-`async_` callee the result is then lifted, `post_return` is
    ///   called, and the task is completed with `Status::Returned`.
    ///
    /// # Safety
    ///
    /// `callee`, `callback` and `post_return` are later invoked through raw
    /// function pointers with `param_count`/`result_count` flat values.
    /// NOTE(review): the caller presumably must guarantee that these pointers
    /// are valid for this store and match those counts; confirm against the
    /// call sites.
    unsafe fn queue_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        guest_thread: QualifiedThreadId,
        callee: SendSyncPtr<VMFuncRef>,
        param_count: usize,
        result_count: usize,
        async_: bool,
        callback: Option<SendSyncPtr<VMFuncRef>>,
        post_return: Option<SendSyncPtr<VMFuncRef>>,
    ) -> Result<()> {
        /// Builds the closure that lowers the parameters and performs the
        /// raw call to `callee`, returning the storage array holding the
        /// flat results.
        ///
        /// # Safety
        ///
        /// NOTE(review): same contract as the enclosing `queue_call`; confirm.
        unsafe fn make_call<T: 'static>(
            store: StoreContextMut<T>,
            guest_thread: QualifiedThreadId,
            callee: SendSyncPtr<VMFuncRef>,
            param_count: usize,
            result_count: usize,
        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
        + Send
        + Sync
        + 'static
        + use<T> {
            let token = StoreToken::new(store);
            move |store: &mut dyn VMStore| {
                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];

                store
                    .concurrent_state_mut()
                    .get_mut(guest_thread.thread)?
                    .state = GuestThreadState::Running;
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                // The lowering closure is single-use; taking it leaves `None`.
                let lower = match task.lower_params.take() {
                    Some(l) => l,
                    None => bail_bug!("lower_params missing"),
                };

                lower(store, &mut storage[..param_count])?;

                let mut store = token.as_context_mut(store);

                unsafe {
                    // The same buffer carries parameters in and results out,
                    // so it must be large enough for whichever is bigger.
                    crate::Func::call_unchecked_raw(
                        &mut store,
                        callee.as_non_null(),
                        NonNull::new(
                            &mut storage[..param_count.max(result_count)]
                                as *mut [MaybeUninit<ValRaw>] as _,
                        )
                        .unwrap(),
                    )?;
                }

                Ok(storage)
            }
        }

        let call = unsafe {
            make_call(
                store.as_context_mut(),
                guest_thread,
                callee,
                param_count,
                result_count,
            )
        };

        let callee_instance = store
            .0
            .concurrent_state_mut()
            .get_mut(guest_thread.task)?
            .instance;

        let fun = if callback.is_some() {
            // A callback is only expected together with an async callee.
            assert!(async_);

            Box::new(move |store: &mut dyn VMStore| {
                self.add_guest_thread_to_instance_table(
                    guest_thread.thread,
                    store,
                    callee_instance.index,
                )?;
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
                );

                store.enter_instance(callee_instance);

                let storage = call(store)?;

                store.exit_instance(callee_instance)?;

                store.set_thread(old_thread)?;
                let state = store.concurrent_state_mut();
                if let Some(t) = old_thread.guest() {
                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
                }
                log::trace!("stackless call: restored {old_thread:?} as current thread");

                // The callee's single `i32` result is the callback code.
                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;

                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
            })
                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
        } else {
            let token = StoreToken::new(store.as_context_mut());
            Box::new(move |store: &mut dyn VMStore| {
                self.add_guest_thread_to_instance_table(
                    guest_thread.thread,
                    store,
                    callee_instance.index,
                )?;
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
                );
                let flags = self.id().get(store).instance_flags(callee_instance.index);

                // Only a non-async callee marks the instance as entered for
                // the duration of the call.
                if !async_ {
                    store.enter_instance(callee_instance);
                }

                let storage = call(store)?;

                if async_ {
                    // This thread is about to be cleaned up; if it is the
                    // task's only thread, the task must already have
                    // returned or been cancelled.
                    let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                    if task.threads.len() == 1 && !task.returned_or_cancelled() {
                        bail!(Trap::NoAsyncResult);
                    }
                } else {
                    // Non-async callee: lift the result, run `post_return`,
                    // and mark the task as returned.
                    let lift = {
                        store.exit_instance(callee_instance)?;

                        let state = store.concurrent_state_mut();
                        if !state.get_mut(guest_thread.task)?.result.is_none() {
                            bail_bug!("task has already produced a result");
                        }

                        match state.get_mut(guest_thread.task)?.lift_result.take() {
                            Some(lift) => lift,
                            None => bail_bug!("lift_result field is missing"),
                        }
                    };

                    // NOTE(review): reinterprets the first `result_count`
                    // slots as initialized; presumably the callee wrote them.
                    let result = (lift.lift)(store, unsafe {
                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
                            &storage[..result_count],
                        )
                    })?;

                    // `post_return` receives the single flat result, or a
                    // zero placeholder when there is none.
                    let post_return_arg = match result_count {
                        0 => ValRaw::i32(0),
                        1 => unsafe { storage[0].assume_init() },
                        _ => unreachable!(),
                    };

                    unsafe {
                        call_post_return(
                            token.as_context_mut(store),
                            post_return.map(|v| v.as_non_null()),
                            post_return_arg,
                            flags,
                        )?;
                    }

                    self.task_complete(store, guest_thread.task, result, Status::Returned)?;
                }

                // Switch back to the previous thread before deleting this
                // one: `set_thread` looks up the outgoing thread's table
                // entry, which `cleanup_thread` deletes.
                store.set_thread(old_thread)?;

                self.cleanup_thread(store, guest_thread, callee_instance.index)?;

                let state = store.concurrent_state_mut();
                let task = state.get_mut(guest_thread.task)?;

                match &task.caller {
                    Caller::Host { .. } => {
                        if task.ready_to_delete() {
                            Waitable::Guest(guest_thread.task).delete_from(state)?;
                        }
                    }
                    Caller::Guest { .. } => {
                        task.exited = true;
                    }
                }

                Ok(None)
            })
        };

        store
            .0
            .concurrent_state_mut()
            .push_high_priority(WorkItem::GuestCall(
                callee_instance.index,
                GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::StartImplicit(fun),
                },
            ));

        Ok(())
    }
2442
2443 unsafe fn prepare_call<T: 'static>(
2456 self,
2457 mut store: StoreContextMut<T>,
2458 start: NonNull<VMFuncRef>,
2459 return_: NonNull<VMFuncRef>,
2460 caller_instance: RuntimeComponentInstanceIndex,
2461 callee_instance: RuntimeComponentInstanceIndex,
2462 task_return_type: TypeTupleIndex,
2463 callee_async: bool,
2464 memory: *mut VMMemoryDefinition,
2465 string_encoding: StringEncoding,
2466 caller_info: CallerInfo,
2467 ) -> Result<()> {
2468 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2469 store.0.check_blocking()?;
2473 }
2474
2475 enum ResultInfo {
2476 Heap { results: u32 },
2477 Stack { result_count: u32 },
2478 }
2479
2480 let result_info = match &caller_info {
2481 CallerInfo::Async {
2482 has_result: true,
2483 params,
2484 } => ResultInfo::Heap {
2485 results: match params.last() {
2486 Some(r) => r.get_u32(),
2487 None => bail_bug!("retptr missing"),
2488 },
2489 },
2490 CallerInfo::Async {
2491 has_result: false, ..
2492 } => ResultInfo::Stack { result_count: 0 },
2493 CallerInfo::Sync {
2494 result_count,
2495 params,
2496 } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
2497 results: match params.last() {
2498 Some(r) => r.get_u32(),
2499 None => bail_bug!("arg ptr missing"),
2500 },
2501 },
2502 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2503 result_count: *result_count,
2504 },
2505 };
2506
2507 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2508
2509 let start = SendSyncPtr::new(start);
2513 let return_ = SendSyncPtr::new(return_);
2514 let token = StoreToken::new(store.as_context_mut());
2515 let state = store.0.concurrent_state_mut();
2516 let old_thread = state.current_guest_thread()?;
2517
2518 debug_assert_eq!(
2519 state.get_mut(old_thread.task)?.instance,
2520 RuntimeInstance {
2521 instance: self.id().instance(),
2522 index: caller_instance,
2523 }
2524 );
2525
2526 let new_task = GuestTask::new(
2527 Box::new(move |store, dst| {
2528 let mut store = token.as_context_mut(store);
2529 assert!(dst.len() <= MAX_FLAT_PARAMS);
2530 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2532 let count = match caller_info {
2533 CallerInfo::Async { params, has_result } => {
2537 let params = ¶ms[..params.len() - usize::from(has_result)];
2538 for (param, src) in params.iter().zip(&mut src) {
2539 src.write(*param);
2540 }
2541 params.len()
2542 }
2543
2544 CallerInfo::Sync { params, .. } => {
2546 for (param, src) in params.iter().zip(&mut src) {
2547 src.write(*param);
2548 }
2549 params.len()
2550 }
2551 };
2552 unsafe {
2559 crate::Func::call_unchecked_raw(
2560 &mut store,
2561 start.as_non_null(),
2562 NonNull::new(
2563 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2564 )
2565 .unwrap(),
2566 )?;
2567 }
2568 dst.copy_from_slice(&src[..dst.len()]);
2569 let state = store.0.concurrent_state_mut();
2570 Waitable::Guest(state.current_guest_thread()?.task).set_event(
2571 state,
2572 Some(Event::Subtask {
2573 status: Status::Started,
2574 }),
2575 )?;
2576 Ok(())
2577 }),
2578 LiftResult {
2579 lift: Box::new(move |store, src| {
2580 let mut store = token.as_context_mut(store);
2583 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2585 my_src.push(ValRaw::u32(*results));
2586 }
2587
2588 let prev = store.0.set_thread(old_thread)?;
2594
2595 unsafe {
2602 crate::Func::call_unchecked_raw(
2603 &mut store,
2604 return_.as_non_null(),
2605 my_src.as_mut_slice().into(),
2606 )?;
2607 }
2608
2609 store.0.set_thread(prev)?;
2612
2613 let state = store.0.concurrent_state_mut();
2614 let thread = state.current_guest_thread()?;
2615 if sync_caller {
2616 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2617 if let ResultInfo::Stack { result_count } = &result_info {
2618 match result_count {
2619 0 => None,
2620 1 => Some(my_src[0]),
2621 _ => unreachable!(),
2622 }
2623 } else {
2624 None
2625 },
2626 );
2627 }
2628 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2629 }),
2630 ty: task_return_type,
2631 memory: NonNull::new(memory).map(SendSyncPtr::new),
2632 string_encoding,
2633 },
2634 Caller::Guest { thread: old_thread },
2635 None,
2636 RuntimeInstance {
2637 instance: self.id().instance(),
2638 index: callee_instance,
2639 },
2640 callee_async,
2641 )?;
2642
2643 let guest_task = state.push(new_task)?;
2644 let new_thread = GuestThread::new_implicit(state, guest_task)?;
2645 let guest_thread = state.push(new_thread)?;
2646 state.get_mut(guest_task)?.threads.insert(guest_thread);
2647
2648 store.0.set_thread(QualifiedThreadId {
2651 task: guest_task,
2652 thread: guest_thread,
2653 })?;
2654 log::trace!(
2655 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2656 );
2657
2658 Ok(())
2659 }
2660
2661 unsafe fn call_callback<T>(
2666 self,
2667 mut store: StoreContextMut<T>,
2668 function: SendSyncPtr<VMFuncRef>,
2669 event: Event,
2670 handle: u32,
2671 ) -> Result<u32> {
2672 let (ordinal, result) = event.parts();
2673 let params = &mut [
2674 ValRaw::u32(ordinal),
2675 ValRaw::u32(handle),
2676 ValRaw::u32(result),
2677 ];
2678 unsafe {
2683 crate::Func::call_unchecked_raw(
2684 &mut store,
2685 function.as_non_null(),
2686 params.as_mut_slice().into(),
2687 )?;
2688 }
2689 Ok(params[0].get_u32())
2690 }
2691
    /// Starts the guest->guest call for the current guest thread and waits
    /// for the callee to report progress.
    ///
    /// The callee is queued via `queue_call`, and the calling thread is
    /// suspended on its `sync_call_set` until the subtask produces an event.
    /// An async caller (`storage` is `None`) stops waiting after the first
    /// event and, if the callee has not returned yet, receives a subtask
    /// handle. A sync caller keeps waiting until the callee returns, after
    /// which a flat result, if any, is written to `storage[0]`.
    ///
    /// Returns the final status packed together with the optional subtask
    /// handle.
    ///
    /// # Panics
    ///
    /// Panics if `param_count` exceeds `MAX_FLAT_PARAMS` or `result_count`
    /// exceeds `MAX_FLAT_RESULTS`.
    ///
    /// # Safety
    ///
    /// `callback`, `post_return` and `callee` are raw function pointers that
    /// are eventually called.
    /// NOTE(review): the caller presumably must guarantee they are null or
    /// valid for this store and match the given counts; confirm.
    unsafe fn start_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
        storage: Option<&mut [MaybeUninit<ValRaw>]>,
    ) -> Result<u32> {
        let token = StoreToken::new(store.as_context_mut());
        // No result storage means the caller is async.
        let async_caller = storage.is_none();
        let state = store.0.concurrent_state_mut();
        let guest_thread = state.current_guest_thread()?;
        let callee_async = state.get_mut(guest_thread.task)?.async_function;
        let callee = SendSyncPtr::new(callee);
        let param_count = usize::try_from(param_count)?;
        assert!(param_count <= MAX_FLAT_PARAMS);
        let result_count = usize::try_from(result_count)?;
        assert!(result_count <= MAX_FLAT_RESULTS);

        let task = state.get_mut(guest_thread.task)?;
        // A non-null callback is wrapped in a closure stored on the task.
        if let Some(callback) = NonNull::new(callback) {
            let callback = SendSyncPtr::new(callback);
            task.callback = Some(Box::new(move |store, event, handle| {
                let store = token.as_context_mut(store);
                unsafe { self.call_callback::<T>(store, callback, event, handle) }
            }));
        }

        let Caller::Guest { thread: caller } = &task.caller else {
            bail_bug!("start_call unexpectedly invoked for host->guest call");
        };
        let caller = *caller;
        let caller_instance = state.get_mut(caller.task)?.instance;

        unsafe {
            self.queue_call(
                store.as_context_mut(),
                guest_thread,
                callee,
                param_count,
                result_count,
                (flags & START_FLAG_ASYNC_CALLEE) != 0,
                NonNull::new(callback).map(SendSyncPtr::new),
                NonNull::new(post_return).map(SendSyncPtr::new),
            )?;
        }

        let state = store.0.concurrent_state_mut();

        // Temporarily move the subtask's waitable into the caller's
        // `sync_call_set`, remembering the set it was in before.
        let guest_waitable = Waitable::Guest(guest_thread.task);
        let old_set = guest_waitable.common(state)?.set;
        let set = state.get_mut(caller.thread)?.sync_call_set;
        guest_waitable.join(state, Some(set))?;

        store.0.set_thread(CurrentThread::None)?;

        let (status, waitable) = loop {
            store.0.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                // The may-block check is skipped for an async caller and for
                // a callee that is not async.
                skip_may_block_check: async_caller || !callee_async,
            })?;

            let state = store.0.concurrent_state_mut();

            log::trace!("taking event for {:?}", guest_thread.task);
            let event = guest_waitable.take_event(state)?;
            let Some(Event::Subtask { status }) = event else {
                bail_bug!("subtasks should only get subtask events, got {event:?}")
            };

            log::trace!("status {status:?} for {:?}", guest_thread.task);

            if status == Status::Returned {
                // The callee has returned; no handle is needed.
                break (status, None);
            } else if async_caller {
                // Not finished yet, but an async caller does not wait: give
                // it a subtask handle to track the call with.
                let handle = store
                    .0
                    .instance_state(caller_instance)
                    .handle_table()
                    .subtask_insert_guest(guest_thread.task.rep())?;
                store
                    .0
                    .concurrent_state_mut()
                    .get_mut(guest_thread.task)?
                    .common
                    .handle = Some(handle);
                break (status, Some(handle));
            } else {
                // Sync caller and the callee has not returned yet: go round
                // the loop and wait for the next event.
            }
        };

        // Put the waitable back into its original set (if any).
        guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;

        // Control returns to the caller, so make it the current thread again.
        store.0.set_thread(caller)?;
        store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");

        if let Some(storage) = storage {
            // Sync caller: fetch the result recorded in the task's
            // `sync_result`, if one was produced.
            let state = store.0.concurrent_state_mut();
            let task = state.get_mut(guest_thread.task)?;
            if let Some(result) = task.sync_result.take()? {
                if let Some(result) = result {
                    storage[0] = MaybeUninit::new(result);
                }

                if task.exited && task.ready_to_delete() {
                    Waitable::Guest(guest_thread.task).delete_from(state)?;
                }
            }
        }

        Ok(status.pack(waitable))
    }
2862
    /// Polls `future`, which represents a host task, once. If it is not ready
    /// yet, arranges for it to be driven to completion in the background.
    ///
    /// * If the future completes immediately, its result is passed to `lower`
    ///   and `Ok(None)` is returned.
    /// * Otherwise the future is pushed onto the concurrent state, a subtask
    ///   handle is allocated in the caller's handle table, the caller is made
    ///   the current thread, and `Ok(Some(handle))` is returned. When the
    ///   future eventually completes, a worker function is queued that lowers
    ///   the result and posts a `Subtask` event for the host task.
    ///
    /// # Panics
    ///
    /// Panics if the current host task is not in the
    /// `HostTaskState::CalleeStarted` state.
    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
        self,
        mut store: StoreContextMut<'_, T>,
        future: impl Future<Output = Result<R>> + Send + 'static,
        lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
    ) -> Result<Option<u32>> {
        let token = StoreToken::new(store.as_context_mut());
        let state = store.0.concurrent_state_mut();
        let task = state.current_host_thread()?;

        // Wrap the future and keep the resulting join handle in the task's
        // state.
        let (join_handle, future) = JoinHandle::run(future);
        {
            let state = &mut state.get_mut(task)?.state;
            assert!(matches!(state, HostTaskState::CalleeStarted));
            *state = HostTaskState::CalleeRunning(join_handle);
        }

        let mut future = Box::pin(future);

        // Poll exactly once with a no-op waker to see whether the future
        // completes without needing to wait.
        let poll = tls::set(store.0, || {
            future
                .as_mut()
                .poll(&mut Context::from_waker(&Waker::noop()))
        });

        match poll {
            // Finished right away: lower the result and report that no
            // subtask handle was needed.
            Poll::Ready(result) => {
                let result = result.transpose()?;
                lower(store.as_context_mut(), result)?;
                return Ok(None);
            }

            // Not ready yet: fall through to the background path below.
            Poll::Pending => {}
        }

        let future = Box::pin(async move {
            // `None` is reported as `ReturnCancelled` below.
            let result = match future.await {
                Some(result) => Some(result?),
                None => None,
            };
            let on_complete = move |store: &mut dyn VMStore| {
                let mut store = token.as_context_mut(store);
                // Run `lower` with the host task as the current thread.
                let old = store.0.set_thread(task)?;

                let status = if result.is_some() {
                    Status::Returned
                } else {
                    Status::ReturnCancelled
                };

                lower(store.as_context_mut(), result)?;
                let state = store.0.concurrent_state_mut();
                match &mut state.get_mut(task)?.state {
                    // Already marked done; leave the state untouched.
                    HostTaskState::CalleeDone { .. } => {}

                    // Otherwise transition to done (not cancelled).
                    other => *other = HostTaskState::CalleeDone { cancelled: false },
                }
                Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;

                store.0.set_thread(old)?;
                Ok(())
            };

            // Lowering is deferred to a queued worker function rather than
            // performed inline here.
            tls::get(move |store| {
                store
                    .concurrent_state_mut()
                    .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
                        on_complete,
                    ))));
                Ok(())
            })
        });

        // Register the background future and expose the task to the caller
        // as a subtask handle.
        let state = store.0.concurrent_state_mut();
        state.push_future(future);
        let caller = state.get_mut(task)?.caller;
        let instance = state.get_mut(caller.task)?.instance;
        let handle = store
            .0
            .instance_state(instance)
            .handle_table()
            .subtask_insert_host(task.rep())?;
        store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
        log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");

        // The host task is no longer running on this stack; hand control back
        // to its caller.
        store.0.set_thread(caller)?;
        Ok(Some(handle))
    }
2993
2994 pub(crate) fn task_return(
2997 self,
2998 store: &mut dyn VMStore,
2999 ty: TypeTupleIndex,
3000 options: OptionsIndex,
3001 storage: &[ValRaw],
3002 ) -> Result<()> {
3003 let state = store.concurrent_state_mut();
3004 let guest_thread = state.current_guest_thread()?;
3005 let lift = state
3006 .get_mut(guest_thread.task)?
3007 .lift_result
3008 .take()
3009 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3010 if !state.get_mut(guest_thread.task)?.result.is_none() {
3011 bail_bug!("task result unexpectedly already set");
3012 }
3013
3014 let CanonicalOptions {
3015 string_encoding,
3016 data_model,
3017 ..
3018 } = &self.id().get(store).component().env_component().options[options];
3019
3020 let invalid = ty != lift.ty
3021 || string_encoding != &lift.string_encoding
3022 || match data_model {
3023 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
3024 Some(memory) => {
3025 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
3026 let actual = self.id().get(store).runtime_memory(memory);
3027 expected != actual.as_ptr()
3028 }
3029 None => false,
3032 },
3033 CanonicalOptionsDataModel::Gc { .. } => true,
3035 };
3036
3037 if invalid {
3038 bail!(Trap::TaskReturnInvalid);
3039 }
3040
3041 log::trace!("task.return for {guest_thread:?}");
3042
3043 let result = (lift.lift)(store, storage)?;
3044 self.task_complete(store, guest_thread.task, result, Status::Returned)
3045 }
3046
3047 pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
3049 let state = store.concurrent_state_mut();
3050 let guest_thread = state.current_guest_thread()?;
3051 let task = state.get_mut(guest_thread.task)?;
3052 if !task.cancel_sent {
3053 bail!(Trap::TaskCancelNotCancelled);
3054 }
3055 _ = task
3056 .lift_result
3057 .take()
3058 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3059
3060 if !task.result.is_none() {
3061 bail_bug!("task result should not bet set yet");
3062 }
3063
3064 log::trace!("task.cancel for {guest_thread:?}");
3065
3066 self.task_complete(
3067 store,
3068 guest_thread.task,
3069 Box::new(DummyResult),
3070 Status::ReturnCancelled,
3071 )
3072 }
3073
3074 fn task_complete(
3080 self,
3081 store: &mut StoreOpaque,
3082 guest_task: TableId<GuestTask>,
3083 result: Box<dyn Any + Send + Sync>,
3084 status: Status,
3085 ) -> Result<()> {
3086 store
3087 .component_resource_tables(Some(self))
3088 .validate_scope_exit()?;
3089
3090 let state = store.concurrent_state_mut();
3091 let task = state.get_mut(guest_task)?;
3092
3093 if let Caller::Host { tx, .. } = &mut task.caller {
3094 if let Some(tx) = tx.take() {
3095 _ = tx.send(result);
3096 }
3097 } else {
3098 task.result = Some(result);
3099 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3100 }
3101
3102 Ok(())
3103 }
3104
3105 pub(crate) fn waitable_set_new(
3107 self,
3108 store: &mut StoreOpaque,
3109 caller_instance: RuntimeComponentInstanceIndex,
3110 ) -> Result<u32> {
3111 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
3112 let handle = store
3113 .instance_state(RuntimeInstance {
3114 instance: self.id().instance(),
3115 index: caller_instance,
3116 })
3117 .handle_table()
3118 .waitable_set_insert(set.rep())?;
3119 log::trace!("new waitable set {set:?} (handle {handle})");
3120 Ok(handle)
3121 }
3122
3123 pub(crate) fn waitable_set_drop(
3125 self,
3126 store: &mut StoreOpaque,
3127 caller_instance: RuntimeComponentInstanceIndex,
3128 set: u32,
3129 ) -> Result<()> {
3130 let rep = store
3131 .instance_state(RuntimeInstance {
3132 instance: self.id().instance(),
3133 index: caller_instance,
3134 })
3135 .handle_table()
3136 .waitable_set_remove(set)?;
3137
3138 log::trace!("drop waitable set {rep} (handle {set})");
3139
3140 if !store
3144 .concurrent_state_mut()
3145 .get_mut(TableId::<WaitableSet>::new(rep))?
3146 .waiting
3147 .is_empty()
3148 {
3149 bail!(Trap::WaitableSetDropHasWaiters);
3150 }
3151
3152 store
3153 .concurrent_state_mut()
3154 .delete(TableId::<WaitableSet>::new(rep))?;
3155
3156 Ok(())
3157 }
3158
3159 pub(crate) fn waitable_join(
3161 self,
3162 store: &mut StoreOpaque,
3163 caller_instance: RuntimeComponentInstanceIndex,
3164 waitable_handle: u32,
3165 set_handle: u32,
3166 ) -> Result<()> {
3167 let mut instance = self.id().get_mut(store);
3168 let waitable =
3169 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3170
3171 let set = if set_handle == 0 {
3172 None
3173 } else {
3174 let set = instance.instance_states().0[caller_instance]
3175 .handle_table()
3176 .waitable_set_rep(set_handle)?;
3177
3178 Some(TableId::<WaitableSet>::new(set))
3179 };
3180
3181 log::trace!(
3182 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3183 );
3184
3185 waitable.join(store.concurrent_state_mut(), set)
3186 }
3187
3188 pub(crate) fn subtask_drop(
3190 self,
3191 store: &mut StoreOpaque,
3192 caller_instance: RuntimeComponentInstanceIndex,
3193 task_id: u32,
3194 ) -> Result<()> {
3195 self.waitable_join(store, caller_instance, task_id, 0)?;
3196
3197 let (rep, is_host) = store
3198 .instance_state(RuntimeInstance {
3199 instance: self.id().instance(),
3200 index: caller_instance,
3201 })
3202 .handle_table()
3203 .subtask_remove(task_id)?;
3204
3205 let concurrent_state = store.concurrent_state_mut();
3206 let (waitable, delete) = if is_host {
3207 let id = TableId::<HostTask>::new(rep);
3208 let task = concurrent_state.get_mut(id)?;
3209 match &task.state {
3210 HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
3211 HostTaskState::CalleeDone { .. } => {}
3212 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3213 bail_bug!("invalid state for callee in `subtask.drop`")
3214 }
3215 }
3216 (Waitable::Host(id), true)
3217 } else {
3218 let id = TableId::<GuestTask>::new(rep);
3219 let task = concurrent_state.get_mut(id)?;
3220 if task.lift_result.is_some() {
3221 bail!(Trap::SubtaskDropNotResolved);
3222 }
3223 (
3224 Waitable::Guest(id),
3225 concurrent_state.get_mut(id)?.ready_to_delete(),
3226 )
3227 };
3228
3229 waitable.common(concurrent_state)?.handle = None;
3230
3231 if waitable.take_event(concurrent_state)?.is_some() {
3234 bail!(Trap::SubtaskDropNotResolved);
3235 }
3236
3237 if delete {
3238 waitable.delete_from(concurrent_state)?;
3239 }
3240
3241 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3242 Ok(())
3243 }
3244
3245 pub(crate) fn waitable_set_wait(
3247 self,
3248 store: &mut StoreOpaque,
3249 options: OptionsIndex,
3250 set: u32,
3251 payload: u32,
3252 ) -> Result<u32> {
3253 if !self.options(store, options).async_ {
3254 store.check_blocking()?;
3258 }
3259
3260 let &CanonicalOptions {
3261 cancellable,
3262 instance: caller_instance,
3263 ..
3264 } = &self.id().get(store).component().env_component().options[options];
3265 let rep = store
3266 .instance_state(RuntimeInstance {
3267 instance: self.id().instance(),
3268 index: caller_instance,
3269 })
3270 .handle_table()
3271 .waitable_set_rep(set)?;
3272
3273 self.waitable_check(
3274 store,
3275 cancellable,
3276 WaitableCheck::Wait,
3277 WaitableCheckParams {
3278 set: TableId::new(rep),
3279 options,
3280 payload,
3281 },
3282 )
3283 }
3284
3285 pub(crate) fn waitable_set_poll(
3287 self,
3288 store: &mut StoreOpaque,
3289 options: OptionsIndex,
3290 set: u32,
3291 payload: u32,
3292 ) -> Result<u32> {
3293 let &CanonicalOptions {
3294 cancellable,
3295 instance: caller_instance,
3296 ..
3297 } = &self.id().get(store).component().env_component().options[options];
3298 let rep = store
3299 .instance_state(RuntimeInstance {
3300 instance: self.id().instance(),
3301 index: caller_instance,
3302 })
3303 .handle_table()
3304 .waitable_set_rep(set)?;
3305
3306 self.waitable_check(
3307 store,
3308 cancellable,
3309 WaitableCheck::Poll,
3310 WaitableCheckParams {
3311 set: TableId::new(rep),
3312 options,
3313 payload,
3314 },
3315 )
3316 }
3317
3318 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3320 let thread_id = store.concurrent_state_mut().current_guest_thread()?.thread;
3321 match store
3322 .concurrent_state_mut()
3323 .get_mut(thread_id)?
3324 .instance_rep
3325 {
3326 Some(r) => Ok(r),
3327 None => bail_bug!("thread should have instance_rep by now"),
3328 }
3329 }
3330
    /// Creates a new, not-yet-started explicit guest thread whose entry point
    /// is looked up indirectly in a function table.
    ///
    /// The entry at `start_func_idx` of table `start_func_table_idx` must be
    /// initialized and must have type `(i32) -> ()`. Otherwise this traps
    /// with `Trap::ThreadNewIndirectUninitialized` or
    /// `Trap::ThreadNewIndirectInvalidType` respectively. `context` is the
    /// single `i32` argument passed to the start function once the thread
    /// runs. `_func_ty_idx` is unused here.
    ///
    /// The new thread is added to the task of the current guest thread and
    /// inserted into `runtime_instance`'s thread handle table; the resulting
    /// handle is returned.
    pub(crate) fn thread_new_indirect<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        runtime_instance: RuntimeComponentInstanceIndex,
        _func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32> {
        log::trace!("creating new thread");

        // The only accepted signature for a thread start function.
        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
        let (instance, registry) = self.id().get_mut_and_registry(store.0);
        let callee = instance
            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
            .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
        if callee.type_index(store.0) != start_func_ty.type_index() {
            bail!(Trap::ThreadNewIndirectInvalidType);
        }

        let token = StoreToken::new(store.as_context_mut());
        // Deferred body of the thread; it is stored in the thread's
        // `NotStartedExplicit` state and invoked once the thread is started.
        let start_func = Box::new(
            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
                // Make the new thread current, remembering what to restore.
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
                );

                let mut store = token.as_context_mut(store);
                let mut params = [ValRaw::i32(context)];
                // NOTE(review): presumably sound because `callee`'s type was
                // checked against `(i32) -> ()` above and `params` matches
                // that signature. Confirm against `call_unchecked`'s contract.
                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };

                store.0.set_thread(old_thread)?;

                self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
                log::trace!("explicit thread {guest_thread:?} completed");
                let state = store.0.concurrent_state_mut();
                let task = state.get_mut(guest_thread.task)?;
                // If this was the task's last thread, the task must already
                // have returned or been cancelled.
                if task.threads.is_empty() && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
                let state = store.0.concurrent_state_mut();
                if let Some(t) = old_thread.guest() {
                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
                }
                if state.get_mut(guest_thread.task)?.ready_to_delete() {
                    Waitable::Guest(guest_thread.task).delete_from(state)?;
                }
                log::trace!("thread start: restored {old_thread:?} as current thread");

                Ok(())
            },
        );

        let state = store.0.concurrent_state_mut();
        let current_thread = state.current_guest_thread()?;
        let parent_task = current_thread.task;

        // Register the thread with the concurrent state and its parent task.
        let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
        let thread_id = state.push(new_thread)?;
        state.get_mut(parent_task)?.threads.insert(thread_id);

        log::trace!("new thread with id {thread_id:?} created");

        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
    }
3400
    /// Schedules the guest thread named by handle `thread_idx` (resolved in
    /// `runtime_instance`'s thread handle table) to run.
    ///
    /// What happens depends on the thread's current state:
    ///
    /// * `NotStartedExplicit`: a `GuestCallKind::StartExplicit` work item
    ///   wrapping the start function is pushed.
    /// * `Suspended`: a `WorkItem::ResumeFiber` work item is pushed.
    /// * `Ready`, only when `allow_ready` is true: the thread stays `Ready`
    ///   and `promote_thread_work_item` is called for it.
    /// * Anything else: the state is restored and this traps with
    ///   `Trap::CannotResumeThread`.
    ///
    /// `high_priority` is forwarded to `push_work_item`.
    pub(crate) fn resume_thread(
        self,
        store: &mut StoreOpaque,
        runtime_instance: RuntimeComponentInstanceIndex,
        thread_idx: u32,
        high_priority: bool,
        allow_ready: bool,
    ) -> Result<()> {
        let thread_id =
            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
        let state = store.concurrent_state_mut();
        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
        let thread = state.get_mut(guest_thread.thread)?;

        // Take the old state by value, provisionally marking the thread
        // `Running`; arms that must not change the state put it back.
        match mem::replace(&mut thread.state, GuestThreadState::Running) {
            GuestThreadState::NotStartedExplicit(start_func) => {
                log::trace!("starting thread {guest_thread:?}");
                let guest_call = WorkItem::GuestCall(
                    runtime_instance,
                    GuestCall {
                        thread: guest_thread,
                        kind: GuestCallKind::StartExplicit(Box::new(move |store| {
                            start_func(store, guest_thread)
                        })),
                    },
                );
                store
                    .concurrent_state_mut()
                    .push_work_item(guest_call, high_priority);
            }
            GuestThreadState::Suspended(fiber) => {
                log::trace!("resuming thread {thread_id:?} that was suspended");
                store
                    .concurrent_state_mut()
                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
            }
            GuestThreadState::Ready(fiber) if allow_ready => {
                log::trace!("resuming thread {thread_id:?} that was ready");
                // Restore the `Ready` state; only the position of its
                // existing work item is changed.
                thread.state = GuestThreadState::Ready(fiber);
                store
                    .concurrent_state_mut()
                    .promote_thread_work_item(guest_thread);
            }
            other => {
                // Not resumable: undo the provisional `Running` state.
                thread.state = other;
                bail!(Trap::CannotResumeThread);
            }
        }
        Ok(())
    }
3451
3452 fn add_guest_thread_to_instance_table(
3453 self,
3454 thread_id: TableId<GuestThread>,
3455 store: &mut StoreOpaque,
3456 runtime_instance: RuntimeComponentInstanceIndex,
3457 ) -> Result<u32> {
3458 let guest_id = store
3459 .instance_state(RuntimeInstance {
3460 instance: self.id().instance(),
3461 index: runtime_instance,
3462 })
3463 .thread_handle_table()
3464 .guest_thread_insert(thread_id.rep())?;
3465 store
3466 .concurrent_state_mut()
3467 .get_mut(thread_id)?
3468 .instance_rep = Some(guest_id);
3469 Ok(guest_id)
3470 }
3471
    /// Shared implementation of suspending or yielding the current guest
    /// thread, optionally resuming another thread (`to_thread`) first.
    ///
    /// * `caller`: instance whose thread handle table is used to resolve
    ///   `to_thread`.
    /// * `cancellable`: when true, a pending cancellation is consumed and
    ///   reported as `WaitResult::Cancelled`, both before and after the
    ///   suspension.
    /// * `yielding`: selects `SuspendReason::Yielding` rather than
    ///   `SuspendReason::ExplicitlySuspending`.
    ///
    /// Returns `WaitResult::Completed` unless a cancellation was observed.
    pub(crate) fn suspension_intrinsic(
        self,
        store: &mut StoreOpaque,
        caller: RuntimeComponentInstanceIndex,
        cancellable: bool,
        yielding: bool,
        to_thread: SuspensionTarget,
    ) -> Result<WaitResult> {
        let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
        if to_thread.is_none() {
            let state = store.concurrent_state_mut();
            if yielding {
                // A yield from a task that may not block only suspends if
                // another thread's work item in this instance was promoted;
                // otherwise it completes immediately.
                if !state.may_block(guest_thread.task)? {
                    if !state.promote_instance_local_thread_work_item(caller) {
                        return Ok(WaitResult::Completed);
                    }
                }
            } else {
                // Suspending without a target thread is subject to the
                // blocking check.
                store.check_blocking()?;
            }
        }

        if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
            return Ok(WaitResult::Cancelled);
        }

        // Schedule the target thread, if any, at high priority. Only
        // `SuspensionTarget::Some` permits a target in the `Ready` state.
        match to_thread {
            SuspensionTarget::SomeSuspended(thread) => {
                self.resume_thread(store, caller, thread, true, false)?
            }
            SuspensionTarget::Some(thread) => {
                self.resume_thread(store, caller, thread, true, true)?
            }
            SuspensionTarget::None => {}
        }

        let reason = if yielding {
            SuspendReason::Yielding {
                thread: guest_thread,
                skip_may_block_check: to_thread.is_some(),
            }
        } else {
            SuspendReason::ExplicitlySuspending {
                thread: guest_thread,
                skip_may_block_check: to_thread.is_some(),
            }
        };

        store.suspend(reason)?;

        // A cancellation may have arrived while this thread was suspended.
        if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
            Ok(WaitResult::Cancelled)
        } else {
            Ok(WaitResult::Completed)
        }
    }
3544
    /// Shared implementation of `waitable_set_wait` and `waitable_set_poll`.
    ///
    /// In `WaitableCheck::Wait` mode the current thread suspends with
    /// `SuspendReason::Waiting` when the set has nothing ready and the task
    /// has no deliverable event. A `Event::Cancelled` task event counts as
    /// deliverable only when `cancellable` is true. In `WaitableCheck::Poll`
    /// mode nothing suspends.
    ///
    /// Afterwards the event is fetched with `get_event`. Its handle and
    /// result are written as two little-endian 4-byte values at
    /// `params.payload` in the memory selected by `params.options`, and the
    /// event's ordinal is returned. A poll with no event reports
    /// `Event::None` with handle 0.
    fn waitable_check(
        self,
        store: &mut StoreOpaque,
        cancellable: bool,
        check: WaitableCheck,
        params: WaitableCheckParams,
    ) -> Result<u32> {
        let guest_thread = store.concurrent_state_mut().current_guest_thread()?;

        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);

        let state = store.concurrent_state_mut();
        let task = state.get_mut(guest_thread.task)?;

        match &check {
            WaitableCheck::Wait => {
                let set = params.set;

                // Suspend only if there is nothing to deliver right now.
                if (task.event.is_none()
                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
                    && state.get_mut(set)?.ready.is_empty()
                {
                    if cancellable {
                        // Record the set on the thread; `subtask_cancel`
                        // takes this value to find and wake the waiter.
                        let old = state
                            .get_mut(guest_thread.thread)?
                            .wake_on_cancel
                            .replace(set);
                        if !old.is_none() {
                            bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
                        }
                    }

                    store.suspend(SuspendReason::Waiting {
                        set,
                        thread: guest_thread,
                        skip_may_block_check: false,
                    })?;
                }
            }
            WaitableCheck::Poll => {}
        }

        log::trace!(
            "waitable check for {guest_thread:?}; set {:?}, part two",
            params.set
        );

        let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;

        let (ordinal, handle, result) = match &check {
            WaitableCheck::Wait => {
                // After a wait an event is required to exist.
                let (event, waitable) = match event {
                    Some(p) => p,
                    None => bail_bug!("event expected to be present"),
                };
                // Events without an associated waitable report handle 0.
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                let (ordinal, result) = event.parts();
                (ordinal, handle, result)
            }
            WaitableCheck::Poll => {
                if let Some((event, waitable)) = event {
                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                    let (ordinal, result) = event.parts();
                    (ordinal, handle, result)
                } else {
                    log::trace!(
                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
                        guest_thread.task,
                        params.set
                    );
                    let (ordinal, result) = Event::None.parts();
                    (ordinal, 0, result)
                }
            }
        };
        let memory = self.options_memory_mut(store, params.options);
        // Validate that a pointer pair fits at `payload` before writing.
        let ptr = func::validate_inbounds_dynamic(
            &CanonicalAbiInfo::POINTER_PAIR,
            memory,
            &ValRaw::u32(params.payload),
        )?;
        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
        Ok(ordinal)
    }
3634
3635 pub(crate) fn subtask_cancel(
3637 self,
3638 store: &mut StoreOpaque,
3639 caller_instance: RuntimeComponentInstanceIndex,
3640 async_: bool,
3641 task_id: u32,
3642 ) -> Result<u32> {
3643 if !async_ {
3644 store.check_blocking()?;
3648 }
3649
3650 let (rep, is_host) = store
3651 .instance_state(RuntimeInstance {
3652 instance: self.id().instance(),
3653 index: caller_instance,
3654 })
3655 .handle_table()
3656 .subtask_rep(task_id)?;
3657 let waitable = if is_host {
3658 Waitable::Host(TableId::<HostTask>::new(rep))
3659 } else {
3660 Waitable::Guest(TableId::<GuestTask>::new(rep))
3661 };
3662 let concurrent_state = store.concurrent_state_mut();
3663
3664 log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3665
3666 let needs_block;
3667 if let Waitable::Host(host_task) = waitable {
3668 let state = &mut concurrent_state.get_mut(host_task)?.state;
3669 match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
3670 HostTaskState::CalleeRunning(handle) => {
3677 handle.abort();
3678 needs_block = true;
3679 }
3680
3681 HostTaskState::CalleeDone { cancelled } => {
3684 if cancelled {
3685 bail!(Trap::SubtaskCancelAfterTerminal);
3686 } else {
3687 needs_block = false;
3690 }
3691 }
3692
3693 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3696 bail_bug!("invalid states for host callee")
3697 }
3698 }
3699 } else {
3700 let caller = concurrent_state.current_guest_thread()?;
3701 let guest_task = TableId::<GuestTask>::new(rep);
3702 let task = concurrent_state.get_mut(guest_task)?;
3703 if !task.already_lowered_parameters() {
3704 task.lower_params = None;
3708 task.lift_result = None;
3709 task.exited = true;
3710 let instance = task.instance;
3711
3712 assert_eq!(1, task.threads.len());
3715 let thread = *task.threads.iter().next().unwrap();
3716 self.cleanup_thread(
3717 store,
3718 QualifiedThreadId {
3719 task: guest_task,
3720 thread,
3721 },
3722 caller_instance,
3723 )?;
3724
3725 let pending = &mut store.instance_state(instance).concurrent_state().pending;
3727 let pending_count = pending.len();
3728 pending.retain(|thread, _| thread.task != guest_task);
3729 if pending.len() == pending_count {
3731 bail!(Trap::SubtaskCancelAfterTerminal);
3732 }
3733 return Ok(Status::StartCancelled as u32);
3734 } else if !task.returned_or_cancelled() {
3735 task.cancel_sent = true;
3738 task.event = Some(Event::Cancelled);
3743 let runtime_instance = task.instance.index;
3744 for thread in task.threads.clone() {
3745 let thread = QualifiedThreadId {
3746 task: guest_task,
3747 thread,
3748 };
3749 if let Some(set) = concurrent_state
3750 .get_mut(thread.thread)?
3751 .wake_on_cancel
3752 .take()
3753 {
3754 let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
3755 Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
3756 Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
3757 runtime_instance,
3758 GuestCall {
3759 thread,
3760 kind: GuestCallKind::DeliverEvent {
3761 instance,
3762 set: None,
3763 },
3764 },
3765 ),
3766 None => bail_bug!("thread not present in wake_on_cancel set"),
3767 };
3768 concurrent_state.push_high_priority(item);
3769
3770 store.suspend(SuspendReason::Yielding {
3771 thread: caller,
3772 skip_may_block_check: false,
3775 })?;
3776 break;
3777 }
3778 }
3779
3780 needs_block = !store
3783 .concurrent_state_mut()
3784 .get_mut(guest_task)?
3785 .returned_or_cancelled()
3786 } else {
3787 needs_block = false;
3788 }
3789 };
3790
3791 if needs_block {
3795 if async_ {
3796 return Ok(BLOCKED);
3797 }
3798
3799 store.wait_for_event(waitable)?;
3803
3804 }
3806
3807 let event = waitable.take_event(store.concurrent_state_mut())?;
3808 if let Some(Event::Subtask {
3809 status: status @ (Status::Returned | Status::ReturnCancelled),
3810 }) = event
3811 {
3812 Ok(status as u32)
3813 } else {
3814 bail!(Trap::SubtaskCancelAfterTerminal);
3815 }
3816 }
3817}
3818
/// Store-level entry points for component-model async operations.
///
/// Each method receives the `Instance` the operation belongs to. The
/// implementation for `StoreInner<T>` in this file forwards every method to a
/// method on that `Instance`, passing the store as a `StoreContextMut`.
pub trait VMComponentAsyncStore {
    /// Prepares a call from `caller_instance` into `callee_instance`.
    ///
    /// In the `StoreInner<T>` implementation, `result_count` equal to
    /// `PREPARE_ASYNC_NO_RESULT` or `PREPARE_ASYNC_WITH_RESULT` selects an
    /// async caller without or with a result; any other value is treated as
    /// the result count of a sync caller.
    ///
    /// # Safety
    ///
    /// The `StoreInner<T>` implementation reads `storage_len` values from
    /// `storage`, so `storage` must be valid for that many reads.
    /// NOTE(review): requirements on `memory`, `start` and `return_` are
    /// imposed by `Instance::prepare_call` and are not visible here.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: StringEncoding,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of a synchronous caller.
    ///
    /// # Safety
    ///
    /// The `StoreInner<T>` implementation builds a mutable slice of
    /// `storage_len` elements from `storage`, so `storage` must be valid for
    /// reads and writes of that many elements. NOTE(review): requirements on
    /// `callback` and `callee` are not visible here.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of an async caller and
    /// returns the `u32` produced by `Instance::start_call`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the pointer arguments are forwarded unchanged to
    /// `Instance::start_call`; their validity requirements are not visible
    /// here.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// Writes to the future with handle `future` from guest address
    /// `address`, returning the encoded return code.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Reads from the future with handle `future` into guest address
    /// `address`, returning the encoded return code.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a future of type `ty`.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Writes up to `count` items to the stream with handle `stream` from
    /// guest address `address`, returning the encoded return code.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Reads up to `count` items from the stream with handle `stream` into
    /// guest address `address`, returning the encoded return code.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_write`, but additionally supplies the payload's size and
    /// alignment (passed on as a `FlatAbi` by the `StoreInner<T>`
    /// implementation).
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_read`, but additionally supplies the payload's size and
    /// alignment (passed on as a `FlatAbi` by the `StoreInner<T>`
    /// implementation).
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a stream of type `ty`.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Handles a debug-message request for the error context with handle
    /// `err_ctx_handle`; `debug_msg_address` is a guest address.
    /// NOTE(review): presumably the message is written at that address;
    /// confirm against `Instance::error_context_debug_message`.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// Creates a new guest thread whose start function is entry
    /// `start_func_idx` of table `start_func_table_idx`, to be invoked with
    /// `context`; returns the new thread's handle.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
3986
3987impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3989 unsafe fn prepare_call(
3990 &mut self,
3991 instance: Instance,
3992 memory: *mut VMMemoryDefinition,
3993 start: NonNull<VMFuncRef>,
3994 return_: NonNull<VMFuncRef>,
3995 caller_instance: RuntimeComponentInstanceIndex,
3996 callee_instance: RuntimeComponentInstanceIndex,
3997 task_return_type: TypeTupleIndex,
3998 callee_async: bool,
3999 string_encoding: StringEncoding,
4000 result_count_or_max_if_async: u32,
4001 storage: *mut ValRaw,
4002 storage_len: usize,
4003 ) -> Result<()> {
4004 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
4008
4009 unsafe {
4010 instance.prepare_call(
4011 StoreContextMut(self),
4012 start,
4013 return_,
4014 caller_instance,
4015 callee_instance,
4016 task_return_type,
4017 callee_async,
4018 memory,
4019 string_encoding,
4020 match result_count_or_max_if_async {
4021 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
4022 params,
4023 has_result: false,
4024 },
4025 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
4026 params,
4027 has_result: true,
4028 },
4029 result_count => CallerInfo::Sync {
4030 params,
4031 result_count,
4032 },
4033 },
4034 )
4035 }
4036 }
4037
4038 unsafe fn sync_start(
4039 &mut self,
4040 instance: Instance,
4041 callback: *mut VMFuncRef,
4042 callee: NonNull<VMFuncRef>,
4043 param_count: u32,
4044 storage: *mut MaybeUninit<ValRaw>,
4045 storage_len: usize,
4046 ) -> Result<()> {
4047 unsafe {
4048 instance
4049 .start_call(
4050 StoreContextMut(self),
4051 callback,
4052 ptr::null_mut(),
4053 callee,
4054 param_count,
4055 1,
4056 START_FLAG_ASYNC_CALLEE,
4057 Some(std::slice::from_raw_parts_mut(storage, storage_len)),
4061 )
4062 .map(drop)
4063 }
4064 }
4065
4066 unsafe fn async_start(
4067 &mut self,
4068 instance: Instance,
4069 callback: *mut VMFuncRef,
4070 post_return: *mut VMFuncRef,
4071 callee: NonNull<VMFuncRef>,
4072 param_count: u32,
4073 result_count: u32,
4074 flags: u32,
4075 ) -> Result<u32> {
4076 unsafe {
4077 instance.start_call(
4078 StoreContextMut(self),
4079 callback,
4080 post_return,
4081 callee,
4082 param_count,
4083 result_count,
4084 flags,
4085 None,
4086 )
4087 }
4088 }
4089
4090 fn future_write(
4091 &mut self,
4092 instance: Instance,
4093 caller: RuntimeComponentInstanceIndex,
4094 ty: TypeFutureTableIndex,
4095 options: OptionsIndex,
4096 future: u32,
4097 address: u32,
4098 ) -> Result<u32> {
4099 instance
4100 .guest_write(
4101 StoreContextMut(self),
4102 caller,
4103 TransmitIndex::Future(ty),
4104 options,
4105 None,
4106 future,
4107 address,
4108 1,
4109 )
4110 .map(|result| result.encode())
4111 }
4112
4113 fn future_read(
4114 &mut self,
4115 instance: Instance,
4116 caller: RuntimeComponentInstanceIndex,
4117 ty: TypeFutureTableIndex,
4118 options: OptionsIndex,
4119 future: u32,
4120 address: u32,
4121 ) -> Result<u32> {
4122 instance
4123 .guest_read(
4124 StoreContextMut(self),
4125 caller,
4126 TransmitIndex::Future(ty),
4127 options,
4128 None,
4129 future,
4130 address,
4131 1,
4132 )
4133 .map(|result| result.encode())
4134 }
4135
4136 fn stream_write(
4137 &mut self,
4138 instance: Instance,
4139 caller: RuntimeComponentInstanceIndex,
4140 ty: TypeStreamTableIndex,
4141 options: OptionsIndex,
4142 stream: u32,
4143 address: u32,
4144 count: u32,
4145 ) -> Result<u32> {
4146 instance
4147 .guest_write(
4148 StoreContextMut(self),
4149 caller,
4150 TransmitIndex::Stream(ty),
4151 options,
4152 None,
4153 stream,
4154 address,
4155 count,
4156 )
4157 .map(|result| result.encode())
4158 }
4159
4160 fn stream_read(
4161 &mut self,
4162 instance: Instance,
4163 caller: RuntimeComponentInstanceIndex,
4164 ty: TypeStreamTableIndex,
4165 options: OptionsIndex,
4166 stream: u32,
4167 address: u32,
4168 count: u32,
4169 ) -> Result<u32> {
4170 instance
4171 .guest_read(
4172 StoreContextMut(self),
4173 caller,
4174 TransmitIndex::Stream(ty),
4175 options,
4176 None,
4177 stream,
4178 address,
4179 count,
4180 )
4181 .map(|result| result.encode())
4182 }
4183
4184 fn future_drop_writable(
4185 &mut self,
4186 instance: Instance,
4187 ty: TypeFutureTableIndex,
4188 writer: u32,
4189 ) -> Result<()> {
4190 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4191 }
4192
4193 fn flat_stream_write(
4194 &mut self,
4195 instance: Instance,
4196 caller: RuntimeComponentInstanceIndex,
4197 ty: TypeStreamTableIndex,
4198 options: OptionsIndex,
4199 payload_size: u32,
4200 payload_align: u32,
4201 stream: u32,
4202 address: u32,
4203 count: u32,
4204 ) -> Result<u32> {
4205 instance
4206 .guest_write(
4207 StoreContextMut(self),
4208 caller,
4209 TransmitIndex::Stream(ty),
4210 options,
4211 Some(FlatAbi {
4212 size: payload_size,
4213 align: payload_align,
4214 }),
4215 stream,
4216 address,
4217 count,
4218 )
4219 .map(|result| result.encode())
4220 }
4221
4222 fn flat_stream_read(
4223 &mut self,
4224 instance: Instance,
4225 caller: RuntimeComponentInstanceIndex,
4226 ty: TypeStreamTableIndex,
4227 options: OptionsIndex,
4228 payload_size: u32,
4229 payload_align: u32,
4230 stream: u32,
4231 address: u32,
4232 count: u32,
4233 ) -> Result<u32> {
4234 instance
4235 .guest_read(
4236 StoreContextMut(self),
4237 caller,
4238 TransmitIndex::Stream(ty),
4239 options,
4240 Some(FlatAbi {
4241 size: payload_size,
4242 align: payload_align,
4243 }),
4244 stream,
4245 address,
4246 count,
4247 )
4248 .map(|result| result.encode())
4249 }
4250
4251 fn stream_drop_writable(
4252 &mut self,
4253 instance: Instance,
4254 ty: TypeStreamTableIndex,
4255 writer: u32,
4256 ) -> Result<()> {
4257 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4258 }
4259
4260 fn error_context_debug_message(
4261 &mut self,
4262 instance: Instance,
4263 ty: TypeComponentLocalErrorContextTableIndex,
4264 options: OptionsIndex,
4265 err_ctx_handle: u32,
4266 debug_msg_address: u32,
4267 ) -> Result<()> {
4268 instance.error_context_debug_message(
4269 StoreContextMut(self),
4270 ty,
4271 options,
4272 err_ctx_handle,
4273 debug_msg_address,
4274 )
4275 }
4276
4277 fn thread_new_indirect(
4278 &mut self,
4279 instance: Instance,
4280 caller: RuntimeComponentInstanceIndex,
4281 func_ty_idx: TypeFuncIndex,
4282 start_func_table_idx: RuntimeTableIndex,
4283 start_func_idx: u32,
4284 context: i32,
4285 ) -> Result<u32> {
4286 instance.thread_new_indirect(
4287 StoreContextMut(self),
4288 caller,
4289 func_ty_idx,
4290 start_func_table_idx,
4291 start_func_idx,
4292 context,
4293 )
4294 }
4295}
4296
/// A pinned, boxed, `Send + 'static` future that resolves to `Result<()>`.
/// NOTE(review): presumably the future driving a host task; confirm at the
/// use sites.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4298
/// State kept for a host-implemented subtask (referenced through
/// `Waitable::Host`).
pub(crate) struct HostTask {
    /// Waitable bookkeeping.
    common: WaitableCommon,

    /// The guest thread associated with this task as its caller.
    caller: QualifiedThreadId,

    /// Call context for this task; default-initialized by `HostTask::new`.
    call_context: CallContext,

    /// Where the task is in its lifecycle; see `HostTaskState`.
    state: HostTaskState,
}
4314
/// Lifecycle state of a `HostTask`.
///
/// `subtask_drop` and `subtask_cancel` only expect to observe
/// `CalleeRunning` and `CalleeDone`; they report the other two states as
/// internal bugs.
enum HostTaskState {
    /// NOTE(review): presumably the callee has been started but no
    /// `JoinHandle` exists yet; confirm at the construction sites.
    CalleeStarted,

    /// The callee is running; the `JoinHandle` is what `subtask_cancel`
    /// aborts.
    CalleeRunning(JoinHandle),

    /// NOTE(review): presumably the callee finished and its lifted result has
    /// not been consumed yet; confirm at the use sites.
    CalleeFinished(LiftedResult),

    /// The task is done. `cancelled` is set to true by `subtask_cancel`,
    /// which traps if it finds it already true.
    CalleeDone { cancelled: bool },
}
4337
4338impl HostTask {
4339 fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4340 Self {
4341 common: WaitableCommon::default(),
4342 call_context: CallContext::default(),
4343 caller,
4344 state,
4345 }
4346 }
4347}
4348
impl TableDebug for HostTask {
    /// Name reported for this type through `TableDebug`.
    fn type_name() -> &'static str {
        "HostTask"
    }
}
4354
/// Boxed callback taking the store, an `Event` and a `u32`, and returning a
/// `u32` or an error.
/// NOTE(review): the meaning of the `u32` argument and return value is not
/// visible here; confirm at the call sites.
type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4356
/// Who called a `GuestTask`.
enum Caller {
    /// The task was called by the host.
    Host {
        /// One-shot sender for the task's result; `task_complete` takes it
        /// and sends the result through it.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// Read by `GuestTask::new` when choosing the initial
        /// `HostFutureState`.
        host_future_present: bool,
        /// NOTE(review): presumably the thread that was current when the
        /// host made the call; confirm at the construction sites.
        caller: CurrentThread,
    },
    /// The task was called by a guest thread.
    Guest {
        /// The calling guest thread.
        thread: QualifiedThreadId,
    },
}
4377
/// Information stored on a `GuestTask` for lifting its result. The task is
/// considered returned-or-cancelled once this has been taken (see
/// `GuestTask::returned_or_cancelled`).
struct LiftResult {
    /// The raw lifting routine.
    lift: RawLift,
    /// Tuple type index associated with the result.
    ty: TypeTupleIndex,
    /// Optional pointer to a memory definition.
    /// NOTE(review): presumably the memory the result is lifted from.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding to use.
    string_encoding: StringEncoding,
}
4386
/// A guest thread id paired with the id of its parent task (see
/// `QualifiedThreadId::qualify`).
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) struct QualifiedThreadId {
    /// The task the thread belongs to.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
4396
4397impl QualifiedThreadId {
4398 fn qualify(
4399 state: &mut ConcurrentState,
4400 thread: TableId<GuestThread>,
4401 ) -> Result<QualifiedThreadId> {
4402 Ok(QualifiedThreadId {
4403 task: state.get_mut(thread)?.parent_task,
4404 thread,
4405 })
4406 }
4407}
4408
4409impl fmt::Debug for QualifiedThreadId {
4410 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4411 f.debug_tuple("QualifiedThreadId")
4412 .field(&self.task.rep())
4413 .field(&self.thread.rep())
4414 .finish()
4415 }
4416}
4417
/// Execution state of a `GuestThread`.
enum GuestThreadState {
    /// Initial state of a thread created with `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state of a thread created with `GuestThread::new_explicit`;
    /// holds the start function that `resume_thread` schedules.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// The thread is running, or has been scheduled to run by
    /// `resume_thread`.
    Running,
    /// The thread is suspended on the given fiber; `resume_thread` schedules
    /// the fiber to be resumed.
    Suspended(StoreFiber<'static>),
    /// NOTE(review): presumably the thread's fiber is already queued to run;
    /// `resume_thread` only promotes its work item. Confirm.
    Ready(StoreFiber<'static>),
    /// NOTE(review): presumably the thread has finished; confirm at the sites
    /// that set this state.
    Completed,
}
/// Per-thread state for a guest thread belonging to a `GuestTask`.
pub struct GuestThread {
    /// Context slots, zero-initialized by the constructors.
    context: [u32; NUM_COMPONENT_CONTEXT_SLOTS],
    /// The task this thread belongs to.
    parent_task: TableId<GuestTask>,
    /// Set that `waitable_check` records while the thread waits cancellably;
    /// `subtask_cancel` takes it to wake the thread.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// Current execution state.
    state: GuestThreadState,
    /// Handle of this thread in its instance's thread handle table, once
    /// assigned by `add_guest_thread_to_instance_table`.
    instance_rep: Option<u32>,
    /// Waitable set allocated for this thread by the constructors.
    /// NOTE(review): presumably used while waiting on synchronous calls;
    /// confirm at the use sites.
    sync_call_set: TableId<WaitableSet>,
}
4445
4446impl GuestThread {
4447 fn from_instance(
4450 state: Pin<&mut ComponentInstance>,
4451 caller_instance: RuntimeComponentInstanceIndex,
4452 guest_thread: u32,
4453 ) -> Result<TableId<Self>> {
4454 let rep = state.instance_states().0[caller_instance]
4455 .thread_handle_table()
4456 .guest_thread_rep(guest_thread)?;
4457 Ok(TableId::new(rep))
4458 }
4459
4460 fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4461 let sync_call_set = state.push(WaitableSet::default())?;
4462 Ok(Self {
4463 context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4464 parent_task,
4465 wake_on_cancel: None,
4466 state: GuestThreadState::NotStartedImplicit,
4467 instance_rep: None,
4468 sync_call_set,
4469 })
4470 }
4471
4472 fn new_explicit(
4473 state: &mut ConcurrentState,
4474 parent_task: TableId<GuestTask>,
4475 start_func: Box<
4476 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4477 >,
4478 ) -> Result<Self> {
4479 let sync_call_set = state.push(WaitableSet::default())?;
4480 Ok(Self {
4481 context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4482 parent_task,
4483 wake_on_cancel: None,
4484 state: GuestThreadState::NotStartedExplicit(start_func),
4485 instance_rep: None,
4486 sync_call_set,
4487 })
4488 }
4489}
4490
impl TableDebug for GuestThread {
    /// Name reported for this type through `TableDebug`.
    fn type_name() -> &'static str {
        "GuestThread"
    }
}
4496
/// Tracks the synchronous result of a task; see `SyncResult::take`.
enum SyncResult {
    /// No result has been produced.
    NotProduced,
    /// A result has been produced; the payload is an optional raw value.
    Produced(Option<ValRaw>),
    /// The result slot has already been taken; taking again is reported as
    /// an internal bug.
    Taken,
}
4502
4503impl SyncResult {
4504 fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4505 Ok(match mem::replace(self, SyncResult::Taken) {
4506 SyncResult::NotProduced => None,
4507 SyncResult::Produced(val) => Some(val),
4508 SyncResult::Taken => {
4509 bail_bug!("attempted to take a synchronous result that was already taken")
4510 }
4511 })
4512 }
4513}
4514
/// State of a host future associated with a `GuestTask`. While it is `Live`,
/// `GuestTask::ready_to_delete` reports false.
#[derive(Debug)]
enum HostFutureState {
    /// Used for tasks whose caller is a guest (see `GuestTask::new`).
    NotApplicable,
    /// NOTE(review): presumably the host still holds the future; confirm.
    Live,
    /// NOTE(review): presumably the host has dropped the future; confirm.
    Dropped,
}
4521
/// Per-task state for a guest task stored in the `ConcurrentState` table.
pub(crate) struct GuestTask {
    /// Waitable bookkeeping (pending event, joined set, handle), reached via
    /// `Waitable::common`.
    common: WaitableCommon,
    /// Closure that lowers the call parameters; `None` is what
    /// `already_lowered_parameters` reports as "already lowered".
    lower_params: Option<RawLower>,
    /// Result-lifting state; `None` is what `returned_or_cancelled` reports
    /// as "returned or cancelled".
    lift_result: Option<LiftResult>,
    /// Lifted result, initially `None`.
    result: Option<LiftedResult>,
    /// Optional callback function supplied at construction.
    callback: Option<CallbackFn>,
    /// Who initiated this task (guest or host); also determines the initial
    /// `host_future_state`.
    caller: Caller,
    /// Call context handed out by `ConcurrentState::call_context`.
    call_context: CallContext,
    /// Synchronous result state; a `Produced` value blocks deletion in
    /// `ready_to_delete`.
    sync_result: SyncResult,
    /// Initially `false`. NOTE(review): presumably records that a
    /// cancellation was already sent — confirm at use sites.
    cancel_sent: bool,
    /// Initially `false`. NOTE(review): presumably records that a "starting"
    /// status was already sent — confirm at use sites.
    starting_sent: bool,
    /// Runtime instance this task runs in.
    instance: RuntimeInstance,
    /// Task-level pending event; `ConcurrentState::take_pending_cancellation`
    /// takes it and asserts it is `Event::Cancelled`.
    event: Option<Event>,
    /// Initially `false`. NOTE(review): presumably set once the task has
    /// exited — confirm at use sites.
    exited: bool,
    /// Threads belonging to this task; must be empty for `ready_to_delete`
    /// to return `true`.
    threads: HashSet<TableId<GuestThread>>,
    /// State of the associated host future, if any.
    host_future_state: HostFutureState,
    /// Whether the function is async; `ConcurrentState::may_block` permits
    /// blocking when this is `true`.
    async_function: bool,
}
4573
4574impl GuestTask {
4575 fn already_lowered_parameters(&self) -> bool {
4576 self.lower_params.is_none()
4578 }
4579
4580 fn returned_or_cancelled(&self) -> bool {
4581 self.lift_result.is_none()
4583 }
4584
4585 fn ready_to_delete(&self) -> bool {
4586 let threads_completed = self.threads.is_empty();
4587 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4588 let pending_completion_event = matches!(
4589 self.common.event,
4590 Some(Event::Subtask {
4591 status: Status::Returned | Status::ReturnCancelled
4592 })
4593 );
4594 let ready = threads_completed
4595 && !has_sync_result
4596 && !pending_completion_event
4597 && !matches!(self.host_future_state, HostFutureState::Live);
4598 log::trace!(
4599 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4600 threads_completed,
4601 has_sync_result,
4602 pending_completion_event,
4603 self.host_future_state
4604 );
4605 ready
4606 }
4607
4608 fn new(
4609 lower_params: RawLower,
4610 lift_result: LiftResult,
4611 caller: Caller,
4612 callback: Option<CallbackFn>,
4613 instance: RuntimeInstance,
4614 async_function: bool,
4615 ) -> Result<Self> {
4616 let host_future_state = match &caller {
4617 Caller::Guest { .. } => HostFutureState::NotApplicable,
4618 Caller::Host {
4619 host_future_present,
4620 ..
4621 } => {
4622 if *host_future_present {
4623 HostFutureState::Live
4624 } else {
4625 HostFutureState::NotApplicable
4626 }
4627 }
4628 };
4629 Ok(Self {
4630 common: WaitableCommon::default(),
4631 lower_params: Some(lower_params),
4632 lift_result: Some(lift_result),
4633 result: None,
4634 callback,
4635 caller,
4636 call_context: CallContext::default(),
4637 sync_result: SyncResult::NotProduced,
4638 cancel_sent: false,
4639 starting_sent: false,
4640 instance,
4641 event: None,
4642 exited: false,
4643 threads: HashSet::new(),
4644 host_future_state,
4645 async_function,
4646 })
4647 }
4648}
4649
impl TableDebug for GuestTask {
    /// Reports the literal name `"GuestTask"` for this table entry type.
    fn type_name() -> &'static str {
        "GuestTask"
    }
}
4655
/// State shared by every kind of waitable (host task, guest task, transmit
/// handle); obtained through `Waitable::common`.
#[derive(Default)]
struct WaitableCommon {
    /// Event pending delivery for this waitable, if any.
    event: Option<Event>,
    /// The waitable set this waitable has joined, if any.
    set: Option<TableId<WaitableSet>>,
    /// NOTE(review): presumably the guest-visible handle for this waitable —
    /// confirm at use sites.
    handle: Option<u32>,
}
4666
/// Typed reference to one of the table entries that can be waited on.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task.
    Host(TableId<HostTask>),
    /// A guest task.
    Guest(TableId<GuestTask>),
    /// A transmit handle (used for both streams and futures, see
    /// `Waitable::from_instance`).
    Transmit(TableId<TransmitHandle>),
}
4677
4678impl Waitable {
4679 fn from_instance(
4682 state: Pin<&mut ComponentInstance>,
4683 caller_instance: RuntimeComponentInstanceIndex,
4684 waitable: u32,
4685 ) -> Result<Self> {
4686 use crate::runtime::vm::component::Waitable;
4687
4688 let (waitable, kind) = state.instance_states().0[caller_instance]
4689 .handle_table()
4690 .waitable_rep(waitable)?;
4691
4692 Ok(match kind {
4693 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4694 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4695 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4696 })
4697 }
4698
4699 fn rep(&self) -> u32 {
4701 match self {
4702 Self::Host(id) => id.rep(),
4703 Self::Guest(id) => id.rep(),
4704 Self::Transmit(id) => id.rep(),
4705 }
4706 }
4707
4708 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4712 log::trace!("waitable {self:?} join set {set:?}",);
4713
4714 let old = mem::replace(&mut self.common(state)?.set, set);
4715
4716 if let Some(old) = old {
4717 match *self {
4718 Waitable::Host(id) => state.remove_child(id, old),
4719 Waitable::Guest(id) => state.remove_child(id, old),
4720 Waitable::Transmit(id) => state.remove_child(id, old),
4721 }?;
4722
4723 state.get_mut(old)?.ready.remove(self);
4724 }
4725
4726 if let Some(set) = set {
4727 match *self {
4728 Waitable::Host(id) => state.add_child(id, set),
4729 Waitable::Guest(id) => state.add_child(id, set),
4730 Waitable::Transmit(id) => state.add_child(id, set),
4731 }?;
4732
4733 if self.common(state)?.event.is_some() {
4734 self.mark_ready(state)?;
4735 }
4736 }
4737
4738 Ok(())
4739 }
4740
4741 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4743 Ok(match self {
4744 Self::Host(id) => &mut state.get_mut(*id)?.common,
4745 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4746 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4747 })
4748 }
4749
4750 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4754 log::trace!("set event for {self:?}: {event:?}");
4755 self.common(state)?.event = event;
4756 self.mark_ready(state)
4757 }
4758
4759 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4761 let common = self.common(state)?;
4762 let event = common.event.take();
4763 if let Some(set) = self.common(state)?.set {
4764 state.get_mut(set)?.ready.remove(self);
4765 }
4766
4767 Ok(event)
4768 }
4769
4770 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4774 if let Some(set) = self.common(state)?.set {
4775 state.get_mut(set)?.ready.insert(*self);
4776 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4777 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4778 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4779
4780 let item = match mode {
4781 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4782 WaitMode::Callback(instance) => WorkItem::GuestCall(
4783 state.get_mut(thread.task)?.instance.index,
4784 GuestCall {
4785 thread,
4786 kind: GuestCallKind::DeliverEvent {
4787 instance,
4788 set: Some(set),
4789 },
4790 },
4791 ),
4792 };
4793 state.push_high_priority(item);
4794 }
4795 }
4796 Ok(())
4797 }
4798
4799 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4801 match self {
4802 Self::Host(task) => {
4803 log::trace!("delete host task {task:?}");
4804 state.delete(*task)?;
4805 }
4806 Self::Guest(task) => {
4807 log::trace!("delete guest task {task:?}");
4808 state.delete(*task)?;
4809 }
4810 Self::Transmit(task) => {
4811 state.delete(*task)?;
4812 }
4813 }
4814
4815 Ok(())
4816 }
4817}
4818
4819impl fmt::Debug for Waitable {
4820 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4821 match self {
4822 Self::Host(id) => write!(f, "{id:?}"),
4823 Self::Guest(id) => write!(f, "{id:?}"),
4824 Self::Transmit(id) => write!(f, "{id:?}"),
4825 }
4826 }
4827}
4828
/// A set of waitables plus the threads currently waiting on it.
#[derive(Default)]
struct WaitableSet {
    /// Members that have been marked ready (see `Waitable::mark_ready`).
    ready: BTreeSet<Waitable>,
    /// Threads waiting on this set, keyed by thread id, together with how
    /// each one should be woken.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
}
4837
impl TableDebug for WaitableSet {
    /// Reports the literal name `"WaitableSet"` for this table entry type.
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
4843
/// Boxed, one-shot closure that writes lowered parameter values into the
/// provided `ValRaw` slots, with access to the store.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4847
/// Boxed, one-shot closure that turns raw result values into a type-erased
/// boxed value, with access to the store.
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;
4852
/// Type-erased lifted result; `queue_call` downcasts it back to the
/// concrete result type.
type LiftedResult = Box<dyn Any + Send + Sync>;
4857
/// Unit marker type.
// NOTE(review): presumably used as a placeholder `LiftedResult` when no real
// value is available — confirm at use sites.
struct DummyResult;
4861
/// Per-instance concurrency bookkeeping.
#[derive(Default)]
pub struct ConcurrentInstanceState {
    /// NOTE(review): presumably a counter of outstanding backpressure
    /// requests for the instance — confirm at use sites.
    backpressure: u16,
    /// NOTE(review): presumably set while the instance must not be
    /// (re-)entered — confirm at use sites.
    do_not_enter: bool,
    /// Guest calls recorded per thread; `pending_is_empty` reports whether
    /// any remain.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
4873
impl ConcurrentInstanceState {
    /// Returns `true` when the `pending` map holds no entries.
    pub fn pending_is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}
4879
/// Identifies what, if anything, is recorded as currently running in a
/// `ConcurrentState`.
#[derive(Debug, Copy, Clone)]
pub(crate) enum CurrentThread {
    /// A guest thread, identified by its qualified id.
    Guest(QualifiedThreadId),
    /// A host task.
    Host(TableId<HostTask>),
    /// Nothing is current; this is the value `ConcurrentState::default`
    /// starts with.
    None,
}
4886
4887impl CurrentThread {
4888 fn guest(&self) -> Option<&QualifiedThreadId> {
4889 match self {
4890 Self::Guest(id) => Some(id),
4891 _ => None,
4892 }
4893 }
4894
4895 fn host(&self) -> Option<TableId<HostTask>> {
4896 match self {
4897 Self::Host(id) => Some(*id),
4898 _ => None,
4899 }
4900 }
4901
4902 fn is_none(&self) -> bool {
4903 matches!(self, Self::None)
4904 }
4905}
4906
impl From<QualifiedThreadId> for CurrentThread {
    /// Wraps a qualified guest thread id in `CurrentThread::Guest`.
    fn from(id: QualifiedThreadId) -> Self {
        Self::Guest(id)
    }
}
4912
impl From<TableId<HostTask>> for CurrentThread {
    /// Wraps a host task id in `CurrentThread::Host`.
    fn from(id: TableId<HostTask>) -> Self {
        Self::Host(id)
    }
}
4918
/// Store-wide state for concurrent component execution: the table of tasks,
/// threads and waitable sets, plus queued work.
pub struct ConcurrentState {
    /// The guest thread or host task recorded as currently running, if any.
    current_thread: CurrentThread,

    /// Host task futures; `None` after `take_fibers_and_futures` has moved
    /// the collection out.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// Table holding the entries addressed by `TableId` (tasks, threads,
    /// waitable sets, ...).
    table: AlwaysMut<ResourceTable>,
    /// High-priority work items; new items are appended at the end.
    high_priority: Vec<WorkItem>,
    /// Low-priority work items; new items are inserted at the front.
    low_priority: VecDeque<WorkItem>,
    /// NOTE(review): presumably the reason the running fiber most recently
    /// suspended — confirm at use sites.
    suspend_reason: Option<SuspendReason>,
    /// Worker fiber, if one currently exists; handed over by
    /// `take_fibers_and_futures`.
    worker: Option<StoreFiber<'static>>,
    /// NOTE(review): presumably the item the worker fiber should run next —
    /// confirm at use sites.
    worker_item: Option<WorkerItem>,

    /// Reference counts keyed by global error-context table index.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
}
4961
4962impl Default for ConcurrentState {
4963 fn default() -> Self {
4964 Self {
4965 current_thread: CurrentThread::None,
4966 table: AlwaysMut::new(ResourceTable::new()),
4967 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4968 high_priority: Vec::new(),
4969 low_priority: VecDeque::new(),
4970 suspend_reason: None,
4971 worker: None,
4972 worker_item: None,
4973 global_error_context_ref_counts: BTreeMap::new(),
4974 }
4975 }
4976}
4977
4978impl ConcurrentState {
4979 pub(crate) fn take_fibers_and_futures(
4996 &mut self,
4997 fibers: &mut Vec<StoreFiber<'static>>,
4998 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4999 ) {
5000 for entry in self.table.get_mut().iter_mut() {
5001 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
5002 for mode in mem::take(&mut set.waiting).into_values() {
5003 if let WaitMode::Fiber(fiber) = mode {
5004 fibers.push(fiber);
5005 }
5006 }
5007 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
5008 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready(fiber) =
5009 mem::replace(&mut thread.state, GuestThreadState::Completed)
5010 {
5011 fibers.push(fiber);
5012 }
5013 }
5014 }
5015
5016 if let Some(fiber) = self.worker.take() {
5017 fibers.push(fiber);
5018 }
5019
5020 let mut handle_item = |item| match item {
5021 WorkItem::ResumeFiber(fiber) => {
5022 fibers.push(fiber);
5023 }
5024 WorkItem::PushFuture(future) => {
5025 self.futures
5026 .get_mut()
5027 .as_mut()
5028 .unwrap()
5029 .push(future.into_inner());
5030 }
5031 WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
5032 }
5033 };
5034
5035 for item in mem::take(&mut self.high_priority) {
5036 handle_item(item);
5037 }
5038 for item in mem::take(&mut self.low_priority) {
5039 handle_item(item);
5040 }
5041
5042 if let Some(them) = self.futures.get_mut().take() {
5043 futures.push(them);
5044 }
5045 }
5046
5047 fn push<V: Send + Sync + 'static>(
5048 &mut self,
5049 value: V,
5050 ) -> Result<TableId<V>, ResourceTableError> {
5051 self.table.get_mut().push(value).map(TableId::from)
5052 }
5053
5054 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
5055 self.table.get_mut().get_mut(&Resource::from(id))
5056 }
5057
5058 pub fn add_child<T: 'static, U: 'static>(
5059 &mut self,
5060 child: TableId<T>,
5061 parent: TableId<U>,
5062 ) -> Result<(), ResourceTableError> {
5063 self.table
5064 .get_mut()
5065 .add_child(Resource::from(child), Resource::from(parent))
5066 }
5067
5068 pub fn remove_child<T: 'static, U: 'static>(
5069 &mut self,
5070 child: TableId<T>,
5071 parent: TableId<U>,
5072 ) -> Result<(), ResourceTableError> {
5073 self.table
5074 .get_mut()
5075 .remove_child(Resource::from(child), Resource::from(parent))
5076 }
5077
5078 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
5079 self.table.get_mut().delete(Resource::from(id))
5080 }
5081
5082 fn push_future(&mut self, future: HostTaskFuture) {
5083 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
5090 }
5091
5092 fn push_high_priority(&mut self, item: WorkItem) {
5093 log::trace!("push high priority: {item:?}");
5094 self.high_priority.push(item);
5095 }
5096
5097 fn push_low_priority(&mut self, item: WorkItem) {
5098 log::trace!("push low priority: {item:?}");
5099 self.low_priority.push_front(item);
5100 }
5101
5102 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5103 if high_priority {
5104 self.push_high_priority(item);
5105 } else {
5106 self.push_low_priority(item);
5107 }
5108 }
5109
5110 fn promote_instance_local_thread_work_item(
5111 &mut self,
5112 current_instance: RuntimeComponentInstanceIndex,
5113 ) -> bool {
5114 self.promote_work_items_matching(|item: &WorkItem| match item {
5115 WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5116 *instance == current_instance
5117 }
5118 _ => false,
5119 })
5120 }
5121
5122 fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5123 self.promote_work_items_matching(|item: &WorkItem| match item {
5124 WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5125 *t == thread
5126 }
5127 _ => false,
5128 })
5129 }
5130
5131 fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5132 where
5133 F: FnMut(&WorkItem) -> bool,
5134 {
5135 if self.high_priority.iter().any(&mut predicate) {
5139 true
5140 }
5141 else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5144 let item = self.low_priority.remove(idx).unwrap();
5145 self.push_high_priority(item);
5146 true
5147 } else {
5148 false
5149 }
5150 }
5151
5152 fn take_pending_cancellation(&mut self) -> Result<bool> {
5155 let thread = self.current_guest_thread()?;
5156 if let Some(event) = self.get_mut(thread.task)?.event.take() {
5157 assert!(matches!(event, Event::Cancelled));
5158 Ok(true)
5159 } else {
5160 Ok(false)
5161 }
5162 }
5163
5164 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5165 if self.may_block(task)? {
5166 Ok(())
5167 } else {
5168 Err(Trap::CannotBlockSyncTask.into())
5169 }
5170 }
5171
5172 fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
5173 let task = self.get_mut(task)?;
5174 Ok(task.async_function || task.returned_or_cancelled())
5175 }
5176
5177 pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
5183 let (task, is_host) = (task >> 1, task & 1 == 1);
5184 if is_host {
5185 let task: TableId<HostTask> = TableId::new(task);
5186 Ok(&mut self.get_mut(task)?.call_context)
5187 } else {
5188 let task: TableId<GuestTask> = TableId::new(task);
5189 Ok(&mut self.get_mut(task)?.call_context)
5190 }
5191 }
5192
5193 pub fn current_call_context_scope_id(&self) -> Result<u32> {
5196 let (bits, is_host) = match self.current_thread {
5197 CurrentThread::Guest(id) => (id.task.rep(), false),
5198 CurrentThread::Host(id) => (id.rep(), true),
5199 CurrentThread::None => bail_bug!("current thread is not set"),
5200 };
5201 assert_eq!((bits << 1) >> 1, bits);
5202 Ok((bits << 1) | u32::from(is_host))
5203 }
5204
5205 fn current_guest_thread(&self) -> Result<QualifiedThreadId> {
5206 match self.current_thread.guest() {
5207 Some(id) => Ok(*id),
5208 None => bail_bug!("current thread is not a guest thread"),
5209 }
5210 }
5211
5212 fn current_host_thread(&self) -> Result<TableId<HostTask>> {
5213 match self.current_thread.host() {
5214 Some(id) => Ok(id),
5215 None => bail_bug!("current thread is not a host thread"),
5216 }
5217 }
5218
5219 fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
5220 match self.futures.get_mut().as_mut() {
5221 Some(f) => Ok(f),
5222 None => bail_bug!("futures field of concurrent state is currently taken"),
5223 }
5224 }
5225
5226 pub(crate) fn table(&mut self) -> &mut ResourceTable {
5227 self.table.get_mut()
5228 }
5229}
5230
5231fn for_any_lower<
5234 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5235>(
5236 fun: F,
5237) -> F {
5238 fun
5239}
5240
5241fn for_any_lift<
5243 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5244>(
5245 fun: F,
5246) -> F {
5247 fun
5248}
5249
5250fn checked<F: Future + Send + 'static>(
5255 id: StoreId,
5256 fut: F,
5257) -> impl Future<Output = F::Output> + Send + 'static {
5258 async move {
5259 let mut fut = pin!(fut);
5260 future::poll_fn(move |cx| {
5261 let message = "\
5262 `Future`s which depend on asynchronous component tasks, streams, or \
5263 futures to complete may only be polled from the event loop of the \
5264 store to which they belong. Please use \
5265 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5266 ";
5267 tls::try_get(|store| {
5268 let matched = match store {
5269 tls::TryGet::Some(store) => store.id() == id,
5270 tls::TryGet::Taken | tls::TryGet::None => false,
5271 };
5272
5273 if !matched {
5274 panic!("{message}")
5275 }
5276 });
5277 fut.as_mut().poll(cx)
5278 })
5279 .await
5280 }
5281}
5282
5283fn check_recursive_run() {
5286 tls::try_get(|store| {
5287 if !matches!(store, tls::TryGet::None) {
5288 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5289 }
5290 });
5291}
5292
/// Split a packed callback code into its low four bits and the remaining
/// upper 28 bits, returned in that order.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    const LOW_MASK: u32 = 0xF;
    const LOW_BITS: u32 = 4;

    let low = code & LOW_MASK;
    let high = code >> LOW_BITS;
    (low, high)
}
5296
/// Parameter bundle for a waitable-set check.
// NOTE(review): field meanings below are inferred from names and types only;
// confirm against the code that consumes this struct.
struct WaitableCheckParams {
    /// The waitable set being checked.
    set: TableId<WaitableSet>,
    /// Index of the canonical options to use.
    options: OptionsIndex,
    /// Presumably a guest address where the event payload is written — TODO
    /// confirm.
    payload: u32,
}
5305
/// Selects which kind of waitable-set check to perform.
// NOTE(review): presumably `Wait` blocks until an event is available while
// `Poll` returns immediately — confirm at use sites.
enum WaitableCheck {
    Wait,
    Poll,
}
5312
/// A call that `prepare_call` has set up and that `queue_call` can submit.
pub(crate) struct PreparedCall<R> {
    /// The function to call.
    handle: Func,
    /// Task and thread created for this call.
    thread: QualifiedThreadId,
    /// Number of parameters, forwarded unchanged when the call is queued.
    param_count: usize,
    /// Receiving end of the channel whose sender is stored in the task's
    /// `Caller::Host`; `queue_call` downcasts the received value to `R`.
    rx: oneshot::Receiver<LiftedResult>,
    /// Ties the otherwise unused result type `R` to this value.
    _phantom: PhantomData<R>,
}
5326
5327impl<R> PreparedCall<R> {
5328 pub(crate) fn task_id(&self) -> TaskId {
5330 TaskId {
5331 task: self.thread.task,
5332 }
5333 }
5334}
5335
/// Opaque reference to the guest task behind a `PreparedCall`.
pub(crate) struct TaskId {
    /// Table id of the referenced guest task.
    task: TableId<GuestTask>,
}
5340
5341impl TaskId {
5342 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5348 let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5349 let delete = if !task.already_lowered_parameters() {
5350 true
5351 } else {
5352 task.host_future_state = HostFutureState::Dropped;
5353 task.ready_to_delete()
5354 };
5355 if delete {
5356 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5357 }
5358 Ok(())
5359 }
5360}
5361
/// Create the guest task and implicit thread for a host-to-guest call of
/// `handle`, without queueing it yet.
///
/// `lower_params` and `lift_result` are wrapped into the type-erased
/// closures stored on the task. `host_future_present` is recorded on the
/// task's `Caller::Host` and decides whether the task starts with a live
/// host future. The returned `PreparedCall` carries the receiving end of
/// the channel whose sender is stored in the task.
///
/// # Errors
///
/// Fails if a table insertion fails, or with `Trap::CannotEnterComponent`
/// when `may_enter` reports that the target instance may not be entered.
pub(crate) fn prepare_call<T, R>(
    mut store: StoreContextMut<T>,
    handle: Func,
    param_count: usize,
    host_future_present: bool,
    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
    + Send
    + Sync
    + 'static,
) -> Result<PreparedCall<R>> {
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);

    // Collect everything needed from the instance and its type information
    // before the concurrent state is borrowed mutably below.
    let instance = handle.instance().id().get(store.0);
    let options = &instance.component().env_component().options[options];
    let ty = &instance.component().types()[ty];
    let async_function = ty.async_;
    let task_return_type = ty.results;
    let component_instance = raw_options.instance;
    let callback = options.callback.map(|i| instance.runtime_callback(i));
    let memory = options
        .memory()
        .map(|i| instance.runtime_memory(i))
        .map(SendSyncPtr::new);
    let string_encoding = options.string_encoding;
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();

    // The sender is stored in the task; the receiver is returned to the
    // caller inside the `PreparedCall`.
    let (tx, rx) = oneshot::channel();

    let instance = RuntimeInstance {
        instance: handle.instance().id().instance(),
        index: component_instance,
    };
    // Whatever is current right now is recorded as the caller of the new task.
    let caller = state.current_thread;
    let task = GuestTask::new(
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            host_future_present,
            caller,
        },
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(move |store: &mut dyn VMStore, event, handle| {
                let store = token.as_context_mut(store);
                // SAFETY: NOTE(review) — presumably relies on `callback`
                // being a valid callback function pointer belonging to
                // `instance` in this store; the contract of `call_callback`
                // is not visible here, confirm against its `# Safety` docs.
                unsafe { instance.call_callback(store, callback, event, handle) }
            }) as CallbackFn
        }),
        instance,
        async_function,
    )?;

    // Register the task together with its implicit thread in the table.
    let task = state.push(task)?;
    let new_thread = GuestThread::new_implicit(state, task)?;
    let thread = state.push(new_thread)?;
    state.get_mut(task)?.threads.insert(thread);

    // Note that this check runs after the task and thread were pushed.
    if !store.0.may_enter(instance)? {
        bail!(Trap::CannotEnterComponent);
    }

    Ok(PreparedCall {
        handle,
        thread: QualifiedThreadId { task, thread },
        param_count,
        rx,
        _phantom: PhantomData,
    })
}
5453
5454pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5461 mut store: StoreContextMut<T>,
5462 prepared: PreparedCall<R>,
5463) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> {
5464 let PreparedCall {
5465 handle,
5466 thread,
5467 param_count,
5468 rx,
5469 ..
5470 } = prepared;
5471
5472 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5473
5474 Ok(checked(
5475 store.0.id(),
5476 rx.map(move |result| match result {
5477 Ok(r) => match r.downcast() {
5478 Ok(r) => Ok(*r),
5479 Err(_) => bail_bug!("wrong type of value produced"),
5480 },
5481 Err(e) => Err(e.into()),
5482 }),
5483 ))
5484}
5485
/// Gather the core function pointers and options for `handle` and forward
/// them to `Instance::queue_call` for `guest_thread`.
///
/// The callee, optional callback and optional post-return function are all
/// looked up from `handle` in `store`; the result count passed along is
/// always `1`.
fn queue_call0<T: 'static>(
    store: StoreContextMut<T>,
    handle: Func,
    guest_thread: QualifiedThreadId,
    param_count: usize,
) -> Result<()> {
    let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
    let is_concurrent = raw_options.async_;
    let callback = raw_options.callback;
    let instance = handle.instance();
    let callee = handle.lifted_core_func(store.0);
    let post_return = handle.post_return_core_func(store.0);
    // Resolve the callback index (if any) to its runtime function pointer.
    let callback = callback.map(|i| {
        let instance = instance.id().get(store.0);
        SendSyncPtr::new(instance.runtime_callback(i))
    });

    log::trace!("queueing call {guest_thread:?}");

    // SAFETY: NOTE(review) — presumably relies on `callee`, `callback` and
    // `post_return` all having been obtained from `handle` in this same
    // store, so they are valid for `instance`; the contract of
    // `Instance::queue_call` is not visible here, confirm against its
    // `# Safety` docs.
    unsafe {
        instance.queue_call(
            store,
            guest_thread,
            SendSyncPtr::new(callee),
            param_count,
            1,
            is_concurrent,
            callback,
            post_return.map(SendSyncPtr::new),
        )
    }
}