1use crate::bail_bug;
54use crate::component::func::{self, Func, call_post_return};
55use crate::component::{
56 HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
57};
58use crate::fiber::{self, StoreFiber, StoreFiberYield};
59use crate::prelude::*;
60use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
61use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
62use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
63use crate::{
64 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
65};
66use error_contexts::GlobalErrorContextRefCount;
67use futures::channel::oneshot;
68use futures::future::{self, FutureExt};
69use futures::stream::{FuturesUnordered, StreamExt};
70use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71use std::any::Any;
72use std::borrow::ToOwned;
73use std::boxed::Box;
74use std::cell::UnsafeCell;
75use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
76use std::fmt;
77use std::future::Future;
78use std::marker::PhantomData;
79use std::mem::{self, ManuallyDrop, MaybeUninit};
80use std::ops::DerefMut;
81use std::pin::{Pin, pin};
82use std::ptr::{self, NonNull};
83use std::task::{Context, Poll, Waker};
84use std::vec::Vec;
85use table::{TableDebug, TableId};
86use wasmtime_environ::component::{
87 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
88 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
89 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
90 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
91 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
92};
93use wasmtime_environ::packed_option::ReservedValue;
94use wasmtime_environ::{NUM_COMPONENT_CONTEXT_SLOTS, Trap};
95
96pub use abort::JoinHandle;
97pub use future_stream_any::{FutureAny, StreamAny};
98pub use futures_and_streams::{
99 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
100 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
101 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
102};
103pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
104
105mod abort;
106mod error_contexts;
107mod future_stream_any;
108mod futures_and_streams;
109pub(crate) mod table;
110pub(crate) mod tls;
111
/// Sentinel value with all 32 bits set.
///
/// NOTE(review): presumably the code handed back to a guest when an operation
/// would block; it is not referenced in this part of the file, so confirm the
/// exact meaning at the use sites.
const BLOCKED: u32 = 0xffff_ffff;
115
/// Status of a subtask.
///
/// The explicit discriminants are the values emitted by [`Status::pack`] and
/// by `Event::parts` (which casts the status with `as u32`).
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    /// Discriminant `0`.
    Starting = 0,
    /// Discriminant `1`.
    Started = 1,
    /// Discriminant `2`; the only status that is packed without a waitable.
    Returned = 2,
    /// Discriminant `3`.
    StartCancelled = 3,
    /// Discriminant `4`.
    ReturnCancelled = 4,
}

impl Status {
    /// Combines this status with an optional waitable handle into one `u32`.
    ///
    /// The status discriminant occupies the low four bits and the handle (or
    /// `0` when absent) is stored in the bits above them.
    ///
    /// # Panics
    ///
    /// Panics if `waitable` is `Some` for [`Status::Returned`] or `None` for
    /// any other status, and if the handle does not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        // A handle must be supplied exactly when the subtask has not returned.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = waitable.unwrap_or(0);
        // Guarantees the shift below cannot discard any handle bits.
        assert!(waitable < (1 << 28));
        let status_bits = self as u32;
        let handle_bits = waitable << 4;
        handle_bits | status_bits
    }
}
139
140#[derive(Clone, Copy, Debug)]
143enum Event {
144 None,
145 Subtask {
146 status: Status,
147 },
148 StreamRead {
149 code: ReturnCode,
150 pending: Option<(TypeStreamTableIndex, u32)>,
151 },
152 StreamWrite {
153 code: ReturnCode,
154 pending: Option<(TypeStreamTableIndex, u32)>,
155 },
156 FutureRead {
157 code: ReturnCode,
158 pending: Option<(TypeFutureTableIndex, u32)>,
159 },
160 FutureWrite {
161 code: ReturnCode,
162 pending: Option<(TypeFutureTableIndex, u32)>,
163 },
164 Cancelled,
165}
166
167impl Event {
168 fn parts(self) -> (u32, u32) {
173 const EVENT_NONE: u32 = 0;
174 const EVENT_SUBTASK: u32 = 1;
175 const EVENT_STREAM_READ: u32 = 2;
176 const EVENT_STREAM_WRITE: u32 = 3;
177 const EVENT_FUTURE_READ: u32 = 4;
178 const EVENT_FUTURE_WRITE: u32 = 5;
179 const EVENT_CANCELLED: u32 = 6;
180 match self {
181 Event::None => (EVENT_NONE, 0),
182 Event::Cancelled => (EVENT_CANCELLED, 0),
183 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
184 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
185 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
186 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
187 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
188 }
189 }
190}
191
/// Numeric codes grouped under the name `callback_code`.
///
/// NOTE(review): presumably these are the values a guest callback returns
/// (see the `code` passed to `handle_callback_code` in `handle_guest_call`);
/// confirm against that function, which is outside this part of the file.
mod callback_code {
    /// Code `0`.
    pub const EXIT: u32 = 0;
    /// Code `1`.
    pub const YIELD: u32 = 1;
    /// Code `2`.
    pub const WAIT: u32 = 2;
}
198
/// `wasmtime_environ::component::START_FLAG_ASYNC_CALLEE` converted to `u32`
/// once, so users in this module can compare against it without casting.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
203
/// Borrowed access to a store together with a projection of its data.
///
/// Pairs a [`StoreContextMut`] borrowed for `'a` with a `get_data` function
/// which maps the store's `T` to `D::Data`. Instances are created with
/// [`Access::new`] or handed to the closure passed to `Accessor::with`.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    /// Mutable handle to the underlying store.
    store: StoreContextMut<'a, T>,
    /// Projection from the store data `T` to `D::Data`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
213
214impl<'a, T, D> Access<'a, T, D>
215where
216 D: HasData + ?Sized,
217 T: 'static,
218{
219 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
221 Self { store, get_data }
222 }
223
224 pub fn data_mut(&mut self) -> &mut T {
226 self.store.data_mut()
227 }
228
229 pub fn get(&mut self) -> D::Data<'_> {
231 (self.get_data)(self.data_mut())
232 }
233
234 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
238 where
239 T: 'static,
240 {
241 let accessor = Accessor {
242 get_data: self.get_data,
243 token: StoreToken::new(self.store.as_context_mut()),
244 };
245 self.store
246 .as_context_mut()
247 .spawn_with_accessor(accessor, task)
248 }
249
250 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
253 self.get_data
254 }
255}
256
/// Lets an [`Access`] be used wherever a shared store context is expected by
/// delegating to the wrapped store handle.
impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    type Data = T;

    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}
268
/// Lets an [`Access`] be used wherever a mutable store context is expected by
/// delegating to the wrapped store handle.
impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}
278
/// Handle through which a task obtains temporary access to the store.
///
/// Unlike [`Access`], this type does not borrow the store: it holds a
/// [`StoreToken`] and a data projection, and `Accessor::with` retrieves the
/// store from thread-local state (`tls::get`) for the duration of a closure.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    /// Token used to turn the thread-local store into a `StoreContextMut<T>`.
    token: StoreToken<T>,
    /// Projection from the store data `T` to `D::Data`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
345
/// Types from which a reference to an [`Accessor`] can be obtained.
pub trait AsAccessor {
    /// The store data type `T` of the underlying accessor.
    type Data: 'static;

    /// The `D` parameter of the underlying accessor, i.e. what its data
    /// projection yields.
    type AccessorData: HasData + ?Sized;

    /// Returns the underlying accessor.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
373
/// Forwards [`AsAccessor`] through shared references.
impl<T: AsAccessor + ?Sized> AsAccessor for &T {
    type Data = T::Data;
    type AccessorData = T::AccessorData;

    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
        T::as_accessor(self)
    }
}
382
/// An [`Accessor`] trivially provides itself.
impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;

    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}
391
// Compile-time check that `Accessor<T>` is `Send + Sync` even when `T` itself
// is not `Sync` (`UnsafeCell<u32>` is `!Sync`). Compilation fails here if a
// field is ever added which ties the accessor's thread-safety to `T`.
const _: () = {
    const fn assert<T: Send + Sync>() {}
    assert::<Accessor<UnsafeCell<u32>>>();
};
418
impl<T> Accessor<T> {
    /// Creates an accessor for the store identified by `token` whose data
    /// projection is the identity, i.e. it yields the store's `T` itself.
    pub(crate) fn new(token: StoreToken<T>) -> Self {
        Self {
            token,
            get_data: |x| x,
        }
    }
}
435
436impl<T, D> Accessor<T, D>
437where
438 D: HasData + ?Sized,
439{
440 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
458 tls::get(|vmstore| {
459 fun(Access {
460 store: self.token.as_context_mut(vmstore),
461 get_data: self.get_data,
462 })
463 })
464 }
465
466 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
469 self.get_data
470 }
471
472 pub fn with_getter<D2: HasData>(
489 &self,
490 get_data: fn(&mut T) -> D2::Data<'_>,
491 ) -> Accessor<T, D2> {
492 Accessor {
493 token: self.token,
494 get_data,
495 }
496 }
497
498 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
514 where
515 T: 'static,
516 {
517 let accessor = self.clone_for_spawn();
518 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
519 }
520
521 fn clone_for_spawn(&self) -> Self {
522 Self {
523 token: self.token,
524 get_data: self.get_data,
525 }
526 }
527
528 pub fn poll_no_interesting_tasks(&self, cx: &mut Context<'_>) -> Poll<()> {
564 self.with(|mut access| {
565 let store = access.as_context_mut().0;
566 let state = store.concurrent_state_mut();
567 if state.interesting_tasks == 0 {
568 Poll::Ready(())
569 } else {
570 state.interesting_tasks_empty_waker = Some(cx.waker().clone());
571 Poll::Pending
572 }
573 })
574 }
575}
576
/// A unit of work which can be spawned with `Accessor::spawn`,
/// `Access::spawn` or `StoreContextMut::spawn`.
///
/// Implementations must be `Send + 'static` and produce a `Send` future.
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Runs the task to completion, using `accessor` to reach the store.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
595
/// Information recorded about the caller of a guest function.
///
/// NOTE(review): this type is not used in this part of the file; the
/// descriptions below are inferred from the variant and field names only and
/// should be confirmed at the use sites.
enum CallerInfo {
    /// The caller made an async call.
    Async {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Whether the call produces a result.
        has_result: bool,
    },
    /// The caller made a sync call.
    Sync {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Number of result values.
        result_count: u32,
    },
}
610
/// How a waiting task is to be resumed.
///
/// NOTE(review): not used in this part of the file; the variant descriptions
/// are inferred from their payload types — confirm at the use sites.
enum WaitMode {
    /// Resume by switching back to the contained fiber.
    Fiber(StoreFiber<'static>),
    /// Resume by invoking a callback associated with the given instance.
    Callback(Instance),
}
619
/// The reason passed to `suspend` when a fiber gives up control.
///
/// NOTE(review): how each reason is handled is decided by `suspend`, which is
/// outside this part of the file; the `skip_may_block_check` fields presumably
/// bypass the "task may block" check — confirm there.
#[derive(Debug)]
enum SuspendReason {
    /// `thread` is waiting on the waitable set `set` (used by
    /// `poll_and_block` while a host future is pending).
    Waiting {
        set: TableId<WaitableSet>,
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
    /// The worker fiber finished its current item and needs another one (see
    /// the loop in `run_on_worker`).
    NeedWork,
    /// `thread` is yielding.
    Yielding {
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
    /// `thread` is explicitly suspending itself.
    ExplicitlySuspending {
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
}
645
/// The kind of work a [`GuestCall`] performs; see `handle_guest_call` for how
/// each variant is executed.
enum GuestCallKind {
    /// Deliver an event to the task's callback.
    DeliverEvent {
        /// Instance used to look up the event and to interpret the
        /// callback's return code.
        instance: Instance,
        /// Optional waitable set passed to `Instance::get_event`.
        set: Option<TableId<WaitableSet>>,
    },
    /// Start a call; the closure may return a follow-up [`GuestCall`] which
    /// is then handled in the same loop. Subject to backpressure (see
    /// `GuestCall::is_ready`).
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
    /// Start a call which is always considered ready and produces no
    /// follow-up call.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
667
668impl fmt::Debug for GuestCallKind {
669 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
670 match self {
671 Self::DeliverEvent { instance, set } => f
672 .debug_struct("DeliverEvent")
673 .field("instance", instance)
674 .field("set", set)
675 .finish(),
676 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
677 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
678 }
679 }
680}
681
/// Identifies which thread, if any, a suspension is directed at.
///
/// NOTE(review): the difference between `SomeSuspended` and `Some` is not
/// visible in this part of the file — confirm at the use sites.
#[derive(Copy, Clone, Debug)]
pub enum SuspensionTarget {
    /// A target carrying a `u32` payload.
    SomeSuspended(u32),
    /// A target carrying a `u32` payload.
    Some(u32),
    /// No target.
    None,
}

impl SuspensionTarget {
    /// Returns `true` only for [`SuspensionTarget::None`].
    fn is_none(&self) -> bool {
        match self {
            SuspensionTarget::None => true,
            SuspensionTarget::Some(_) | SuspensionTarget::SomeSuspended(_) => false,
        }
    }

    /// Returns `true` for every variant other than [`SuspensionTarget::None`].
    fn is_some(&self) -> bool {
        !matches!(self, SuspensionTarget::None)
    }
}
698
/// A pending piece of guest work: what to do (`kind`) and on behalf of which
/// guest thread (`thread`).
#[derive(Debug)]
struct GuestCall {
    /// The thread (and, through it, the task) this call belongs to.
    thread: QualifiedThreadId,
    /// What running this call entails.
    kind: GuestCallKind,
}
705
706impl GuestCall {
707 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
717 let instance = store
718 .concurrent_state_mut()
719 .get_mut(self.thread.task)?
720 .instance;
721 let state = store.instance_state(instance).concurrent_state();
722
723 let ready = match &self.kind {
724 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
725 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
726 GuestCallKind::StartExplicit(_) => true,
727 };
728 log::trace!(
729 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
730 state.do_not_enter,
731 state.backpressure
732 );
733 Ok(ready)
734 }
735}
736
/// An item executed on the worker fiber (see `run_on_worker`).
enum WorkerItem {
    /// Run a guest call via `handle_guest_call`.
    GuestCall(GuestCall),
    /// Run an arbitrary closure with access to the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
742
/// An item queued for the event loop; see `handle_work_item` for how each
/// variant is processed.
enum WorkItem {
    /// Add a host task future to the set polled by the event loop.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// Resume the given fiber.
    ResumeFiber(StoreFiber<'static>),
    /// Resume the given guest thread, which must be in the `Ready` state.
    ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
    /// Run the guest call if it is ready, otherwise park it in the owning
    /// instance's `pending` map.
    GuestCall(RuntimeComponentInstanceIndex, GuestCall),
    /// Run a closure on the worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
757
758impl fmt::Debug for WorkItem {
759 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
760 match self {
761 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
762 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
763 Self::ResumeThread(instance, thread) => f
764 .debug_tuple("ResumeThread")
765 .field(instance)
766 .field(thread)
767 .finish(),
768 Self::GuestCall(instance, call) => f
769 .debug_tuple("GuestCall")
770 .field(instance)
771 .field(call)
772 .finish(),
773 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
774 }
775 }
776}
777
/// Outcome of a wait.
///
/// The derived ordering follows declaration order, so `Cancelled` compares
/// less than `Completed`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    /// The wait ended because of cancellation.
    Cancelled,
    /// The wait ended because the awaited thing completed.
    Completed,
}
784
/// Runs `future` to completion on behalf of the current host task, blocking
/// the calling fiber if necessary, and returns the future's output.
///
/// The future is polled once immediately. If it is still pending it is handed
/// to the event loop and the current fiber is suspended, waiting on the
/// caller thread's `sync_call_set`, until the future has completed.
///
/// # Errors
///
/// Returns an error if there is no current host thread, if the future itself
/// fails, if suspension fails, or if the host task is not in the expected
/// state afterwards.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
) -> Result<R> {
    let state = store.concurrent_state_mut();
    let task = state.current_host_thread()?;

    // Wrap the future so that, on success, its result is stashed in the host
    // task's state (type-erased in a `Box`) and a `Returned` subtask event is
    // recorded for the task. The store is reached through thread-local state
    // because the wrapper may be polled later by the event loop.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            let host_state = &mut state.get_mut(task)?.state;
            assert!(matches!(host_state, HostTaskState::CalleeStarted));
            *host_state = HostTaskState::CalleeFinished(Box::new(result));

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // Poll once with a no-op waker: if the future is not ready it is pushed
    // onto the store's future set below, so no wakeup from this poll is
    // needed.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        // Completed immediately; propagate any error.
        Poll::Ready(result) => result?,

        // Not ready yet: let the event loop drive it and suspend until the
        // host task's event fires.
        Poll::Pending => {
            let state = store.concurrent_state_mut();
            state.push_future(future);

            let caller = state.get_mut(task)?.caller;
            let set = state.get_mut(caller.thread)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                skip_may_block_check: false,
            })?;

            // Detach the task from the waitable set again so the set holds no
            // reference to it once this function returns.
            Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
        }
    }

    // Take the stashed result out of the task state, marking the callee as
    // done, and recover its concrete type.
    let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
    match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
        HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
            Ok(result) => *result,
            Err(_) => bail_bug!("host task finished with wrong type of result"),
        }),
        _ => bail_bug!("unexpected host task state after completion"),
    }
}
869
/// Executes `call` and any follow-up calls it produces.
///
/// Event delivery and implicit starts may yield another [`GuestCall`]; those
/// are processed in the same loop until none remains.
///
/// # Errors
///
/// Propagates errors from the guest callback, the start closures and the
/// store bookkeeping, and reports a bug if an event or callback that must be
/// present is missing.
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
    let mut next = Some(call);
    while let Some(call) = next.take() {
        match call.kind {
            GuestCallKind::DeliverEvent { instance, set } => {
                // An event must be available, otherwise this call should
                // never have been scheduled.
                let (event, waitable) =
                    match instance.get_event(store, call.thread.task, set, true)? {
                        Some(pair) => pair,
                        None => bail_bug!("delivering non-present event"),
                    };
                let state = store.concurrent_state_mut();
                let task = state.get_mut(call.thread.task)?;
                let runtime_instance = task.instance;
                // A handle of 0 is passed when the event has no waitable.
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);

                log::trace!(
                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
                    call.thread,
                );

                let old_thread = store.set_thread(call.thread)?;
                log::trace!(
                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
                    call.thread
                );

                store.enter_instance(runtime_instance);

                // Temporarily take the callback out of the task so it can be
                // invoked while the store is mutably borrowed.
                let Some(callback) = store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback
                    .take()
                else {
                    bail_bug!("guest task callback field not present")
                };

                let code = callback(store, event, handle)?;

                // Put the callback back for the next event.
                store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback = Some(callback);

                store.exit_instance(runtime_instance)?;

                store.set_thread(old_thread)?;

                // The callback's return code decides what happens next and
                // may produce another call to run.
                next = instance.handle_callback_code(
                    store,
                    call.thread,
                    runtime_instance.index,
                    code,
                )?;

                log::trace!(
                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
                );
            }
            GuestCallKind::StartImplicit(fun) => {
                next = fun(store)?;
            }
            GuestCallKind::StartExplicit(fun) => {
                fun(store)?;
            }
        }
    }

    Ok(())
}
941
/// Convenience wrappers which forward to the corresponding
/// [`StoreContextMut`] methods.
impl<T> Store<T> {
    /// Runs `fun` with an [`Accessor`] while driving the store's event loop,
    /// returning the closure's result.
    ///
    /// # Errors
    ///
    /// Fails if concurrency support is disabled for this store, or if the
    /// event loop reports an error.
    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        ensure!(
            self.as_context().0.concurrency_support(),
            "cannot use `run_concurrent` when Config::concurrency_support disabled",
        );
        self.as_context_mut().run_concurrent(fun).await
    }

    /// Panics unless the store's concurrent state is empty; see
    /// `StoreContextMut::assert_concurrent_state_empty`.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(&mut self) {
        self.as_context_mut().assert_concurrent_state_empty();
    }

    /// Returns the number of entries in the concurrent state table.
    #[doc(hidden)]
    pub fn concurrent_state_table_size(&mut self) -> usize {
        self.as_context_mut().concurrent_state_table_size()
    }

    /// Spawns `task` on the store's event loop and returns a handle to it.
    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
    where
        T: 'static,
    {
        self.as_context_mut().spawn(task)
    }
}
973
974impl<T> StoreContextMut<'_, T> {
    /// Asserts that no concurrent state is left behind in the store.
    ///
    /// # Panics
    ///
    /// Panics if any instance state is non-empty, if the state table or
    /// either work queue contains entries, if a current thread is set, if
    /// host task futures remain (or the future set is unavailable), or if
    /// global error-context reference counts remain.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(self) {
        let store = self.0;
        store
            .store_data_mut()
            .components
            .assert_instance_states_empty();
        let state = store.concurrent_state_mut();
        assert!(
            state.table.get_mut().is_empty(),
            "non-empty table: {:?}",
            state.table.get_mut()
        );
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        assert!(state.current_thread.is_none());
        assert!(state.futures_mut().unwrap().is_empty());
        assert!(state.global_error_context_ref_counts.is_empty());
    }
1004
    /// Returns the number of entries currently stored in the concurrent
    /// state table, obtained by counting its iterator.
    #[doc(hidden)]
    pub fn concurrent_state_table_size(&mut self) -> usize {
        self.0
            .concurrent_state_mut()
            .table
            .get_mut()
            .iter_mut()
            .count()
    }
1018
    /// Spawns `task` on the store's event loop with an identity-projecting
    /// [`Accessor`] and returns a [`JoinHandle`] for it.
    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
    where
        T: 'static,
    {
        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
        self.spawn_with_accessor(accessor, task)
    }
1035
    /// Spawns `task`, running it with the supplied `accessor`.
    ///
    /// The task's future is wrapped by `JoinHandle::run` and pushed onto the
    /// store's set of futures; the returned handle refers to it.
    fn spawn_with_accessor<D>(
        self,
        accessor: Accessor<T, D>,
        task: impl AccessorTask<T, D>,
    ) -> JoinHandle
    where
        T: 'static,
        D: HasData + ?Sized,
    {
        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
        // NOTE(review): the wrapped future presumably yields no value when the
        // task is aborted through the handle; that case is mapped to
        // `Ok(())` here so it is not reported as an error — confirm against
        // `JoinHandle::run`.
        self.0
            .concurrent_state_mut()
            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
        handle
    }
1056
    /// Runs `fun` with an [`Accessor`] while driving the store's event loop
    /// and returns the closure's result.
    ///
    /// If the event loop has nothing left to do while `fun` is still
    /// pending, the returned future stays pending rather than trapping.
    ///
    /// # Errors
    ///
    /// Fails if concurrency support is disabled for this store, or if the
    /// event loop reports an error.
    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        ensure!(
            self.0.concurrency_support(),
            "cannot use `run_concurrent` when Config::concurrency_support disabled",
        );
        self.do_run_concurrent(fun, false).await
    }
1150
    /// Like `run_concurrent`, but fails with `Trap::AsyncDeadlock` if the
    /// event loop runs out of work while `fun` is still pending.
    ///
    /// Unlike `run_concurrent` this does not check for concurrency support
    /// up front; `do_run_concurrent` only debug-asserts it.
    pub(super) async fn run_concurrent_trap_on_idle<R>(
        self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, true).await
    }
1160
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Creates the future for `fun`, then drives it with `poll_until`. The
    /// future is dropped with the store installed in thread-local state (see
    /// `Dropper`).
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        debug_assert!(self.0.concurrency_support());
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Owns the store handle together with the user's future so that the
        // future can be dropped while the store is available via `tls::set`.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY (as far as visible here): `value` is dropped
                    // exactly once, in place, and is not accessed again since
                    // this is the `Drop` impl of its owner.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY (as far as visible here): `dropper` is only reachable
        // through this reference and its `value` field is never moved after
        // this point; it is dropped in place by `Dropper::drop`.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1204
    /// Drives the store's event loop until `future` completes.
    ///
    /// Each iteration polls `future`, then the set of host task futures, and
    /// then collects queued work: all high-priority items, or a single
    /// low-priority item if there are none. Collected items are processed
    /// with `handle_work_item` before the next iteration.
    ///
    /// When there is no work and the host task future set is exhausted,
    /// `future` is polled once more; if it is still pending the result is
    /// `Trap::AsyncDeadlock` when `trap_on_idle` is set and `Pending`
    /// otherwise.
    ///
    /// # Errors
    ///
    /// Propagates errors from host task futures and work items, and the
    /// deadlock trap described above.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Guard which moves the future set back into the concurrent state
        // when dropped, including on early exit.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take the future set out of the store for the duration of the
            // poll so that it can be polled while the store is lent out
            // through thread-local state.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = match reset.futures.as_mut() {
                Some(f) => pin!(f.next()),
                None => bail_bug!("concurrent state missing futures field"),
            };

            enum PollResult<R> {
                // `future` finished with this value.
                Complete(R),
                // There is work to process before polling again.
                ProcessWork {
                    ready: Vec<WorkItem>,
                    low_priority: bool,
                },
            }

            let result = future::poll_fn(|cx| {
                // Give the caller's future the first chance to make progress.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(PollResult::Complete(value)));
                }

                // Then poll the host task futures: `Ready(true)` means one of
                // them completed, `Ready(false)` means the set is exhausted.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Collect all high-priority work, falling back to a single
                // low-priority item only when there is none.
                let state = reset.store.0.concurrent_state_mut();
                let mut ready = mem::take(&mut state.high_priority);
                let mut low_priority = false;
                if ready.is_empty() {
                    if let Some(item) = state.low_priority.pop_back() {
                        ready.push(item);
                        low_priority = true;
                    }
                }
                if !ready.is_empty() {
                    return Poll::Ready(Ok(PollResult::ProcessWork {
                        ready,
                        low_priority,
                    }));
                }

                return match next {
                    // A host task future completed: go around the loop again
                    // (with no work items) so everything gets re-polled.
                    Poll::Ready(true) => {
                        Poll::Ready(Ok(PollResult::ProcessWork {
                            ready: Vec::new(),
                            low_priority: false,
                        }))
                    }
                    // Nothing left to drive: poll the caller's future one
                    // last time before declaring the loop idle.
                    Poll::Ready(false) => {
                        if let Poll::Ready(value) =
                            tls::set(reset.store.0, || future.as_mut().poll(cx))
                        {
                            Poll::Ready(Ok(PollResult::Complete(value)))
                        } else {
                            if trap_on_idle {
                                Poll::Ready(Err(Trap::AsyncDeadlock.into()))
                            } else {
                                Poll::Pending
                            }
                        }
                    }
                    Poll::Pending => Poll::Pending,
                };
            })
            .await;

            // Return the future set to the store before processing work,
            // which needs unrestricted access to the store.
            drop(reset);

            match result? {
                PollResult::Complete(value) => break Ok(value),
                PollResult::ProcessWork {
                    ready,
                    low_priority,
                } => {
                    // Guard which cleans up work items that were collected
                    // but not processed, e.g. because an earlier item failed
                    // or this future was dropped mid-way.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    // Futures are dropped with the store
                                    // installed in thread-local state.
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    // Yield to the async executor before running
                    // low-priority work.
                    if low_priority {
                        dispose.store.0.yield_now().await
                    }

                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1419
1420 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1422 where
1423 T: Send,
1424 {
1425 log::trace!("handle work item {item:?}");
1426 match item {
1427 WorkItem::PushFuture(future) => {
1428 self.0
1429 .concurrent_state_mut()
1430 .futures_mut()?
1431 .push(future.into_inner());
1432 }
1433 WorkItem::ResumeFiber(fiber) => {
1434 self.0.resume_fiber(fiber).await?;
1435 }
1436 WorkItem::ResumeThread(_, thread) => {
1437 if let GuestThreadState::Ready(fiber) = mem::replace(
1438 &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
1439 GuestThreadState::Running,
1440 ) {
1441 self.0.resume_fiber(fiber).await?;
1442 } else {
1443 bail_bug!("cannot resume non-pending thread {thread:?}");
1444 }
1445 }
1446 WorkItem::GuestCall(_, call) => {
1447 if call.is_ready(self.0)? {
1448 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1449 } else {
1450 let state = self.0.concurrent_state_mut();
1451 let task = state.get_mut(call.thread.task)?;
1452 if !task.starting_sent {
1453 task.starting_sent = true;
1454 if let GuestCallKind::StartImplicit(_) = &call.kind {
1455 Waitable::Guest(call.thread.task).set_event(
1456 state,
1457 Some(Event::Subtask {
1458 status: Status::Starting,
1459 }),
1460 )?;
1461 }
1462 }
1463
1464 let instance = state.get_mut(call.thread.task)?.instance;
1465 self.0
1466 .instance_state(instance)
1467 .concurrent_state()
1468 .pending
1469 .insert(call.thread, call.kind);
1470 }
1471 }
1472 WorkItem::WorkerFunction(fun) => {
1473 self.run_on_worker(WorkerItem::Function(fun)).await?;
1474 }
1475 }
1476
1477 Ok(())
1478 }
1479
1480 async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1482 where
1483 T: Send,
1484 {
1485 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1486 fiber
1487 } else {
1488 fiber::make_fiber(self.0, move |store| {
1489 loop {
1490 let Some(item) = store.concurrent_state_mut().worker_item.take() else {
1491 bail_bug!("worker_item not present when resuming fiber")
1492 };
1493 match item {
1494 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1495 WorkerItem::Function(fun) => fun.into_inner()(store)?,
1496 }
1497
1498 store.suspend(SuspendReason::NeedWork)?;
1499 }
1500 })?
1501 };
1502
1503 let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1504 assert!(worker_item.is_none());
1505 *worker_item = Some(item);
1506
1507 self.0.resume_fiber(worker).await
1508 }
1509
1510 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1515 where
1516 T: 'static,
1517 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1518 + Send
1519 + Sync
1520 + 'static,
1521 R: Send + Sync + 'static,
1522 {
1523 let token = StoreToken::new(self);
1524 async move {
1525 let mut accessor = Accessor::new(token);
1526 closure(&mut accessor).await
1527 }
1528 }
1529}
1530
1531impl StoreOpaque {
    /// Records the start of a synchronous call into `callee`.
    ///
    /// Without concurrency support this defers to
    /// `enter_call_not_concurrent`. Otherwise a new guest task is created
    /// whose caller is the current thread, the new thread is registered in
    /// the callee instance's table, and it becomes the current thread.
    ///
    /// `guest_caller`, when provided, is only used for a debug assertion that
    /// it matches the instance of the current guest thread.
    pub(crate) fn enter_guest_sync_call(
        &mut self,
        guest_caller: Option<RuntimeInstance>,
        callee_async: bool,
        callee: RuntimeInstance,
    ) -> Result<()> {
        log::trace!("enter sync call {callee:?}");
        if !self.concurrency_support() {
            return self.enter_call_not_concurrent();
        }

        let state = self.concurrent_state_mut();
        let thread = state.current_thread;
        let instance = if let Some(thread) = thread.guest() {
            Some(state.get_mut(thread.task)?.instance)
        } else {
            None
        };
        if guest_caller.is_some() {
            debug_assert_eq!(instance, guest_caller);
        }
        // Parameter lowering and result lifting are never expected to go
        // through the task for a sync call, so both hooks report a bug if
        // they are ever invoked.
        let guest_thread = GuestTask::new(
            state,
            Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
            LiftResult {
                lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
                ty: TypeTupleIndex::reserved_value(),
                memory: None,
                string_encoding: StringEncoding::Utf8,
            },
            if let Some(thread) = thread.guest() {
                Caller::Guest { thread: *thread }
            } else {
                Caller::Host {
                    tx: None,
                    host_future_present: false,
                    caller: thread,
                }
            },
            None,
            callee,
            callee_async,
        )?;

        Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
            guest_thread.thread,
            self,
            callee.index,
        )?;
        self.set_thread(guest_thread)?;

        Ok(())
    }
1591
1592 pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
1594 if !self.concurrency_support() {
1595 return Ok(self.exit_call_not_concurrent());
1596 }
1597 let thread = match self.set_thread(CurrentThread::None)?.guest() {
1598 Some(t) => *t,
1599 None => bail_bug!("expected task when exiting"),
1600 };
1601 let task = self.concurrent_state_mut().get_mut(thread.task)?;
1602 let instance = task.instance;
1603 let caller = match &task.caller {
1604 &Caller::Guest { thread } => thread.into(),
1605 &Caller::Host { caller, .. } => caller,
1606 };
1607 task.lift_result = None;
1608 task.exited = true;
1609 self.set_thread(caller)?;
1610
1611 log::trace!("exit sync call {instance:?}");
1612 self.cleanup_thread(thread, instance, CleanupTask::Yes)?;
1613
1614 Ok(())
1615 }
1616
1617 pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
1625 if !self.concurrency_support() {
1626 self.enter_call_not_concurrent()?;
1627 return Ok(None);
1628 }
1629 let state = self.concurrent_state_mut();
1630 let caller = state.current_guest_thread()?;
1631 let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
1632 log::trace!("new host task {task:?}");
1633 self.set_thread(task)?;
1634 Ok(Some(task))
1635 }
1636
1637 pub fn host_task_reenter_caller(&mut self) -> Result<()> {
1643 if !self.concurrency_support() {
1644 return Ok(());
1645 }
1646 let task = self.concurrent_state_mut().current_host_thread()?;
1647 let caller = self.concurrent_state_mut().get_mut(task)?.caller;
1648 self.set_thread(caller)?;
1649 Ok(())
1650 }
1651
1652 pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1659 match task {
1660 Some(task) => {
1661 log::trace!("delete host task {task:?}");
1662 self.concurrent_state_mut().delete(task)?;
1663 }
1664 None => {
1665 self.exit_call_not_concurrent();
1666 }
1667 }
1668 Ok(())
1669 }
1670
1671 pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
1679 if self.trapped() {
1680 return Ok(false);
1681 }
1682 if !self.concurrency_support() {
1683 return Ok(true);
1684 }
1685 let state = self.concurrent_state_mut();
1686 let mut cur = state.current_thread;
1687 loop {
1688 match cur {
1689 CurrentThread::None => break Ok(true),
1690 CurrentThread::Guest(thread) => {
1691 let task = state.get_mut(thread.task)?;
1692
1693 if task.instance.instance == instance.instance {
1700 break Ok(false);
1701 }
1702 cur = match task.caller {
1703 Caller::Host { caller, .. } => caller,
1704 Caller::Guest { thread } => thread.into(),
1705 };
1706 }
1707 CurrentThread::Host(id) => {
1708 cur = state.get_mut(id)?.caller.into();
1709 }
1710 }
1711 }
1712 }
1713
    /// Returns the state of the runtime instance `instance.index` inside the
    /// component instance `instance.instance`.
    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
        self.component_instance_mut(instance.instance)
            .instance_state(instance.index)
    }
1720
    /// Makes `thread` the current thread and returns the previous one.
    ///
    /// Besides updating `current_thread` this swaps the per-thread component
    /// context slots in and out of the VM store context and updates the
    /// "task may block" flag of the affected component instances.
    fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
        let thread = thread.into();
        let state = self.concurrent_state_mut();
        let old_thread = mem::replace(&mut state.current_thread, thread);

        // Save the outgoing guest thread's context slots.
        if let Some(old_thread) = old_thread.guest() {
            let old_context = self.vm_store_context().component_context;
            self.concurrent_state_mut()
                .get_mut(old_thread.thread)?
                .context = old_context;
        }
        // In debug builds, overwrite the live slots with a recognizable
        // pattern so that stale values are not picked up by accident.
        if cfg!(debug_assertions) {
            self.vm_store_context_mut().component_context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
        }
        // Load the incoming guest thread's context slots (and, in debug
        // builds, poison the saved copy while it is live in the VM context).
        if let Some(thread) = thread.guest() {
            let thread = self.concurrent_state_mut().get_mut(thread.thread)?;
            let context = thread.context;
            if cfg!(debug_assertions) {
                thread.context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
            }
            self.vm_store_context_mut().component_context = context;
        }

        // Clear the flag on the instance of the outgoing guest thread ...
        let state = self.concurrent_state_mut();
        if let Some(old_thread) = old_thread.guest() {
            let instance = state.get_mut(old_thread.task)?.instance.instance;
            self.component_instance_mut(instance)
                .set_task_may_block(false)
        }

        // ... and recompute it for the incoming one.
        if thread.guest().is_some() {
            self.set_task_may_block()?;
        }

        Ok(old_thread)
    }
1776
1777 fn set_task_may_block(&mut self) -> Result<()> {
1780 let state = self.concurrent_state_mut();
1781 let guest_thread = state.current_guest_thread()?;
1782 let instance = state.get_mut(guest_thread.task)?.instance.instance;
1783 let may_block = self.concurrent_state_mut().may_block(guest_thread.task)?;
1784 self.component_instance_mut(instance)
1785 .set_task_may_block(may_block);
1786 Ok(())
1787 }
1788
1789 pub(crate) fn check_blocking(&mut self) -> Result<()> {
1790 if !self.concurrency_support() {
1791 return Ok(());
1792 }
1793 let state = self.concurrent_state_mut();
1794 let task = state.current_guest_thread()?.task;
1795 let instance = state.get_mut(task)?.instance.instance;
1796 let task_may_block = self.component_instance(instance).get_task_may_block();
1797
1798 if task_may_block {
1799 Ok(())
1800 } else {
1801 Err(Trap::CannotBlockSyncTask.into())
1802 }
1803 }
1804
1805 fn enter_instance(&mut self, instance: RuntimeInstance) {
1809 log::trace!("enter {instance:?}");
1810 self.instance_state(instance)
1811 .concurrent_state()
1812 .do_not_enter = true;
1813 }
1814
1815 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1819 log::trace!("exit {instance:?}");
1820 self.instance_state(instance)
1821 .concurrent_state()
1822 .do_not_enter = false;
1823 self.partition_pending(instance)
1824 }
1825
1826 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1831 for (thread, kind) in
1832 mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1833 {
1834 let call = GuestCall { thread, kind };
1835 if call.is_ready(self)? {
1836 self.concurrent_state_mut()
1837 .push_high_priority(WorkItem::GuestCall(instance.index, call));
1838 } else {
1839 self.instance_state(instance)
1840 .concurrent_state()
1841 .pending
1842 .insert(call.thread, call.kind);
1843 }
1844 }
1845
1846 Ok(())
1847 }
1848
1849 pub(crate) fn backpressure_modify(
1851 &mut self,
1852 caller_instance: RuntimeInstance,
1853 modify: impl FnOnce(u16) -> Option<u16>,
1854 ) -> Result<()> {
1855 let state = self.instance_state(caller_instance).concurrent_state();
1856 let old = state.backpressure;
1857 let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
1858 state.backpressure = new;
1859
1860 if old > 0 && new == 0 {
1861 self.partition_pending(caller_instance)?;
1864 }
1865
1866 Ok(())
1867 }
1868
1869 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1872 let old_thread = self.concurrent_state_mut().current_thread;
1873 log::trace!("resume_fiber: save current thread {old_thread:?}");
1874
1875 let fiber = fiber::resolve_or_release(self, fiber).await?;
1876
1877 self.set_thread(old_thread)?;
1878
1879 let state = self.concurrent_state_mut();
1880
1881 if let Some(ot) = old_thread.guest() {
1882 state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1883 }
1884 log::trace!("resume_fiber: restore current thread {old_thread:?}");
1885
1886 if let Some(mut fiber) = fiber {
1887 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1888 let reason = match state.suspend_reason.take() {
1890 Some(r) => r,
1891 None => bail_bug!("suspend reason missing when resuming fiber"),
1892 };
1893 match reason {
1894 SuspendReason::NeedWork => {
1895 if state.worker.is_none() {
1896 state.worker = Some(fiber);
1897 } else {
1898 fiber.dispose(self);
1899 }
1900 }
1901 SuspendReason::Yielding { thread, .. } => {
1902 state.get_mut(thread.thread)?.state = GuestThreadState::Ready(fiber);
1903 let instance = state.get_mut(thread.task)?.instance.index;
1904 state.push_low_priority(WorkItem::ResumeThread(instance, thread));
1905 }
1906 SuspendReason::ExplicitlySuspending { thread, .. } => {
1907 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1908 }
1909 SuspendReason::Waiting { set, thread, .. } => {
1910 let old = state
1911 .get_mut(set)?
1912 .waiting
1913 .insert(thread, WaitMode::Fiber(fiber));
1914 assert!(old.is_none());
1915 }
1916 };
1917 } else {
1918 log::trace!("resume_fiber: fiber has exited");
1919 }
1920
1921 Ok(())
1922 }
1923
1924 fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1930 log::trace!("suspend fiber: {reason:?}");
1931
1932 let task = match &reason {
1936 SuspendReason::Yielding { thread, .. }
1937 | SuspendReason::Waiting { thread, .. }
1938 | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1939 SuspendReason::NeedWork => None,
1940 };
1941
1942 let old_guest_thread = if task.is_some() {
1943 self.concurrent_state_mut().current_thread
1944 } else {
1945 CurrentThread::None
1946 };
1947
1948 debug_assert!(
1954 matches!(
1955 reason,
1956 SuspendReason::ExplicitlySuspending {
1957 skip_may_block_check: true,
1958 ..
1959 } | SuspendReason::Waiting {
1960 skip_may_block_check: true,
1961 ..
1962 } | SuspendReason::Yielding {
1963 skip_may_block_check: true,
1964 ..
1965 }
1966 ) || old_guest_thread
1967 .guest()
1968 .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1969 .transpose()?
1970 .unwrap_or(true)
1971 );
1972
1973 let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1974 assert!(suspend_reason.is_none());
1975 *suspend_reason = Some(reason);
1976
1977 self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1978
1979 if task.is_some() {
1980 self.set_thread(old_guest_thread)?;
1981 }
1982
1983 Ok(())
1984 }
1985
1986 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1987 let state = self.concurrent_state_mut();
1988 let caller = state.current_guest_thread()?;
1989 let old_set = waitable.common(state)?.set;
1990 let set = state.get_mut(caller.thread)?.sync_call_set;
1991 waitable.join(state, Some(set))?;
1992 self.suspend(SuspendReason::Waiting {
1993 set,
1994 thread: caller,
1995 skip_may_block_check: false,
1996 })?;
1997 let state = self.concurrent_state_mut();
1998 waitable.join(state, old_set)
1999 }
2000
2001 fn cleanup_thread(
2023 &mut self,
2024 guest_thread: QualifiedThreadId,
2025 runtime_instance: RuntimeInstance,
2026 cleanup_task: CleanupTask,
2027 ) -> Result<()> {
2028 let state = self.concurrent_state_mut();
2029 let thread_data = state.get_mut(guest_thread.thread)?;
2030 let sync_call_set = thread_data.sync_call_set;
2031 if let Some(guest_id) = thread_data.instance_rep {
2032 self.instance_state(runtime_instance)
2033 .thread_handle_table()
2034 .guest_thread_remove(guest_id)?;
2035 }
2036 let state = self.concurrent_state_mut();
2037
2038 for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
2040 if let Some(Event::Subtask {
2041 status: Status::Returned | Status::ReturnCancelled,
2042 }) = waitable.common(state)?.event
2043 {
2044 waitable.delete_from(state)?;
2045 }
2046 }
2047
2048 state.delete(guest_thread.thread)?;
2049 state.delete(sync_call_set)?;
2050 let task = state.get_mut(guest_thread.task)?;
2051 task.threads.remove(&guest_thread.thread);
2052
2053 if task.threads.is_empty() && !task.returned_or_cancelled() {
2054 bail!(Trap::NoAsyncResult);
2055 }
2056 let ready_to_delete = task.ready_to_delete();
2057
2058 if !task.decremented_interesting_task_count && task.exited && task.returned_or_cancelled() {
2059 task.decremented_interesting_task_count = true;
2060
2061 debug_assert!(state.interesting_tasks > 0);
2062 state.interesting_tasks -= 1;
2063 if state.interesting_tasks == 0
2064 && let Some(waker) = state.interesting_tasks_empty_waker.take()
2065 {
2066 waker.wake();
2067 }
2068 }
2069
2070 match cleanup_task {
2071 CleanupTask::Yes => {
2072 if ready_to_delete {
2073 Waitable::Guest(guest_thread.task).delete_from(state)?;
2074 }
2075 }
2076 CleanupTask::No => {}
2077 }
2078
2079 Ok(())
2080 }
2081
2082 fn cancel_guest_subtask_without_lowered_parameters(
2095 &mut self,
2096 caller_instance: RuntimeInstance,
2097 guest_task: TableId<GuestTask>,
2098 ) -> Result<()> {
2099 let concurrent_state = self.concurrent_state_mut();
2100 let task = concurrent_state.get_mut(guest_task)?;
2101 assert!(!task.already_lowered_parameters());
2102 task.lower_params = None;
2106 task.lift_result = None;
2107 task.exited = true;
2108 let instance = task.instance;
2109
2110 assert_eq!(1, task.threads.len());
2113 let thread = *task.threads.iter().next().unwrap();
2114 self.cleanup_thread(
2115 QualifiedThreadId {
2116 task: guest_task,
2117 thread,
2118 },
2119 caller_instance,
2120 CleanupTask::No,
2121 )?;
2122
2123 let pending = &mut self.instance_state(instance).concurrent_state().pending;
2125 let pending_count = pending.len();
2126 pending.retain(|thread, _| thread.task != guest_task);
2127 if pending.len() == pending_count {
2129 bail!(Trap::SubtaskCancelAfterTerminal);
2130 }
2131 Ok(())
2132 }
2133}
2134
/// Tells `cleanup_thread` whether it may also delete the owning task.
enum CleanupTask {
    /// Delete the task's waitable as well, provided the task reports that it
    /// is ready to delete.
    Yes,
    /// Leave the task in place; only the thread is cleaned up.
    No,
}
2139
2140impl Instance {
2141 fn get_event(
2144 self,
2145 store: &mut StoreOpaque,
2146 guest_task: TableId<GuestTask>,
2147 set: Option<TableId<WaitableSet>>,
2148 cancellable: bool,
2149 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
2150 let state = store.concurrent_state_mut();
2151
2152 let event = &mut state.get_mut(guest_task)?.event;
2153 if let Some(ev) = event
2154 && (cancellable || !matches!(ev, Event::Cancelled))
2155 {
2156 log::trace!("deliver event {ev:?} to {guest_task:?}");
2157 let ev = *ev;
2158 *event = None;
2159 return Ok(Some((ev, None)));
2160 }
2161
2162 let set = match set {
2163 Some(set) => set,
2164 None => return Ok(None),
2165 };
2166 let waitable = match state.get_mut(set)?.ready.pop_first() {
2167 Some(v) => v,
2168 None => return Ok(None),
2169 };
2170
2171 let common = waitable.common(state)?;
2172 let handle = match common.handle {
2173 Some(h) => h,
2174 None => bail_bug!("handle not set when delivering event"),
2175 };
2176 let event = match common.event.take() {
2177 Some(e) => e,
2178 None => bail_bug!("event not set when delivering event"),
2179 };
2180
2181 log::trace!(
2182 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
2183 );
2184
2185 waitable.on_delivery(store, self, event)?;
2186
2187 Ok(Some((event, Some((waitable, handle)))))
2188 }
2189
    /// Acts on the packed `code` returned by a guest callback for
    /// `guest_thread`.
    ///
    /// `code` is unpacked into a callback code and a waitable-set handle:
    ///
    /// * `EXIT`: the task is flagged as exited, its callback is dropped and
    ///   the thread is cleaned up; returns `None`.
    /// * `YIELD`: schedules delivery of an event with no waitable set. If the
    ///   task may block, the call is queued at low priority and `None` is
    ///   returned; otherwise the call is returned to the caller.
    /// * `WAIT`: requires that the task may block. If an event is already
    ///   available it is queued for delivery at high priority; otherwise the
    ///   thread is registered as a waiter on the set. Returns `None`.
    ///
    /// # Errors
    ///
    /// Returns `Trap::UnsupportedCallbackCode` for any other code, plus
    /// whatever the individual steps report.
    fn handle_callback_code(
        self,
        store: &mut StoreOpaque,
        guest_thread: QualifiedThreadId,
        runtime_instance: RuntimeComponentInstanceIndex,
        code: u32,
    ) -> Result<Option<GuestCall>> {
        let (code, set) = unpack_callback_code(code);

        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");

        let state = store.concurrent_state_mut();

        // Resolves a guest-visible waitable-set handle to its table id using
        // the handle table of `runtime_instance`.
        let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
            let set = store
                .instance_state(self.runtime_instance(runtime_instance))
                .handle_table()
                .waitable_set_rep(handle)?;

            Ok(TableId::<WaitableSet>::new(set))
        };

        Ok(match code {
            callback_code::EXIT => {
                log::trace!("implicit thread {guest_thread:?} completed");
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                task.exited = true;
                task.callback = None;
                store.cleanup_thread(
                    guest_thread,
                    self.runtime_instance(runtime_instance),
                    CleanupTask::Yes,
                )?;
                None
            }
            callback_code::YIELD => {
                let task = state.get_mut(guest_thread.task)?;
                // An already-pending event is kept (it must be `None` or
                // `Cancelled`); otherwise `Event::None` is what gets delivered.
                if let Some(event) = task.event {
                    assert!(matches!(event, Event::None | Event::Cancelled));
                } else {
                    task.event = Some(Event::None);
                }
                let call = GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::DeliverEvent {
                        instance: self,
                        set: None,
                    },
                };
                if state.may_block(guest_thread.task)? {
                    // Queue at low priority so other queued work runs first.
                    state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
                    None
                } else {
                    // The task may not block: hand the call back to the caller
                    // instead of queueing it.
                    Some(call)
                }
            }
            callback_code::WAIT => {
                // Waiting is only permitted for tasks that may block.
                state.check_blocking_for(guest_thread.task)?;

                let set = get_set(store, set)?;
                let state = store.concurrent_state_mut();

                if state.get_mut(guest_thread.task)?.event.is_some()
                    || !state.get_mut(set)?.ready.is_empty()
                {
                    // Something is deliverable right away.
                    state.push_high_priority(WorkItem::GuestCall(
                        runtime_instance,
                        GuestCall {
                            thread: guest_thread,
                            kind: GuestCallKind::DeliverEvent {
                                instance: self,
                                set: Some(set),
                            },
                        },
                    ));
                } else {
                    // Nothing is ready: remember which set the thread waits on
                    // and register it as a callback-mode waiter of that set.
                    let old = state
                        .get_mut(guest_thread.thread)?
                        .wake_on_cancel
                        .replace(set);
                    if !old.is_none() {
                        bail_bug!("thread unexpectedly had wake_on_cancel set");
                    }
                    let old = state
                        .get_mut(set)?
                        .waiting
                        .insert(guest_thread, WaitMode::Callback(self));
                    if !old.is_none() {
                        bail_bug!("set's waiting set already had this thread registered");
                    }
                }
                None
            }
            _ => bail!(Trap::UnsupportedCallbackCode),
        })
    }
2310
    /// Queues the initial guest call for `guest_thread` as a high-priority
    /// `GuestCallKind::StartImplicit` work item on the callee's instance.
    ///
    /// Two shapes of deferred call are built:
    ///
    /// * with a `callback` (requires `async_`): the callee is invoked inside
    ///   an enter/exit of the instance and the first result slot is
    ///   interpreted as a callback code;
    /// * without a `callback`: the callee is invoked and, for a non-async
    ///   call, the result is lifted, the post-return function is run and the
    ///   task is completed with `Status::Returned`. The thread is then
    ///   cleaned up.
    ///
    /// # Safety
    ///
    /// `callee`, `callback` and `post_return` are handed to unchecked raw
    /// calls, so the caller presumably must guarantee they are valid function
    /// references whose signatures match `param_count`/`result_count` —
    /// TODO confirm against the callers.
    unsafe fn queue_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        guest_thread: QualifiedThreadId,
        callee: SendSyncPtr<VMFuncRef>,
        param_count: usize,
        result_count: usize,
        async_: bool,
        callback: Option<SendSyncPtr<VMFuncRef>>,
        post_return: Option<SendSyncPtr<VMFuncRef>>,
    ) -> Result<()> {
        /// Builds the closure that lowers the parameters and invokes `callee`,
        /// returning the raw storage holding the results.
        ///
        /// # Safety
        ///
        /// `callee` is invoked through `call_unchecked_raw`; presumably it
        /// must be valid and accept `param_count` parameters and produce
        /// `result_count` results — TODO confirm.
        unsafe fn make_call<T: 'static>(
            store: StoreContextMut<T>,
            guest_thread: QualifiedThreadId,
            callee: SendSyncPtr<VMFuncRef>,
            param_count: usize,
            result_count: usize,
        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
        + Send
        + Sync
        + 'static
        + use<T> {
            let token = StoreToken::new(store);
            move |store: &mut dyn VMStore| {
                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];

                store
                    .concurrent_state_mut()
                    .get_mut(guest_thread.thread)?
                    .state = GuestThreadState::Running;
                // The lowering closure is one-shot: take it out of the task.
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                let lower = match task.lower_params.take() {
                    Some(l) => l,
                    None => bail_bug!("lower_params missing"),
                };

                lower(store, &mut storage[..param_count])?;

                let mut store = token.as_context_mut(store);

                // The same buffer carries parameters in and results out, so it
                // is sized to the larger of the two counts.
                unsafe {
                    crate::Func::call_unchecked_raw(
                        &mut store,
                        callee.as_non_null(),
                        NonNull::new(
                            &mut storage[..param_count.max(result_count)]
                                as *mut [MaybeUninit<ValRaw>] as _,
                        )
                        .unwrap(),
                    )?;
                }

                Ok(storage)
            }
        }

        let call = unsafe {
            make_call(
                store.as_context_mut(),
                guest_thread,
                callee,
                param_count,
                result_count,
            )
        };

        let callee_instance = store
            .0
            .concurrent_state_mut()
            .get_mut(guest_thread.task)?
            .instance;

        let fun = if callback.is_some() {
            // A callback is only expected for async calls.
            assert!(async_);

            Box::new(move |store: &mut dyn VMStore| {
                self.add_guest_thread_to_instance_table(
                    guest_thread.thread,
                    store,
                    callee_instance.index,
                )?;
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
                );

                store.enter_instance(callee_instance);

                let storage = call(store)?;

                store.exit_instance(callee_instance)?;

                // Hand control back to whichever thread was current before.
                store.set_thread(old_thread)?;
                let state = store.concurrent_state_mut();
                if let Some(t) = old_thread.guest() {
                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
                }
                log::trace!("stackless call: restored {old_thread:?} as current thread");

                // The first result slot is read as the callback code;
                // presumably the callee's signature guarantees it was
                // written — TODO confirm.
                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;

                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
            })
                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
        } else {
            let token = StoreToken::new(store.as_context_mut());
            Box::new(move |store: &mut dyn VMStore| {
                self.add_guest_thread_to_instance_table(
                    guest_thread.thread,
                    store,
                    callee_instance.index,
                )?;
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
                );
                let flags = self.id().get(store).instance_flags(callee_instance.index);

                // Only non-async calls mark the instance as entered.
                if !async_ {
                    store.enter_instance(callee_instance);
                }

                let storage = call(store)?;

                if !async_ {
                    // Non-async call: lift the result now, run post-return and
                    // complete the task.
                    let lift = {
                        store.exit_instance(callee_instance)?;

                        let state = store.concurrent_state_mut();
                        if !state.get_mut(guest_thread.task)?.result.is_none() {
                            bail_bug!("task has already produced a result");
                        }

                        match state.get_mut(guest_thread.task)?.lift_result.take() {
                            Some(lift) => lift,
                            None => bail_bug!("lift_result field is missing"),
                        }
                    };

                    // Reinterprets the first `result_count` slots as
                    // initialized values; presumably the callee wrote exactly
                    // that many results — TODO confirm.
                    let result = (lift.lift)(store, unsafe {
                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
                            &storage[..result_count],
                        )
                    })?;

                    // Post-return receives the single flat result, or a zero
                    // placeholder when there is none.
                    let post_return_arg = match result_count {
                        0 => ValRaw::i32(0),
                        1 => unsafe { storage[0].assume_init() },
                        _ => unreachable!(),
                    };

                    unsafe {
                        call_post_return(
                            token.as_context_mut(store),
                            post_return.map(|v| v.as_non_null()),
                            post_return_arg,
                            flags,
                        )?;
                    }

                    self.task_complete(store, guest_thread.task, result, Status::Returned)?;
                }

                store.set_thread(old_thread)?;

                // The implicit thread is done once the call returns.
                store
                    .concurrent_state_mut()
                    .get_mut(guest_thread.task)?
                    .exited = true;

                store.cleanup_thread(guest_thread, callee_instance, CleanupTask::Yes)?;
                Ok(None)
            })
        };

        store
            .0
            .concurrent_state_mut()
            .push_high_priority(WorkItem::GuestCall(
                callee_instance.index,
                GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::StartImplicit(fun),
                },
            ));

        Ok(())
    }
2551
2552 unsafe fn prepare_call<T: 'static>(
2565 self,
2566 mut store: StoreContextMut<T>,
2567 start: NonNull<VMFuncRef>,
2568 return_: NonNull<VMFuncRef>,
2569 caller_instance: RuntimeComponentInstanceIndex,
2570 callee_instance: RuntimeComponentInstanceIndex,
2571 task_return_type: TypeTupleIndex,
2572 callee_async: bool,
2573 memory: *mut VMMemoryDefinition,
2574 string_encoding: StringEncoding,
2575 caller_info: CallerInfo,
2576 ) -> Result<()> {
2577 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2578 store.0.check_blocking()?;
2582 }
2583
2584 enum ResultInfo {
2585 Heap { results: u32 },
2586 Stack { result_count: u32 },
2587 }
2588
2589 let result_info = match &caller_info {
2590 CallerInfo::Async {
2591 has_result: true,
2592 params,
2593 } => ResultInfo::Heap {
2594 results: match params.last() {
2595 Some(r) => r.get_u32(),
2596 None => bail_bug!("retptr missing"),
2597 },
2598 },
2599 CallerInfo::Async {
2600 has_result: false, ..
2601 } => ResultInfo::Stack { result_count: 0 },
2602 CallerInfo::Sync {
2603 result_count,
2604 params,
2605 } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
2606 results: match params.last() {
2607 Some(r) => r.get_u32(),
2608 None => bail_bug!("arg ptr missing"),
2609 },
2610 },
2611 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2612 result_count: *result_count,
2613 },
2614 };
2615
2616 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2617
2618 let start = SendSyncPtr::new(start);
2622 let return_ = SendSyncPtr::new(return_);
2623 let token = StoreToken::new(store.as_context_mut());
2624 let state = store.0.concurrent_state_mut();
2625 let old_thread = state.current_guest_thread()?;
2626
2627 debug_assert_eq!(
2628 state.get_mut(old_thread.task)?.instance,
2629 self.runtime_instance(caller_instance)
2630 );
2631
2632 let guest_thread = GuestTask::new(
2633 state,
2634 Box::new(move |store, dst| {
2635 let mut store = token.as_context_mut(store);
2636 assert!(dst.len() <= MAX_FLAT_PARAMS);
2637 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2639 let count = match caller_info {
2640 CallerInfo::Async { params, has_result } => {
2644 let params = ¶ms[..params.len() - usize::from(has_result)];
2645 for (param, src) in params.iter().zip(&mut src) {
2646 src.write(*param);
2647 }
2648 params.len()
2649 }
2650
2651 CallerInfo::Sync { params, .. } => {
2653 for (param, src) in params.iter().zip(&mut src) {
2654 src.write(*param);
2655 }
2656 params.len()
2657 }
2658 };
2659 unsafe {
2666 crate::Func::call_unchecked_raw(
2667 &mut store,
2668 start.as_non_null(),
2669 NonNull::new(
2670 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2671 )
2672 .unwrap(),
2673 )?;
2674 }
2675 dst.copy_from_slice(&src[..dst.len()]);
2676 let state = store.0.concurrent_state_mut();
2677 Waitable::Guest(state.current_guest_thread()?.task).set_event(
2678 state,
2679 Some(Event::Subtask {
2680 status: Status::Started,
2681 }),
2682 )?;
2683 Ok(())
2684 }),
2685 LiftResult {
2686 lift: Box::new(move |store, src| {
2687 let mut store = token.as_context_mut(store);
2690 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2692 my_src.push(ValRaw::u32(*results));
2693 }
2694
2695 let prev = store.0.set_thread(old_thread)?;
2701
2702 unsafe {
2709 crate::Func::call_unchecked_raw(
2710 &mut store,
2711 return_.as_non_null(),
2712 my_src.as_mut_slice().into(),
2713 )?;
2714 }
2715
2716 store.0.set_thread(prev)?;
2719
2720 let state = store.0.concurrent_state_mut();
2721 let thread = state.current_guest_thread()?;
2722 if sync_caller {
2723 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2724 if let ResultInfo::Stack { result_count } = &result_info {
2725 match result_count {
2726 0 => None,
2727 1 => Some(my_src[0]),
2728 _ => unreachable!(),
2729 }
2730 } else {
2731 None
2732 },
2733 );
2734 }
2735 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2736 }),
2737 ty: task_return_type,
2738 memory: NonNull::new(memory).map(SendSyncPtr::new),
2739 string_encoding,
2740 },
2741 Caller::Guest { thread: old_thread },
2742 None,
2743 self.runtime_instance(callee_instance),
2744 callee_async,
2745 )?;
2746
2747 store.0.set_thread(guest_thread)?;
2750 log::trace!("pushed {guest_thread:?} as current thread; old thread was {old_thread:?}");
2751
2752 Ok(())
2753 }
2754
2755 unsafe fn call_callback<T>(
2760 self,
2761 mut store: StoreContextMut<T>,
2762 function: SendSyncPtr<VMFuncRef>,
2763 event: Event,
2764 handle: u32,
2765 ) -> Result<u32> {
2766 let (ordinal, result) = event.parts();
2767 let params = &mut [
2768 ValRaw::u32(ordinal),
2769 ValRaw::u32(handle),
2770 ValRaw::u32(result),
2771 ];
2772 unsafe {
2777 crate::Func::call_unchecked_raw(
2778 &mut store,
2779 function.as_non_null(),
2780 params.as_mut_slice().into(),
2781 )?;
2782 }
2783 Ok(params[0].get_u32())
2784 }
2785
    /// Starts the guest-to-guest call for the current guest thread and waits
    /// for the callee to report progress.
    ///
    /// The call is queued via `queue_call`, then the caller's fiber is
    /// suspended on its `sync_call_set` until the callee publishes a subtask
    /// event. A caller that passes `storage` is treated as a sync caller and
    /// keeps waiting until the status is `Returned`; a caller that passes
    /// `None` is treated as async and gets a subtask handle as soon as a
    /// non-`Returned` status is seen.
    ///
    /// Returns the status packed together with the optional subtask handle.
    ///
    /// # Panics
    ///
    /// Panics if `param_count` exceeds `MAX_FLAT_PARAMS` or `result_count`
    /// exceeds `MAX_FLAT_RESULTS`.
    ///
    /// # Safety
    ///
    /// `callback`, `post_return` and `callee` are forwarded to `queue_call`
    /// and eventually invoked through unchecked raw calls; presumably the
    /// non-null ones must be valid function references — TODO confirm.
    unsafe fn start_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
        storage: Option<&mut [MaybeUninit<ValRaw>]>,
    ) -> Result<u32> {
        let token = StoreToken::new(store.as_context_mut());
        // No result storage means the caller is treated as async.
        let async_caller = storage.is_none();
        let state = store.0.concurrent_state_mut();
        let guest_thread = state.current_guest_thread()?;
        let callee_async = state.get_mut(guest_thread.task)?.async_function;
        let callee = SendSyncPtr::new(callee);
        let param_count = usize::try_from(param_count)?;
        assert!(param_count <= MAX_FLAT_PARAMS);
        let result_count = usize::try_from(result_count)?;
        assert!(result_count <= MAX_FLAT_RESULTS);

        let task = state.get_mut(guest_thread.task)?;
        // Install the task's callback when the callee provides one.
        if let Some(callback) = NonNull::new(callback) {
            let callback = SendSyncPtr::new(callback);
            task.callback = Some(Box::new(move |store, event, handle| {
                let store = token.as_context_mut(store);
                unsafe { self.call_callback::<T>(store, callback, event, handle) }
            }));
        }

        let Caller::Guest { thread: caller } = &task.caller else {
            bail_bug!("start_call unexpectedly invoked for host->guest call");
        };
        let caller = *caller;
        let caller_instance = state.get_mut(caller.task)?.instance;

        unsafe {
            self.queue_call(
                store.as_context_mut(),
                guest_thread,
                callee,
                param_count,
                result_count,
                (flags & START_FLAG_ASYNC_CALLEE) != 0,
                NonNull::new(callback).map(SendSyncPtr::new),
                NonNull::new(post_return).map(SendSyncPtr::new),
            )?;
        }

        let state = store.0.concurrent_state_mut();

        // Temporarily join the callee's waitable to the caller's
        // `sync_call_set`, remembering its previous set for later.
        let guest_waitable = Waitable::Guest(guest_thread.task);
        let old_set = guest_waitable.common(state)?.set;
        let set = state.get_mut(caller.thread)?.sync_call_set;
        guest_waitable.join(state, Some(set))?;

        store.0.set_thread(CurrentThread::None)?;

        let (status, waitable) = loop {
            store.0.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                // The may-block check is skipped for async callers and for
                // callees that are not async functions.
                skip_may_block_check: async_caller || !callee_async,
            })?;

            let state = store.0.concurrent_state_mut();

            log::trace!("taking event for {:?}", guest_thread.task);
            let event = guest_waitable.take_event(state)?;
            let Some(Event::Subtask { status }) = event else {
                bail_bug!("subtasks should only get subtask events, got {event:?}")
            };

            log::trace!("status {status:?} for {:?}", guest_thread.task);

            if status == Status::Returned {
                // The callee is done; no handle is needed.
                break (status, None);
            } else if async_caller {
                // Not returned yet, but an async caller does not wait:
                // allocate a subtask handle in the caller's instance and
                // return the intermediate status.
                let handle = store
                    .0
                    .instance_state(caller_instance)
                    .handle_table()
                    .subtask_insert_guest(guest_thread.task.rep())?;
                store
                    .0
                    .concurrent_state_mut()
                    .get_mut(guest_thread.task)?
                    .common
                    .handle = Some(handle);
                break (status, Some(handle));
            } else {
                // Sync caller and not returned yet: loop and wait again.
            }
        };

        // Put the callee's waitable back into its original set.
        guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;

        // The caller becomes the current, running thread again.
        store.0.set_thread(caller)?;
        store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");

        if let Some(storage) = storage {
            // Sync caller: collect the result recorded in the task's
            // `sync_result`, if one was produced.
            let state = store.0.concurrent_state_mut();
            let task = state.get_mut(guest_thread.task)?;
            if let Some(result) = task.sync_result.take()? {
                if let Some(result) = result {
                    storage[0] = MaybeUninit::new(result);
                }

                if task.exited && task.ready_to_delete() {
                    Waitable::Guest(guest_thread.task).delete_from(state)?;
                }
            }
        }

        Ok(status.pack(waitable))
    }
2956
    /// Polls `future` once on behalf of the current host task.
    ///
    /// If the future completes immediately, its result is passed to `lower`
    /// and `Ok(None)` is returned. Otherwise the future is handed to the
    /// concurrent state to be driven later, a subtask handle is allocated in
    /// the caller's instance, the caller becomes the current thread again,
    /// and `Ok(Some(handle))` is returned.
    ///
    /// When the deferred future eventually finishes, a worker function is
    /// queued that lowers the result and publishes a subtask event with
    /// `Status::Returned`, or `Status::ReturnCancelled` if the future
    /// produced no result.
    ///
    /// # Panics
    ///
    /// Panics if the host task is not in the `CalleeStarted` state.
    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
        self,
        mut store: StoreContextMut<'_, T>,
        future: impl Future<Output = Result<R>> + Send + 'static,
        lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
    ) -> Result<Option<u32>> {
        let token = StoreToken::new(store.as_context_mut());
        let state = store.0.concurrent_state_mut();
        let task = state.current_host_thread()?;

        // Wrap the future so it is tied to a `JoinHandle`, which is stored in
        // the task's state.
        let (join_handle, future) = JoinHandle::run(future);
        {
            let state = &mut state.get_mut(task)?.state;
            assert!(matches!(state, HostTaskState::CalleeStarted));
            *state = HostTaskState::CalleeRunning(join_handle);
        }

        let mut future = Box::pin(future);

        // Poll once with a no-op waker while the store is installed via
        // `tls::set`.
        let poll = tls::set(store.0, || {
            future
                .as_mut()
                .poll(&mut Context::from_waker(&Waker::noop()))
        });

        match poll {
            Poll::Ready(result) => {
                // Completed right away: lower the result and report that no
                // handle is needed.
                let result = result.transpose()?;
                lower(store.as_context_mut(), result)?;
                return Ok(None);
            }

            // Not ready yet; fall through to the deferred path.
            Poll::Pending => {}
        }

        let future = Box::pin(async move {
            let result = match future.await {
                Some(result) => Some(result?),
                None => None,
            };
            let on_complete = move |store: &mut dyn VMStore| {
                let mut store = token.as_context_mut(store);
                // Make the host task current while lowering, restoring the
                // previous thread afterwards.
                let old = store.0.set_thread(task)?;

                let status = if result.is_some() {
                    Status::Returned
                } else {
                    Status::ReturnCancelled
                };

                lower(store.as_context_mut(), result)?;
                let state = store.0.concurrent_state_mut();
                match &mut state.get_mut(task)?.state {
                    // Already marked done; leave it untouched.
                    HostTaskState::CalleeDone { .. } => {}

                    // Any other state transitions to done (not cancelled).
                    other => *other = HostTaskState::CalleeDone { cancelled: false },
                }
                Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;

                store.0.set_thread(old)?;
                Ok(())
            };

            // The lowering is not run here; it is queued as a high-priority
            // worker function.
            tls::get(move |store| {
                store
                    .concurrent_state_mut()
                    .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
                        on_complete,
                    ))));
                Ok(())
            })
        });

        // Register the future and expose the task to the caller as a subtask
        // handle in the caller's instance.
        let state = store.0.concurrent_state_mut();
        state.push_future(future);
        let caller = state.get_mut(task)?.caller;
        let instance = state.get_mut(caller.task)?.instance;
        let handle = store
            .0
            .instance_state(instance)
            .handle_table()
            .subtask_insert_host(task.rep())?;
        store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
        log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");

        // Control goes back to the caller.
        store.0.set_thread(caller)?;
        Ok(Some(handle))
    }
3087
3088 pub(crate) fn task_return(
3091 self,
3092 store: &mut dyn VMStore,
3093 ty: TypeTupleIndex,
3094 options: OptionsIndex,
3095 storage: &[ValRaw],
3096 ) -> Result<()> {
3097 let state = store.concurrent_state_mut();
3098 let guest_thread = state.current_guest_thread()?;
3099 let lift = state
3100 .get_mut(guest_thread.task)?
3101 .lift_result
3102 .take()
3103 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3104 if !state.get_mut(guest_thread.task)?.result.is_none() {
3105 bail_bug!("task result unexpectedly already set");
3106 }
3107
3108 let CanonicalOptions {
3109 string_encoding,
3110 data_model,
3111 ..
3112 } = &self.id().get(store).component().env_component().options[options];
3113
3114 let invalid = ty != lift.ty
3115 || string_encoding != &lift.string_encoding
3116 || match data_model {
3117 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
3118 Some(memory) => {
3119 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
3120 let actual = self.id().get(store).runtime_memory(memory);
3121 expected != actual.as_ptr()
3122 }
3123 None => false,
3126 },
3127 CanonicalOptionsDataModel::Gc { .. } => true,
3129 };
3130
3131 if invalid {
3132 bail!(Trap::TaskReturnInvalid);
3133 }
3134
3135 log::trace!("task.return for {guest_thread:?}");
3136
3137 let result = (lift.lift)(store, storage)?;
3138 self.task_complete(store, guest_thread.task, result, Status::Returned)
3139 }
3140
3141 pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
3143 let state = store.concurrent_state_mut();
3144 let guest_thread = state.current_guest_thread()?;
3145 let task = state.get_mut(guest_thread.task)?;
3146 if !task.cancel_sent {
3147 bail!(Trap::TaskCancelNotCancelled);
3148 }
3149 _ = task
3150 .lift_result
3151 .take()
3152 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3153
3154 if !task.result.is_none() {
3155 bail_bug!("task result should not bet set yet");
3156 }
3157
3158 log::trace!("task.cancel for {guest_thread:?}");
3159
3160 self.task_complete(
3161 store,
3162 guest_thread.task,
3163 Box::new(DummyResult),
3164 Status::ReturnCancelled,
3165 )
3166 }
3167
3168 fn task_complete(
3174 self,
3175 store: &mut StoreOpaque,
3176 guest_task: TableId<GuestTask>,
3177 result: Box<dyn Any + Send + Sync>,
3178 status: Status,
3179 ) -> Result<()> {
3180 store
3181 .component_resource_tables(Some(self))
3182 .validate_scope_exit()?;
3183
3184 let state = store.concurrent_state_mut();
3185 let task = state.get_mut(guest_task)?;
3186
3187 if let Caller::Host { tx, .. } = &mut task.caller {
3188 if let Some(tx) = tx.take() {
3189 _ = tx.send(result);
3190 }
3191 } else {
3192 task.result = Some(result);
3193 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3194 }
3195
3196 Ok(())
3197 }
3198
3199 pub(crate) fn waitable_set_new(
3201 self,
3202 store: &mut StoreOpaque,
3203 caller_instance: RuntimeComponentInstanceIndex,
3204 ) -> Result<u32> {
3205 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
3206 let handle = store
3207 .instance_state(self.runtime_instance(caller_instance))
3208 .handle_table()
3209 .waitable_set_insert(set.rep())?;
3210 log::trace!("new waitable set {set:?} (handle {handle})");
3211 Ok(handle)
3212 }
3213
3214 pub(crate) fn waitable_set_drop(
3216 self,
3217 store: &mut StoreOpaque,
3218 caller_instance: RuntimeComponentInstanceIndex,
3219 set: u32,
3220 ) -> Result<()> {
3221 let rep = store
3222 .instance_state(self.runtime_instance(caller_instance))
3223 .handle_table()
3224 .waitable_set_remove(set)?;
3225
3226 log::trace!("drop waitable set {rep} (handle {set})");
3227
3228 if !store
3232 .concurrent_state_mut()
3233 .get_mut(TableId::<WaitableSet>::new(rep))?
3234 .waiting
3235 .is_empty()
3236 {
3237 bail!(Trap::WaitableSetDropHasWaiters);
3238 }
3239
3240 store
3241 .concurrent_state_mut()
3242 .delete(TableId::<WaitableSet>::new(rep))?;
3243
3244 Ok(())
3245 }
3246
3247 pub(crate) fn waitable_join(
3249 self,
3250 store: &mut StoreOpaque,
3251 caller_instance: RuntimeComponentInstanceIndex,
3252 waitable_handle: u32,
3253 set_handle: u32,
3254 ) -> Result<()> {
3255 let mut instance = self.id().get_mut(store);
3256 let waitable =
3257 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3258
3259 let set = if set_handle == 0 {
3260 None
3261 } else {
3262 let set = instance.instance_states().0[caller_instance]
3263 .handle_table()
3264 .waitable_set_rep(set_handle)?;
3265
3266 Some(TableId::<WaitableSet>::new(set))
3267 };
3268
3269 log::trace!(
3270 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3271 );
3272
3273 waitable.join(store.concurrent_state_mut(), set)
3274 }
3275
3276 pub(crate) fn subtask_drop(
3278 self,
3279 store: &mut StoreOpaque,
3280 caller_instance: RuntimeComponentInstanceIndex,
3281 task_id: u32,
3282 ) -> Result<()> {
3283 self.waitable_join(store, caller_instance, task_id, 0)?;
3284
3285 let (rep, is_host) = store
3286 .instance_state(self.runtime_instance(caller_instance))
3287 .handle_table()
3288 .subtask_remove(task_id)?;
3289
3290 let concurrent_state = store.concurrent_state_mut();
3291 let (waitable, delete) = if is_host {
3292 let id = TableId::<HostTask>::new(rep);
3293 let task = concurrent_state.get_mut(id)?;
3294 match &task.state {
3295 HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
3296 HostTaskState::CalleeDone { .. } => {}
3297 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3298 bail_bug!("invalid state for callee in `subtask.drop`")
3299 }
3300 }
3301 (Waitable::Host(id), true)
3302 } else {
3303 let id = TableId::<GuestTask>::new(rep);
3304 let task = concurrent_state.get_mut(id)?;
3305 if task.lift_result.is_some() {
3306 bail!(Trap::SubtaskDropNotResolved);
3307 }
3308 (
3309 Waitable::Guest(id),
3310 concurrent_state.get_mut(id)?.ready_to_delete(),
3311 )
3312 };
3313
3314 waitable.common(concurrent_state)?.handle = None;
3315
3316 if waitable.take_event(concurrent_state)?.is_some() {
3319 bail!(Trap::SubtaskDropNotResolved);
3320 }
3321
3322 if delete {
3323 waitable.delete_from(concurrent_state)?;
3324 }
3325
3326 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3327 Ok(())
3328 }
3329
3330 pub(crate) fn waitable_set_wait(
3332 self,
3333 store: &mut StoreOpaque,
3334 options: OptionsIndex,
3335 set: u32,
3336 payload: u32,
3337 ) -> Result<u32> {
3338 if !self.options(store, options).async_ {
3339 store.check_blocking()?;
3343 }
3344
3345 let &CanonicalOptions {
3346 cancellable,
3347 instance: caller_instance,
3348 ..
3349 } = &self.id().get(store).component().env_component().options[options];
3350 let rep = store
3351 .instance_state(self.runtime_instance(caller_instance))
3352 .handle_table()
3353 .waitable_set_rep(set)?;
3354
3355 self.waitable_check(
3356 store,
3357 cancellable,
3358 WaitableCheck::Wait,
3359 WaitableCheckParams {
3360 set: TableId::new(rep),
3361 options,
3362 payload,
3363 },
3364 )
3365 }
3366
3367 pub(crate) fn waitable_set_poll(
3369 self,
3370 store: &mut StoreOpaque,
3371 options: OptionsIndex,
3372 set: u32,
3373 payload: u32,
3374 ) -> Result<u32> {
3375 let &CanonicalOptions {
3376 cancellable,
3377 instance: caller_instance,
3378 ..
3379 } = &self.id().get(store).component().env_component().options[options];
3380 let rep = store
3381 .instance_state(self.runtime_instance(caller_instance))
3382 .handle_table()
3383 .waitable_set_rep(set)?;
3384
3385 self.waitable_check(
3386 store,
3387 cancellable,
3388 WaitableCheck::Poll,
3389 WaitableCheckParams {
3390 set: TableId::new(rep),
3391 options,
3392 payload,
3393 },
3394 )
3395 }
3396
3397 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3399 let thread_id = store.concurrent_state_mut().current_guest_thread()?.thread;
3400 match store
3401 .concurrent_state_mut()
3402 .get_mut(thread_id)?
3403 .instance_rep
3404 {
3405 Some(r) => Ok(r),
3406 None => bail_bug!("thread should have instance_rep by now"),
3407 }
3408 }
3409
/// Create a new, not-yet-started guest thread whose entry point is looked
/// up indirectly in a function table.
///
/// The entry at `start_func_idx` in table `start_func_table_idx` must be
/// initialized (else `Trap::ThreadNewIndirectUninitialized`) and must have
/// the type `(i32) -> ()` (else `Trap::ThreadNewIndirectInvalidType`).
/// `_func_ty_idx` is not used by this implementation.
///
/// The new thread is attached to the task of the current guest thread and
/// registered in `runtime_instance`'s thread handle table; the resulting
/// handle is returned. The thread does not run here: the start closure is
/// only stored in the thread (see `GuestThread::new_explicit`).
pub(crate) fn thread_new_indirect<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    runtime_instance: RuntimeComponentInstanceIndex,
    _func_ty_idx: TypeFuncIndex,
    start_func_table_idx: RuntimeTableIndex,
    start_func_idx: u32,
    context: i32,
) -> Result<u32> {
    log::trace!("creating new thread");

    // The only start-function signature accepted here: one `i32` parameter
    // and no results.
    let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
    let (instance, registry) = self.id().get_mut_and_registry(store.0);
    let callee = instance
        .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
        .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
    if callee.type_index(store.0) != start_func_ty.type_index() {
        bail!(Trap::ThreadNewIndirectInvalidType);
    }

    let token = StoreToken::new(store.as_context_mut());
    // Closure run when the thread is eventually started. It swaps the new
    // thread in as the current thread, calls the guest function with
    // `context`, then restores the previous thread and cleans up.
    let start_func = Box::new(
        move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
            let old_thread = store.set_thread(guest_thread)?;
            log::trace!(
                "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
            );

            let mut store = token.as_context_mut(store);
            let mut params = [ValRaw::i32(context)];
            // NOTE(review): `call_unchecked` performs no type checking of
            // its own; the signature was compared against `(i32) -> ()`
            // above before this closure was built. Confirm that is the
            // full safety contract of `call_unchecked`.
            unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };

            store.0.set_thread(old_thread)?;

            store.0.cleanup_thread(
                guest_thread,
                self.runtime_instance(runtime_instance),
                CleanupTask::Yes,
            )?;
            log::trace!("explicit thread {guest_thread:?} completed");
            let state = store.0.concurrent_state_mut();
            // If the previous thread was a guest thread, mark it as running
            // again.
            if let Some(t) = old_thread.guest() {
                state.get_mut(t.thread)?.state = GuestThreadState::Running;
            }
            log::trace!("thread start: restored {old_thread:?} as current thread");

            Ok(())
        },
    );

    let state = store.0.concurrent_state_mut();
    let current_thread = state.current_guest_thread()?;
    let parent_task = current_thread.task;

    // The new thread belongs to the same task as the thread creating it.
    let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
    let thread_id = state.push(new_thread)?;
    state.get_mut(parent_task)?.threads.insert(thread_id);

    log::trace!("new thread with id {thread_id:?} created");

    self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
}
3475
/// Schedule the guest thread with handle `thread_idx` (resolved in
/// `runtime_instance`) to run.
///
/// What happens depends on the thread's current state:
///
/// * `NotStartedExplicit`: a `GuestCallKind::StartExplicit` work item
///   wrapping the stored start function is queued;
/// * `Suspended`: a `WorkItem::ResumeFiber` for its fiber is queued;
/// * `Ready` (only when `allow_ready` is true): the state is left as
///   `Ready` and the thread's existing work item is promoted;
/// * anything else: the state is restored and the call traps with
///   `Trap::CannotResumeThread`.
///
/// `high_priority` is forwarded to `push_work_item` in the first two
/// cases. In those cases the thread's state is left as `Running`.
pub(crate) fn resume_thread(
    self,
    store: &mut StoreOpaque,
    runtime_instance: RuntimeComponentInstanceIndex,
    thread_idx: u32,
    high_priority: bool,
    allow_ready: bool,
) -> Result<()> {
    let thread_id =
        GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
    let state = store.concurrent_state_mut();
    let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
    let thread = state.get_mut(guest_thread.thread)?;

    // Take ownership of the current state, provisionally marking the thread
    // as `Running`; arms that should not change the state write it back.
    match mem::replace(&mut thread.state, GuestThreadState::Running) {
        GuestThreadState::NotStartedExplicit(start_func) => {
            log::trace!("starting thread {guest_thread:?}");
            let guest_call = WorkItem::GuestCall(
                runtime_instance,
                GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::StartExplicit(Box::new(move |store| {
                        start_func(store, guest_thread)
                    })),
                },
            );
            store
                .concurrent_state_mut()
                .push_work_item(guest_call, high_priority);
        }
        GuestThreadState::Suspended(fiber) => {
            log::trace!("resuming thread {thread_id:?} that was suspended");
            store
                .concurrent_state_mut()
                .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
        }
        GuestThreadState::Ready(fiber) if allow_ready => {
            log::trace!("resuming thread {thread_id:?} that was ready");
            // Put the fiber back: the thread stays `Ready`, only its
            // position in the work queue changes.
            thread.state = GuestThreadState::Ready(fiber);
            store
                .concurrent_state_mut()
                .promote_thread_work_item(guest_thread);
        }
        other => {
            // Not resumable: undo the provisional `Running` state.
            thread.state = other;
            bail!(Trap::CannotResumeThread);
        }
    }
    Ok(())
}
3526
3527 fn add_guest_thread_to_instance_table(
3528 self,
3529 thread_id: TableId<GuestThread>,
3530 store: &mut StoreOpaque,
3531 runtime_instance: RuntimeComponentInstanceIndex,
3532 ) -> Result<u32> {
3533 let guest_id = store
3534 .instance_state(self.runtime_instance(runtime_instance))
3535 .thread_handle_table()
3536 .guest_thread_insert(thread_id.rep())?;
3537 store
3538 .concurrent_state_mut()
3539 .get_mut(thread_id)?
3540 .instance_rep = Some(guest_id);
3541 Ok(guest_id)
3542 }
3543
/// Shared implementation of the intrinsics that suspend or yield the
/// current guest thread, optionally handing control to another thread.
///
/// * `caller`: the component instance making the call.
/// * `cancellable`: whether a pending cancellation may be reported to the
///   caller as `WaitResult::Cancelled`.
/// * `yielding`: `true` suspends with `SuspendReason::Yielding`, `false`
///   with `SuspendReason::ExplicitlySuspending`.
/// * `to_thread`: the thread to resume before suspending, if any.
///
/// Returns `WaitResult::Cancelled` if `cancellable` is set and a pending
/// cancellation was taken either before or after suspending; otherwise
/// `WaitResult::Completed`.
pub(crate) fn suspension_intrinsic(
    self,
    store: &mut StoreOpaque,
    caller: RuntimeComponentInstanceIndex,
    cancellable: bool,
    yielding: bool,
    to_thread: SuspensionTarget,
) -> Result<WaitResult> {
    let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
    // These checks only apply when no target thread was named.
    if to_thread.is_none() {
        let state = store.concurrent_state_mut();
        if yielding {
            if !state.may_block(guest_thread.task)? {
                // The task may not block. If no instance-local thread work
                // item could be promoted either, return immediately without
                // suspending.
                if !state.promote_instance_local_thread_work_item(caller) {
                    return Ok(WaitResult::Completed);
                }
            }
        } else {
            // Suspending without yielding requires that blocking is
            // permitted; `check_blocking` returns an error otherwise.
            store.check_blocking()?;
        }
    }

    // Report a cancellation that is already pending instead of suspending.
    if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
        return Ok(WaitResult::Cancelled);
    }

    // Schedule the target thread (if any) at high priority. Only the plain
    // `Some` target is allowed to name a thread that is already `Ready`.
    match to_thread {
        SuspensionTarget::SomeSuspended(thread) => {
            self.resume_thread(store, caller, thread, true, false)?
        }
        SuspensionTarget::Some(thread) => {
            self.resume_thread(store, caller, thread, true, true)?
        }
        SuspensionTarget::None => {}
    }

    // When a target thread was named, the may-block check is skipped for
    // this suspension.
    let reason = if yielding {
        SuspendReason::Yielding {
            thread: guest_thread,
            skip_may_block_check: to_thread.is_some(),
        }
    } else {
        SuspendReason::ExplicitlySuspending {
            thread: guest_thread,
            skip_may_block_check: to_thread.is_some(),
        }
    };

    store.suspend(reason)?;

    // A cancellation may have arrived while this thread was suspended.
    if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
        Ok(WaitResult::Cancelled)
    } else {
        Ok(WaitResult::Completed)
    }
}
3616
/// Shared implementation of `waitable-set.wait` and `waitable-set.poll`.
///
/// In `Wait` mode the current thread is suspended until an event can be
/// delivered; in `Poll` mode it never suspends here. The delivered event is
/// then written to guest memory at `params.payload` as two little-endian
/// `u32` values (waitable handle at offset 0, event payload at offset 4),
/// and the event's ordinal is returned. If `Poll` finds nothing to
/// deliver, the parts of `Event::None` are used with handle `0`.
fn waitable_check(
    self,
    store: &mut StoreOpaque,
    cancellable: bool,
    check: WaitableCheck,
    params: WaitableCheckParams,
) -> Result<u32> {
    let guest_thread = store.concurrent_state_mut().current_guest_thread()?;

    log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);

    let state = store.concurrent_state_mut();
    let task = state.get_mut(guest_thread.task)?;

    match &check {
        WaitableCheck::Wait => {
            let set = params.set;

            // Suspend only if there is nothing deliverable right now: the
            // task has no pending event (or only a cancellation event that
            // a non-cancellable wait may not receive) and the set has no
            // ready waitables.
            if (task.event.is_none()
                || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
                && state.get_mut(set)?.ready.is_empty()
            {
                if cancellable {
                    // Record which set this thread is waiting on so that a
                    // cancellation can find and wake it (see
                    // `subtask_cancel`).
                    let old = state
                        .get_mut(guest_thread.thread)?
                        .wake_on_cancel
                        .replace(set);
                    if !old.is_none() {
                        bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
                    }
                }

                store.suspend(SuspendReason::Waiting {
                    set,
                    thread: guest_thread,
                    skip_may_block_check: false,
                })?;
            }
        }
        WaitableCheck::Poll => {}
    }

    log::trace!(
        "waitable check for {guest_thread:?}; set {:?}, part two",
        params.set
    );

    let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;

    let (ordinal, handle, result) = match &check {
        WaitableCheck::Wait => {
            // After a wait there must be an event to deliver.
            let (event, waitable) = match event {
                Some(p) => p,
                None => bail_bug!("event expected to be present"),
            };
            // Events not associated with a waitable report handle 0.
            let handle = waitable.map(|(_, v)| v).unwrap_or(0);
            let (ordinal, result) = event.parts();
            (ordinal, handle, result)
        }
        WaitableCheck::Poll => {
            if let Some((event, waitable)) = event {
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                let (ordinal, result) = event.parts();
                (ordinal, handle, result)
            } else {
                log::trace!(
                    "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
                    guest_thread.task,
                    params.set
                );
                let (ordinal, result) = Event::None.parts();
                (ordinal, 0, result)
            }
        }
    };
    // Validate the payload pointer against `CanonicalAbiInfo::POINTER_PAIR`
    // before writing the two 4-byte fields.
    let memory = self.options_memory_mut(store, params.options);
    let ptr = func::validate_inbounds_dynamic(
        &CanonicalAbiInfo::POINTER_PAIR,
        memory,
        &ValRaw::u32(params.payload),
    )?;
    memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
    memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
    Ok(ordinal)
}
3706
/// Implements `subtask.cancel` for the subtask handle `task_id` owned by
/// `caller_instance`.
///
/// Returns, as a `u32`:
///
/// * `Status::StartCancelled` if the guest subtask had not yet lowered its
///   parameters;
/// * `BLOCKED` if the subtask has not resolved yet and `async_` is true;
/// * otherwise the subtask's terminal status (`Status::Returned` or
///   `Status::ReturnCancelled`).
///
/// Traps with `Trap::SubtaskCancelAfterTerminal` if the subtask was already
/// cancelled or no terminal event is available to take. When `async_` is
/// false, `check_blocking` is consulted up front and any error it returns
/// is propagated.
pub(crate) fn subtask_cancel(
    self,
    store: &mut StoreOpaque,
    caller_instance: RuntimeComponentInstanceIndex,
    async_: bool,
    task_id: u32,
) -> Result<u32> {
    if !async_ {
        store.check_blocking()?;
    }

    let (rep, is_host) = store
        .instance_state(self.runtime_instance(caller_instance))
        .handle_table()
        .subtask_rep(task_id)?;
    let waitable = if is_host {
        Waitable::Host(TableId::<HostTask>::new(rep))
    } else {
        Waitable::Guest(TableId::<GuestTask>::new(rep))
    };
    let concurrent_state = store.concurrent_state_mut();

    log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

    // Whether the caller has to wait for the subtask to acknowledge the
    // cancellation before a terminal status is available.
    let needs_block;
    if let Waitable::Host(host_task) = waitable {
        let state = &mut concurrent_state.get_mut(host_task)?.state;
        // Mark the host task as done-and-cancelled, then act on the state it
        // was in before.
        match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
            HostTaskState::CalleeRunning(handle) => {
                // Abort the running host callee; its completion still has
                // to be observed.
                handle.abort();
                needs_block = true;
            }

            HostTaskState::CalleeDone { cancelled } => {
                if cancelled {
                    // Cancelling twice is an error.
                    bail!(Trap::SubtaskCancelAfterTerminal);
                } else {
                    // Already finished: nothing to wait for.
                    needs_block = false;
                }
            }

            HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
                bail_bug!("invalid states for host callee")
            }
        }
    } else {
        let guest_task = TableId::<GuestTask>::new(rep);
        let task = concurrent_state.get_mut(guest_task)?;
        if !task.already_lowered_parameters() {
            // The callee has not consumed its parameters yet, so it can be
            // cancelled before it starts.
            store.cancel_guest_subtask_without_lowered_parameters(
                self.runtime_instance(caller_instance),
                guest_task,
            )?;
            return Ok(Status::StartCancelled as u32);
        } else if !task.returned_or_cancelled() {
            // Deliver a cancellation request to the still-running callee.
            task.cancel_sent = true;
            task.event = Some(Event::Cancelled);
            let runtime_instance = task.instance.index;
            // Look for a thread of the task that is parked in a cancellable
            // wait (it registered a `wake_on_cancel` set) and wake the first
            // one found.
            for thread in task.threads.clone() {
                let thread = QualifiedThreadId {
                    task: guest_task,
                    thread,
                };
                if let Some(set) = concurrent_state
                    .get_mut(thread.thread)?
                    .wake_on_cancel
                    .take()
                {
                    let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
                        Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
                        Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
                            runtime_instance,
                            GuestCall {
                                thread,
                                kind: GuestCallKind::DeliverEvent {
                                    instance,
                                    set: None,
                                },
                            },
                        ),
                        None => bail_bug!("thread not present in wake_on_cancel set"),
                    };
                    concurrent_state.push_high_priority(item);

                    // Yield so the woken thread gets a chance to run before
                    // the subtask's state is re-checked below.
                    let caller = concurrent_state.current_guest_thread()?;
                    store.suspend(SuspendReason::Yielding {
                        thread: caller,
                        skip_may_block_check: false,
                    })?;
                    break;
                }
            }

            // If the callee has not returned or acknowledged the
            // cancellation yet, the caller must wait for it.
            needs_block = !store
                .concurrent_state_mut()
                .get_mut(guest_task)?
                .returned_or_cancelled()
        } else {
            needs_block = false;
        }
    };

    if needs_block {
        if async_ {
            return Ok(BLOCKED);
        }

        store.wait_for_event(waitable)?;

    }

    // Only a terminal `Subtask` event is acceptable at this point.
    let event = waitable.take_event(store.concurrent_state_mut())?;
    if let Some(Event::Subtask {
        status: status @ (Status::Returned | Status::ReturnCancelled),
    }) = event
    {
        Ok(status as u32)
    } else {
        bail!(Trap::SubtaskCancelAfterTerminal);
    }
}
3861}
3862
/// Store-level entry points for the component-model async intrinsics.
///
/// The implementation for `StoreInner<T>` in this file forwards each method
/// to the corresponding method on `Instance`, passing the store along as a
/// `StoreContextMut`.
pub trait VMComponentAsyncStore {
    /// Prepare a call from `caller_instance` to `callee_instance`.
    ///
    /// `result_count` carries either a plain result count or one of the
    /// sentinels `PREPARE_ASYNC_NO_RESULT` / `PREPARE_ASYNC_WITH_RESULT`
    /// (see the `StoreInner<T>` implementation).
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads of `storage_len` `ValRaw` values:
    /// the implementation builds a slice from the pair.
    /// NOTE(review): the validity requirements on `memory`, `start` and
    /// `return_` are not visible from this trait; confirm against
    /// `Instance::prepare_call`.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: StringEncoding,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// Start a previously prepared call on behalf of a synchronous caller,
    /// using `storage` as the buffer handed to `Instance::start_call`.
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads and writes of `storage_len`
    /// `MaybeUninit<ValRaw>` values: the implementation builds a mutable
    /// slice from the pair.
    /// NOTE(review): requirements on `callback` and `callee` are not
    /// visible here; confirm against `Instance::start_call`.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// Start a previously prepared call on behalf of an asynchronous
    /// caller and return the `u32` produced by `Instance::start_call`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the requirements on `callback`, `post_return` and
    /// `callee` are not visible here; confirm against
    /// `Instance::start_call`.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// Write to the future handle `future` from guest memory at `address`.
    /// Returns the encoded result code.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Read from the future handle `future` into guest memory at
    /// `address`. Returns the encoded result code.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Drop the writable end `writer` of a future of type `ty`.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Write up to `count` items to the stream handle `stream` from guest
    /// memory at `address`. Returns the encoded result code.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Read up to `count` items from the stream handle `stream` into guest
    /// memory at `address`. Returns the encoded result code.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_write`, but additionally passes a `FlatAbi` built from
    /// `payload_size` and `payload_align`.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_read`, but additionally passes a `FlatAbi` built from
    /// `payload_size` and `payload_align`.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Drop the writable end `writer` of a stream of type `ty`.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Produce the debug message of the error context `err_ctx_handle`,
    /// using `debug_msg_address` as the guest address for the output.
    /// NOTE(review): the exact layout written at `debug_msg_address` is
    /// defined by `Instance::error_context_debug_message`; confirm there.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// Create a new guest thread whose start function is entry
    /// `start_func_idx` of table `start_func_table_idx`; returns the new
    /// thread's handle. `context` is the `i32` passed to the start
    /// function when it eventually runs.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
4030
/// Forwards every intrinsic to the matching `Instance` method, wrapping
/// `self` in a `StoreContextMut` where the callee expects one.
impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
    /// Copies the parameters out of `storage` and forwards to
    /// `Instance::prepare_call`, decoding the async sentinels into a
    /// `CallerInfo`.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: StringEncoding,
        result_count_or_max_if_async: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()> {
        // SAFETY: this is an `unsafe fn`; the caller must guarantee that
        // `storage` is valid for reads of `storage_len` items. The values
        // are copied into an owned `Vec` right away.
        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();

        // NOTE(review): the remaining safety obligations (`memory`, `start`,
        // `return_`) are passed through unchanged from this function's
        // caller; confirm against `Instance::prepare_call`.
        unsafe {
            instance.prepare_call(
                StoreContextMut(self),
                start,
                return_,
                caller_instance,
                callee_instance,
                task_return_type,
                callee_async,
                memory,
                string_encoding,
                // The two sentinel values select an async caller; any other
                // value is the result count of a sync caller.
                match result_count_or_max_if_async {
                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
                        params,
                        has_result: false,
                    },
                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
                        params,
                        has_result: true,
                    },
                    result_count => CallerInfo::Sync {
                        params,
                        result_count,
                    },
                },
            )
        }
    }

    /// Forwards to `Instance::start_call` with a null post-return pointer,
    /// a result count of `1`, the `START_FLAG_ASYNC_CALLEE` flag and the
    /// caller-provided storage; the returned value is discarded.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()> {
        // SAFETY: this is an `unsafe fn`; the caller must guarantee that
        // `storage` is valid for reads and writes of `storage_len` items,
        // which is what `from_raw_parts_mut` requires.
        // NOTE(review): obligations on `callback`/`callee` are passed
        // through unchanged; confirm against `Instance::start_call`.
        unsafe {
            instance
                .start_call(
                    StoreContextMut(self),
                    callback,
                    ptr::null_mut(),
                    callee,
                    param_count,
                    1,
                    START_FLAG_ASYNC_CALLEE,
                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
                )
                .map(drop)
        }
    }

    /// Forwards to `Instance::start_call` without a storage buffer and
    /// returns its result unchanged.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32> {
        // NOTE(review): all pointer obligations are passed through
        // unchanged from this function's caller; confirm against
        // `Instance::start_call`.
        unsafe {
            instance.start_call(
                StoreContextMut(self),
                callback,
                post_return,
                callee,
                param_count,
                result_count,
                flags,
                None,
            )
        }
    }

    /// Forwards to `Instance::guest_write` with `TransmitIndex::Future`,
    /// no `FlatAbi` and a count of `1`; returns the encoded result.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                1,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_read` with `TransmitIndex::Future`,
    /// no `FlatAbi` and a count of `1`; returns the encoded result.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                1,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_write` with `TransmitIndex::Stream`
    /// and no `FlatAbi`; returns the encoded result.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_read` with `TransmitIndex::Stream`
    /// and no `FlatAbi`; returns the encoded result.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_drop_writable` with
    /// `TransmitIndex::Future`.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
    }

    /// Same as `stream_write`, but passes a `FlatAbi` carrying the payload
    /// size and alignment.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Same as `stream_read`, but passes a `FlatAbi` carrying the payload
    /// size and alignment.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_drop_writable` with
    /// `TransmitIndex::Stream`.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
    }

    /// Forwards to `Instance::error_context_debug_message`.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()> {
        instance.error_context_debug_message(
            StoreContextMut(self),
            ty,
            options,
            err_ctx_handle,
            debug_msg_address,
        )
    }

    /// Forwards to `Instance::thread_new_indirect`.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32> {
        instance.thread_new_indirect(
            StoreContextMut(self),
            caller,
            func_ty_idx,
            start_func_table_idx,
            start_func_idx,
            context,
        )
    }
}
4340
/// A pinned, boxed, `Send + 'static` future that resolves to `Result<()>`.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4342
/// Bookkeeping for a host-implemented callee tracked as a subtask
/// (`Waitable::Host`).
pub(crate) struct HostTask {
    /// State shared by all waitables (e.g. the handle and any pending
    /// event).
    common: WaitableCommon,

    /// The guest thread recorded as the caller when the task was created.
    caller: QualifiedThreadId,

    /// Call context associated with this task; starts out as
    /// `CallContext::default()`.
    call_context: CallContext,

    /// Current lifecycle state of the host callee.
    state: HostTaskState,
}
4358
/// Lifecycle of a host callee.
///
/// `subtask.drop` and `subtask.cancel` only expect to observe
/// `CalleeRunning` and `CalleeDone`; they treat the other two variants as
/// internal bugs.
enum HostTaskState {
    /// NOTE(review): presumably the state before the callee's future has
    /// been spawned; confirm against the code that creates host tasks.
    CalleeStarted,

    /// The callee is running; the handle can be used to abort it (as
    /// `subtask_cancel` does).
    CalleeRunning(JoinHandle),

    /// NOTE(review): presumably the callee finished and its lifted result
    /// is waiting to be consumed; confirm against its users.
    CalleeFinished(LiftedResult),

    /// The callee is done. `cancelled` is `true` once `subtask_cancel` has
    /// been applied to this task.
    CalleeDone { cancelled: bool },
}
4381
4382impl HostTask {
4383 fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4384 Self {
4385 common: WaitableCommon::default(),
4386 call_context: CallContext::default(),
4387 caller,
4388 state,
4389 }
4390 }
4391}
4392
impl TableDebug for HostTask {
    /// Name reported for this type through the `TableDebug` trait.
    fn type_name() -> &'static str {
        "HostTask"
    }
}
4398
/// Boxed callback taking the store, an `Event` and a `u32`, and returning a
/// `u32` on success.
/// NOTE(review): the meaning of the `u32` argument and of the return value
/// is not visible in this part of the file; confirm at the call sites.
type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4400
/// Who called a guest task.
enum Caller {
    /// The task was called by the host.
    Host {
        /// Channel used to deliver the task's lifted result; taken and used
        /// by `task_complete`.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// When `true`, the task is created with
        /// `HostFutureState::Live`; otherwise with
        /// `HostFutureState::NotApplicable` (see `GuestTask::new`).
        host_future_present: bool,
        /// NOTE(review): presumably the thread that was current when the
        /// host made the call; confirm where this is populated.
        caller: CurrentThread,
    },
    /// The task was called by another guest thread.
    Guest {
        /// The calling guest thread.
        thread: QualifiedThreadId,
    },
}
4421
/// Data needed to lift a task's result when it returns.
struct LiftResult {
    /// Function invoked as `(lift.lift)(store, storage)` to lift the
    /// result.
    lift: RawLift,
    /// Tuple type index associated with the result.
    /// NOTE(review): presumably checked against the type supplied to
    /// `task.return`; confirm.
    ty: TypeTupleIndex,
    /// Memory expected to be used for the result, if any.
    /// NOTE(review): looks like this is compared against the caller's
    /// memory in `task_return`; confirm.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding associated with the result.
    string_encoding: StringEncoding,
}
4430
/// A guest thread id paired with the id of the task the thread belongs to.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) struct QualifiedThreadId {
    /// The task owning `thread`.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
4440
4441impl QualifiedThreadId {
4442 fn qualify(
4443 state: &mut ConcurrentState,
4444 thread: TableId<GuestThread>,
4445 ) -> Result<QualifiedThreadId> {
4446 Ok(QualifiedThreadId {
4447 task: state.get_mut(thread)?.parent_task,
4448 thread,
4449 })
4450 }
4451}
4452
/// Formats as a tuple struct of the two raw `rep` values, task first.
impl fmt::Debug for QualifiedThreadId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("QualifiedThreadId")
            .field(&self.task.rep())
            .field(&self.thread.rep())
            .finish()
    }
}
4461
/// Scheduling state of a guest thread.
enum GuestThreadState {
    /// Initial state of a thread created with `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state of a thread created with `GuestThread::new_explicit`;
    /// holds the closure that starts the thread (invoked via
    /// `resume_thread`).
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// The thread is running, or has been scheduled to run by
    /// `resume_thread`.
    Running,
    /// The thread is suspended on the given fiber; `resume_thread` queues
    /// the fiber for resumption.
    Suspended(StoreFiber<'static>),
    /// The thread is ready to run on the given fiber; `resume_thread` can
    /// only promote it when `allow_ready` is set.
    Ready(StoreFiber<'static>),
    /// NOTE(review): presumably set once the thread has finished running;
    /// the transition is not visible in this part of the file.
    Completed,
}
/// Per-thread state of a guest thread.
pub struct GuestThread {
    /// Context slots, initialised to zero.
    /// NOTE(review): presumably backs the `context.get`/`context.set`
    /// intrinsics; confirm.
    context: [u32; NUM_COMPONENT_CONTEXT_SLOTS],
    /// The task this thread belongs to.
    parent_task: TableId<GuestTask>,
    /// Set while the thread is parked in a cancellable wait on the given
    /// waitable set (see `waitable_check`); taken by `subtask_cancel` to
    /// find and wake the thread.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// Current scheduling state.
    state: GuestThreadState,
    /// Handle of this thread in its instance's thread handle table, once
    /// assigned by `add_guest_thread_to_instance_table`.
    instance_rep: Option<u32>,
    /// Waitable set created together with the thread.
    /// NOTE(review): presumably used while the thread blocks on a
    /// synchronous call; confirm against its users.
    sync_call_set: TableId<WaitableSet>,
}
4489
4490impl GuestThread {
4491 fn from_instance(
4494 state: Pin<&mut ComponentInstance>,
4495 caller_instance: RuntimeComponentInstanceIndex,
4496 guest_thread: u32,
4497 ) -> Result<TableId<Self>> {
4498 let rep = state.instance_states().0[caller_instance]
4499 .thread_handle_table()
4500 .guest_thread_rep(guest_thread)?;
4501 Ok(TableId::new(rep))
4502 }
4503
4504 fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4505 let sync_call_set = state.push(WaitableSet::default())?;
4506 Ok(Self {
4507 context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4508 parent_task,
4509 wake_on_cancel: None,
4510 state: GuestThreadState::NotStartedImplicit,
4511 instance_rep: None,
4512 sync_call_set,
4513 })
4514 }
4515
4516 fn new_explicit(
4517 state: &mut ConcurrentState,
4518 parent_task: TableId<GuestTask>,
4519 start_func: Box<
4520 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4521 >,
4522 ) -> Result<Self> {
4523 let sync_call_set = state.push(WaitableSet::default())?;
4524 Ok(Self {
4525 context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4526 parent_task,
4527 wake_on_cancel: None,
4528 state: GuestThreadState::NotStartedExplicit(start_func),
4529 instance_rep: None,
4530 sync_call_set,
4531 })
4532 }
4533}
4534
impl TableDebug for GuestThread {
    /// Name reported for this type through the `TableDebug` trait.
    fn type_name() -> &'static str {
        "GuestThread"
    }
}
4540
/// Tracks the result of a synchronous call through three stages.
enum SyncResult {
    /// No result has been produced yet.
    NotProduced,
    /// A result has been produced (possibly with no value) and not yet
    /// taken.
    Produced(Option<ValRaw>),
    /// The result was already taken; taking it again is an internal bug.
    Taken,
}
4546
4547impl SyncResult {
4548 fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4549 Ok(match mem::replace(self, SyncResult::Taken) {
4550 SyncResult::NotProduced => None,
4551 SyncResult::Produced(val) => Some(val),
4552 SyncResult::Taken => {
4553 bail_bug!("attempted to take a synchronous result that was already taken")
4554 }
4555 })
4556 }
4557}
4558
/// State of the host-side future associated with a guest task, if any.
#[derive(Debug)]
enum HostFutureState {
    /// The task has no associated host future (guest callers, or host
    /// callers created with `host_future_present == false`).
    NotApplicable,
    /// A host future exists; while in this state the task is not
    /// `ready_to_delete`.
    Live,
    /// NOTE(review): presumably set once the host future has been dropped;
    /// the transition is not visible in this part of the file.
    Dropped,
}
4565
/// Bookkeeping for a call into a guest export, tracked as a waitable
/// (`Waitable::Guest`).
pub(crate) struct GuestTask {
    /// State shared by all waitables (e.g. the handle and any pending
    /// event).
    common: WaitableCommon,
    /// Function that lowers the parameters; `None` once the parameters
    /// have been lowered (see `already_lowered_parameters`).
    lower_params: Option<RawLower>,
    /// Data needed to lift the result; `None` once the task has returned
    /// or been cancelled (see `returned_or_cancelled`).
    lift_result: Option<LiftResult>,
    /// The lifted result, stored by `task_complete` when the caller is a
    /// guest.
    result: Option<LiftedResult>,
    /// Optional callback supplied when the task is created.
    /// NOTE(review): presumably the callee's async callback function;
    /// confirm.
    callback: Option<CallbackFn>,
    /// Who called this task.
    caller: Caller,
    /// Call context associated with this task; starts out as
    /// `CallContext::default()`.
    call_context: CallContext,
    /// Result of a synchronous call, if produced; a `Produced` value keeps
    /// the task from being `ready_to_delete`.
    sync_result: SyncResult,
    /// Whether a cancellation has been sent to this task (set by
    /// `subtask_cancel`, required by `task_cancel`).
    cancel_sent: bool,
    /// NOTE(review): presumably whether a "starting" status has been
    /// reported to the caller; not used in this part of the file.
    starting_sent: bool,
    /// The component instance the task runs in.
    instance: RuntimeInstance,
    /// Event pending delivery to the task itself; `subtask_cancel` stores
    /// `Event::Cancelled` here.
    event: Option<Event>,
    /// NOTE(review): presumably whether the task has exited; not used in
    /// this part of the file.
    exited: bool,
    /// The guest threads currently belonging to this task.
    threads: HashSet<TableId<GuestThread>>,
    /// State of the associated host future, if any.
    host_future_state: HostFutureState,
    /// Flag supplied when the task is created.
    /// NOTE(review): presumably whether the called function is async;
    /// confirm.
    async_function: bool,

    /// NOTE(review): presumably guards against decrementing an
    /// "interesting task" counter more than once; the counter itself is
    /// not visible in this part of the file.
    decremented_interesting_task_count: bool,
}
4619
4620impl GuestTask {
4621 fn already_lowered_parameters(&self) -> bool {
4622 self.lower_params.is_none()
4624 }
4625
4626 fn returned_or_cancelled(&self) -> bool {
4627 self.lift_result.is_none()
4629 }
4630
4631 fn ready_to_delete(&self) -> bool {
4632 let threads_completed = self.threads.is_empty();
4633 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4634 let pending_completion_event = matches!(
4635 self.common.event,
4636 Some(Event::Subtask {
4637 status: Status::Returned | Status::ReturnCancelled
4638 })
4639 );
4640 let ready = threads_completed
4641 && !has_sync_result
4642 && !pending_completion_event
4643 && !matches!(self.host_future_state, HostFutureState::Live);
4644 log::trace!(
4645 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4646 threads_completed,
4647 has_sync_result,
4648 pending_completion_event,
4649 self.host_future_state
4650 );
4651 ready
4652 }
4653
4654 fn new(
4655 state: &mut ConcurrentState,
4656 lower_params: RawLower,
4657 lift_result: LiftResult,
4658 caller: Caller,
4659 callback: Option<CallbackFn>,
4660 instance: RuntimeInstance,
4661 async_function: bool,
4662 ) -> Result<QualifiedThreadId> {
4663 let host_future_state = match &caller {
4664 Caller::Guest { .. } => HostFutureState::NotApplicable,
4665 Caller::Host {
4666 host_future_present,
4667 ..
4668 } => {
4669 if *host_future_present {
4670 HostFutureState::Live
4671 } else {
4672 HostFutureState::NotApplicable
4673 }
4674 }
4675 };
4676 let task = state.push(Self {
4677 common: WaitableCommon::default(),
4678 lower_params: Some(lower_params),
4679 lift_result: Some(lift_result),
4680 result: None,
4681 callback,
4682 caller,
4683 call_context: CallContext::default(),
4684 sync_result: SyncResult::NotProduced,
4685 cancel_sent: false,
4686 starting_sent: false,
4687 instance,
4688 event: None,
4689 exited: false,
4690 threads: HashSet::new(),
4691 host_future_state,
4692 async_function,
4693 decremented_interesting_task_count: false,
4694 })?;
4695 let new_thread = GuestThread::new_implicit(state, task)?;
4696 let thread = state.push(new_thread)?;
4697 state.get_mut(task)?.threads.insert(thread);
4698 state.interesting_tasks += 1;
4699 Ok(QualifiedThreadId { task, thread })
4700 }
4701}
4702
4703impl TableDebug for GuestTask {
4704 fn type_name() -> &'static str {
4705 "GuestTask"
4706 }
4707}
4708
/// State shared by every kind of waitable; reached through
/// `Waitable::common`.
#[derive(Default)]
struct WaitableCommon {
    /// Pending event, if any. Written by `Waitable::set_event` and consumed
    /// by `Waitable::take_event`.
    event: Option<Event>,
    /// The waitable set this waitable currently belongs to, if any; updated
    /// by `Waitable::join`.
    set: Option<TableId<WaitableSet>>,
    /// Optional `u32` handle.
    // NOTE(review): presumably the guest-visible handle for this waitable;
    // not read or written in this section, so confirm at the use sites.
    handle: Option<u32>,
}
4719
/// Typed reference to a table entry that can be waited on.
///
/// The derived ordering is what allows values to be stored in
/// `WaitableSet::ready` (a `BTreeSet`).
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task.
    Host(TableId<HostTask>),
    /// A guest task.
    Guest(TableId<GuestTask>),
    /// A transmit handle; `Waitable::from_instance` uses this for both
    /// streams and futures.
    Transmit(TableId<TransmitHandle>),
}
4730
4731impl Waitable {
4732 fn from_instance(
4735 state: Pin<&mut ComponentInstance>,
4736 caller_instance: RuntimeComponentInstanceIndex,
4737 waitable: u32,
4738 ) -> Result<Self> {
4739 use crate::runtime::vm::component::Waitable;
4740
4741 let (waitable, kind) = state.instance_states().0[caller_instance]
4742 .handle_table()
4743 .waitable_rep(waitable)?;
4744
4745 Ok(match kind {
4746 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4747 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4748 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4749 })
4750 }
4751
4752 fn rep(&self) -> u32 {
4754 match self {
4755 Self::Host(id) => id.rep(),
4756 Self::Guest(id) => id.rep(),
4757 Self::Transmit(id) => id.rep(),
4758 }
4759 }
4760
4761 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4765 log::trace!("waitable {self:?} join set {set:?}",);
4766
4767 let old = mem::replace(&mut self.common(state)?.set, set);
4768
4769 if let Some(old) = old {
4770 match *self {
4771 Waitable::Host(id) => state.remove_child(id, old),
4772 Waitable::Guest(id) => state.remove_child(id, old),
4773 Waitable::Transmit(id) => state.remove_child(id, old),
4774 }?;
4775
4776 state.get_mut(old)?.ready.remove(self);
4777 }
4778
4779 if let Some(set) = set {
4780 match *self {
4781 Waitable::Host(id) => state.add_child(id, set),
4782 Waitable::Guest(id) => state.add_child(id, set),
4783 Waitable::Transmit(id) => state.add_child(id, set),
4784 }?;
4785
4786 if self.common(state)?.event.is_some() {
4787 self.mark_ready(state)?;
4788 }
4789 }
4790
4791 Ok(())
4792 }
4793
4794 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4796 Ok(match self {
4797 Self::Host(id) => &mut state.get_mut(*id)?.common,
4798 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4799 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4800 })
4801 }
4802
4803 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4807 log::trace!("set event for {self:?}: {event:?}");
4808 self.common(state)?.event = event;
4809 self.mark_ready(state)
4810 }
4811
4812 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4814 let common = self.common(state)?;
4815 let event = common.event.take();
4816 if let Some(set) = self.common(state)?.set {
4817 state.get_mut(set)?.ready.remove(self);
4818 }
4819
4820 Ok(event)
4821 }
4822
4823 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4827 if let Some(set) = self.common(state)?.set {
4828 state.get_mut(set)?.ready.insert(*self);
4829 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4830 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4831 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4832
4833 let item = match mode {
4834 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4835 WaitMode::Callback(instance) => WorkItem::GuestCall(
4836 state.get_mut(thread.task)?.instance.index,
4837 GuestCall {
4838 thread,
4839 kind: GuestCallKind::DeliverEvent {
4840 instance,
4841 set: Some(set),
4842 },
4843 },
4844 ),
4845 };
4846 state.push_high_priority(item);
4847 }
4848 }
4849 Ok(())
4850 }
4851
4852 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4854 match self {
4855 Self::Host(task) => {
4856 log::trace!("delete host task {task:?}");
4857 state.delete(*task)?;
4858 }
4859 Self::Guest(task) => {
4860 log::trace!("delete guest task {task:?}");
4861 let task = state.delete(*task)?;
4862
4863 debug_assert!(task.decremented_interesting_task_count);
4870 }
4871 Self::Transmit(task) => {
4872 state.delete(*task)?;
4873 }
4874 }
4875
4876 Ok(())
4877 }
4878}
4879
4880impl fmt::Debug for Waitable {
4881 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4882 match self {
4883 Self::Host(id) => write!(f, "{id:?}"),
4884 Self::Guest(id) => write!(f, "{id:?}"),
4885 Self::Transmit(id) => write!(f, "{id:?}"),
4886 }
4887 }
4888}
4889
/// A set of waitables together with the threads waiting on it.
#[derive(Default)]
struct WaitableSet {
    /// Members that have been marked ready (see `Waitable::mark_ready`) and
    /// whose event has not yet been taken.
    ready: BTreeSet<Waitable>,
    /// Threads waiting on this set and how each should be woken.
    /// `Waitable::mark_ready` wakes the first entry in key order.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
}
4898
4899impl TableDebug for WaitableSet {
4900 fn type_name() -> &'static str {
4901 "WaitableSet"
4902 }
4903}
4904
/// Boxed, one-shot closure that is given the store and a slice of
/// uninitialized `ValRaw` slots to fill in; this is the type stored in
/// `GuestTask::lower_params`.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;

/// Boxed, one-shot closure that is given the store and a slice of raw values
/// and produces a type-erased result.
// NOTE(review): presumably the type of `LiftResult::lift`; confirm at the
// definition of `LiftResult`.
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;

/// Type-erased result value; `queue_call` downcasts it back to the caller's
/// concrete result type.
type LiftedResult = Box<dyn Any + Send + Sync>;

/// Unit marker type.
// NOTE(review): its use is not visible in this section — presumably a
// placeholder `LiftedResult` payload; confirm at the use sites.
struct DummyResult;
4922
/// Concurrency-related state kept per component instance.
#[derive(Default)]
pub struct ConcurrentInstanceState {
    /// Backpressure value for this instance.
    // NOTE(review): presumably a counter of outstanding backpressure
    // requests; confirm against the code that updates it.
    backpressure: u16,
    /// Flag that defaults to `false`.
    // NOTE(review): presumably blocks new entries into the instance while
    // set; confirm against `may_enter`.
    do_not_enter: bool,
    /// Guest calls that are pending for this instance, keyed by thread.
    /// `pending_is_empty` reports whether this map is empty.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
4934
4935impl ConcurrentInstanceState {
4936 pub fn pending_is_empty(&self) -> bool {
4937 self.pending.is_empty()
4938 }
4939}
4940
/// Identifies which thread, if any, is recorded as currently running in a
/// `ConcurrentState`.
#[derive(Debug, Copy, Clone)]
pub(crate) enum CurrentThread {
    /// A guest thread, identified by task and thread id.
    Guest(QualifiedThreadId),
    /// A host task.
    Host(TableId<HostTask>),
    /// No current thread; this is the value used by
    /// `ConcurrentState::default`.
    None,
}
4947
4948impl CurrentThread {
4949 fn guest(&self) -> Option<&QualifiedThreadId> {
4950 match self {
4951 Self::Guest(id) => Some(id),
4952 _ => None,
4953 }
4954 }
4955
4956 fn host(&self) -> Option<TableId<HostTask>> {
4957 match self {
4958 Self::Host(id) => Some(*id),
4959 _ => None,
4960 }
4961 }
4962
4963 fn is_none(&self) -> bool {
4964 matches!(self, Self::None)
4965 }
4966}
4967
4968impl From<QualifiedThreadId> for CurrentThread {
4969 fn from(id: QualifiedThreadId) -> Self {
4970 Self::Guest(id)
4971 }
4972}
4973
4974impl From<TableId<HostTask>> for CurrentThread {
4975 fn from(id: TableId<HostTask>) -> Self {
4976 Self::Host(id)
4977 }
4978}
4979
/// Store-level state for concurrent execution: the table of
/// tasks/threads/waitables, the work queues, and the collection of host
/// futures.
pub struct ConcurrentState {
    /// The thread recorded as currently running, if any.
    current_thread: CurrentThread,

    /// Host task futures. The `Option` is `None` while the collection is
    /// "taken": `futures_mut` reports a bug error in that case and
    /// `take_fibers_and_futures` moves the collection out for good.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// Table holding tasks, threads, waitable sets and other entries,
    /// addressed via `TableId`.
    table: AlwaysMut<ResourceTable>,
    /// High-priority work items; `push_high_priority` appends to the end.
    high_priority: Vec<WorkItem>,
    /// Low-priority work items; `push_low_priority` inserts at the front.
    low_priority: VecDeque<WorkItem>,
    /// Optional suspend reason.
    // NOTE(review): producer/consumer not visible in this section.
    suspend_reason: Option<SuspendReason>,
    /// Optional worker fiber; collected by `take_fibers_and_futures`.
    worker: Option<StoreFiber<'static>>,
    /// Optional item for the worker.
    // NOTE(review): producer/consumer not visible in this section.
    worker_item: Option<WorkerItem>,

    /// Reference counts for global error contexts, keyed by table index.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,

    /// Counter incremented once for every task created by `GuestTask::new`.
    // NOTE(review): the matching decrement is not visible in this section;
    // see `GuestTask::decremented_interesting_task_count`.
    interesting_tasks: usize,

    /// Optional waker.
    // NOTE(review): judging by the name, woken when `interesting_tasks`
    // reaches zero; confirm at the use sites.
    interesting_tasks_empty_waker: Option<Waker>,
}
5041
5042impl Default for ConcurrentState {
5043 fn default() -> Self {
5044 Self {
5045 current_thread: CurrentThread::None,
5046 table: AlwaysMut::new(ResourceTable::new()),
5047 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
5048 high_priority: Vec::new(),
5049 low_priority: VecDeque::new(),
5050 suspend_reason: None,
5051 worker: None,
5052 worker_item: None,
5053 global_error_context_ref_counts: BTreeMap::new(),
5054 interesting_tasks: 0,
5055 interesting_tasks_empty_waker: None,
5056 }
5057 }
5058}
5059
5060impl ConcurrentState {
5061 pub(crate) fn take_fibers_and_futures(
5078 &mut self,
5079 fibers: &mut Vec<StoreFiber<'static>>,
5080 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
5081 ) {
5082 for entry in self.table.get_mut().iter_mut() {
5083 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
5084 for mode in mem::take(&mut set.waiting).into_values() {
5085 if let WaitMode::Fiber(fiber) = mode {
5086 fibers.push(fiber);
5087 }
5088 }
5089 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
5090 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready(fiber) =
5091 mem::replace(&mut thread.state, GuestThreadState::Completed)
5092 {
5093 fibers.push(fiber);
5094 }
5095 }
5096 }
5097
5098 if let Some(fiber) = self.worker.take() {
5099 fibers.push(fiber);
5100 }
5101
5102 let mut handle_item = |item| match item {
5103 WorkItem::ResumeFiber(fiber) => {
5104 fibers.push(fiber);
5105 }
5106 WorkItem::PushFuture(future) => {
5107 self.futures
5108 .get_mut()
5109 .as_mut()
5110 .unwrap()
5111 .push(future.into_inner());
5112 }
5113 WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
5114 }
5115 };
5116
5117 for item in mem::take(&mut self.high_priority) {
5118 handle_item(item);
5119 }
5120 for item in mem::take(&mut self.low_priority) {
5121 handle_item(item);
5122 }
5123
5124 if let Some(them) = self.futures.get_mut().take() {
5125 futures.push(them);
5126 }
5127 }
5128
5129 fn push<V: Send + Sync + 'static>(
5130 &mut self,
5131 value: V,
5132 ) -> Result<TableId<V>, ResourceTableError> {
5133 self.table.get_mut().push(value).map(TableId::from)
5134 }
5135
5136 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
5137 self.table.get_mut().get_mut(&Resource::from(id))
5138 }
5139
5140 pub fn add_child<T: 'static, U: 'static>(
5141 &mut self,
5142 child: TableId<T>,
5143 parent: TableId<U>,
5144 ) -> Result<(), ResourceTableError> {
5145 self.table
5146 .get_mut()
5147 .add_child(Resource::from(child), Resource::from(parent))
5148 }
5149
5150 pub fn remove_child<T: 'static, U: 'static>(
5151 &mut self,
5152 child: TableId<T>,
5153 parent: TableId<U>,
5154 ) -> Result<(), ResourceTableError> {
5155 self.table
5156 .get_mut()
5157 .remove_child(Resource::from(child), Resource::from(parent))
5158 }
5159
5160 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
5161 self.table.get_mut().delete(Resource::from(id))
5162 }
5163
5164 fn push_future(&mut self, future: HostTaskFuture) {
5165 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
5172 }
5173
5174 fn push_high_priority(&mut self, item: WorkItem) {
5175 log::trace!("push high priority: {item:?}");
5176 self.high_priority.push(item);
5177 }
5178
5179 fn push_low_priority(&mut self, item: WorkItem) {
5180 log::trace!("push low priority: {item:?}");
5181 self.low_priority.push_front(item);
5182 }
5183
5184 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5185 if high_priority {
5186 self.push_high_priority(item);
5187 } else {
5188 self.push_low_priority(item);
5189 }
5190 }
5191
5192 fn promote_instance_local_thread_work_item(
5193 &mut self,
5194 current_instance: RuntimeComponentInstanceIndex,
5195 ) -> bool {
5196 self.promote_work_items_matching(|item: &WorkItem| match item {
5197 WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5198 *instance == current_instance
5199 }
5200 _ => false,
5201 })
5202 }
5203
5204 fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5205 self.promote_work_items_matching(|item: &WorkItem| match item {
5206 WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5207 *t == thread
5208 }
5209 _ => false,
5210 })
5211 }
5212
5213 fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5214 where
5215 F: FnMut(&WorkItem) -> bool,
5216 {
5217 if self.high_priority.iter().any(&mut predicate) {
5221 true
5222 }
5223 else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5226 let item = self.low_priority.remove(idx).unwrap();
5227 self.push_high_priority(item);
5228 true
5229 } else {
5230 false
5231 }
5232 }
5233
5234 fn take_pending_cancellation(&mut self) -> Result<bool> {
5237 let thread = self.current_guest_thread()?;
5238 if let Some(event) = self.get_mut(thread.task)?.event.take() {
5239 assert!(matches!(event, Event::Cancelled));
5240 Ok(true)
5241 } else {
5242 Ok(false)
5243 }
5244 }
5245
5246 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5247 if self.may_block(task)? {
5248 Ok(())
5249 } else {
5250 Err(Trap::CannotBlockSyncTask.into())
5251 }
5252 }
5253
5254 fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
5255 let task = self.get_mut(task)?;
5256 Ok(task.async_function || task.returned_or_cancelled())
5257 }
5258
5259 pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
5265 let (task, is_host) = (task >> 1, task & 1 == 1);
5266 if is_host {
5267 let task: TableId<HostTask> = TableId::new(task);
5268 Ok(&mut self.get_mut(task)?.call_context)
5269 } else {
5270 let task: TableId<GuestTask> = TableId::new(task);
5271 Ok(&mut self.get_mut(task)?.call_context)
5272 }
5273 }
5274
5275 pub fn current_call_context_scope_id(&self) -> Result<u32> {
5278 let (bits, is_host) = match self.current_thread {
5279 CurrentThread::Guest(id) => (id.task.rep(), false),
5280 CurrentThread::Host(id) => (id.rep(), true),
5281 CurrentThread::None => bail_bug!("current thread is not set"),
5282 };
5283 assert_eq!((bits << 1) >> 1, bits);
5284 Ok((bits << 1) | u32::from(is_host))
5285 }
5286
5287 fn current_guest_thread(&self) -> Result<QualifiedThreadId> {
5288 match self.current_thread.guest() {
5289 Some(id) => Ok(*id),
5290 None => bail_bug!("current thread is not a guest thread"),
5291 }
5292 }
5293
5294 fn current_host_thread(&self) -> Result<TableId<HostTask>> {
5295 match self.current_thread.host() {
5296 Some(id) => Ok(id),
5297 None => bail_bug!("current thread is not a host thread"),
5298 }
5299 }
5300
5301 fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
5302 match self.futures.get_mut().as_mut() {
5303 Some(f) => Ok(f),
5304 None => bail_bug!("futures field of concurrent state is currently taken"),
5305 }
5306 }
5307
5308 pub(crate) fn table(&mut self) -> &mut ResourceTable {
5309 self.table.get_mut()
5310 }
5311}
5312
5313fn for_any_lower<
5316 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5317>(
5318 fun: F,
5319) -> F {
5320 fun
5321}
5322
5323fn for_any_lift<
5325 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5326>(
5327 fun: F,
5328) -> F {
5329 fun
5330}
5331
5332fn checked<F: Future + Send + 'static>(
5337 id: StoreId,
5338 fut: F,
5339) -> impl Future<Output = F::Output> + Send + 'static {
5340 async move {
5341 let mut fut = pin!(fut);
5342 future::poll_fn(move |cx| {
5343 let message = "\
5344 `Future`s which depend on asynchronous component tasks, streams, or \
5345 futures to complete may only be polled from the event loop of the \
5346 store to which they belong. Please use \
5347 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5348 ";
5349 tls::try_get(|store| {
5350 let matched = match store {
5351 tls::TryGet::Some(store) => store.id() == id,
5352 tls::TryGet::Taken | tls::TryGet::None => false,
5353 };
5354
5355 if !matched {
5356 panic!("{message}")
5357 }
5358 });
5359 fut.as_mut().poll(cx)
5360 })
5361 .await
5362 }
5363}
5364
5365fn check_recursive_run() {
5368 tls::try_get(|store| {
5369 if !matches!(store, tls::TryGet::None) {
5370 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5371 }
5372 });
5373}
5374
/// Splits a packed callback return value into its two components.
///
/// Returns `(low, high)`, where `low` is the lowest four bits of `code` and
/// `high` is `code` shifted right by four bits.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    let low = code & 0xF;
    let high = code >> 4;
    (low, high)
}
5378
/// Parameters bundled for a waitable-set check (see `WaitableCheck`).
// NOTE(review): the consumer of this struct is not visible in this section;
// the field descriptions below are inferred from types and names only.
struct WaitableCheckParams {
    /// The waitable set to check.
    set: TableId<WaitableSet>,
    /// Index of the canonical options to use.
    options: OptionsIndex,
    /// Raw `u32` payload value.
    // NOTE(review): presumably a guest memory address for the result; confirm.
    payload: u32,
}
5387
/// Selects the kind of waitable-set check to perform.
// NOTE(review): semantics inferred from variant names only — presumably
// `Wait` blocks until an event is available while `Poll` does not; confirm at
// the use sites.
enum WaitableCheck {
    /// Wait on the set.
    Wait,
    /// Poll the set.
    Poll,
}
5394
/// A guest call that has been set up by `prepare_call` but not yet queued;
/// consumed by `queue_call`.
///
/// `R` is the result type the caller expects; it is only recorded in the
/// type (via `PhantomData`) and used by `queue_call` for the final downcast.
pub(crate) struct PreparedCall<R> {
    /// The function to call.
    handle: Func,
    /// Task and implicit thread created for this call.
    thread: QualifiedThreadId,
    /// Parameter count forwarded to `Instance::queue_call`.
    param_count: usize,
    /// Receiving end of the channel on which the lifted result arrives.
    rx: oneshot::Receiver<LiftedResult>,
    /// Runtime instance the call targets; also exposed through `task_id`.
    runtime_instance: RuntimeInstance,
    /// Ties the otherwise unused type parameter `R` to this struct.
    _phantom: PhantomData<R>,
}
5410
5411impl<R> PreparedCall<R> {
5412 pub(crate) fn task_id(&self) -> TaskId {
5414 TaskId {
5415 task: self.thread.task,
5416 runtime_instance: self.runtime_instance,
5417 }
5418 }
5419}
5420
/// Identifies a guest task created by `prepare_call`, together with the
/// runtime instance it targets. Obtained from `PreparedCall::task_id`.
pub(crate) struct TaskId {
    /// Table id of the guest task.
    task: TableId<GuestTask>,
    /// Runtime instance the task belongs to.
    runtime_instance: RuntimeInstance,
}
5426
5427impl TaskId {
5428 pub(crate) fn host_future_dropped(&self, store: &mut StoreOpaque) -> Result<()> {
5434 let task = store.concurrent_state_mut().get_mut(self.task)?;
5435 let delete = if !task.already_lowered_parameters() {
5436 store.cancel_guest_subtask_without_lowered_parameters(
5437 self.runtime_instance,
5438 self.task,
5439 )?;
5440 true
5441 } else {
5442 task.host_future_state = HostFutureState::Dropped;
5443 task.ready_to_delete()
5444 };
5445 if delete {
5446 Waitable::Guest(self.task).delete_from(store.concurrent_state_mut())?
5447 }
5448 Ok(())
5449 }
5450}
5451
/// Sets up a host-initiated call to the guest function `handle` without
/// queueing it yet.
///
/// A new `GuestTask` (with `Caller::Host`) and its implicit thread are
/// created in the store's concurrent state. `lower_params` and `lift_result`
/// are wrapped into the boxed closures stored on the task; the sending half
/// of a fresh oneshot channel is stored in the task's caller while the
/// receiving half is returned inside the `PreparedCall`.
///
/// `host_future_present` is forwarded to `Caller::Host` and determines
/// whether the task starts with `HostFutureState::Live` (see
/// `GuestTask::new`).
///
/// # Errors
///
/// Propagates table errors from task creation and fails with
/// `Trap::CannotEnterComponent` if `may_enter` rejects the target instance.
pub(crate) fn prepare_call<T, R>(
    mut store: StoreContextMut<T>,
    handle: Func,
    param_count: usize,
    host_future_present: bool,
    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
    + Send
    + Sync
    + 'static,
) -> Result<PreparedCall<R>> {
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);

    // Copy everything needed out of the instance/component metadata up front:
    // these values are all read from `store.0` before `store` is used again
    // to build the token and to fetch the concurrent state below.
    let instance = handle.instance().id().get(store.0);
    let options = &instance.component().env_component().options[options];
    let ty = &instance.component().types()[ty];
    let async_function = ty.async_;
    let task_return_type = ty.results;
    let component_instance = raw_options.instance;
    let callback = options.callback.map(|i| instance.runtime_callback(i));
    let memory = options
        .memory()
        .map(|i| instance.runtime_memory(i))
        .map(SendSyncPtr::new);
    let string_encoding = options.string_encoding;
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();

    // The task keeps `tx`; the `PreparedCall` returned to the caller gets `rx`.
    let (tx, rx) = oneshot::channel();

    // From here on `instance` names the runtime instance being called.
    let instance = handle.instance().runtime_instance(component_instance);
    // The thread that is current at preparation time is recorded as the
    // caller of the new task.
    let caller = state.current_thread;
    let thread = GuestTask::new(
        state,
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            host_future_present,
            caller,
        },
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(move |store: &mut dyn VMStore, event, handle| {
                let store = token.as_context_mut(store);
                // SAFETY: NOTE(review) — relies on `callback` (obtained from
                // `runtime_callback` above) being a valid callback function
                // pointer for this instance; confirm against the contract of
                // `Instance::call_callback`.
                unsafe { instance.call_callback(store, callback, event, handle) }
            }) as CallbackFn
        }),
        instance,
        async_function,
    )?;

    // NOTE(review): the task and thread created above are not removed on this
    // error path (and `interesting_tasks` was already incremented by
    // `GuestTask::new`); confirm that cleanup happens elsewhere.
    if !store.0.may_enter(instance)? {
        bail!(Trap::CannotEnterComponent);
    }

    Ok(PreparedCall {
        handle,
        thread,
        param_count,
        runtime_instance: instance,
        rx,
        _phantom: PhantomData,
    })
}
5537
5538pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5545 mut store: StoreContextMut<T>,
5546 prepared: PreparedCall<R>,
5547) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> {
5548 let PreparedCall {
5549 handle,
5550 thread,
5551 param_count,
5552 rx,
5553 ..
5554 } = prepared;
5555
5556 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5557
5558 Ok(checked(
5559 store.0.id(),
5560 rx.map(move |result| match result {
5561 Ok(r) => match r.downcast() {
5562 Ok(r) => Ok(*r),
5563 Err(_) => bail_bug!("wrong type of value produced"),
5564 },
5565 Err(e) => Err(e.into()),
5566 }),
5567 ))
5568}
5569
5570fn queue_call0<T: 'static>(
5573 store: StoreContextMut<T>,
5574 handle: Func,
5575 guest_thread: QualifiedThreadId,
5576 param_count: usize,
5577) -> Result<()> {
5578 let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5579 let is_concurrent = raw_options.async_;
5580 let callback = raw_options.callback;
5581 let instance = handle.instance();
5582 let callee = handle.lifted_core_func(store.0);
5583 let post_return = handle.post_return_core_func(store.0);
5584 let callback = callback.map(|i| {
5585 let instance = instance.id().get(store.0);
5586 SendSyncPtr::new(instance.runtime_callback(i))
5587 });
5588
5589 log::trace!("queueing call {guest_thread:?}");
5590
5591 unsafe {
5595 instance.queue_call(
5596 store,
5597 guest_thread,
5598 SendSyncPtr::new(callee),
5599 param_count,
5600 1,
5601 is_concurrent,
5602 callback,
5603 post_return.map(SendSyncPtr::new),
5604 )
5605 }
5606}