1use crate::bail_bug;
54use crate::component::func::{self, Func, call_post_return};
55use crate::component::{
56 HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
57};
58use crate::fiber::{self, StoreFiber, StoreFiberYield};
59use crate::prelude::*;
60use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
61use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
62use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
63use crate::{
64 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
65};
66use error_contexts::GlobalErrorContextRefCount;
67use futures::channel::oneshot;
68use futures::future::{self, FutureExt};
69use futures::stream::{FuturesUnordered, StreamExt};
70use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71use std::any::Any;
72use std::borrow::ToOwned;
73use std::boxed::Box;
74use std::cell::UnsafeCell;
75use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
76use std::fmt;
77use std::future::Future;
78use std::marker::PhantomData;
79use std::mem::{self, ManuallyDrop, MaybeUninit};
80use std::ops::DerefMut;
81use std::pin::{Pin, pin};
82use std::ptr::{self, NonNull};
83use std::task::{Context, Poll, Waker};
84use std::vec::Vec;
85use table::{TableDebug, TableId};
86use wasmtime_environ::Trap;
87use wasmtime_environ::component::{
88 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
89 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
90 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
91 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
92 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
93};
94use wasmtime_environ::packed_option::ReservedValue;
95
96pub use abort::JoinHandle;
97pub use future_stream_any::{FutureAny, StreamAny};
98pub use futures_and_streams::{
99 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
100 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
101 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
102};
103pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
104
105mod abort;
106mod error_contexts;
107mod future_stream_any;
108mod futures_and_streams;
109pub(crate) mod table;
110pub(crate) mod tls;
111
112const BLOCKED: u32 = 0xffff_ffff;
115
116#[derive(Clone, Copy, Eq, PartialEq, Debug)]
118pub enum Status {
119 Starting = 0,
120 Started = 1,
121 Returned = 2,
122 StartCancelled = 3,
123 ReturnCancelled = 4,
124}
125
126impl Status {
127 pub fn pack(self, waitable: Option<u32>) -> u32 {
133 assert!(matches!(self, Status::Returned) == waitable.is_none());
134 let waitable = waitable.unwrap_or(0);
135 assert!(waitable < (1 << 28));
136 (waitable << 4) | (self as u32)
137 }
138}
139
140#[derive(Clone, Copy, Debug)]
143enum Event {
144 None,
145 Subtask {
146 status: Status,
147 },
148 StreamRead {
149 code: ReturnCode,
150 pending: Option<(TypeStreamTableIndex, u32)>,
151 },
152 StreamWrite {
153 code: ReturnCode,
154 pending: Option<(TypeStreamTableIndex, u32)>,
155 },
156 FutureRead {
157 code: ReturnCode,
158 pending: Option<(TypeFutureTableIndex, u32)>,
159 },
160 FutureWrite {
161 code: ReturnCode,
162 pending: Option<(TypeFutureTableIndex, u32)>,
163 },
164 Cancelled,
165}
166
167impl Event {
168 fn parts(self) -> (u32, u32) {
173 const EVENT_NONE: u32 = 0;
174 const EVENT_SUBTASK: u32 = 1;
175 const EVENT_STREAM_READ: u32 = 2;
176 const EVENT_STREAM_WRITE: u32 = 3;
177 const EVENT_FUTURE_READ: u32 = 4;
178 const EVENT_FUTURE_WRITE: u32 = 5;
179 const EVENT_CANCELLED: u32 = 6;
180 match self {
181 Event::None => (EVENT_NONE, 0),
182 Event::Cancelled => (EVENT_CANCELLED, 0),
183 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
184 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
185 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
186 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
187 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
188 }
189 }
190}
191
192mod callback_code {
194 pub const EXIT: u32 = 0;
195 pub const YIELD: u32 = 1;
196 pub const WAIT: u32 = 2;
197}
198
199const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
203
204pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
210 store: StoreContextMut<'a, T>,
211 get_data: fn(&mut T) -> D::Data<'_>,
212}
213
214impl<'a, T, D> Access<'a, T, D>
215where
216 D: HasData + ?Sized,
217 T: 'static,
218{
219 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
221 Self { store, get_data }
222 }
223
224 pub fn data_mut(&mut self) -> &mut T {
226 self.store.data_mut()
227 }
228
229 pub fn get(&mut self) -> D::Data<'_> {
231 (self.get_data)(self.data_mut())
232 }
233
234 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
238 where
239 T: 'static,
240 {
241 let accessor = Accessor {
242 get_data: self.get_data,
243 token: StoreToken::new(self.store.as_context_mut()),
244 };
245 self.store
246 .as_context_mut()
247 .spawn_with_accessor(accessor, task)
248 }
249
250 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
253 self.get_data
254 }
255}
256
257impl<'a, T, D> AsContext for Access<'a, T, D>
258where
259 D: HasData + ?Sized,
260 T: 'static,
261{
262 type Data = T;
263
264 fn as_context(&self) -> StoreContext<'_, T> {
265 self.store.as_context()
266 }
267}
268
269impl<'a, T, D> AsContextMut for Access<'a, T, D>
270where
271 D: HasData + ?Sized,
272 T: 'static,
273{
274 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
275 self.store.as_context_mut()
276 }
277}
278
279pub struct Accessor<T: 'static, D = HasSelf<T>>
339where
340 D: HasData + ?Sized,
341{
342 token: StoreToken<T>,
343 get_data: fn(&mut T) -> D::Data<'_>,
344}
345
346pub trait AsAccessor {
363 type Data: 'static;
365
366 type AccessorData: HasData + ?Sized;
369
370 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
372}
373
374impl<T: AsAccessor + ?Sized> AsAccessor for &T {
375 type Data = T::Data;
376 type AccessorData = T::AccessorData;
377
378 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
379 T::as_accessor(self)
380 }
381}
382
383impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
384 type Data = T;
385 type AccessorData = D;
386
387 fn as_accessor(&self) -> &Accessor<T, D> {
388 self
389 }
390}
391
392const _: () = {
415 const fn assert<T: Send + Sync>() {}
416 assert::<Accessor<UnsafeCell<u32>>>();
417};
418
419impl<T> Accessor<T> {
420 pub(crate) fn new(token: StoreToken<T>) -> Self {
429 Self {
430 token,
431 get_data: |x| x,
432 }
433 }
434}
435
436impl<T, D> Accessor<T, D>
437where
438 D: HasData + ?Sized,
439{
440 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
458 tls::get(|vmstore| {
459 fun(Access {
460 store: self.token.as_context_mut(vmstore),
461 get_data: self.get_data,
462 })
463 })
464 }
465
466 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
469 self.get_data
470 }
471
472 pub fn with_getter<D2: HasData>(
489 &self,
490 get_data: fn(&mut T) -> D2::Data<'_>,
491 ) -> Accessor<T, D2> {
492 Accessor {
493 token: self.token,
494 get_data,
495 }
496 }
497
498 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
514 where
515 T: 'static,
516 {
517 let accessor = self.clone_for_spawn();
518 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
519 }
520
521 fn clone_for_spawn(&self) -> Self {
522 Self {
523 token: self.token,
524 get_data: self.get_data,
525 }
526 }
527}
528
529pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
541where
542 D: HasData + ?Sized,
543{
544 fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
546}
547
548enum CallerInfo {
551 Async {
553 params: Vec<ValRaw>,
554 has_result: bool,
555 },
556 Sync {
558 params: Vec<ValRaw>,
559 result_count: u32,
560 },
561}
562
563enum WaitMode {
565 Fiber(StoreFiber<'static>),
567 Callback(Instance),
570}
571
572#[derive(Debug)]
574enum SuspendReason {
575 Waiting {
578 set: TableId<WaitableSet>,
579 thread: QualifiedThreadId,
580 skip_may_block_check: bool,
581 },
582 NeedWork,
585 Yielding {
588 thread: QualifiedThreadId,
589 skip_may_block_check: bool,
590 },
591 ExplicitlySuspending {
593 thread: QualifiedThreadId,
594 skip_may_block_check: bool,
595 },
596}
597
598enum GuestCallKind {
600 DeliverEvent {
603 instance: Instance,
605 set: Option<TableId<WaitableSet>>,
610 },
611 StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
617 StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
618}
619
620impl fmt::Debug for GuestCallKind {
621 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
622 match self {
623 Self::DeliverEvent { instance, set } => f
624 .debug_struct("DeliverEvent")
625 .field("instance", instance)
626 .field("set", set)
627 .finish(),
628 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
629 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
630 }
631 }
632}
633
634#[derive(Copy, Clone, Debug)]
636pub enum SuspensionTarget {
637 SomeSuspended(u32),
638 Some(u32),
639 None,
640}
641
642impl SuspensionTarget {
643 fn is_none(&self) -> bool {
644 matches!(self, SuspensionTarget::None)
645 }
646 fn is_some(&self) -> bool {
647 !self.is_none()
648 }
649}
650
651#[derive(Debug)]
653struct GuestCall {
654 thread: QualifiedThreadId,
655 kind: GuestCallKind,
656}
657
658impl GuestCall {
659 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
669 let instance = store
670 .concurrent_state_mut()
671 .get_mut(self.thread.task)?
672 .instance;
673 let state = store.instance_state(instance).concurrent_state();
674
675 let ready = match &self.kind {
676 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
677 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
678 GuestCallKind::StartExplicit(_) => true,
679 };
680 log::trace!(
681 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
682 state.do_not_enter,
683 state.backpressure
684 );
685 Ok(ready)
686 }
687}
688
689enum WorkerItem {
691 GuestCall(GuestCall),
692 Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
693}
694
695enum WorkItem {
698 PushFuture(AlwaysMut<HostTaskFuture>),
700 ResumeFiber(StoreFiber<'static>),
702 ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
704 GuestCall(RuntimeComponentInstanceIndex, GuestCall),
706 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
708}
709
710impl fmt::Debug for WorkItem {
711 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
712 match self {
713 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
714 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
715 Self::ResumeThread(instance, thread) => f
716 .debug_tuple("ResumeThread")
717 .field(instance)
718 .field(thread)
719 .finish(),
720 Self::GuestCall(instance, call) => f
721 .debug_tuple("GuestCall")
722 .field(instance)
723 .field(call)
724 .finish(),
725 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
726 }
727 }
728}
729
730#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
732pub(crate) enum WaitResult {
733 Cancelled,
734 Completed,
735}
736
737pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
745 store: &mut dyn VMStore,
746 future: impl Future<Output = Result<R>> + Send + 'static,
747) -> Result<R> {
748 let state = store.concurrent_state_mut();
749 let task = state.current_host_thread()?;
750
751 let mut future = Box::pin(async move {
755 let result = future.await?;
756 tls::get(move |store| {
757 let state = store.concurrent_state_mut();
758 let host_state = &mut state.get_mut(task)?.state;
759 assert!(matches!(host_state, HostTaskState::CalleeStarted));
760 *host_state = HostTaskState::CalleeFinished(Box::new(result));
761
762 Waitable::Host(task).set_event(
763 state,
764 Some(Event::Subtask {
765 status: Status::Returned,
766 }),
767 )?;
768
769 Ok(())
770 })
771 }) as HostTaskFuture;
772
773 let poll = tls::set(store, || {
777 future
778 .as_mut()
779 .poll(&mut Context::from_waker(&Waker::noop()))
780 });
781
782 match poll {
783 Poll::Ready(result) => result?,
785
786 Poll::Pending => {
791 let state = store.concurrent_state_mut();
792 state.push_future(future);
793
794 let caller = state.get_mut(task)?.caller;
795 let set = state.get_mut(caller.thread)?.sync_call_set;
796 Waitable::Host(task).join(state, Some(set))?;
797
798 store.suspend(SuspendReason::Waiting {
799 set,
800 thread: caller,
801 skip_may_block_check: false,
802 })?;
803
804 Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
808 }
809 }
810
811 let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
813 match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
814 HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
815 Ok(result) => *result,
816 Err(_) => bail_bug!("host task finished with wrong type of result"),
817 }),
818 _ => bail_bug!("unexpected host task state after completion"),
819 }
820}
821
822fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
824 let mut next = Some(call);
825 while let Some(call) = next.take() {
826 match call.kind {
827 GuestCallKind::DeliverEvent { instance, set } => {
828 let (event, waitable) =
829 match instance.get_event(store, call.thread.task, set, true)? {
830 Some(pair) => pair,
831 None => bail_bug!("delivering non-present event"),
832 };
833 let state = store.concurrent_state_mut();
834 let task = state.get_mut(call.thread.task)?;
835 let runtime_instance = task.instance;
836 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
837
838 log::trace!(
839 "use callback to deliver event {event:?} to {:?} for {waitable:?}",
840 call.thread,
841 );
842
843 let old_thread = store.set_thread(call.thread)?;
844 log::trace!(
845 "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
846 call.thread
847 );
848
849 store.enter_instance(runtime_instance);
850
851 let Some(callback) = store
852 .concurrent_state_mut()
853 .get_mut(call.thread.task)?
854 .callback
855 .take()
856 else {
857 bail_bug!("guest task callback field not present")
858 };
859
860 let code = callback(store, event, handle)?;
861
862 store
863 .concurrent_state_mut()
864 .get_mut(call.thread.task)?
865 .callback = Some(callback);
866
867 store.exit_instance(runtime_instance)?;
868
869 store.set_thread(old_thread)?;
870
871 next = instance.handle_callback_code(
872 store,
873 call.thread,
874 runtime_instance.index,
875 code,
876 )?;
877
878 log::trace!(
879 "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
880 );
881 }
882 GuestCallKind::StartImplicit(fun) => {
883 next = fun(store)?;
884 }
885 GuestCallKind::StartExplicit(fun) => {
886 fun(store)?;
887 }
888 }
889 }
890
891 Ok(())
892}
893
894impl<T> Store<T> {
895 pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
897 where
898 T: Send + 'static,
899 {
900 ensure!(
901 self.as_context().0.concurrency_support(),
902 "cannot use `run_concurrent` when Config::concurrency_support disabled",
903 );
904 self.as_context_mut().run_concurrent(fun).await
905 }
906
907 #[doc(hidden)]
908 pub fn assert_concurrent_state_empty(&mut self) {
909 self.as_context_mut().assert_concurrent_state_empty();
910 }
911
912 #[doc(hidden)]
913 pub fn concurrent_state_table_size(&mut self) -> usize {
914 self.as_context_mut().concurrent_state_table_size()
915 }
916
917 pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
919 where
920 T: 'static,
921 {
922 self.as_context_mut().spawn(task)
923 }
924}
925
926impl<T> StoreContextMut<'_, T> {
927 #[doc(hidden)]
938 pub fn assert_concurrent_state_empty(self) {
939 let store = self.0;
940 store
941 .store_data_mut()
942 .components
943 .assert_instance_states_empty();
944 let state = store.concurrent_state_mut();
945 assert!(
946 state.table.get_mut().is_empty(),
947 "non-empty table: {:?}",
948 state.table.get_mut()
949 );
950 assert!(state.high_priority.is_empty());
951 assert!(state.low_priority.is_empty());
952 assert!(state.current_thread.is_none());
953 assert!(state.futures_mut().unwrap().is_empty());
954 assert!(state.global_error_context_ref_counts.is_empty());
955 }
956
957 #[doc(hidden)]
962 pub fn concurrent_state_table_size(&mut self) -> usize {
963 self.0
964 .concurrent_state_mut()
965 .table
966 .get_mut()
967 .iter_mut()
968 .count()
969 }
970
971 pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
981 where
982 T: 'static,
983 {
984 let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
985 self.spawn_with_accessor(accessor, task)
986 }
987
988 fn spawn_with_accessor<D>(
991 self,
992 accessor: Accessor<T, D>,
993 task: impl AccessorTask<T, D>,
994 ) -> JoinHandle
995 where
996 T: 'static,
997 D: HasData + ?Sized,
998 {
999 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
1003 self.0
1004 .concurrent_state_mut()
1005 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1006 handle
1007 }
1008
1009 pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1093 where
1094 T: Send + 'static,
1095 {
1096 ensure!(
1097 self.0.concurrency_support(),
1098 "cannot use `run_concurrent` when Config::concurrency_support disabled",
1099 );
1100 self.do_run_concurrent(fun, false).await
1101 }
1102
1103 pub(super) async fn run_concurrent_trap_on_idle<R>(
1104 self,
1105 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1106 ) -> Result<R>
1107 where
1108 T: Send + 'static,
1109 {
1110 self.do_run_concurrent(fun, true).await
1111 }
1112
1113 async fn do_run_concurrent<R>(
1114 mut self,
1115 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1116 trap_on_idle: bool,
1117 ) -> Result<R>
1118 where
1119 T: Send + 'static,
1120 {
1121 debug_assert!(self.0.concurrency_support());
1122 check_recursive_run();
1123 let token = StoreToken::new(self.as_context_mut());
1124
1125 struct Dropper<'a, T: 'static, V> {
1126 store: StoreContextMut<'a, T>,
1127 value: ManuallyDrop<V>,
1128 }
1129
1130 impl<'a, T, V> Drop for Dropper<'a, T, V> {
1131 fn drop(&mut self) {
1132 tls::set(self.store.0, || {
1133 unsafe { ManuallyDrop::drop(&mut self.value) }
1138 });
1139 }
1140 }
1141
1142 let accessor = &Accessor::new(token);
1143 let dropper = &mut Dropper {
1144 store: self,
1145 value: ManuallyDrop::new(fun(accessor)),
1146 };
1147 let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1149
1150 dropper
1151 .store
1152 .as_context_mut()
1153 .poll_until(future, trap_on_idle)
1154 .await
1155 }
1156
1157 async fn poll_until<R>(
1163 mut self,
1164 mut future: Pin<&mut impl Future<Output = R>>,
1165 trap_on_idle: bool,
1166 ) -> Result<R>
1167 where
1168 T: Send + 'static,
1169 {
1170 struct Reset<'a, T: 'static> {
1171 store: StoreContextMut<'a, T>,
1172 futures: Option<FuturesUnordered<HostTaskFuture>>,
1173 }
1174
1175 impl<'a, T> Drop for Reset<'a, T> {
1176 fn drop(&mut self) {
1177 if let Some(futures) = self.futures.take() {
1178 *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1179 }
1180 }
1181 }
1182
1183 loop {
1184 let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1188 let mut reset = Reset {
1189 store: self.as_context_mut(),
1190 futures,
1191 };
1192 let mut next = match reset.futures.as_mut() {
1193 Some(f) => pin!(f.next()),
1194 None => bail_bug!("concurrent state missing futures field"),
1195 };
1196
1197 enum PollResult<R> {
1198 Complete(R),
1199 ProcessWork(Vec<WorkItem>),
1200 }
1201 let result = future::poll_fn(|cx| {
1202 if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1205 return Poll::Ready(Ok(PollResult::Complete(value)));
1206 }
1207
1208 let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1212 Poll::Ready(Some(output)) => {
1213 match output {
1214 Err(e) => return Poll::Ready(Err(e)),
1215 Ok(()) => {}
1216 }
1217 Poll::Ready(true)
1218 }
1219 Poll::Ready(None) => Poll::Ready(false),
1220 Poll::Pending => Poll::Pending,
1221 };
1222
1223 let state = reset.store.0.concurrent_state_mut();
1227 let ready = state.collect_work_items_to_run();
1228 if !ready.is_empty() {
1229 return Poll::Ready(Ok(PollResult::ProcessWork(ready)));
1230 }
1231
1232 return match next {
1236 Poll::Ready(true) => {
1237 Poll::Ready(Ok(PollResult::ProcessWork(Vec::new())))
1243 }
1244 Poll::Ready(false) => {
1245 if let Poll::Ready(value) =
1249 tls::set(reset.store.0, || future.as_mut().poll(cx))
1250 {
1251 Poll::Ready(Ok(PollResult::Complete(value)))
1252 } else {
1253 if trap_on_idle {
1259 Poll::Ready(Err(Trap::AsyncDeadlock.into()))
1262 } else {
1263 Poll::Pending
1267 }
1268 }
1269 }
1270 Poll::Pending => Poll::Pending,
1275 };
1276 })
1277 .await;
1278
1279 drop(reset);
1283
1284 match result? {
1285 PollResult::Complete(value) => break Ok(value),
1288 PollResult::ProcessWork(ready) => {
1291 struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1292 store: StoreContextMut<'a, T>,
1293 ready: I,
1294 }
1295
1296 impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1297 fn drop(&mut self) {
1298 while let Some(item) = self.ready.next() {
1299 match item {
1300 WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1301 WorkItem::PushFuture(future) => {
1302 tls::set(self.store.0, move || drop(future))
1303 }
1304 _ => {}
1305 }
1306 }
1307 }
1308 }
1309
1310 let mut dispose = Dispose {
1311 store: self.as_context_mut(),
1312 ready: ready.into_iter(),
1313 };
1314
1315 while let Some(item) = dispose.ready.next() {
1316 dispose
1317 .store
1318 .as_context_mut()
1319 .handle_work_item(item)
1320 .await?;
1321 }
1322 }
1323 }
1324 }
1325 }
1326
1327 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1329 where
1330 T: Send,
1331 {
1332 log::trace!("handle work item {item:?}");
1333 match item {
1334 WorkItem::PushFuture(future) => {
1335 self.0
1336 .concurrent_state_mut()
1337 .futures_mut()?
1338 .push(future.into_inner());
1339 }
1340 WorkItem::ResumeFiber(fiber) => {
1341 self.0.resume_fiber(fiber).await?;
1342 }
1343 WorkItem::ResumeThread(_, thread) => {
1344 if let GuestThreadState::Ready(fiber) = mem::replace(
1345 &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
1346 GuestThreadState::Running,
1347 ) {
1348 self.0.resume_fiber(fiber).await?;
1349 } else {
1350 bail_bug!("cannot resume non-pending thread {thread:?}");
1351 }
1352 }
1353 WorkItem::GuestCall(_, call) => {
1354 if call.is_ready(self.0)? {
1355 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1356 } else {
1357 let state = self.0.concurrent_state_mut();
1358 let task = state.get_mut(call.thread.task)?;
1359 if !task.starting_sent {
1360 task.starting_sent = true;
1361 if let GuestCallKind::StartImplicit(_) = &call.kind {
1362 Waitable::Guest(call.thread.task).set_event(
1363 state,
1364 Some(Event::Subtask {
1365 status: Status::Starting,
1366 }),
1367 )?;
1368 }
1369 }
1370
1371 let instance = state.get_mut(call.thread.task)?.instance;
1372 self.0
1373 .instance_state(instance)
1374 .concurrent_state()
1375 .pending
1376 .insert(call.thread, call.kind);
1377 }
1378 }
1379 WorkItem::WorkerFunction(fun) => {
1380 self.run_on_worker(WorkerItem::Function(fun)).await?;
1381 }
1382 }
1383
1384 Ok(())
1385 }
1386
1387 async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1389 where
1390 T: Send,
1391 {
1392 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1393 fiber
1394 } else {
1395 fiber::make_fiber(self.0, move |store| {
1396 loop {
1397 let Some(item) = store.concurrent_state_mut().worker_item.take() else {
1398 bail_bug!("worker_item not present when resuming fiber")
1399 };
1400 match item {
1401 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1402 WorkerItem::Function(fun) => fun.into_inner()(store)?,
1403 }
1404
1405 store.suspend(SuspendReason::NeedWork)?;
1406 }
1407 })?
1408 };
1409
1410 let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1411 assert!(worker_item.is_none());
1412 *worker_item = Some(item);
1413
1414 self.0.resume_fiber(worker).await
1415 }
1416
1417 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1422 where
1423 T: 'static,
1424 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1425 + Send
1426 + Sync
1427 + 'static,
1428 R: Send + Sync + 'static,
1429 {
1430 let token = StoreToken::new(self);
1431 async move {
1432 let mut accessor = Accessor::new(token);
1433 closure(&mut accessor).await
1434 }
1435 }
1436}
1437
1438impl StoreOpaque {
1439 pub(crate) fn enter_guest_sync_call(
1446 &mut self,
1447 guest_caller: Option<RuntimeInstance>,
1448 callee_async: bool,
1449 callee: RuntimeInstance,
1450 ) -> Result<()> {
1451 log::trace!("enter sync call {callee:?}");
1452 if !self.concurrency_support() {
1453 return Ok(self.enter_call_not_concurrent());
1454 }
1455
1456 let state = self.concurrent_state_mut();
1457 let thread = state.current_thread;
1458 let instance = if let Some(thread) = thread.guest() {
1459 Some(state.get_mut(thread.task)?.instance)
1460 } else {
1461 None
1462 };
1463 if guest_caller.is_some() {
1464 debug_assert_eq!(instance, guest_caller);
1465 }
1466 let task = GuestTask::new(
1467 Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
1468 LiftResult {
1469 lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
1470 ty: TypeTupleIndex::reserved_value(),
1471 memory: None,
1472 string_encoding: StringEncoding::Utf8,
1473 },
1474 if let Some(thread) = thread.guest() {
1475 Caller::Guest { thread: *thread }
1476 } else {
1477 Caller::Host {
1478 tx: None,
1479 host_future_present: false,
1480 caller: thread,
1481 }
1482 },
1483 None,
1484 callee,
1485 callee_async,
1486 )?;
1487
1488 let guest_task = state.push(task)?;
1489 let new_thread = GuestThread::new_implicit(state, guest_task)?;
1490 let guest_thread = state.push(new_thread)?;
1491 Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1492 guest_thread,
1493 self,
1494 callee.index,
1495 )?;
1496
1497 let state = self.concurrent_state_mut();
1498 state.get_mut(guest_task)?.threads.insert(guest_thread);
1499
1500 self.set_thread(QualifiedThreadId {
1501 task: guest_task,
1502 thread: guest_thread,
1503 })?;
1504
1505 Ok(())
1506 }
1507
1508 pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
1510 if !self.concurrency_support() {
1511 return Ok(self.exit_call_not_concurrent());
1512 }
1513 let thread = match self.set_thread(CurrentThread::None)?.guest() {
1514 Some(t) => *t,
1515 None => bail_bug!("expected task when exiting"),
1516 };
1517 let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance;
1518 log::trace!("exit sync call {instance:?}");
1519 Instance::from_wasmtime(self, instance.instance).cleanup_thread(
1520 self,
1521 thread,
1522 instance.index,
1523 )?;
1524
1525 let state = self.concurrent_state_mut();
1526 let task = state.get_mut(thread.task)?;
1527 let caller = match &task.caller {
1528 &Caller::Guest { thread } => thread.into(),
1529 &Caller::Host { caller, .. } => caller,
1530 };
1531 self.set_thread(caller)?;
1532
1533 let state = self.concurrent_state_mut();
1534 let task = state.get_mut(thread.task)?;
1535 if task.ready_to_delete() {
1536 state.delete(thread.task)?.dispose(state)?;
1537 }
1538
1539 Ok(())
1540 }
1541
1542 pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
1550 if !self.concurrency_support() {
1551 self.enter_call_not_concurrent();
1552 return Ok(None);
1553 }
1554 let state = self.concurrent_state_mut();
1555 let caller = state.current_guest_thread()?;
1556 let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
1557 log::trace!("new host task {task:?}");
1558 self.set_thread(task)?;
1559 Ok(Some(task))
1560 }
1561
1562 pub fn host_task_reenter_caller(&mut self) -> Result<()> {
1568 if !self.concurrency_support() {
1569 return Ok(());
1570 }
1571 let task = self.concurrent_state_mut().current_host_thread()?;
1572 let caller = self.concurrent_state_mut().get_mut(task)?.caller;
1573 self.set_thread(caller)?;
1574 Ok(())
1575 }
1576
1577 pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1584 match task {
1585 Some(task) => {
1586 log::trace!("delete host task {task:?}");
1587 self.concurrent_state_mut().delete(task)?;
1588 }
1589 None => {
1590 self.exit_call_not_concurrent();
1591 }
1592 }
1593 Ok(())
1594 }
1595
1596 pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
1604 if self.trapped() {
1605 return Ok(false);
1606 }
1607 if !self.concurrency_support() {
1608 return Ok(true);
1609 }
1610 let state = self.concurrent_state_mut();
1611 let mut cur = state.current_thread;
1612 loop {
1613 match cur {
1614 CurrentThread::None => break Ok(true),
1615 CurrentThread::Guest(thread) => {
1616 let task = state.get_mut(thread.task)?;
1617
1618 if task.instance.instance == instance.instance {
1625 break Ok(false);
1626 }
1627 cur = match task.caller {
1628 Caller::Host { caller, .. } => caller,
1629 Caller::Guest { thread } => thread.into(),
1630 };
1631 }
1632 CurrentThread::Host(id) => {
1633 cur = state.get_mut(id)?.caller.into();
1634 }
1635 }
1636 }
1637 }
1638
1639 fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1642 self.component_instance_mut(instance.instance)
1643 .instance_state(instance.index)
1644 }
1645
1646 fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
1647 let state = self.concurrent_state_mut();
1652 let old_thread = mem::replace(&mut state.current_thread, thread.into());
1653 if let Some(old_thread) = old_thread.guest() {
1654 let instance = state.get_mut(old_thread.task)?.instance.instance;
1655 self.component_instance_mut(instance)
1656 .set_task_may_block(false)
1657 }
1658
1659 if self.concurrent_state_mut().current_thread.guest().is_some() {
1662 self.set_task_may_block()?;
1663 }
1664
1665 Ok(old_thread)
1666 }
1667
1668 fn set_task_may_block(&mut self) -> Result<()> {
1671 let state = self.concurrent_state_mut();
1672 let guest_thread = state.current_guest_thread()?;
1673 let instance = state.get_mut(guest_thread.task)?.instance.instance;
1674 let may_block = self.concurrent_state_mut().may_block(guest_thread.task)?;
1675 self.component_instance_mut(instance)
1676 .set_task_may_block(may_block);
1677 Ok(())
1678 }
1679
1680 pub(crate) fn check_blocking(&mut self) -> Result<()> {
1681 if !self.concurrency_support() {
1682 return Ok(());
1683 }
1684 let state = self.concurrent_state_mut();
1685 let task = state.current_guest_thread()?.task;
1686 let instance = state.get_mut(task)?.instance.instance;
1687 let task_may_block = self.component_instance(instance).get_task_may_block();
1688
1689 if task_may_block {
1690 Ok(())
1691 } else {
1692 Err(Trap::CannotBlockSyncTask.into())
1693 }
1694 }
1695
1696 fn enter_instance(&mut self, instance: RuntimeInstance) {
1700 log::trace!("enter {instance:?}");
1701 self.instance_state(instance)
1702 .concurrent_state()
1703 .do_not_enter = true;
1704 }
1705
1706 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1710 log::trace!("exit {instance:?}");
1711 self.instance_state(instance)
1712 .concurrent_state()
1713 .do_not_enter = false;
1714 self.partition_pending(instance)
1715 }
1716
1717 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1722 for (thread, kind) in
1723 mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1724 {
1725 let call = GuestCall { thread, kind };
1726 if call.is_ready(self)? {
1727 self.concurrent_state_mut()
1728 .push_high_priority(WorkItem::GuestCall(instance.index, call));
1729 } else {
1730 self.instance_state(instance)
1731 .concurrent_state()
1732 .pending
1733 .insert(call.thread, call.kind);
1734 }
1735 }
1736
1737 Ok(())
1738 }
1739
1740 pub(crate) fn backpressure_modify(
1742 &mut self,
1743 caller_instance: RuntimeInstance,
1744 modify: impl FnOnce(u16) -> Option<u16>,
1745 ) -> Result<()> {
1746 let state = self.instance_state(caller_instance).concurrent_state();
1747 let old = state.backpressure;
1748 let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
1749 state.backpressure = new;
1750
1751 if old > 0 && new == 0 {
1752 self.partition_pending(caller_instance)?;
1755 }
1756
1757 Ok(())
1758 }
1759
1760 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1763 let old_thread = self.concurrent_state_mut().current_thread;
1764 log::trace!("resume_fiber: save current thread {old_thread:?}");
1765
1766 let fiber = fiber::resolve_or_release(self, fiber).await?;
1767
1768 self.set_thread(old_thread)?;
1769
1770 let state = self.concurrent_state_mut();
1771
1772 if let Some(ot) = old_thread.guest() {
1773 state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1774 }
1775 log::trace!("resume_fiber: restore current thread {old_thread:?}");
1776
1777 if let Some(mut fiber) = fiber {
1778 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1779 let reason = match state.suspend_reason.take() {
1781 Some(r) => r,
1782 None => bail_bug!("suspend reason missing when resuming fiber"),
1783 };
1784 match reason {
1785 SuspendReason::NeedWork => {
1786 if state.worker.is_none() {
1787 state.worker = Some(fiber);
1788 } else {
1789 fiber.dispose(self);
1790 }
1791 }
1792 SuspendReason::Yielding { thread, .. } => {
1793 state.get_mut(thread.thread)?.state = GuestThreadState::Ready(fiber);
1794 let instance = state.get_mut(thread.task)?.instance.index;
1795 state.push_low_priority(WorkItem::ResumeThread(instance, thread));
1796 }
1797 SuspendReason::ExplicitlySuspending { thread, .. } => {
1798 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1799 }
1800 SuspendReason::Waiting { set, thread, .. } => {
1801 let old = state
1802 .get_mut(set)?
1803 .waiting
1804 .insert(thread, WaitMode::Fiber(fiber));
1805 assert!(old.is_none());
1806 }
1807 };
1808 } else {
1809 log::trace!("resume_fiber: fiber has exited");
1810 }
1811
1812 Ok(())
1813 }
1814
1815 fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1821 log::trace!("suspend fiber: {reason:?}");
1822
1823 let task = match &reason {
1827 SuspendReason::Yielding { thread, .. }
1828 | SuspendReason::Waiting { thread, .. }
1829 | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1830 SuspendReason::NeedWork => None,
1831 };
1832
1833 let old_guest_thread = if task.is_some() {
1834 self.concurrent_state_mut().current_thread
1835 } else {
1836 CurrentThread::None
1837 };
1838
1839 debug_assert!(
1845 matches!(
1846 reason,
1847 SuspendReason::ExplicitlySuspending {
1848 skip_may_block_check: true,
1849 ..
1850 } | SuspendReason::Waiting {
1851 skip_may_block_check: true,
1852 ..
1853 } | SuspendReason::Yielding {
1854 skip_may_block_check: true,
1855 ..
1856 }
1857 ) || old_guest_thread
1858 .guest()
1859 .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1860 .transpose()?
1861 .unwrap_or(true)
1862 );
1863
1864 let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1865 assert!(suspend_reason.is_none());
1866 *suspend_reason = Some(reason);
1867
1868 self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1869
1870 if task.is_some() {
1871 self.set_thread(old_guest_thread)?;
1872 }
1873
1874 Ok(())
1875 }
1876
1877 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1878 let state = self.concurrent_state_mut();
1879 let caller = state.current_guest_thread()?;
1880 let old_set = waitable.common(state)?.set;
1881 let set = state.get_mut(caller.thread)?.sync_call_set;
1882 waitable.join(state, Some(set))?;
1883 self.suspend(SuspendReason::Waiting {
1884 set,
1885 thread: caller,
1886 skip_may_block_check: false,
1887 })?;
1888 let state = self.concurrent_state_mut();
1889 waitable.join(state, old_set)
1890 }
1891}
1892
1893impl Instance {
1894 fn get_event(
1897 self,
1898 store: &mut StoreOpaque,
1899 guest_task: TableId<GuestTask>,
1900 set: Option<TableId<WaitableSet>>,
1901 cancellable: bool,
1902 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1903 let state = store.concurrent_state_mut();
1904
1905 let event = &mut state.get_mut(guest_task)?.event;
1906 if let Some(ev) = event
1907 && (cancellable || !matches!(ev, Event::Cancelled))
1908 {
1909 log::trace!("deliver event {ev:?} to {guest_task:?}");
1910 let ev = *ev;
1911 *event = None;
1912 return Ok(Some((ev, None)));
1913 }
1914
1915 let set = match set {
1916 Some(set) => set,
1917 None => return Ok(None),
1918 };
1919 let waitable = match state.get_mut(set)?.ready.pop_first() {
1920 Some(v) => v,
1921 None => return Ok(None),
1922 };
1923
1924 let common = waitable.common(state)?;
1925 let handle = match common.handle {
1926 Some(h) => h,
1927 None => bail_bug!("handle not set when delivering event"),
1928 };
1929 let event = match common.event.take() {
1930 Some(e) => e,
1931 None => bail_bug!("event not set when delivering event"),
1932 };
1933
1934 log::trace!(
1935 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1936 );
1937
1938 waitable.on_delivery(store, self, event)?;
1939
1940 Ok(Some((event, Some((waitable, handle)))))
1941 }
1942
1943 fn handle_callback_code(
1949 self,
1950 store: &mut StoreOpaque,
1951 guest_thread: QualifiedThreadId,
1952 runtime_instance: RuntimeComponentInstanceIndex,
1953 code: u32,
1954 ) -> Result<Option<GuestCall>> {
1955 let (code, set) = unpack_callback_code(code);
1956
1957 log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1958
1959 let state = store.concurrent_state_mut();
1960
1961 let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
1962 let set = store
1963 .instance_state(RuntimeInstance {
1964 instance: self.id().instance(),
1965 index: runtime_instance,
1966 })
1967 .handle_table()
1968 .waitable_set_rep(handle)?;
1969
1970 Ok(TableId::<WaitableSet>::new(set))
1971 };
1972
1973 Ok(match code {
1974 callback_code::EXIT => {
1975 log::trace!("implicit thread {guest_thread:?} completed");
1976 self.cleanup_thread(store, guest_thread, runtime_instance)?;
1977 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1978 if task.threads.is_empty() && !task.returned_or_cancelled() {
1979 bail!(Trap::NoAsyncResult);
1980 }
1981 if let Caller::Guest { .. } = task.caller {
1982 task.exited = true;
1983 task.callback = None;
1984 }
1985 if task.ready_to_delete() {
1986 Waitable::Guest(guest_thread.task).delete_from(store.concurrent_state_mut())?;
1987 }
1988 None
1989 }
1990 callback_code::YIELD => {
1991 let task = state.get_mut(guest_thread.task)?;
1992 if let Some(event) = task.event {
1997 assert!(matches!(event, Event::None | Event::Cancelled));
1998 } else {
1999 task.event = Some(Event::None);
2000 }
2001 let call = GuestCall {
2002 thread: guest_thread,
2003 kind: GuestCallKind::DeliverEvent {
2004 instance: self,
2005 set: None,
2006 },
2007 };
2008 if state.may_block(guest_thread.task)? {
2009 state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
2012 None
2013 } else {
2014 Some(call)
2018 }
2019 }
2020 callback_code::WAIT => {
2021 state.check_blocking_for(guest_thread.task)?;
2024
2025 let set = get_set(store, set)?;
2026 let state = store.concurrent_state_mut();
2027
2028 if state.get_mut(guest_thread.task)?.event.is_some()
2029 || !state.get_mut(set)?.ready.is_empty()
2030 {
2031 state.push_high_priority(WorkItem::GuestCall(
2033 runtime_instance,
2034 GuestCall {
2035 thread: guest_thread,
2036 kind: GuestCallKind::DeliverEvent {
2037 instance: self,
2038 set: Some(set),
2039 },
2040 },
2041 ));
2042 } else {
2043 let old = state
2051 .get_mut(guest_thread.thread)?
2052 .wake_on_cancel
2053 .replace(set);
2054 if !old.is_none() {
2055 bail_bug!("thread unexpectedly had wake_on_cancel set");
2056 }
2057 let old = state
2058 .get_mut(set)?
2059 .waiting
2060 .insert(guest_thread, WaitMode::Callback(self));
2061 if !old.is_none() {
2062 bail_bug!("set's waiting set already had this thread registered");
2063 }
2064 }
2065 None
2066 }
2067 _ => bail!(Trap::UnsupportedCallbackCode),
2068 })
2069 }
2070
2071 fn cleanup_thread(
2072 self,
2073 store: &mut StoreOpaque,
2074 guest_thread: QualifiedThreadId,
2075 runtime_instance: RuntimeComponentInstanceIndex,
2076 ) -> Result<()> {
2077 let state = store.concurrent_state_mut();
2078 let thread_data = state.get_mut(guest_thread.thread)?;
2079 let guest_id = match thread_data.instance_rep {
2080 Some(id) => id,
2081 None => bail_bug!("thread must have instance_rep set by now"),
2082 };
2083 let sync_call_set = thread_data.sync_call_set;
2084
2085 for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
2087 if let Some(Event::Subtask {
2088 status: Status::Returned | Status::ReturnCancelled,
2089 }) = waitable.common(state)?.event
2090 {
2091 waitable.delete_from(state)?;
2092 }
2093 }
2094
2095 store
2096 .instance_state(RuntimeInstance {
2097 instance: self.id().instance(),
2098 index: runtime_instance,
2099 })
2100 .thread_handle_table()
2101 .guest_thread_remove(guest_id)?;
2102
2103 store.concurrent_state_mut().delete(guest_thread.thread)?;
2104 store.concurrent_state_mut().delete(sync_call_set)?;
2105 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2106 task.threads.remove(&guest_thread.thread);
2107 Ok(())
2108 }
2109
2110 unsafe fn queue_call<T: 'static>(
2117 self,
2118 mut store: StoreContextMut<T>,
2119 guest_thread: QualifiedThreadId,
2120 callee: SendSyncPtr<VMFuncRef>,
2121 param_count: usize,
2122 result_count: usize,
2123 async_: bool,
2124 callback: Option<SendSyncPtr<VMFuncRef>>,
2125 post_return: Option<SendSyncPtr<VMFuncRef>>,
2126 ) -> Result<()> {
2127 unsafe fn make_call<T: 'static>(
2142 store: StoreContextMut<T>,
2143 guest_thread: QualifiedThreadId,
2144 callee: SendSyncPtr<VMFuncRef>,
2145 param_count: usize,
2146 result_count: usize,
2147 ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2148 + Send
2149 + Sync
2150 + 'static
2151 + use<T> {
2152 let token = StoreToken::new(store);
2153 move |store: &mut dyn VMStore| {
2154 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2155
2156 store
2157 .concurrent_state_mut()
2158 .get_mut(guest_thread.thread)?
2159 .state = GuestThreadState::Running;
2160 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2161 let lower = match task.lower_params.take() {
2162 Some(l) => l,
2163 None => bail_bug!("lower_params missing"),
2164 };
2165
2166 lower(store, &mut storage[..param_count])?;
2167
2168 let mut store = token.as_context_mut(store);
2169
2170 unsafe {
2173 crate::Func::call_unchecked_raw(
2174 &mut store,
2175 callee.as_non_null(),
2176 NonNull::new(
2177 &mut storage[..param_count.max(result_count)]
2178 as *mut [MaybeUninit<ValRaw>] as _,
2179 )
2180 .unwrap(),
2181 )?;
2182 }
2183
2184 Ok(storage)
2185 }
2186 }
2187
2188 let call = unsafe {
2192 make_call(
2193 store.as_context_mut(),
2194 guest_thread,
2195 callee,
2196 param_count,
2197 result_count,
2198 )
2199 };
2200
2201 let callee_instance = store
2202 .0
2203 .concurrent_state_mut()
2204 .get_mut(guest_thread.task)?
2205 .instance;
2206
2207 let fun = if callback.is_some() {
2208 assert!(async_);
2209
2210 Box::new(move |store: &mut dyn VMStore| {
2211 self.add_guest_thread_to_instance_table(
2212 guest_thread.thread,
2213 store,
2214 callee_instance.index,
2215 )?;
2216 let old_thread = store.set_thread(guest_thread)?;
2217 log::trace!(
2218 "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2219 );
2220
2221 store.enter_instance(callee_instance);
2222
2223 let storage = call(store)?;
2230
2231 store.exit_instance(callee_instance)?;
2232
2233 store.set_thread(old_thread)?;
2234 let state = store.concurrent_state_mut();
2235 if let Some(t) = old_thread.guest() {
2236 state.get_mut(t.thread)?.state = GuestThreadState::Running;
2237 }
2238 log::trace!("stackless call: restored {old_thread:?} as current thread");
2239
2240 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2243
2244 self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2245 })
2246 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2247 } else {
2248 let token = StoreToken::new(store.as_context_mut());
2249 Box::new(move |store: &mut dyn VMStore| {
2250 self.add_guest_thread_to_instance_table(
2251 guest_thread.thread,
2252 store,
2253 callee_instance.index,
2254 )?;
2255 let old_thread = store.set_thread(guest_thread)?;
2256 log::trace!(
2257 "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2258 );
2259 let flags = self.id().get(store).instance_flags(callee_instance.index);
2260
2261 if !async_ {
2265 store.enter_instance(callee_instance);
2266 }
2267
2268 let storage = call(store)?;
2275
2276 if async_ {
2277 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2278 if task.threads.len() == 1 && !task.returned_or_cancelled() {
2279 bail!(Trap::NoAsyncResult);
2280 }
2281 } else {
2282 let lift = {
2288 store.exit_instance(callee_instance)?;
2289
2290 let state = store.concurrent_state_mut();
2291 if !state.get_mut(guest_thread.task)?.result.is_none() {
2292 bail_bug!("task has already produced a result");
2293 }
2294
2295 match state.get_mut(guest_thread.task)?.lift_result.take() {
2296 Some(lift) => lift,
2297 None => bail_bug!("lift_result field is missing"),
2298 }
2299 };
2300
2301 let result = (lift.lift)(store, unsafe {
2304 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2305 &storage[..result_count],
2306 )
2307 })?;
2308
2309 let post_return_arg = match result_count {
2310 0 => ValRaw::i32(0),
2311 1 => unsafe { storage[0].assume_init() },
2314 _ => unreachable!(),
2315 };
2316
2317 unsafe {
2318 call_post_return(
2319 token.as_context_mut(store),
2320 post_return.map(|v| v.as_non_null()),
2321 post_return_arg,
2322 flags,
2323 )?;
2324 }
2325
2326 self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2327 }
2328
2329 self.cleanup_thread(store, guest_thread, callee_instance.index)?;
2331
2332 store.set_thread(old_thread)?;
2333
2334 let state = store.concurrent_state_mut();
2335 let task = state.get_mut(guest_thread.task)?;
2336
2337 match &task.caller {
2338 Caller::Host { .. } => {
2339 if task.ready_to_delete() {
2340 Waitable::Guest(guest_thread.task).delete_from(state)?;
2341 }
2342 }
2343 Caller::Guest { .. } => {
2344 task.exited = true;
2345 }
2346 }
2347
2348 Ok(None)
2349 })
2350 };
2351
2352 store
2353 .0
2354 .concurrent_state_mut()
2355 .push_high_priority(WorkItem::GuestCall(
2356 callee_instance.index,
2357 GuestCall {
2358 thread: guest_thread,
2359 kind: GuestCallKind::StartImplicit(fun),
2360 },
2361 ));
2362
2363 Ok(())
2364 }
2365
2366 unsafe fn prepare_call<T: 'static>(
2379 self,
2380 mut store: StoreContextMut<T>,
2381 start: NonNull<VMFuncRef>,
2382 return_: NonNull<VMFuncRef>,
2383 caller_instance: RuntimeComponentInstanceIndex,
2384 callee_instance: RuntimeComponentInstanceIndex,
2385 task_return_type: TypeTupleIndex,
2386 callee_async: bool,
2387 memory: *mut VMMemoryDefinition,
2388 string_encoding: StringEncoding,
2389 caller_info: CallerInfo,
2390 ) -> Result<()> {
2391 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2392 store.0.check_blocking()?;
2396 }
2397
2398 enum ResultInfo {
2399 Heap { results: u32 },
2400 Stack { result_count: u32 },
2401 }
2402
2403 let result_info = match &caller_info {
2404 CallerInfo::Async {
2405 has_result: true,
2406 params,
2407 } => ResultInfo::Heap {
2408 results: match params.last() {
2409 Some(r) => r.get_u32(),
2410 None => bail_bug!("retptr missing"),
2411 },
2412 },
2413 CallerInfo::Async {
2414 has_result: false, ..
2415 } => ResultInfo::Stack { result_count: 0 },
2416 CallerInfo::Sync {
2417 result_count,
2418 params,
2419 } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
2420 results: match params.last() {
2421 Some(r) => r.get_u32(),
2422 None => bail_bug!("arg ptr missing"),
2423 },
2424 },
2425 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2426 result_count: *result_count,
2427 },
2428 };
2429
2430 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2431
2432 let start = SendSyncPtr::new(start);
2436 let return_ = SendSyncPtr::new(return_);
2437 let token = StoreToken::new(store.as_context_mut());
2438 let state = store.0.concurrent_state_mut();
2439 let old_thread = state.current_guest_thread()?;
2440
2441 debug_assert_eq!(
2442 state.get_mut(old_thread.task)?.instance,
2443 RuntimeInstance {
2444 instance: self.id().instance(),
2445 index: caller_instance,
2446 }
2447 );
2448
2449 let new_task = GuestTask::new(
2450 Box::new(move |store, dst| {
2451 let mut store = token.as_context_mut(store);
2452 assert!(dst.len() <= MAX_FLAT_PARAMS);
2453 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2455 let count = match caller_info {
2456 CallerInfo::Async { params, has_result } => {
2460 let params = ¶ms[..params.len() - usize::from(has_result)];
2461 for (param, src) in params.iter().zip(&mut src) {
2462 src.write(*param);
2463 }
2464 params.len()
2465 }
2466
2467 CallerInfo::Sync { params, .. } => {
2469 for (param, src) in params.iter().zip(&mut src) {
2470 src.write(*param);
2471 }
2472 params.len()
2473 }
2474 };
2475 unsafe {
2482 crate::Func::call_unchecked_raw(
2483 &mut store,
2484 start.as_non_null(),
2485 NonNull::new(
2486 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2487 )
2488 .unwrap(),
2489 )?;
2490 }
2491 dst.copy_from_slice(&src[..dst.len()]);
2492 let state = store.0.concurrent_state_mut();
2493 Waitable::Guest(state.current_guest_thread()?.task).set_event(
2494 state,
2495 Some(Event::Subtask {
2496 status: Status::Started,
2497 }),
2498 )?;
2499 Ok(())
2500 }),
2501 LiftResult {
2502 lift: Box::new(move |store, src| {
2503 let mut store = token.as_context_mut(store);
2506 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2508 my_src.push(ValRaw::u32(*results));
2509 }
2510
2511 let prev = store.0.set_thread(old_thread)?;
2517
2518 unsafe {
2525 crate::Func::call_unchecked_raw(
2526 &mut store,
2527 return_.as_non_null(),
2528 my_src.as_mut_slice().into(),
2529 )?;
2530 }
2531
2532 store.0.set_thread(prev)?;
2535
2536 let state = store.0.concurrent_state_mut();
2537 let thread = state.current_guest_thread()?;
2538 if sync_caller {
2539 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2540 if let ResultInfo::Stack { result_count } = &result_info {
2541 match result_count {
2542 0 => None,
2543 1 => Some(my_src[0]),
2544 _ => unreachable!(),
2545 }
2546 } else {
2547 None
2548 },
2549 );
2550 }
2551 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2552 }),
2553 ty: task_return_type,
2554 memory: NonNull::new(memory).map(SendSyncPtr::new),
2555 string_encoding,
2556 },
2557 Caller::Guest { thread: old_thread },
2558 None,
2559 RuntimeInstance {
2560 instance: self.id().instance(),
2561 index: callee_instance,
2562 },
2563 callee_async,
2564 )?;
2565
2566 let guest_task = state.push(new_task)?;
2567 let new_thread = GuestThread::new_implicit(state, guest_task)?;
2568 let guest_thread = state.push(new_thread)?;
2569 state.get_mut(guest_task)?.threads.insert(guest_thread);
2570
2571 store.0.set_thread(QualifiedThreadId {
2574 task: guest_task,
2575 thread: guest_thread,
2576 })?;
2577 log::trace!(
2578 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2579 );
2580
2581 Ok(())
2582 }
2583
2584 unsafe fn call_callback<T>(
2589 self,
2590 mut store: StoreContextMut<T>,
2591 function: SendSyncPtr<VMFuncRef>,
2592 event: Event,
2593 handle: u32,
2594 ) -> Result<u32> {
2595 let (ordinal, result) = event.parts();
2596 let params = &mut [
2597 ValRaw::u32(ordinal),
2598 ValRaw::u32(handle),
2599 ValRaw::u32(result),
2600 ];
2601 unsafe {
2606 crate::Func::call_unchecked_raw(
2607 &mut store,
2608 function.as_non_null(),
2609 params.as_mut_slice().into(),
2610 )?;
2611 }
2612 Ok(params[0].get_u32())
2613 }
2614
2615 unsafe fn start_call<T: 'static>(
2628 self,
2629 mut store: StoreContextMut<T>,
2630 callback: *mut VMFuncRef,
2631 post_return: *mut VMFuncRef,
2632 callee: NonNull<VMFuncRef>,
2633 param_count: u32,
2634 result_count: u32,
2635 flags: u32,
2636 storage: Option<&mut [MaybeUninit<ValRaw>]>,
2637 ) -> Result<u32> {
2638 let token = StoreToken::new(store.as_context_mut());
2639 let async_caller = storage.is_none();
2640 let state = store.0.concurrent_state_mut();
2641 let guest_thread = state.current_guest_thread()?;
2642 let callee_async = state.get_mut(guest_thread.task)?.async_function;
2643 let callee = SendSyncPtr::new(callee);
2644 let param_count = usize::try_from(param_count)?;
2645 assert!(param_count <= MAX_FLAT_PARAMS);
2646 let result_count = usize::try_from(result_count)?;
2647 assert!(result_count <= MAX_FLAT_RESULTS);
2648
2649 let task = state.get_mut(guest_thread.task)?;
2650 if let Some(callback) = NonNull::new(callback) {
2651 let callback = SendSyncPtr::new(callback);
2655 task.callback = Some(Box::new(move |store, event, handle| {
2656 let store = token.as_context_mut(store);
2657 unsafe { self.call_callback::<T>(store, callback, event, handle) }
2658 }));
2659 }
2660
2661 let Caller::Guest { thread: caller } = &task.caller else {
2662 bail_bug!("start_call unexpectedly invoked for host->guest call");
2665 };
2666 let caller = *caller;
2667 let caller_instance = state.get_mut(caller.task)?.instance;
2668
2669 unsafe {
2671 self.queue_call(
2672 store.as_context_mut(),
2673 guest_thread,
2674 callee,
2675 param_count,
2676 result_count,
2677 (flags & START_FLAG_ASYNC_CALLEE) != 0,
2678 NonNull::new(callback).map(SendSyncPtr::new),
2679 NonNull::new(post_return).map(SendSyncPtr::new),
2680 )?;
2681 }
2682
2683 let state = store.0.concurrent_state_mut();
2684
2685 let guest_waitable = Waitable::Guest(guest_thread.task);
2688 let old_set = guest_waitable.common(state)?.set;
2689 let set = state.get_mut(caller.thread)?.sync_call_set;
2690 guest_waitable.join(state, Some(set))?;
2691
2692 let (status, waitable) = loop {
2708 store.0.suspend(SuspendReason::Waiting {
2709 set,
2710 thread: caller,
2711 skip_may_block_check: async_caller || !callee_async,
2719 })?;
2720
2721 let state = store.0.concurrent_state_mut();
2722
2723 log::trace!("taking event for {:?}", guest_thread.task);
2724 let event = guest_waitable.take_event(state)?;
2725 let Some(Event::Subtask { status }) = event else {
2726 bail_bug!("subtasks should only get subtask events, got {event:?}")
2727 };
2728
2729 log::trace!("status {status:?} for {:?}", guest_thread.task);
2730
2731 if status == Status::Returned {
2732 break (status, None);
2734 } else if async_caller {
2735 let handle = store
2739 .0
2740 .instance_state(caller_instance)
2741 .handle_table()
2742 .subtask_insert_guest(guest_thread.task.rep())?;
2743 store
2744 .0
2745 .concurrent_state_mut()
2746 .get_mut(guest_thread.task)?
2747 .common
2748 .handle = Some(handle);
2749 break (status, Some(handle));
2750 } else {
2751 }
2755 };
2756
2757 guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2758
2759 store.0.set_thread(caller)?;
2761 store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2762 log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2763
2764 if let Some(storage) = storage {
2765 let state = store.0.concurrent_state_mut();
2769 let task = state.get_mut(guest_thread.task)?;
2770 if let Some(result) = task.sync_result.take()? {
2771 if let Some(result) = result {
2772 storage[0] = MaybeUninit::new(result);
2773 }
2774
2775 if task.exited && task.ready_to_delete() {
2776 Waitable::Guest(guest_thread.task).delete_from(state)?;
2777 }
2778 }
2779 }
2780
2781 Ok(status.pack(waitable))
2782 }
2783
2784 pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2797 self,
2798 mut store: StoreContextMut<'_, T>,
2799 future: impl Future<Output = Result<R>> + Send + 'static,
2800 lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
2801 ) -> Result<Option<u32>> {
2802 let token = StoreToken::new(store.as_context_mut());
2803 let state = store.0.concurrent_state_mut();
2804 let task = state.current_host_thread()?;
2805
2806 let (join_handle, future) = JoinHandle::run(future);
2809 {
2810 let state = &mut state.get_mut(task)?.state;
2811 assert!(matches!(state, HostTaskState::CalleeStarted));
2812 *state = HostTaskState::CalleeRunning(join_handle);
2813 }
2814
2815 let mut future = Box::pin(future);
2816
2817 let poll = tls::set(store.0, || {
2822 future
2823 .as_mut()
2824 .poll(&mut Context::from_waker(&Waker::noop()))
2825 });
2826
2827 match poll {
2828 Poll::Ready(result) => {
2830 let result = result.transpose()?;
2831 lower(store.as_context_mut(), result)?;
2832 return Ok(None);
2833 }
2834
2835 Poll::Pending => {}
2837 }
2838
2839 let future = Box::pin(async move {
2847 let result = match future.await {
2848 Some(result) => Some(result?),
2849 None => None,
2850 };
2851 let on_complete = move |store: &mut dyn VMStore| {
2852 let mut store = token.as_context_mut(store);
2856 let old = store.0.set_thread(task)?;
2857
2858 let status = if result.is_some() {
2859 Status::Returned
2860 } else {
2861 Status::ReturnCancelled
2862 };
2863
2864 lower(store.as_context_mut(), result)?;
2865 let state = store.0.concurrent_state_mut();
2866 match &mut state.get_mut(task)?.state {
2867 HostTaskState::CalleeDone { .. } => {}
2870
2871 other => *other = HostTaskState::CalleeDone { cancelled: false },
2873 }
2874 Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;
2875
2876 store.0.set_thread(old)?;
2877 Ok(())
2878 };
2879
2880 tls::get(move |store| {
2885 store
2886 .concurrent_state_mut()
2887 .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2888 on_complete,
2889 ))));
2890 Ok(())
2891 })
2892 });
2893
2894 let state = store.0.concurrent_state_mut();
2897 state.push_future(future);
2898 let caller = state.get_mut(task)?.caller;
2899 let instance = state.get_mut(caller.task)?.instance;
2900 let handle = store
2901 .0
2902 .instance_state(instance)
2903 .handle_table()
2904 .subtask_insert_host(task.rep())?;
2905 store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2906 log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");
2907
2908 store.0.set_thread(caller)?;
2912 Ok(Some(handle))
2913 }
2914
2915 pub(crate) fn task_return(
2918 self,
2919 store: &mut dyn VMStore,
2920 ty: TypeTupleIndex,
2921 options: OptionsIndex,
2922 storage: &[ValRaw],
2923 ) -> Result<()> {
2924 let state = store.concurrent_state_mut();
2925 let guest_thread = state.current_guest_thread()?;
2926 let lift = state
2927 .get_mut(guest_thread.task)?
2928 .lift_result
2929 .take()
2930 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
2931 if !state.get_mut(guest_thread.task)?.result.is_none() {
2932 bail_bug!("task result unexpectedly already set");
2933 }
2934
2935 let CanonicalOptions {
2936 string_encoding,
2937 data_model,
2938 ..
2939 } = &self.id().get(store).component().env_component().options[options];
2940
2941 let invalid = ty != lift.ty
2942 || string_encoding != &lift.string_encoding
2943 || match data_model {
2944 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2945 Some(memory) => {
2946 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2947 let actual = self.id().get(store).runtime_memory(memory);
2948 expected != actual.as_ptr()
2949 }
2950 None => false,
2953 },
2954 CanonicalOptionsDataModel::Gc { .. } => true,
2956 };
2957
2958 if invalid {
2959 bail!(Trap::TaskReturnInvalid);
2960 }
2961
2962 log::trace!("task.return for {guest_thread:?}");
2963
2964 let result = (lift.lift)(store, storage)?;
2965 self.task_complete(store, guest_thread.task, result, Status::Returned)
2966 }
2967
2968 pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
2970 let state = store.concurrent_state_mut();
2971 let guest_thread = state.current_guest_thread()?;
2972 let task = state.get_mut(guest_thread.task)?;
2973 if !task.cancel_sent {
2974 bail!(Trap::TaskCancelNotCancelled);
2975 }
2976 _ = task
2977 .lift_result
2978 .take()
2979 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
2980
2981 if !task.result.is_none() {
2982 bail_bug!("task result should not bet set yet");
2983 }
2984
2985 log::trace!("task.cancel for {guest_thread:?}");
2986
2987 self.task_complete(
2988 store,
2989 guest_thread.task,
2990 Box::new(DummyResult),
2991 Status::ReturnCancelled,
2992 )
2993 }
2994
2995 fn task_complete(
3001 self,
3002 store: &mut StoreOpaque,
3003 guest_task: TableId<GuestTask>,
3004 result: Box<dyn Any + Send + Sync>,
3005 status: Status,
3006 ) -> Result<()> {
3007 store
3008 .component_resource_tables(Some(self))
3009 .validate_scope_exit()?;
3010
3011 let state = store.concurrent_state_mut();
3012 let task = state.get_mut(guest_task)?;
3013
3014 if let Caller::Host { tx, .. } = &mut task.caller {
3015 if let Some(tx) = tx.take() {
3016 _ = tx.send(result);
3017 }
3018 } else {
3019 task.result = Some(result);
3020 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3021 }
3022
3023 Ok(())
3024 }
3025
3026 pub(crate) fn waitable_set_new(
3028 self,
3029 store: &mut StoreOpaque,
3030 caller_instance: RuntimeComponentInstanceIndex,
3031 ) -> Result<u32> {
3032 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
3033 let handle = store
3034 .instance_state(RuntimeInstance {
3035 instance: self.id().instance(),
3036 index: caller_instance,
3037 })
3038 .handle_table()
3039 .waitable_set_insert(set.rep())?;
3040 log::trace!("new waitable set {set:?} (handle {handle})");
3041 Ok(handle)
3042 }
3043
3044 pub(crate) fn waitable_set_drop(
3046 self,
3047 store: &mut StoreOpaque,
3048 caller_instance: RuntimeComponentInstanceIndex,
3049 set: u32,
3050 ) -> Result<()> {
3051 let rep = store
3052 .instance_state(RuntimeInstance {
3053 instance: self.id().instance(),
3054 index: caller_instance,
3055 })
3056 .handle_table()
3057 .waitable_set_remove(set)?;
3058
3059 log::trace!("drop waitable set {rep} (handle {set})");
3060
3061 if !store
3065 .concurrent_state_mut()
3066 .get_mut(TableId::<WaitableSet>::new(rep))?
3067 .waiting
3068 .is_empty()
3069 {
3070 bail!(Trap::WaitableSetDropHasWaiters);
3071 }
3072
3073 store
3074 .concurrent_state_mut()
3075 .delete(TableId::<WaitableSet>::new(rep))?;
3076
3077 Ok(())
3078 }
3079
3080 pub(crate) fn waitable_join(
3082 self,
3083 store: &mut StoreOpaque,
3084 caller_instance: RuntimeComponentInstanceIndex,
3085 waitable_handle: u32,
3086 set_handle: u32,
3087 ) -> Result<()> {
3088 let mut instance = self.id().get_mut(store);
3089 let waitable =
3090 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3091
3092 let set = if set_handle == 0 {
3093 None
3094 } else {
3095 let set = instance.instance_states().0[caller_instance]
3096 .handle_table()
3097 .waitable_set_rep(set_handle)?;
3098
3099 Some(TableId::<WaitableSet>::new(set))
3100 };
3101
3102 log::trace!(
3103 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3104 );
3105
3106 waitable.join(store.concurrent_state_mut(), set)
3107 }
3108
3109 pub(crate) fn subtask_drop(
3111 self,
3112 store: &mut StoreOpaque,
3113 caller_instance: RuntimeComponentInstanceIndex,
3114 task_id: u32,
3115 ) -> Result<()> {
3116 self.waitable_join(store, caller_instance, task_id, 0)?;
3117
3118 let (rep, is_host) = store
3119 .instance_state(RuntimeInstance {
3120 instance: self.id().instance(),
3121 index: caller_instance,
3122 })
3123 .handle_table()
3124 .subtask_remove(task_id)?;
3125
3126 let concurrent_state = store.concurrent_state_mut();
3127 let (waitable, expected_caller, delete) = if is_host {
3128 let id = TableId::<HostTask>::new(rep);
3129 let task = concurrent_state.get_mut(id)?;
3130 match &task.state {
3131 HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
3132 HostTaskState::CalleeDone { .. } => {}
3133 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3134 bail_bug!("invalid state for callee in `subtask.drop`")
3135 }
3136 }
3137 (Waitable::Host(id), task.caller, true)
3138 } else {
3139 let id = TableId::<GuestTask>::new(rep);
3140 let task = concurrent_state.get_mut(id)?;
3141 if task.lift_result.is_some() {
3142 bail!(Trap::SubtaskDropNotResolved);
3143 }
3144 if let Caller::Guest { thread } = task.caller {
3145 (
3146 Waitable::Guest(id),
3147 thread,
3148 concurrent_state.get_mut(id)?.ready_to_delete(),
3149 )
3150 } else {
3151 bail_bug!("expected guest caller for `subtask.drop`")
3152 }
3153 };
3154
3155 waitable.common(concurrent_state)?.handle = None;
3156
3157 if waitable.take_event(concurrent_state)?.is_some() {
3160 bail!(Trap::SubtaskDropNotResolved);
3161 }
3162
3163 if delete {
3164 waitable.delete_from(concurrent_state)?;
3165 }
3166
3167 debug_assert_eq!(expected_caller, concurrent_state.current_guest_thread()?);
3171 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3172 Ok(())
3173 }
3174
3175 pub(crate) fn waitable_set_wait(
3177 self,
3178 store: &mut StoreOpaque,
3179 options: OptionsIndex,
3180 set: u32,
3181 payload: u32,
3182 ) -> Result<u32> {
3183 if !self.options(store, options).async_ {
3184 store.check_blocking()?;
3188 }
3189
3190 let &CanonicalOptions {
3191 cancellable,
3192 instance: caller_instance,
3193 ..
3194 } = &self.id().get(store).component().env_component().options[options];
3195 let rep = store
3196 .instance_state(RuntimeInstance {
3197 instance: self.id().instance(),
3198 index: caller_instance,
3199 })
3200 .handle_table()
3201 .waitable_set_rep(set)?;
3202
3203 self.waitable_check(
3204 store,
3205 cancellable,
3206 WaitableCheck::Wait,
3207 WaitableCheckParams {
3208 set: TableId::new(rep),
3209 options,
3210 payload,
3211 },
3212 )
3213 }
3214
3215 pub(crate) fn waitable_set_poll(
3217 self,
3218 store: &mut StoreOpaque,
3219 options: OptionsIndex,
3220 set: u32,
3221 payload: u32,
3222 ) -> Result<u32> {
3223 let &CanonicalOptions {
3224 cancellable,
3225 instance: caller_instance,
3226 ..
3227 } = &self.id().get(store).component().env_component().options[options];
3228 let rep = store
3229 .instance_state(RuntimeInstance {
3230 instance: self.id().instance(),
3231 index: caller_instance,
3232 })
3233 .handle_table()
3234 .waitable_set_rep(set)?;
3235
3236 self.waitable_check(
3237 store,
3238 cancellable,
3239 WaitableCheck::Poll,
3240 WaitableCheckParams {
3241 set: TableId::new(rep),
3242 options,
3243 payload,
3244 },
3245 )
3246 }
3247
3248 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3250 let thread_id = store.concurrent_state_mut().current_guest_thread()?.thread;
3251 match store
3252 .concurrent_state_mut()
3253 .get_mut(thread_id)?
3254 .instance_rep
3255 {
3256 Some(r) => Ok(r),
3257 None => bail_bug!("thread should have instance_rep by now"),
3258 }
3259 }
3260
3261 pub(crate) fn thread_new_indirect<T: 'static>(
3263 self,
3264 mut store: StoreContextMut<T>,
3265 runtime_instance: RuntimeComponentInstanceIndex,
3266 _func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex,
3268 start_func_idx: u32,
3269 context: i32,
3270 ) -> Result<u32> {
3271 log::trace!("creating new thread");
3272
3273 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3274 let (instance, registry) = self.id().get_mut_and_registry(store.0);
3275 let callee = instance
3276 .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3277 .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
3278 if callee.type_index(store.0) != start_func_ty.type_index() {
3279 bail!(Trap::ThreadNewIndirectInvalidType);
3280 }
3281
3282 let token = StoreToken::new(store.as_context_mut());
3283 let start_func = Box::new(
3284 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3285 let old_thread = store.set_thread(guest_thread)?;
3286 log::trace!(
3287 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3288 );
3289
3290 let mut store = token.as_context_mut(store);
3291 let mut params = [ValRaw::i32(context)];
3292 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3295
3296 self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3297 log::trace!("explicit thread {guest_thread:?} completed");
3298 let state = store.0.concurrent_state_mut();
3299 let task = state.get_mut(guest_thread.task)?;
3300 if task.threads.is_empty() && !task.returned_or_cancelled() {
3301 bail!(Trap::NoAsyncResult);
3302 }
3303 store.0.set_thread(old_thread)?;
3304 let state = store.0.concurrent_state_mut();
3305 if let Some(t) = old_thread.guest() {
3306 state.get_mut(t.thread)?.state = GuestThreadState::Running;
3307 }
3308 if state.get_mut(guest_thread.task)?.ready_to_delete() {
3309 Waitable::Guest(guest_thread.task).delete_from(state)?;
3310 }
3311 log::trace!("thread start: restored {old_thread:?} as current thread");
3312
3313 Ok(())
3314 },
3315 );
3316
3317 let state = store.0.concurrent_state_mut();
3318 let current_thread = state.current_guest_thread()?;
3319 let parent_task = current_thread.task;
3320
3321 let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
3322 let thread_id = state.push(new_thread)?;
3323 state.get_mut(parent_task)?.threads.insert(thread_id);
3324
3325 log::trace!("new thread with id {thread_id:?} created");
3326
3327 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3328 }
3329
3330 pub(crate) fn resume_thread(
3331 self,
3332 store: &mut StoreOpaque,
3333 runtime_instance: RuntimeComponentInstanceIndex,
3334 thread_idx: u32,
3335 high_priority: bool,
3336 allow_ready: bool,
3337 ) -> Result<()> {
3338 let thread_id =
3339 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3340 let state = store.concurrent_state_mut();
3341 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3342 let thread = state.get_mut(guest_thread.thread)?;
3343
3344 match mem::replace(&mut thread.state, GuestThreadState::Running) {
3345 GuestThreadState::NotStartedExplicit(start_func) => {
3346 log::trace!("starting thread {guest_thread:?}");
3347 let guest_call = WorkItem::GuestCall(
3348 runtime_instance,
3349 GuestCall {
3350 thread: guest_thread,
3351 kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3352 start_func(store, guest_thread)
3353 })),
3354 },
3355 );
3356 store
3357 .concurrent_state_mut()
3358 .push_work_item(guest_call, high_priority);
3359 }
3360 GuestThreadState::Suspended(fiber) => {
3361 log::trace!("resuming thread {thread_id:?} that was suspended");
3362 store
3363 .concurrent_state_mut()
3364 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3365 }
3366 GuestThreadState::Ready(fiber) if allow_ready => {
3367 log::trace!("resuming thread {thread_id:?} that was ready");
3368 thread.state = GuestThreadState::Ready(fiber);
3369 store
3370 .concurrent_state_mut()
3371 .promote_thread_work_item(guest_thread);
3372 }
3373 other => {
3374 thread.state = other;
3375 bail!(Trap::CannotResumeThread);
3376 }
3377 }
3378 Ok(())
3379 }
3380
3381 fn add_guest_thread_to_instance_table(
3382 self,
3383 thread_id: TableId<GuestThread>,
3384 store: &mut StoreOpaque,
3385 runtime_instance: RuntimeComponentInstanceIndex,
3386 ) -> Result<u32> {
3387 let guest_id = store
3388 .instance_state(RuntimeInstance {
3389 instance: self.id().instance(),
3390 index: runtime_instance,
3391 })
3392 .thread_handle_table()
3393 .guest_thread_insert(thread_id.rep())?;
3394 store
3395 .concurrent_state_mut()
3396 .get_mut(thread_id)?
3397 .instance_rep = Some(guest_id);
3398 Ok(guest_id)
3399 }
3400
3401 pub(crate) fn suspension_intrinsic(
3404 self,
3405 store: &mut StoreOpaque,
3406 caller: RuntimeComponentInstanceIndex,
3407 cancellable: bool,
3408 yielding: bool,
3409 to_thread: SuspensionTarget,
3410 ) -> Result<WaitResult> {
3411 let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3412 if to_thread.is_none() {
3413 let state = store.concurrent_state_mut();
3414 if yielding {
3415 if !state.may_block(guest_thread.task)? {
3417 if !state.promote_instance_local_thread_work_item(caller) {
3420 return Ok(WaitResult::Completed);
3422 }
3423 }
3424 } else {
3425 store.check_blocking()?;
3429 }
3430 }
3431
3432 if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3434 return Ok(WaitResult::Cancelled);
3435 }
3436
3437 match to_thread {
3438 SuspensionTarget::SomeSuspended(thread) => {
3439 self.resume_thread(store, caller, thread, true, false)?
3440 }
3441 SuspensionTarget::Some(thread) => {
3442 self.resume_thread(store, caller, thread, true, true)?
3443 }
3444 SuspensionTarget::None => { }
3445 }
3446
3447 let reason = if yielding {
3448 SuspendReason::Yielding {
3449 thread: guest_thread,
3450 skip_may_block_check: to_thread.is_some(),
3454 }
3455 } else {
3456 SuspendReason::ExplicitlySuspending {
3457 thread: guest_thread,
3458 skip_may_block_check: to_thread.is_some(),
3462 }
3463 };
3464
3465 store.suspend(reason)?;
3466
3467 if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3468 Ok(WaitResult::Cancelled)
3469 } else {
3470 Ok(WaitResult::Completed)
3471 }
3472 }
3473
3474 fn waitable_check(
3476 self,
3477 store: &mut StoreOpaque,
3478 cancellable: bool,
3479 check: WaitableCheck,
3480 params: WaitableCheckParams,
3481 ) -> Result<u32> {
3482 let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3483
3484 log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3485
3486 let state = store.concurrent_state_mut();
3487 let task = state.get_mut(guest_thread.task)?;
3488
3489 match &check {
3492 WaitableCheck::Wait => {
3493 let set = params.set;
3494
3495 if (task.event.is_none()
3496 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3497 && state.get_mut(set)?.ready.is_empty()
3498 {
3499 if cancellable {
3500 let old = state
3501 .get_mut(guest_thread.thread)?
3502 .wake_on_cancel
3503 .replace(set);
3504 if !old.is_none() {
3505 bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
3506 }
3507 }
3508
3509 store.suspend(SuspendReason::Waiting {
3510 set,
3511 thread: guest_thread,
3512 skip_may_block_check: false,
3513 })?;
3514 }
3515 }
3516 WaitableCheck::Poll => {}
3517 }
3518
3519 log::trace!(
3520 "waitable check for {guest_thread:?}; set {:?}, part two",
3521 params.set
3522 );
3523
3524 let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3526
3527 let (ordinal, handle, result) = match &check {
3528 WaitableCheck::Wait => {
3529 let (event, waitable) = match event {
3530 Some(p) => p,
3531 None => bail_bug!("event expected to be present"),
3532 };
3533 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3534 let (ordinal, result) = event.parts();
3535 (ordinal, handle, result)
3536 }
3537 WaitableCheck::Poll => {
3538 if let Some((event, waitable)) = event {
3539 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3540 let (ordinal, result) = event.parts();
3541 (ordinal, handle, result)
3542 } else {
3543 log::trace!(
3544 "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3545 guest_thread.task,
3546 params.set
3547 );
3548 let (ordinal, result) = Event::None.parts();
3549 (ordinal, 0, result)
3550 }
3551 }
3552 };
3553 let memory = self.options_memory_mut(store, params.options);
3554 let ptr = func::validate_inbounds_dynamic(
3555 &CanonicalAbiInfo::POINTER_PAIR,
3556 memory,
3557 &ValRaw::u32(params.payload),
3558 )?;
3559 memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3560 memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3561 Ok(ordinal)
3562 }
3563
3564 pub(crate) fn subtask_cancel(
3566 self,
3567 store: &mut StoreOpaque,
3568 caller_instance: RuntimeComponentInstanceIndex,
3569 async_: bool,
3570 task_id: u32,
3571 ) -> Result<u32> {
3572 if !async_ {
3573 store.check_blocking()?;
3577 }
3578
3579 let (rep, is_host) = store
3580 .instance_state(RuntimeInstance {
3581 instance: self.id().instance(),
3582 index: caller_instance,
3583 })
3584 .handle_table()
3585 .subtask_rep(task_id)?;
3586 let (waitable, expected_caller) = if is_host {
3587 let id = TableId::<HostTask>::new(rep);
3588 (
3589 Waitable::Host(id),
3590 store.concurrent_state_mut().get_mut(id)?.caller,
3591 )
3592 } else {
3593 let id = TableId::<GuestTask>::new(rep);
3594 if let Caller::Guest { thread } = store.concurrent_state_mut().get_mut(id)?.caller {
3595 (Waitable::Guest(id), thread)
3596 } else {
3597 bail_bug!("expected guest caller for `subtask.cancel`")
3598 }
3599 };
3600 let concurrent_state = store.concurrent_state_mut();
3604 debug_assert_eq!(expected_caller, concurrent_state.current_guest_thread()?);
3605
3606 log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3607
3608 let needs_block;
3609 if let Waitable::Host(host_task) = waitable {
3610 let state = &mut concurrent_state.get_mut(host_task)?.state;
3611 match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
3612 HostTaskState::CalleeRunning(handle) => {
3619 handle.abort();
3620 needs_block = true;
3621 }
3622
3623 HostTaskState::CalleeDone { cancelled } => {
3626 if cancelled {
3627 bail!(Trap::SubtaskCancelAfterTerminal);
3628 } else {
3629 needs_block = false;
3632 }
3633 }
3634
3635 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3638 bail_bug!("invalid states for host callee")
3639 }
3640 }
3641 } else {
3642 let caller = concurrent_state.current_guest_thread()?;
3643 let guest_task = TableId::<GuestTask>::new(rep);
3644 let task = concurrent_state.get_mut(guest_task)?;
3645 if !task.already_lowered_parameters() {
3646 task.lower_params = None;
3650 task.lift_result = None;
3651 task.exited = true;
3652
3653 let instance = task.instance;
3654
3655 assert_eq!(1, task.threads.len());
3656 let thread = mem::take(&mut task.threads).into_iter().next().unwrap();
3657 let concurrent_state = store.concurrent_state_mut();
3658 concurrent_state.delete(thread)?;
3659 assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());
3660
3661 let pending = &mut store.instance_state(instance).concurrent_state().pending;
3663 let pending_count = pending.len();
3664 pending.retain(|thread, _| thread.task != guest_task);
3665 if pending.len() == pending_count {
3667 bail!(Trap::SubtaskCancelAfterTerminal);
3668 }
3669 return Ok(Status::StartCancelled as u32);
3670 } else if !task.returned_or_cancelled() {
3671 task.cancel_sent = true;
3674 task.event = Some(Event::Cancelled);
3679 let runtime_instance = task.instance.index;
3680 for thread in task.threads.clone() {
3681 let thread = QualifiedThreadId {
3682 task: guest_task,
3683 thread,
3684 };
3685 if let Some(set) = concurrent_state
3686 .get_mut(thread.thread)?
3687 .wake_on_cancel
3688 .take()
3689 {
3690 let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
3691 Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
3692 Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
3693 runtime_instance,
3694 GuestCall {
3695 thread,
3696 kind: GuestCallKind::DeliverEvent {
3697 instance,
3698 set: None,
3699 },
3700 },
3701 ),
3702 None => bail_bug!("thread not present in wake_on_cancel set"),
3703 };
3704 concurrent_state.push_high_priority(item);
3705
3706 store.suspend(SuspendReason::Yielding {
3707 thread: caller,
3708 skip_may_block_check: false,
3711 })?;
3712 break;
3713 }
3714 }
3715
3716 needs_block = !store
3719 .concurrent_state_mut()
3720 .get_mut(guest_task)?
3721 .returned_or_cancelled()
3722 } else {
3723 needs_block = false;
3724 }
3725 };
3726
3727 if needs_block {
3731 if async_ {
3732 return Ok(BLOCKED);
3733 }
3734
3735 store.wait_for_event(waitable)?;
3739
3740 }
3742
3743 let event = waitable.take_event(store.concurrent_state_mut())?;
3744 if let Some(Event::Subtask {
3745 status: status @ (Status::Returned | Status::ReturnCancelled),
3746 }) = event
3747 {
3748 Ok(status as u32)
3749 } else {
3750 bail!(Trap::SubtaskCancelAfterTerminal);
3751 }
3752 }
3753
3754 pub(crate) fn context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32> {
3755 store.concurrent_state_mut().context_get(slot)
3756 }
3757
3758 pub(crate) fn context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()> {
3759 store.concurrent_state_mut().context_set(slot, value)
3760 }
3761}
3762
3763pub trait VMComponentAsyncStore {
3771 unsafe fn prepare_call(
3777 &mut self,
3778 instance: Instance,
3779 memory: *mut VMMemoryDefinition,
3780 start: NonNull<VMFuncRef>,
3781 return_: NonNull<VMFuncRef>,
3782 caller_instance: RuntimeComponentInstanceIndex,
3783 callee_instance: RuntimeComponentInstanceIndex,
3784 task_return_type: TypeTupleIndex,
3785 callee_async: bool,
3786 string_encoding: StringEncoding,
3787 result_count: u32,
3788 storage: *mut ValRaw,
3789 storage_len: usize,
3790 ) -> Result<()>;
3791
3792 unsafe fn sync_start(
3795 &mut self,
3796 instance: Instance,
3797 callback: *mut VMFuncRef,
3798 callee: NonNull<VMFuncRef>,
3799 param_count: u32,
3800 storage: *mut MaybeUninit<ValRaw>,
3801 storage_len: usize,
3802 ) -> Result<()>;
3803
3804 unsafe fn async_start(
3807 &mut self,
3808 instance: Instance,
3809 callback: *mut VMFuncRef,
3810 post_return: *mut VMFuncRef,
3811 callee: NonNull<VMFuncRef>,
3812 param_count: u32,
3813 result_count: u32,
3814 flags: u32,
3815 ) -> Result<u32>;
3816
3817 fn future_write(
3819 &mut self,
3820 instance: Instance,
3821 caller: RuntimeComponentInstanceIndex,
3822 ty: TypeFutureTableIndex,
3823 options: OptionsIndex,
3824 future: u32,
3825 address: u32,
3826 ) -> Result<u32>;
3827
3828 fn future_read(
3830 &mut self,
3831 instance: Instance,
3832 caller: RuntimeComponentInstanceIndex,
3833 ty: TypeFutureTableIndex,
3834 options: OptionsIndex,
3835 future: u32,
3836 address: u32,
3837 ) -> Result<u32>;
3838
3839 fn future_drop_writable(
3841 &mut self,
3842 instance: Instance,
3843 ty: TypeFutureTableIndex,
3844 writer: u32,
3845 ) -> Result<()>;
3846
3847 fn stream_write(
3849 &mut self,
3850 instance: Instance,
3851 caller: RuntimeComponentInstanceIndex,
3852 ty: TypeStreamTableIndex,
3853 options: OptionsIndex,
3854 stream: u32,
3855 address: u32,
3856 count: u32,
3857 ) -> Result<u32>;
3858
3859 fn stream_read(
3861 &mut self,
3862 instance: Instance,
3863 caller: RuntimeComponentInstanceIndex,
3864 ty: TypeStreamTableIndex,
3865 options: OptionsIndex,
3866 stream: u32,
3867 address: u32,
3868 count: u32,
3869 ) -> Result<u32>;
3870
3871 fn flat_stream_write(
3874 &mut self,
3875 instance: Instance,
3876 caller: RuntimeComponentInstanceIndex,
3877 ty: TypeStreamTableIndex,
3878 options: OptionsIndex,
3879 payload_size: u32,
3880 payload_align: u32,
3881 stream: u32,
3882 address: u32,
3883 count: u32,
3884 ) -> Result<u32>;
3885
3886 fn flat_stream_read(
3889 &mut self,
3890 instance: Instance,
3891 caller: RuntimeComponentInstanceIndex,
3892 ty: TypeStreamTableIndex,
3893 options: OptionsIndex,
3894 payload_size: u32,
3895 payload_align: u32,
3896 stream: u32,
3897 address: u32,
3898 count: u32,
3899 ) -> Result<u32>;
3900
3901 fn stream_drop_writable(
3903 &mut self,
3904 instance: Instance,
3905 ty: TypeStreamTableIndex,
3906 writer: u32,
3907 ) -> Result<()>;
3908
3909 fn error_context_debug_message(
3911 &mut self,
3912 instance: Instance,
3913 ty: TypeComponentLocalErrorContextTableIndex,
3914 options: OptionsIndex,
3915 err_ctx_handle: u32,
3916 debug_msg_address: u32,
3917 ) -> Result<()>;
3918
3919 fn thread_new_indirect(
3921 &mut self,
3922 instance: Instance,
3923 caller: RuntimeComponentInstanceIndex,
3924 func_ty_idx: TypeFuncIndex,
3925 start_func_table_idx: RuntimeTableIndex,
3926 start_func_idx: u32,
3927 context: i32,
3928 ) -> Result<u32>;
3929}
3930
3931impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3933 unsafe fn prepare_call(
3934 &mut self,
3935 instance: Instance,
3936 memory: *mut VMMemoryDefinition,
3937 start: NonNull<VMFuncRef>,
3938 return_: NonNull<VMFuncRef>,
3939 caller_instance: RuntimeComponentInstanceIndex,
3940 callee_instance: RuntimeComponentInstanceIndex,
3941 task_return_type: TypeTupleIndex,
3942 callee_async: bool,
3943 string_encoding: StringEncoding,
3944 result_count_or_max_if_async: u32,
3945 storage: *mut ValRaw,
3946 storage_len: usize,
3947 ) -> Result<()> {
3948 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3952
3953 unsafe {
3954 instance.prepare_call(
3955 StoreContextMut(self),
3956 start,
3957 return_,
3958 caller_instance,
3959 callee_instance,
3960 task_return_type,
3961 callee_async,
3962 memory,
3963 string_encoding,
3964 match result_count_or_max_if_async {
3965 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3966 params,
3967 has_result: false,
3968 },
3969 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3970 params,
3971 has_result: true,
3972 },
3973 result_count => CallerInfo::Sync {
3974 params,
3975 result_count,
3976 },
3977 },
3978 )
3979 }
3980 }
3981
3982 unsafe fn sync_start(
3983 &mut self,
3984 instance: Instance,
3985 callback: *mut VMFuncRef,
3986 callee: NonNull<VMFuncRef>,
3987 param_count: u32,
3988 storage: *mut MaybeUninit<ValRaw>,
3989 storage_len: usize,
3990 ) -> Result<()> {
3991 unsafe {
3992 instance
3993 .start_call(
3994 StoreContextMut(self),
3995 callback,
3996 ptr::null_mut(),
3997 callee,
3998 param_count,
3999 1,
4000 START_FLAG_ASYNC_CALLEE,
4001 Some(std::slice::from_raw_parts_mut(storage, storage_len)),
4005 )
4006 .map(drop)
4007 }
4008 }
4009
4010 unsafe fn async_start(
4011 &mut self,
4012 instance: Instance,
4013 callback: *mut VMFuncRef,
4014 post_return: *mut VMFuncRef,
4015 callee: NonNull<VMFuncRef>,
4016 param_count: u32,
4017 result_count: u32,
4018 flags: u32,
4019 ) -> Result<u32> {
4020 unsafe {
4021 instance.start_call(
4022 StoreContextMut(self),
4023 callback,
4024 post_return,
4025 callee,
4026 param_count,
4027 result_count,
4028 flags,
4029 None,
4030 )
4031 }
4032 }
4033
4034 fn future_write(
4035 &mut self,
4036 instance: Instance,
4037 caller: RuntimeComponentInstanceIndex,
4038 ty: TypeFutureTableIndex,
4039 options: OptionsIndex,
4040 future: u32,
4041 address: u32,
4042 ) -> Result<u32> {
4043 instance
4044 .guest_write(
4045 StoreContextMut(self),
4046 caller,
4047 TransmitIndex::Future(ty),
4048 options,
4049 None,
4050 future,
4051 address,
4052 1,
4053 )
4054 .map(|result| result.encode())
4055 }
4056
4057 fn future_read(
4058 &mut self,
4059 instance: Instance,
4060 caller: RuntimeComponentInstanceIndex,
4061 ty: TypeFutureTableIndex,
4062 options: OptionsIndex,
4063 future: u32,
4064 address: u32,
4065 ) -> Result<u32> {
4066 instance
4067 .guest_read(
4068 StoreContextMut(self),
4069 caller,
4070 TransmitIndex::Future(ty),
4071 options,
4072 None,
4073 future,
4074 address,
4075 1,
4076 )
4077 .map(|result| result.encode())
4078 }
4079
4080 fn stream_write(
4081 &mut self,
4082 instance: Instance,
4083 caller: RuntimeComponentInstanceIndex,
4084 ty: TypeStreamTableIndex,
4085 options: OptionsIndex,
4086 stream: u32,
4087 address: u32,
4088 count: u32,
4089 ) -> Result<u32> {
4090 instance
4091 .guest_write(
4092 StoreContextMut(self),
4093 caller,
4094 TransmitIndex::Stream(ty),
4095 options,
4096 None,
4097 stream,
4098 address,
4099 count,
4100 )
4101 .map(|result| result.encode())
4102 }
4103
4104 fn stream_read(
4105 &mut self,
4106 instance: Instance,
4107 caller: RuntimeComponentInstanceIndex,
4108 ty: TypeStreamTableIndex,
4109 options: OptionsIndex,
4110 stream: u32,
4111 address: u32,
4112 count: u32,
4113 ) -> Result<u32> {
4114 instance
4115 .guest_read(
4116 StoreContextMut(self),
4117 caller,
4118 TransmitIndex::Stream(ty),
4119 options,
4120 None,
4121 stream,
4122 address,
4123 count,
4124 )
4125 .map(|result| result.encode())
4126 }
4127
4128 fn future_drop_writable(
4129 &mut self,
4130 instance: Instance,
4131 ty: TypeFutureTableIndex,
4132 writer: u32,
4133 ) -> Result<()> {
4134 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4135 }
4136
4137 fn flat_stream_write(
4138 &mut self,
4139 instance: Instance,
4140 caller: RuntimeComponentInstanceIndex,
4141 ty: TypeStreamTableIndex,
4142 options: OptionsIndex,
4143 payload_size: u32,
4144 payload_align: u32,
4145 stream: u32,
4146 address: u32,
4147 count: u32,
4148 ) -> Result<u32> {
4149 instance
4150 .guest_write(
4151 StoreContextMut(self),
4152 caller,
4153 TransmitIndex::Stream(ty),
4154 options,
4155 Some(FlatAbi {
4156 size: payload_size,
4157 align: payload_align,
4158 }),
4159 stream,
4160 address,
4161 count,
4162 )
4163 .map(|result| result.encode())
4164 }
4165
4166 fn flat_stream_read(
4167 &mut self,
4168 instance: Instance,
4169 caller: RuntimeComponentInstanceIndex,
4170 ty: TypeStreamTableIndex,
4171 options: OptionsIndex,
4172 payload_size: u32,
4173 payload_align: u32,
4174 stream: u32,
4175 address: u32,
4176 count: u32,
4177 ) -> Result<u32> {
4178 instance
4179 .guest_read(
4180 StoreContextMut(self),
4181 caller,
4182 TransmitIndex::Stream(ty),
4183 options,
4184 Some(FlatAbi {
4185 size: payload_size,
4186 align: payload_align,
4187 }),
4188 stream,
4189 address,
4190 count,
4191 )
4192 .map(|result| result.encode())
4193 }
4194
4195 fn stream_drop_writable(
4196 &mut self,
4197 instance: Instance,
4198 ty: TypeStreamTableIndex,
4199 writer: u32,
4200 ) -> Result<()> {
4201 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4202 }
4203
4204 fn error_context_debug_message(
4205 &mut self,
4206 instance: Instance,
4207 ty: TypeComponentLocalErrorContextTableIndex,
4208 options: OptionsIndex,
4209 err_ctx_handle: u32,
4210 debug_msg_address: u32,
4211 ) -> Result<()> {
4212 instance.error_context_debug_message(
4213 StoreContextMut(self),
4214 ty,
4215 options,
4216 err_ctx_handle,
4217 debug_msg_address,
4218 )
4219 }
4220
4221 fn thread_new_indirect(
4222 &mut self,
4223 instance: Instance,
4224 caller: RuntimeComponentInstanceIndex,
4225 func_ty_idx: TypeFuncIndex,
4226 start_func_table_idx: RuntimeTableIndex,
4227 start_func_idx: u32,
4228 context: i32,
4229 ) -> Result<u32> {
4230 instance.thread_new_indirect(
4231 StoreContextMut(self),
4232 caller,
4233 func_ty_idx,
4234 start_func_table_idx,
4235 start_func_idx,
4236 context,
4237 )
4238 }
4239}
4240
4241type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4242
4243pub(crate) struct HostTask {
4247 common: WaitableCommon,
4248
4249 caller: QualifiedThreadId,
4251
4252 call_context: CallContext,
4255
4256 state: HostTaskState,
4257}
4258
4259enum HostTaskState {
4260 CalleeStarted,
4265
4266 CalleeRunning(JoinHandle),
4271
4272 CalleeFinished(LiftedResult),
4276
4277 CalleeDone { cancelled: bool },
4280}
4281
4282impl HostTask {
4283 fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4284 Self {
4285 common: WaitableCommon::default(),
4286 call_context: CallContext::default(),
4287 caller,
4288 state,
4289 }
4290 }
4291}
4292
4293impl TableDebug for HostTask {
4294 fn type_name() -> &'static str {
4295 "HostTask"
4296 }
4297}
4298
4299type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4300
4301enum Caller {
4303 Host {
4305 tx: Option<oneshot::Sender<LiftedResult>>,
4307 host_future_present: bool,
4310 caller: CurrentThread,
4314 },
4315 Guest {
4317 thread: QualifiedThreadId,
4319 },
4320}
4321
4322struct LiftResult {
4325 lift: RawLift,
4326 ty: TypeTupleIndex,
4327 memory: Option<SendSyncPtr<VMMemoryDefinition>>,
4328 string_encoding: StringEncoding,
4329}
4330
4331#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4336pub(crate) struct QualifiedThreadId {
4337 task: TableId<GuestTask>,
4338 thread: TableId<GuestThread>,
4339}
4340
4341impl QualifiedThreadId {
4342 fn qualify(
4343 state: &mut ConcurrentState,
4344 thread: TableId<GuestThread>,
4345 ) -> Result<QualifiedThreadId> {
4346 Ok(QualifiedThreadId {
4347 task: state.get_mut(thread)?.parent_task,
4348 thread,
4349 })
4350 }
4351}
4352
4353impl fmt::Debug for QualifiedThreadId {
4354 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4355 f.debug_tuple("QualifiedThreadId")
4356 .field(&self.task.rep())
4357 .field(&self.thread.rep())
4358 .finish()
4359 }
4360}
4361
4362enum GuestThreadState {
4363 NotStartedImplicit,
4364 NotStartedExplicit(
4365 Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4366 ),
4367 Running,
4368 Suspended(StoreFiber<'static>),
4369 Ready(StoreFiber<'static>),
4370 Completed,
4371}
4372pub struct GuestThread {
4373 context: [u32; 2],
4376 parent_task: TableId<GuestTask>,
4378 wake_on_cancel: Option<TableId<WaitableSet>>,
4381 state: GuestThreadState,
4383 instance_rep: Option<u32>,
4386 sync_call_set: TableId<WaitableSet>,
4388}
4389
4390impl GuestThread {
4391 fn from_instance(
4394 state: Pin<&mut ComponentInstance>,
4395 caller_instance: RuntimeComponentInstanceIndex,
4396 guest_thread: u32,
4397 ) -> Result<TableId<Self>> {
4398 let rep = state.instance_states().0[caller_instance]
4399 .thread_handle_table()
4400 .guest_thread_rep(guest_thread)?;
4401 Ok(TableId::new(rep))
4402 }
4403
4404 fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4405 let sync_call_set = state.push(WaitableSet::default())?;
4406 Ok(Self {
4407 context: [0; 2],
4408 parent_task,
4409 wake_on_cancel: None,
4410 state: GuestThreadState::NotStartedImplicit,
4411 instance_rep: None,
4412 sync_call_set,
4413 })
4414 }
4415
4416 fn new_explicit(
4417 state: &mut ConcurrentState,
4418 parent_task: TableId<GuestTask>,
4419 start_func: Box<
4420 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4421 >,
4422 ) -> Result<Self> {
4423 let sync_call_set = state.push(WaitableSet::default())?;
4424 Ok(Self {
4425 context: [0; 2],
4426 parent_task,
4427 wake_on_cancel: None,
4428 state: GuestThreadState::NotStartedExplicit(start_func),
4429 instance_rep: None,
4430 sync_call_set,
4431 })
4432 }
4433}
4434
4435impl TableDebug for GuestThread {
4436 fn type_name() -> &'static str {
4437 "GuestThread"
4438 }
4439}
4440
4441enum SyncResult {
4442 NotProduced,
4443 Produced(Option<ValRaw>),
4444 Taken,
4445}
4446
4447impl SyncResult {
4448 fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4449 Ok(match mem::replace(self, SyncResult::Taken) {
4450 SyncResult::NotProduced => None,
4451 SyncResult::Produced(val) => Some(val),
4452 SyncResult::Taken => {
4453 bail_bug!("attempted to take a synchronous result that was already taken")
4454 }
4455 })
4456 }
4457}
4458
4459#[derive(Debug)]
4460enum HostFutureState {
4461 NotApplicable,
4462 Live,
4463 Dropped,
4464}
4465
4466pub(crate) struct GuestTask {
4468 common: WaitableCommon,
4470 lower_params: Option<RawLower>,
4472 lift_result: Option<LiftResult>,
4474 result: Option<LiftedResult>,
4477 callback: Option<CallbackFn>,
4480 caller: Caller,
4482 call_context: CallContext,
4487 sync_result: SyncResult,
4490 cancel_sent: bool,
4493 starting_sent: bool,
4496 instance: RuntimeInstance,
4503 event: Option<Event>,
4506 exited: bool,
4508 threads: HashSet<TableId<GuestThread>>,
4510 host_future_state: HostFutureState,
4513 async_function: bool,
4516}
4517
4518impl GuestTask {
4519 fn already_lowered_parameters(&self) -> bool {
4520 self.lower_params.is_none()
4522 }
4523
4524 fn returned_or_cancelled(&self) -> bool {
4525 self.lift_result.is_none()
4527 }
4528
4529 fn ready_to_delete(&self) -> bool {
4530 let threads_completed = self.threads.is_empty();
4531 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4532 let pending_completion_event = matches!(
4533 self.common.event,
4534 Some(Event::Subtask {
4535 status: Status::Returned | Status::ReturnCancelled
4536 })
4537 );
4538 let ready = threads_completed
4539 && !has_sync_result
4540 && !pending_completion_event
4541 && !matches!(self.host_future_state, HostFutureState::Live);
4542 log::trace!(
4543 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4544 threads_completed,
4545 has_sync_result,
4546 pending_completion_event,
4547 self.host_future_state
4548 );
4549 ready
4550 }
4551
4552 fn new(
4553 lower_params: RawLower,
4554 lift_result: LiftResult,
4555 caller: Caller,
4556 callback: Option<CallbackFn>,
4557 instance: RuntimeInstance,
4558 async_function: bool,
4559 ) -> Result<Self> {
4560 let host_future_state = match &caller {
4561 Caller::Guest { .. } => HostFutureState::NotApplicable,
4562 Caller::Host {
4563 host_future_present,
4564 ..
4565 } => {
4566 if *host_future_present {
4567 HostFutureState::Live
4568 } else {
4569 HostFutureState::NotApplicable
4570 }
4571 }
4572 };
4573 Ok(Self {
4574 common: WaitableCommon::default(),
4575 lower_params: Some(lower_params),
4576 lift_result: Some(lift_result),
4577 result: None,
4578 callback,
4579 caller,
4580 call_context: CallContext::default(),
4581 sync_result: SyncResult::NotProduced,
4582 cancel_sent: false,
4583 starting_sent: false,
4584 instance,
4585 event: None,
4586 exited: false,
4587 threads: HashSet::new(),
4588 host_future_state,
4589 async_function,
4590 })
4591 }
4592
4593 fn dispose(self, _state: &mut ConcurrentState) -> Result<()> {
4595 assert!(self.threads.is_empty());
4596 Ok(())
4597 }
4598}
4599
4600impl TableDebug for GuestTask {
4601 fn type_name() -> &'static str {
4602 "GuestTask"
4603 }
4604}
4605
4606#[derive(Default)]
4608struct WaitableCommon {
4609 event: Option<Event>,
4611 set: Option<TableId<WaitableSet>>,
4613 handle: Option<u32>,
4615}
4616
4617#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4619enum Waitable {
4620 Host(TableId<HostTask>),
4622 Guest(TableId<GuestTask>),
4624 Transmit(TableId<TransmitHandle>),
4626}
4627
4628impl Waitable {
4629 fn from_instance(
4632 state: Pin<&mut ComponentInstance>,
4633 caller_instance: RuntimeComponentInstanceIndex,
4634 waitable: u32,
4635 ) -> Result<Self> {
4636 use crate::runtime::vm::component::Waitable;
4637
4638 let (waitable, kind) = state.instance_states().0[caller_instance]
4639 .handle_table()
4640 .waitable_rep(waitable)?;
4641
4642 Ok(match kind {
4643 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4644 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4645 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4646 })
4647 }
4648
4649 fn rep(&self) -> u32 {
4651 match self {
4652 Self::Host(id) => id.rep(),
4653 Self::Guest(id) => id.rep(),
4654 Self::Transmit(id) => id.rep(),
4655 }
4656 }
4657
4658 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4662 log::trace!("waitable {self:?} join set {set:?}",);
4663
4664 let old = mem::replace(&mut self.common(state)?.set, set);
4665
4666 if let Some(old) = old {
4667 match *self {
4668 Waitable::Host(id) => state.remove_child(id, old),
4669 Waitable::Guest(id) => state.remove_child(id, old),
4670 Waitable::Transmit(id) => state.remove_child(id, old),
4671 }?;
4672
4673 state.get_mut(old)?.ready.remove(self);
4674 }
4675
4676 if let Some(set) = set {
4677 match *self {
4678 Waitable::Host(id) => state.add_child(id, set),
4679 Waitable::Guest(id) => state.add_child(id, set),
4680 Waitable::Transmit(id) => state.add_child(id, set),
4681 }?;
4682
4683 if self.common(state)?.event.is_some() {
4684 self.mark_ready(state)?;
4685 }
4686 }
4687
4688 Ok(())
4689 }
4690
4691 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4693 Ok(match self {
4694 Self::Host(id) => &mut state.get_mut(*id)?.common,
4695 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4696 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4697 })
4698 }
4699
4700 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4704 log::trace!("set event for {self:?}: {event:?}");
4705 self.common(state)?.event = event;
4706 self.mark_ready(state)
4707 }
4708
4709 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4711 let common = self.common(state)?;
4712 let event = common.event.take();
4713 if let Some(set) = self.common(state)?.set {
4714 state.get_mut(set)?.ready.remove(self);
4715 }
4716
4717 Ok(event)
4718 }
4719
4720 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4724 if let Some(set) = self.common(state)?.set {
4725 state.get_mut(set)?.ready.insert(*self);
4726 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4727 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4728 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4729
4730 let item = match mode {
4731 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4732 WaitMode::Callback(instance) => WorkItem::GuestCall(
4733 state.get_mut(thread.task)?.instance.index,
4734 GuestCall {
4735 thread,
4736 kind: GuestCallKind::DeliverEvent {
4737 instance,
4738 set: Some(set),
4739 },
4740 },
4741 ),
4742 };
4743 state.push_high_priority(item);
4744 }
4745 }
4746 Ok(())
4747 }
4748
4749 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4751 match self {
4752 Self::Host(task) => {
4753 log::trace!("delete host task {task:?}");
4754 state.delete(*task)?;
4755 }
4756 Self::Guest(task) => {
4757 log::trace!("delete guest task {task:?}");
4758 state.delete(*task)?.dispose(state)?;
4759 }
4760 Self::Transmit(task) => {
4761 state.delete(*task)?;
4762 }
4763 }
4764
4765 Ok(())
4766 }
4767}
4768
4769impl fmt::Debug for Waitable {
4770 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4771 match self {
4772 Self::Host(id) => write!(f, "{id:?}"),
4773 Self::Guest(id) => write!(f, "{id:?}"),
4774 Self::Transmit(id) => write!(f, "{id:?}"),
4775 }
4776 }
4777}
4778
4779#[derive(Default)]
4781struct WaitableSet {
4782 ready: BTreeSet<Waitable>,
4784 waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4786}
4787
4788impl TableDebug for WaitableSet {
4789 fn type_name() -> &'static str {
4790 "WaitableSet"
4791 }
4792}
4793
4794type RawLower =
4796 Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4797
4798type RawLift = Box<
4800 dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4801>;
4802
4803type LiftedResult = Box<dyn Any + Send + Sync>;
4807
4808struct DummyResult;
4811
4812#[derive(Default)]
4814pub struct ConcurrentInstanceState {
4815 backpressure: u16,
4817 do_not_enter: bool,
4819 pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4822}
4823
4824impl ConcurrentInstanceState {
4825 pub fn pending_is_empty(&self) -> bool {
4826 self.pending.is_empty()
4827 }
4828}
4829
4830#[derive(Debug, Copy, Clone)]
4831pub(crate) enum CurrentThread {
4832 Guest(QualifiedThreadId),
4833 Host(TableId<HostTask>),
4834 None,
4835}
4836
4837impl CurrentThread {
4838 fn guest(&self) -> Option<&QualifiedThreadId> {
4839 match self {
4840 Self::Guest(id) => Some(id),
4841 _ => None,
4842 }
4843 }
4844
4845 fn host(&self) -> Option<TableId<HostTask>> {
4846 match self {
4847 Self::Host(id) => Some(*id),
4848 _ => None,
4849 }
4850 }
4851
4852 fn is_none(&self) -> bool {
4853 matches!(self, Self::None)
4854 }
4855}
4856
4857impl From<QualifiedThreadId> for CurrentThread {
4858 fn from(id: QualifiedThreadId) -> Self {
4859 Self::Guest(id)
4860 }
4861}
4862
4863impl From<TableId<HostTask>> for CurrentThread {
4864 fn from(id: TableId<HostTask>) -> Self {
4865 Self::Host(id)
4866 }
4867}
4868
4869pub struct ConcurrentState {
4871 current_thread: CurrentThread,
4873
4874 futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4879 table: AlwaysMut<ResourceTable>,
4881 high_priority: Vec<WorkItem>,
4883 low_priority: VecDeque<WorkItem>,
4885 suspend_reason: Option<SuspendReason>,
4889 worker: Option<StoreFiber<'static>>,
4893 worker_item: Option<WorkerItem>,
4895
4896 global_error_context_ref_counts:
4909 BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4910}
4911
4912impl Default for ConcurrentState {
4913 fn default() -> Self {
4914 Self {
4915 current_thread: CurrentThread::None,
4916 table: AlwaysMut::new(ResourceTable::new()),
4917 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4918 high_priority: Vec::new(),
4919 low_priority: VecDeque::new(),
4920 suspend_reason: None,
4921 worker: None,
4922 worker_item: None,
4923 global_error_context_ref_counts: BTreeMap::new(),
4924 }
4925 }
4926}
4927
4928impl ConcurrentState {
4929 pub(crate) fn take_fibers_and_futures(
4946 &mut self,
4947 fibers: &mut Vec<StoreFiber<'static>>,
4948 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4949 ) {
4950 for entry in self.table.get_mut().iter_mut() {
4951 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4952 for mode in mem::take(&mut set.waiting).into_values() {
4953 if let WaitMode::Fiber(fiber) = mode {
4954 fibers.push(fiber);
4955 }
4956 }
4957 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4958 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready(fiber) =
4959 mem::replace(&mut thread.state, GuestThreadState::Completed)
4960 {
4961 fibers.push(fiber);
4962 }
4963 }
4964 }
4965
4966 if let Some(fiber) = self.worker.take() {
4967 fibers.push(fiber);
4968 }
4969
4970 let mut handle_item = |item| match item {
4971 WorkItem::ResumeFiber(fiber) => {
4972 fibers.push(fiber);
4973 }
4974 WorkItem::PushFuture(future) => {
4975 self.futures
4976 .get_mut()
4977 .as_mut()
4978 .unwrap()
4979 .push(future.into_inner());
4980 }
4981 WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
4982 }
4983 };
4984
4985 for item in mem::take(&mut self.high_priority) {
4986 handle_item(item);
4987 }
4988 for item in mem::take(&mut self.low_priority) {
4989 handle_item(item);
4990 }
4991
4992 if let Some(them) = self.futures.get_mut().take() {
4993 futures.push(them);
4994 }
4995 }
4996
4997 fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> {
5001 let mut ready = mem::take(&mut self.high_priority);
5002 if ready.is_empty() {
5003 if let Some(item) = self.low_priority.pop_back() {
5004 ready.push(item);
5005 }
5006 }
5007 ready
5008 }
5009
5010 fn push<V: Send + Sync + 'static>(
5011 &mut self,
5012 value: V,
5013 ) -> Result<TableId<V>, ResourceTableError> {
5014 self.table.get_mut().push(value).map(TableId::from)
5015 }
5016
5017 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
5018 self.table.get_mut().get_mut(&Resource::from(id))
5019 }
5020
5021 pub fn add_child<T: 'static, U: 'static>(
5022 &mut self,
5023 child: TableId<T>,
5024 parent: TableId<U>,
5025 ) -> Result<(), ResourceTableError> {
5026 self.table
5027 .get_mut()
5028 .add_child(Resource::from(child), Resource::from(parent))
5029 }
5030
5031 pub fn remove_child<T: 'static, U: 'static>(
5032 &mut self,
5033 child: TableId<T>,
5034 parent: TableId<U>,
5035 ) -> Result<(), ResourceTableError> {
5036 self.table
5037 .get_mut()
5038 .remove_child(Resource::from(child), Resource::from(parent))
5039 }
5040
5041 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
5042 self.table.get_mut().delete(Resource::from(id))
5043 }
5044
5045 fn push_future(&mut self, future: HostTaskFuture) {
5046 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
5053 }
5054
5055 fn push_high_priority(&mut self, item: WorkItem) {
5056 log::trace!("push high priority: {item:?}");
5057 self.high_priority.push(item);
5058 }
5059
5060 fn push_low_priority(&mut self, item: WorkItem) {
5061 log::trace!("push low priority: {item:?}");
5062 self.low_priority.push_front(item);
5063 }
5064
5065 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5066 if high_priority {
5067 self.push_high_priority(item);
5068 } else {
5069 self.push_low_priority(item);
5070 }
5071 }
5072
5073 fn promote_instance_local_thread_work_item(
5074 &mut self,
5075 current_instance: RuntimeComponentInstanceIndex,
5076 ) -> bool {
5077 self.promote_work_items_matching(|item: &WorkItem| match item {
5078 WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5079 *instance == current_instance
5080 }
5081 _ => false,
5082 })
5083 }
5084
5085 fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5086 self.promote_work_items_matching(|item: &WorkItem| match item {
5087 WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5088 *t == thread
5089 }
5090 _ => false,
5091 })
5092 }
5093
5094 fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5095 where
5096 F: FnMut(&WorkItem) -> bool,
5097 {
5098 if self.high_priority.iter().any(&mut predicate) {
5102 true
5103 }
5104 else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5107 let item = self.low_priority.remove(idx).unwrap();
5108 self.push_high_priority(item);
5109 true
5110 } else {
5111 false
5112 }
5113 }
5114
5115 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
5117 let thread = self.current_guest_thread()?;
5118 let val = self.get_mut(thread.thread)?.context[usize::try_from(slot)?];
5119 log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
5120 Ok(val)
5121 }
5122
5123 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
5125 let thread = self.current_guest_thread()?;
5126 log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
5127 self.get_mut(thread.thread)?.context[usize::try_from(slot)?] = val;
5128 Ok(())
5129 }
5130
5131 fn take_pending_cancellation(&mut self) -> Result<bool> {
5134 let thread = self.current_guest_thread()?;
5135 if let Some(event) = self.get_mut(thread.task)?.event.take() {
5136 assert!(matches!(event, Event::Cancelled));
5137 Ok(true)
5138 } else {
5139 Ok(false)
5140 }
5141 }
5142
5143 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5144 if self.may_block(task)? {
5145 Ok(())
5146 } else {
5147 Err(Trap::CannotBlockSyncTask.into())
5148 }
5149 }
5150
5151 fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
5152 let task = self.get_mut(task)?;
5153 Ok(task.async_function || task.returned_or_cancelled())
5154 }
5155
5156 pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
5162 let (task, is_host) = (task >> 1, task & 1 == 1);
5163 if is_host {
5164 let task: TableId<HostTask> = TableId::new(task);
5165 Ok(&mut self.get_mut(task)?.call_context)
5166 } else {
5167 let task: TableId<GuestTask> = TableId::new(task);
5168 Ok(&mut self.get_mut(task)?.call_context)
5169 }
5170 }
5171
5172 pub fn current_call_context_scope_id(&self) -> Result<u32> {
5175 let (bits, is_host) = match self.current_thread {
5176 CurrentThread::Guest(id) => (id.task.rep(), false),
5177 CurrentThread::Host(id) => (id.rep(), true),
5178 CurrentThread::None => bail_bug!("current thread is not set"),
5179 };
5180 assert_eq!((bits << 1) >> 1, bits);
5181 Ok((bits << 1) | u32::from(is_host))
5182 }
5183
5184 fn current_guest_thread(&self) -> Result<QualifiedThreadId> {
5185 match self.current_thread.guest() {
5186 Some(id) => Ok(*id),
5187 None => bail_bug!("current thread is not a guest thread"),
5188 }
5189 }
5190
5191 fn current_host_thread(&self) -> Result<TableId<HostTask>> {
5192 match self.current_thread.host() {
5193 Some(id) => Ok(id),
5194 None => bail_bug!("current thread is not a host thread"),
5195 }
5196 }
5197
5198 fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
5199 match self.futures.get_mut().as_mut() {
5200 Some(f) => Ok(f),
5201 None => bail_bug!("futures field of concurrent state is currently taken"),
5202 }
5203 }
5204
5205 pub(crate) fn table(&mut self) -> &mut ResourceTable {
5206 self.table.get_mut()
5207 }
5208}
5209
5210fn for_any_lower<
5213 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5214>(
5215 fun: F,
5216) -> F {
5217 fun
5218}
5219
5220fn for_any_lift<
5222 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5223>(
5224 fun: F,
5225) -> F {
5226 fun
5227}
5228
5229fn checked<F: Future + Send + 'static>(
5234 id: StoreId,
5235 fut: F,
5236) -> impl Future<Output = F::Output> + Send + 'static {
5237 async move {
5238 let mut fut = pin!(fut);
5239 future::poll_fn(move |cx| {
5240 let message = "\
5241 `Future`s which depend on asynchronous component tasks, streams, or \
5242 futures to complete may only be polled from the event loop of the \
5243 store to which they belong. Please use \
5244 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5245 ";
5246 tls::try_get(|store| {
5247 let matched = match store {
5248 tls::TryGet::Some(store) => store.id() == id,
5249 tls::TryGet::Taken | tls::TryGet::None => false,
5250 };
5251
5252 if !matched {
5253 panic!("{message}")
5254 }
5255 });
5256 fut.as_mut().poll(cx)
5257 })
5258 .await
5259 }
5260}
5261
5262fn check_recursive_run() {
5265 tls::try_get(|store| {
5266 if !matches!(store, tls::TryGet::None) {
5267 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5268 }
5269 });
5270}
5271
5272fn unpack_callback_code(code: u32) -> (u32, u32) {
5273 (code & 0xF, code >> 4)
5274}
5275
5276struct WaitableCheckParams {
5280 set: TableId<WaitableSet>,
5281 options: OptionsIndex,
5282 payload: u32,
5283}
5284
5285enum WaitableCheck {
5288 Wait,
5289 Poll,
5290}
5291
5292pub(crate) struct PreparedCall<R> {
5294 handle: Func,
5296 thread: QualifiedThreadId,
5298 param_count: usize,
5300 rx: oneshot::Receiver<LiftedResult>,
5303 _phantom: PhantomData<R>,
5304}
5305
5306impl<R> PreparedCall<R> {
5307 pub(crate) fn task_id(&self) -> TaskId {
5309 TaskId {
5310 task: self.thread.task,
5311 }
5312 }
5313}
5314
5315pub(crate) struct TaskId {
5317 task: TableId<GuestTask>,
5318}
5319
5320impl TaskId {
5321 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5327 let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5328 if !task.already_lowered_parameters() {
5329 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5330 } else {
5331 task.host_future_state = HostFutureState::Dropped;
5332 if task.ready_to_delete() {
5333 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5334 }
5335 }
5336 Ok(())
5337 }
5338}
5339
5340pub(crate) fn prepare_call<T, R>(
5346 mut store: StoreContextMut<T>,
5347 handle: Func,
5348 param_count: usize,
5349 host_future_present: bool,
5350 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5351 + Send
5352 + Sync
5353 + 'static,
5354 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5355 + Send
5356 + Sync
5357 + 'static,
5358) -> Result<PreparedCall<R>> {
5359 let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5360
5361 let instance = handle.instance().id().get(store.0);
5362 let options = &instance.component().env_component().options[options];
5363 let ty = &instance.component().types()[ty];
5364 let async_function = ty.async_;
5365 let task_return_type = ty.results;
5366 let component_instance = raw_options.instance;
5367 let callback = options.callback.map(|i| instance.runtime_callback(i));
5368 let memory = options
5369 .memory()
5370 .map(|i| instance.runtime_memory(i))
5371 .map(SendSyncPtr::new);
5372 let string_encoding = options.string_encoding;
5373 let token = StoreToken::new(store.as_context_mut());
5374 let state = store.0.concurrent_state_mut();
5375
5376 let (tx, rx) = oneshot::channel();
5377
5378 let instance = RuntimeInstance {
5379 instance: handle.instance().id().instance(),
5380 index: component_instance,
5381 };
5382 let caller = state.current_thread;
5383 let task = GuestTask::new(
5384 Box::new(for_any_lower(move |store, params| {
5385 lower_params(handle, token.as_context_mut(store), params)
5386 })),
5387 LiftResult {
5388 lift: Box::new(for_any_lift(move |store, result| {
5389 lift_result(handle, store, result)
5390 })),
5391 ty: task_return_type,
5392 memory,
5393 string_encoding,
5394 },
5395 Caller::Host {
5396 tx: Some(tx),
5397 host_future_present,
5398 caller,
5399 },
5400 callback.map(|callback| {
5401 let callback = SendSyncPtr::new(callback);
5402 let instance = handle.instance();
5403 Box::new(move |store: &mut dyn VMStore, event, handle| {
5404 let store = token.as_context_mut(store);
5405 unsafe { instance.call_callback(store, callback, event, handle) }
5408 }) as CallbackFn
5409 }),
5410 instance,
5411 async_function,
5412 )?;
5413
5414 let task = state.push(task)?;
5415 let new_thread = GuestThread::new_implicit(state, task)?;
5416 let thread = state.push(new_thread)?;
5417 state.get_mut(task)?.threads.insert(thread);
5418
5419 if !store.0.may_enter(instance)? {
5420 bail!(Trap::CannotEnterComponent);
5421 }
5422
5423 Ok(PreparedCall {
5424 handle,
5425 thread: QualifiedThreadId { task, thread },
5426 param_count,
5427 rx,
5428 _phantom: PhantomData,
5429 })
5430}
5431
5432pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5439 mut store: StoreContextMut<T>,
5440 prepared: PreparedCall<R>,
5441) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> {
5442 let PreparedCall {
5443 handle,
5444 thread,
5445 param_count,
5446 rx,
5447 ..
5448 } = prepared;
5449
5450 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5451
5452 Ok(checked(
5453 store.0.id(),
5454 rx.map(move |result| match result {
5455 Ok(r) => match r.downcast() {
5456 Ok(r) => Ok(*r),
5457 Err(_) => bail_bug!("wrong type of value produced"),
5458 },
5459 Err(e) => Err(e.into()),
5460 }),
5461 ))
5462}
5463
5464fn queue_call0<T: 'static>(
5467 store: StoreContextMut<T>,
5468 handle: Func,
5469 guest_thread: QualifiedThreadId,
5470 param_count: usize,
5471) -> Result<()> {
5472 let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5473 let is_concurrent = raw_options.async_;
5474 let callback = raw_options.callback;
5475 let instance = handle.instance();
5476 let callee = handle.lifted_core_func(store.0);
5477 let post_return = handle.post_return_core_func(store.0);
5478 let callback = callback.map(|i| {
5479 let instance = instance.id().get(store.0);
5480 SendSyncPtr::new(instance.runtime_callback(i))
5481 });
5482
5483 log::trace!("queueing call {guest_thread:?}");
5484
5485 unsafe {
5489 instance.queue_call(
5490 store,
5491 guest_thread,
5492 SendSyncPtr::new(callee),
5493 param_count,
5494 1,
5495 is_concurrent,
5496 callback,
5497 post_return.map(SendSyncPtr::new),
5498 )
5499 }
5500}