1use crate::bail_bug;
54use crate::component::func::{self, Func, call_post_return};
55use crate::component::{
56 HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
57};
58use crate::fiber::{self, StoreFiber, StoreFiberYield};
59use crate::prelude::*;
60use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
61use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
62use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
63use crate::{
64 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
65};
66use error_contexts::GlobalErrorContextRefCount;
67use futures::channel::oneshot;
68use futures::future::{self, FutureExt};
69use futures::stream::{FuturesUnordered, StreamExt};
70use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71use std::any::Any;
72use std::borrow::ToOwned;
73use std::boxed::Box;
74use std::cell::UnsafeCell;
75use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
76use std::fmt;
77use std::future::Future;
78use std::marker::PhantomData;
79use std::mem::{self, ManuallyDrop, MaybeUninit};
80use std::ops::DerefMut;
81use std::pin::{Pin, pin};
82use std::ptr::{self, NonNull};
83use std::task::{Context, Poll, Waker};
84use std::vec::Vec;
85use table::{TableDebug, TableId};
86use wasmtime_environ::component::{
87 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
88 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
89 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
90 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
91 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
92};
93use wasmtime_environ::packed_option::ReservedValue;
94use wasmtime_environ::{NUM_COMPONENT_CONTEXT_SLOTS, Trap};
95
96pub use abort::JoinHandle;
97pub use future_stream_any::{FutureAny, StreamAny};
98pub use futures_and_streams::{
99 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
100 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
101 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
102};
103pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
104
105mod abort;
106mod error_contexts;
107mod future_stream_any;
108mod futures_and_streams;
109pub(crate) mod table;
110pub(crate) mod tls;
111
/// Sentinel `u32` with every bit set.
///
/// NOTE(review): presumably the "operation would block" code handed back to
/// guests; no use is visible in this part of the file, so confirm at the call
/// sites.
const BLOCKED: u32 = 0xffff_ffff;
115
/// Status of a subtask.
///
/// The explicit discriminants are the numeric codes that [`Status::pack`]
/// places in the low bits of its result.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Combine this status with an optional waitable handle into one `u32`.
    ///
    /// The status code occupies the low 4 bits and the handle the upper 28
    /// bits (zero when `waitable` is `None`).
    ///
    /// # Panics
    ///
    /// Panics unless `waitable` is `None` exactly when `self` is
    /// [`Status::Returned`], and panics if the handle does not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        let code = self as u32;
        let returned = matches!(self, Status::Returned);
        // A handle accompanies every status except `Returned`.
        assert!(returned == waitable.is_none());
        let handle = waitable.unwrap_or(0);
        assert!(handle < (1 << 28));
        (handle << 4) | code
    }
}
139
/// An event that can be reported for a waitable.
///
/// [`Event::parts`] turns each variant into the `(code, payload)` pair of
/// `u32`s; the `pending` fields are not part of that encoding.
#[derive(Clone, Copy, Debug)]
enum Event {
    /// No event.
    None,
    /// A subtask changed status.
    Subtask {
        /// The status being reported.
        status: Status,
    },
    /// A stream read produced a result.
    StreamRead {
        /// Result code of the operation.
        code: ReturnCode,
        /// NOTE(review): presumably the stream table type and handle the
        /// operation is pending on — confirm against the producers.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A stream write produced a result.
    StreamWrite {
        /// Result code of the operation.
        code: ReturnCode,
        /// See the note on `StreamRead::pending`.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A future read produced a result.
    FutureRead {
        /// Result code of the operation.
        code: ReturnCode,
        /// NOTE(review): presumably the future table type and handle the
        /// operation is pending on — confirm against the producers.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// A future write produced a result.
    FutureWrite {
        /// Result code of the operation.
        code: ReturnCode,
        /// See the note on `FutureRead::pending`.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// A cancellation notification.
    Cancelled,
}
166
167impl Event {
168 fn parts(self) -> (u32, u32) {
173 const EVENT_NONE: u32 = 0;
174 const EVENT_SUBTASK: u32 = 1;
175 const EVENT_STREAM_READ: u32 = 2;
176 const EVENT_STREAM_WRITE: u32 = 3;
177 const EVENT_FUTURE_READ: u32 = 4;
178 const EVENT_FUTURE_WRITE: u32 = 5;
179 const EVENT_CANCELLED: u32 = 6;
180 match self {
181 Event::None => (EVENT_NONE, 0),
182 Event::Cancelled => (EVENT_CANCELLED, 0),
183 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
184 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
185 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
186 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
187 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
188 }
189 }
190}
191
/// Numeric codes for guest callbacks.
///
/// NOTE(review): presumably these are the values a guest callback returns to
/// say what it wants next (exit, yield, or wait); their consumer is not
/// visible in this part of the file.
mod callback_code {
    pub const EXIT: u32 = 0;
    pub const YIELD: u32 = 1;
    pub const WAIT: u32 = 2;
}
198
/// `u32` copy of `wasmtime_environ::component::START_FLAG_ASYNC_CALLEE`, so the
/// flag can be used against `u32` values without repeating the cast.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
203
/// Mutable access to a store, paired with a projection from the store's data
/// `T` to the view `D::Data`.
///
/// `D` defaults to `HasSelf<T>`.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    /// The store being accessed.
    store: StoreContextMut<'a, T>,
    /// Projects the store's data to `D::Data`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
213
214impl<'a, T, D> Access<'a, T, D>
215where
216 D: HasData + ?Sized,
217 T: 'static,
218{
219 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
221 Self { store, get_data }
222 }
223
224 pub fn data_mut(&mut self) -> &mut T {
226 self.store.data_mut()
227 }
228
229 pub fn get(&mut self) -> D::Data<'_> {
231 (self.get_data)(self.data_mut())
232 }
233
234 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
238 where
239 T: 'static,
240 {
241 let accessor = Accessor {
242 get_data: self.get_data,
243 token: StoreToken::new(self.store.as_context_mut()),
244 };
245 self.store
246 .as_context_mut()
247 .spawn_with_accessor(accessor, task)
248 }
249
250 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
253 self.get_data
254 }
255}
256
/// Lets an [`Access`] be used wherever a shared store context is expected.
impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    type Data = T;

    // Delegates to the wrapped store context.
    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}
268
/// Lets an [`Access`] be used wherever a mutable store context is expected.
impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    // Delegates to the wrapped store context.
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}
278
/// Handle for reaching a store from code that does not hold a reference to it.
///
/// It stores only a [`StoreToken`] and a data projection; `Accessor::with`
/// obtains the store itself through `tls::get` when access is needed.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    /// Token used to turn the thread-local store into a `StoreContextMut<T>`.
    token: StoreToken<T>,
    /// Projects the store's data to `D::Data`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
345
/// Types that can be viewed as an [`Accessor`].
pub trait AsAccessor {
    /// The store data type `T` of the underlying `Accessor<T, D>`.
    type Data: 'static;

    /// The projection marker `D` of the underlying `Accessor<T, D>`.
    type AccessorData: HasData + ?Sized;

    /// Borrow `self` as an [`Accessor`].
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
373
/// A reference to an `AsAccessor` is itself an `AsAccessor`, forwarding to the
/// referent.
impl<T: AsAccessor + ?Sized> AsAccessor for &T {
    type Data = T::Data;
    type AccessorData = T::AccessorData;

    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
        T::as_accessor(self)
    }
}
382
/// An `Accessor` is trivially an `AsAccessor`: it returns itself.
impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;

    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}
391
// Compile-time check that `Accessor<T>` is `Send + Sync` even when `T` is not
// `Sync`: `UnsafeCell<u32>` is `!Sync`, so this fails to compile if `Accessor`
// ever inherits its auto traits from `T`.
const _: () = {
    const fn assert<T: Send + Sync>() {}
    assert::<Accessor<UnsafeCell<u32>>>();
};
418
impl<T> Accessor<T> {
    /// Create an `Accessor` for the store identified by `token`, using the
    /// identity projection (the data view is the store's `T` itself).
    pub(crate) fn new(token: StoreToken<T>) -> Self {
        Self {
            token,
            get_data: |x| x,
        }
    }
}
435
436impl<T, D> Accessor<T, D>
437where
438 D: HasData + ?Sized,
439{
440 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
458 tls::get(|vmstore| {
459 fun(Access {
460 store: self.token.as_context_mut(vmstore),
461 get_data: self.get_data,
462 })
463 })
464 }
465
466 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
469 self.get_data
470 }
471
472 pub fn with_getter<D2: HasData>(
489 &self,
490 get_data: fn(&mut T) -> D2::Data<'_>,
491 ) -> Accessor<T, D2> {
492 Accessor {
493 token: self.token,
494 get_data,
495 }
496 }
497
498 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
514 where
515 T: 'static,
516 {
517 let accessor = self.clone_for_spawn();
518 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
519 }
520
521 fn clone_for_spawn(&self) -> Self {
522 Self {
523 token: self.token,
524 get_data: self.get_data,
525 }
526 }
527
528 pub fn poll_no_interesting_tasks(&self, cx: &mut Context<'_>) -> Poll<()> {
564 self.with(|mut access| {
565 let store = access.as_context_mut().0;
566 let state = store.concurrent_state_mut();
567 if state.interesting_tasks == 0 {
568 Poll::Ready(())
569 } else {
570 state.interesting_tasks_empty_waker = Some(cx.waker().clone());
571 Poll::Pending
572 }
573 })
574 }
575}
576
/// A task that can be spawned on a store and run with an [`Accessor`].
///
/// Implementors must be `Send + 'static`.
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Run the task to completion with the given accessor, consuming `self`.
    /// The returned future must be `Send`.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
595
/// Caller-side information for a call, split by whether the caller is async or
/// sync.
///
/// NOTE(review): the consumers are not visible in this part of the file; the
/// field notes below are read off names and types only.
enum CallerInfo {
    /// Async caller.
    Async {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Presumably whether the call produces a result — confirm.
        has_result: bool,
    },
    /// Sync caller.
    Sync {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Presumably the number of result values expected — confirm.
        result_count: u32,
    },
}
610
/// How a waiter is represented.
///
/// NOTE(review): presumably a suspended fiber versus a callback on an
/// instance; uses are not visible in this part of the file.
enum WaitMode {
    /// A fiber held by value.
    Fiber(StoreFiber<'static>),
    /// An instance, presumably the one whose callback is invoked — confirm.
    Callback(Instance),
}
619
/// Reason passed to `store.suspend(..)` when a fiber suspends.
#[derive(Debug)]
enum SuspendReason {
    /// `thread` is waiting on the waitable set `set` (used by `poll_and_block`
    /// for a pending host future).
    Waiting {
        set: TableId<WaitableSet>,
        thread: QualifiedThreadId,
        /// NOTE(review): presumably bypasses the may-block check when set —
        /// confirm in `suspend`.
        skip_may_block_check: bool,
    },
    /// The worker fiber finished its item and needs another one (used by the
    /// loop in `run_on_worker`).
    NeedWork,
    /// `thread` is yielding.
    Yielding {
        thread: QualifiedThreadId,
        /// NOTE(review): presumably whether the yield can be cancelled —
        /// confirm.
        cancellable: bool,
        skip_may_block_check: bool,
    },
    /// `thread` suspended explicitly.
    ExplicitlySuspending {
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
}
646
/// The kinds of guest call processed by `handle_guest_call`.
enum GuestCallKind {
    /// Deliver an event to the guest through its callback.
    DeliverEvent {
        /// Instance used to fetch the event and interpret the callback's
        /// return code.
        instance: Instance,
        /// Optional waitable set passed to `Instance::get_event`.
        set: Option<TableId<WaitableSet>>,
    },
    /// Start a call; the closure may return a follow-up `GuestCall`, which
    /// `handle_guest_call` then runs next.
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
    /// Start a call with no follow-up. `GuestCall::is_ready` always treats
    /// this kind as ready.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
668
669impl fmt::Debug for GuestCallKind {
670 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
671 match self {
672 Self::DeliverEvent { instance, set } => f
673 .debug_struct("DeliverEvent")
674 .field("instance", instance)
675 .field("set", set)
676 .finish(),
677 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
678 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
679 }
680 }
681}
682
/// Target of a suspension: either nothing, or one of two kinds of `u32` index.
///
/// NOTE(review): what the indices refer to is not visible in this part of the
/// file.
#[derive(Copy, Clone, Debug)]
pub enum SuspensionTarget {
    SomeSuspended(u32),
    Some(u32),
    None,
}

impl SuspensionTarget {
    /// `true` only for [`SuspensionTarget::None`].
    fn is_none(&self) -> bool {
        match self {
            SuspensionTarget::None => true,
            SuspensionTarget::SomeSuspended(_) | SuspensionTarget::Some(_) => false,
        }
    }

    /// `true` for every variant that carries an index.
    fn is_some(&self) -> bool {
        !self.is_none()
    }
}
699
/// A pending guest call: what to do (`kind`) and the thread it runs as.
#[derive(Debug)]
struct GuestCall {
    /// Thread (and its task) the call runs on.
    thread: QualifiedThreadId,
    /// What the call does.
    kind: GuestCallKind,
}
706
707impl GuestCall {
708 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
718 let instance = store
719 .concurrent_state_mut()
720 .get_mut(self.thread.task)?
721 .instance;
722 let state = store.instance_state(instance).concurrent_state();
723
724 let ready = match &self.kind {
725 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
726 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
727 GuestCallKind::StartExplicit(_) => true,
728 };
729 log::trace!(
730 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
731 state.do_not_enter,
732 state.backpressure
733 );
734 Ok(ready)
735 }
736}
737
/// A unit of work handed to the worker fiber (see `run_on_worker`).
enum WorkerItem {
    /// Run a guest call via `handle_guest_call`.
    GuestCall(GuestCall),
    /// Run an arbitrary closure against the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
743
/// An item on the store's work queues, processed by `handle_work_item`.
enum WorkItem {
    /// Add a host future to the store's set of futures.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// Resume the given fiber.
    ResumeFiber(StoreFiber<'static>),
    /// Resume the fiber of a guest thread that is in the `Ready` state.
    ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
    /// Run a guest call if it is ready; otherwise park it in its instance's
    /// `pending` map.
    GuestCall(RuntimeComponentInstanceIndex, GuestCall),
    /// Run a closure on the worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
758
759impl fmt::Debug for WorkItem {
760 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
761 match self {
762 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
763 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
764 Self::ResumeThread(instance, thread) => f
765 .debug_tuple("ResumeThread")
766 .field(instance)
767 .field(thread)
768 .finish(),
769 Self::GuestCall(instance, call) => f
770 .debug_tuple("GuestCall")
771 .field(instance)
772 .field(call)
773 .finish(),
774 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
775 }
776 }
777}
778
/// Outcome of a wait: it was either cancelled or ran to completion.
///
/// The derived ordering places `Cancelled` before `Completed`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    Cancelled,
    Completed,
}
785
/// Drive `future` to completion on behalf of the current host task, suspending
/// the calling fiber if the future is not immediately ready.
///
/// The future is polled once; if it is pending it is handed to the store's
/// set of futures and the caller's thread is suspended on its `sync_call_set`
/// until the host task's waitable fires.
///
/// # Errors
///
/// Fails if the current thread is not a host thread, if the future fails, if
/// suspension fails, or if the host task ends up in an unexpected state.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
) -> Result<R> {
    let state = store.concurrent_state_mut();
    let task = state.current_host_thread()?;

    // Wrap the future so that, on completion, it stashes its (type-erased)
    // result in the host task and signals `Status::Returned` on the task's
    // waitable.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            let host_state = &mut state.get_mut(task)?.state;
            assert!(matches!(host_state, HostTaskState::CalleeStarted));
            *host_state = HostTaskState::CalleeFinished(Box::new(result));

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // Poll once with a no-op waker, with the store installed in TLS so the
    // wrapper above can reach it.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        // Finished synchronously; only an error needs propagating, the result
        // is already stored in the task.
        Poll::Ready(result) => result?,

        Poll::Pending => {
            // Hand the future to the store so the event loop keeps polling it.
            let state = store.concurrent_state_mut();
            state.push_future(future);

            // Put the host task's waitable in the caller's sync-call set and
            // suspend the caller until that set has an event.
            let caller = state.get_mut(task)?.caller;
            let set = state.get_mut(caller.thread)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                skip_may_block_check: false,
            })?;

            // Resumed: take the waitable back out of the set.
            Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
        }
    }

    // Extract the stored result, leaving the task marked as done.
    let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
    match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
        HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
            Ok(result) => *result,
            Err(_) => bail_bug!("host task finished with wrong type of result"),
        }),
        _ => bail_bug!("unexpected host task state after completion"),
    }
}
870
/// Execute `call`, followed by any follow-up calls it produces.
///
/// `DeliverEvent` and `StartImplicit` may each yield another `GuestCall`,
/// which is run in the next loop iteration; `StartExplicit` never does.
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
    let mut next = Some(call);
    while let Some(call) = next.take() {
        match call.kind {
            GuestCallKind::DeliverEvent { instance, set } => {
                // Fetch the event to deliver; one must be present.
                let (event, waitable) =
                    match instance.get_event(store, call.thread.task, set, true)? {
                        Some(pair) => pair,
                        None => bail_bug!("delivering non-present event"),
                    };
                let state = store.concurrent_state_mut();
                let task = state.get_mut(call.thread.task)?;
                let runtime_instance = task.instance;
                // Handle passed to the callback; 0 when there is no waitable.
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);

                log::trace!(
                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
                    call.thread,
                );

                // Make the call's thread current for the callback's duration.
                let old_thread = store.set_thread(call.thread)?;
                log::trace!(
                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
                    call.thread
                );

                store.enter_instance(runtime_instance);

                // Temporarily take the callback out of the task so it can be
                // invoked with `store` borrowed mutably.
                let Some(callback) = store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback
                    .take()
                else {
                    bail_bug!("guest task callback field not present")
                };

                let code = callback(store, event, handle)?;

                // Put the callback back for the next delivery.
                store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback = Some(callback);

                store.exit_instance(runtime_instance)?;

                store.set_thread(old_thread)?;

                // Interpret the callback's return code; this may produce the
                // next call to run.
                next = instance.handle_callback_code(
                    store,
                    call.thread,
                    runtime_instance.index,
                    code,
                )?;

                log::trace!(
                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
                );
            }
            GuestCallKind::StartImplicit(fun) => {
                next = fun(store)?;
            }
            GuestCallKind::StartExplicit(fun) => {
                fun(store)?;
            }
        }
    }

    Ok(())
}
942
impl<T> Store<T> {
    /// Run `fun` with an [`Accessor`] to this store, driving the store's
    /// concurrent work until the future returned by `fun` completes.
    ///
    /// # Errors
    ///
    /// Fails if `Config::concurrency_support` is disabled, or if running the
    /// store's concurrent work fails.
    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        ensure!(
            self.as_context().0.concurrency_support(),
            "cannot use `run_concurrent` when Config::concurrency_support disabled",
        );
        self.as_context_mut().run_concurrent(fun).await
    }

    /// Panics unless the store's concurrent state is empty; forwards to
    /// `StoreContextMut::assert_concurrent_state_empty`.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(&mut self) {
        self.as_context_mut().assert_concurrent_state_empty();
    }

    /// Number of entries in the concurrent state table; forwards to
    /// `StoreContextMut::concurrent_state_table_size`.
    #[doc(hidden)]
    pub fn concurrent_state_table_size(&mut self) -> usize {
        self.as_context_mut().concurrent_state_table_size()
    }

    /// Spawn a background task on this store, returning its [`JoinHandle`].
    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
    where
        T: 'static,
    {
        self.as_context_mut().spawn(task)
    }
}
974
975impl<T> StoreContextMut<'_, T> {
976 #[doc(hidden)]
987 pub fn assert_concurrent_state_empty(self) {
988 let store = self.0;
989 store
990 .store_data_mut()
991 .components
992 .assert_instance_states_empty();
993 let state = store.concurrent_state_mut();
994 assert!(
995 state.table.get_mut().is_empty(),
996 "non-empty table: {:?}",
997 state.table.get_mut()
998 );
999 assert!(state.high_priority.is_empty());
1000 assert!(state.low_priority.is_empty());
1001 assert!(state.current_thread.is_none());
1002 assert!(state.futures_mut().unwrap().is_empty());
1003 assert!(state.global_error_context_ref_counts.is_empty());
1004 }
1005
1006 #[doc(hidden)]
1011 pub fn concurrent_state_table_size(&mut self) -> usize {
1012 self.0
1013 .concurrent_state_mut()
1014 .table
1015 .get_mut()
1016 .iter_mut()
1017 .count()
1018 }
1019
    /// Spawn `task` as a background task on this store.
    ///
    /// The task receives an [`Accessor`] with the identity projection. The
    /// returned [`JoinHandle`] refers to the spawned task.
    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
    where
        T: 'static,
    {
        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
        self.spawn_with_accessor(accessor, task)
    }
1036
1037 fn spawn_with_accessor<D>(
1040 self,
1041 accessor: Accessor<T, D>,
1042 task: impl AccessorTask<T, D>,
1043 ) -> JoinHandle
1044 where
1045 T: 'static,
1046 D: HasData + ?Sized,
1047 {
1048 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
1052 self.0
1053 .concurrent_state_mut()
1054 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1055 handle
1056 }
1057
    /// Run `fun` with an [`Accessor`] to this store, driving the store's
    /// concurrent work until the future returned by `fun` completes.
    ///
    /// If the store runs out of work while that future is still pending, this
    /// keeps waiting rather than trapping (`trap_on_idle` is `false`).
    ///
    /// # Errors
    ///
    /// Fails if `Config::concurrency_support` is disabled, or if processing
    /// the store's work fails.
    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        ensure!(
            self.0.concurrency_support(),
            "cannot use `run_concurrent` when Config::concurrency_support disabled",
        );
        self.do_run_concurrent(fun, false).await
    }
1151
    /// Like `run_concurrent`, but fails with `Trap::AsyncDeadlock` if the
    /// store has no work left while `fun`'s future is still pending.
    ///
    /// Unlike `run_concurrent`, this does not check `concurrency_support` with
    /// `ensure!`; `do_run_concurrent` only `debug_assert!`s it.
    pub(super) async fn run_concurrent_trap_on_idle<R>(
        self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, true).await
    }
1161
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Creates the future from `fun`, keeps it in a guard that drops it with
    /// the store installed in TLS, and polls it through `poll_until`.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        debug_assert!(self.0.concurrency_support());
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Guard that owns the store context and the user's future, and drops
        // the future inside `tls::set` so its destructor runs with the store
        // available through TLS.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: `value` is dropped only here, and is not used
                    // again after this guard's destructor runs.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: the future lives inside `dropper`, is never moved out of it
        // in this function, and is dropped in place by `Dropper::drop`.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1205
    /// Event loop: run the store's futures and work items until `future`
    /// completes, returning its output.
    ///
    /// Each iteration polls `future`, then the store's set of host futures,
    /// then gathers either all high-priority work items or one low-priority
    /// item and processes them. If no work remains and `future` is still
    /// pending, this returns `Trap::AsyncDeadlock` when `trap_on_idle` is set
    /// and otherwise stays pending.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Guard that puts the `FuturesUnordered` back into the concurrent
        // state when dropped, including on early return or cancellation.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take the futures out of the state for this iteration so they can
            // be polled while the store is mutably borrowed elsewhere.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = match reset.futures.as_mut() {
                Some(f) => pin!(f.next()),
                None => bail_bug!("concurrent state missing futures field"),
            };

            enum PollResult<R> {
                // `future` finished with this value.
                Complete(R),
                // Work items to process before polling again.
                ProcessWork {
                    ready: Vec<WorkItem>,
                    low_priority: bool,
                },
            }

            let result = future::poll_fn(|cx| {
                // First give the caller's future a chance to finish.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(PollResult::Complete(value)));
                }

                // Then poll the host futures: `Ready(true)` means one
                // completed, `Ready(false)` means the set is empty.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Collect the next batch: every high-priority item, or failing
                // that a single low-priority item.
                let state = reset.store.0.concurrent_state_mut();
                let mut ready = mem::take(&mut state.high_priority);
                let mut low_priority = false;
                if ready.is_empty() {
                    if let Some(item) = state.low_priority.pop_back() {
                        ready.push(item);
                        low_priority = true;
                    }
                }
                if !ready.is_empty() {
                    return Poll::Ready(Ok(PollResult::ProcessWork {
                        ready,
                        low_priority,
                    }));
                }

                return match next {
                    // A host future completed but queued no work: return an
                    // empty batch so the outer loop starts a fresh iteration.
                    Poll::Ready(true) => {
                        Poll::Ready(Ok(PollResult::ProcessWork {
                            ready: Vec::new(),
                            low_priority: false,
                        }))
                    }
                    // No host futures and no work items remain.
                    Poll::Ready(false) => {
                        // Poll the caller's future one more time before
                        // concluding the store is idle.
                        if let Poll::Ready(value) =
                            tls::set(reset.store.0, || future.as_mut().poll(cx))
                        {
                            Poll::Ready(Ok(PollResult::Complete(value)))
                        } else {
                            if trap_on_idle {
                                Poll::Ready(Err(Trap::AsyncDeadlock.into()))
                            } else {
                                Poll::Pending
                            }
                        }
                    }
                    Poll::Pending => Poll::Pending,
                };
            })
            .await;

            // Restore the futures to the state before handling work items.
            drop(reset);

            match result? {
                PollResult::Complete(value) => break Ok(value),
                PollResult::ProcessWork {
                    ready,
                    low_priority,
                } => {
                    // Guard that disposes of any items left unprocessed, e.g.
                    // because an item failed or this future was dropped.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    // Drop the future with the store in TLS.
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    // Yield to the executor before running low-priority work.
                    if low_priority {
                        dispose.store.0.yield_now().await
                    }

                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1420
    /// Process a single work item taken from the store's queues.
    async fn handle_work_item(self, item: WorkItem) -> Result<()>
    where
        T: Send,
    {
        log::trace!("handle work item {item:?}");
        match item {
            WorkItem::PushFuture(future) => {
                self.0
                    .concurrent_state_mut()
                    .futures_mut()?
                    .push(future.into_inner());
            }
            WorkItem::ResumeFiber(fiber) => {
                self.0.resume_fiber(fiber).await?;
            }
            WorkItem::ResumeThread(_, thread) => {
                // The thread must be `Ready`; mark it `Running` and resume the
                // fiber it was holding.
                if let GuestThreadState::Ready { fiber, .. } = mem::replace(
                    &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
                    GuestThreadState::Running,
                ) {
                    self.0.resume_fiber(fiber).await?;
                } else {
                    bail_bug!("cannot resume non-pending thread {thread:?}");
                }
            }
            WorkItem::GuestCall(_, call) => {
                if call.is_ready(self.0)? {
                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
                } else {
                    // Not ready: the first time this happens for a task, flag
                    // it and, for implicit starts, report `Status::Starting`.
                    let state = self.0.concurrent_state_mut();
                    let task = state.get_mut(call.thread.task)?;
                    if !task.starting_sent {
                        task.starting_sent = true;
                        if let GuestCallKind::StartImplicit(_) = &call.kind {
                            Waitable::Guest(call.thread.task).set_event(
                                state,
                                Some(Event::Subtask {
                                    status: Status::Starting,
                                }),
                            )?;
                        }
                    }

                    // Park the call on its instance until it becomes ready
                    // (see `partition_pending`).
                    let instance = state.get_mut(call.thread.task)?.instance;
                    self.0
                        .instance_state(instance)
                        .concurrent_state()
                        .pending
                        .insert(call.thread, call.kind);
                }
            }
            WorkItem::WorkerFunction(fun) => {
                self.run_on_worker(WorkerItem::Function(fun)).await?;
            }
        }

        Ok(())
    }
1480
    /// Run `item` on the worker fiber, creating the fiber if none is cached.
    ///
    /// The item is passed through the state's `worker_item` slot, which must
    /// be empty on entry (asserted).
    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
    where
        T: Send,
    {
        // Reuse the cached worker fiber if present; otherwise create one whose
        // body loops: take an item, run it, suspend asking for more work.
        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
            fiber
        } else {
            fiber::make_fiber(self.0, move |store| {
                loop {
                    let Some(item) = store.concurrent_state_mut().worker_item.take() else {
                        bail_bug!("worker_item not present when resuming fiber")
                    };
                    match item {
                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
                    }

                    store.suspend(SuspendReason::NeedWork)?;
                }
            })?
        };

        // Leave the item where the fiber body will pick it up.
        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
        assert!(worker_item.is_none());
        *worker_item = Some(item);

        self.0.resume_fiber(worker).await
    }
1510
1511 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1516 where
1517 T: 'static,
1518 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1519 + Send
1520 + Sync
1521 + 'static,
1522 R: Send + Sync + 'static,
1523 {
1524 let token = StoreToken::new(self);
1525 async move {
1526 let mut accessor = Accessor::new(token);
1527 closure(&mut accessor).await
1528 }
1529 }
1530}
1531
1532impl StoreOpaque {
    /// Set up the task and thread bookkeeping for a synchronous call into the
    /// guest instance `callee`, and make the new thread current.
    ///
    /// When concurrency support is disabled this defers to
    /// `enter_call_not_concurrent`. In debug builds, a provided `guest_caller`
    /// is checked against the instance of the current guest thread.
    pub(crate) fn enter_guest_sync_call(
        &mut self,
        guest_caller: Option<RuntimeInstance>,
        callee_async: bool,
        callee: RuntimeInstance,
    ) -> Result<()> {
        log::trace!("enter sync call {callee:?}");
        if !self.concurrency_support() {
            return self.enter_call_not_concurrent();
        }

        let state = self.concurrent_state_mut();
        let thread = state.current_thread;
        // Instance of the current guest thread, if the current thread is one.
        let instance = if let Some(thread) = thread.guest() {
            Some(state.get_mut(thread.task)?.instance)
        } else {
            None
        };
        if guest_caller.is_some() {
            debug_assert_eq!(instance, guest_caller);
        }
        // Parameters and results are not lowered/lifted through the task for a
        // sync call, so both hooks report a bug if they are ever invoked.
        let guest_thread = GuestTask::new(
            state,
            Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
            LiftResult {
                lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
                ty: TypeTupleIndex::reserved_value(),
                memory: None,
                string_encoding: StringEncoding::Utf8,
            },
            // Record who is calling: the current guest thread, or the host.
            if let Some(thread) = thread.guest() {
                Caller::Guest { thread: *thread }
            } else {
                Caller::Host {
                    tx: None,
                    host_future_present: false,
                    caller: thread,
                }
            },
            None,
            callee,
            callee_async,
        )?;

        Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
            guest_thread.thread,
            self,
            callee.index,
        )?;
        self.set_thread(guest_thread)?;

        Ok(())
    }
1592
    /// Tear down the bookkeeping created by `enter_guest_sync_call`: mark the
    /// current guest task as exited, make its caller the current thread again,
    /// and clean up the thread.
    ///
    /// When concurrency support is disabled this defers to
    /// `exit_call_not_concurrent`.
    pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
        if !self.concurrency_support() {
            return Ok(self.exit_call_not_concurrent());
        }
        // Clear the current thread; it must have been a guest thread.
        let thread = match self.set_thread(CurrentThread::None)?.guest() {
            Some(t) => *t,
            None => bail_bug!("expected task when exiting"),
        };
        let task = self.concurrent_state_mut().get_mut(thread.task)?;
        let instance = task.instance;
        let caller = match &task.caller {
            &Caller::Guest { thread } => thread.into(),
            &Caller::Host { caller, .. } => caller,
        };
        task.lift_result = None;
        task.exited = true;
        // Hand control back to whoever made the call.
        self.set_thread(caller)?;

        log::trace!("exit sync call {instance:?}");
        self.cleanup_thread(thread, instance, CleanupTask::Yes)?;

        Ok(())
    }
1617
    /// Create a host task called by the current guest thread and make it the
    /// current thread.
    ///
    /// Returns `Ok(None)` when concurrency support is disabled; in that case
    /// `enter_call_not_concurrent` is called instead. Pass the result to
    /// `host_task_delete` when the call is finished.
    pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
        if !self.concurrency_support() {
            self.enter_call_not_concurrent()?;
            return Ok(None);
        }
        let state = self.concurrent_state_mut();
        let caller = state.current_guest_thread()?;
        let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
        log::trace!("new host task {task:?}");
        self.set_thread(task)?;
        Ok(Some(task))
    }
1637
    /// Switch the current thread from the current host task back to that
    /// task's caller.
    ///
    /// Does nothing when concurrency support is disabled. Fails if the current
    /// thread is not a host thread.
    pub fn host_task_reenter_caller(&mut self) -> Result<()> {
        if !self.concurrency_support() {
            return Ok(());
        }
        let task = self.concurrent_state_mut().current_host_thread()?;
        let caller = self.concurrent_state_mut().get_mut(task)?.caller;
        self.set_thread(caller)?;
        Ok(())
    }
1652
1653 pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1660 match task {
1661 Some(task) => {
1662 log::trace!("delete host task {task:?}");
1663 self.concurrent_state_mut().delete(task)?;
1664 }
1665 None => {
1666 self.exit_call_not_concurrent();
1667 }
1668 }
1669 Ok(())
1670 }
1671
1672 pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
1680 if self.trapped() {
1681 return Ok(false);
1682 }
1683 if !self.concurrency_support() {
1684 return Ok(true);
1685 }
1686 let state = self.concurrent_state_mut();
1687 let mut cur = state.current_thread;
1688 loop {
1689 match cur {
1690 CurrentThread::None => break Ok(true),
1691 CurrentThread::Guest(thread) => {
1692 let task = state.get_mut(thread.task)?;
1693
1694 if task.instance.instance == instance.instance {
1701 break Ok(false);
1702 }
1703 cur = match task.caller {
1704 Caller::Host { caller, .. } => caller,
1705 Caller::Guest { thread } => thread.into(),
1706 };
1707 }
1708 CurrentThread::Host(id) => {
1709 cur = state.get_mut(id)?.caller.into();
1710 }
1711 }
1712 }
1713 }
1714
    /// Mutable access to the `InstanceState` for `instance`: looks up the
    /// component instance by `instance.instance`, then the state at
    /// `instance.index` within it.
    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
        self.component_instance_mut(instance.instance)
            .instance_state(instance.index)
    }
1721
    /// Make `thread` the current thread and return the previous one.
    ///
    /// Besides updating `current_thread`, this swaps the per-thread
    /// `component_context` in and out of the VM store context and refreshes
    /// the `task_may_block` flag on the component instances involved.
    fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
        let thread = thread.into();
        let state = self.concurrent_state_mut();
        let old_thread = mem::replace(&mut state.current_thread, thread);

        // Save the outgoing guest thread's context from the VM store context.
        if let Some(old_thread) = old_thread.guest() {
            let old_context = self.vm_store_context().component_context;
            self.concurrent_state_mut()
                .get_mut(old_thread.thread)?
                .context = old_context;
        }
        // In debug builds, fill the live context with `u32::MAX` so a stale
        // context cannot be mistaken for a valid one.
        if cfg!(debug_assertions) {
            self.vm_store_context_mut().component_context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
        }
        // Load the incoming guest thread's saved context; in debug builds the
        // saved copy is overwritten with `u32::MAX` as well.
        if let Some(thread) = thread.guest() {
            let thread = self.concurrent_state_mut().get_mut(thread.thread)?;
            let context = thread.context;
            if cfg!(debug_assertions) {
                thread.context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
            }
            self.vm_store_context_mut().component_context = context;
        }

        // Clear `task_may_block` on the instance being switched away from...
        let state = self.concurrent_state_mut();
        if let Some(old_thread) = old_thread.guest() {
            let instance = state.get_mut(old_thread.task)?.instance.instance;
            self.component_instance_mut(instance)
                .set_task_may_block(false)
        }

        // ...and recompute it for the instance being switched to.
        if thread.guest().is_some() {
            self.set_task_may_block()?;
        }

        Ok(old_thread)
    }
1777
    /// Recompute whether the current guest task may block and store the answer
    /// on that task's component instance.
    ///
    /// Fails if the current thread is not a guest thread.
    fn set_task_may_block(&mut self) -> Result<()> {
        let state = self.concurrent_state_mut();
        let guest_thread = state.current_guest_thread()?;
        let instance = state.get_mut(guest_thread.task)?.instance.instance;
        let may_block = self.concurrent_state_mut().may_block(guest_thread.task)?;
        self.component_instance_mut(instance)
            .set_task_may_block(may_block);
        Ok(())
    }
1789
1790 pub(crate) fn check_blocking(&mut self) -> Result<()> {
1791 if !self.concurrency_support() {
1792 return Ok(());
1793 }
1794 let state = self.concurrent_state_mut();
1795 let task = state.current_guest_thread()?.task;
1796 let instance = state.get_mut(task)?.instance.instance;
1797 let task_may_block = self.component_instance(instance).get_task_may_block();
1798
1799 if task_may_block {
1800 Ok(())
1801 } else {
1802 Err(Trap::CannotBlockSyncTask.into())
1803 }
1804 }
1805
1806 fn enter_instance(&mut self, instance: RuntimeInstance) {
1810 log::trace!("enter {instance:?}");
1811 self.instance_state(instance)
1812 .concurrent_state()
1813 .do_not_enter = true;
1814 }
1815
1816 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1820 log::trace!("exit {instance:?}");
1821 self.instance_state(instance)
1822 .concurrent_state()
1823 .do_not_enter = false;
1824 self.partition_pending(instance)
1825 }
1826
1827 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1832 for (thread, kind) in
1833 mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1834 {
1835 let call = GuestCall { thread, kind };
1836 if call.is_ready(self)? {
1837 self.concurrent_state_mut()
1838 .push_high_priority(WorkItem::GuestCall(instance.index, call));
1839 } else {
1840 self.instance_state(instance)
1841 .concurrent_state()
1842 .pending
1843 .insert(call.thread, call.kind);
1844 }
1845 }
1846
1847 Ok(())
1848 }
1849
1850 pub(crate) fn backpressure_modify(
1852 &mut self,
1853 caller_instance: RuntimeInstance,
1854 modify: impl FnOnce(u16) -> Option<u16>,
1855 ) -> Result<()> {
1856 let state = self.instance_state(caller_instance).concurrent_state();
1857 let old = state.backpressure;
1858 let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
1859 state.backpressure = new;
1860
1861 if old > 0 && new == 0 {
1862 self.partition_pending(caller_instance)?;
1865 }
1866
1867 Ok(())
1868 }
1869
1870 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1873 let old_thread = self.concurrent_state_mut().current_thread;
1874 log::trace!("resume_fiber: save current thread {old_thread:?}");
1875
1876 let fiber = fiber::resolve_or_release(self, fiber).await?;
1877
1878 self.set_thread(old_thread)?;
1879
1880 let state = self.concurrent_state_mut();
1881
1882 if let Some(ot) = old_thread.guest() {
1883 state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1884 }
1885 log::trace!("resume_fiber: restore current thread {old_thread:?}");
1886
1887 if let Some(mut fiber) = fiber {
1888 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1889 let reason = match state.suspend_reason.take() {
1891 Some(r) => r,
1892 None => bail_bug!("suspend reason missing when resuming fiber"),
1893 };
1894 match reason {
1895 SuspendReason::NeedWork => {
1896 if state.worker.is_none() {
1897 state.worker = Some(fiber);
1898 } else {
1899 fiber.dispose(self);
1900 }
1901 }
1902 SuspendReason::Yielding {
1903 thread,
1904 cancellable,
1905 ..
1906 } => {
1907 state.get_mut(thread.thread)?.state =
1908 GuestThreadState::Ready { fiber, cancellable };
1909 let instance = state.get_mut(thread.task)?.instance.index;
1910 state.push_low_priority(WorkItem::ResumeThread(instance, thread));
1911 }
1912 SuspendReason::ExplicitlySuspending { thread, .. } => {
1913 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1914 }
1915 SuspendReason::Waiting { set, thread, .. } => {
1916 let old = state
1917 .get_mut(set)?
1918 .waiting
1919 .insert(thread, WaitMode::Fiber(fiber));
1920 assert!(old.is_none());
1921 }
1922 };
1923 } else {
1924 log::trace!("resume_fiber: fiber has exited");
1925 }
1926
1927 Ok(())
1928 }
1929
1930 fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1936 log::trace!("suspend fiber: {reason:?}");
1937
1938 let task = match &reason {
1942 SuspendReason::Yielding { thread, .. }
1943 | SuspendReason::Waiting { thread, .. }
1944 | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1945 SuspendReason::NeedWork => None,
1946 };
1947
1948 let old_guest_thread = if task.is_some() {
1949 self.concurrent_state_mut().current_thread
1950 } else {
1951 CurrentThread::None
1952 };
1953
1954 debug_assert!(
1960 matches!(
1961 reason,
1962 SuspendReason::ExplicitlySuspending {
1963 skip_may_block_check: true,
1964 ..
1965 } | SuspendReason::Waiting {
1966 skip_may_block_check: true,
1967 ..
1968 } | SuspendReason::Yielding {
1969 skip_may_block_check: true,
1970 ..
1971 }
1972 ) || old_guest_thread
1973 .guest()
1974 .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1975 .transpose()?
1976 .unwrap_or(true)
1977 );
1978
1979 let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1980 assert!(suspend_reason.is_none());
1981 *suspend_reason = Some(reason);
1982
1983 self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1984
1985 if task.is_some() {
1986 self.set_thread(old_guest_thread)?;
1987 }
1988
1989 Ok(())
1990 }
1991
1992 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1993 let state = self.concurrent_state_mut();
1994
1995 if waitable.common(state)?.set.is_some() {
1996 bail!(Trap::WaitableSyncAndAsync);
1997 }
1998
1999 let caller = state.current_guest_thread()?;
2000 let set = state.get_mut(caller.thread)?.sync_call_set;
2001 waitable.join(state, Some(set))?;
2002 self.suspend(SuspendReason::Waiting {
2003 set,
2004 thread: caller,
2005 skip_may_block_check: false,
2006 })?;
2007 let state = self.concurrent_state_mut();
2008 waitable.join(state, None)
2009 }
2010
2011 fn cleanup_thread(
2033 &mut self,
2034 guest_thread: QualifiedThreadId,
2035 runtime_instance: RuntimeInstance,
2036 cleanup_task: CleanupTask,
2037 ) -> Result<()> {
2038 let state = self.concurrent_state_mut();
2039 let thread_data = state.get_mut(guest_thread.thread)?;
2040 let sync_call_set = thread_data.sync_call_set;
2041 if let Some(guest_id) = thread_data.instance_rep {
2042 self.instance_state(runtime_instance)
2043 .thread_handle_table()
2044 .guest_thread_remove(guest_id)?;
2045 }
2046 let state = self.concurrent_state_mut();
2047
2048 for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
2050 if let Some(Event::Subtask {
2051 status: Status::Returned | Status::ReturnCancelled,
2052 }) = waitable.common(state)?.event
2053 {
2054 waitable.delete_from(state)?;
2055 }
2056 }
2057
2058 state.delete(guest_thread.thread)?;
2059 state.delete(sync_call_set)?;
2060 let task = state.get_mut(guest_thread.task)?;
2061 task.threads.remove(&guest_thread.thread);
2062
2063 if task.threads.is_empty() && !task.returned_or_cancelled() {
2064 bail!(Trap::NoAsyncResult);
2065 }
2066 let ready_to_delete = task.ready_to_delete();
2067
2068 if !task.decremented_interesting_task_count && task.exited && task.returned_or_cancelled() {
2069 task.decremented_interesting_task_count = true;
2070
2071 debug_assert!(state.interesting_tasks > 0);
2072 state.interesting_tasks -= 1;
2073 if state.interesting_tasks == 0
2074 && let Some(waker) = state.interesting_tasks_empty_waker.take()
2075 {
2076 waker.wake();
2077 }
2078 }
2079
2080 match cleanup_task {
2081 CleanupTask::Yes => {
2082 if ready_to_delete {
2083 Waitable::Guest(guest_thread.task).delete_from(state)?;
2084 }
2085 }
2086 CleanupTask::No => {}
2087 }
2088
2089 Ok(())
2090 }
2091
2092 fn cancel_guest_subtask_without_lowered_parameters(
2105 &mut self,
2106 caller_instance: RuntimeInstance,
2107 guest_task: TableId<GuestTask>,
2108 ) -> Result<()> {
2109 let concurrent_state = self.concurrent_state_mut();
2110 let task = concurrent_state.get_mut(guest_task)?;
2111 assert!(!task.already_lowered_parameters());
2112 task.lower_params = None;
2116 task.lift_result = None;
2117 task.exited = true;
2118 let instance = task.instance;
2119
2120 assert_eq!(1, task.threads.len());
2123 let thread = *task.threads.iter().next().unwrap();
2124 self.cleanup_thread(
2125 QualifiedThreadId {
2126 task: guest_task,
2127 thread,
2128 },
2129 caller_instance,
2130 CleanupTask::No,
2131 )?;
2132
2133 let pending = &mut self.instance_state(instance).concurrent_state().pending;
2135 let pending_count = pending.len();
2136 pending.retain(|thread, _| thread.task != guest_task);
2137 if pending.len() == pending_count {
2139 bail!(Trap::SubtaskCancelAfterTerminal);
2140 }
2141 Ok(())
2142 }
2143}
2144
/// Tells `cleanup_thread` whether it may also delete the owning task.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum CleanupTask {
    /// Delete the task's waitable as well, provided the task reports itself
    /// ready to delete.
    Yes,
    /// Leave the task in place; only the thread is cleaned up.
    No,
}
2149
2150impl Instance {
2151 fn get_event(
2154 self,
2155 store: &mut StoreOpaque,
2156 guest_task: TableId<GuestTask>,
2157 set: Option<TableId<WaitableSet>>,
2158 cancellable: bool,
2159 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
2160 let state = store.concurrent_state_mut();
2161
2162 let event = &mut state.get_mut(guest_task)?.event;
2163 if let Some(ev) = event
2164 && (cancellable || !matches!(ev, Event::Cancelled))
2165 {
2166 log::trace!("deliver event {ev:?} to {guest_task:?}");
2167 let ev = *ev;
2168 *event = None;
2169 return Ok(Some((ev, None)));
2170 }
2171
2172 let set = match set {
2173 Some(set) => set,
2174 None => return Ok(None),
2175 };
2176 let waitable = match state.get_mut(set)?.ready.pop_first() {
2177 Some(v) => v,
2178 None => return Ok(None),
2179 };
2180
2181 let common = waitable.common(state)?;
2182 let handle = match common.handle {
2183 Some(h) => h,
2184 None => bail_bug!("handle not set when delivering event"),
2185 };
2186 let event = match common.event.take() {
2187 Some(e) => e,
2188 None => bail_bug!("event not set when delivering event"),
2189 };
2190
2191 log::trace!(
2192 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
2193 );
2194
2195 waitable.on_delivery(store, self, event)?;
2196
2197 Ok(Some((event, Some((waitable, handle)))))
2198 }
2199
2200 fn handle_callback_code(
2206 self,
2207 store: &mut StoreOpaque,
2208 guest_thread: QualifiedThreadId,
2209 runtime_instance: RuntimeComponentInstanceIndex,
2210 code: u32,
2211 ) -> Result<Option<GuestCall>> {
2212 let (code, set) = unpack_callback_code(code);
2213
2214 log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
2215
2216 let state = store.concurrent_state_mut();
2217
2218 let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
2219 let set = store
2220 .instance_state(self.runtime_instance(runtime_instance))
2221 .handle_table()
2222 .waitable_set_rep(handle)?;
2223
2224 Ok(TableId::<WaitableSet>::new(set))
2225 };
2226
2227 Ok(match code {
2228 callback_code::EXIT => {
2229 log::trace!("implicit thread {guest_thread:?} completed");
2230 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2231 task.exited = true;
2232 task.callback = None;
2233 store.cleanup_thread(
2234 guest_thread,
2235 self.runtime_instance(runtime_instance),
2236 CleanupTask::Yes,
2237 )?;
2238 None
2239 }
2240 callback_code::YIELD => {
2241 let task = state.get_mut(guest_thread.task)?;
2242 if let Some(event) = task.event {
2247 assert!(matches!(event, Event::None | Event::Cancelled));
2248 } else {
2249 task.event = Some(Event::None);
2250 }
2251 let call = GuestCall {
2252 thread: guest_thread,
2253 kind: GuestCallKind::DeliverEvent {
2254 instance: self,
2255 set: None,
2256 },
2257 };
2258 if state.may_block(guest_thread.task)? {
2259 state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
2262 None
2263 } else {
2264 Some(call)
2268 }
2269 }
2270 callback_code::WAIT => {
2271 state.check_blocking_for(guest_thread.task)?;
2274
2275 let set = get_set(store, set)?;
2276 let state = store.concurrent_state_mut();
2277
2278 if state.get_mut(guest_thread.task)?.event.is_some()
2279 || !state.get_mut(set)?.ready.is_empty()
2280 {
2281 state.push_high_priority(WorkItem::GuestCall(
2283 runtime_instance,
2284 GuestCall {
2285 thread: guest_thread,
2286 kind: GuestCallKind::DeliverEvent {
2287 instance: self,
2288 set: Some(set),
2289 },
2290 },
2291 ));
2292 } else {
2293 let old = state
2301 .get_mut(guest_thread.thread)?
2302 .wake_on_cancel
2303 .replace(set);
2304 if !old.is_none() {
2305 bail_bug!("thread unexpectedly had wake_on_cancel set");
2306 }
2307 let old = state
2308 .get_mut(set)?
2309 .waiting
2310 .insert(guest_thread, WaitMode::Callback(self));
2311 if !old.is_none() {
2312 bail_bug!("set's waiting set already had this thread registered");
2313 }
2314 }
2315 None
2316 }
2317 _ => bail!(Trap::UnsupportedCallbackCode),
2318 })
2319 }
2320
2321 unsafe fn queue_call<T: 'static>(
2328 self,
2329 mut store: StoreContextMut<T>,
2330 guest_thread: QualifiedThreadId,
2331 callee: SendSyncPtr<VMFuncRef>,
2332 param_count: usize,
2333 result_count: usize,
2334 async_: bool,
2335 callback: Option<SendSyncPtr<VMFuncRef>>,
2336 post_return: Option<SendSyncPtr<VMFuncRef>>,
2337 ) -> Result<()> {
2338 unsafe fn make_call<T: 'static>(
2353 store: StoreContextMut<T>,
2354 guest_thread: QualifiedThreadId,
2355 callee: SendSyncPtr<VMFuncRef>,
2356 param_count: usize,
2357 result_count: usize,
2358 ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2359 + Send
2360 + Sync
2361 + 'static
2362 + use<T> {
2363 let token = StoreToken::new(store);
2364 move |store: &mut dyn VMStore| {
2365 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2366
2367 store
2368 .concurrent_state_mut()
2369 .get_mut(guest_thread.thread)?
2370 .state = GuestThreadState::Running;
2371 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2372 let lower = match task.lower_params.take() {
2373 Some(l) => l,
2374 None => bail_bug!("lower_params missing"),
2375 };
2376
2377 lower(store, &mut storage[..param_count])?;
2378
2379 let mut store = token.as_context_mut(store);
2380
2381 unsafe {
2384 crate::Func::call_unchecked_raw(
2385 &mut store,
2386 callee.as_non_null(),
2387 NonNull::new(
2388 &mut storage[..param_count.max(result_count)]
2389 as *mut [MaybeUninit<ValRaw>] as _,
2390 )
2391 .unwrap(),
2392 )?;
2393 }
2394
2395 Ok(storage)
2396 }
2397 }
2398
2399 let call = unsafe {
2403 make_call(
2404 store.as_context_mut(),
2405 guest_thread,
2406 callee,
2407 param_count,
2408 result_count,
2409 )
2410 };
2411
2412 let callee_instance = store
2413 .0
2414 .concurrent_state_mut()
2415 .get_mut(guest_thread.task)?
2416 .instance;
2417
2418 let fun = if callback.is_some() {
2419 assert!(async_);
2420
2421 Box::new(move |store: &mut dyn VMStore| {
2422 self.add_guest_thread_to_instance_table(
2423 guest_thread.thread,
2424 store,
2425 callee_instance.index,
2426 )?;
2427 let old_thread = store.set_thread(guest_thread)?;
2428 log::trace!(
2429 "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2430 );
2431
2432 store.enter_instance(callee_instance);
2433
2434 let storage = call(store)?;
2441
2442 store.exit_instance(callee_instance)?;
2443
2444 store.set_thread(old_thread)?;
2445 let state = store.concurrent_state_mut();
2446 if let Some(t) = old_thread.guest() {
2447 state.get_mut(t.thread)?.state = GuestThreadState::Running;
2448 }
2449 log::trace!("stackless call: restored {old_thread:?} as current thread");
2450
2451 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2454
2455 self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2456 })
2457 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2458 } else {
2459 let token = StoreToken::new(store.as_context_mut());
2460 Box::new(move |store: &mut dyn VMStore| {
2461 self.add_guest_thread_to_instance_table(
2462 guest_thread.thread,
2463 store,
2464 callee_instance.index,
2465 )?;
2466 let old_thread = store.set_thread(guest_thread)?;
2467 log::trace!(
2468 "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2469 );
2470 let flags = self.id().get(store).instance_flags(callee_instance.index);
2471
2472 if !async_ {
2476 store.enter_instance(callee_instance);
2477 }
2478
2479 let storage = call(store)?;
2486
2487 if !async_ {
2488 let lift = {
2494 store.exit_instance(callee_instance)?;
2495
2496 let state = store.concurrent_state_mut();
2497 if !state.get_mut(guest_thread.task)?.result.is_none() {
2498 bail_bug!("task has already produced a result");
2499 }
2500
2501 match state.get_mut(guest_thread.task)?.lift_result.take() {
2502 Some(lift) => lift,
2503 None => bail_bug!("lift_result field is missing"),
2504 }
2505 };
2506
2507 let result = (lift.lift)(store, unsafe {
2510 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2511 &storage[..result_count],
2512 )
2513 })?;
2514
2515 let post_return_arg = match result_count {
2516 0 => ValRaw::i32(0),
2517 1 => unsafe { storage[0].assume_init() },
2520 _ => unreachable!(),
2521 };
2522
2523 unsafe {
2524 call_post_return(
2525 token.as_context_mut(store),
2526 post_return.map(|v| v.as_non_null()),
2527 post_return_arg,
2528 flags,
2529 )?;
2530 }
2531
2532 self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2533 }
2534
2535 store.set_thread(old_thread)?;
2536
2537 store
2538 .concurrent_state_mut()
2539 .get_mut(guest_thread.task)?
2540 .exited = true;
2541
2542 store.cleanup_thread(guest_thread, callee_instance, CleanupTask::Yes)?;
2544 Ok(None)
2545 })
2546 };
2547
2548 store
2549 .0
2550 .concurrent_state_mut()
2551 .push_high_priority(WorkItem::GuestCall(
2552 callee_instance.index,
2553 GuestCall {
2554 thread: guest_thread,
2555 kind: GuestCallKind::StartImplicit(fun),
2556 },
2557 ));
2558
2559 Ok(())
2560 }
2561
2562 unsafe fn prepare_call<T: 'static>(
2575 self,
2576 mut store: StoreContextMut<T>,
2577 start: NonNull<VMFuncRef>,
2578 return_: NonNull<VMFuncRef>,
2579 caller_instance: RuntimeComponentInstanceIndex,
2580 callee_instance: RuntimeComponentInstanceIndex,
2581 task_return_type: TypeTupleIndex,
2582 callee_async: bool,
2583 memory: *mut VMMemoryDefinition,
2584 string_encoding: StringEncoding,
2585 caller_info: CallerInfo,
2586 ) -> Result<()> {
2587 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2588 store.0.check_blocking()?;
2592 }
2593
2594 enum ResultInfo {
2595 Heap { results: u32 },
2596 Stack { result_count: u32 },
2597 }
2598
2599 let result_info = match &caller_info {
2600 CallerInfo::Async {
2601 has_result: true,
2602 params,
2603 } => ResultInfo::Heap {
2604 results: match params.last() {
2605 Some(r) => r.get_u32(),
2606 None => bail_bug!("retptr missing"),
2607 },
2608 },
2609 CallerInfo::Async {
2610 has_result: false, ..
2611 } => ResultInfo::Stack { result_count: 0 },
2612 CallerInfo::Sync {
2613 result_count,
2614 params,
2615 } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
2616 results: match params.last() {
2617 Some(r) => r.get_u32(),
2618 None => bail_bug!("arg ptr missing"),
2619 },
2620 },
2621 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2622 result_count: *result_count,
2623 },
2624 };
2625
2626 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2627
2628 let start = SendSyncPtr::new(start);
2632 let return_ = SendSyncPtr::new(return_);
2633 let token = StoreToken::new(store.as_context_mut());
2634 let state = store.0.concurrent_state_mut();
2635 let old_thread = state.current_guest_thread()?;
2636
2637 debug_assert_eq!(
2638 state.get_mut(old_thread.task)?.instance,
2639 self.runtime_instance(caller_instance)
2640 );
2641
2642 let guest_thread = GuestTask::new(
2643 state,
2644 Box::new(move |store, dst| {
2645 let mut store = token.as_context_mut(store);
2646 assert!(dst.len() <= MAX_FLAT_PARAMS);
2647 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2649 let count = match caller_info {
2650 CallerInfo::Async { params, has_result } => {
2654 let params = ¶ms[..params.len() - usize::from(has_result)];
2655 for (param, src) in params.iter().zip(&mut src) {
2656 src.write(*param);
2657 }
2658 params.len()
2659 }
2660
2661 CallerInfo::Sync { params, .. } => {
2663 for (param, src) in params.iter().zip(&mut src) {
2664 src.write(*param);
2665 }
2666 params.len()
2667 }
2668 };
2669 unsafe {
2676 crate::Func::call_unchecked_raw(
2677 &mut store,
2678 start.as_non_null(),
2679 NonNull::new(
2680 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2681 )
2682 .unwrap(),
2683 )?;
2684 }
2685 dst.copy_from_slice(&src[..dst.len()]);
2686 let state = store.0.concurrent_state_mut();
2687 Waitable::Guest(state.current_guest_thread()?.task).set_event(
2688 state,
2689 Some(Event::Subtask {
2690 status: Status::Started,
2691 }),
2692 )?;
2693 Ok(())
2694 }),
2695 LiftResult {
2696 lift: Box::new(move |store, src| {
2697 let mut store = token.as_context_mut(store);
2700 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2702 my_src.push(ValRaw::u32(*results));
2703 }
2704
2705 let prev = store.0.set_thread(old_thread)?;
2711
2712 unsafe {
2719 crate::Func::call_unchecked_raw(
2720 &mut store,
2721 return_.as_non_null(),
2722 my_src.as_mut_slice().into(),
2723 )?;
2724 }
2725
2726 store.0.set_thread(prev)?;
2729
2730 let state = store.0.concurrent_state_mut();
2731 let thread = state.current_guest_thread()?;
2732 if sync_caller {
2733 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2734 if let ResultInfo::Stack { result_count } = &result_info {
2735 match result_count {
2736 0 => None,
2737 1 => Some(my_src[0]),
2738 _ => unreachable!(),
2739 }
2740 } else {
2741 None
2742 },
2743 );
2744 }
2745 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2746 }),
2747 ty: task_return_type,
2748 memory: NonNull::new(memory).map(SendSyncPtr::new),
2749 string_encoding,
2750 },
2751 Caller::Guest { thread: old_thread },
2752 None,
2753 self.runtime_instance(callee_instance),
2754 callee_async,
2755 )?;
2756
2757 store.0.set_thread(guest_thread)?;
2760 log::trace!("pushed {guest_thread:?} as current thread; old thread was {old_thread:?}");
2761
2762 Ok(())
2763 }
2764
2765 unsafe fn call_callback<T>(
2770 self,
2771 mut store: StoreContextMut<T>,
2772 function: SendSyncPtr<VMFuncRef>,
2773 event: Event,
2774 handle: u32,
2775 ) -> Result<u32> {
2776 let (ordinal, result) = event.parts();
2777 let params = &mut [
2778 ValRaw::u32(ordinal),
2779 ValRaw::u32(handle),
2780 ValRaw::u32(result),
2781 ];
2782 unsafe {
2787 crate::Func::call_unchecked_raw(
2788 &mut store,
2789 function.as_non_null(),
2790 params.as_mut_slice().into(),
2791 )?;
2792 }
2793 Ok(params[0].get_u32())
2794 }
2795
2796 unsafe fn start_call<T: 'static>(
2809 self,
2810 mut store: StoreContextMut<T>,
2811 callback: *mut VMFuncRef,
2812 post_return: *mut VMFuncRef,
2813 callee: NonNull<VMFuncRef>,
2814 param_count: u32,
2815 result_count: u32,
2816 flags: u32,
2817 storage: Option<&mut [MaybeUninit<ValRaw>]>,
2818 ) -> Result<u32> {
2819 let token = StoreToken::new(store.as_context_mut());
2820 let async_caller = storage.is_none();
2821 let state = store.0.concurrent_state_mut();
2822 let guest_thread = state.current_guest_thread()?;
2823 let callee_async = state.get_mut(guest_thread.task)?.async_function;
2824 let callee = SendSyncPtr::new(callee);
2825 let param_count = usize::try_from(param_count)?;
2826 assert!(param_count <= MAX_FLAT_PARAMS);
2827 let result_count = usize::try_from(result_count)?;
2828 assert!(result_count <= MAX_FLAT_RESULTS);
2829
2830 let task = state.get_mut(guest_thread.task)?;
2831 if let Some(callback) = NonNull::new(callback) {
2832 let callback = SendSyncPtr::new(callback);
2836 task.callback = Some(Box::new(move |store, event, handle| {
2837 let store = token.as_context_mut(store);
2838 unsafe { self.call_callback::<T>(store, callback, event, handle) }
2839 }));
2840 }
2841
2842 let Caller::Guest { thread: caller } = &task.caller else {
2843 bail_bug!("start_call unexpectedly invoked for host->guest call");
2846 };
2847 let caller = *caller;
2848 let caller_instance = state.get_mut(caller.task)?.instance;
2849
2850 unsafe {
2852 self.queue_call(
2853 store.as_context_mut(),
2854 guest_thread,
2855 callee,
2856 param_count,
2857 result_count,
2858 (flags & START_FLAG_ASYNC_CALLEE) != 0,
2859 NonNull::new(callback).map(SendSyncPtr::new),
2860 NonNull::new(post_return).map(SendSyncPtr::new),
2861 )?;
2862 }
2863
2864 let state = store.0.concurrent_state_mut();
2865
2866 let guest_waitable = Waitable::Guest(guest_thread.task);
2869 let old_set = guest_waitable.common(state)?.set;
2870 let set = state.get_mut(caller.thread)?.sync_call_set;
2871 guest_waitable.join(state, Some(set))?;
2872
2873 store.0.set_thread(CurrentThread::None)?;
2874
2875 let (status, waitable) = loop {
2891 store.0.suspend(SuspendReason::Waiting {
2892 set,
2893 thread: caller,
2894 skip_may_block_check: async_caller || !callee_async,
2902 })?;
2903
2904 let state = store.0.concurrent_state_mut();
2905
2906 log::trace!("taking event for {:?}", guest_thread.task);
2907 let event = guest_waitable.take_event(state)?;
2908 let Some(Event::Subtask { status }) = event else {
2909 bail_bug!("subtasks should only get subtask events, got {event:?}")
2910 };
2911
2912 log::trace!("status {status:?} for {:?}", guest_thread.task);
2913
2914 if status == Status::Returned {
2915 break (status, None);
2917 } else if async_caller {
2918 let handle = store
2922 .0
2923 .instance_state(caller_instance)
2924 .handle_table()
2925 .subtask_insert_guest(guest_thread.task.rep())?;
2926 store
2927 .0
2928 .concurrent_state_mut()
2929 .get_mut(guest_thread.task)?
2930 .common
2931 .handle = Some(handle);
2932 break (status, Some(handle));
2933 } else {
2934 }
2938 };
2939
2940 guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2941
2942 store.0.set_thread(caller)?;
2944 store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2945 log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2946
2947 if let Some(storage) = storage {
2948 let state = store.0.concurrent_state_mut();
2952 let task = state.get_mut(guest_thread.task)?;
2953 if let Some(result) = task.sync_result.take()? {
2954 if let Some(result) = result {
2955 storage[0] = MaybeUninit::new(result);
2956 }
2957
2958 if task.exited && task.ready_to_delete() {
2959 Waitable::Guest(guest_thread.task).delete_from(state)?;
2960 }
2961 }
2962 }
2963
2964 Ok(status.pack(waitable))
2965 }
2966
2967 pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2980 self,
2981 mut store: StoreContextMut<'_, T>,
2982 future: impl Future<Output = Result<R>> + Send + 'static,
2983 lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
2984 ) -> Result<Option<u32>> {
2985 let token = StoreToken::new(store.as_context_mut());
2986 let state = store.0.concurrent_state_mut();
2987 let task = state.current_host_thread()?;
2988
2989 let (join_handle, future) = JoinHandle::run(future);
2992 {
2993 let state = &mut state.get_mut(task)?.state;
2994 assert!(matches!(state, HostTaskState::CalleeStarted));
2995 *state = HostTaskState::CalleeRunning(join_handle);
2996 }
2997
2998 let mut future = Box::pin(future);
2999
3000 let poll = tls::set(store.0, || {
3005 future
3006 .as_mut()
3007 .poll(&mut Context::from_waker(&Waker::noop()))
3008 });
3009
3010 match poll {
3011 Poll::Ready(result) => {
3013 let result = result.transpose()?;
3014 lower(store.as_context_mut(), result)?;
3015 return Ok(None);
3016 }
3017
3018 Poll::Pending => {}
3020 }
3021
3022 let future = Box::pin(async move {
3030 let result = match future.await {
3031 Some(result) => Some(result?),
3032 None => None,
3033 };
3034 let on_complete = move |store: &mut dyn VMStore| {
3035 let mut store = token.as_context_mut(store);
3039 let old = store.0.set_thread(task)?;
3040
3041 let status = if result.is_some() {
3042 Status::Returned
3043 } else {
3044 Status::ReturnCancelled
3045 };
3046
3047 lower(store.as_context_mut(), result)?;
3048 let state = store.0.concurrent_state_mut();
3049 match &mut state.get_mut(task)?.state {
3050 HostTaskState::CalleeDone { .. } => {}
3053
3054 other => *other = HostTaskState::CalleeDone { cancelled: false },
3056 }
3057 Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;
3058
3059 store.0.set_thread(old)?;
3060 Ok(())
3061 };
3062
3063 tls::get(move |store| {
3068 store
3069 .concurrent_state_mut()
3070 .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
3071 on_complete,
3072 ))));
3073 Ok(())
3074 })
3075 });
3076
3077 let state = store.0.concurrent_state_mut();
3080 state.push_future(future);
3081 let caller = state.get_mut(task)?.caller;
3082 let instance = state.get_mut(caller.task)?.instance;
3083 let handle = store
3084 .0
3085 .instance_state(instance)
3086 .handle_table()
3087 .subtask_insert_host(task.rep())?;
3088 store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
3089 log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");
3090
3091 store.0.set_thread(caller)?;
3095 Ok(Some(handle))
3096 }
3097
3098 pub(crate) fn task_return(
3101 self,
3102 store: &mut dyn VMStore,
3103 ty: TypeTupleIndex,
3104 options: OptionsIndex,
3105 storage: &[ValRaw],
3106 ) -> Result<()> {
3107 let state = store.concurrent_state_mut();
3108 let guest_thread = state.current_guest_thread()?;
3109 let lift = state
3110 .get_mut(guest_thread.task)?
3111 .lift_result
3112 .take()
3113 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3114 if !state.get_mut(guest_thread.task)?.result.is_none() {
3115 bail_bug!("task result unexpectedly already set");
3116 }
3117
3118 let CanonicalOptions {
3119 string_encoding,
3120 data_model,
3121 ..
3122 } = &self.id().get(store).component().env_component().options[options];
3123
3124 let invalid = ty != lift.ty
3125 || string_encoding != &lift.string_encoding
3126 || match data_model {
3127 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
3128 Some(memory) => {
3129 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
3130 let actual = self.id().get(store).runtime_memory(memory);
3131 expected != actual.as_ptr()
3132 }
3133 None => false,
3136 },
3137 CanonicalOptionsDataModel::Gc { .. } => true,
3139 };
3140
3141 if invalid {
3142 bail!(Trap::TaskReturnInvalid);
3143 }
3144
3145 log::trace!("task.return for {guest_thread:?}");
3146
3147 let result = (lift.lift)(store, storage)?;
3148 self.task_complete(store, guest_thread.task, result, Status::Returned)
3149 }
3150
3151 pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
3153 let state = store.concurrent_state_mut();
3154 let guest_thread = state.current_guest_thread()?;
3155 let task = state.get_mut(guest_thread.task)?;
3156 if !task.cancel_sent {
3157 bail!(Trap::TaskCancelNotCancelled);
3158 }
3159 _ = task
3160 .lift_result
3161 .take()
3162 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3163
3164 if !task.result.is_none() {
3165 bail_bug!("task result should not bet set yet");
3166 }
3167
3168 log::trace!("task.cancel for {guest_thread:?}");
3169
3170 self.task_complete(
3171 store,
3172 guest_thread.task,
3173 Box::new(DummyResult),
3174 Status::ReturnCancelled,
3175 )
3176 }
3177
3178 fn task_complete(
3184 self,
3185 store: &mut StoreOpaque,
3186 guest_task: TableId<GuestTask>,
3187 result: Box<dyn Any + Send + Sync>,
3188 status: Status,
3189 ) -> Result<()> {
3190 store
3191 .component_resource_tables(Some(self))
3192 .validate_scope_exit()?;
3193
3194 let state = store.concurrent_state_mut();
3195 let task = state.get_mut(guest_task)?;
3196
3197 if let Caller::Host { tx, .. } = &mut task.caller {
3198 if let Some(tx) = tx.take() {
3199 _ = tx.send(result);
3200 }
3201 } else {
3202 task.result = Some(result);
3203 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3204 }
3205
3206 Ok(())
3207 }
3208
3209 pub(crate) fn waitable_set_new(
3211 self,
3212 store: &mut StoreOpaque,
3213 caller_instance: RuntimeComponentInstanceIndex,
3214 ) -> Result<u32> {
3215 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
3216 let handle = store
3217 .instance_state(self.runtime_instance(caller_instance))
3218 .handle_table()
3219 .waitable_set_insert(set.rep())?;
3220 log::trace!("new waitable set {set:?} (handle {handle})");
3221 Ok(handle)
3222 }
3223
3224 pub(crate) fn waitable_set_drop(
3226 self,
3227 store: &mut StoreOpaque,
3228 caller_instance: RuntimeComponentInstanceIndex,
3229 set: u32,
3230 ) -> Result<()> {
3231 let rep = store
3232 .instance_state(self.runtime_instance(caller_instance))
3233 .handle_table()
3234 .waitable_set_remove(set)?;
3235
3236 log::trace!("drop waitable set {rep} (handle {set})");
3237
3238 if !store
3242 .concurrent_state_mut()
3243 .get_mut(TableId::<WaitableSet>::new(rep))?
3244 .waiting
3245 .is_empty()
3246 {
3247 bail!(Trap::WaitableSetDropHasWaiters);
3248 }
3249
3250 store
3251 .concurrent_state_mut()
3252 .delete(TableId::<WaitableSet>::new(rep))?;
3253
3254 Ok(())
3255 }
3256
/// Implements `waitable.join`: joins the waitable identified by
/// `waitable_handle` to the waitable set identified by `set_handle`, both
/// resolved against `caller_instance`.
///
/// A `set_handle` of zero is forwarded to `Waitable::join` as `None`.
///
/// Traps with `Trap::WaitableSyncAndAsync` when a non-zero `set_handle` is
/// supplied while the waitable currently belongs to a set flagged
/// `is_sync_call_set`.
pub(crate) fn waitable_join(
    self,
    store: &mut StoreOpaque,
    caller_instance: RuntimeComponentInstanceIndex,
    waitable_handle: u32,
    set_handle: u32,
) -> Result<()> {
    let mut instance = self.id().get_mut(store);
    let waitable =
        Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;

    let set = if set_handle == 0 {
        None
    } else {
        // Resolve the guest handle to the set's table rep.
        let set = instance.instance_states().0[caller_instance]
            .handle_table()
            .waitable_set_rep(set_handle)?;

        // Refuse to move a waitable out of a sync-call set into another set.
        let state = store.concurrent_state_mut();
        if let Some(old) = waitable.common(state)?.set
            && state.get_mut(old)?.is_sync_call_set
        {
            bail!(Trap::WaitableSyncAndAsync);
        }

        Some(TableId::<WaitableSet>::new(set))
    };

    log::trace!(
        "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
    );

    waitable.join(store.concurrent_state_mut(), set)
}
3292
3293 pub(crate) fn subtask_drop(
3295 self,
3296 store: &mut StoreOpaque,
3297 caller_instance: RuntimeComponentInstanceIndex,
3298 task_id: u32,
3299 ) -> Result<()> {
3300 self.waitable_join(store, caller_instance, task_id, 0)?;
3301
3302 let (rep, is_host) = store
3303 .instance_state(self.runtime_instance(caller_instance))
3304 .handle_table()
3305 .subtask_remove(task_id)?;
3306
3307 let concurrent_state = store.concurrent_state_mut();
3308 let (waitable, delete) = if is_host {
3309 let id = TableId::<HostTask>::new(rep);
3310 let task = concurrent_state.get_mut(id)?;
3311 match &task.state {
3312 HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
3313 HostTaskState::CalleeDone { .. } => {}
3314 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3315 bail_bug!("invalid state for callee in `subtask.drop`")
3316 }
3317 }
3318 (Waitable::Host(id), true)
3319 } else {
3320 let id = TableId::<GuestTask>::new(rep);
3321 let task = concurrent_state.get_mut(id)?;
3322 if task.lift_result.is_some() {
3323 bail!(Trap::SubtaskDropNotResolved);
3324 }
3325 (
3326 Waitable::Guest(id),
3327 concurrent_state.get_mut(id)?.ready_to_delete(),
3328 )
3329 };
3330
3331 waitable.common(concurrent_state)?.handle = None;
3332
3333 if waitable.take_event(concurrent_state)?.is_some() {
3336 bail!(Trap::SubtaskDropNotResolved);
3337 }
3338
3339 if delete {
3340 waitable.delete_from(concurrent_state)?;
3341 }
3342
3343 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3344 Ok(())
3345 }
3346
3347 pub(crate) fn waitable_set_wait(
3349 self,
3350 store: &mut StoreOpaque,
3351 options: OptionsIndex,
3352 set: u32,
3353 payload: u32,
3354 ) -> Result<u32> {
3355 if !self.options(store, options).async_ {
3356 store.check_blocking()?;
3360 }
3361
3362 let &CanonicalOptions {
3363 cancellable,
3364 instance: caller_instance,
3365 ..
3366 } = &self.id().get(store).component().env_component().options[options];
3367 let rep = store
3368 .instance_state(self.runtime_instance(caller_instance))
3369 .handle_table()
3370 .waitable_set_rep(set)?;
3371
3372 self.waitable_check(
3373 store,
3374 cancellable,
3375 WaitableCheck::Wait,
3376 WaitableCheckParams {
3377 set: TableId::new(rep),
3378 options,
3379 payload,
3380 },
3381 )
3382 }
3383
3384 pub(crate) fn waitable_set_poll(
3386 self,
3387 store: &mut StoreOpaque,
3388 options: OptionsIndex,
3389 set: u32,
3390 payload: u32,
3391 ) -> Result<u32> {
3392 let &CanonicalOptions {
3393 cancellable,
3394 instance: caller_instance,
3395 ..
3396 } = &self.id().get(store).component().env_component().options[options];
3397 let rep = store
3398 .instance_state(self.runtime_instance(caller_instance))
3399 .handle_table()
3400 .waitable_set_rep(set)?;
3401
3402 self.waitable_check(
3403 store,
3404 cancellable,
3405 WaitableCheck::Poll,
3406 WaitableCheckParams {
3407 set: TableId::new(rep),
3408 options,
3409 payload,
3410 },
3411 )
3412 }
3413
3414 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3416 let thread_id = store.concurrent_state_mut().current_guest_thread()?.thread;
3417 match store
3418 .concurrent_state_mut()
3419 .get_mut(thread_id)?
3420 .instance_rep
3421 {
3422 Some(r) => Ok(r),
3423 None => bail_bug!("thread should have instance_rep by now"),
3424 }
3425 }
3426
/// Implements `thread.new-indirect`: creates a new, not-yet-started guest
/// thread whose entry point is looked up in a runtime function table.
///
/// The callee is entry `start_func_idx` of table `start_func_table_idx` and
/// must have type `(i32) -> ()`. `context` is the `i32` passed to it when the
/// thread eventually runs. The new thread is attached to the current guest
/// thread's task and registered in `runtime_instance`'s thread handle table;
/// the returned value is that handle.
///
/// # Errors
///
/// * `Trap::ThreadNewIndirectUninitialized` if the table slot is empty.
/// * `Trap::ThreadNewIndirectInvalidType` if the callee's type does not match.
pub(crate) fn thread_new_indirect<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    runtime_instance: RuntimeComponentInstanceIndex,
    // Unused here; the expected start-function type is built explicitly below.
    _func_ty_idx: TypeFuncIndex,
    start_func_table_idx: RuntimeTableIndex,
    start_func_idx: u32,
    context: i32,
) -> Result<u32> {
    log::trace!("creating new thread");

    let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
    let (instance, registry) = self.id().get_mut_and_registry(store.0);
    let callee = instance
        .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
        .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
    if callee.type_index(store.0) != start_func_ty.type_index() {
        bail!(Trap::ThreadNewIndirectInvalidType);
    }

    let token = StoreToken::new(store.as_context_mut());
    // Deferred entry point, stored in the thread until it is started (see
    // `resume_thread`).
    let start_func = Box::new(
        move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
            let old_thread = store.set_thread(guest_thread)?;
            log::trace!(
                "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
            );

            let mut store = token.as_context_mut(store);
            let mut params = [ValRaw::i32(context)];
            // NOTE(review): the callee's type was checked against
            // `(i32) -> ()` above and `params` holds exactly one i32; confirm
            // this covers every requirement of `call_unchecked`.
            unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };

            store.0.set_thread(old_thread)?;

            store.0.cleanup_thread(
                guest_thread,
                self.runtime_instance(runtime_instance),
                CleanupTask::Yes,
            )?;
            log::trace!("explicit thread {guest_thread:?} completed");
            let state = store.0.concurrent_state_mut();
            // Mark the previously-current guest thread (if any) as running
            // again.
            if let Some(t) = old_thread.guest() {
                state.get_mut(t.thread)?.state = GuestThreadState::Running;
            }
            log::trace!("thread start: restored {old_thread:?} as current thread");

            Ok(())
        },
    );

    let state = store.0.concurrent_state_mut();
    let current_thread = state.current_guest_thread()?;
    let parent_task = current_thread.task;

    let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
    let thread_id = state.push(new_thread)?;
    state.get_mut(parent_task)?.threads.insert(thread_id);

    log::trace!("new thread with id {thread_id:?} created");

    self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
}
3492
/// Schedules the guest thread with handle `thread_idx` (resolved in
/// `runtime_instance`) to run.
///
/// What happens depends on the thread's current state:
///
/// * `NotStartedExplicit`: a `GuestCall` work item that invokes the stored
///   start function is queued.
/// * `Suspended`: a `ResumeFiber` work item is queued.
/// * `Ready` (only when `allow_ready` is true): the state is left as `Ready`
///   and the thread's existing work item is promoted.
/// * anything else: the state is restored and
///   `Trap::CannotResumeThread` is raised.
///
/// `high_priority` is forwarded to `push_work_item`.
pub(crate) fn resume_thread(
    self,
    store: &mut StoreOpaque,
    runtime_instance: RuntimeComponentInstanceIndex,
    thread_idx: u32,
    high_priority: bool,
    allow_ready: bool,
) -> Result<()> {
    let thread_id =
        GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
    let state = store.concurrent_state_mut();
    let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
    let thread = state.get_mut(guest_thread.thread)?;

    // Optimistically mark the thread as running; arms that do not actually
    // start it put the previous state back.
    match mem::replace(&mut thread.state, GuestThreadState::Running) {
        GuestThreadState::NotStartedExplicit(start_func) => {
            log::trace!("starting thread {guest_thread:?}");
            let guest_call = WorkItem::GuestCall(
                runtime_instance,
                GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::StartExplicit(Box::new(move |store| {
                        start_func(store, guest_thread)
                    })),
                },
            );
            store
                .concurrent_state_mut()
                .push_work_item(guest_call, high_priority);
        }
        GuestThreadState::Suspended(fiber) => {
            log::trace!("resuming thread {thread_id:?} that was suspended");
            store
                .concurrent_state_mut()
                .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
        }
        GuestThreadState::Ready { fiber, cancellable } if allow_ready => {
            log::trace!("resuming thread {thread_id:?} that was ready");
            // Still `Ready`: only its position in the work queue changes.
            thread.state = GuestThreadState::Ready { fiber, cancellable };
            store
                .concurrent_state_mut()
                .promote_thread_work_item(guest_thread);
        }
        other => {
            thread.state = other;
            bail!(Trap::CannotResumeThread);
        }
    }
    Ok(())
}
3543
3544 fn add_guest_thread_to_instance_table(
3545 self,
3546 thread_id: TableId<GuestThread>,
3547 store: &mut StoreOpaque,
3548 runtime_instance: RuntimeComponentInstanceIndex,
3549 ) -> Result<u32> {
3550 let guest_id = store
3551 .instance_state(self.runtime_instance(runtime_instance))
3552 .thread_handle_table()
3553 .guest_thread_insert(thread_id.rep())?;
3554 store
3555 .concurrent_state_mut()
3556 .get_mut(thread_id)?
3557 .instance_rep = Some(guest_id);
3558 Ok(guest_id)
3559 }
3560
/// Shared implementation of the intrinsics that suspend or yield the current
/// guest thread, optionally handing control to another thread first.
///
/// * `caller`: instance used to resolve `to_thread` and to look for other
///   instance-local work.
/// * `cancellable`: when true, a pending cancellation is consumed and reported
///   as `WaitResult::Cancelled`, both before and after suspending.
/// * `yielding`: selects `SuspendReason::Yielding` instead of
///   `SuspendReason::ExplicitlySuspending`.
/// * `to_thread`: thread to resume before suspending, if any.
///
/// Returns `WaitResult::Completed` unless a cancellation was observed.
pub(crate) fn suspension_intrinsic(
    self,
    store: &mut StoreOpaque,
    caller: RuntimeComponentInstanceIndex,
    cancellable: bool,
    yielding: bool,
    to_thread: SuspensionTarget,
) -> Result<WaitResult> {
    let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
    if to_thread.is_none() {
        let state = store.concurrent_state_mut();
        if yielding {
            // A yield from a task that may not block only suspends if another
            // thread of the same instance could be promoted; otherwise it
            // completes immediately.
            if !state.may_block(guest_thread.task)? {
                if !state.promote_instance_local_thread_work_item(caller) {
                    return Ok(WaitResult::Completed);
                }
            }
        } else {
            // A plain suspend with no target requires blocking to be allowed.
            store.check_blocking()?;
        }
    }

    if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
        return Ok(WaitResult::Cancelled);
    }

    // Queue the target thread (high priority) before suspending ourselves.
    // `SomeSuspended` does not accept a thread in the `Ready` state, whereas
    // `Some` does.
    match to_thread {
        SuspensionTarget::SomeSuspended(thread) => {
            self.resume_thread(store, caller, thread, true, false)?
        }
        SuspensionTarget::Some(thread) => {
            self.resume_thread(store, caller, thread, true, true)?
        }
        SuspensionTarget::None => {}
    }

    let reason = if yielding {
        SuspendReason::Yielding {
            thread: guest_thread,
            cancellable,
            skip_may_block_check: to_thread.is_some(),
        }
    } else {
        SuspendReason::ExplicitlySuspending {
            thread: guest_thread,
            skip_may_block_check: to_thread.is_some(),
        }
    };

    store.suspend(reason)?;

    // Cancellation may have arrived while we were suspended.
    if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
        Ok(WaitResult::Cancelled)
    } else {
        Ok(WaitResult::Completed)
    }
}
3634
/// Shared implementation of `waitable-set.wait` and `waitable-set.poll`.
///
/// In `Wait` mode the current thread is suspended on `params.set` when there
/// is nothing to deliver yet. In both modes the next event (if any) is then
/// fetched with `get_event`, and its handle and result are written as two
/// little-endian `u32`s at guest address `params.payload` (handle at offset
/// 0, result at offset 4). The event's ordinal is returned.
///
/// In `Poll` mode with no event available, `Event::None` is reported with a
/// handle of zero.
fn waitable_check(
    self,
    store: &mut StoreOpaque,
    cancellable: bool,
    check: WaitableCheck,
    params: WaitableCheckParams,
) -> Result<u32> {
    let guest_thread = store.concurrent_state_mut().current_guest_thread()?;

    log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);

    let state = store.concurrent_state_mut();
    let task = state.get_mut(guest_thread.task)?;

    match &check {
        WaitableCheck::Wait => {
            let set = params.set;

            // Suspend only if the task has no deliverable event of its own (a
            // `Cancelled` event does not count when we are not cancellable)
            // and the set has nothing ready.
            if (task.event.is_none()
                || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
                && state.get_mut(set)?.ready.is_empty()
            {
                if cancellable {
                    // Remember which set to wake this thread from should a
                    // cancellation arrive (see `subtask_cancel`).
                    let old = state
                        .get_mut(guest_thread.thread)?
                        .wake_on_cancel
                        .replace(set);
                    if !old.is_none() {
                        bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
                    }
                }

                store.suspend(SuspendReason::Waiting {
                    set,
                    thread: guest_thread,
                    skip_may_block_check: false,
                })?;
            }
        }
        WaitableCheck::Poll => {}
    }

    log::trace!(
        "waitable check for {guest_thread:?}; set {:?}, part two",
        params.set
    );

    let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;

    let (ordinal, handle, result) = match &check {
        WaitableCheck::Wait => {
            // After waiting, an event must be available.
            let (event, waitable) = match event {
                Some(p) => p,
                None => bail_bug!("event expected to be present"),
            };
            // Events not tied to a waitable are reported with handle 0.
            let handle = waitable.map(|(_, v)| v).unwrap_or(0);
            let (ordinal, result) = event.parts();
            (ordinal, handle, result)
        }
        WaitableCheck::Poll => {
            if let Some((event, waitable)) = event {
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                let (ordinal, result) = event.parts();
                (ordinal, handle, result)
            } else {
                log::trace!(
                    "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
                    guest_thread.task,
                    params.set
                );
                let (ordinal, result) = Event::None.parts();
                (ordinal, 0, result)
            }
        }
    };
    // Write `(handle, result)` into guest memory at `payload`, after
    // validating the address against the pointer-pair ABI layout.
    let memory = self.options_memory_mut(store, params.options);
    let ptr = func::validate_inbounds_dynamic(
        &CanonicalAbiInfo::POINTER_PAIR,
        memory,
        &ValRaw::u32(params.payload),
    )?;
    memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
    memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
    Ok(ordinal)
}
3724
/// Implements `subtask.cancel` for handle `task_id` in `caller_instance`.
///
/// When `async_` is false the store must currently be allowed to block.
///
/// Returns, as a `u32`:
///
/// * `Status::StartCancelled` if a guest subtask had not yet lowered its
///   parameters,
/// * `BLOCKED` if the subtask has not resolved yet and `async_` is true,
/// * otherwise the `Status::Returned` or `Status::ReturnCancelled` carried by
///   the subtask's pending event.
///
/// Traps with `Trap::SubtaskCancelAfterTerminal` if a host subtask was
/// already cancelled, or if no terminal subtask event is pending at the end.
pub(crate) fn subtask_cancel(
    self,
    store: &mut StoreOpaque,
    caller_instance: RuntimeComponentInstanceIndex,
    async_: bool,
    task_id: u32,
) -> Result<u32> {
    if !async_ {
        store.check_blocking()?;
    }

    let (rep, is_host) = store
        .instance_state(self.runtime_instance(caller_instance))
        .handle_table()
        .subtask_rep(task_id)?;
    let waitable = if is_host {
        Waitable::Host(TableId::<HostTask>::new(rep))
    } else {
        Waitable::Guest(TableId::<GuestTask>::new(rep))
    };
    let concurrent_state = store.concurrent_state_mut();

    log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

    // Whether we still have to wait for the subtask to resolve.
    let needs_block;
    if let Waitable::Host(host_task) = waitable {
        let state = &mut concurrent_state.get_mut(host_task)?.state;
        // Unconditionally mark the host task as done-and-cancelled, then
        // react to whatever state it was in before.
        match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
            HostTaskState::CalleeRunning(handle) => {
                handle.abort();
                needs_block = true;
            }

            HostTaskState::CalleeDone { cancelled } => {
                if cancelled {
                    bail!(Trap::SubtaskCancelAfterTerminal);
                } else {
                    needs_block = false;
                }
            }

            HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
                bail_bug!("invalid states for host callee")
            }
        }
    } else {
        let guest_task = TableId::<GuestTask>::new(rep);
        let task = concurrent_state.get_mut(guest_task)?;
        if !task.already_lowered_parameters() {
            // The callee never received its parameters.
            store.cancel_guest_subtask_without_lowered_parameters(
                self.runtime_instance(caller_instance),
                guest_task,
            )?;
            return Ok(Status::StartCancelled as u32);
        } else if !task.returned_or_cancelled() {
            task.cancel_sent = true;
            task.event = Some(Event::Cancelled);
            let runtime_instance = task.instance.index;
            // Find one thread of the task that can observe the cancellation
            // right away, schedule it, and yield to let it run.
            for thread in task.threads.clone() {
                let thread = QualifiedThreadId {
                    task: guest_task,
                    thread,
                };
                let thread_mut = concurrent_state.get_mut(thread.thread)?;
                if let Some(set) = thread_mut.wake_on_cancel.take() {
                    // The thread is waiting cancellably on `set`: remove it
                    // from the set's waiters and queue it at high priority.
                    let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
                        Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
                        Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
                            runtime_instance,
                            GuestCall {
                                thread,
                                kind: GuestCallKind::DeliverEvent {
                                    instance,
                                    set: None,
                                },
                            },
                        ),
                        None => bail_bug!("thread not present in wake_on_cancel set"),
                    };
                    concurrent_state.push_high_priority(item);

                    let caller = concurrent_state.current_guest_thread()?;
                    store.suspend(SuspendReason::Yielding {
                        thread: caller,
                        cancellable: false,
                        skip_may_block_check: false,
                    })?;
                    break;
                } else if let GuestThreadState::Ready {
                    cancellable: true, ..
                } = &thread_mut.state
                {
                    // The thread is already queued and cancellable: promote
                    // its work item.
                    let caller = concurrent_state.current_guest_thread()?;
                    concurrent_state.promote_thread_work_item(thread);
                    store.suspend(SuspendReason::Yielding {
                        thread: caller,
                        cancellable: false,
                        skip_may_block_check: false,
                    })?;
                    break;
                }
            }

            // The callee may have resolved while we yielded.
            needs_block = !store
                .concurrent_state_mut()
                .get_mut(guest_task)?
                .returned_or_cancelled()
        } else {
            needs_block = false;
        }
    };

    if needs_block {
        if async_ {
            return Ok(BLOCKED);
        }

        store.wait_for_event(waitable)?;
    }

    let event = waitable.take_event(store.concurrent_state_mut())?;
    if let Some(Event::Subtask {
        status: status @ (Status::Returned | Status::ReturnCancelled),
    }) = event
    {
        Ok(status as u32)
    } else {
        bail!(Trap::SubtaskCancelAfterTerminal);
    }
}
3892}
3893
/// Store-level entry points for component-model async operations.
///
/// The implementation for `StoreInner<T>` in this file forwards each method
/// to the corresponding `Instance` method, wrapping `self` in a
/// `StoreContextMut` where needed.
///
/// NOTE(review): presumably this trait exists so that code which is not
/// generic over the store's `T` can reach those `T`-typed methods; confirm
/// against the callers.
pub trait VMComponentAsyncStore {
    /// Prepares a call between two component instances.
    ///
    /// `result_count` is either a real result count (sync caller) or one of
    /// the `PREPARE_ASYNC_NO_RESULT` / `PREPARE_ASYNC_WITH_RESULT` sentinels
    /// (async caller); see the `StoreInner<T>` implementation.
    ///
    /// # Safety
    ///
    /// The `StoreInner<T>` implementation reads `storage_len` values from
    /// `storage`, so that pointer/length pair must describe valid,
    /// initialized `ValRaw`s. NOTE(review): requirements on `memory`,
    /// `start` and `return_` are not visible here; confirm with
    /// `Instance::prepare_call`.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: StringEncoding,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of a synchronous caller,
    /// using `storage` as the parameter/result buffer.
    ///
    /// # Safety
    ///
    /// The `StoreInner<T>` implementation builds a mutable slice of
    /// `storage_len` elements from `storage`, so the pair must describe
    /// memory valid for that. NOTE(review): requirements on `callback` and
    /// `callee` are not visible here.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of an asynchronous caller
    /// and returns the `u32` produced by `Instance::start_call`.
    ///
    /// # Safety
    ///
    /// NOTE(review): requirements on `callback`, `post_return` and `callee`
    /// are not visible here; confirm with `Instance::start_call`.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// `future.write`: writes the value at guest `address` to handle
    /// `future`, returning an encoded return code.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// `future.read`: reads from handle `future` into guest `address`,
    /// returning an encoded return code.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a future of type `ty`.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// `stream.write`: writes up to `count` items starting at guest `address`
    /// to handle `stream`, returning an encoded return code.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// `stream.read`: reads up to `count` items from handle `stream` into
    /// guest `address`, returning an encoded return code.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Variant of `stream_write` that additionally supplies the payload's
    /// size and alignment (passed on as a `FlatAbi`).
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Variant of `stream_read` that additionally supplies the payload's
    /// size and alignment (passed on as a `FlatAbi`).
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a stream of type `ty`.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// `error-context.debug-message`: forwards `err_ctx_handle` and
    /// `debug_msg_address` to `Instance::error_context_debug_message`.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// `thread.new-indirect`: creates a new guest thread from a function
    /// table entry and returns its handle.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
4061
/// Forwards every `VMComponentAsyncStore` operation to the matching method on
/// `Instance`, supplying this store as a `StoreContextMut` (or as `self`
/// directly for the drop operations).
impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
    /// Copies the parameters out of `storage` and forwards to
    /// `Instance::prepare_call`, translating the result-count sentinel into a
    /// `CallerInfo`.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: StringEncoding,
        result_count_or_max_if_async: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()> {
        // NOTE(review): relies on the caller's guarantee that `storage`
        // points to `storage_len` initialized `ValRaw`s. The values are
        // copied into an owned `Vec`.
        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();

        unsafe {
            instance.prepare_call(
                StoreContextMut(self),
                start,
                return_,
                caller_instance,
                callee_instance,
                task_return_type,
                callee_async,
                memory,
                string_encoding,
                // The two sentinel values denote an async caller without/with
                // a result; anything else is a sync caller's result count.
                match result_count_or_max_if_async {
                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
                        params,
                        has_result: false,
                    },
                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
                        params,
                        has_result: true,
                    },
                    result_count => CallerInfo::Sync {
                        params,
                        result_count,
                    },
                },
            )
        }
    }

    /// Forwards to `Instance::start_call` with no post-return function, a
    /// result count of `1`, the `START_FLAG_ASYNC_CALLEE` flag, and `storage`
    /// as the buffer; the returned status is discarded.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()> {
        unsafe {
            instance
                .start_call(
                    StoreContextMut(self),
                    callback,
                    ptr::null_mut(),
                    callee,
                    param_count,
                    1,
                    START_FLAG_ASYNC_CALLEE,
                    // NOTE(review): relies on the caller's guarantee that
                    // `storage`/`storage_len` describe a valid, exclusive
                    // buffer.
                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
                )
                .map(drop)
        }
    }

    /// Forwards to `Instance::start_call` without a storage buffer and
    /// returns its `u32` result unchanged.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32> {
        unsafe {
            instance.start_call(
                StoreContextMut(self),
                callback,
                post_return,
                callee,
                param_count,
                result_count,
                flags,
                None,
            )
        }
    }

    /// Forwards to `Instance::guest_write` for a future; the item count is
    /// fixed at `1` and no `FlatAbi` is supplied.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                1,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_read` for a future; the item count is
    /// fixed at `1` and no `FlatAbi` is supplied.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                1,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_write` for a stream, without a `FlatAbi`.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_read` for a stream, without a `FlatAbi`.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_drop_writable` with a future index.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
    }

    /// Forwards to `Instance::guest_write` for a stream, passing the payload
    /// size and alignment as a `FlatAbi`.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_read` for a stream, passing the payload
    /// size and alignment as a `FlatAbi`.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_drop_writable` with a stream index.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
    }

    /// Forwards to `Instance::error_context_debug_message`.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()> {
        instance.error_context_debug_message(
            StoreContextMut(self),
            ty,
            options,
            err_ctx_handle,
            debug_msg_address,
        )
    }

    /// Forwards to `Instance::thread_new_indirect`.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32> {
        instance.thread_new_indirect(
            StoreContextMut(self),
            caller,
            func_ty_idx,
            start_func_table_idx,
            start_func_idx,
            context,
        )
    }
}
4371
/// A pinned, boxed, `Send + 'static` future that resolves to `Result<()>`.
///
/// NOTE(review): presumably the future driving a host task; its uses are
/// outside this section.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4373
/// A task representing a call into a host function, tracked as a waitable
/// (`Waitable::Host`).
pub(crate) struct HostTask {
    /// Waitable bookkeeping shared with other waitable kinds (reached through
    /// `Waitable::common`, e.g. its `set` and `handle` fields).
    common: WaitableCommon,

    /// The qualified guest thread recorded for this task at construction.
    /// NOTE(review): presumably the guest thread that made the host call;
    /// confirm at the construction sites.
    caller: QualifiedThreadId,

    /// Call context for this task; starts out as `CallContext::default()`.
    call_context: CallContext,

    /// Current lifecycle state of the host callee.
    state: HostTaskState,
}
4389
/// Lifecycle state of a `HostTask`.
enum HostTaskState {
    /// NOTE(review): presumably the state before the callee's future is
    /// tracked. `subtask.drop` and `subtask.cancel` treat it as an invalid
    /// state to observe.
    CalleeStarted,

    /// The callee is still in flight; `subtask.cancel` aborts it through this
    /// handle, and `subtask.drop` traps while in this state.
    CalleeRunning(JoinHandle),

    /// Holds a lifted result. `subtask.drop` and `subtask.cancel` treat this
    /// as an invalid state to observe.
    CalleeFinished(LiftedResult),

    /// The callee is done; `cancelled` is set to `true` by `subtask.cancel`.
    CalleeDone { cancelled: bool },
}
4412
4413impl HostTask {
4414 fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4415 Self {
4416 common: WaitableCommon::default(),
4417 call_context: CallContext::default(),
4418 caller,
4419 state,
4420 }
4421 }
4422}
4423
impl TableDebug for HostTask {
    /// Name reported for `HostTask` entries by the table's debug facilities.
    fn type_name() -> &'static str {
        "HostTask"
    }
}
4429
/// Boxed callback stored on a `GuestTask`: takes the store, an `Event` and a
/// `u32`, and returns a `u32`.
///
/// NOTE(review): the meaning of the `u32` argument and of the returned code
/// is not visible in this section.
type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4431
/// Who invoked a `GuestTask`.
enum Caller {
    /// The task was called from the host.
    Host {
        /// Channel used by `task_complete` to hand the lifted result back;
        /// taken (left as `None`) once used.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// NOTE(review): presumably records whether a host-side future for
        /// this call exists; not used in this section.
        host_future_present: bool,
        /// The thread that was current when the call was made.
        /// NOTE(review): inferred from the field name and type; confirm.
        caller: CurrentThread,
    },
    /// The task was called from another guest thread.
    Guest {
        /// The calling guest thread.
        thread: QualifiedThreadId,
    },
}
4452
/// Everything needed to lift a guest task's result when it returns.
struct LiftResult {
    /// Function invoked as `(lift.lift)(store, storage)` by `task.return` to
    /// produce the lifted result.
    lift: RawLift,
    /// Tuple type index associated with the result.
    /// NOTE(review): presumably validated against `task.return`'s type.
    ty: TypeTupleIndex,
    /// Memory expected by the lift; `task.return` compares it against the
    /// runtime memory named by its options.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding associated with the result.
    string_encoding: StringEncoding,
}
4461
/// Identifies a guest thread together with the task it belongs to.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) struct QualifiedThreadId {
    /// The task owning the thread (the thread's `parent_task`).
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
4471
4472impl QualifiedThreadId {
4473 fn qualify(
4474 state: &mut ConcurrentState,
4475 thread: TableId<GuestThread>,
4476 ) -> Result<QualifiedThreadId> {
4477 Ok(QualifiedThreadId {
4478 task: state.get_mut(thread)?.parent_task,
4479 thread,
4480 })
4481 }
4482}
4483
4484impl fmt::Debug for QualifiedThreadId {
4485 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4486 f.debug_tuple("QualifiedThreadId")
4487 .field(&self.task.rep())
4488 .field(&self.thread.rep())
4489 .finish()
4490 }
4491}
4492
/// Execution state of a `GuestThread`.
enum GuestThreadState {
    /// Initial state of a thread created by `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state of a thread created by `GuestThread::new_explicit`;
    /// holds the start function that `resume_thread` queues for execution.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// The thread is running (or has been scheduled to run by
    /// `resume_thread`).
    Running,
    /// The thread is suspended on the contained fiber; `resume_thread` queues
    /// the fiber for resumption.
    Suspended(StoreFiber<'static>),
    /// The thread is queued to run. `resume_thread` may promote its work
    /// item, and `subtask.cancel` does so when `cancellable` is true.
    Ready {
        fiber: StoreFiber<'static>,
        cancellable: bool,
    },
    /// NOTE(review): presumably the thread has finished; not set in this
    /// section.
    Completed,
}
/// A guest thread belonging to a `GuestTask`.
pub struct GuestThread {
    /// Per-thread context slots, all zero at creation.
    context: [u32; NUM_COMPONENT_CONTEXT_SLOTS],
    /// The task this thread belongs to.
    parent_task: TableId<GuestTask>,
    /// Set while the thread is waiting cancellably on a waitable set;
    /// `subtask.cancel` uses it to find and wake the thread.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// Current execution state.
    state: GuestThreadState,
    /// Handle of this thread in its instance's thread handle table, once it
    /// has been registered there.
    instance_rep: Option<u32>,
    /// A waitable set created alongside the thread with
    /// `is_sync_call_set: true`.
    /// NOTE(review): presumably used for the thread's synchronous calls.
    sync_call_set: TableId<WaitableSet>,
}
4523
4524impl GuestThread {
4525 fn from_instance(
4528 state: Pin<&mut ComponentInstance>,
4529 caller_instance: RuntimeComponentInstanceIndex,
4530 guest_thread: u32,
4531 ) -> Result<TableId<Self>> {
4532 let rep = state.instance_states().0[caller_instance]
4533 .thread_handle_table()
4534 .guest_thread_rep(guest_thread)?;
4535 Ok(TableId::new(rep))
4536 }
4537
4538 fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4539 let sync_call_set = state.push(WaitableSet {
4540 is_sync_call_set: true,
4541 ..WaitableSet::default()
4542 })?;
4543 Ok(Self {
4544 context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4545 parent_task,
4546 wake_on_cancel: None,
4547 state: GuestThreadState::NotStartedImplicit,
4548 instance_rep: None,
4549 sync_call_set,
4550 })
4551 }
4552
4553 fn new_explicit(
4554 state: &mut ConcurrentState,
4555 parent_task: TableId<GuestTask>,
4556 start_func: Box<
4557 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4558 >,
4559 ) -> Result<Self> {
4560 let sync_call_set = state.push(WaitableSet {
4561 is_sync_call_set: true,
4562 ..WaitableSet::default()
4563 })?;
4564 Ok(Self {
4565 context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4566 parent_task,
4567 wake_on_cancel: None,
4568 state: GuestThreadState::NotStartedExplicit(start_func),
4569 instance_rep: None,
4570 sync_call_set,
4571 })
4572 }
4573}
4574
impl TableDebug for GuestThread {
    /// Name reported for `GuestThread` entries by the table's debug
    /// facilities.
    fn type_name() -> &'static str {
        "GuestThread"
    }
}
4580
/// Tracks the result of a synchronous call through its lifecycle.
enum SyncResult {
    /// No result has been produced yet.
    NotProduced,
    /// A result has been produced and not yet consumed; the payload is an
    /// optional raw value.
    Produced(Option<ValRaw>),
    /// The result slot has already been consumed by `SyncResult::take`.
    Taken,
}
4586
4587impl SyncResult {
4588 fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4589 Ok(match mem::replace(self, SyncResult::Taken) {
4590 SyncResult::NotProduced => None,
4591 SyncResult::Produced(val) => Some(val),
4592 SyncResult::Taken => {
4593 bail_bug!("attempted to take a synchronous result that was already taken")
4594 }
4595 })
4596 }
4597}
4598
/// Whether a host-side future is associated with a guest task, and if so
/// whether it is still alive.
#[derive(Debug)]
enum HostFutureState {
    /// No host future is tracked for the task (guest callers, or host callers
    /// created without a host future).
    NotApplicable,
    /// A host future exists and has not been dropped; a task in this state is
    /// never reported as ready to delete.
    Live,
    /// The host future has been dropped.
    Dropped,
}
4605
4606pub(crate) struct GuestTask {
4608 common: WaitableCommon,
4610 lower_params: Option<RawLower>,
4612 lift_result: Option<LiftResult>,
4614 result: Option<LiftedResult>,
4617 callback: Option<CallbackFn>,
4620 caller: Caller,
4622 call_context: CallContext,
4627 sync_result: SyncResult,
4630 cancel_sent: bool,
4633 starting_sent: bool,
4636 instance: RuntimeInstance,
4643 event: Option<Event>,
4646 exited: bool,
4648 threads: HashSet<TableId<GuestThread>>,
4650 host_future_state: HostFutureState,
4653 async_function: bool,
4656
4657 decremented_interesting_task_count: bool,
4658}
4659
4660impl GuestTask {
4661 fn already_lowered_parameters(&self) -> bool {
4662 self.lower_params.is_none()
4664 }
4665
4666 fn returned_or_cancelled(&self) -> bool {
4667 self.lift_result.is_none()
4669 }
4670
4671 fn ready_to_delete(&self) -> bool {
4672 let threads_completed = self.threads.is_empty();
4673 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4674 let pending_completion_event = matches!(
4675 self.common.event,
4676 Some(Event::Subtask {
4677 status: Status::Returned | Status::ReturnCancelled
4678 })
4679 );
4680 let ready = threads_completed
4681 && !has_sync_result
4682 && !pending_completion_event
4683 && !matches!(self.host_future_state, HostFutureState::Live);
4684 log::trace!(
4685 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4686 threads_completed,
4687 has_sync_result,
4688 pending_completion_event,
4689 self.host_future_state
4690 );
4691 ready
4692 }
4693
4694 fn new(
4695 state: &mut ConcurrentState,
4696 lower_params: RawLower,
4697 lift_result: LiftResult,
4698 caller: Caller,
4699 callback: Option<CallbackFn>,
4700 instance: RuntimeInstance,
4701 async_function: bool,
4702 ) -> Result<QualifiedThreadId> {
4703 let host_future_state = match &caller {
4704 Caller::Guest { .. } => HostFutureState::NotApplicable,
4705 Caller::Host {
4706 host_future_present,
4707 ..
4708 } => {
4709 if *host_future_present {
4710 HostFutureState::Live
4711 } else {
4712 HostFutureState::NotApplicable
4713 }
4714 }
4715 };
4716 let task = state.push(Self {
4717 common: WaitableCommon::default(),
4718 lower_params: Some(lower_params),
4719 lift_result: Some(lift_result),
4720 result: None,
4721 callback,
4722 caller,
4723 call_context: CallContext::default(),
4724 sync_result: SyncResult::NotProduced,
4725 cancel_sent: false,
4726 starting_sent: false,
4727 instance,
4728 event: None,
4729 exited: false,
4730 threads: HashSet::new(),
4731 host_future_state,
4732 async_function,
4733 decremented_interesting_task_count: false,
4734 })?;
4735 let new_thread = GuestThread::new_implicit(state, task)?;
4736 let thread = state.push(new_thread)?;
4737 state.get_mut(task)?.threads.insert(thread);
4738 state.interesting_tasks += 1;
4739 Ok(QualifiedThreadId { task, thread })
4740 }
4741}
4742
4743impl TableDebug for GuestTask {
4744 fn type_name() -> &'static str {
4745 "GuestTask"
4746 }
4747}
4748
4749#[derive(Default)]
4751struct WaitableCommon {
4752 event: Option<Event>,
4754 set: Option<TableId<WaitableSet>>,
4756 handle: Option<u32>,
4758}
4759
4760#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4762enum Waitable {
4763 Host(TableId<HostTask>),
4765 Guest(TableId<GuestTask>),
4767 Transmit(TableId<TransmitHandle>),
4769}
4770
4771impl Waitable {
4772 fn from_instance(
4775 state: Pin<&mut ComponentInstance>,
4776 caller_instance: RuntimeComponentInstanceIndex,
4777 waitable: u32,
4778 ) -> Result<Self> {
4779 use crate::runtime::vm::component::Waitable;
4780
4781 let (waitable, kind) = state.instance_states().0[caller_instance]
4782 .handle_table()
4783 .waitable_rep(waitable)?;
4784
4785 Ok(match kind {
4786 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4787 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4788 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4789 })
4790 }
4791
4792 fn rep(&self) -> u32 {
4794 match self {
4795 Self::Host(id) => id.rep(),
4796 Self::Guest(id) => id.rep(),
4797 Self::Transmit(id) => id.rep(),
4798 }
4799 }
4800
4801 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4805 log::trace!("waitable {self:?} join set {set:?}");
4806
4807 let old = mem::replace(&mut self.common(state)?.set, set);
4808
4809 if let Some(old) = old {
4810 match *self {
4811 Waitable::Host(id) => state.remove_child(id, old),
4812 Waitable::Guest(id) => state.remove_child(id, old),
4813 Waitable::Transmit(id) => state.remove_child(id, old),
4814 }?;
4815
4816 state.get_mut(old)?.ready.remove(self);
4817 }
4818
4819 if let Some(set) = set {
4820 match *self {
4821 Waitable::Host(id) => state.add_child(id, set),
4822 Waitable::Guest(id) => state.add_child(id, set),
4823 Waitable::Transmit(id) => state.add_child(id, set),
4824 }?;
4825
4826 if self.common(state)?.event.is_some() {
4827 self.mark_ready(state)?;
4828 }
4829 }
4830
4831 Ok(())
4832 }
4833
4834 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4836 Ok(match self {
4837 Self::Host(id) => &mut state.get_mut(*id)?.common,
4838 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4839 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4840 })
4841 }
4842
4843 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4847 log::trace!("set event for {self:?}: {event:?}");
4848 self.common(state)?.event = event;
4849 self.mark_ready(state)
4850 }
4851
4852 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4854 let common = self.common(state)?;
4855 let event = common.event.take();
4856 if let Some(set) = self.common(state)?.set {
4857 state.get_mut(set)?.ready.remove(self);
4858 }
4859
4860 Ok(event)
4861 }
4862
4863 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4867 if let Some(set) = self.common(state)?.set {
4868 state.get_mut(set)?.ready.insert(*self);
4869 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4870 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4871 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4872
4873 let item = match mode {
4874 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4875 WaitMode::Callback(instance) => WorkItem::GuestCall(
4876 state.get_mut(thread.task)?.instance.index,
4877 GuestCall {
4878 thread,
4879 kind: GuestCallKind::DeliverEvent {
4880 instance,
4881 set: Some(set),
4882 },
4883 },
4884 ),
4885 };
4886 state.push_high_priority(item);
4887 }
4888 }
4889 Ok(())
4890 }
4891
4892 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4894 match self {
4895 Self::Host(task) => {
4896 log::trace!("delete host task {task:?}");
4897 state.delete(*task)?;
4898 }
4899 Self::Guest(task) => {
4900 log::trace!("delete guest task {task:?}");
4901 let task = state.delete(*task)?;
4902
4903 debug_assert!(task.decremented_interesting_task_count);
4910 }
4911 Self::Transmit(task) => {
4912 state.delete(*task)?;
4913 }
4914 }
4915
4916 Ok(())
4917 }
4918}
4919
4920impl fmt::Debug for Waitable {
4921 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4922 match self {
4923 Self::Host(id) => write!(f, "{id:?}"),
4924 Self::Guest(id) => write!(f, "{id:?}"),
4925 Self::Transmit(id) => write!(f, "{id:?}"),
4926 }
4927 }
4928}
4929
4930#[derive(Default)]
4932struct WaitableSet {
4933 ready: BTreeSet<Waitable>,
4935 waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4937 is_sync_call_set: bool,
4940}
4941
4942impl TableDebug for WaitableSet {
4943 fn type_name() -> &'static str {
4944 "WaitableSet"
4945 }
4946}
4947
4948type RawLower =
4950 Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4951
4952type RawLift = Box<
4954 dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4955>;
4956
/// Type-erased lifted result value; `queue_call` downcasts it back to the
/// caller's concrete result type.
type LiftedResult = Box<dyn Any + Send + Sync>;
4961
/// Zero-sized marker type.
///
/// NOTE(review): presumably used as a placeholder `LiftedResult` where no real
/// value exists -- its uses are outside this part of the file; confirm.
struct DummyResult;
4965
4966#[derive(Default)]
4968pub struct ConcurrentInstanceState {
4969 backpressure: u16,
4971 do_not_enter: bool,
4973 pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4976}
4977
4978impl ConcurrentInstanceState {
4979 pub fn pending_is_empty(&self) -> bool {
4980 self.pending.is_empty()
4981 }
4982}
4983
4984#[derive(Debug, Copy, Clone)]
4985pub(crate) enum CurrentThread {
4986 Guest(QualifiedThreadId),
4987 Host(TableId<HostTask>),
4988 None,
4989}
4990
4991impl CurrentThread {
4992 fn guest(&self) -> Option<&QualifiedThreadId> {
4993 match self {
4994 Self::Guest(id) => Some(id),
4995 _ => None,
4996 }
4997 }
4998
4999 fn host(&self) -> Option<TableId<HostTask>> {
5000 match self {
5001 Self::Host(id) => Some(*id),
5002 _ => None,
5003 }
5004 }
5005
5006 fn is_none(&self) -> bool {
5007 matches!(self, Self::None)
5008 }
5009}
5010
5011impl From<QualifiedThreadId> for CurrentThread {
5012 fn from(id: QualifiedThreadId) -> Self {
5013 Self::Guest(id)
5014 }
5015}
5016
5017impl From<TableId<HostTask>> for CurrentThread {
5018 fn from(id: TableId<HostTask>) -> Self {
5019 Self::Host(id)
5020 }
5021}
5022
5023pub struct ConcurrentState {
5025 current_thread: CurrentThread,
5027
5028 futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
5033 table: AlwaysMut<ResourceTable>,
5035 high_priority: Vec<WorkItem>,
5037 low_priority: VecDeque<WorkItem>,
5039 suspend_reason: Option<SuspendReason>,
5043 worker: Option<StoreFiber<'static>>,
5047 worker_item: Option<WorkerItem>,
5049
5050 global_error_context_ref_counts:
5063 BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
5064
5065 interesting_tasks: usize,
5078
5079 interesting_tasks_empty_waker: Option<Waker>,
5083}
5084
5085impl Default for ConcurrentState {
5086 fn default() -> Self {
5087 Self {
5088 current_thread: CurrentThread::None,
5089 table: AlwaysMut::new(ResourceTable::new()),
5090 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
5091 high_priority: Vec::new(),
5092 low_priority: VecDeque::new(),
5093 suspend_reason: None,
5094 worker: None,
5095 worker_item: None,
5096 global_error_context_ref_counts: BTreeMap::new(),
5097 interesting_tasks: 0,
5098 interesting_tasks_empty_waker: None,
5099 }
5100 }
5101}
5102
5103impl ConcurrentState {
5104 pub(crate) fn take_fibers_and_futures(
5121 &mut self,
5122 fibers: &mut Vec<StoreFiber<'static>>,
5123 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
5124 ) {
5125 for entry in self.table.get_mut().iter_mut() {
5126 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
5127 for mode in mem::take(&mut set.waiting).into_values() {
5128 if let WaitMode::Fiber(fiber) = mode {
5129 fibers.push(fiber);
5130 }
5131 }
5132 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
5133 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready { fiber, .. } =
5134 mem::replace(&mut thread.state, GuestThreadState::Completed)
5135 {
5136 fibers.push(fiber);
5137 }
5138 }
5139 }
5140
5141 if let Some(fiber) = self.worker.take() {
5142 fibers.push(fiber);
5143 }
5144
5145 let mut handle_item = |item| match item {
5146 WorkItem::ResumeFiber(fiber) => {
5147 fibers.push(fiber);
5148 }
5149 WorkItem::PushFuture(future) => {
5150 self.futures
5151 .get_mut()
5152 .as_mut()
5153 .unwrap()
5154 .push(future.into_inner());
5155 }
5156 WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
5157 }
5158 };
5159
5160 for item in mem::take(&mut self.high_priority) {
5161 handle_item(item);
5162 }
5163 for item in mem::take(&mut self.low_priority) {
5164 handle_item(item);
5165 }
5166
5167 if let Some(them) = self.futures.get_mut().take() {
5168 futures.push(them);
5169 }
5170 }
5171
5172 fn push<V: Send + Sync + 'static>(
5173 &mut self,
5174 value: V,
5175 ) -> Result<TableId<V>, ResourceTableError> {
5176 self.table.get_mut().push(value).map(TableId::from)
5177 }
5178
5179 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
5180 self.table.get_mut().get_mut(&Resource::from(id))
5181 }
5182
5183 pub fn add_child<T: 'static, U: 'static>(
5184 &mut self,
5185 child: TableId<T>,
5186 parent: TableId<U>,
5187 ) -> Result<(), ResourceTableError> {
5188 self.table
5189 .get_mut()
5190 .add_child(Resource::from(child), Resource::from(parent))
5191 }
5192
5193 pub fn remove_child<T: 'static, U: 'static>(
5194 &mut self,
5195 child: TableId<T>,
5196 parent: TableId<U>,
5197 ) -> Result<(), ResourceTableError> {
5198 self.table
5199 .get_mut()
5200 .remove_child(Resource::from(child), Resource::from(parent))
5201 }
5202
5203 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
5204 self.table.get_mut().delete(Resource::from(id))
5205 }
5206
5207 fn push_future(&mut self, future: HostTaskFuture) {
5208 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
5215 }
5216
5217 fn push_high_priority(&mut self, item: WorkItem) {
5218 log::trace!("push high priority: {item:?}");
5219 self.high_priority.push(item);
5220 }
5221
5222 fn push_low_priority(&mut self, item: WorkItem) {
5223 log::trace!("push low priority: {item:?}");
5224 self.low_priority.push_front(item);
5225 }
5226
5227 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5228 if high_priority {
5229 self.push_high_priority(item);
5230 } else {
5231 self.push_low_priority(item);
5232 }
5233 }
5234
5235 fn promote_instance_local_thread_work_item(
5236 &mut self,
5237 current_instance: RuntimeComponentInstanceIndex,
5238 ) -> bool {
5239 self.promote_work_items_matching(|item: &WorkItem| match item {
5240 WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5241 *instance == current_instance
5242 }
5243 _ => false,
5244 })
5245 }
5246
5247 fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5248 self.promote_work_items_matching(|item: &WorkItem| match item {
5249 WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5250 *t == thread
5251 }
5252 _ => false,
5253 })
5254 }
5255
5256 fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5257 where
5258 F: FnMut(&WorkItem) -> bool,
5259 {
5260 if self.high_priority.iter().any(&mut predicate) {
5264 true
5265 }
5266 else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5269 let item = self.low_priority.remove(idx).unwrap();
5270 self.push_high_priority(item);
5271 true
5272 } else {
5273 false
5274 }
5275 }
5276
5277 fn take_pending_cancellation(&mut self) -> Result<bool> {
5280 let thread = self.current_guest_thread()?;
5281 if let Some(event) = self.get_mut(thread.task)?.event.take() {
5282 assert!(matches!(event, Event::Cancelled));
5283 Ok(true)
5284 } else {
5285 Ok(false)
5286 }
5287 }
5288
5289 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5290 if self.may_block(task)? {
5291 Ok(())
5292 } else {
5293 Err(Trap::CannotBlockSyncTask.into())
5294 }
5295 }
5296
5297 fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
5298 let task = self.get_mut(task)?;
5299 Ok(task.async_function || task.returned_or_cancelled())
5300 }
5301
5302 pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
5308 let (task, is_host) = (task >> 1, task & 1 == 1);
5309 if is_host {
5310 let task: TableId<HostTask> = TableId::new(task);
5311 Ok(&mut self.get_mut(task)?.call_context)
5312 } else {
5313 let task: TableId<GuestTask> = TableId::new(task);
5314 Ok(&mut self.get_mut(task)?.call_context)
5315 }
5316 }
5317
5318 pub fn current_call_context_scope_id(&self) -> Result<u32> {
5321 let (bits, is_host) = match self.current_thread {
5322 CurrentThread::Guest(id) => (id.task.rep(), false),
5323 CurrentThread::Host(id) => (id.rep(), true),
5324 CurrentThread::None => bail_bug!("current thread is not set"),
5325 };
5326 assert_eq!((bits << 1) >> 1, bits);
5327 Ok((bits << 1) | u32::from(is_host))
5328 }
5329
5330 fn current_guest_thread(&self) -> Result<QualifiedThreadId> {
5331 match self.current_thread.guest() {
5332 Some(id) => Ok(*id),
5333 None => bail_bug!("current thread is not a guest thread"),
5334 }
5335 }
5336
5337 fn current_host_thread(&self) -> Result<TableId<HostTask>> {
5338 match self.current_thread.host() {
5339 Some(id) => Ok(id),
5340 None => bail_bug!("current thread is not a host thread"),
5341 }
5342 }
5343
5344 fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
5345 match self.futures.get_mut().as_mut() {
5346 Some(f) => Ok(f),
5347 None => bail_bug!("futures field of concurrent state is currently taken"),
5348 }
5349 }
5350
5351 pub(crate) fn table(&mut self) -> &mut ResourceTable {
5352 self.table.get_mut()
5353 }
5354}
5355
5356fn for_any_lower<
5359 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5360>(
5361 fun: F,
5362) -> F {
5363 fun
5364}
5365
5366fn for_any_lift<
5368 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5369>(
5370 fun: F,
5371) -> F {
5372 fun
5373}
5374
5375fn checked<F: Future + Send + 'static>(
5380 id: StoreId,
5381 fut: F,
5382) -> impl Future<Output = F::Output> + Send + 'static {
5383 async move {
5384 let mut fut = pin!(fut);
5385 future::poll_fn(move |cx| {
5386 let message = "\
5387 `Future`s which depend on asynchronous component tasks, streams, or \
5388 futures to complete may only be polled from the event loop of the \
5389 store to which they belong. Please use \
5390 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5391 ";
5392 tls::try_get(|store| {
5393 let matched = match store {
5394 tls::TryGet::Some(store) => store.id() == id,
5395 tls::TryGet::Taken | tls::TryGet::None => false,
5396 };
5397
5398 if !matched {
5399 panic!("{message}")
5400 }
5401 });
5402 fut.as_mut().poll(cx)
5403 })
5404 .await
5405 }
5406}
5407
5408fn check_recursive_run() {
5411 tls::try_get(|store| {
5412 if !matches!(store, tls::TryGet::None) {
5413 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5414 }
5415 });
5416}
5417
/// Splits a packed callback return value into `(low four bits, remaining
/// upper bits)`.
///
/// NOTE(review): presumably the low nibble is the callback code and the rest
/// is its payload (e.g. a waitable-set index) -- confirm against the caller.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    const LOW_BITS: u32 = 4;
    const LOW_MASK: u32 = (1 << LOW_BITS) - 1;

    let low = code & LOW_MASK;
    let high = code >> LOW_BITS;
    (low, high)
}
5421
5422struct WaitableCheckParams {
5426 set: TableId<WaitableSet>,
5427 options: OptionsIndex,
5428 payload: u32,
5429}
5430
/// Which kind of waitable-set check is being performed.
///
/// NOTE(review): presumably `Wait` blocks until an event is available while
/// `Poll` returns immediately -- the consuming code is outside this chunk.
enum WaitableCheck {
    /// Wait on the set.
    Wait,
    /// Poll the set.
    Poll,
}
5437
5438pub(crate) struct PreparedCall<R> {
5440 handle: Func,
5442 thread: QualifiedThreadId,
5444 param_count: usize,
5446 rx: oneshot::Receiver<LiftedResult>,
5449 runtime_instance: RuntimeInstance,
5451 _phantom: PhantomData<R>,
5452}
5453
5454impl<R> PreparedCall<R> {
5455 pub(crate) fn task_id(&self) -> TaskId {
5457 TaskId {
5458 task: self.thread.task,
5459 runtime_instance: self.runtime_instance,
5460 }
5461 }
5462}
5463
5464pub(crate) struct TaskId {
5466 task: TableId<GuestTask>,
5467 runtime_instance: RuntimeInstance,
5468}
5469
5470impl TaskId {
5471 pub(crate) fn host_future_dropped(&self, store: &mut StoreOpaque) -> Result<()> {
5477 let task = store.concurrent_state_mut().get_mut(self.task)?;
5478 let delete = if !task.already_lowered_parameters() {
5479 store.cancel_guest_subtask_without_lowered_parameters(
5480 self.runtime_instance,
5481 self.task,
5482 )?;
5483 true
5484 } else {
5485 task.host_future_state = HostFutureState::Dropped;
5486 task.ready_to_delete()
5487 };
5488 if delete {
5489 Waitable::Guest(self.task).delete_from(store.concurrent_state_mut())?
5490 }
5491 Ok(())
5492 }
5493}
5494
5495pub(crate) fn prepare_call<T, R>(
5501 mut store: StoreContextMut<T>,
5502 handle: Func,
5503 param_count: usize,
5504 host_future_present: bool,
5505 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5506 + Send
5507 + Sync
5508 + 'static,
5509 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5510 + Send
5511 + Sync
5512 + 'static,
5513) -> Result<PreparedCall<R>> {
5514 let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5515
5516 let instance = handle.instance().id().get(store.0);
5517 let options = &instance.component().env_component().options[options];
5518 let ty = &instance.component().types()[ty];
5519 let async_function = ty.async_;
5520 let task_return_type = ty.results;
5521 let component_instance = raw_options.instance;
5522 let callback = options.callback.map(|i| instance.runtime_callback(i));
5523 let memory = options
5524 .memory()
5525 .map(|i| instance.runtime_memory(i))
5526 .map(SendSyncPtr::new);
5527 let string_encoding = options.string_encoding;
5528 let token = StoreToken::new(store.as_context_mut());
5529 let state = store.0.concurrent_state_mut();
5530
5531 let (tx, rx) = oneshot::channel();
5532
5533 let instance = handle.instance().runtime_instance(component_instance);
5534 let caller = state.current_thread;
5535 let thread = GuestTask::new(
5536 state,
5537 Box::new(for_any_lower(move |store, params| {
5538 lower_params(handle, token.as_context_mut(store), params)
5539 })),
5540 LiftResult {
5541 lift: Box::new(for_any_lift(move |store, result| {
5542 lift_result(handle, store, result)
5543 })),
5544 ty: task_return_type,
5545 memory,
5546 string_encoding,
5547 },
5548 Caller::Host {
5549 tx: Some(tx),
5550 host_future_present,
5551 caller,
5552 },
5553 callback.map(|callback| {
5554 let callback = SendSyncPtr::new(callback);
5555 let instance = handle.instance();
5556 Box::new(move |store: &mut dyn VMStore, event, handle| {
5557 let store = token.as_context_mut(store);
5558 unsafe { instance.call_callback(store, callback, event, handle) }
5561 }) as CallbackFn
5562 }),
5563 instance,
5564 async_function,
5565 )?;
5566
5567 if !store.0.may_enter(instance)? {
5568 bail!(Trap::CannotEnterComponent);
5569 }
5570
5571 Ok(PreparedCall {
5572 handle,
5573 thread,
5574 param_count,
5575 runtime_instance: instance,
5576 rx,
5577 _phantom: PhantomData,
5578 })
5579}
5580
5581pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5588 mut store: StoreContextMut<T>,
5589 prepared: PreparedCall<R>,
5590) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> {
5591 let PreparedCall {
5592 handle,
5593 thread,
5594 param_count,
5595 rx,
5596 ..
5597 } = prepared;
5598
5599 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5600
5601 Ok(checked(
5602 store.0.id(),
5603 rx.map(move |result| match result {
5604 Ok(r) => match r.downcast() {
5605 Ok(r) => Ok(*r),
5606 Err(_) => bail_bug!("wrong type of value produced"),
5607 },
5608 Err(e) => Err(e.into()),
5609 }),
5610 ))
5611}
5612
5613fn queue_call0<T: 'static>(
5616 store: StoreContextMut<T>,
5617 handle: Func,
5618 guest_thread: QualifiedThreadId,
5619 param_count: usize,
5620) -> Result<()> {
5621 let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5622 let is_concurrent = raw_options.async_;
5623 let callback = raw_options.callback;
5624 let instance = handle.instance();
5625 let callee = handle.lifted_core_func(store.0);
5626 let post_return = handle.post_return_core_func(store.0);
5627 let callback = callback.map(|i| {
5628 let instance = instance.id().get(store.0);
5629 SendSyncPtr::new(instance.runtime_callback(i))
5630 });
5631
5632 log::trace!("queueing call {guest_thread:?}");
5633
5634 unsafe {
5638 instance.queue_call(
5639 store,
5640 guest_thread,
5641 SendSyncPtr::new(callee),
5642 param_count,
5643 1,
5644 is_concurrent,
5645 callback,
5646 post_return.map(SendSyncPtr::new),
5647 )
5648 }
5649}