1use crate::bail_bug;
54use crate::component::func::{self, Func, call_post_return};
55use crate::component::{
56 HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
57};
58use crate::fiber::{self, StoreFiber, StoreFiberYield};
59use crate::prelude::*;
60use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
61use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
62use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
63use crate::{
64 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
65};
66use error_contexts::GlobalErrorContextRefCount;
67use futures::channel::oneshot;
68use futures::future::{self, FutureExt};
69use futures::stream::{FuturesUnordered, StreamExt};
70use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71use std::any::Any;
72use std::borrow::ToOwned;
73use std::boxed::Box;
74use std::cell::UnsafeCell;
75use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
76use std::fmt;
77use std::future::Future;
78use std::marker::PhantomData;
79use std::mem::{self, ManuallyDrop, MaybeUninit};
80use std::ops::DerefMut;
81use std::pin::{Pin, pin};
82use std::ptr::{self, NonNull};
83use std::task::{Context, Poll, Waker};
84use std::vec::Vec;
85use table::{TableDebug, TableId};
86use wasmtime_environ::Trap;
87use wasmtime_environ::component::{
88 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
89 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
90 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
91 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
92 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
93};
94use wasmtime_environ::packed_option::ReservedValue;
95
96pub use abort::JoinHandle;
97pub use future_stream_any::{FutureAny, StreamAny};
98pub use futures_and_streams::{
99 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
100 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
101 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
102};
103pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
104
105mod abort;
106mod error_contexts;
107mod future_stream_any;
108mod futures_and_streams;
109pub(crate) mod table;
110pub(crate) mod tls;
111
/// Sentinel `u32` with every bit set.
// NOTE(review): presumably the value reported to a guest when an operation
// could not complete immediately; the users of this constant are outside this
// view, so confirm before relying on that reading.
const BLOCKED: u32 = 0xffff_ffff;
115
/// Progress of a subtask.
///
/// The explicit discriminants are the numeric codes placed in the low four
/// bits of the word produced by [`Status::pack`].
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Combines this status with an optional waitable index into one `u32`.
    ///
    /// The status code occupies the low four bits and the waitable index the
    /// remaining 28 bits. A missing index is encoded as `0`.
    ///
    /// # Panics
    ///
    /// Panics unless `waitable` is `None` exactly when `self` is
    /// [`Status::Returned`], and if the index does not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = match waitable {
            Some(index) => index,
            None => 0,
        };
        assert!(waitable < (1 << 28));
        let status_bits = self as u32;
        status_bits | (waitable << 4)
    }
}
139
140#[derive(Clone, Copy, Debug)]
143enum Event {
144 None,
145 Subtask {
146 status: Status,
147 },
148 StreamRead {
149 code: ReturnCode,
150 pending: Option<(TypeStreamTableIndex, u32)>,
151 },
152 StreamWrite {
153 code: ReturnCode,
154 pending: Option<(TypeStreamTableIndex, u32)>,
155 },
156 FutureRead {
157 code: ReturnCode,
158 pending: Option<(TypeFutureTableIndex, u32)>,
159 },
160 FutureWrite {
161 code: ReturnCode,
162 pending: Option<(TypeFutureTableIndex, u32)>,
163 },
164 Cancelled,
165}
166
167impl Event {
168 fn parts(self) -> (u32, u32) {
173 const EVENT_NONE: u32 = 0;
174 const EVENT_SUBTASK: u32 = 1;
175 const EVENT_STREAM_READ: u32 = 2;
176 const EVENT_STREAM_WRITE: u32 = 3;
177 const EVENT_FUTURE_READ: u32 = 4;
178 const EVENT_FUTURE_WRITE: u32 = 5;
179 const EVENT_CANCELLED: u32 = 6;
180 match self {
181 Event::None => (EVENT_NONE, 0),
182 Event::Cancelled => (EVENT_CANCELLED, 0),
183 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
184 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
185 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
186 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
187 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
188 }
189 }
190}
191
/// Numeric codes grouped under the name `callback_code`.
// NOTE(review): presumably these are the values a guest callback returns to
// say what should happen next (exit, yield, or wait); the code that interprets
// them is outside this view — confirm.
mod callback_code {
    pub const EXIT: u32 = 0;
    pub const YIELD: u32 = 1;
    pub const WAIT: u32 = 2;
}
198
/// `wasmtime_environ::component::START_FLAG_ASYNC_CALLEE` converted to `u32`
/// once, so users in this module do not have to repeat the cast.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
203
/// Mutable access to a store paired with a projection of its data.
///
/// `T` is the store's data type. `D` selects, through `HasData`, the view type
/// `D::Data<'_>` that `get_data` derives from `&mut T`.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    /// The store this access operates on.
    store: StoreContextMut<'a, T>,
    /// Projects the store's data to the view selected by `D`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
213
214impl<'a, T, D> Access<'a, T, D>
215where
216 D: HasData + ?Sized,
217 T: 'static,
218{
219 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
221 Self { store, get_data }
222 }
223
224 pub fn data_mut(&mut self) -> &mut T {
226 self.store.data_mut()
227 }
228
229 pub fn get(&mut self) -> D::Data<'_> {
231 (self.get_data)(self.data_mut())
232 }
233
234 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
238 where
239 T: 'static,
240 {
241 let accessor = Accessor {
242 get_data: self.get_data,
243 token: StoreToken::new(self.store.as_context_mut()),
244 };
245 self.store
246 .as_context_mut()
247 .spawn_with_accessor(accessor, task)
248 }
249
250 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
253 self.get_data
254 }
255}
256
/// Lets an [`Access`] be used wherever a shared store context is expected by
/// delegating to the wrapped `StoreContextMut`.
impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    type Data = T;

    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}
268
/// Lets an [`Access`] be used wherever a mutable store context is expected by
/// reborrowing the wrapped `StoreContextMut`.
impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}
278
/// A handle for reaching a store without holding a borrow of it.
///
/// Unlike [`Access`], this type stores only a `StoreToken<T>` and a projection
/// function; [`Accessor::with`] uses the token to obtain a store context on
/// demand.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    /// Token used to recover a typed store context from the store made
    /// available through `tls`.
    token: StoreToken<T>,
    /// Projects the store's data to the view selected by `D`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
345
/// Types that can hand out a reference to an [`Accessor`].
pub trait AsAccessor {
    /// The store data type (`T`) of the underlying accessor.
    type Data: 'static;

    /// The `HasData` selector (`D`) of the underlying accessor.
    type AccessorData: HasData + ?Sized;

    /// Returns the underlying accessor.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
373
/// Forwards [`AsAccessor`] through shared references, so `&A` can be used
/// wherever `A: AsAccessor` is accepted.
impl<T: AsAccessor + ?Sized> AsAccessor for &T {
    type Data = T::Data;
    type AccessorData = T::AccessorData;

    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
        T::as_accessor(self)
    }
}
382
/// An [`Accessor`] trivially is its own accessor.
impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;

    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}
391
// Compile-time check that `Accessor<T>` is `Send + Sync` even when `T` itself
// is not `Sync` (`UnsafeCell<u32>` is `Send` but not `Sync`). Compilation fails
// if a future change to `Accessor` breaks that property.
const _: () = {
    const fn assert<T: Send + Sync>() {}
    assert::<Accessor<UnsafeCell<u32>>>();
};
418
impl<T> Accessor<T> {
    /// Creates an accessor whose projection is the identity, i.e. one that
    /// exposes the store's data `T` itself.
    pub(crate) fn new(token: StoreToken<T>) -> Self {
        Self {
            token,
            // Identity projection: `D` defaults to `HasSelf<T>`.
            get_data: |x| x,
        }
    }
}
435
436impl<T, D> Accessor<T, D>
437where
438 D: HasData + ?Sized,
439{
440 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
458 tls::get(|vmstore| {
459 fun(Access {
460 store: self.token.as_context_mut(vmstore),
461 get_data: self.get_data,
462 })
463 })
464 }
465
466 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
469 self.get_data
470 }
471
472 pub fn with_getter<D2: HasData>(
489 &self,
490 get_data: fn(&mut T) -> D2::Data<'_>,
491 ) -> Accessor<T, D2> {
492 Accessor {
493 token: self.token,
494 get_data,
495 }
496 }
497
498 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
514 where
515 T: 'static,
516 {
517 let accessor = self.clone_for_spawn();
518 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
519 }
520
521 fn clone_for_spawn(&self) -> Self {
522 Self {
523 token: self.token,
524 get_data: self.get_data,
525 }
526 }
527}
528
/// A unit of asynchronous work that runs with access to a store through an
/// [`Accessor`].
///
/// Implementors must be `Send + 'static`, and the future returned by `run`
/// must be `Send`.
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Consumes the task and runs it, using `accessor` to reach the store.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
547
/// Raw call parameters, tagged by whether the caller is async or sync.
// NOTE(review): this type's users are outside this view; the field notes below
// are read off the names and types only — confirm.
enum CallerInfo {
    /// The caller used the async form.
    Async {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Whether a result is expected.
        has_result: bool,
    },
    /// The caller used the sync form.
    Sync {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Number of results expected.
        result_count: u32,
    },
}
562
/// How something that waits gets resumed.
// NOTE(review): users are outside this view; presumably `Fiber` resumes a
// suspended fiber and `Callback` re-enters the given instance — confirm.
enum WaitMode {
    /// Holds the fiber to resume.
    Fiber(StoreFiber<'static>),
    /// Holds the instance involved in a callback-style wait.
    Callback(Instance),
}
571
/// Why a fiber is suspending; passed to `suspend`.
#[derive(Debug)]
enum SuspendReason {
    /// `thread` is waiting on the waitable set `set`.
    Waiting {
        set: TableId<WaitableSet>,
        thread: QualifiedThreadId,
        // NOTE(review): presumably bypasses the "may this task block?" check
        // when set; the consumer of this flag is outside this view.
        skip_may_block_check: bool,
    },
    /// Used by the worker fiber after it finishes an item, to ask for the next
    /// one.
    NeedWork,
    /// `thread` is yielding.
    Yielding {
        thread: QualifiedThreadId,
        // NOTE(review): see `Waiting::skip_may_block_check`.
        skip_may_block_check: bool,
    },
    /// `thread` is suspending explicitly.
    ExplicitlySuspending {
        thread: QualifiedThreadId,
        // NOTE(review): see `Waiting::skip_may_block_check`.
        skip_may_block_check: bool,
    },
}
597
/// What a [`GuestCall`] does when it is run by `handle_guest_call`.
enum GuestCallKind {
    /// Deliver an event to the task through its callback.
    DeliverEvent {
        /// Instance used to fetch the event and to interpret the callback's
        /// return code.
        instance: Instance,
        /// Optional waitable set passed along when fetching the event.
        set: Option<TableId<WaitableSet>>,
    },
    /// Start a call; the closure may return a follow-up [`GuestCall`] that is
    /// run immediately afterwards. Readiness is gated on both the instance's
    /// `do_not_enter` flag and its backpressure counter.
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
    /// Start a call with no follow-up; always considered ready.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
619
620impl fmt::Debug for GuestCallKind {
621 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
622 match self {
623 Self::DeliverEvent { instance, set } => f
624 .debug_struct("DeliverEvent")
625 .field("instance", instance)
626 .field("set", set)
627 .finish(),
628 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
629 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
630 }
631 }
632}
633
/// Optional target of a suspension, identified by a `u32`.
// NOTE(review): the difference between `SomeSuspended` and `Some` is not
// visible in this part of the file — confirm against the users.
#[derive(Copy, Clone, Debug)]
pub enum SuspensionTarget {
    SomeSuspended(u32),
    Some(u32),
    None,
}

impl SuspensionTarget {
    /// Returns `true` only for [`SuspensionTarget::None`].
    fn is_none(&self) -> bool {
        match self {
            SuspensionTarget::None => true,
            SuspensionTarget::SomeSuspended(_) | SuspensionTarget::Some(_) => false,
        }
    }

    /// Returns `true` for either variant that names a target.
    fn is_some(&self) -> bool {
        match self {
            SuspensionTarget::SomeSuspended(_) | SuspensionTarget::Some(_) => true,
            SuspensionTarget::None => false,
        }
    }
}
650
/// A pending call into a guest: which thread it runs as and what it does.
#[derive(Debug)]
struct GuestCall {
    /// The thread (and, through it, the task) the call belongs to.
    thread: QualifiedThreadId,
    /// The action to perform.
    kind: GuestCallKind,
}
657
658impl GuestCall {
659 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
669 let instance = store
670 .concurrent_state_mut()
671 .get_mut(self.thread.task)?
672 .instance;
673 let state = store.instance_state(instance).concurrent_state();
674
675 let ready = match &self.kind {
676 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
677 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
678 GuestCallKind::StartExplicit(_) => true,
679 };
680 log::trace!(
681 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
682 state.do_not_enter,
683 state.backpressure
684 );
685 Ok(ready)
686 }
687}
688
/// An item handed to the worker fiber (see `run_on_worker`).
enum WorkerItem {
    /// Run a guest call via `handle_guest_call`.
    GuestCall(GuestCall),
    /// Run an arbitrary closure against the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
694
/// A unit of work queued for the event loop and dispatched by
/// `handle_work_item`.
enum WorkItem {
    /// Add a host future to the set polled by the event loop.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// Resume the given fiber.
    ResumeFiber(StoreFiber<'static>),
    /// Resume a guest thread that is in the `Ready` state.
    ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
    /// Run a guest call if it is ready, otherwise park it on its instance's
    /// pending map.
    GuestCall(RuntimeComponentInstanceIndex, GuestCall),
    /// Run a closure on the worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
709
710impl fmt::Debug for WorkItem {
711 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
712 match self {
713 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
714 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
715 Self::ResumeThread(instance, thread) => f
716 .debug_tuple("ResumeThread")
717 .field(instance)
718 .field(thread)
719 .finish(),
720 Self::GuestCall(instance, call) => f
721 .debug_tuple("GuestCall")
722 .field(instance)
723 .field(call)
724 .finish(),
725 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
726 }
727 }
728}
729
/// Outcome of a wait: either it was cancelled or it ran to completion.
///
/// The derived ordering follows declaration order, so `Cancelled < Completed`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    Cancelled,
    Completed,
}
736
/// Runs `future` to completion on behalf of the current host task, suspending
/// the calling thread if the future is not ready on the first poll.
///
/// The future is wrapped so that, on success, it stores its result in the host
/// task (`HostTaskState::CalleeFinished`) and posts a `Subtask`/`Returned`
/// event for it. The wrapper is polled once with a no-op waker:
///
/// * If it is ready, its `Result` is checked immediately.
/// * Otherwise it is handed to the concurrent state with `push_future`, the
///   host task is joined to the caller thread's `sync_call_set`, and the caller
///   is suspended until that set has something to report.
///
/// In both cases the result is then taken out of the host task and downcast to
/// `R`.
///
/// # Errors
///
/// Propagates errors from `future`, from table lookups and from suspending.
/// Reports a bug if the host task is not in the `CalleeFinished` state
/// afterwards, or if the stored result is not an `R`.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
) -> Result<R> {
    let state = store.concurrent_state_mut();
    let task = state.current_host_thread()?;

    // Wrap the future so that completion is recorded on the host task itself;
    // that way the result can be picked up below whether the future finished
    // on the first poll or later.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            let host_state = &mut state.get_mut(task)?.state;
            assert!(matches!(host_state, HostTaskState::CalleeStarted));
            *host_state = HostTaskState::CalleeFinished(Box::new(result));

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // Poll once with a no-op waker, with the store installed in `tls` so the
    // wrapper above can reach it through `tls::get`.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        // Finished on the first poll; surface any error.
        Poll::Ready(result) => result?,

        // Not finished: hand the future over for later polling and suspend the
        // caller until the host task posts its event.
        Poll::Pending => {
            let state = store.concurrent_state_mut();
            state.push_future(future);

            let caller = state.get_mut(task)?.caller;
            let set = state.get_mut(caller.thread)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                skip_may_block_check: false,
            })?;

            // Detach the host task from the caller's set again now that the
            // wait is over.
            Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
        }
    }

    // Take the result out of the host task, leaving it marked as done.
    let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
    match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
        HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
            Ok(result) => *result,
            Err(_) => bail_bug!("host task finished with wrong type of result"),
        }),
        _ => bail_bug!("unexpected host task state after completion"),
    }
}
821
/// Executes `call`, and then any follow-up calls it produces, until there is
/// nothing left to run.
///
/// * `DeliverEvent` fetches the event for the task, runs the task's callback
///   with the call's thread set as current and the instance marked as entered,
///   then lets the instance interpret the returned code, which may yield
///   another call.
/// * `StartImplicit` runs its closure, which may yield another call.
/// * `StartExplicit` runs its closure and ends the chain.
///
/// # Errors
///
/// Propagates any error from the steps above. Reports a bug if there is no
/// event to deliver or the task has no callback.
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
    let mut next = Some(call);
    while let Some(call) = next.take() {
        match call.kind {
            GuestCallKind::DeliverEvent { instance, set } => {
                let (event, waitable) =
                    match instance.get_event(store, call.thread.task, set, true)? {
                        Some(pair) => pair,
                        None => bail_bug!("delivering non-present event"),
                    };
                let state = store.concurrent_state_mut();
                let task = state.get_mut(call.thread.task)?;
                let runtime_instance = task.instance;
                // A handle of 0 is passed when the event has no waitable.
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);

                log::trace!(
                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
                    call.thread,
                );

                let old_thread = store.set_thread(call.thread)?;
                log::trace!(
                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
                    call.thread
                );

                store.enter_instance(runtime_instance);

                // Temporarily take the callback out of the task so it can be
                // invoked while `store` is borrowed mutably; it is put back
                // right after the call.
                let Some(callback) = store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback
                    .take()
                else {
                    bail_bug!("guest task callback field not present")
                };

                let code = callback(store, event, handle)?;

                store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback = Some(callback);

                store.exit_instance(runtime_instance)?;

                store.set_thread(old_thread)?;

                // The callback's return code decides what, if anything, runs
                // next for this thread.
                next = instance.handle_callback_code(
                    store,
                    call.thread,
                    runtime_instance.index,
                    code,
                )?;

                log::trace!(
                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
                );
            }
            GuestCallKind::StartImplicit(fun) => {
                next = fun(store)?;
            }
            GuestCallKind::StartExplicit(fun) => {
                fun(store)?;
            }
        }
    }

    Ok(())
}
893
/// Convenience entry points on `Store<T>`; each forwards to the method of the
/// same name on `StoreContextMut`.
impl<T> Store<T> {
    /// Runs `fun` with an [`Accessor`] for this store and drives the store's
    /// concurrent work until the future returned by `fun` completes.
    ///
    /// # Errors
    ///
    /// Fails if concurrency support is disabled in the configuration, or with
    /// any error produced while running.
    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        ensure!(
            self.as_context().0.concurrency_support(),
            "cannot use `run_concurrent` when Config::concurrency_support disabled",
        );
        self.as_context_mut().run_concurrent(fun).await
    }

    /// Panics unless the store's concurrent state is empty.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(&mut self) {
        self.as_context_mut().assert_concurrent_state_empty();
    }

    /// Returns the number of entries in the concurrent state table.
    #[doc(hidden)]
    pub fn concurrent_state_table_size(&mut self) -> usize {
        self.as_context_mut().concurrent_state_table_size()
    }

    /// Spawns `task` on this store and returns its `JoinHandle`.
    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
    where
        T: 'static,
    {
        self.as_context_mut().spawn(task)
    }
}
925
926impl<T> StoreContextMut<'_, T> {
    /// Asserts that no concurrent state is left in the store.
    ///
    /// Checks the per-instance states, the table, both work queues, the current
    /// thread, the pending host futures and the error-context reference counts.
    ///
    /// # Panics
    ///
    /// Panics if any of those is non-empty, or if the futures collection is
    /// not available.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(self) {
        let store = self.0;
        store
            .store_data_mut()
            .components
            .assert_instance_states_empty();
        let state = store.concurrent_state_mut();
        assert!(
            state.table.get_mut().is_empty(),
            "non-empty table: {:?}",
            state.table.get_mut()
        );
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        assert!(state.current_thread.is_none());
        assert!(state.futures_mut().unwrap().is_empty());
        assert!(state.global_error_context_ref_counts.is_empty());
    }
956
957 #[doc(hidden)]
962 pub fn concurrent_state_table_size(&mut self) -> usize {
963 self.0
964 .concurrent_state_mut()
965 .table
966 .get_mut()
967 .iter_mut()
968 .count()
969 }
970
971 pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
981 where
982 T: 'static,
983 {
984 let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
985 self.spawn_with_accessor(accessor, task)
986 }
987
988 fn spawn_with_accessor<D>(
991 self,
992 accessor: Accessor<T, D>,
993 task: impl AccessorTask<T, D>,
994 ) -> JoinHandle
995 where
996 T: 'static,
997 D: HasData + ?Sized,
998 {
999 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
1003 self.0
1004 .concurrent_state_mut()
1005 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1006 handle
1007 }
1008
    /// Runs `fun` with an [`Accessor`] for this store and drives the store's
    /// concurrent work until the future returned by `fun` completes.
    ///
    /// When there is nothing left to do and the future is still pending, this
    /// stays pending rather than trapping (`trap_on_idle` is `false`).
    ///
    /// # Errors
    ///
    /// Fails if concurrency support is disabled in the configuration, or with
    /// any error produced while running.
    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        ensure!(
            self.0.concurrency_support(),
            "cannot use `run_concurrent` when Config::concurrency_support disabled",
        );
        self.do_run_concurrent(fun, false).await
    }
1102
    /// Like `run_concurrent`, but fails with `Trap::AsyncDeadlock` when there
    /// is no work left and the future returned by `fun` is still pending.
    ///
    /// Unlike `run_concurrent`, this does not return an error when concurrency
    /// support is disabled; that is only checked with a `debug_assert!` further
    /// down.
    pub(super) async fn run_concurrent_trap_on_idle<R>(
        self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, true).await
    }
1112
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Creates the future by calling `fun` with a fresh [`Accessor`], pins it
    /// in place, and hands it to `poll_until` together with `trap_on_idle`.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        debug_assert!(self.0.concurrency_support());
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Owns the user's future and makes sure it is dropped with the store
        // installed in `tls`.
        // NOTE(review): presumably because the future's destructor may need to
        // reach the store through `tls` — confirm.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: `value` is dropped exactly once, here, and is
                    // not touched again because `self` is being dropped.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: the future lives inside `dropper`, which is not moved for the
        // rest of this function, and it is dropped in place by `Dropper::drop`.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1156
    /// The event loop: polls `future` and the store's host futures, and
    /// processes queued work items, until `future` completes.
    ///
    /// Each iteration of the outer loop:
    ///
    /// 1. Temporarily takes the host futures collection out of the concurrent
    ///    state (a guard puts it back).
    /// 2. Polls `future`; if it is ready, returns its value.
    /// 3. Polls the next host future, returning the error if one failed.
    /// 4. Collects all high-priority work items or, if there are none, a
    ///    single low-priority one.
    /// 5. Processes the collected items, then loops.
    ///
    /// If no host futures remain, no work is queued and `future` is still
    /// pending, the loop either fails with `Trap::AsyncDeadlock` (when
    /// `trap_on_idle` is set) or stays pending.
    ///
    /// # Errors
    ///
    /// Returns the first error from a host future or a work item, the deadlock
    /// trap described above, or a bug report if the futures collection is
    /// missing.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Guard that returns the futures collection to the concurrent state
        // when dropped, including on early exit.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take the futures out of the state so they can be polled while
            // the store is lent to `tls`.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = match reset.futures.as_mut() {
                Some(f) => pin!(f.next()),
                None => bail_bug!("concurrent state missing futures field"),
            };

            enum PollResult<R> {
                /// The caller's future finished with this value.
                Complete(R),
                /// There is work to process before polling again.
                ProcessWork {
                    ready: Vec<WorkItem>,
                    low_priority: bool,
                },
            }

            let result = future::poll_fn(|cx| {
                // The caller's future takes precedence over everything else.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(PollResult::Complete(value)));
                }

                // `Ready(true)`: a host future finished; `Ready(false)`: there
                // are no host futures left.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Take every high-priority item; only if there are none, take
                // one low-priority item.
                let state = reset.store.0.concurrent_state_mut();
                let mut ready = mem::take(&mut state.high_priority);
                let mut low_priority = false;
                if ready.is_empty() {
                    if let Some(item) = state.low_priority.pop_back() {
                        ready.push(item);
                        low_priority = true;
                    }
                }
                if !ready.is_empty() {
                    return Poll::Ready(Ok(PollResult::ProcessWork {
                        ready,
                        low_priority,
                    }));
                }

                return match next {
                    // A host future finished but queued nothing: go around the
                    // outer loop again with an empty batch.
                    Poll::Ready(true) => {
                        Poll::Ready(Ok(PollResult::ProcessWork {
                            ready: Vec::new(),
                            low_priority: false,
                        }))
                    }
                    // Nothing left to drive: give the caller's future one more
                    // chance, then trap or stay pending.
                    Poll::Ready(false) => {
                        if let Poll::Ready(value) =
                            tls::set(reset.store.0, || future.as_mut().poll(cx))
                        {
                            Poll::Ready(Ok(PollResult::Complete(value)))
                        } else {
                            if trap_on_idle {
                                Poll::Ready(Err(Trap::AsyncDeadlock.into()))
                            } else {
                                Poll::Pending
                            }
                        }
                    }
                    Poll::Pending => Poll::Pending,
                };
            })
            .await;

            // Put the futures back before processing work, which may need to
            // push new ones.
            drop(reset);

            match result? {
                PollResult::Complete(value) => break Ok(value),
                PollResult::ProcessWork {
                    ready,
                    low_priority,
                } => {
                    // Guard that cleans up items left unprocessed if we exit
                    // early: fibers are disposed, and futures are dropped with
                    // the store installed in `tls`.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    // Yield once before running low-priority work.
                    if low_priority {
                        dispose.store.0.yield_now().await
                    }

                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1371
    /// Executes a single work item taken from one of the queues.
    ///
    /// # Errors
    ///
    /// Propagates errors from the item itself and from table lookups. Reports
    /// a bug if a `ResumeThread` item names a thread that is not in the
    /// `Ready` state.
    async fn handle_work_item(self, item: WorkItem) -> Result<()>
    where
        T: Send,
    {
        log::trace!("handle work item {item:?}");
        match item {
            WorkItem::PushFuture(future) => {
                self.0
                    .concurrent_state_mut()
                    .futures_mut()?
                    .push(future.into_inner());
            }
            WorkItem::ResumeFiber(fiber) => {
                self.0.resume_fiber(fiber).await?;
            }
            WorkItem::ResumeThread(_, thread) => {
                // Mark the thread as running and take its fiber in one step.
                if let GuestThreadState::Ready(fiber) = mem::replace(
                    &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
                    GuestThreadState::Running,
                ) {
                    self.0.resume_fiber(fiber).await?;
                } else {
                    bail_bug!("cannot resume non-pending thread {thread:?}");
                }
            }
            WorkItem::GuestCall(_, call) => {
                if call.is_ready(self.0)? {
                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
                } else {
                    // The call cannot run yet. The first time this happens for
                    // a task, report `Starting` for implicit starts; then park
                    // the call on its instance.
                    let state = self.0.concurrent_state_mut();
                    let task = state.get_mut(call.thread.task)?;
                    if !task.starting_sent {
                        task.starting_sent = true;
                        if let GuestCallKind::StartImplicit(_) = &call.kind {
                            Waitable::Guest(call.thread.task).set_event(
                                state,
                                Some(Event::Subtask {
                                    status: Status::Starting,
                                }),
                            )?;
                        }
                    }

                    let instance = state.get_mut(call.thread.task)?.instance;
                    self.0
                        .instance_state(instance)
                        .concurrent_state()
                        .pending
                        .insert(call.thread, call.kind);
                }
            }
            WorkItem::WorkerFunction(fun) => {
                self.run_on_worker(WorkerItem::Function(fun)).await?;
            }
        }

        Ok(())
    }
1431
    /// Runs `item` on the worker fiber.
    ///
    /// Reuses the fiber stored in the concurrent state if there is one;
    /// otherwise creates a new fiber whose body repeatedly takes the current
    /// `worker_item`, runs it, and suspends with `SuspendReason::NeedWork`.
    ///
    /// # Panics
    ///
    /// Panics if a worker item is already set.
    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
    where
        T: Send,
    {
        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
            fiber
        } else {
            fiber::make_fiber(self.0, move |store| {
                loop {
                    let Some(item) = store.concurrent_state_mut().worker_item.take() else {
                        bail_bug!("worker_item not present when resuming fiber")
                    };
                    match item {
                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
                    }

                    // Done with this item; wait to be handed the next one.
                    store.suspend(SuspendReason::NeedWork)?;
                }
            })?
        };

        // Hand the item over through the shared slot, then resume the fiber so
        // it picks the item up.
        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
        assert!(worker_item.is_none());
        *worker_item = Some(item);

        self.0.resume_fiber(worker).await
    }
1461
1462 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1467 where
1468 T: 'static,
1469 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1470 + Send
1471 + Sync
1472 + 'static,
1473 R: Send + Sync + 'static,
1474 {
1475 let token = StoreToken::new(self);
1476 async move {
1477 let mut accessor = Accessor::new(token);
1478 closure(&mut accessor).await
1479 }
1480 }
1481}
1482
1483impl StoreOpaque {
    /// Sets up the bookkeeping for a synchronous call into `callee`.
    ///
    /// Without concurrency support this defers to `enter_call_not_concurrent`.
    /// Otherwise it creates a new guest task and an implicit guest thread for
    /// the callee, records the current thread as the caller, registers the new
    /// thread with the callee instance, and makes it the current thread.
    ///
    /// `guest_caller`, when present, is only checked (in debug builds) against
    /// the instance of the current guest thread.
    ///
    /// # Errors
    ///
    /// Propagates errors from table operations and thread switching.
    pub(crate) fn enter_guest_sync_call(
        &mut self,
        guest_caller: Option<RuntimeInstance>,
        callee_async: bool,
        callee: RuntimeInstance,
    ) -> Result<()> {
        log::trace!("enter sync call {callee:?}");
        if !self.concurrency_support() {
            return self.enter_call_not_concurrent();
        }

        let state = self.concurrent_state_mut();
        let thread = state.current_thread;
        let instance = if let Some(thread) = thread.guest() {
            Some(state.get_mut(thread.task)?.instance)
        } else {
            None
        };
        if guest_caller.is_some() {
            debug_assert_eq!(instance, guest_caller);
        }
        // Parameters and results are not lowered/lifted through the task for a
        // sync call, so both hooks report a bug if they are ever invoked.
        let task = GuestTask::new(
            Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
            LiftResult {
                lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
                ty: TypeTupleIndex::reserved_value(),
                memory: None,
                string_encoding: StringEncoding::Utf8,
            },
            if let Some(thread) = thread.guest() {
                Caller::Guest { thread: *thread }
            } else {
                Caller::Host {
                    tx: None,
                    host_future_present: false,
                    caller: thread,
                }
            },
            None,
            callee,
            callee_async,
        )?;

        let guest_task = state.push(task)?;
        let new_thread = GuestThread::new_implicit(state, guest_task)?;
        let guest_thread = state.push(new_thread)?;
        Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
            guest_thread,
            self,
            callee.index,
        )?;

        let state = self.concurrent_state_mut();
        state.get_mut(guest_task)?.threads.insert(guest_thread);

        self.set_thread(QualifiedThreadId {
            task: guest_task,
            thread: guest_thread,
        })?;

        Ok(())
    }
1552
    /// Tears down the bookkeeping created by `enter_guest_sync_call`.
    ///
    /// Without concurrency support this defers to `exit_call_not_concurrent`.
    /// Otherwise it clears the current thread, cleans up the callee's thread,
    /// restores the task's caller as the current thread, and deletes the task
    /// if it reports that it is ready to be deleted.
    ///
    /// # Errors
    ///
    /// Reports a bug if the current thread is not a guest thread; otherwise
    /// propagates errors from cleanup and table operations.
    pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
        if !self.concurrency_support() {
            return Ok(self.exit_call_not_concurrent());
        }
        let thread = match self.set_thread(CurrentThread::None)?.guest() {
            Some(t) => *t,
            None => bail_bug!("expected task when exiting"),
        };
        let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance;
        log::trace!("exit sync call {instance:?}");
        Instance::from_wasmtime(self, instance.instance).cleanup_thread(
            self,
            thread,
            instance.index,
        )?;

        // Make whoever called this task current again.
        let state = self.concurrent_state_mut();
        let task = state.get_mut(thread.task)?;
        let caller = match &task.caller {
            &Caller::Guest { thread } => thread.into(),
            &Caller::Host { caller, .. } => caller,
        };
        self.set_thread(caller)?;

        let state = self.concurrent_state_mut();
        let task = state.get_mut(thread.task)?;
        if task.ready_to_delete() {
            state.delete(thread.task)?;
        }

        Ok(())
    }
1586
1587 pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
1595 if !self.concurrency_support() {
1596 self.enter_call_not_concurrent()?;
1597 return Ok(None);
1598 }
1599 let state = self.concurrent_state_mut();
1600 let caller = state.current_guest_thread()?;
1601 let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
1602 log::trace!("new host task {task:?}");
1603 self.set_thread(task)?;
1604 Ok(Some(task))
1605 }
1606
1607 pub fn host_task_reenter_caller(&mut self) -> Result<()> {
1613 if !self.concurrency_support() {
1614 return Ok(());
1615 }
1616 let task = self.concurrent_state_mut().current_host_thread()?;
1617 let caller = self.concurrent_state_mut().get_mut(task)?.caller;
1618 self.set_thread(caller)?;
1619 Ok(())
1620 }
1621
1622 pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1629 match task {
1630 Some(task) => {
1631 log::trace!("delete host task {task:?}");
1632 self.concurrent_state_mut().delete(task)?;
1633 }
1634 None => {
1635 self.exit_call_not_concurrent();
1636 }
1637 }
1638 Ok(())
1639 }
1640
1641 pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
1649 if self.trapped() {
1650 return Ok(false);
1651 }
1652 if !self.concurrency_support() {
1653 return Ok(true);
1654 }
1655 let state = self.concurrent_state_mut();
1656 let mut cur = state.current_thread;
1657 loop {
1658 match cur {
1659 CurrentThread::None => break Ok(true),
1660 CurrentThread::Guest(thread) => {
1661 let task = state.get_mut(thread.task)?;
1662
1663 if task.instance.instance == instance.instance {
1670 break Ok(false);
1671 }
1672 cur = match task.caller {
1673 Caller::Host { caller, .. } => caller,
1674 Caller::Guest { thread } => thread.into(),
1675 };
1676 }
1677 CurrentThread::Host(id) => {
1678 cur = state.get_mut(id)?.caller.into();
1679 }
1680 }
1681 }
1682 }
1683
    /// Returns the state of the instance identified by `instance`: the
    /// component instance is looked up by `instance.instance`, and the state
    /// within it by `instance.index`.
    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
        self.component_instance_mut(instance.instance)
            .instance_state(instance.index)
    }
1690
    /// Replaces the current thread with `thread` and returns the previous one.
    ///
    /// If the previous thread was a guest thread, the "task may block" flag of
    /// its component instance is cleared. If the new thread is a guest thread,
    /// the flag is recomputed for it via `set_task_may_block`.
    ///
    /// # Errors
    ///
    /// Fails if a task lookup fails. Note that the current thread has already
    /// been replaced when that happens.
    fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
        let state = self.concurrent_state_mut();
        let old_thread = mem::replace(&mut state.current_thread, thread.into());
        if let Some(old_thread) = old_thread.guest() {
            let instance = state.get_mut(old_thread.task)?.instance.instance;
            self.component_instance_mut(instance)
                .set_task_may_block(false)
        }

        if self.concurrent_state_mut().current_thread.guest().is_some() {
            self.set_task_may_block()?;
        }

        Ok(old_thread)
    }
1712
    /// Recomputes whether the current guest thread's task may block and stores
    /// the answer on that task's component instance.
    ///
    /// # Errors
    ///
    /// Fails if the current thread is not a guest thread or a lookup fails.
    fn set_task_may_block(&mut self) -> Result<()> {
        let state = self.concurrent_state_mut();
        let guest_thread = state.current_guest_thread()?;
        let instance = state.get_mut(guest_thread.task)?.instance.instance;
        let may_block = self.concurrent_state_mut().may_block(guest_thread.task)?;
        self.component_instance_mut(instance)
            .set_task_may_block(may_block);
        Ok(())
    }
1724
1725 pub(crate) fn check_blocking(&mut self) -> Result<()> {
1726 if !self.concurrency_support() {
1727 return Ok(());
1728 }
1729 let state = self.concurrent_state_mut();
1730 let task = state.current_guest_thread()?.task;
1731 let instance = state.get_mut(task)?.instance.instance;
1732 let task_may_block = self.component_instance(instance).get_task_may_block();
1733
1734 if task_may_block {
1735 Ok(())
1736 } else {
1737 Err(Trap::CannotBlockSyncTask.into())
1738 }
1739 }
1740
1741 fn enter_instance(&mut self, instance: RuntimeInstance) {
1745 log::trace!("enter {instance:?}");
1746 self.instance_state(instance)
1747 .concurrent_state()
1748 .do_not_enter = true;
1749 }
1750
1751 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1755 log::trace!("exit {instance:?}");
1756 self.instance_state(instance)
1757 .concurrent_state()
1758 .do_not_enter = false;
1759 self.partition_pending(instance)
1760 }
1761
1762 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1767 for (thread, kind) in
1768 mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1769 {
1770 let call = GuestCall { thread, kind };
1771 if call.is_ready(self)? {
1772 self.concurrent_state_mut()
1773 .push_high_priority(WorkItem::GuestCall(instance.index, call));
1774 } else {
1775 self.instance_state(instance)
1776 .concurrent_state()
1777 .pending
1778 .insert(call.thread, call.kind);
1779 }
1780 }
1781
1782 Ok(())
1783 }
1784
1785 pub(crate) fn backpressure_modify(
1787 &mut self,
1788 caller_instance: RuntimeInstance,
1789 modify: impl FnOnce(u16) -> Option<u16>,
1790 ) -> Result<()> {
1791 let state = self.instance_state(caller_instance).concurrent_state();
1792 let old = state.backpressure;
1793 let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
1794 state.backpressure = new;
1795
1796 if old > 0 && new == 0 {
1797 self.partition_pending(caller_instance)?;
1800 }
1801
1802 Ok(())
1803 }
1804
/// Resumes `fiber` and, once it yields or exits, files it according to the
/// recorded suspend reason.
///
/// The current thread is saved before the fiber runs and restored
/// afterwards. If the fiber suspended (rather than exited), the pending
/// `suspend_reason` is taken from the concurrent state and determines where
/// the fiber is stored. A suspended fiber without a recorded reason is
/// reported through `bail_bug!`.
async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
    // Remember which thread was current so it can be restored once the
    // fiber hands control back.
    let old_thread = self.concurrent_state_mut().current_thread;
    log::trace!("resume_fiber: save current thread {old_thread:?}");

    // `None` here means the fiber ran to completion (see the `else` branch
    // below); `Some` means it suspended and must be stored somewhere.
    let fiber = fiber::resolve_or_release(self, fiber).await?;

    self.set_thread(old_thread)?;

    let state = self.concurrent_state_mut();

    // If the restored thread is a guest thread, mark it as running again.
    if let Some(ot) = old_thread.guest() {
        state.get_mut(ot.thread)?.state = GuestThreadState::Running;
    }
    log::trace!("resume_fiber: restore current thread {old_thread:?}");

    if let Some(mut fiber) = fiber {
        log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
        let reason = match state.suspend_reason.take() {
            Some(r) => r,
            None => bail_bug!("suspend reason missing when resuming fiber"),
        };
        match reason {
            // Keep the fiber as the worker if there is none yet; otherwise
            // it is surplus and gets disposed.
            SuspendReason::NeedWork => {
                if state.worker.is_none() {
                    state.worker = Some(fiber);
                } else {
                    fiber.dispose(self);
                }
            }
            // The thread is ready to run again: park the fiber on the
            // thread and queue a low-priority resume for it.
            SuspendReason::Yielding { thread, .. } => {
                state.get_mut(thread.thread)?.state = GuestThreadState::Ready(fiber);
                let instance = state.get_mut(thread.task)?.instance.index;
                state.push_low_priority(WorkItem::ResumeThread(instance, thread));
            }
            // Park the fiber on the thread without queueing any work item.
            SuspendReason::ExplicitlySuspending { thread, .. } => {
                state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
            }
            // Register the fiber as a waiter on the waitable set; the
            // thread must not already be registered there.
            SuspendReason::Waiting { set, thread, .. } => {
                let old = state
                    .get_mut(set)?
                    .waiting
                    .insert(thread, WaitMode::Fiber(fiber));
                assert!(old.is_none());
            }
        };
    } else {
        log::trace!("resume_fiber: fiber has exited");
    }

    Ok(())
}
1859
/// Suspends the currently running fiber, recording `reason` so that the
/// code resuming it later knows how to file the fiber.
///
/// For every reason other than `NeedWork`, the current thread is captured
/// before suspending and re-installed with `set_thread` after the fiber is
/// resumed.
///
/// Panics if a suspend reason is already recorded.
fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
    log::trace!("suspend fiber: {reason:?}");

    // Every reason except `NeedWork` carries the thread being suspended.
    let task = match &reason {
        SuspendReason::Yielding { thread, .. }
        | SuspendReason::Waiting { thread, .. }
        | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
        SuspendReason::NeedWork => None,
    };

    let old_guest_thread = if task.is_some() {
        self.concurrent_state_mut().current_thread
    } else {
        CurrentThread::None
    };

    // Debug-only sanity check: either the reason explicitly opts out of the
    // may-block check, or the current guest task (if any) must be allowed
    // to block.
    debug_assert!(
        matches!(
            reason,
            SuspendReason::ExplicitlySuspending {
                skip_may_block_check: true,
                ..
            } | SuspendReason::Waiting {
                skip_may_block_check: true,
                ..
            } | SuspendReason::Yielding {
                skip_may_block_check: true,
                ..
            }
        ) || old_guest_thread
            .guest()
            .map(|thread| self.concurrent_state_mut().may_block(thread.task))
            .transpose()?
            .unwrap_or(true)
    );

    // Publish the reason; only one may be outstanding at a time.
    let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
    assert!(suspend_reason.is_none());
    *suspend_reason = Some(reason);

    // Yield out of the fiber; execution continues here once it is resumed.
    self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;

    // Re-install the thread that was current when we suspended.
    if task.is_some() {
        self.set_thread(old_guest_thread)?;
    }

    Ok(())
}
1921
1922 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1923 let state = self.concurrent_state_mut();
1924 let caller = state.current_guest_thread()?;
1925 let old_set = waitable.common(state)?.set;
1926 let set = state.get_mut(caller.thread)?.sync_call_set;
1927 waitable.join(state, Some(set))?;
1928 self.suspend(SuspendReason::Waiting {
1929 set,
1930 thread: caller,
1931 skip_may_block_check: false,
1932 })?;
1933 let state = self.concurrent_state_mut();
1934 waitable.join(state, old_set)
1935 }
1936}
1937
1938impl Instance {
1939 fn get_event(
1942 self,
1943 store: &mut StoreOpaque,
1944 guest_task: TableId<GuestTask>,
1945 set: Option<TableId<WaitableSet>>,
1946 cancellable: bool,
1947 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1948 let state = store.concurrent_state_mut();
1949
1950 let event = &mut state.get_mut(guest_task)?.event;
1951 if let Some(ev) = event
1952 && (cancellable || !matches!(ev, Event::Cancelled))
1953 {
1954 log::trace!("deliver event {ev:?} to {guest_task:?}");
1955 let ev = *ev;
1956 *event = None;
1957 return Ok(Some((ev, None)));
1958 }
1959
1960 let set = match set {
1961 Some(set) => set,
1962 None => return Ok(None),
1963 };
1964 let waitable = match state.get_mut(set)?.ready.pop_first() {
1965 Some(v) => v,
1966 None => return Ok(None),
1967 };
1968
1969 let common = waitable.common(state)?;
1970 let handle = match common.handle {
1971 Some(h) => h,
1972 None => bail_bug!("handle not set when delivering event"),
1973 };
1974 let event = match common.event.take() {
1975 Some(e) => e,
1976 None => bail_bug!("event not set when delivering event"),
1977 };
1978
1979 log::trace!(
1980 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1981 );
1982
1983 waitable.on_delivery(store, self, event)?;
1984
1985 Ok(Some((event, Some((waitable, handle)))))
1986 }
1987
/// Interprets the packed `code` returned by a guest callback for
/// `guest_thread` and schedules the follow-up work.
///
/// `code` is unpacked into a callback code and a waitable-set handle.
/// Returns `Ok(Some(call))` when the returned call must be run by the
/// caller right away, `Ok(None)` when any follow-up has been queued (or
/// none is needed), and traps with `Trap::UnsupportedCallbackCode` for
/// unknown codes.
fn handle_callback_code(
    self,
    store: &mut StoreOpaque,
    guest_thread: QualifiedThreadId,
    runtime_instance: RuntimeComponentInstanceIndex,
    code: u32,
) -> Result<Option<GuestCall>> {
    let (code, set) = unpack_callback_code(code);

    log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");

    let state = store.concurrent_state_mut();

    // Resolves a guest-provided waitable-set handle through this runtime
    // instance's handle table into a `TableId<WaitableSet>`.
    let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
        let set = store
            .instance_state(RuntimeInstance {
                instance: self.id().instance(),
                index: runtime_instance,
            })
            .handle_table()
            .waitable_set_rep(handle)?;

        Ok(TableId::<WaitableSet>::new(set))
    };

    Ok(match code {
        callback_code::EXIT => {
            log::trace!("implicit thread {guest_thread:?} completed");
            self.cleanup_thread(store, guest_thread, runtime_instance)?;
            let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
            // The last thread of the task exited without the task having
            // returned or been cancelled: that is a trap.
            if task.threads.is_empty() && !task.returned_or_cancelled() {
                bail!(Trap::NoAsyncResult);
            }
            // For guest callers, record the exit and drop the callback.
            if let Caller::Guest { .. } = task.caller {
                task.exited = true;
                task.callback = None;
            }
            if task.ready_to_delete() {
                Waitable::Guest(guest_thread.task).delete_from(store.concurrent_state_mut())?;
            }
            None
        }
        callback_code::YIELD => {
            let task = state.get_mut(guest_thread.task)?;
            // Keep an already pending event (which must be `None` or
            // `Cancelled`); otherwise deliver `Event::None`.
            if let Some(event) = task.event {
                assert!(matches!(event, Event::None | Event::Cancelled));
            } else {
                task.event = Some(Event::None);
            }
            let call = GuestCall {
                thread: guest_thread,
                kind: GuestCallKind::DeliverEvent {
                    instance: self,
                    set: None,
                },
            };
            if state.may_block(guest_thread.task)? {
                // The task may block: queue the delivery at low priority.
                state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
                None
            } else {
                // The task may not block: hand the call back to the caller
                // instead of queueing it.
                Some(call)
            }
        }
        callback_code::WAIT => {
            // Fails if this task is not allowed to block.
            state.check_blocking_for(guest_thread.task)?;

            let set = get_set(store, set)?;
            let state = store.concurrent_state_mut();

            if state.get_mut(guest_thread.task)?.event.is_some()
                || !state.get_mut(set)?.ready.is_empty()
            {
                // Something is already deliverable: queue the delivery at
                // high priority.
                state.push_high_priority(WorkItem::GuestCall(
                    runtime_instance,
                    GuestCall {
                        thread: guest_thread,
                        kind: GuestCallKind::DeliverEvent {
                            instance: self,
                            set: Some(set),
                        },
                    },
                ));
            } else {
                // Nothing to deliver yet: record the set in the thread's
                // `wake_on_cancel` and register the thread as a
                // callback-mode waiter on the set. Both slots must have
                // been empty.
                let old = state
                    .get_mut(guest_thread.thread)?
                    .wake_on_cancel
                    .replace(set);
                if !old.is_none() {
                    bail_bug!("thread unexpectedly had wake_on_cancel set");
                }
                let old = state
                    .get_mut(set)?
                    .waiting
                    .insert(guest_thread, WaitMode::Callback(self));
                if !old.is_none() {
                    bail_bug!("set's waiting set already had this thread registered");
                }
            }
            None
        }
        _ => bail!(Trap::UnsupportedCallbackCode),
    })
}
2115
/// Releases the bookkeeping associated with a finished guest thread.
///
/// Removes the thread from the instance's thread handle table (when it was
/// registered there), deletes ready subtask waitables in its
/// `sync_call_set` whose event is `Returned` or `ReturnCancelled`, deletes
/// the thread and its `sync_call_set` from the concurrent state, and
/// finally removes the thread from its task's thread set.
fn cleanup_thread(
    self,
    store: &mut StoreOpaque,
    guest_thread: QualifiedThreadId,
    runtime_instance: RuntimeComponentInstanceIndex,
) -> Result<()> {
    let state = store.concurrent_state_mut();
    let thread_data = state.get_mut(guest_thread.thread)?;
    // Copy this out now; the thread entry is deleted further down.
    let sync_call_set = thread_data.sync_call_set;
    if let Some(guest_id) = thread_data.instance_rep {
        store
            .instance_state(RuntimeInstance {
                instance: self.id().instance(),
                index: runtime_instance,
            })
            .thread_handle_table()
            .guest_thread_remove(guest_id)?;
    }
    let state = store.concurrent_state_mut();

    // Drain the ready list and delete waitables for subtasks that have
    // already finished (returned or return-cancelled).
    for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
        if let Some(Event::Subtask {
            status: Status::Returned | Status::ReturnCancelled,
        }) = waitable.common(state)?.event
        {
            waitable.delete_from(state)?;
        }
    }

    store.concurrent_state_mut().delete(guest_thread.thread)?;
    store.concurrent_state_mut().delete(sync_call_set)?;
    let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
    task.threads.remove(&guest_thread.thread);
    Ok(())
}
2152
/// Queues a call to the guest function `callee` on behalf of
/// `guest_thread`.
///
/// The call itself is not made here: a closure performing it is wrapped in
/// a `GuestCallKind::StartImplicit` work item and pushed onto the
/// high-priority queue. Two closure shapes exist: one for calls with a
/// `callback` (which must be `async_`), and one for calls without, which
/// also handles result lifting and `post_return` for non-async callees.
///
/// # Safety
///
/// `callee`, `callback` and `post_return` are invoked through unchecked raw
/// call paths, so the caller must guarantee they are valid function
/// references for this store, and that `param_count`/`result_count` match
/// the callee's flat signature. NOTE(review): the precise contract is not
/// visible in this chunk; confirm against the original documentation.
unsafe fn queue_call<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    guest_thread: QualifiedThreadId,
    callee: SendSyncPtr<VMFuncRef>,
    param_count: usize,
    result_count: usize,
    async_: bool,
    callback: Option<SendSyncPtr<VMFuncRef>>,
    post_return: Option<SendSyncPtr<VMFuncRef>>,
) -> Result<()> {
    /// Builds the closure that lowers the parameters, invokes `callee` and
    /// returns the raw storage holding the results.
    ///
    /// The same storage array is used for parameters and results.
    ///
    /// # Safety
    ///
    /// `callee` is invoked via `call_unchecked_raw`; it must be a valid
    /// function reference whose flat signature matches `param_count` and
    /// `result_count`.
    unsafe fn make_call<T: 'static>(
        store: StoreContextMut<T>,
        guest_thread: QualifiedThreadId,
        callee: SendSyncPtr<VMFuncRef>,
        param_count: usize,
        result_count: usize,
    ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
    + Send
    + Sync
    + 'static
    + use<T> {
        let token = StoreToken::new(store);
        move |store: &mut dyn VMStore| {
            let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];

            store
                .concurrent_state_mut()
                .get_mut(guest_thread.thread)?
                .state = GuestThreadState::Running;
            let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
            // The lowering closure is consumed here; it can only run once.
            let lower = match task.lower_params.take() {
                Some(l) => l,
                None => bail_bug!("lower_params missing"),
            };

            lower(store, &mut storage[..param_count])?;

            let mut store = token.as_context_mut(store);

            // Relies on the validity of `callee` promised by the caller of
            // `make_call`. The slice covers whichever of the parameter and
            // result counts is larger.
            unsafe {
                crate::Func::call_unchecked_raw(
                    &mut store,
                    callee.as_non_null(),
                    NonNull::new(
                        &mut storage[..param_count.max(result_count)]
                            as *mut [MaybeUninit<ValRaw>] as _,
                    )
                    .unwrap(),
                )?;
            }

            Ok(storage)
        }
    }

    let call = unsafe {
        make_call(
            store.as_context_mut(),
            guest_thread,
            callee,
            param_count,
            result_count,
        )
    };

    let callee_instance = store
        .0
        .concurrent_state_mut()
        .get_mut(guest_thread.task)?
        .instance;

    let fun = if callback.is_some() {
        // A callback is only meaningful for async callees.
        assert!(async_);

        Box::new(move |store: &mut dyn VMStore| {
            self.add_guest_thread_to_instance_table(
                guest_thread.thread,
                store,
                callee_instance.index,
            )?;
            let old_thread = store.set_thread(guest_thread)?;
            log::trace!(
                "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
            );

            // The instance is marked entered for the duration of the call.
            store.enter_instance(callee_instance);

            let storage = call(store)?;

            store.exit_instance(callee_instance)?;

            // Restore the previously current thread.
            store.set_thread(old_thread)?;
            let state = store.concurrent_state_mut();
            if let Some(t) = old_thread.guest() {
                state.get_mut(t.thread)?.state = GuestThreadState::Running;
            }
            log::trace!("stackless call: restored {old_thread:?} as current thread");

            // The first result slot is read as the i32 callback code.
            // NOTE(review): assumes the callee wrote that slot — confirm.
            let code = unsafe { storage[0].assume_init() }.get_i32() as u32;

            self.handle_callback_code(store, guest_thread, callee_instance.index, code)
        })
            as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
    } else {
        let token = StoreToken::new(store.as_context_mut());
        Box::new(move |store: &mut dyn VMStore| {
            self.add_guest_thread_to_instance_table(
                guest_thread.thread,
                store,
                callee_instance.index,
            )?;
            let old_thread = store.set_thread(guest_thread)?;
            log::trace!(
                "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
            );
            let flags = self.id().get(store).instance_flags(callee_instance.index);

            // Only non-async callees mark the instance as entered.
            if !async_ {
                store.enter_instance(callee_instance);
            }

            let storage = call(store)?;

            if async_ {
                // If this is the task's only thread and the task neither
                // returned nor was cancelled, there is no result: trap.
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                if task.threads.len() == 1 && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
            } else {
                // Non-async callee: leave the instance, lift the result,
                // run `post_return`, and complete the task.
                let lift = {
                    store.exit_instance(callee_instance)?;

                    let state = store.concurrent_state_mut();
                    if !state.get_mut(guest_thread.task)?.result.is_none() {
                        bail_bug!("task has already produced a result");
                    }

                    match state.get_mut(guest_thread.task)?.lift_result.take() {
                        Some(lift) => lift,
                        None => bail_bug!("lift_result field is missing"),
                    }
                };

                // Reinterprets the first `result_count` slots as initialized
                // values. NOTE(review): assumes the callee wrote them.
                let result = (lift.lift)(store, unsafe {
                    mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
                        &storage[..result_count],
                    )
                })?;

                // `post_return` receives the single flat result, or a zero
                // i32 when there is none.
                let post_return_arg = match result_count {
                    0 => ValRaw::i32(0),
                    1 => unsafe { storage[0].assume_init() },
                    _ => unreachable!(),
                };

                unsafe {
                    call_post_return(
                        token.as_context_mut(store),
                        post_return.map(|v| v.as_non_null()),
                        post_return_arg,
                        flags,
                    )?;
                }

                self.task_complete(store, guest_thread.task, result, Status::Returned)?;
            }

            // Without a callback the thread is finished once the call
            // returns.
            self.cleanup_thread(store, guest_thread, callee_instance.index)?;

            store.set_thread(old_thread)?;

            let state = store.concurrent_state_mut();
            let task = state.get_mut(guest_thread.task)?;

            match &task.caller {
                // Host-called tasks are deleted here once ready.
                Caller::Host { .. } => {
                    if task.ready_to_delete() {
                        Waitable::Guest(guest_thread.task).delete_from(state)?;
                    }
                }
                // Guest-called tasks are only marked as exited here.
                Caller::Guest { .. } => {
                    task.exited = true;
                }
            }

            Ok(None)
        })
    };

    store
        .0
        .concurrent_state_mut()
        .push_high_priority(WorkItem::GuestCall(
            callee_instance.index,
            GuestCall {
                thread: guest_thread,
                kind: GuestCallKind::StartImplicit(fun),
            },
        ));

    Ok(())
}
2408
2409 unsafe fn prepare_call<T: 'static>(
2422 self,
2423 mut store: StoreContextMut<T>,
2424 start: NonNull<VMFuncRef>,
2425 return_: NonNull<VMFuncRef>,
2426 caller_instance: RuntimeComponentInstanceIndex,
2427 callee_instance: RuntimeComponentInstanceIndex,
2428 task_return_type: TypeTupleIndex,
2429 callee_async: bool,
2430 memory: *mut VMMemoryDefinition,
2431 string_encoding: StringEncoding,
2432 caller_info: CallerInfo,
2433 ) -> Result<()> {
2434 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2435 store.0.check_blocking()?;
2439 }
2440
2441 enum ResultInfo {
2442 Heap { results: u32 },
2443 Stack { result_count: u32 },
2444 }
2445
2446 let result_info = match &caller_info {
2447 CallerInfo::Async {
2448 has_result: true,
2449 params,
2450 } => ResultInfo::Heap {
2451 results: match params.last() {
2452 Some(r) => r.get_u32(),
2453 None => bail_bug!("retptr missing"),
2454 },
2455 },
2456 CallerInfo::Async {
2457 has_result: false, ..
2458 } => ResultInfo::Stack { result_count: 0 },
2459 CallerInfo::Sync {
2460 result_count,
2461 params,
2462 } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
2463 results: match params.last() {
2464 Some(r) => r.get_u32(),
2465 None => bail_bug!("arg ptr missing"),
2466 },
2467 },
2468 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2469 result_count: *result_count,
2470 },
2471 };
2472
2473 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2474
2475 let start = SendSyncPtr::new(start);
2479 let return_ = SendSyncPtr::new(return_);
2480 let token = StoreToken::new(store.as_context_mut());
2481 let state = store.0.concurrent_state_mut();
2482 let old_thread = state.current_guest_thread()?;
2483
2484 debug_assert_eq!(
2485 state.get_mut(old_thread.task)?.instance,
2486 RuntimeInstance {
2487 instance: self.id().instance(),
2488 index: caller_instance,
2489 }
2490 );
2491
2492 let new_task = GuestTask::new(
2493 Box::new(move |store, dst| {
2494 let mut store = token.as_context_mut(store);
2495 assert!(dst.len() <= MAX_FLAT_PARAMS);
2496 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2498 let count = match caller_info {
2499 CallerInfo::Async { params, has_result } => {
2503 let params = ¶ms[..params.len() - usize::from(has_result)];
2504 for (param, src) in params.iter().zip(&mut src) {
2505 src.write(*param);
2506 }
2507 params.len()
2508 }
2509
2510 CallerInfo::Sync { params, .. } => {
2512 for (param, src) in params.iter().zip(&mut src) {
2513 src.write(*param);
2514 }
2515 params.len()
2516 }
2517 };
2518 unsafe {
2525 crate::Func::call_unchecked_raw(
2526 &mut store,
2527 start.as_non_null(),
2528 NonNull::new(
2529 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2530 )
2531 .unwrap(),
2532 )?;
2533 }
2534 dst.copy_from_slice(&src[..dst.len()]);
2535 let state = store.0.concurrent_state_mut();
2536 Waitable::Guest(state.current_guest_thread()?.task).set_event(
2537 state,
2538 Some(Event::Subtask {
2539 status: Status::Started,
2540 }),
2541 )?;
2542 Ok(())
2543 }),
2544 LiftResult {
2545 lift: Box::new(move |store, src| {
2546 let mut store = token.as_context_mut(store);
2549 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2551 my_src.push(ValRaw::u32(*results));
2552 }
2553
2554 let prev = store.0.set_thread(old_thread)?;
2560
2561 unsafe {
2568 crate::Func::call_unchecked_raw(
2569 &mut store,
2570 return_.as_non_null(),
2571 my_src.as_mut_slice().into(),
2572 )?;
2573 }
2574
2575 store.0.set_thread(prev)?;
2578
2579 let state = store.0.concurrent_state_mut();
2580 let thread = state.current_guest_thread()?;
2581 if sync_caller {
2582 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2583 if let ResultInfo::Stack { result_count } = &result_info {
2584 match result_count {
2585 0 => None,
2586 1 => Some(my_src[0]),
2587 _ => unreachable!(),
2588 }
2589 } else {
2590 None
2591 },
2592 );
2593 }
2594 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2595 }),
2596 ty: task_return_type,
2597 memory: NonNull::new(memory).map(SendSyncPtr::new),
2598 string_encoding,
2599 },
2600 Caller::Guest { thread: old_thread },
2601 None,
2602 RuntimeInstance {
2603 instance: self.id().instance(),
2604 index: callee_instance,
2605 },
2606 callee_async,
2607 )?;
2608
2609 let guest_task = state.push(new_task)?;
2610 let new_thread = GuestThread::new_implicit(state, guest_task)?;
2611 let guest_thread = state.push(new_thread)?;
2612 state.get_mut(guest_task)?.threads.insert(guest_thread);
2613
2614 store.0.set_thread(QualifiedThreadId {
2617 task: guest_task,
2618 thread: guest_thread,
2619 })?;
2620 log::trace!(
2621 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2622 );
2623
2624 Ok(())
2625 }
2626
2627 unsafe fn call_callback<T>(
2632 self,
2633 mut store: StoreContextMut<T>,
2634 function: SendSyncPtr<VMFuncRef>,
2635 event: Event,
2636 handle: u32,
2637 ) -> Result<u32> {
2638 let (ordinal, result) = event.parts();
2639 let params = &mut [
2640 ValRaw::u32(ordinal),
2641 ValRaw::u32(handle),
2642 ValRaw::u32(result),
2643 ];
2644 unsafe {
2649 crate::Func::call_unchecked_raw(
2650 &mut store,
2651 function.as_non_null(),
2652 params.as_mut_slice().into(),
2653 )?;
2654 }
2655 Ok(params[0].get_u32())
2656 }
2657
/// Starts the call prepared for the current guest thread and waits for it
/// to start or return.
///
/// The call is queued with `queue_call`, then the caller's fiber suspends
/// on its `sync_call_set` until a subtask event arrives. With `storage`
/// present (a sync caller) the loop continues until the callee has
/// returned, and the single flat result, if any, is written to
/// `storage[0]`. Without `storage` (an async caller) the first event ends
/// the wait, and a subtask handle is allocated if the callee has not
/// returned yet. The returned value is the status packed with that
/// optional handle.
///
/// # Safety
///
/// `callback`, `post_return` and `callee` are forwarded to unchecked call
/// paths; the caller must guarantee they are valid (or null for the two
/// raw pointers). NOTE(review): the precise contract is not visible in
/// this chunk; confirm against the original documentation.
unsafe fn start_call<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    callback: *mut VMFuncRef,
    post_return: *mut VMFuncRef,
    callee: NonNull<VMFuncRef>,
    param_count: u32,
    result_count: u32,
    flags: u32,
    storage: Option<&mut [MaybeUninit<ValRaw>]>,
) -> Result<u32> {
    let token = StoreToken::new(store.as_context_mut());
    // No result storage means the caller is async.
    let async_caller = storage.is_none();
    let state = store.0.concurrent_state_mut();
    let guest_thread = state.current_guest_thread()?;
    let callee_async = state.get_mut(guest_thread.task)?.async_function;
    let callee = SendSyncPtr::new(callee);
    let param_count = usize::try_from(param_count)?;
    assert!(param_count <= MAX_FLAT_PARAMS);
    let result_count = usize::try_from(result_count)?;
    assert!(result_count <= MAX_FLAT_RESULTS);

    let task = state.get_mut(guest_thread.task)?;
    // A non-null callback is installed on the task as a closure that
    // forwards to `call_callback`.
    if let Some(callback) = NonNull::new(callback) {
        let callback = SendSyncPtr::new(callback);
        task.callback = Some(Box::new(move |store, event, handle| {
            let store = token.as_context_mut(store);
            unsafe { self.call_callback::<T>(store, callback, event, handle) }
        }));
    }

    // This path only handles tasks whose caller is a guest.
    let Caller::Guest { thread: caller } = &task.caller else {
        bail_bug!("start_call unexpectedly invoked for host->guest call");
    };
    let caller = *caller;
    let caller_instance = state.get_mut(caller.task)?.instance;

    unsafe {
        self.queue_call(
            store.as_context_mut(),
            guest_thread,
            callee,
            param_count,
            result_count,
            (flags & START_FLAG_ASYNC_CALLEE) != 0,
            NonNull::new(callback).map(SendSyncPtr::new),
            NonNull::new(post_return).map(SendSyncPtr::new),
        )?;
    }

    let state = store.0.concurrent_state_mut();

    // Temporarily join the subtask's waitable to the caller's
    // `sync_call_set`, remembering the set it was in before.
    let guest_waitable = Waitable::Guest(guest_thread.task);
    let old_set = guest_waitable.common(state)?.set;
    let set = state.get_mut(caller.thread)?.sync_call_set;
    guest_waitable.join(state, Some(set))?;

    let (status, waitable) = loop {
        store.0.suspend(SuspendReason::Waiting {
            set,
            thread: caller,
            // The may-block check is skipped for async callers and for
            // callees that are not async functions.
            skip_may_block_check: async_caller || !callee_async,
        })?;

        let state = store.0.concurrent_state_mut();

        log::trace!("taking event for {:?}", guest_thread.task);
        let event = guest_waitable.take_event(state)?;
        let Some(Event::Subtask { status }) = event else {
            bail_bug!("subtasks should only get subtask events, got {event:?}")
        };

        log::trace!("status {status:?} for {:?}", guest_thread.task);

        if status == Status::Returned {
            // The callee returned: no handle is needed.
            break (status, None);
        } else if async_caller {
            // Not returned yet and the caller is async: allocate a subtask
            // handle in the caller's instance and record it on the task.
            let handle = store
                .0
                .instance_state(caller_instance)
                .handle_table()
                .subtask_insert_guest(guest_thread.task.rep())?;
            store
                .0
                .concurrent_state_mut()
                .get_mut(guest_thread.task)?
                .common
                .handle = Some(handle);
            break (status, Some(handle));
        } else {
            // Sync caller and the callee has not returned: keep waiting.
        }
    };

    // Put the waitable back into the set it belonged to before.
    guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;

    // Make the caller the current, running thread again.
    store.0.set_thread(caller)?;
    store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
    log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");

    if let Some(storage) = storage {
        let state = store.0.concurrent_state_mut();
        let task = state.get_mut(guest_thread.task)?;
        // For sync callers, copy out any result stashed in `sync_result`.
        if let Some(result) = task.sync_result.take()? {
            if let Some(result) = result {
                storage[0] = MaybeUninit::new(result);
            }

            if task.exited && task.ready_to_delete() {
                Waitable::Guest(guest_thread.task).delete_from(state)?;
            }
        }
    }

    Ok(status.pack(waitable))
}
2826
/// Polls the host task's `future` once and either completes it
/// immediately or registers it as a pending subtask.
///
/// If the future is ready on the first poll, its result is handed to
/// `lower` and `Ok(None)` is returned. Otherwise the future is pushed onto
/// the concurrent state, a subtask handle for the host task is inserted in
/// the calling instance's handle table, the caller becomes the current
/// thread again, and `Ok(Some(handle))` is returned. When the future later
/// completes, a worker function is queued that calls `lower`, marks the
/// task done, and publishes a subtask event on its waitable.
pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
    self,
    mut store: StoreContextMut<'_, T>,
    future: impl Future<Output = Result<R>> + Send + 'static,
    lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
) -> Result<Option<u32>> {
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();
    let task = state.current_host_thread()?;

    // Wrap the future so it can be controlled through a join handle, which
    // is stored on the task below.
    let (join_handle, future) = JoinHandle::run(future);
    {
        let state = &mut state.get_mut(task)?.state;
        assert!(matches!(state, HostTaskState::CalleeStarted));
        *state = HostTaskState::CalleeRunning(join_handle);
    }

    let mut future = Box::pin(future);

    // Poll exactly once with a no-op waker, with the store made available
    // through `tls` for the duration of the poll.
    let poll = tls::set(store.0, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        // Finished right away: lower the result and report no handle.
        Poll::Ready(result) => {
            let result = result.transpose()?;
            lower(store.as_context_mut(), result)?;
            return Ok(None);
        }

        // Not finished yet: fall through to the pending path.
        Poll::Pending => {}
    }

    let future = Box::pin(async move {
        // `None` is reported as `ReturnCancelled` below; `Some` carries the
        // future's result, whose error is propagated here.
        let result = match future.await {
            Some(result) => Some(result?),
            None => None,
        };
        let on_complete = move |store: &mut dyn VMStore| {
            let mut store = token.as_context_mut(store);
            // Run `lower` with the host task as the current thread.
            let old = store.0.set_thread(task)?;

            let status = if result.is_some() {
                Status::Returned
            } else {
                Status::ReturnCancelled
            };

            lower(store.as_context_mut(), result)?;
            let state = store.0.concurrent_state_mut();
            match &mut state.get_mut(task)?.state {
                // Already marked done: leave the state untouched.
                HostTaskState::CalleeDone { .. } => {}

                // Any other state transitions to done (not cancelled).
                other => *other = HostTaskState::CalleeDone { cancelled: false },
            }
            Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;

            store.0.set_thread(old)?;
            Ok(())
        };

        // Queue the completion handler as high-priority worker work rather
        // than running it inline.
        tls::get(move |store| {
            store
                .concurrent_state_mut()
                .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
                    on_complete,
                ))));
            Ok(())
        })
    });

    let state = store.0.concurrent_state_mut();
    state.push_future(future);
    // Expose the host task to its caller as a subtask handle and record
    // that handle on the task.
    let caller = state.get_mut(task)?.caller;
    let instance = state.get_mut(caller.task)?.instance;
    let handle = store
        .0
        .instance_state(instance)
        .handle_table()
        .subtask_insert_host(task.rep())?;
    store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
    log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");

    // Control returns to the caller, so make it the current thread.
    store.0.set_thread(caller)?;
    Ok(Some(handle))
}
2957
/// Implements `task.return` for the current guest thread's task.
///
/// Takes the task's `lift_result` (trapping with
/// `Trap::TaskCancelOrReturnTwice` if it is already gone), validates that
/// the supplied type and canonical options match what the task was created
/// with (trapping with `Trap::TaskReturnInvalid` otherwise), lifts the
/// result from `storage`, and completes the task with `Status::Returned`.
pub(crate) fn task_return(
    self,
    store: &mut dyn VMStore,
    ty: TypeTupleIndex,
    options: OptionsIndex,
    storage: &[ValRaw],
) -> Result<()> {
    let state = store.concurrent_state_mut();
    let guest_thread = state.current_guest_thread()?;
    // `lift_result` can only be taken once per task.
    let lift = state
        .get_mut(guest_thread.task)?
        .lift_result
        .take()
        .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
    if !state.get_mut(guest_thread.task)?.result.is_none() {
        bail_bug!("task result unexpectedly already set");
    }

    let CanonicalOptions {
        string_encoding,
        data_model,
        ..
    } = &self.id().get(store).component().env_component().options[options];

    // The call is invalid when the result type, string encoding or memory
    // differ from those recorded in `lift`.
    let invalid = ty != lift.ty
        || string_encoding != &lift.string_encoding
        || match data_model {
            CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
                Some(memory) => {
                    let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
                    let actual = self.id().get(store).runtime_memory(memory);
                    expected != actual.as_ptr()
                }
                // No memory given in the options: nothing to compare.
                None => false,
            },
            // The GC data model is always rejected here.
            CanonicalOptionsDataModel::Gc { .. } => true,
        };

    if invalid {
        bail!(Trap::TaskReturnInvalid);
    }

    log::trace!("task.return for {guest_thread:?}");

    let result = (lift.lift)(store, storage)?;
    self.task_complete(store, guest_thread.task, result, Status::Returned)
}
3010
3011 pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
3013 let state = store.concurrent_state_mut();
3014 let guest_thread = state.current_guest_thread()?;
3015 let task = state.get_mut(guest_thread.task)?;
3016 if !task.cancel_sent {
3017 bail!(Trap::TaskCancelNotCancelled);
3018 }
3019 _ = task
3020 .lift_result
3021 .take()
3022 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3023
3024 if !task.result.is_none() {
3025 bail_bug!("task result should not bet set yet");
3026 }
3027
3028 log::trace!("task.cancel for {guest_thread:?}");
3029
3030 self.task_complete(
3031 store,
3032 guest_thread.task,
3033 Box::new(DummyResult),
3034 Status::ReturnCancelled,
3035 )
3036 }
3037
3038 fn task_complete(
3044 self,
3045 store: &mut StoreOpaque,
3046 guest_task: TableId<GuestTask>,
3047 result: Box<dyn Any + Send + Sync>,
3048 status: Status,
3049 ) -> Result<()> {
3050 store
3051 .component_resource_tables(Some(self))
3052 .validate_scope_exit()?;
3053
3054 let state = store.concurrent_state_mut();
3055 let task = state.get_mut(guest_task)?;
3056
3057 if let Caller::Host { tx, .. } = &mut task.caller {
3058 if let Some(tx) = tx.take() {
3059 _ = tx.send(result);
3060 }
3061 } else {
3062 task.result = Some(result);
3063 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3064 }
3065
3066 Ok(())
3067 }
3068
3069 pub(crate) fn waitable_set_new(
3071 self,
3072 store: &mut StoreOpaque,
3073 caller_instance: RuntimeComponentInstanceIndex,
3074 ) -> Result<u32> {
3075 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
3076 let handle = store
3077 .instance_state(RuntimeInstance {
3078 instance: self.id().instance(),
3079 index: caller_instance,
3080 })
3081 .handle_table()
3082 .waitable_set_insert(set.rep())?;
3083 log::trace!("new waitable set {set:?} (handle {handle})");
3084 Ok(handle)
3085 }
3086
3087 pub(crate) fn waitable_set_drop(
3089 self,
3090 store: &mut StoreOpaque,
3091 caller_instance: RuntimeComponentInstanceIndex,
3092 set: u32,
3093 ) -> Result<()> {
3094 let rep = store
3095 .instance_state(RuntimeInstance {
3096 instance: self.id().instance(),
3097 index: caller_instance,
3098 })
3099 .handle_table()
3100 .waitable_set_remove(set)?;
3101
3102 log::trace!("drop waitable set {rep} (handle {set})");
3103
3104 if !store
3108 .concurrent_state_mut()
3109 .get_mut(TableId::<WaitableSet>::new(rep))?
3110 .waiting
3111 .is_empty()
3112 {
3113 bail!(Trap::WaitableSetDropHasWaiters);
3114 }
3115
3116 store
3117 .concurrent_state_mut()
3118 .delete(TableId::<WaitableSet>::new(rep))?;
3119
3120 Ok(())
3121 }
3122
3123 pub(crate) fn waitable_join(
3125 self,
3126 store: &mut StoreOpaque,
3127 caller_instance: RuntimeComponentInstanceIndex,
3128 waitable_handle: u32,
3129 set_handle: u32,
3130 ) -> Result<()> {
3131 let mut instance = self.id().get_mut(store);
3132 let waitable =
3133 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3134
3135 let set = if set_handle == 0 {
3136 None
3137 } else {
3138 let set = instance.instance_states().0[caller_instance]
3139 .handle_table()
3140 .waitable_set_rep(set_handle)?;
3141
3142 Some(TableId::<WaitableSet>::new(set))
3143 };
3144
3145 log::trace!(
3146 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3147 );
3148
3149 waitable.join(store.concurrent_state_mut(), set)
3150 }
3151
3152 pub(crate) fn subtask_drop(
3154 self,
3155 store: &mut StoreOpaque,
3156 caller_instance: RuntimeComponentInstanceIndex,
3157 task_id: u32,
3158 ) -> Result<()> {
3159 self.waitable_join(store, caller_instance, task_id, 0)?;
3160
3161 let (rep, is_host) = store
3162 .instance_state(RuntimeInstance {
3163 instance: self.id().instance(),
3164 index: caller_instance,
3165 })
3166 .handle_table()
3167 .subtask_remove(task_id)?;
3168
3169 let concurrent_state = store.concurrent_state_mut();
3170 let (waitable, delete) = if is_host {
3171 let id = TableId::<HostTask>::new(rep);
3172 let task = concurrent_state.get_mut(id)?;
3173 match &task.state {
3174 HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
3175 HostTaskState::CalleeDone { .. } => {}
3176 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3177 bail_bug!("invalid state for callee in `subtask.drop`")
3178 }
3179 }
3180 (Waitable::Host(id), true)
3181 } else {
3182 let id = TableId::<GuestTask>::new(rep);
3183 let task = concurrent_state.get_mut(id)?;
3184 if task.lift_result.is_some() {
3185 bail!(Trap::SubtaskDropNotResolved);
3186 }
3187 (
3188 Waitable::Guest(id),
3189 concurrent_state.get_mut(id)?.ready_to_delete(),
3190 )
3191 };
3192
3193 waitable.common(concurrent_state)?.handle = None;
3194
3195 if waitable.take_event(concurrent_state)?.is_some() {
3198 bail!(Trap::SubtaskDropNotResolved);
3199 }
3200
3201 if delete {
3202 waitable.delete_from(concurrent_state)?;
3203 }
3204
3205 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3206 Ok(())
3207 }
3208
3209 pub(crate) fn waitable_set_wait(
3211 self,
3212 store: &mut StoreOpaque,
3213 options: OptionsIndex,
3214 set: u32,
3215 payload: u32,
3216 ) -> Result<u32> {
3217 if !self.options(store, options).async_ {
3218 store.check_blocking()?;
3222 }
3223
3224 let &CanonicalOptions {
3225 cancellable,
3226 instance: caller_instance,
3227 ..
3228 } = &self.id().get(store).component().env_component().options[options];
3229 let rep = store
3230 .instance_state(RuntimeInstance {
3231 instance: self.id().instance(),
3232 index: caller_instance,
3233 })
3234 .handle_table()
3235 .waitable_set_rep(set)?;
3236
3237 self.waitable_check(
3238 store,
3239 cancellable,
3240 WaitableCheck::Wait,
3241 WaitableCheckParams {
3242 set: TableId::new(rep),
3243 options,
3244 payload,
3245 },
3246 )
3247 }
3248
3249 pub(crate) fn waitable_set_poll(
3251 self,
3252 store: &mut StoreOpaque,
3253 options: OptionsIndex,
3254 set: u32,
3255 payload: u32,
3256 ) -> Result<u32> {
3257 let &CanonicalOptions {
3258 cancellable,
3259 instance: caller_instance,
3260 ..
3261 } = &self.id().get(store).component().env_component().options[options];
3262 let rep = store
3263 .instance_state(RuntimeInstance {
3264 instance: self.id().instance(),
3265 index: caller_instance,
3266 })
3267 .handle_table()
3268 .waitable_set_rep(set)?;
3269
3270 self.waitable_check(
3271 store,
3272 cancellable,
3273 WaitableCheck::Poll,
3274 WaitableCheckParams {
3275 set: TableId::new(rep),
3276 options,
3277 payload,
3278 },
3279 )
3280 }
3281
3282 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3284 let thread_id = store.concurrent_state_mut().current_guest_thread()?.thread;
3285 match store
3286 .concurrent_state_mut()
3287 .get_mut(thread_id)?
3288 .instance_rep
3289 {
3290 Some(r) => Ok(r),
3291 None => bail_bug!("thread should have instance_rep by now"),
3292 }
3293 }
3294
    /// Creates a new explicit guest thread whose entry point is looked up
    /// indirectly through a runtime function table.
    ///
    /// * `runtime_instance` - component instance whose thread handle table
    ///   receives the new thread.
    /// * `_func_ty_idx` - unused here; the expected signature is rebuilt
    ///   locally as `(i32) -> ()`.
    /// * `start_func_table_idx` / `start_func_idx` - table and element index
    ///   of the start function.
    /// * `context` - the single `i32` argument passed to the start function
    ///   when the thread runs.
    ///
    /// Returns the handle of the new thread in `runtime_instance`'s thread
    /// handle table. The thread is only created here; it is stored in the
    /// `NotStartedExplicit` state together with its start closure.
    ///
    /// Bails with `Trap::ThreadNewIndirectUninitialized` if the table slot is
    /// empty, and with `Trap::ThreadNewIndirectInvalidType` if the function's
    /// type is not `(i32) -> ()`.
    pub(crate) fn thread_new_indirect<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        runtime_instance: RuntimeComponentInstanceIndex,
        _func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32> {
        log::trace!("creating new thread");

        // The start function must take one `i32` and return nothing.
        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
        let (instance, registry) = self.id().get_mut_and_registry(store.0);
        let callee = instance
            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
            .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
        if callee.type_index(store.0) != start_func_ty.type_index() {
            bail!(Trap::ThreadNewIndirectInvalidType);
        }

        let token = StoreToken::new(store.as_context_mut());
        // Closure executed when the thread is first resumed; it receives the
        // store and the fully-qualified id of the thread being started.
        let start_func = Box::new(
            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
                // Make the new thread current, remembering the previous one so
                // it can be restored once the start function returns.
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
                );

                let mut store = token.as_context_mut(store);
                let mut params = [ValRaw::i32(context)];
                // NOTE(review): presumably sound because `callee`'s type was
                // checked against `(i32) -> ()` above and `params` holds
                // exactly one `i32` — confirm against `call_unchecked`'s
                // safety contract.
                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };

                self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
                log::trace!("explicit thread {guest_thread:?} completed");
                let state = store.0.concurrent_state_mut();
                let task = state.get_mut(guest_thread.task)?;
                // No threads left in the task, yet the task has neither
                // returned nor been cancelled: trap.
                if task.threads.is_empty() && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
                // Restore the previously-current thread and, if it is a guest
                // thread, mark it as running again.
                store.0.set_thread(old_thread)?;
                let state = store.0.concurrent_state_mut();
                if let Some(t) = old_thread.guest() {
                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
                }
                if state.get_mut(guest_thread.task)?.ready_to_delete() {
                    Waitable::Guest(guest_thread.task).delete_from(state)?;
                }
                log::trace!("thread start: restored {old_thread:?} as current thread");

                Ok(())
            },
        );

        // The new thread belongs to the task of the thread that is creating it.
        let state = store.0.concurrent_state_mut();
        let current_thread = state.current_guest_thread()?;
        let parent_task = current_thread.task;

        let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
        let thread_id = state.push(new_thread)?;
        state.get_mut(parent_task)?.threads.insert(thread_id);

        log::trace!("new thread with id {thread_id:?} created");

        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
    }
3363
    /// Schedules the guest thread with handle `thread_idx` (resolved in
    /// `runtime_instance`) to run.
    ///
    /// * `high_priority` - forwarded to `push_work_item` when a new work item
    ///   is queued for the thread.
    /// * `allow_ready` - when true, a thread already in the `Ready` state has
    ///   its existing work item promoted instead of causing a trap.
    ///
    /// Behavior by current thread state:
    /// * `NotStartedExplicit` - queues a `StartExplicit` guest call wrapping
    ///   the stored start function.
    /// * `Suspended` - queues a `ResumeFiber` work item for its fiber.
    /// * `Ready` (only if `allow_ready`) - stays `Ready`; its work item is
    ///   promoted.
    /// * anything else - the state is left untouched and this bails with
    ///   `Trap::CannotResumeThread`.
    pub(crate) fn resume_thread(
        self,
        store: &mut StoreOpaque,
        runtime_instance: RuntimeComponentInstanceIndex,
        thread_idx: u32,
        high_priority: bool,
        allow_ready: bool,
    ) -> Result<()> {
        let thread_id =
            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
        let state = store.concurrent_state_mut();
        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
        let thread = state.get_mut(guest_thread.thread)?;

        // Optimistically mark the thread as `Running` while taking ownership
        // of the previous state; arms that must not change the state write
        // the old value back.
        match mem::replace(&mut thread.state, GuestThreadState::Running) {
            GuestThreadState::NotStartedExplicit(start_func) => {
                log::trace!("starting thread {guest_thread:?}");
                let guest_call = WorkItem::GuestCall(
                    runtime_instance,
                    GuestCall {
                        thread: guest_thread,
                        kind: GuestCallKind::StartExplicit(Box::new(move |store| {
                            start_func(store, guest_thread)
                        })),
                    },
                );
                store
                    .concurrent_state_mut()
                    .push_work_item(guest_call, high_priority);
            }
            GuestThreadState::Suspended(fiber) => {
                log::trace!("resuming thread {thread_id:?} that was suspended");
                store
                    .concurrent_state_mut()
                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
            }
            GuestThreadState::Ready(fiber) if allow_ready => {
                log::trace!("resuming thread {thread_id:?} that was ready");
                // Put the `Ready` state back before promoting the work item.
                thread.state = GuestThreadState::Ready(fiber);
                store
                    .concurrent_state_mut()
                    .promote_thread_work_item(guest_thread);
            }
            other => {
                // Not resumable: restore the original state before trapping.
                thread.state = other;
                bail!(Trap::CannotResumeThread);
            }
        }
        Ok(())
    }
3414
3415 fn add_guest_thread_to_instance_table(
3416 self,
3417 thread_id: TableId<GuestThread>,
3418 store: &mut StoreOpaque,
3419 runtime_instance: RuntimeComponentInstanceIndex,
3420 ) -> Result<u32> {
3421 let guest_id = store
3422 .instance_state(RuntimeInstance {
3423 instance: self.id().instance(),
3424 index: runtime_instance,
3425 })
3426 .thread_handle_table()
3427 .guest_thread_insert(thread_id.rep())?;
3428 store
3429 .concurrent_state_mut()
3430 .get_mut(thread_id)?
3431 .instance_rep = Some(guest_id);
3432 Ok(guest_id)
3433 }
3434
3435 pub(crate) fn suspension_intrinsic(
3438 self,
3439 store: &mut StoreOpaque,
3440 caller: RuntimeComponentInstanceIndex,
3441 cancellable: bool,
3442 yielding: bool,
3443 to_thread: SuspensionTarget,
3444 ) -> Result<WaitResult> {
3445 let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3446 if to_thread.is_none() {
3447 let state = store.concurrent_state_mut();
3448 if yielding {
3449 if !state.may_block(guest_thread.task)? {
3451 if !state.promote_instance_local_thread_work_item(caller) {
3454 return Ok(WaitResult::Completed);
3456 }
3457 }
3458 } else {
3459 store.check_blocking()?;
3463 }
3464 }
3465
3466 if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3468 return Ok(WaitResult::Cancelled);
3469 }
3470
3471 match to_thread {
3472 SuspensionTarget::SomeSuspended(thread) => {
3473 self.resume_thread(store, caller, thread, true, false)?
3474 }
3475 SuspensionTarget::Some(thread) => {
3476 self.resume_thread(store, caller, thread, true, true)?
3477 }
3478 SuspensionTarget::None => { }
3479 }
3480
3481 let reason = if yielding {
3482 SuspendReason::Yielding {
3483 thread: guest_thread,
3484 skip_may_block_check: to_thread.is_some(),
3488 }
3489 } else {
3490 SuspendReason::ExplicitlySuspending {
3491 thread: guest_thread,
3492 skip_may_block_check: to_thread.is_some(),
3496 }
3497 };
3498
3499 store.suspend(reason)?;
3500
3501 if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3502 Ok(WaitResult::Cancelled)
3503 } else {
3504 Ok(WaitResult::Completed)
3505 }
3506 }
3507
3508 fn waitable_check(
3510 self,
3511 store: &mut StoreOpaque,
3512 cancellable: bool,
3513 check: WaitableCheck,
3514 params: WaitableCheckParams,
3515 ) -> Result<u32> {
3516 let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3517
3518 log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3519
3520 let state = store.concurrent_state_mut();
3521 let task = state.get_mut(guest_thread.task)?;
3522
3523 match &check {
3526 WaitableCheck::Wait => {
3527 let set = params.set;
3528
3529 if (task.event.is_none()
3530 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3531 && state.get_mut(set)?.ready.is_empty()
3532 {
3533 if cancellable {
3534 let old = state
3535 .get_mut(guest_thread.thread)?
3536 .wake_on_cancel
3537 .replace(set);
3538 if !old.is_none() {
3539 bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
3540 }
3541 }
3542
3543 store.suspend(SuspendReason::Waiting {
3544 set,
3545 thread: guest_thread,
3546 skip_may_block_check: false,
3547 })?;
3548 }
3549 }
3550 WaitableCheck::Poll => {}
3551 }
3552
3553 log::trace!(
3554 "waitable check for {guest_thread:?}; set {:?}, part two",
3555 params.set
3556 );
3557
3558 let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3560
3561 let (ordinal, handle, result) = match &check {
3562 WaitableCheck::Wait => {
3563 let (event, waitable) = match event {
3564 Some(p) => p,
3565 None => bail_bug!("event expected to be present"),
3566 };
3567 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3568 let (ordinal, result) = event.parts();
3569 (ordinal, handle, result)
3570 }
3571 WaitableCheck::Poll => {
3572 if let Some((event, waitable)) = event {
3573 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3574 let (ordinal, result) = event.parts();
3575 (ordinal, handle, result)
3576 } else {
3577 log::trace!(
3578 "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3579 guest_thread.task,
3580 params.set
3581 );
3582 let (ordinal, result) = Event::None.parts();
3583 (ordinal, 0, result)
3584 }
3585 }
3586 };
3587 let memory = self.options_memory_mut(store, params.options);
3588 let ptr = func::validate_inbounds_dynamic(
3589 &CanonicalAbiInfo::POINTER_PAIR,
3590 memory,
3591 &ValRaw::u32(params.payload),
3592 )?;
3593 memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3594 memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3595 Ok(ordinal)
3596 }
3597
    /// Requests cancellation of the subtask with handle `task_id` owned by
    /// `caller_instance`.
    ///
    /// * `async_` - when false, `store.check_blocking()` must succeed and this
    ///   call waits for the subtask's event if the subtask does not resolve
    ///   immediately; when true, `BLOCKED` is returned in that case instead.
    ///
    /// Returns, as a `u32`:
    /// * `Status::StartCancelled` if the guest subtask had not yet lowered
    ///   its parameters,
    /// * `BLOCKED` if cancellation is still outstanding and `async_` is true,
    /// * otherwise the `Status::Returned` / `Status::ReturnCancelled` status
    ///   carried by the subtask's event.
    ///
    /// Bails with `Trap::SubtaskCancelAfterTerminal` when a host subtask was
    /// already marked cancelled, when a not-yet-started guest subtask has no
    /// pending entry, or when no terminal subtask event is available at the
    /// end.
    ///
    /// # Panics
    ///
    /// Panics if a guest subtask that has not lowered its parameters does not
    /// have exactly one thread.
    pub(crate) fn subtask_cancel(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        async_: bool,
        task_id: u32,
    ) -> Result<u32> {
        if !async_ {
            store.check_blocking()?;
        }

        let (rep, is_host) = store
            .instance_state(RuntimeInstance {
                instance: self.id().instance(),
                index: caller_instance,
            })
            .handle_table()
            .subtask_rep(task_id)?;
        let waitable = if is_host {
            Waitable::Host(TableId::<HostTask>::new(rep))
        } else {
            Waitable::Guest(TableId::<GuestTask>::new(rep))
        };
        let concurrent_state = store.concurrent_state_mut();

        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

        // Whether the caller still has to wait for (or be told about) the
        // subtask's terminal event after the cancellation request.
        let needs_block;
        if let Waitable::Host(host_task) = waitable {
            let state = &mut concurrent_state.get_mut(host_task)?.state;
            // Unconditionally mark the host task as done-and-cancelled, then
            // decide what to do based on the state it was in before.
            match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
                HostTaskState::CalleeRunning(handle) => {
                    handle.abort();
                    needs_block = true;
                }

                HostTaskState::CalleeDone { cancelled } => {
                    if cancelled {
                        // Already cancelled earlier: a second cancel traps.
                        bail!(Trap::SubtaskCancelAfterTerminal);
                    } else {
                        needs_block = false;
                    }
                }

                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
                    bail_bug!("invalid states for host callee")
                }
            }
        } else {
            let caller = concurrent_state.current_guest_thread()?;
            let guest_task = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.already_lowered_parameters() {
                // Parameters were never lowered: clear the lowering/lifting
                // state and mark the task as exited.
                task.lower_params = None;
                task.lift_result = None;
                task.exited = true;
                let instance = task.instance;

                // Exactly one thread is expected here; this panics otherwise.
                assert_eq!(1, task.threads.len());
                let thread = *task.threads.iter().next().unwrap();
                self.cleanup_thread(
                    store,
                    QualifiedThreadId {
                        task: guest_task,
                        thread,
                    },
                    caller_instance,
                )?;

                // Remove this task's entries from the callee instance's
                // pending map; finding none is a trap.
                let pending = &mut store.instance_state(instance).concurrent_state().pending;
                let pending_count = pending.len();
                pending.retain(|thread, _| thread.task != guest_task);
                if pending.len() == pending_count {
                    bail!(Trap::SubtaskCancelAfterTerminal);
                }
                return Ok(Status::StartCancelled as u32);
            } else if !task.returned_or_cancelled() {
                // Record the cancellation request on the task itself.
                task.cancel_sent = true;
                task.event = Some(Event::Cancelled);
                let runtime_instance = task.instance.index;
                // Find the first thread of the task registered to be woken on
                // cancellation, schedule it at high priority, and yield.
                for thread in task.threads.clone() {
                    let thread = QualifiedThreadId {
                        task: guest_task,
                        thread,
                    };
                    if let Some(set) = concurrent_state
                        .get_mut(thread.thread)?
                        .wake_on_cancel
                        .take()
                    {
                        let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
                            Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
                            Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
                                runtime_instance,
                                GuestCall {
                                    thread,
                                    kind: GuestCallKind::DeliverEvent {
                                        instance,
                                        set: None,
                                    },
                                },
                            ),
                            None => bail_bug!("thread not present in wake_on_cancel set"),
                        };
                        concurrent_state.push_high_priority(item);

                        store.suspend(SuspendReason::Yielding {
                            thread: caller,
                            skip_may_block_check: false,
                        })?;
                        break;
                    }
                }

                // Re-check after the possible yield above.
                needs_block = !store
                    .concurrent_state_mut()
                    .get_mut(guest_task)?
                    .returned_or_cancelled()
            } else {
                needs_block = false;
            }
        };

        if needs_block {
            if async_ {
                return Ok(BLOCKED);
            }

            store.wait_for_event(waitable)?;

        }

        // Only a `Returned` or `ReturnCancelled` subtask event is acceptable
        // at this point; anything else, including no event, traps.
        let event = waitable.take_event(store.concurrent_state_mut())?;
        if let Some(Event::Subtask {
            status: status @ (Status::Returned | Status::ReturnCancelled),
        }) = event
        {
            Ok(status as u32)
        } else {
            bail!(Trap::SubtaskCancelAfterTerminal);
        }
    }
3780
3781 pub(crate) fn context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32> {
3782 store.concurrent_state_mut().context_get(slot)
3783 }
3784
3785 pub(crate) fn context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()> {
3786 store.concurrent_state_mut().context_set(slot, value)
3787 }
3788}
3789
/// Store-level entry points for component-model async operations.
///
/// Every method takes the `Instance` being operated on as an explicit
/// argument. The implementation for `StoreInner<T>` in this file forwards
/// each call to the correspondingly named operation on `Instance`, wrapping
/// the store in a `StoreContextMut` where needed.
pub trait VMComponentAsyncStore {
    /// Prepares a call from `caller_instance` to `callee_instance`.
    ///
    /// `result_count` is either a sync result count or one of the
    /// `PREPARE_ASYNC_NO_RESULT` / `PREPARE_ASYNC_WITH_RESULT` sentinels
    /// marking an async caller.
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads of `storage_len` `ValRaw` items; the
    /// `StoreInner<T>` implementation builds a slice from it.
    /// NOTE(review): requirements on `memory`, `start` and `return_` are not
    /// visible here — confirm against `Instance::prepare_call`.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: StringEncoding,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of a synchronous caller,
    /// using `storage` as the parameter/result buffer.
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads and writes of `storage_len` items;
    /// the `StoreInner<T>` implementation builds a mutable slice from it.
    /// NOTE(review): requirements on `callback` and `callee` are not visible
    /// here — confirm against `Instance::start_call`.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of an asynchronous
    /// caller, returning the `u32` produced by `Instance::start_call`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the function pointers are passed through unchanged to
    /// `Instance::start_call`; their validity requirements are defined there
    /// — confirm.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// Writes to the future with handle `future` from guest address
    /// `address`, returning the encoded result code.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Reads from the future with handle `future` into guest address
    /// `address`, returning the encoded result code.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a future of type `ty`.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Writes up to `count` items to the stream with handle `stream` from
    /// guest address `address`, returning the encoded result code.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Reads up to `count` items from the stream with handle `stream` into
    /// guest address `address`, returning the encoded result code.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_write`, but additionally supplies the payload's size and
    /// alignment (passed on as a `FlatAbi` by the `StoreInner<T>`
    /// implementation).
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_read`, but additionally supplies the payload's size and
    /// alignment (passed on as a `FlatAbi` by the `StoreInner<T>`
    /// implementation).
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a stream of type `ty`.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Retrieves the debug message of the error context with handle
    /// `err_ctx_handle`, using guest address `debug_msg_address`.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// Creates a new guest thread whose start function is element
    /// `start_func_idx` of table `start_func_table_idx`, returning the new
    /// thread's handle. `context` is the argument passed to the start
    /// function.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
3957
3958impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3960 unsafe fn prepare_call(
3961 &mut self,
3962 instance: Instance,
3963 memory: *mut VMMemoryDefinition,
3964 start: NonNull<VMFuncRef>,
3965 return_: NonNull<VMFuncRef>,
3966 caller_instance: RuntimeComponentInstanceIndex,
3967 callee_instance: RuntimeComponentInstanceIndex,
3968 task_return_type: TypeTupleIndex,
3969 callee_async: bool,
3970 string_encoding: StringEncoding,
3971 result_count_or_max_if_async: u32,
3972 storage: *mut ValRaw,
3973 storage_len: usize,
3974 ) -> Result<()> {
3975 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3979
3980 unsafe {
3981 instance.prepare_call(
3982 StoreContextMut(self),
3983 start,
3984 return_,
3985 caller_instance,
3986 callee_instance,
3987 task_return_type,
3988 callee_async,
3989 memory,
3990 string_encoding,
3991 match result_count_or_max_if_async {
3992 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3993 params,
3994 has_result: false,
3995 },
3996 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3997 params,
3998 has_result: true,
3999 },
4000 result_count => CallerInfo::Sync {
4001 params,
4002 result_count,
4003 },
4004 },
4005 )
4006 }
4007 }
4008
4009 unsafe fn sync_start(
4010 &mut self,
4011 instance: Instance,
4012 callback: *mut VMFuncRef,
4013 callee: NonNull<VMFuncRef>,
4014 param_count: u32,
4015 storage: *mut MaybeUninit<ValRaw>,
4016 storage_len: usize,
4017 ) -> Result<()> {
4018 unsafe {
4019 instance
4020 .start_call(
4021 StoreContextMut(self),
4022 callback,
4023 ptr::null_mut(),
4024 callee,
4025 param_count,
4026 1,
4027 START_FLAG_ASYNC_CALLEE,
4028 Some(std::slice::from_raw_parts_mut(storage, storage_len)),
4032 )
4033 .map(drop)
4034 }
4035 }
4036
4037 unsafe fn async_start(
4038 &mut self,
4039 instance: Instance,
4040 callback: *mut VMFuncRef,
4041 post_return: *mut VMFuncRef,
4042 callee: NonNull<VMFuncRef>,
4043 param_count: u32,
4044 result_count: u32,
4045 flags: u32,
4046 ) -> Result<u32> {
4047 unsafe {
4048 instance.start_call(
4049 StoreContextMut(self),
4050 callback,
4051 post_return,
4052 callee,
4053 param_count,
4054 result_count,
4055 flags,
4056 None,
4057 )
4058 }
4059 }
4060
4061 fn future_write(
4062 &mut self,
4063 instance: Instance,
4064 caller: RuntimeComponentInstanceIndex,
4065 ty: TypeFutureTableIndex,
4066 options: OptionsIndex,
4067 future: u32,
4068 address: u32,
4069 ) -> Result<u32> {
4070 instance
4071 .guest_write(
4072 StoreContextMut(self),
4073 caller,
4074 TransmitIndex::Future(ty),
4075 options,
4076 None,
4077 future,
4078 address,
4079 1,
4080 )
4081 .map(|result| result.encode())
4082 }
4083
4084 fn future_read(
4085 &mut self,
4086 instance: Instance,
4087 caller: RuntimeComponentInstanceIndex,
4088 ty: TypeFutureTableIndex,
4089 options: OptionsIndex,
4090 future: u32,
4091 address: u32,
4092 ) -> Result<u32> {
4093 instance
4094 .guest_read(
4095 StoreContextMut(self),
4096 caller,
4097 TransmitIndex::Future(ty),
4098 options,
4099 None,
4100 future,
4101 address,
4102 1,
4103 )
4104 .map(|result| result.encode())
4105 }
4106
4107 fn stream_write(
4108 &mut self,
4109 instance: Instance,
4110 caller: RuntimeComponentInstanceIndex,
4111 ty: TypeStreamTableIndex,
4112 options: OptionsIndex,
4113 stream: u32,
4114 address: u32,
4115 count: u32,
4116 ) -> Result<u32> {
4117 instance
4118 .guest_write(
4119 StoreContextMut(self),
4120 caller,
4121 TransmitIndex::Stream(ty),
4122 options,
4123 None,
4124 stream,
4125 address,
4126 count,
4127 )
4128 .map(|result| result.encode())
4129 }
4130
4131 fn stream_read(
4132 &mut self,
4133 instance: Instance,
4134 caller: RuntimeComponentInstanceIndex,
4135 ty: TypeStreamTableIndex,
4136 options: OptionsIndex,
4137 stream: u32,
4138 address: u32,
4139 count: u32,
4140 ) -> Result<u32> {
4141 instance
4142 .guest_read(
4143 StoreContextMut(self),
4144 caller,
4145 TransmitIndex::Stream(ty),
4146 options,
4147 None,
4148 stream,
4149 address,
4150 count,
4151 )
4152 .map(|result| result.encode())
4153 }
4154
4155 fn future_drop_writable(
4156 &mut self,
4157 instance: Instance,
4158 ty: TypeFutureTableIndex,
4159 writer: u32,
4160 ) -> Result<()> {
4161 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4162 }
4163
4164 fn flat_stream_write(
4165 &mut self,
4166 instance: Instance,
4167 caller: RuntimeComponentInstanceIndex,
4168 ty: TypeStreamTableIndex,
4169 options: OptionsIndex,
4170 payload_size: u32,
4171 payload_align: u32,
4172 stream: u32,
4173 address: u32,
4174 count: u32,
4175 ) -> Result<u32> {
4176 instance
4177 .guest_write(
4178 StoreContextMut(self),
4179 caller,
4180 TransmitIndex::Stream(ty),
4181 options,
4182 Some(FlatAbi {
4183 size: payload_size,
4184 align: payload_align,
4185 }),
4186 stream,
4187 address,
4188 count,
4189 )
4190 .map(|result| result.encode())
4191 }
4192
4193 fn flat_stream_read(
4194 &mut self,
4195 instance: Instance,
4196 caller: RuntimeComponentInstanceIndex,
4197 ty: TypeStreamTableIndex,
4198 options: OptionsIndex,
4199 payload_size: u32,
4200 payload_align: u32,
4201 stream: u32,
4202 address: u32,
4203 count: u32,
4204 ) -> Result<u32> {
4205 instance
4206 .guest_read(
4207 StoreContextMut(self),
4208 caller,
4209 TransmitIndex::Stream(ty),
4210 options,
4211 Some(FlatAbi {
4212 size: payload_size,
4213 align: payload_align,
4214 }),
4215 stream,
4216 address,
4217 count,
4218 )
4219 .map(|result| result.encode())
4220 }
4221
4222 fn stream_drop_writable(
4223 &mut self,
4224 instance: Instance,
4225 ty: TypeStreamTableIndex,
4226 writer: u32,
4227 ) -> Result<()> {
4228 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4229 }
4230
4231 fn error_context_debug_message(
4232 &mut self,
4233 instance: Instance,
4234 ty: TypeComponentLocalErrorContextTableIndex,
4235 options: OptionsIndex,
4236 err_ctx_handle: u32,
4237 debug_msg_address: u32,
4238 ) -> Result<()> {
4239 instance.error_context_debug_message(
4240 StoreContextMut(self),
4241 ty,
4242 options,
4243 err_ctx_handle,
4244 debug_msg_address,
4245 )
4246 }
4247
4248 fn thread_new_indirect(
4249 &mut self,
4250 instance: Instance,
4251 caller: RuntimeComponentInstanceIndex,
4252 func_ty_idx: TypeFuncIndex,
4253 start_func_table_idx: RuntimeTableIndex,
4254 start_func_idx: u32,
4255 context: i32,
4256 ) -> Result<u32> {
4257 instance.thread_new_indirect(
4258 StoreContextMut(self),
4259 caller,
4260 func_ty_idx,
4261 start_func_table_idx,
4262 start_func_idx,
4263 context,
4264 )
4265 }
4266}
4267
/// A pinned, boxed, `Send + 'static` future resolving to `Result<()>`.
///
/// NOTE(review): judging by the name this is the future driving a host task —
/// confirm against its users.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4269
/// Bookkeeping for a host task stored in the concurrent state table and
/// addressed through `Waitable::Host`.
pub(crate) struct HostTask {
    /// Waitable state for this task; `subtask_drop` clears its `handle`.
    common: WaitableCommon,

    /// Thread recorded as this task's caller.
    caller: QualifiedThreadId,

    /// Call context for this task; default-initialized by `HostTask::new`.
    call_context: CallContext,

    /// Current lifecycle state of the callee.
    state: HostTaskState,
}
4285
/// Lifecycle states of a host task's callee.
enum HostTaskState {
    /// NOTE(review): presumably the callee has been started but no join
    /// handle is recorded yet — confirm. `subtask.drop` and `subtask.cancel`
    /// both treat seeing this state as an internal bug.
    CalleeStarted,

    /// The callee is running; the `JoinHandle` is aborted by
    /// `subtask_cancel`, and `subtask_drop` traps in this state.
    CalleeRunning(JoinHandle),

    /// Holds a `LiftedResult`. `subtask.drop` and `subtask.cancel` both treat
    /// seeing this state as an internal bug.
    CalleeFinished(LiftedResult),

    /// The callee is done. `cancelled` is set to true by `subtask_cancel`,
    /// which traps if it finds the flag already set.
    CalleeDone { cancelled: bool },
}
4308
4309impl HostTask {
4310 fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4311 Self {
4312 common: WaitableCommon::default(),
4313 call_context: CallContext::default(),
4314 caller,
4315 state,
4316 }
4317 }
4318}
4319
impl TableDebug for HostTask {
    /// Type name reported for `HostTask` table entries.
    fn type_name() -> &'static str {
        "HostTask"
    }
}
4325
/// Boxed `Send + Sync` callback taking the store, an `Event` and a `u32`, and
/// returning a `u32` on success.
///
/// NOTE(review): the meaning of the `u32` argument and return value is not
/// visible here — confirm against the callback's call sites.
type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4327
/// Identifies who called a guest task.
enum Caller {
    /// The task was called by the host.
    Host {
        /// Channel on which the lifted result is sent to the host; taken
        /// (leaving `None`) when the result is sent.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// When true, `GuestTask::new` starts the task with
        /// `HostFutureState::Live`; otherwise `HostFutureState::NotApplicable`.
        host_future_present: bool,
        /// NOTE(review): presumably the thread that was current when the host
        /// made the call — confirm.
        caller: CurrentThread,
    },
    /// The task was called by another guest thread.
    Guest {
        /// The calling guest thread.
        thread: QualifiedThreadId,
    },
}
4348
/// Everything recorded for lifting a guest task's result.
///
/// A `GuestTask` holds this in `lift_result`; that field being `None` is what
/// `returned_or_cancelled` reports.
struct LiftResult {
    /// The lifting routine.
    lift: RawLift,
    /// Tuple type index associated with the result.
    ty: TypeTupleIndex,
    /// Optional pointer to a linear memory definition.
    /// NOTE(review): presumably the memory results are read from — confirm.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding associated with the result.
    string_encoding: StringEncoding,
}
4357
/// A guest thread id paired with the id of the task it belongs to.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) struct QualifiedThreadId {
    /// The task that owns `thread`.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
4367
4368impl QualifiedThreadId {
4369 fn qualify(
4370 state: &mut ConcurrentState,
4371 thread: TableId<GuestThread>,
4372 ) -> Result<QualifiedThreadId> {
4373 Ok(QualifiedThreadId {
4374 task: state.get_mut(thread)?.parent_task,
4375 thread,
4376 })
4377 }
4378}
4379
4380impl fmt::Debug for QualifiedThreadId {
4381 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4382 f.debug_tuple("QualifiedThreadId")
4383 .field(&self.task.rep())
4384 .field(&self.thread.rep())
4385 .finish()
4386 }
4387}
4388
/// Execution state of a guest thread.
enum GuestThreadState {
    /// Initial state of a thread created with `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state of a thread created with `GuestThread::new_explicit`;
    /// holds the start function that `resume_thread` wraps in a
    /// `StartExplicit` guest call.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// The thread is running; `resume_thread` refuses to resume it.
    Running,
    /// The thread is suspended; `resume_thread` queues its fiber for
    /// resumption.
    Suspended(StoreFiber<'static>),
    /// The thread is ready to run; `resume_thread` only promotes its work
    /// item when `allow_ready` is set.
    Ready(StoreFiber<'static>),
    /// NOTE(review): presumably the thread has finished executing — confirm.
    Completed,
}
/// State tracked for a single guest thread.
pub struct GuestThread {
    /// Two `u32` slots, zero-initialized on construction.
    /// NOTE(review): presumably the storage behind `context_get` /
    /// `context_set` — confirm.
    context: [u32; 2],
    /// The task this thread belongs to.
    parent_task: TableId<GuestTask>,
    /// Waitable set recorded by a cancellable wait; taken by `subtask_cancel`
    /// to wake the thread.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// Current execution state.
    state: GuestThreadState,
    /// Handle of this thread in its instance's thread handle table, once one
    /// has been assigned.
    instance_rep: Option<u32>,
    /// A waitable set allocated when the thread is constructed.
    /// NOTE(review): presumably used for synchronous calls made by this
    /// thread — confirm.
    sync_call_set: TableId<WaitableSet>,
}
4416
4417impl GuestThread {
4418 fn from_instance(
4421 state: Pin<&mut ComponentInstance>,
4422 caller_instance: RuntimeComponentInstanceIndex,
4423 guest_thread: u32,
4424 ) -> Result<TableId<Self>> {
4425 let rep = state.instance_states().0[caller_instance]
4426 .thread_handle_table()
4427 .guest_thread_rep(guest_thread)?;
4428 Ok(TableId::new(rep))
4429 }
4430
4431 fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4432 let sync_call_set = state.push(WaitableSet::default())?;
4433 Ok(Self {
4434 context: [0; 2],
4435 parent_task,
4436 wake_on_cancel: None,
4437 state: GuestThreadState::NotStartedImplicit,
4438 instance_rep: None,
4439 sync_call_set,
4440 })
4441 }
4442
4443 fn new_explicit(
4444 state: &mut ConcurrentState,
4445 parent_task: TableId<GuestTask>,
4446 start_func: Box<
4447 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4448 >,
4449 ) -> Result<Self> {
4450 let sync_call_set = state.push(WaitableSet::default())?;
4451 Ok(Self {
4452 context: [0; 2],
4453 parent_task,
4454 wake_on_cancel: None,
4455 state: GuestThreadState::NotStartedExplicit(start_func),
4456 instance_rep: None,
4457 sync_call_set,
4458 })
4459 }
4460}
4461
impl TableDebug for GuestThread {
    /// Type name reported for `GuestThread` table entries.
    fn type_name() -> &'static str {
        "GuestThread"
    }
}
4467
/// Tracks the synchronous result of a task.
enum SyncResult {
    /// No result has been produced yet.
    NotProduced,
    /// A result has been produced and is waiting to be taken; the payload is
    /// an optional raw value.
    Produced(Option<ValRaw>),
    /// The result has already been taken; taking it again is a bug.
    Taken,
}
4473
4474impl SyncResult {
4475 fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4476 Ok(match mem::replace(self, SyncResult::Taken) {
4477 SyncResult::NotProduced => None,
4478 SyncResult::Produced(val) => Some(val),
4479 SyncResult::Taken => {
4480 bail_bug!("attempted to take a synchronous result that was already taken")
4481 }
4482 })
4483 }
4484}
4485
/// State of the host-side future associated with a guest task, if any.
#[derive(Debug)]
enum HostFutureState {
    /// Used for guest callers and for host callers with no host future
    /// present.
    NotApplicable,
    /// A host future is present; while in this state the task is not
    /// considered ready to delete.
    Live,
    /// NOTE(review): presumably the host future has been dropped — confirm.
    Dropped,
}
4492
/// Bookkeeping for a guest task stored in the concurrent state table and
/// addressed through `Waitable::Guest`.
pub(crate) struct GuestTask {
    /// Waitable state for this task; its `event` is consulted by
    /// `ready_to_delete`.
    common: WaitableCommon,
    /// Parameter-lowering routine; `None` means the parameters are treated as
    /// already lowered.
    lower_params: Option<RawLower>,
    /// Result-lifting state; `None` means the task is treated as returned or
    /// cancelled.
    lift_result: Option<LiftResult>,
    /// The lifted result, stored when the caller is not the host.
    result: Option<LiftedResult>,
    /// Optional callback supplied at construction.
    callback: Option<CallbackFn>,
    /// Who called this task (host or guest).
    caller: Caller,
    /// Call context for this task; default-initialized on construction.
    call_context: CallContext,
    /// Synchronous result state; starts as `SyncResult::NotProduced`.
    sync_result: SyncResult,
    /// Set once `subtask_cancel` has requested cancellation of a started
    /// task.
    cancel_sent: bool,
    /// Starts as `false`.
    /// NOTE(review): presumably records that a "starting" status was
    /// delivered to the caller — confirm.
    starting_sent: bool,
    /// The runtime instance this task runs in.
    instance: RuntimeInstance,
    /// Pending task-level event, e.g. `Event::Cancelled` set by
    /// `subtask_cancel`.
    event: Option<Event>,
    /// Set when `subtask_cancel` cancels the task before its parameters were
    /// lowered.
    exited: bool,
    /// Threads belonging to this task; must be empty for the task to be ready
    /// to delete.
    threads: HashSet<TableId<GuestThread>>,
    /// State of the host future for host callers; `Live` keeps the task from
    /// being deleted.
    host_future_state: HostFutureState,
    /// Flag supplied at construction.
    /// NOTE(review): presumably whether the callee is an async function —
    /// confirm.
    async_function: bool,
}
4544
4545impl GuestTask {
4546 fn already_lowered_parameters(&self) -> bool {
4547 self.lower_params.is_none()
4549 }
4550
4551 fn returned_or_cancelled(&self) -> bool {
4552 self.lift_result.is_none()
4554 }
4555
4556 fn ready_to_delete(&self) -> bool {
4557 let threads_completed = self.threads.is_empty();
4558 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4559 let pending_completion_event = matches!(
4560 self.common.event,
4561 Some(Event::Subtask {
4562 status: Status::Returned | Status::ReturnCancelled
4563 })
4564 );
4565 let ready = threads_completed
4566 && !has_sync_result
4567 && !pending_completion_event
4568 && !matches!(self.host_future_state, HostFutureState::Live);
4569 log::trace!(
4570 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4571 threads_completed,
4572 has_sync_result,
4573 pending_completion_event,
4574 self.host_future_state
4575 );
4576 ready
4577 }
4578
4579 fn new(
4580 lower_params: RawLower,
4581 lift_result: LiftResult,
4582 caller: Caller,
4583 callback: Option<CallbackFn>,
4584 instance: RuntimeInstance,
4585 async_function: bool,
4586 ) -> Result<Self> {
4587 let host_future_state = match &caller {
4588 Caller::Guest { .. } => HostFutureState::NotApplicable,
4589 Caller::Host {
4590 host_future_present,
4591 ..
4592 } => {
4593 if *host_future_present {
4594 HostFutureState::Live
4595 } else {
4596 HostFutureState::NotApplicable
4597 }
4598 }
4599 };
4600 Ok(Self {
4601 common: WaitableCommon::default(),
4602 lower_params: Some(lower_params),
4603 lift_result: Some(lift_result),
4604 result: None,
4605 callback,
4606 caller,
4607 call_context: CallContext::default(),
4608 sync_result: SyncResult::NotProduced,
4609 cancel_sent: false,
4610 starting_sent: false,
4611 instance,
4612 event: None,
4613 exited: false,
4614 threads: HashSet::new(),
4615 host_future_state,
4616 async_function,
4617 })
4618 }
4619}
4620
impl TableDebug for GuestTask {
    /// Human-readable name for `GuestTask` table entries.
    // NOTE(review): presumably consumed by the `table` module's debug
    // output; confirm against `TableDebug`'s users.
    fn type_name() -> &'static str {
        "GuestTask"
    }
}
4626
/// Bookkeeping shared by every kind of waitable (host task, guest task, or
/// transmit handle); reached through `Waitable::common`.
#[derive(Default)]
struct WaitableCommon {
    /// The event currently pending for this waitable, if any.
    event: Option<Event>,
    /// The waitable set this waitable currently belongs to, if any
    /// (maintained by `Waitable::join`).
    set: Option<TableId<WaitableSet>>,
    /// NOTE(review): presumably the guest-visible handle for this waitable;
    /// confirm at the use sites (not visible here).
    handle: Option<u32>,
}
4637
/// Typed reference to something that can be waited on.
///
/// The variant order matters: it determines the derived `Ord`, which in turn
/// orders the entries of `WaitableSet::ready`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task.
    Host(TableId<HostTask>),
    /// A guest task.
    Guest(TableId<GuestTask>),
    /// A transmit handle (used for both streams and futures; see
    /// `Waitable::from_instance`).
    Transmit(TableId<TransmitHandle>),
}
4648
4649impl Waitable {
4650 fn from_instance(
4653 state: Pin<&mut ComponentInstance>,
4654 caller_instance: RuntimeComponentInstanceIndex,
4655 waitable: u32,
4656 ) -> Result<Self> {
4657 use crate::runtime::vm::component::Waitable;
4658
4659 let (waitable, kind) = state.instance_states().0[caller_instance]
4660 .handle_table()
4661 .waitable_rep(waitable)?;
4662
4663 Ok(match kind {
4664 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4665 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4666 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4667 })
4668 }
4669
4670 fn rep(&self) -> u32 {
4672 match self {
4673 Self::Host(id) => id.rep(),
4674 Self::Guest(id) => id.rep(),
4675 Self::Transmit(id) => id.rep(),
4676 }
4677 }
4678
4679 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4683 log::trace!("waitable {self:?} join set {set:?}",);
4684
4685 let old = mem::replace(&mut self.common(state)?.set, set);
4686
4687 if let Some(old) = old {
4688 match *self {
4689 Waitable::Host(id) => state.remove_child(id, old),
4690 Waitable::Guest(id) => state.remove_child(id, old),
4691 Waitable::Transmit(id) => state.remove_child(id, old),
4692 }?;
4693
4694 state.get_mut(old)?.ready.remove(self);
4695 }
4696
4697 if let Some(set) = set {
4698 match *self {
4699 Waitable::Host(id) => state.add_child(id, set),
4700 Waitable::Guest(id) => state.add_child(id, set),
4701 Waitable::Transmit(id) => state.add_child(id, set),
4702 }?;
4703
4704 if self.common(state)?.event.is_some() {
4705 self.mark_ready(state)?;
4706 }
4707 }
4708
4709 Ok(())
4710 }
4711
4712 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4714 Ok(match self {
4715 Self::Host(id) => &mut state.get_mut(*id)?.common,
4716 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4717 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4718 })
4719 }
4720
4721 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4725 log::trace!("set event for {self:?}: {event:?}");
4726 self.common(state)?.event = event;
4727 self.mark_ready(state)
4728 }
4729
4730 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4732 let common = self.common(state)?;
4733 let event = common.event.take();
4734 if let Some(set) = self.common(state)?.set {
4735 state.get_mut(set)?.ready.remove(self);
4736 }
4737
4738 Ok(event)
4739 }
4740
4741 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4745 if let Some(set) = self.common(state)?.set {
4746 state.get_mut(set)?.ready.insert(*self);
4747 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4748 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4749 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4750
4751 let item = match mode {
4752 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4753 WaitMode::Callback(instance) => WorkItem::GuestCall(
4754 state.get_mut(thread.task)?.instance.index,
4755 GuestCall {
4756 thread,
4757 kind: GuestCallKind::DeliverEvent {
4758 instance,
4759 set: Some(set),
4760 },
4761 },
4762 ),
4763 };
4764 state.push_high_priority(item);
4765 }
4766 }
4767 Ok(())
4768 }
4769
4770 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4772 match self {
4773 Self::Host(task) => {
4774 log::trace!("delete host task {task:?}");
4775 state.delete(*task)?;
4776 }
4777 Self::Guest(task) => {
4778 log::trace!("delete guest task {task:?}");
4779 state.delete(*task)?;
4780 }
4781 Self::Transmit(task) => {
4782 state.delete(*task)?;
4783 }
4784 }
4785
4786 Ok(())
4787 }
4788}
4789
/// Debug-prints a `Waitable` as just its wrapped table id; the variant name
/// itself is not printed.
impl fmt::Debug for Waitable {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Each arm is spelled out because the ids have different types.
        match self {
            Self::Host(id) => write!(f, "{id:?}"),
            Self::Guest(id) => write!(f, "{id:?}"),
            Self::Transmit(id) => write!(f, "{id:?}"),
        }
    }
}
4799
/// A set of waitables together with the threads waiting on it.
#[derive(Default)]
struct WaitableSet {
    /// Members of the set which have been marked ready (see
    /// `Waitable::mark_ready`).
    ready: BTreeSet<Waitable>,
    /// Threads currently waiting on this set, along with how each one should
    /// be woken. `Waitable::mark_ready` wakes the first entry in key order.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
}
4808
impl TableDebug for WaitableSet {
    /// Human-readable name for `WaitableSet` table entries.
    // NOTE(review): presumably consumed by the `table` module's debug
    // output; confirm against `TableDebug`'s users.
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
4814
/// Type-erased closure which writes a call's lowered parameters into the
/// provided, possibly uninitialized, `ValRaw` slice.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4818
/// Type-erased closure which turns raw result values into a boxed,
/// dynamically typed result.
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;
4823
/// Dynamically typed result of a lift; `queue_call` downcasts it back to the
/// caller's expected type.
type LiftedResult = Box<dyn Any + Send + Sync>;
4828
/// Zero-sized marker type.
// NOTE(review): presumably used as a placeholder `LiftedResult` when no real
// value needs to be carried; confirm against the use sites (not visible here).
struct DummyResult;
4832
/// Concurrency-related state kept per component instance.
#[derive(Default)]
pub struct ConcurrentInstanceState {
    /// NOTE(review): presumably a backpressure counter for this instance;
    /// confirm at the use sites (not visible here).
    backpressure: u16,
    /// NOTE(review): presumably set while the instance must not be entered;
    /// confirm at the use sites (not visible here).
    do_not_enter: bool,
    /// Guest calls pending for this instance, keyed by the thread they
    /// belong to.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
4844
impl ConcurrentInstanceState {
    /// Returns `true` when no guest calls are pending for this instance.
    pub fn pending_is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}
4850
/// Identifies what is currently executing, as tracked by
/// `ConcurrentState::current_thread`.
#[derive(Debug, Copy, Clone)]
pub(crate) enum CurrentThread {
    /// A guest thread, identified by its task and thread ids.
    Guest(QualifiedThreadId),
    /// A host task.
    Host(TableId<HostTask>),
    /// Nothing is current; this is the initial state (see
    /// `ConcurrentState::default`).
    None,
}
4857
4858impl CurrentThread {
4859 fn guest(&self) -> Option<&QualifiedThreadId> {
4860 match self {
4861 Self::Guest(id) => Some(id),
4862 _ => None,
4863 }
4864 }
4865
4866 fn host(&self) -> Option<TableId<HostTask>> {
4867 match self {
4868 Self::Host(id) => Some(*id),
4869 _ => None,
4870 }
4871 }
4872
4873 fn is_none(&self) -> bool {
4874 matches!(self, Self::None)
4875 }
4876}
4877
4878impl From<QualifiedThreadId> for CurrentThread {
4879 fn from(id: QualifiedThreadId) -> Self {
4880 Self::Guest(id)
4881 }
4882}
4883
4884impl From<TableId<HostTask>> for CurrentThread {
4885 fn from(id: TableId<HostTask>) -> Self {
4886 Self::Host(id)
4887 }
4888}
4889
/// Store-wide state for concurrent execution: the table of tasks, threads and
/// waitable sets, the work queues, and the set of in-flight host futures.
pub struct ConcurrentState {
    /// What is currently executing (guest thread, host task, or nothing).
    current_thread: CurrentThread,

    /// In-flight host task futures. Wrapped in an `Option` so the collection
    /// can be taken out; `futures_mut` reports a bug if it is absent.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// Table holding the tasks, threads, waitable sets, etc. which `TableId`s
    /// refer to.
    table: AlwaysMut<ResourceTable>,
    /// High-priority work items; new items are appended at the back.
    high_priority: Vec<WorkItem>,
    /// Low-priority work items; new items are pushed at the front.
    low_priority: VecDeque<WorkItem>,
    /// NOTE(review): presumably the reason the current fiber is suspending;
    /// confirm at the use sites (not visible here).
    suspend_reason: Option<SuspendReason>,
    /// NOTE(review): presumably a reusable worker fiber; confirm at the use
    /// sites. `take_fibers_and_futures` hands it out together with all other
    /// fibers.
    worker: Option<StoreFiber<'static>>,
    /// NOTE(review): presumably the item handed to the worker fiber; confirm
    /// at the use sites (not visible here).
    worker_item: Option<WorkerItem>,

    /// Reference counts keyed by global error-context table index.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
}
4932
4933impl Default for ConcurrentState {
4934 fn default() -> Self {
4935 Self {
4936 current_thread: CurrentThread::None,
4937 table: AlwaysMut::new(ResourceTable::new()),
4938 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4939 high_priority: Vec::new(),
4940 low_priority: VecDeque::new(),
4941 suspend_reason: None,
4942 worker: None,
4943 worker_item: None,
4944 global_error_context_ref_counts: BTreeMap::new(),
4945 }
4946 }
4947}
4948
4949impl ConcurrentState {
4950 pub(crate) fn take_fibers_and_futures(
4967 &mut self,
4968 fibers: &mut Vec<StoreFiber<'static>>,
4969 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4970 ) {
4971 for entry in self.table.get_mut().iter_mut() {
4972 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4973 for mode in mem::take(&mut set.waiting).into_values() {
4974 if let WaitMode::Fiber(fiber) = mode {
4975 fibers.push(fiber);
4976 }
4977 }
4978 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4979 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready(fiber) =
4980 mem::replace(&mut thread.state, GuestThreadState::Completed)
4981 {
4982 fibers.push(fiber);
4983 }
4984 }
4985 }
4986
4987 if let Some(fiber) = self.worker.take() {
4988 fibers.push(fiber);
4989 }
4990
4991 let mut handle_item = |item| match item {
4992 WorkItem::ResumeFiber(fiber) => {
4993 fibers.push(fiber);
4994 }
4995 WorkItem::PushFuture(future) => {
4996 self.futures
4997 .get_mut()
4998 .as_mut()
4999 .unwrap()
5000 .push(future.into_inner());
5001 }
5002 WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
5003 }
5004 };
5005
5006 for item in mem::take(&mut self.high_priority) {
5007 handle_item(item);
5008 }
5009 for item in mem::take(&mut self.low_priority) {
5010 handle_item(item);
5011 }
5012
5013 if let Some(them) = self.futures.get_mut().take() {
5014 futures.push(them);
5015 }
5016 }
5017
5018 fn push<V: Send + Sync + 'static>(
5019 &mut self,
5020 value: V,
5021 ) -> Result<TableId<V>, ResourceTableError> {
5022 self.table.get_mut().push(value).map(TableId::from)
5023 }
5024
5025 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
5026 self.table.get_mut().get_mut(&Resource::from(id))
5027 }
5028
5029 pub fn add_child<T: 'static, U: 'static>(
5030 &mut self,
5031 child: TableId<T>,
5032 parent: TableId<U>,
5033 ) -> Result<(), ResourceTableError> {
5034 self.table
5035 .get_mut()
5036 .add_child(Resource::from(child), Resource::from(parent))
5037 }
5038
5039 pub fn remove_child<T: 'static, U: 'static>(
5040 &mut self,
5041 child: TableId<T>,
5042 parent: TableId<U>,
5043 ) -> Result<(), ResourceTableError> {
5044 self.table
5045 .get_mut()
5046 .remove_child(Resource::from(child), Resource::from(parent))
5047 }
5048
5049 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
5050 self.table.get_mut().delete(Resource::from(id))
5051 }
5052
5053 fn push_future(&mut self, future: HostTaskFuture) {
5054 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
5061 }
5062
5063 fn push_high_priority(&mut self, item: WorkItem) {
5064 log::trace!("push high priority: {item:?}");
5065 self.high_priority.push(item);
5066 }
5067
5068 fn push_low_priority(&mut self, item: WorkItem) {
5069 log::trace!("push low priority: {item:?}");
5070 self.low_priority.push_front(item);
5071 }
5072
5073 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5074 if high_priority {
5075 self.push_high_priority(item);
5076 } else {
5077 self.push_low_priority(item);
5078 }
5079 }
5080
5081 fn promote_instance_local_thread_work_item(
5082 &mut self,
5083 current_instance: RuntimeComponentInstanceIndex,
5084 ) -> bool {
5085 self.promote_work_items_matching(|item: &WorkItem| match item {
5086 WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5087 *instance == current_instance
5088 }
5089 _ => false,
5090 })
5091 }
5092
5093 fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5094 self.promote_work_items_matching(|item: &WorkItem| match item {
5095 WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5096 *t == thread
5097 }
5098 _ => false,
5099 })
5100 }
5101
5102 fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5103 where
5104 F: FnMut(&WorkItem) -> bool,
5105 {
5106 if self.high_priority.iter().any(&mut predicate) {
5110 true
5111 }
5112 else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5115 let item = self.low_priority.remove(idx).unwrap();
5116 self.push_high_priority(item);
5117 true
5118 } else {
5119 false
5120 }
5121 }
5122
5123 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
5125 let thread = self.current_guest_thread()?;
5126 let val = self.get_mut(thread.thread)?.context[usize::try_from(slot)?];
5127 log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
5128 Ok(val)
5129 }
5130
5131 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
5133 let thread = self.current_guest_thread()?;
5134 log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
5135 self.get_mut(thread.thread)?.context[usize::try_from(slot)?] = val;
5136 Ok(())
5137 }
5138
5139 fn take_pending_cancellation(&mut self) -> Result<bool> {
5142 let thread = self.current_guest_thread()?;
5143 if let Some(event) = self.get_mut(thread.task)?.event.take() {
5144 assert!(matches!(event, Event::Cancelled));
5145 Ok(true)
5146 } else {
5147 Ok(false)
5148 }
5149 }
5150
5151 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5152 if self.may_block(task)? {
5153 Ok(())
5154 } else {
5155 Err(Trap::CannotBlockSyncTask.into())
5156 }
5157 }
5158
5159 fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
5160 let task = self.get_mut(task)?;
5161 Ok(task.async_function || task.returned_or_cancelled())
5162 }
5163
5164 pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
5170 let (task, is_host) = (task >> 1, task & 1 == 1);
5171 if is_host {
5172 let task: TableId<HostTask> = TableId::new(task);
5173 Ok(&mut self.get_mut(task)?.call_context)
5174 } else {
5175 let task: TableId<GuestTask> = TableId::new(task);
5176 Ok(&mut self.get_mut(task)?.call_context)
5177 }
5178 }
5179
5180 pub fn current_call_context_scope_id(&self) -> Result<u32> {
5183 let (bits, is_host) = match self.current_thread {
5184 CurrentThread::Guest(id) => (id.task.rep(), false),
5185 CurrentThread::Host(id) => (id.rep(), true),
5186 CurrentThread::None => bail_bug!("current thread is not set"),
5187 };
5188 assert_eq!((bits << 1) >> 1, bits);
5189 Ok((bits << 1) | u32::from(is_host))
5190 }
5191
5192 fn current_guest_thread(&self) -> Result<QualifiedThreadId> {
5193 match self.current_thread.guest() {
5194 Some(id) => Ok(*id),
5195 None => bail_bug!("current thread is not a guest thread"),
5196 }
5197 }
5198
5199 fn current_host_thread(&self) -> Result<TableId<HostTask>> {
5200 match self.current_thread.host() {
5201 Some(id) => Ok(id),
5202 None => bail_bug!("current thread is not a host thread"),
5203 }
5204 }
5205
5206 fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
5207 match self.futures.get_mut().as_mut() {
5208 Some(f) => Ok(f),
5209 None => bail_bug!("futures field of concurrent state is currently taken"),
5210 }
5211 }
5212
5213 pub(crate) fn table(&mut self) -> &mut ResourceTable {
5214 self.table.get_mut()
5215 }
5216}
5217
5218fn for_any_lower<
5221 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5222>(
5223 fun: F,
5224) -> F {
5225 fun
5226}
5227
5228fn for_any_lift<
5230 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5231>(
5232 fun: F,
5233) -> F {
5234 fun
5235}
5236
5237fn checked<F: Future + Send + 'static>(
5242 id: StoreId,
5243 fut: F,
5244) -> impl Future<Output = F::Output> + Send + 'static {
5245 async move {
5246 let mut fut = pin!(fut);
5247 future::poll_fn(move |cx| {
5248 let message = "\
5249 `Future`s which depend on asynchronous component tasks, streams, or \
5250 futures to complete may only be polled from the event loop of the \
5251 store to which they belong. Please use \
5252 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5253 ";
5254 tls::try_get(|store| {
5255 let matched = match store {
5256 tls::TryGet::Some(store) => store.id() == id,
5257 tls::TryGet::Taken | tls::TryGet::None => false,
5258 };
5259
5260 if !matched {
5261 panic!("{message}")
5262 }
5263 });
5264 fut.as_mut().poll(cx)
5265 })
5266 .await
5267 }
5268}
5269
5270fn check_recursive_run() {
5273 tls::try_get(|store| {
5274 if !matches!(store, tls::TryGet::None) {
5275 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5276 }
5277 });
5278}
5279
/// Split a packed callback return value into `(low four bits, remaining
/// upper bits)`.
// NOTE(review): presumably the low nibble is the callback code and the upper
// bits carry its argument (e.g. a waitable set index); confirm at the caller.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    let low_nibble = code & 0xF;
    let upper_bits = code >> 4;
    (low_nibble, upper_bits)
}
5283
/// Parameters for a wait/poll operation on a waitable set.
// NOTE(review): the field meanings below are inferred from names and types;
// confirm at the use sites (not visible here).
struct WaitableCheckParams {
    /// The waitable set to check.
    set: TableId<WaitableSet>,
    /// Index of the canonical options in effect for the operation.
    options: OptionsIndex,
    /// Presumably the guest address where the event payload is written.
    payload: u32,
}
5292
/// Which kind of check to perform on a waitable set.
// NOTE(review): presumably `Wait` blocks until an event is available while
// `Poll` does not; confirm at the use sites (not visible here).
enum WaitableCheck {
    Wait,
    Poll,
}
5299
/// A host-to-guest call which has been set up by `prepare_call` but not yet
/// queued; `R` is the type the result will eventually be downcast to.
pub(crate) struct PreparedCall<R> {
    /// The function to call.
    handle: Func,
    /// The task and implicit thread created for the call.
    thread: QualifiedThreadId,
    /// Number of parameters; forwarded to `Instance::queue_call`.
    param_count: usize,
    /// Receiving end of the channel whose sender is stored in the task's
    /// `Caller::Host`; yields the lifted result.
    rx: oneshot::Receiver<LiftedResult>,
    /// Ties the otherwise unused type parameter `R` to this struct.
    _phantom: PhantomData<R>,
}
5313
5314impl<R> PreparedCall<R> {
5315 pub(crate) fn task_id(&self) -> TaskId {
5317 TaskId {
5318 task: self.thread.task,
5319 }
5320 }
5321}
5322
/// Crate-internal handle to a guest task, obtained from
/// `PreparedCall::task_id`.
pub(crate) struct TaskId {
    /// Table id of the guest task.
    task: TableId<GuestTask>,
}
5327
5328impl TaskId {
5329 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5335 let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5336 let delete = if !task.already_lowered_parameters() {
5337 true
5338 } else {
5339 task.host_future_state = HostFutureState::Dropped;
5340 task.ready_to_delete()
5341 };
5342 if delete {
5343 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5344 }
5345 Ok(())
5346 }
5347}
5348
/// Set up, but do not start, a host-to-guest call of `handle`.
///
/// Creates a `GuestTask` (with `Caller::Host`) and its implicit
/// `GuestThread` in the store's concurrent state, wiring `lower_params` and
/// `lift_result` into the task. The returned `PreparedCall` carries the
/// receiving end of the channel on which the lifted result will arrive.
///
/// * `param_count` - recorded in the `PreparedCall` for later queuing.
/// * `host_future_present` - whether a host future is associated with the
///   call; controls the task's initial `HostFutureState`.
///
/// # Errors
///
/// Returns an error if the task or thread cannot be pushed into the table,
/// or traps with `Trap::CannotEnterComponent` if the target instance may not
/// be entered.
pub(crate) fn prepare_call<T, R>(
    mut store: StoreContextMut<T>,
    handle: Func,
    param_count: usize,
    host_future_present: bool,
    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
    + Send
    + Sync
    + 'static,
) -> Result<PreparedCall<R>> {
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);

    // Gather everything needed from the instance before mutably borrowing
    // the store for the token and the concurrent state below.
    let instance = handle.instance().id().get(store.0);
    let options = &instance.component().env_component().options[options];
    let ty = &instance.component().types()[ty];
    let async_function = ty.async_;
    let task_return_type = ty.results;
    let component_instance = raw_options.instance;
    let callback = options.callback.map(|i| instance.runtime_callback(i));
    let memory = options
        .memory()
        .map(|i| instance.runtime_memory(i))
        .map(SendSyncPtr::new);
    let string_encoding = options.string_encoding;
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();

    // The sender goes into the task; the receiver is returned to the caller.
    let (tx, rx) = oneshot::channel();

    let instance = RuntimeInstance {
        instance: handle.instance().id().instance(),
        index: component_instance,
    };
    // Whatever is current right now is recorded as the caller of the task.
    let caller = state.current_thread;
    let task = GuestTask::new(
        // `for_any_lower`/`for_any_lift` pin down the closure signatures so
        // they can be boxed as `RawLower`/`RawLift`.
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            host_future_present,
            caller,
        },
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(move |store: &mut dyn VMStore, event, handle| {
                let store = token.as_context_mut(store);
                // SAFETY: NOTE(review): this relies on `callback` being the
                // function obtained from `runtime_callback` for this same
                // instance above; confirm the full contract of
                // `call_callback`, which is not visible here.
                unsafe { instance.call_callback(store, callback, event, handle) }
            }) as CallbackFn
        }),
        instance,
        async_function,
    )?;

    // Register the task, then its implicit thread, and link the two.
    let task = state.push(task)?;
    let new_thread = GuestThread::new_implicit(state, task)?;
    let thread = state.push(new_thread)?;
    state.get_mut(task)?.threads.insert(thread);

    // NOTE(review): on this early exit the task and thread pushed above are
    // left in the table; confirm that this is intended or cleaned up
    // elsewhere.
    if !store.0.may_enter(instance)? {
        bail!(Trap::CannotEnterComponent);
    }

    Ok(PreparedCall {
        handle,
        thread: QualifiedThreadId { task, thread },
        param_count,
        rx,
        _phantom: PhantomData,
    })
}
5440
5441pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5448 mut store: StoreContextMut<T>,
5449 prepared: PreparedCall<R>,
5450) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> {
5451 let PreparedCall {
5452 handle,
5453 thread,
5454 param_count,
5455 rx,
5456 ..
5457 } = prepared;
5458
5459 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5460
5461 Ok(checked(
5462 store.0.id(),
5463 rx.map(move |result| match result {
5464 Ok(r) => match r.downcast() {
5465 Ok(r) => Ok(*r),
5466 Err(_) => bail_bug!("wrong type of value produced"),
5467 },
5468 Err(e) => Err(e.into()),
5469 }),
5470 ))
5471}
5472
5473fn queue_call0<T: 'static>(
5476 store: StoreContextMut<T>,
5477 handle: Func,
5478 guest_thread: QualifiedThreadId,
5479 param_count: usize,
5480) -> Result<()> {
5481 let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5482 let is_concurrent = raw_options.async_;
5483 let callback = raw_options.callback;
5484 let instance = handle.instance();
5485 let callee = handle.lifted_core_func(store.0);
5486 let post_return = handle.post_return_core_func(store.0);
5487 let callback = callback.map(|i| {
5488 let instance = instance.id().get(store.0);
5489 SendSyncPtr::new(instance.runtime_callback(i))
5490 });
5491
5492 log::trace!("queueing call {guest_thread:?}");
5493
5494 unsafe {
5498 instance.queue_call(
5499 store,
5500 guest_thread,
5501 SendSyncPtr::new(callee),
5502 param_count,
5503 1,
5504 is_concurrent,
5505 callback,
5506 post_return.map(SendSyncPtr::new),
5507 )
5508 }
5509}