1use crate::component::func::{self, Func};
54use crate::component::store::StoreComponentInstanceId;
55use crate::component::{
56 ComponentInstanceId, HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError,
57};
58use crate::fiber::{self, StoreFiber, StoreFiberYield};
59use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
60use crate::vm::component::{
61 CallContext, ComponentInstance, HandleTable, InstanceFlags, ResourceTables,
62};
63use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
64use crate::{AsContext, AsContextMut, FuncType, StoreContext, StoreContextMut, ValRaw, ValType};
65use anyhow::{Context as _, Result, anyhow, bail};
66use error_contexts::GlobalErrorContextRefCount;
67use futures::channel::oneshot;
68use futures::future::{self, Either, FutureExt};
69use futures::stream::{FuturesUnordered, StreamExt};
70use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71use std::any::Any;
72use std::borrow::ToOwned;
73use std::boxed::Box;
74use std::cell::UnsafeCell;
75use std::collections::{BTreeMap, BTreeSet, HashSet};
76use std::fmt;
77use std::future::Future;
78use std::marker::PhantomData;
79use std::mem::{self, ManuallyDrop, MaybeUninit};
80use std::ops::DerefMut;
81use std::pin::{Pin, pin};
82use std::ptr::{self, NonNull};
83use std::slice;
84use std::sync::Arc;
85use std::task::{Context, Poll, Waker};
86use std::vec::Vec;
87use table::{TableDebug, TableId};
88use wasmtime_environ::Trap;
89use wasmtime_environ::component::{
90 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS,
91 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
92 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
93 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
94 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
95};
96
97pub use abort::JoinHandle;
98pub use future_stream_any::{FutureAny, StreamAny};
99pub use futures_and_streams::{
100 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
101 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
102 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
103};
104pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
105
106mod abort;
107mod error_contexts;
108mod future_stream_any;
109mod futures_and_streams;
110pub(crate) mod table;
111pub(crate) mod tls;
112
/// Sentinel `u32` with every bit set (`0xffff_ffff`).
///
/// NOTE(review): nothing in the visible part of this file references this
/// constant; presumably it is the value handed back to a guest when an
/// operation could not complete immediately -- confirm against the call sites.
const BLOCKED: u32 = 0xffff_ffff;
116
/// Lifecycle state of a subtask as reported to a guest.
///
/// The explicit discriminants are the numeric encoding: both
/// [`Status::pack`] and `Event::parts` cast the variant straight to `u32`.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    /// Encoded as `0`.
    Starting = 0,
    /// Encoded as `1`.
    Started = 1,
    /// Encoded as `2`; the only status that is packed without a waitable.
    Returned = 2,
    /// Encoded as `3`.
    StartCancelled = 3,
    /// Encoded as `4`.
    ReturnCancelled = 4,
}

impl Status {
    /// Packs this status together with an optional waitable handle into a
    /// single `u32`.
    ///
    /// The status discriminant occupies the low four bits and the handle the
    /// 28 bits above it. When `waitable` is `None` the handle bits are zero.
    ///
    /// # Panics
    ///
    /// * if `waitable` is `None` for any status other than
    ///   [`Status::Returned`], or `Some` for [`Status::Returned`];
    /// * if the handle does not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        // A handle must be supplied for every status except `Returned`.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = match waitable {
            Some(handle) => handle,
            None => 0,
        };
        // The handle shares the word with the 4-bit status field.
        assert!(waitable < (1 << 28));
        let status_bits = self as u32;
        status_bits | (waitable << 4)
    }
}
140
141#[derive(Clone, Copy, Debug)]
144enum Event {
145 None,
146 Cancelled,
147 Subtask {
148 status: Status,
149 },
150 StreamRead {
151 code: ReturnCode,
152 pending: Option<(TypeStreamTableIndex, u32)>,
153 },
154 StreamWrite {
155 code: ReturnCode,
156 pending: Option<(TypeStreamTableIndex, u32)>,
157 },
158 FutureRead {
159 code: ReturnCode,
160 pending: Option<(TypeFutureTableIndex, u32)>,
161 },
162 FutureWrite {
163 code: ReturnCode,
164 pending: Option<(TypeFutureTableIndex, u32)>,
165 },
166}
167
168impl Event {
169 fn parts(self) -> (u32, u32) {
174 const EVENT_NONE: u32 = 0;
175 const EVENT_SUBTASK: u32 = 1;
176 const EVENT_STREAM_READ: u32 = 2;
177 const EVENT_STREAM_WRITE: u32 = 3;
178 const EVENT_FUTURE_READ: u32 = 4;
179 const EVENT_FUTURE_WRITE: u32 = 5;
180 const EVENT_CANCELLED: u32 = 6;
181 match self {
182 Event::None => (EVENT_NONE, 0),
183 Event::Cancelled => (EVENT_CANCELLED, 0),
184 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
185 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
186 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
187 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
188 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
189 }
190 }
191}
192
/// Numeric codes a guest callback returns to say what should happen next.
///
/// `Instance::handle_callback_code` matches the unpacked code against these
/// constants.
mod callback_code {
    /// The implicit thread has completed.
    pub const EXIT: u32 = 0;
    /// The task yields; a follow-up event delivery is scheduled for it.
    pub const YIELD: u32 = 1;
    /// The task waits on the waitable set whose handle accompanies the code.
    pub const WAIT: u32 = 2;
}
199
/// `wasmtime_environ::component::START_FLAG_ASYNC_CALLEE` widened to `u32`.
///
/// NOTE(review): not used in the visible part of this file; presumably a flag
/// bit indicating that the callee is async -- confirm at the use sites.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
204
/// Mutable access to a store together with a function that projects the
/// store's `T` into `D::Data`.
///
/// Instances are handed to the closure passed to [`Accessor::with`] and can
/// also be built directly with [`Access::new`].
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    /// The store this access is bound to.
    store: StoreContextMut<'a, T>,
    /// Projection from the store data `T` to the view `D::Data`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
214
215impl<'a, T, D> Access<'a, T, D>
216where
217 D: HasData + ?Sized,
218 T: 'static,
219{
220 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
222 Self { store, get_data }
223 }
224
225 pub fn data_mut(&mut self) -> &mut T {
227 self.store.data_mut()
228 }
229
230 pub fn get(&mut self) -> D::Data<'_> {
232 (self.get_data)(self.data_mut())
233 }
234
235 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
239 where
240 T: 'static,
241 {
242 let accessor = Accessor {
243 get_data: self.get_data,
244 token: StoreToken::new(self.store.as_context_mut()),
245 };
246 self.store
247 .as_context_mut()
248 .spawn_with_accessor(accessor, task)
249 }
250
251 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
254 self.get_data
255 }
256}
257
/// Lets an [`Access`] be used wherever a shared store context is expected by
/// delegating to the wrapped store.
impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    type Data = T;

    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}
269
/// Lets an [`Access`] be used wherever a mutable store context is expected by
/// delegating to the wrapped store.
impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}
279
/// A handle that can obtain temporary store access without holding a borrow
/// of the store.
///
/// It stores only a [`StoreToken`] and a projection function; the store
/// itself is looked up through thread-local state each time
/// [`Accessor::with`] is called.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    /// Token used to turn the thread-local store back into a typed context.
    token: StoreToken<T>,
    /// Projection from the store data `T` to the view `D::Data`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
346
/// Types that can hand out a reference to an [`Accessor`].
pub trait AsAccessor {
    /// The store data type `T` of the underlying accessor.
    type Data: 'static;

    /// The `D` parameter of the underlying accessor, i.e. which view of the
    /// store data it projects to.
    type AccessorData: HasData + ?Sized;

    /// Returns the underlying accessor.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
374
/// A shared reference to anything implementing [`AsAccessor`] implements it
/// too, forwarding to the referent.
impl<T: AsAccessor + ?Sized> AsAccessor for &T {
    type Data = T::Data;
    type AccessorData = T::AccessorData;

    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
        T::as_accessor(self)
    }
}
383
/// An [`Accessor`] trivially provides itself.
impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;

    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}
392
// Compile-time check that `Accessor<T>` is `Send + Sync` even when `T` itself
// is not `Sync` (`UnsafeCell<u32>` is used as such a `T`).
const _: () = {
    const fn assert<T: Send + Sync>() {}
    assert::<Accessor<UnsafeCell<u32>>>();
};
419
impl<T> Accessor<T> {
    /// Creates an accessor whose projection is the identity, i.e. it exposes
    /// the store data `T` itself.
    pub(crate) fn new(token: StoreToken<T>) -> Self {
        Self {
            token,
            // Identity projection: hand back the `&mut T` unchanged.
            get_data: |x| x,
        }
    }
}
436
437impl<T, D> Accessor<T, D>
438where
439 D: HasData + ?Sized,
440{
441 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
459 tls::get(|vmstore| {
460 fun(Access {
461 store: self.token.as_context_mut(vmstore),
462 get_data: self.get_data,
463 })
464 })
465 }
466
467 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
470 self.get_data
471 }
472
473 pub fn with_getter<D2: HasData>(
490 &self,
491 get_data: fn(&mut T) -> D2::Data<'_>,
492 ) -> Accessor<T, D2> {
493 Accessor {
494 token: self.token,
495 get_data,
496 }
497 }
498
499 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
515 where
516 T: 'static,
517 {
518 let accessor = self.clone_for_spawn();
519 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
520 }
521
522 fn clone_for_spawn(&self) -> Self {
523 Self {
524 token: self.token,
525 get_data: self.get_data,
526 }
527 }
528}
529
/// A unit of background work that can be spawned onto a store (see
/// `Accessor::spawn`, `Access::spawn` and `StoreContextMut::spawn`).
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Runs the task to completion, using `accessor` whenever it needs access
    /// to the store.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
548
/// Parameters and result expectations of a call, split by whether the call
/// was made asynchronously or synchronously.
///
/// NOTE(review): not used in the visible part of this file; the field
/// descriptions below are read off the names and types only.
enum CallerInfo {
    /// An asynchronous call.
    Async {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Whether a result is expected.
        has_result: bool,
    },
    /// A synchronous call.
    Sync {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Number of result values expected.
        result_count: u32,
    },
}
563
/// How a thread parked in a waitable set's `waiting` map should be resumed.
enum WaitMode {
    /// The thread is a suspended fiber (recorded by `resume_fiber` when the
    /// suspend reason was `Waiting`).
    Fiber(StoreFiber<'static>),
    /// The thread belongs to a callback-based task of the given instance
    /// (recorded when its callback returns `callback_code::WAIT`).
    Callback(Instance),
}
572
/// Why a fiber suspended itself; stored in the concurrent state by
/// `StoreOpaque::suspend` and consumed by `StoreOpaque::resume_fiber`, which
/// decides where the suspended fiber is kept.
#[derive(Debug)]
enum SuspendReason {
    /// The thread waits on a waitable set; the fiber is stored in that set's
    /// `waiting` map.
    Waiting {
        /// The set being waited on.
        set: TableId<WaitableSet>,
        /// The waiting thread.
        thread: QualifiedThreadId,
        /// When `true`, `suspend` does not assert that the current task may
        /// block.
        skip_may_block_check: bool,
    },
    /// The worker fiber finished its item and wants another; the fiber is
    /// cached as the store's worker, or disposed if one is already cached.
    NeedWork,
    /// The thread yields; it is marked pending and its fiber is queued as
    /// low-priority work.
    Yielding { thread: QualifiedThreadId },
    /// The thread suspended itself explicitly; the fiber is stored in the
    /// thread's `Suspended` state.
    ExplicitlySuspending {
        /// The suspending thread.
        thread: QualifiedThreadId,
        /// When `true`, `suspend` does not assert that the current task may
        /// block.
        skip_may_block_check: bool,
    },
}
595
/// The different kinds of pending calls into guest code; executed by
/// `handle_guest_call`.
enum GuestCallKind {
    /// Fetch the next event for the thread's task and deliver it through the
    /// task's callback.
    DeliverEvent {
        /// Instance the task belongs to.
        instance: Instance,
        /// Waitable set to pull a ready waitable from, if any; `None` means
        /// only an event stored directly on the task is considered.
        set: Option<TableId<WaitableSet>>,
    },
    /// A start routine that may return a follow-up call to run immediately
    /// afterwards. Subject to both the `do_not_enter` flag and backpressure
    /// (see `GuestCall::is_ready`).
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
    /// A start routine with no follow-up call; always considered ready.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
617
618impl fmt::Debug for GuestCallKind {
619 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
620 match self {
621 Self::DeliverEvent { instance, set } => f
622 .debug_struct("DeliverEvent")
623 .field("instance", instance)
624 .field("set", set)
625 .finish(),
626 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
627 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
628 }
629 }
630}
631
/// A pending call into guest code on behalf of a particular thread.
#[derive(Debug)]
struct GuestCall {
    /// The thread (and, via `thread.task`, the task) the call is for.
    thread: QualifiedThreadId,
    /// What to do when the call is executed.
    kind: GuestCallKind,
}
638
639impl GuestCall {
640 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
650 let instance = store
651 .concurrent_state_mut()
652 .get_mut(self.thread.task)?
653 .instance;
654 let state = store.instance_state(instance);
655
656 let ready = match &self.kind {
657 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
658 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
659 GuestCallKind::StartExplicit(_) => true,
660 };
661 log::trace!(
662 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
663 state.do_not_enter,
664 state.backpressure
665 );
666 Ok(ready)
667 }
668}
669
/// A job handed to the worker fiber (see `run_on_worker`).
enum WorkerItem {
    /// Execute a guest call via `handle_guest_call`.
    GuestCall(GuestCall),
    /// Execute an arbitrary closure with access to the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
675
/// An entry in the high- or low-priority work queue processed by the event
/// loop (`poll_until` / `handle_work_item`).
enum WorkItem {
    /// Add the future to the store's set of futures being polled.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// Resume a previously suspended fiber.
    ResumeFiber(StoreFiber<'static>),
    /// Run the guest call on the worker fiber if it is ready; otherwise park
    /// it in the owning instance's `pending` map.
    GuestCall(GuestCall),
    /// Run the closure on the worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
688
689impl fmt::Debug for WorkItem {
690 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
691 match self {
692 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
693 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
694 Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
695 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
696 }
697 }
698}
699
/// Outcome of a wait.
///
/// NOTE(review): not used in the visible part of this file; variant meanings
/// are taken from their names.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    /// The wait ended because of cancellation.
    Cancelled,
    /// The wait ended because the awaited thing completed.
    Completed,
}
706
/// Forwards to `check_blocking` on the store's concurrent state and returns
/// its result.
///
/// NOTE(review): presumably this fails when the current task is not allowed
/// to block -- confirm against `ConcurrentState::check_blocking`.
pub(crate) fn check_blocking(store: &mut dyn VMStore) -> Result<()> {
    store.concurrent_state_mut().check_blocking()
}
712
/// Drives `future` to completion on behalf of the current guest thread and
/// returns its output, suspending the current fiber if the future is not
/// immediately ready.
///
/// * `store` - the store the call is running in.
/// * `future` - the host work to run.
/// * `caller_instance` - recorded on the temporary host task created to track
///   the future.
///
/// # Errors
///
/// Propagates the future's own error as well as any failure to look up or
/// update the caller's task, the temporary host task, or to suspend.
///
/// # Panics
///
/// Panics (`unreachable!`) if there is no current guest thread and the future
/// is not ready on its first poll.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
    caller_instance: RuntimeComponentInstanceIndex,
) -> Result<R> {
    let state = store.concurrent_state_mut();

    // Without a current guest thread there is no fiber to suspend, so the
    // future is polled exactly once and is required to be ready.
    let Some(caller) = state.guest_thread else {
        return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
            Poll::Ready(result) => result,
            Poll::Pending => {
                unreachable!()
            }
        };
    };

    // The caller task's `result` slot is borrowed to ferry the future's
    // output; remember whatever was there so it can be put back at the end.
    let old_result = state
        .get_mut(caller.task)
        .with_context(|| format!("bad handle: {caller:?}"))?
        .result
        .take();

    // Temporary host task used to track the future's progress.
    let task = state.push(HostTask::new(caller_instance, None))?;

    log::trace!("new host task child of {caller:?}: {task:?}");

    // Wrap the future so that, on completion, it stashes its output in the
    // caller task's `result` slot and posts a `Returned` event on the host
    // task.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            state.get_mut(caller.task)?.result = Some(Box::new(result) as _);

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // First poll, with the store installed in thread-local state so the
    // wrapper above can reach it. A no-op waker is used here; if the future is
    // pending it is handed to the concurrent state below.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        Poll::Ready(result) => {
            // Completed immediately: surface any error, then drop the
            // temporary host task.
            result?;
            log::trace!("delete host task {task:?} (already ready)");
            store.concurrent_state_mut().delete(task)?;
        }
        Poll::Pending => {
            // Not done yet: register the future with the concurrent state,
            // join the host task to the caller's `sync_call_set`, and suspend
            // this fiber waiting on that set.
            let state = store.concurrent_state_mut();
            state.push_future(future);

            let set = state.get_mut(caller.task)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                skip_may_block_check: false,
            })?;
        }
    }

    // Swap the saved value back into the `result` slot and return what the
    // future stored there. The unwraps rely on the wrapper above having stored
    // a boxed `R`.
    Ok(*mem::replace(
        &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
        old_result,
    )
    .unwrap()
    .downcast()
    .unwrap())
}
818
/// Executes `call`, followed by any chain of follow-up calls it produces.
///
/// `DeliverEvent` and `StartImplicit` may each yield another `GuestCall`,
/// which is executed in the next loop iteration; `StartExplicit` never does.
///
/// # Errors
///
/// Propagates errors from the guest callback, the start closures, and the
/// bookkeeping around them.
///
/// # Panics
///
/// For `DeliverEvent`, panics if there is no event to deliver or if the task
/// has no callback installed.
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
    let mut next = Some(call);
    while let Some(call) = next.take() {
        match call.kind {
            GuestCallKind::DeliverEvent { instance, set } => {
                // An event is expected to be available (hence the unwrap);
                // `true` allows a `Cancelled` event to be delivered.
                let (event, waitable) = instance
                    .get_event(store, call.thread.task, set, true)?
                    .unwrap();
                let state = store.concurrent_state_mut();
                let task = state.get_mut(call.thread.task)?;
                let runtime_instance = task.instance;
                // Handle `0` is passed when the event has no waitable.
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);

                log::trace!(
                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
                    call.thread,
                );

                // Make `call.thread` the current thread for the duration of
                // the callback; the previous one is restored below.
                let old_thread = state.guest_thread.replace(call.thread);
                log::trace!(
                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
                    call.thread
                );

                store.maybe_push_call_context(call.thread.task)?;

                store.enter_instance(runtime_instance);

                // Take the callback out of the task so it can be invoked while
                // the store is mutably borrowed; it is put back right after.
                let callback = store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback
                    .take()
                    .unwrap();

                let code = callback(store, runtime_instance.index, event, handle)?;

                store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback = Some(callback);

                store.exit_instance(runtime_instance)?;

                store.maybe_pop_call_context(call.thread.task)?;

                // Interpret the code the callback returned; this may produce a
                // follow-up call to run in the next iteration.
                next = instance.handle_callback_code(
                    store,
                    call.thread,
                    runtime_instance.index,
                    code,
                )?;

                store.concurrent_state_mut().guest_thread = old_thread;
                log::trace!(
                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
                );
            }
            GuestCallKind::StartImplicit(fun) => {
                next = fun(store)?;
            }
            GuestCallKind::StartExplicit(fun) => {
                fun(store)?;
            }
        }
    }

    Ok(())
}
889
/// Convenience methods that forward to the equally named methods on
/// [`StoreContextMut`].
impl<T> Store<T> {
    /// Runs `fun` with an [`Accessor`] for this store inside the store's
    /// event loop and returns its output.
    ///
    /// Forwards to `StoreContextMut::run_concurrent`.
    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        self.as_context_mut().run_concurrent(fun).await
    }

    /// Asserts that no concurrent state is left in the store.
    ///
    /// Forwards to `StoreContextMut::assert_concurrent_state_empty`.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(&mut self) {
        self.as_context_mut().assert_concurrent_state_empty();
    }

    /// Spawns `task` as a background task on this store.
    ///
    /// Forwards to `StoreContextMut::spawn`.
    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
    where
        T: 'static,
    {
        self.as_context_mut().spawn(task)
    }
}
912
913impl<T> StoreContextMut<'_, T> {
    /// Asserts that the store holds no leftover concurrent state.
    ///
    /// # Panics
    ///
    /// Panics if any of the following is non-empty / set: the per-instance
    /// states, the concurrent-state table, the high- or low-priority work
    /// queues, the current guest thread, the set of pending futures, or the
    /// global error-context reference counts. Also panics if the futures set
    /// is currently taken out of the state (the `unwrap`).
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(self) {
        let store = self.0;
        store
            .store_data_mut()
            .components
            .assert_instance_states_empty();
        let state = store.concurrent_state_mut();
        assert!(
            state.table.get_mut().is_empty(),
            "non-empty table: {:?}",
            state.table.get_mut()
        );
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        assert!(state.guest_thread.is_none());
        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
        assert!(state.global_error_context_ref_counts.is_empty());
    }
941
    /// Spawns `task` as a background task on this store.
    ///
    /// The task receives a freshly created [`Accessor`] with the identity
    /// projection. The returned [`JoinHandle`] refers to the spawned task.
    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
    where
        T: 'static,
    {
        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
        self.spawn_with_accessor(accessor, task)
    }
958
959 fn spawn_with_accessor<D>(
962 self,
963 accessor: Accessor<T, D>,
964 task: impl AccessorTask<T, D>,
965 ) -> JoinHandle
966 where
967 T: 'static,
968 D: HasData + ?Sized,
969 {
970 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
974 self.0
975 .concurrent_state_mut()
976 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
977 handle
978 }
979
    /// Runs `fun` with an [`Accessor`] for this store inside the store's
    /// event loop and returns its output.
    ///
    /// Equivalent to `do_run_concurrent(fun, false)`: if the loop runs out of
    /// work while `fun`'s future is still pending, it keeps waiting instead of
    /// trapping.
    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, false).await
    }
1066
    /// Like `run_concurrent`, but passes `trap_on_idle = true`: if the loop
    /// runs out of work while `fun`'s future is still pending, it fails with
    /// `Trap::AsyncDeadlock` instead of waiting.
    pub(super) async fn run_concurrent_trap_on_idle<R>(
        self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, true).await
    }
1076
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Creates an [`Accessor`] for this store, calls `fun` with it, and drives
    /// the resulting future with `poll_until`. `trap_on_idle` is forwarded to
    /// `poll_until` unchanged.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Owns the future produced by `fun` and makes sure it is dropped with
        // the store installed in thread-local state, so destructors that run
        // while dropping it can still reach the store.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: `value` is dropped only here, exactly once, and
                    // is not touched again afterwards.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: the future lives inside `dropper`, which is only ever
        // accessed through this `&mut` reference and is never moved; the
        // future is dropped in place by `Dropper::drop`.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1119
    /// The store's event loop: polls `future` together with the store's
    /// pending futures and work queues until `future` completes.
    ///
    /// Each iteration polls, in order: `future`, the set of pending futures,
    /// the high-priority queue, and the low-priority queue. Any work items
    /// found are then processed with `handle_work_item`.
    ///
    /// * `future` - the future whose output is returned.
    /// * `trap_on_idle` - when `true`, fail with `Trap::AsyncDeadlock` if
    ///   there are no pending futures and no work items left while `future`
    ///   is still pending; when `false`, return `Pending` in that situation.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by one of the pending futures or by a
    /// work item, or the deadlock trap described above.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Puts the futures set back into the concurrent state when dropped,
        // including on early exit.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Move the futures set out of the store so it can be polled while
            // the futures inside it are given access to the store.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = pin!(reset.futures.as_mut().unwrap().next());

            let result = future::poll_fn(|cx| {
                // 1. The caller's future: done as soon as it is ready.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(Either::Left(value)));
                }

                // 2. The pending futures. `Ready(true)` means one of them
                //    finished successfully, `Ready(false)` means the set is
                //    empty; a failed future ends the loop with its error.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // 3. The high-priority queue, then 4. the low-priority queue.
                let state = reset.store.0.concurrent_state_mut();
                let ready = mem::take(&mut state.high_priority);
                let ready = if ready.is_empty() {
                    let ready = mem::take(&mut state.low_priority);
                    if ready.is_empty() {
                        // No work items at all; decide based on step 2.
                        return match next {
                            Poll::Ready(true) => {
                                // A pending future completed; go around the
                                // outer loop again with no work items.
                                Poll::Ready(Ok(Either::Right(Vec::new())))
                            }
                            Poll::Ready(false) => {
                                // Nothing is pending. Poll the caller's future
                                // once more before concluding it is stuck.
                                if let Poll::Ready(value) =
                                    tls::set(reset.store.0, || future.as_mut().poll(cx))
                                {
                                    Poll::Ready(Ok(Either::Left(value)))
                                } else {
                                    if trap_on_idle {
                                        // Idle with an unfinished future:
                                        // report a deadlock.
                                        Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
                                    } else {
                                        // Idle, but the caller asked us to
                                        // keep waiting.
                                        Poll::Pending
                                    }
                                }
                            }
                            // Some future is still pending; wait for a wakeup.
                            Poll::Pending => Poll::Pending,
                        };
                    } else {
                        ready
                    }
                } else {
                    ready
                };

                Poll::Ready(Ok(Either::Right(ready)))
            })
            .await;

            // Return the futures set to the store before handling work items
            // or exiting.
            drop(reset);

            match result? {
                Either::Left(value) => break Ok(value),
                Either::Right(ready) => {
                    // On early exit, cleans up the not-yet-handled items:
                    // fibers are disposed and futures are dropped with the
                    // store installed in thread-local state.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1287
    /// Processes a single work item taken from one of the work queues.
    ///
    /// # Errors
    ///
    /// Propagates errors from resuming fibers, running the worker, and from
    /// task/instance lookups.
    ///
    /// # Panics
    ///
    /// For `PushFuture`, panics if the store's futures set is currently taken
    /// out of the concurrent state.
    async fn handle_work_item(self, item: WorkItem) -> Result<()>
    where
        T: Send,
    {
        log::trace!("handle work item {item:?}");
        match item {
            WorkItem::PushFuture(future) => {
                // Add the future to the set polled by the event loop.
                self.0
                    .concurrent_state_mut()
                    .futures
                    .get_mut()
                    .as_mut()
                    .unwrap()
                    .push(future.into_inner());
            }
            WorkItem::ResumeFiber(fiber) => {
                self.0.resume_fiber(fiber).await?;
            }
            WorkItem::GuestCall(call) => {
                if call.is_ready(self.0)? {
                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
                } else {
                    // The call cannot run yet. The first time this happens for
                    // a task, mark it and -- for implicit starts only -- post
                    // a `Starting` subtask event.
                    let state = self.0.concurrent_state_mut();
                    let task = state.get_mut(call.thread.task)?;
                    if !task.starting_sent {
                        task.starting_sent = true;
                        if let GuestCallKind::StartImplicit(_) = &call.kind {
                            Waitable::Guest(call.thread.task).set_event(
                                state,
                                Some(Event::Subtask {
                                    status: Status::Starting,
                                }),
                            )?;
                        }
                    }

                    // Park the call on its instance until it becomes ready
                    // (see `partition_pending`).
                    let instance = state.get_mut(call.thread.task)?.instance;
                    self.0
                        .instance_state(instance)
                        .pending
                        .insert(call.thread, call.kind);
                }
            }
            WorkItem::WorkerFunction(fun) => {
                self.run_on_worker(WorkerItem::Function(fun)).await?;
            }
        }

        Ok(())
    }
1339
    /// Executes `item` on the worker fiber.
    ///
    /// Reuses the cached worker fiber if there is one, otherwise creates a new
    /// fiber. The item is passed through the concurrent state's `worker_item`
    /// slot and the fiber is then resumed.
    ///
    /// # Panics
    ///
    /// Panics if the `worker_item` slot is already occupied.
    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
    where
        T: Send,
    {
        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
            fiber
        } else {
            // Worker body: take the posted item, run it, then suspend with
            // `NeedWork` until resumed with the next item.
            fiber::make_fiber(self.0, move |store| {
                loop {
                    match store.concurrent_state_mut().worker_item.take().unwrap() {
                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
                    }

                    store.suspend(SuspendReason::NeedWork)?;
                }
            })?
        };

        // Post the item where the worker body will pick it up.
        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
        assert!(worker_item.is_none());
        *worker_item = Some(item);

        self.0.resume_fiber(worker).await
    }
1366
1367 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1372 where
1373 T: 'static,
1374 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1375 + Send
1376 + Sync
1377 + 'static,
1378 R: Send + Sync + 'static,
1379 {
1380 let token = StoreToken::new(self);
1381 async move {
1382 let mut accessor = Accessor::new(token);
1383 closure(&mut accessor).await
1384 }
1385 }
1386}
1387
/// Identifies one runtime component instance within a store: which component
/// instance it belongs to, and its index inside that instance.
#[derive(Debug, Copy, Clone)]
pub struct RuntimeInstance {
    /// The component instance within the store.
    pub instance: ComponentInstanceId,
    /// The runtime instance index within that component instance.
    pub index: RuntimeComponentInstanceIndex,
}
1393
1394impl StoreOpaque {
    /// Returns the concurrent state of the given runtime instance.
    ///
    /// # Panics
    ///
    /// Panics if the component instance has no state for `instance.index`.
    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut ConcurrentInstanceState {
        StoreComponentInstanceId::new(self.id(), instance.instance)
            .get_mut(self)
            .instance_state(instance.index)
            .unwrap()
            .concurrent_state()
    }
1404
    /// Returns the handle table of the given runtime instance.
    ///
    /// # Panics
    ///
    /// Panics if the component instance has no state for `instance.index`.
    fn handle_table(&mut self, instance: RuntimeInstance) -> &mut HandleTable {
        StoreComponentInstanceId::new(self.id(), instance.instance)
            .get_mut(self)
            .instance_state(instance.index)
            .unwrap()
            .handle_table()
    }
1414
    /// Marks `instance` as entered by setting its `do_not_enter` flag, which
    /// makes `GuestCall::is_ready` report pending event deliveries and
    /// implicit starts for it as not ready.
    fn enter_instance(&mut self, instance: RuntimeInstance) {
        log::trace!("enter {instance:?}");
        self.instance_state(instance).do_not_enter = true;
    }
1422
    /// Clears the `do_not_enter` flag of `instance` and re-examines its parked
    /// calls, queueing those that have become ready.
    ///
    /// # Errors
    ///
    /// Propagates errors from `partition_pending`.
    fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
        log::trace!("exit {instance:?}");
        self.instance_state(instance).do_not_enter = false;
        self.partition_pending(instance)
    }
1431
1432 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1437 for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
1438 let call = GuestCall { thread, kind };
1439 if call.is_ready(self)? {
1440 self.concurrent_state_mut()
1441 .push_high_priority(WorkItem::GuestCall(call));
1442 } else {
1443 self.instance_state(instance)
1444 .pending
1445 .insert(call.thread, call.kind);
1446 }
1447 }
1448
1449 Ok(())
1450 }
1451
1452 pub(crate) fn backpressure_modify(
1454 &mut self,
1455 caller_instance: RuntimeInstance,
1456 modify: impl FnOnce(u16) -> Option<u16>,
1457 ) -> Result<()> {
1458 let state = self.instance_state(caller_instance);
1459 let old = state.backpressure;
1460 let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?;
1461 state.backpressure = new;
1462
1463 if old > 0 && new == 0 {
1464 self.partition_pending(caller_instance)?;
1467 }
1468
1469 Ok(())
1470 }
1471
    /// Resumes `fiber` and, if it suspends again rather than exiting, stores
    /// it according to the `SuspendReason` it left behind.
    ///
    /// The current guest thread is saved before the fiber runs and restored
    /// afterwards.
    ///
    /// # Errors
    ///
    /// Propagates errors from running the fiber and from table lookups.
    ///
    /// # Panics
    ///
    /// Panics if the fiber suspended without recording a suspend reason, or if
    /// a waiting thread is already registered in the waitable set.
    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
        let old_thread = self.concurrent_state_mut().guest_thread;
        log::trace!("resume_fiber: save current thread {old_thread:?}");

        // `Some(fiber)` means the fiber suspended again; `None` means it ran
        // to completion.
        let fiber = fiber::resolve_or_release(self, fiber).await?;

        let state = self.concurrent_state_mut();

        // Restore the thread that was current before the fiber ran and mark
        // it as running again.
        state.guest_thread = old_thread;
        if let Some(ref ot) = old_thread {
            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
        }
        log::trace!("resume_fiber: restore current thread {old_thread:?}");

        if let Some(mut fiber) = fiber {
            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
            match state.suspend_reason.take().unwrap() {
                // Keep one idle worker around; dispose of any extra.
                SuspendReason::NeedWork => {
                    if state.worker.is_none() {
                        state.worker = Some(fiber);
                    } else {
                        fiber.dispose(self);
                    }
                }
                // Reschedule the yielding thread as low-priority work.
                SuspendReason::Yielding { thread, .. } => {
                    state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
                    state.push_low_priority(WorkItem::ResumeFiber(fiber));
                }
                // Park the fiber inside the thread's own state.
                SuspendReason::ExplicitlySuspending { thread, .. } => {
                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
                }
                // Park the fiber in the waitable set it is waiting on.
                SuspendReason::Waiting { set, thread, .. } => {
                    let old = state
                        .get_mut(set)?
                        .waiting
                        .insert(thread, WaitMode::Fiber(fiber));
                    assert!(old.is_none());
                }
            };
        } else {
            log::trace!("resume_fiber: fiber has exited");
        }

        Ok(())
    }
1520
    /// Suspends the current fiber, recording `reason` so that `resume_fiber`
    /// knows what to do with it.
    ///
    /// For every reason except `NeedWork`, the task's call context is popped
    /// before suspending and pushed back after resuming, and the current guest
    /// thread is restored to what it was before the suspension.
    ///
    /// # Errors
    ///
    /// Propagates errors from the call-context bookkeeping and from the
    /// suspension itself.
    ///
    /// # Panics
    ///
    /// Panics if a suspend reason is already recorded, or if the current
    /// thread's task may not block and `reason` does not carry
    /// `skip_may_block_check: true`.
    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
        log::trace!("suspend fiber: {reason:?}");

        // The task on whose behalf we suspend; `NeedWork` has none.
        let task = match &reason {
            SuspendReason::Yielding { thread, .. }
            | SuspendReason::Waiting { thread, .. }
            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
            SuspendReason::NeedWork => None,
        };

        let old_guest_thread = if let Some(task) = task {
            self.maybe_pop_call_context(task)?;
            self.concurrent_state_mut().guest_thread
        } else {
            None
        };

        // Suspending is only legal when the check is explicitly skipped, when
        // there is no current thread, or when the current thread's task may
        // block.
        assert!(
            matches!(
                reason,
                SuspendReason::ExplicitlySuspending {
                    skip_may_block_check: true,
                    ..
                } | SuspendReason::Waiting {
                    skip_may_block_check: true,
                    ..
                }
            ) || old_guest_thread
                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
                .unwrap_or(true)
        );

        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
        assert!(suspend_reason.is_none());
        *suspend_reason = Some(reason);

        // Yield to whoever resumed this fiber, releasing the store.
        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;

        // Resumed: reinstate the thread and call context saved above.
        if let Some(task) = task {
            self.concurrent_state_mut().guest_thread = old_guest_thread;
            self.maybe_push_call_context(task)?;
        }

        Ok(())
    }
1579
1580 fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1584 let task = self.concurrent_state_mut().get_mut(guest_task)?;
1585
1586 if !task.returned_or_cancelled() {
1587 log::trace!("push call context for {guest_task:?}");
1588 let call_context = task.call_context.take().unwrap();
1589 self.component_resource_state().0.push(call_context);
1590 }
1591 Ok(())
1592 }
1593
1594 fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1598 if !self
1599 .concurrent_state_mut()
1600 .get_mut(guest_task)?
1601 .returned_or_cancelled()
1602 {
1603 log::trace!("pop call context for {guest_task:?}");
1604 let call_context = Some(self.component_resource_state().0.pop().unwrap());
1605 self.concurrent_state_mut()
1606 .get_mut(guest_task)?
1607 .call_context = call_context;
1608 }
1609 Ok(())
1610 }
1611
    /// Suspends the current guest thread until it is resumed while waiting on
    /// `waitable`.
    ///
    /// The waitable is temporarily moved into the caller task's
    /// `sync_call_set`, the fiber suspends waiting on that set, and afterwards
    /// the waitable is moved back to the set it belonged to before (if any).
    ///
    /// # Errors
    ///
    /// Propagates errors from table lookups, joining sets and suspending.
    ///
    /// # Panics
    ///
    /// Panics if there is no current guest thread.
    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
        let state = self.concurrent_state_mut();
        let caller = state.guest_thread.unwrap();
        // Remember the waitable's current set so it can be restored below.
        let old_set = waitable.common(state)?.set;
        let set = state.get_mut(caller.task)?.sync_call_set;
        waitable.join(state, Some(set))?;
        self.suspend(SuspendReason::Waiting {
            set,
            thread: caller,
            skip_may_block_check: false,
        })?;
        let state = self.concurrent_state_mut();
        waitable.join(state, old_set)
    }
1626}
1627
1628impl Instance {
    /// Fetches the next event to deliver to `guest_task`, if any.
    ///
    /// An event stored directly on the task takes precedence; otherwise the
    /// first ready waitable of `set` (if a set is given) supplies the event.
    ///
    /// * `set` - waitable set to pull a ready waitable from; `None` restricts
    ///   the lookup to the task's own event.
    /// * `cancellable` - when `false`, a `Cancelled` event stored on the task
    ///   is left in place and the lookup falls through to `set`.
    ///
    /// Returns `Some((event, None))` for a task-level event,
    /// `Some((event, Some((waitable, handle))))` for a waitable's event, and
    /// `None` when nothing is available.
    ///
    /// # Errors
    ///
    /// Propagates table lookup failures.
    ///
    /// # Panics
    ///
    /// Panics if a ready waitable has no handle or no event.
    fn get_event(
        self,
        store: &mut StoreOpaque,
        guest_task: TableId<GuestTask>,
        set: Option<TableId<WaitableSet>>,
        cancellable: bool,
    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
        let state = store.concurrent_state_mut();

        if let Some(event) = state.get_mut(guest_task)?.event.take() {
            log::trace!("deliver event {event:?} to {guest_task:?}");

            if cancellable || !matches!(event, Event::Cancelled) {
                return Ok(Some((event, None)));
            } else {
                // A cancellation the caller cannot accept right now: put it
                // back for later.
                state.get_mut(guest_task)?.event = Some(event);
            }
        }

        Ok(
            // Pop the first ready waitable from the set, if a set was given
            // and it has one; lookup errors are propagated.
            if let Some((set, waitable)) = set
                .and_then(|set| {
                    state
                        .get_mut(set)
                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
                        .transpose()
                })
                .transpose()?
            {
                let common = waitable.common(state)?;
                let handle = common.handle.unwrap();
                let event = common.event.take().unwrap();

                log::trace!(
                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
                );

                waitable.on_delivery(store, self, event);

                Some((event, Some((waitable, handle))))
            } else {
                None
            },
        )
    }
1676
1677 fn handle_callback_code(
1683 self,
1684 store: &mut StoreOpaque,
1685 guest_thread: QualifiedThreadId,
1686 runtime_instance: RuntimeComponentInstanceIndex,
1687 code: u32,
1688 ) -> Result<Option<GuestCall>> {
1689 let (code, set) = unpack_callback_code(code);
1690
1691 log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1692
1693 let state = store.concurrent_state_mut();
1694
1695 let get_set = |store: &mut StoreOpaque, handle| {
1696 if handle == 0 {
1697 bail!("invalid waitable-set handle");
1698 }
1699
1700 let set = store
1701 .handle_table(RuntimeInstance {
1702 instance: self.id().instance(),
1703 index: runtime_instance,
1704 })
1705 .waitable_set_rep(handle)?;
1706
1707 Ok(TableId::<WaitableSet>::new(set))
1708 };
1709
1710 Ok(match code {
1711 callback_code::EXIT => {
1712 log::trace!("implicit thread {guest_thread:?} completed");
1713 self.cleanup_thread(store, guest_thread, runtime_instance)?;
1714 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1715 if task.threads.is_empty() && !task.returned_or_cancelled() {
1716 bail!(Trap::NoAsyncResult);
1717 }
1718 match &task.caller {
1719 Caller::Host { .. } => {
1720 if task.ready_to_delete() {
1721 Waitable::Guest(guest_thread.task)
1722 .delete_from(store.concurrent_state_mut())?;
1723 }
1724 }
1725 Caller::Guest { .. } => {
1726 task.exited = true;
1727 task.callback = None;
1728 }
1729 }
1730 None
1731 }
1732 callback_code::YIELD => {
1733 let task = state.get_mut(guest_thread.task)?;
1734 if let Some(event) = task.event {
1739 assert!(matches!(event, Event::None | Event::Cancelled));
1740 } else {
1741 task.event = Some(Event::None);
1742 }
1743 let call = GuestCall {
1744 thread: guest_thread,
1745 kind: GuestCallKind::DeliverEvent {
1746 instance: self,
1747 set: None,
1748 },
1749 };
1750 if state.may_block(guest_thread.task) {
1751 state.push_low_priority(WorkItem::GuestCall(call));
1754 None
1755 } else {
1756 Some(call)
1760 }
1761 }
1762 callback_code::WAIT => {
1763 state.check_blocking_for(guest_thread.task)?;
1766
1767 let set = get_set(store, set)?;
1768 let state = store.concurrent_state_mut();
1769
1770 if state.get_mut(guest_thread.task)?.event.is_some()
1771 || !state.get_mut(set)?.ready.is_empty()
1772 {
1773 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1775 thread: guest_thread,
1776 kind: GuestCallKind::DeliverEvent {
1777 instance: self,
1778 set: Some(set),
1779 },
1780 }));
1781 } else {
1782 let old = state
1790 .get_mut(guest_thread.thread)?
1791 .wake_on_cancel
1792 .replace(set);
1793 assert!(old.is_none());
1794 let old = state
1795 .get_mut(set)?
1796 .waiting
1797 .insert(guest_thread, WaitMode::Callback(self));
1798 assert!(old.is_none());
1799 }
1800 None
1801 }
1802 _ => bail!("unsupported callback code: {code}"),
1803 })
1804 }
1805
1806 fn cleanup_thread(
1807 self,
1808 store: &mut StoreOpaque,
1809 guest_thread: QualifiedThreadId,
1810 runtime_instance: RuntimeComponentInstanceIndex,
1811 ) -> Result<()> {
1812 let guest_id = store
1813 .concurrent_state_mut()
1814 .get_mut(guest_thread.thread)?
1815 .instance_rep;
1816 store
1817 .handle_table(RuntimeInstance {
1818 instance: self.id().instance(),
1819 index: runtime_instance,
1820 })
1821 .guest_thread_remove(guest_id.unwrap())?;
1822
1823 store.concurrent_state_mut().delete(guest_thread.thread)?;
1824 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1825 task.threads.remove(&guest_thread.thread);
1826 Ok(())
1827 }
1828
    /// Builds the closure that will run `callee` on behalf of `guest_thread`
    /// and pushes it onto the high-priority work queue as a
    /// `GuestCallKind::StartImplicit` item.  Nothing is called here; the guest
    /// code runs when the queued item is executed.
    ///
    /// Two closure shapes are produced:
    ///
    /// * With a `callback` (requires `async_`): the callee is invoked between
    ///   `enter_instance`/`exit_instance`, and the first flat result is
    ///   treated as a packed callback code passed to `handle_callback_code`.
    /// * Without a `callback`: the callee is invoked, the implicit thread is
    ///   cleaned up, and then either the async "no result" check is made
    ///   (`async_`), or the result is lifted, `post_return` is optionally run
    ///   and the task is completed (`!async_`).
    ///
    /// `param_count` and `result_count` are the numbers of flat core values
    /// passed to and returned from `callee`; both must fit in
    /// `MAX_FLAT_PARAMS` slots since that is the size of the scratch storage.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not visible in this block.  The code
    /// passes `callee` and `post_return` to `call_unchecked_raw` and reads
    /// result slots with `assume_init`/`transmute`, so callers presumably must
    /// guarantee that the pointers are valid functions belonging to this store
    /// and that the counts match the callee's signature. Confirm.
    unsafe fn queue_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        guest_thread: QualifiedThreadId,
        callee: SendSyncPtr<VMFuncRef>,
        param_count: usize,
        result_count: usize,
        flags: Option<InstanceFlags>,
        async_: bool,
        callback: Option<SendSyncPtr<VMFuncRef>>,
        post_return: Option<SendSyncPtr<VMFuncRef>>,
    ) -> Result<()> {
        /// Returns a closure which lowers the task's parameters, calls
        /// `callee`, and hands back the raw storage holding its results.
        ///
        /// # Safety
        ///
        /// NOTE(review): presumably the same requirements as `queue_call`;
        /// confirm.
        unsafe fn make_call<T: 'static>(
            store: StoreContextMut<T>,
            guest_thread: QualifiedThreadId,
            callee: SendSyncPtr<VMFuncRef>,
            param_count: usize,
            result_count: usize,
            flags: Option<InstanceFlags>,
        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
        + Send
        + Sync
        + 'static
        + use<T> {
            let token = StoreToken::new(store);
            move |store: &mut dyn VMStore| {
                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];

                store
                    .concurrent_state_mut()
                    .get_mut(guest_thread.thread)?
                    .state = GuestThreadState::Running;
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                let may_enter_after_call = task.call_post_return_automatically();
                // The lowering closure is single-use, so it is taken out of
                // the task; panics if it was already consumed.
                let lower = task.lower_params.take().unwrap();

                lower(store, &mut storage[..param_count])?;

                let mut store = token.as_context_mut(store);

                unsafe {
                    // When instance flags were supplied, `may_enter` is
                    // cleared for the duration of the call and afterwards set
                    // to `may_enter_after_call`.
                    if let Some(mut flags) = flags {
                        flags.set_may_enter(false);
                    }
                    // The same buffer carries parameters in and results out,
                    // hence the slice covers the larger of the two counts.
                    crate::Func::call_unchecked_raw(
                        &mut store,
                        callee.as_non_null(),
                        NonNull::new(
                            &mut storage[..param_count.max(result_count)]
                                as *mut [MaybeUninit<ValRaw>] as _,
                        )
                        .unwrap(),
                    )?;
                    if let Some(mut flags) = flags {
                        flags.set_may_enter(may_enter_after_call);
                    }
                }

                Ok(storage)
            }
        }

        let call = unsafe {
            make_call(
                store.as_context_mut(),
                guest_thread,
                callee,
                param_count,
                result_count,
                flags,
            )
        };

        let callee_instance = store
            .0
            .concurrent_state_mut()
            .get_mut(guest_thread.task)?
            .instance;

        let fun = if callback.is_some() {
            // A callback is only meaningful for async-lifted callees.
            assert!(async_);

            Box::new(move |store: &mut dyn VMStore| {
                self.add_guest_thread_to_instance_table(
                    guest_thread.thread,
                    store,
                    callee_instance.index,
                )?;
                // Make `guest_thread` current, remembering the previous
                // thread so it can be restored after the call.
                let old_thread = store
                    .concurrent_state_mut()
                    .guest_thread
                    .replace(guest_thread);
                log::trace!(
                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
                );

                store.maybe_push_call_context(guest_thread.task)?;

                store.enter_instance(callee_instance);

                let storage = call(store)?;

                store.exit_instance(callee_instance)?;

                store.maybe_pop_call_context(guest_thread.task)?;

                let state = store.concurrent_state_mut();
                state.guest_thread = old_thread;
                old_thread
                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
                log::trace!("stackless call: restored {old_thread:?} as current thread");

                // The first flat result of the callee is the packed callback
                // code interpreted by `handle_callback_code`.
                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;

                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
            })
                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
        } else {
            let token = StoreToken::new(store.as_context_mut());
            Box::new(move |store: &mut dyn VMStore| {
                self.add_guest_thread_to_instance_table(
                    guest_thread.thread,
                    store,
                    callee_instance.index,
                )?;
                // NOTE(review): unlike the callback path, `old_thread` is only
                // logged here and is not restored within this closure.
                let old_thread = store
                    .concurrent_state_mut()
                    .guest_thread
                    .replace(guest_thread);
                log::trace!(
                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
                );
                let mut flags = self.id().get(store).instance_flags(callee_instance.index);

                store.maybe_push_call_context(guest_thread.task)?;

                // Only sync-lifted calls go through `enter_instance` here; the
                // matching `exit_instance` is in the `!async_` branch below.
                if !async_ {
                    store.enter_instance(callee_instance);
                }

                let storage = call(store)?;

                // There is no callback, so the implicit thread is finished as
                // soon as the callee returns.
                self.cleanup_thread(store, guest_thread, callee_instance.index)?;

                if async_ {
                    // Trap if that was the last thread and the task has
                    // neither returned nor been cancelled.
                    let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                    if task.threads.is_empty() && !task.returned_or_cancelled() {
                        bail!(Trap::NoAsyncResult);
                    }
                } else {
                    let lift = {
                        store.exit_instance(callee_instance)?;

                        let state = store.concurrent_state_mut();
                        assert!(state.get_mut(guest_thread.task)?.result.is_none());

                        state
                            .get_mut(guest_thread.task)?
                            .lift_result
                            .take()
                            .unwrap()
                    };

                    // The first `result_count` slots are read back as
                    // initialized values after the call.
                    let result = (lift.lift)(store, unsafe {
                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
                            &storage[..result_count],
                        )
                    })?;

                    // `post_return` receives the single flat result, or a
                    // zero placeholder when there is none.
                    let post_return_arg = match result_count {
                        0 => ValRaw::i32(0),
                        1 => unsafe { storage[0].assume_init() },
                        _ => unreachable!(),
                    };

                    if store
                        .concurrent_state_mut()
                        .get_mut(guest_thread.task)?
                        .call_post_return_automatically()
                    {
                        unsafe {
                            flags.set_may_leave(false);
                            flags.set_needs_post_return(false);
                        }

                        if let Some(func) = post_return {
                            let mut store = token.as_context_mut(store);

                            unsafe {
                                crate::Func::call_unchecked_raw(
                                    &mut store,
                                    func.as_non_null(),
                                    slice::from_ref(&post_return_arg).into(),
                                )?;
                            }
                        }

                        unsafe {
                            flags.set_may_leave(true);
                            flags.set_may_enter(true);
                        }
                    }

                    self.task_complete(
                        store,
                        guest_thread.task,
                        result,
                        Status::Returned,
                        post_return_arg,
                    )?;
                }

                store.maybe_pop_call_context(guest_thread.task)?;

                let state = store.concurrent_state_mut();
                let task = state.get_mut(guest_thread.task)?;

                // Host-called tasks are deleted here once ready; guest-called
                // tasks are only marked as exited.
                match &task.caller {
                    Caller::Host { .. } => {
                        if task.ready_to_delete() {
                            Waitable::Guest(guest_thread.task).delete_from(state)?;
                        }
                    }
                    Caller::Guest { .. } => {
                        task.exited = true;
                    }
                }

                Ok(None)
            })
        };

        store
            .0
            .concurrent_state_mut()
            .push_high_priority(WorkItem::GuestCall(GuestCall {
                thread: guest_thread,
                kind: GuestCallKind::StartImplicit(fun),
            }));

        Ok(())
    }
2128
2129 unsafe fn prepare_call<T: 'static>(
2142 self,
2143 mut store: StoreContextMut<T>,
2144 start: *mut VMFuncRef,
2145 return_: *mut VMFuncRef,
2146 caller_instance: RuntimeComponentInstanceIndex,
2147 callee_instance: RuntimeComponentInstanceIndex,
2148 task_return_type: TypeTupleIndex,
2149 callee_async: bool,
2150 memory: *mut VMMemoryDefinition,
2151 string_encoding: u8,
2152 caller_info: CallerInfo,
2153 ) -> Result<()> {
2154 self.id().get(store.0).check_may_leave(caller_instance)?;
2155
2156 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2157 store.0.concurrent_state_mut().check_blocking()?;
2161 }
2162
2163 enum ResultInfo {
2164 Heap { results: u32 },
2165 Stack { result_count: u32 },
2166 }
2167
2168 let result_info = match &caller_info {
2169 CallerInfo::Async {
2170 has_result: true,
2171 params,
2172 } => ResultInfo::Heap {
2173 results: params.last().unwrap().get_u32(),
2174 },
2175 CallerInfo::Async {
2176 has_result: false, ..
2177 } => ResultInfo::Stack { result_count: 0 },
2178 CallerInfo::Sync {
2179 result_count,
2180 params,
2181 } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2182 results: params.last().unwrap().get_u32(),
2183 },
2184 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2185 result_count: *result_count,
2186 },
2187 };
2188
2189 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2190
2191 let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2195 let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2196 let token = StoreToken::new(store.as_context_mut());
2197 let state = store.0.concurrent_state_mut();
2198 let old_thread = state.guest_thread.take();
2199 let new_task = GuestTask::new(
2200 state,
2201 Box::new(move |store, dst| {
2202 let mut store = token.as_context_mut(store);
2203 assert!(dst.len() <= MAX_FLAT_PARAMS);
2204 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2206 let count = match caller_info {
2207 CallerInfo::Async { params, has_result } => {
2211 let params = ¶ms[..params.len() - usize::from(has_result)];
2212 for (param, src) in params.iter().zip(&mut src) {
2213 src.write(*param);
2214 }
2215 params.len()
2216 }
2217
2218 CallerInfo::Sync { params, .. } => {
2220 for (param, src) in params.iter().zip(&mut src) {
2221 src.write(*param);
2222 }
2223 params.len()
2224 }
2225 };
2226 unsafe {
2233 crate::Func::call_unchecked_raw(
2234 &mut store,
2235 start.as_non_null(),
2236 NonNull::new(
2237 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2238 )
2239 .unwrap(),
2240 )?;
2241 }
2242 dst.copy_from_slice(&src[..dst.len()]);
2243 let state = store.0.concurrent_state_mut();
2244 Waitable::Guest(state.guest_thread.unwrap().task).set_event(
2245 state,
2246 Some(Event::Subtask {
2247 status: Status::Started,
2248 }),
2249 )?;
2250 Ok(())
2251 }),
2252 LiftResult {
2253 lift: Box::new(move |store, src| {
2254 let mut store = token.as_context_mut(store);
2257 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2259 my_src.push(ValRaw::u32(*results));
2260 }
2261 unsafe {
2268 crate::Func::call_unchecked_raw(
2269 &mut store,
2270 return_.as_non_null(),
2271 my_src.as_mut_slice().into(),
2272 )?;
2273 }
2274 let state = store.0.concurrent_state_mut();
2275 let thread = state.guest_thread.unwrap();
2276 if sync_caller {
2277 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2278 if let ResultInfo::Stack { result_count } = &result_info {
2279 match result_count {
2280 0 => None,
2281 1 => Some(my_src[0]),
2282 _ => unreachable!(),
2283 }
2284 } else {
2285 None
2286 },
2287 );
2288 }
2289 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2290 }),
2291 ty: task_return_type,
2292 memory: NonNull::new(memory).map(SendSyncPtr::new),
2293 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2294 },
2295 Caller::Guest {
2296 thread: old_thread.unwrap(),
2297 instance: caller_instance,
2298 },
2299 None,
2300 self,
2301 callee_instance,
2302 callee_async,
2303 )?;
2304
2305 let guest_task = state.push(new_task)?;
2306 let new_thread = GuestThread::new_implicit(guest_task);
2307 let guest_thread = state.push(new_thread)?;
2308 state.get_mut(guest_task)?.threads.insert(guest_thread);
2309
2310 let state = store.0.concurrent_state_mut();
2311 if let Some(old_thread) = old_thread {
2312 if !state.may_enter(guest_task) {
2313 bail!(crate::Trap::CannotEnterComponent);
2314 }
2315
2316 state.get_mut(old_thread.task)?.subtasks.insert(guest_task);
2317 };
2318
2319 state.guest_thread = Some(QualifiedThreadId {
2322 task: guest_task,
2323 thread: guest_thread,
2324 });
2325 log::trace!(
2326 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2327 );
2328
2329 Ok(())
2330 }
2331
2332 unsafe fn call_callback<T>(
2337 self,
2338 mut store: StoreContextMut<T>,
2339 callee_instance: RuntimeComponentInstanceIndex,
2340 function: SendSyncPtr<VMFuncRef>,
2341 event: Event,
2342 handle: u32,
2343 may_enter_after_call: bool,
2344 ) -> Result<u32> {
2345 let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2346
2347 let (ordinal, result) = event.parts();
2348 let params = &mut [
2349 ValRaw::u32(ordinal),
2350 ValRaw::u32(handle),
2351 ValRaw::u32(result),
2352 ];
2353 unsafe {
2358 flags.set_may_enter(false);
2359 crate::Func::call_unchecked_raw(
2360 &mut store,
2361 function.as_non_null(),
2362 params.as_mut_slice().into(),
2363 )?;
2364 flags.set_may_enter(may_enter_after_call);
2365 }
2366 Ok(params[0].get_u32())
2367 }
2368
    /// Starts the call for the current guest thread (the one left current by
    /// a preceding `prepare_call`) and waits for its first relevant status.
    ///
    /// The call is queued via `queue_call`, the subtask is temporarily joined
    /// to the caller's `sync_call_set`, and the caller is suspended until a
    /// `Subtask` event arrives:
    ///
    /// * `Status::Returned` ends the wait with no handle;
    /// * any other status ends the wait for an async caller (`storage` is
    ///   `None`), after a subtask handle has been inserted into the caller
    ///   instance's handle table;
    /// * a sync caller (`storage` is `Some`) keeps waiting until the callee
    ///   has returned.
    ///
    /// For a sync caller, a flat result stashed in the task's `sync_result` is
    /// written to `storage[0]`.  Finally the caller is made the current
    /// thread again and `status.pack(waitable)` is returned.
    ///
    /// # Panics
    ///
    /// Panics if there is no current guest thread, if `callee` is null, if the
    /// counts exceed `MAX_FLAT_PARAMS`/`MAX_FLAT_RESULTS`, or if the current
    /// task was not called by a guest.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not visible in this block.  The raw
    /// function pointers are forwarded to `queue_call`/`call_callback`, so
    /// callers presumably must guarantee they are valid for this store and
    /// match the given counts. Confirm.
    unsafe fn start_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
        storage: Option<&mut [MaybeUninit<ValRaw>]>,
    ) -> Result<u32> {
        let token = StoreToken::new(store.as_context_mut());
        // No result storage means the caller used an async-lowered import.
        let async_caller = storage.is_none();
        let state = store.0.concurrent_state_mut();
        let guest_thread = state.guest_thread.unwrap();
        let callee_async = state.get_mut(guest_thread.task)?.async_function;
        let may_enter_after_call = state
            .get_mut(guest_thread.task)?
            .call_post_return_automatically();
        let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
        let param_count = usize::try_from(param_count).unwrap();
        assert!(param_count <= MAX_FLAT_PARAMS);
        let result_count = usize::try_from(result_count).unwrap();
        assert!(result_count <= MAX_FLAT_RESULTS);

        let task = state.get_mut(guest_thread.task)?;
        if !callback.is_null() {
            // Install the task's callback so later events can be delivered
            // through `call_callback`.
            let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
            task.callback = Some(Box::new(move |store, runtime_instance, event, handle| {
                let store = token.as_context_mut(store);
                unsafe {
                    self.call_callback::<T>(
                        store,
                        runtime_instance,
                        callback,
                        event,
                        handle,
                        may_enter_after_call,
                    )
                }
            }));
        }

        // Tasks reaching this point are expected to have a guest caller.
        let Caller::Guest {
            thread: caller,
            instance: runtime_instance,
        } = &task.caller
        else {
            unreachable!()
        };
        let caller = *caller;
        let caller_instance = *runtime_instance;

        let callee_instance = task.instance;

        // Instance flags are only handed to `queue_call` for callback-based
        // callees.
        let instance_flags = if callback.is_null() {
            None
        } else {
            Some(self.id().get(store.0).instance_flags(callee_instance.index))
        };

        unsafe {
            self.queue_call(
                store.as_context_mut(),
                guest_thread,
                callee,
                param_count,
                result_count,
                instance_flags,
                (flags & START_FLAG_ASYNC_CALLEE) != 0,
                NonNull::new(callback).map(SendSyncPtr::new),
                NonNull::new(post_return).map(SendSyncPtr::new),
            )?;
        }

        let state = store.0.concurrent_state_mut();

        // Temporarily move the subtask into the caller's `sync_call_set`,
        // remembering its previous set so it can be restored after the wait.
        let guest_waitable = Waitable::Guest(guest_thread.task);
        let old_set = guest_waitable.common(state)?.set;
        let set = state.get_mut(caller.task)?.sync_call_set;
        guest_waitable.join(state, Some(set))?;

        let (status, waitable) = loop {
            // NOTE(review): presumably control returns here once an event is
            // available on `set`; confirm against `suspend`.
            store.0.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                skip_may_block_check: async_caller || !callee_async,
            })?;

            let state = store.0.concurrent_state_mut();

            log::trace!("taking event for {:?}", guest_thread.task);
            let event = guest_waitable.take_event(state)?;
            let Some(Event::Subtask { status }) = event else {
                unreachable!();
            };

            log::trace!("status {status:?} for {:?}", guest_thread.task);

            if status == Status::Returned {
                // The callee has returned; no handle is needed.
                break (status, None);
            } else if async_caller {
                // Not returned yet, but the caller is async: give it a
                // subtask handle and report the current status.
                let handle = store
                    .0
                    .handle_table(RuntimeInstance {
                        instance: self.id().instance(),
                        index: caller_instance,
                    })
                    .subtask_insert_guest(guest_thread.task.rep())?;
                store
                    .0
                    .concurrent_state_mut()
                    .get_mut(guest_thread.task)?
                    .common
                    .handle = Some(handle);
                break (status, Some(handle));
            } else {
                // Sync caller and the callee has not returned yet: loop and
                // keep waiting.
            }
        };

        let state = store.0.concurrent_state_mut();

        guest_waitable.join(state, old_set)?;

        if let Some(storage) = storage {
            let task = state.get_mut(guest_thread.task)?;
            // Hand a stashed flat result, if any, back to the sync caller.
            if let Some(result) = task.sync_result.take() {
                if let Some(result) = result {
                    storage[0] = MaybeUninit::new(result);
                }

                if task.exited {
                    if task.ready_to_delete() {
                        Waitable::Guest(guest_thread.task).delete_from(state)?;
                    }
                }
            }
        }

        // Control returns to the caller, so it becomes the current thread.
        state.guest_thread = Some(caller);
        state.get_mut(caller.thread)?.state = GuestThreadState::Running;
        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");

        Ok(status.pack(waitable))
    }
2567
    /// Polls the host `future` once on behalf of the current guest thread and
    /// either completes it inline or turns it into a subtask.
    ///
    /// A `HostTask` is created for `caller_instance` and the wrapped future is
    /// polled a single time with a no-op waker:
    ///
    /// * If it is ready, `lower` is applied to the result, the host task is
    ///   deleted and `Ok(None)` is returned.
    /// * If it is pending, the future is pushed onto the concurrent state so
    ///   it keeps being driven.  When it resolves it queues a high-priority
    ///   work item that runs `lower`, clears the task's `join_handle` and
    ///   publishes a `Status::Returned` subtask event.  A handle for the task
    ///   is inserted into the caller's handle table and returned as
    ///   `Ok(Some(handle))`.
    ///
    /// # Panics
    ///
    /// Panics if there is no current guest thread.
    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
        self,
        mut store: StoreContextMut<'_, T>,
        future: impl Future<Output = Result<R>> + Send + 'static,
        caller_instance: RuntimeComponentInstanceIndex,
        lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
    ) -> Result<Option<u32>> {
        let token = StoreToken::new(store.as_context_mut());
        let state = store.0.concurrent_state_mut();
        let caller = state.guest_thread.unwrap();

        let (join_handle, future) = JoinHandle::run(async move {
            let mut future = pin!(future);
            // Holds the call context popped after a pending poll so that it
            // can be pushed back before the next poll.
            let mut call_context = None;
            future::poll_fn(move |cx| {
                // Restore the call context saved by a previous pending poll,
                // if there is one.
                tls::get(|store| {
                    if let Some(call_context) = call_context.take() {
                        token
                            .as_context_mut(store)
                            .0
                            .component_resource_state()
                            .0
                            .push(call_context);
                    }
                });

                let result = future.as_mut().poll(cx);

                if result.is_pending() {
                    // Not done yet: pop the call context and keep it until
                    // the next poll.  Panics if the stack is empty.
                    tls::get(|store| {
                        call_context = Some(
                            token
                                .as_context_mut(store)
                                .0
                                .component_resource_state()
                                .0
                                .pop()
                                .unwrap(),
                        );
                    });
                }
                result
            })
            .await
        });

        let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;

        log::trace!("new host task child of {caller:?}: {task:?}");

        let mut future = Box::pin(future);

        // Poll once with the store installed in TLS, since the wrapped future
        // accesses it through `tls::get`.
        let poll = tls::set(store.0, || {
            future
                .as_mut()
                .poll(&mut Context::from_waker(&Waker::noop()))
        });

        Ok(match poll {
            Poll::Ready(None) => unreachable!(),
            Poll::Ready(Some(result)) => {
                // Finished immediately: lower the result and drop the task.
                lower(store.as_context_mut(), result?)?;
                log::trace!("delete host task {task:?} (already ready)");
                store.0.concurrent_state_mut().delete(task)?;
                None
            }
            Poll::Pending => {
                let future =
                    Box::pin(async move {
                        let result = match future.await {
                            Some(result) => result?,
                            // No output was produced, so there is nothing to
                            // lower.
                            None => return Ok(()),
                        };
                        tls::get(move |store| {
                            // Lowering and event publication are deferred to a
                            // work item rather than done inside this future.
                            store.concurrent_state_mut().push_high_priority(
                                WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
                                    lower(token.as_context_mut(store), result)?;
                                    let state = store.concurrent_state_mut();
                                    state.get_mut(task)?.join_handle.take();
                                    Waitable::Host(task).set_event(
                                        state,
                                        Some(Event::Subtask {
                                            status: Status::Returned,
                                        }),
                                    )
                                }))),
                            );
                            Ok(())
                        })
                    });

                store.0.concurrent_state_mut().push_future(future);
                let handle = store
                    .0
                    .handle_table(RuntimeInstance {
                        instance: self.id().instance(),
                        index: caller_instance,
                    })
                    .subtask_insert_host(task.rep())?;
                store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
                log::trace!(
                    "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
                );
                Some(handle)
            }
        })
    }
2714
2715 pub(crate) fn task_return(
2718 self,
2719 store: &mut dyn VMStore,
2720 caller: RuntimeComponentInstanceIndex,
2721 ty: TypeTupleIndex,
2722 options: OptionsIndex,
2723 storage: &[ValRaw],
2724 ) -> Result<()> {
2725 self.id().get(store).check_may_leave(caller)?;
2726 let state = store.concurrent_state_mut();
2727 let guest_thread = state.guest_thread.unwrap();
2728 let lift = state
2729 .get_mut(guest_thread.task)?
2730 .lift_result
2731 .take()
2732 .ok_or_else(|| {
2733 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2734 })?;
2735 assert!(state.get_mut(guest_thread.task)?.result.is_none());
2736
2737 let CanonicalOptions {
2738 string_encoding,
2739 data_model,
2740 ..
2741 } = &self.id().get(store).component().env_component().options[options];
2742
2743 let invalid = ty != lift.ty
2744 || string_encoding != &lift.string_encoding
2745 || match data_model {
2746 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2747 Some(memory) => {
2748 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2749 let actual = self.id().get(store).runtime_memory(memory);
2750 expected != actual.as_ptr()
2751 }
2752 None => false,
2755 },
2756 CanonicalOptionsDataModel::Gc { .. } => true,
2758 };
2759
2760 if invalid {
2761 bail!("invalid `task.return` signature and/or options for current task");
2762 }
2763
2764 log::trace!("task.return for {guest_thread:?}");
2765
2766 let result = (lift.lift)(store, storage)?;
2767 self.task_complete(
2768 store,
2769 guest_thread.task,
2770 result,
2771 Status::Returned,
2772 ValRaw::i32(0),
2773 )
2774 }
2775
2776 pub(crate) fn task_cancel(
2778 self,
2779 store: &mut StoreOpaque,
2780 caller: RuntimeComponentInstanceIndex,
2781 ) -> Result<()> {
2782 self.id().get(store).check_may_leave(caller)?;
2783 let state = store.concurrent_state_mut();
2784 let guest_thread = state.guest_thread.unwrap();
2785 let task = state.get_mut(guest_thread.task)?;
2786 if !task.cancel_sent {
2787 bail!("`task.cancel` called by task which has not been cancelled")
2788 }
2789 _ = task.lift_result.take().ok_or_else(|| {
2790 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2791 })?;
2792
2793 assert!(task.result.is_none());
2794
2795 log::trace!("task.cancel for {guest_thread:?}");
2796
2797 self.task_complete(
2798 store,
2799 guest_thread.task,
2800 Box::new(DummyResult),
2801 Status::ReturnCancelled,
2802 ValRaw::i32(0),
2803 )
2804 }
2805
2806 fn task_complete(
2812 self,
2813 store: &mut StoreOpaque,
2814 guest_task: TableId<GuestTask>,
2815 result: Box<dyn Any + Send + Sync>,
2816 status: Status,
2817 post_return_arg: ValRaw,
2818 ) -> Result<()> {
2819 if store
2820 .concurrent_state_mut()
2821 .get_mut(guest_task)?
2822 .call_post_return_automatically()
2823 {
2824 let (calls, host_table, _, instance) =
2825 store.component_resource_state_with_instance(self);
2826 ResourceTables {
2827 calls,
2828 host_table: Some(host_table),
2829 guest: Some(instance.instance_states()),
2830 }
2831 .exit_call()?;
2832 } else {
2833 let function_index = store
2838 .concurrent_state_mut()
2839 .get_mut(guest_task)?
2840 .function_index
2841 .unwrap();
2842 self.id()
2843 .get_mut(store)
2844 .post_return_arg_set(function_index, post_return_arg);
2845 }
2846
2847 let state = store.concurrent_state_mut();
2848 let task = state.get_mut(guest_task)?;
2849
2850 if let Caller::Host { tx, .. } = &mut task.caller {
2851 if let Some(tx) = tx.take() {
2852 _ = tx.send(result);
2853 }
2854 } else {
2855 task.result = Some(result);
2856 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2857 }
2858
2859 Ok(())
2860 }
2861
2862 pub(crate) fn waitable_set_new(
2864 self,
2865 store: &mut StoreOpaque,
2866 caller_instance: RuntimeComponentInstanceIndex,
2867 ) -> Result<u32> {
2868 self.id().get_mut(store).check_may_leave(caller_instance)?;
2869 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2870 let handle = store
2871 .handle_table(RuntimeInstance {
2872 instance: self.id().instance(),
2873 index: caller_instance,
2874 })
2875 .waitable_set_insert(set.rep())?;
2876 log::trace!("new waitable set {set:?} (handle {handle})");
2877 Ok(handle)
2878 }
2879
2880 pub(crate) fn waitable_set_drop(
2882 self,
2883 store: &mut StoreOpaque,
2884 caller_instance: RuntimeComponentInstanceIndex,
2885 set: u32,
2886 ) -> Result<()> {
2887 self.id().get_mut(store).check_may_leave(caller_instance)?;
2888 let rep = store
2889 .handle_table(RuntimeInstance {
2890 instance: self.id().instance(),
2891 index: caller_instance,
2892 })
2893 .waitable_set_remove(set)?;
2894
2895 log::trace!("drop waitable set {rep} (handle {set})");
2896
2897 let set = store
2898 .concurrent_state_mut()
2899 .delete(TableId::<WaitableSet>::new(rep))?;
2900
2901 if !set.waiting.is_empty() {
2902 bail!("cannot drop waitable set with waiters");
2903 }
2904
2905 Ok(())
2906 }
2907
2908 pub(crate) fn waitable_join(
2910 self,
2911 store: &mut StoreOpaque,
2912 caller_instance: RuntimeComponentInstanceIndex,
2913 waitable_handle: u32,
2914 set_handle: u32,
2915 ) -> Result<()> {
2916 let mut instance = self.id().get_mut(store);
2917 instance.check_may_leave(caller_instance)?;
2918 let waitable =
2919 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2920
2921 let set = if set_handle == 0 {
2922 None
2923 } else {
2924 let set = instance.instance_states().0[caller_instance]
2925 .handle_table()
2926 .waitable_set_rep(set_handle)?;
2927
2928 Some(TableId::<WaitableSet>::new(set))
2929 };
2930
2931 log::trace!(
2932 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2933 );
2934
2935 waitable.join(store.concurrent_state_mut(), set)
2936 }
2937
2938 pub(crate) fn subtask_drop(
2940 self,
2941 store: &mut StoreOpaque,
2942 caller_instance: RuntimeComponentInstanceIndex,
2943 task_id: u32,
2944 ) -> Result<()> {
2945 self.id().get_mut(store).check_may_leave(caller_instance)?;
2946 self.waitable_join(store, caller_instance, task_id, 0)?;
2947
2948 let (rep, is_host) = store
2949 .handle_table(RuntimeInstance {
2950 instance: self.id().instance(),
2951 index: caller_instance,
2952 })
2953 .subtask_remove(task_id)?;
2954
2955 let concurrent_state = store.concurrent_state_mut();
2956 let (waitable, expected_caller_instance, delete) = if is_host {
2957 let id = TableId::<HostTask>::new(rep);
2958 let task = concurrent_state.get_mut(id)?;
2959 if task.join_handle.is_some() {
2960 bail!("cannot drop a subtask which has not yet resolved");
2961 }
2962 (Waitable::Host(id), task.caller_instance, true)
2963 } else {
2964 let id = TableId::<GuestTask>::new(rep);
2965 let task = concurrent_state.get_mut(id)?;
2966 if task.lift_result.is_some() {
2967 bail!("cannot drop a subtask which has not yet resolved");
2968 }
2969 if let Caller::Guest { instance, .. } = &task.caller {
2970 (Waitable::Guest(id), *instance, task.exited)
2971 } else {
2972 unreachable!()
2973 }
2974 };
2975
2976 waitable.common(concurrent_state)?.handle = None;
2977
2978 if waitable.take_event(concurrent_state)?.is_some() {
2979 bail!("cannot drop a subtask with an undelivered event");
2980 }
2981
2982 if delete {
2983 waitable.delete_from(concurrent_state)?;
2984 }
2985
2986 assert_eq!(expected_caller_instance, caller_instance);
2990 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
2991 Ok(())
2992 }
2993
2994 pub(crate) fn waitable_set_wait(
2996 self,
2997 store: &mut StoreOpaque,
2998 caller: RuntimeComponentInstanceIndex,
2999 options: OptionsIndex,
3000 set: u32,
3001 payload: u32,
3002 ) -> Result<u32> {
3003 self.id().get(store).check_may_leave(caller)?;
3004
3005 if !self.options(store, options).async_ {
3006 store.concurrent_state_mut().check_blocking()?;
3010 }
3011
3012 let &CanonicalOptions {
3013 cancellable,
3014 instance: caller_instance,
3015 ..
3016 } = &self.id().get(store).component().env_component().options[options];
3017 let rep = store
3018 .handle_table(RuntimeInstance {
3019 instance: self.id().instance(),
3020 index: caller_instance,
3021 })
3022 .waitable_set_rep(set)?;
3023
3024 self.waitable_check(
3025 store,
3026 cancellable,
3027 WaitableCheck::Wait,
3028 WaitableCheckParams {
3029 set: TableId::new(rep),
3030 options,
3031 payload,
3032 },
3033 )
3034 }
3035
3036 pub(crate) fn waitable_set_poll(
3038 self,
3039 store: &mut StoreOpaque,
3040 caller: RuntimeComponentInstanceIndex,
3041 options: OptionsIndex,
3042 set: u32,
3043 payload: u32,
3044 ) -> Result<u32> {
3045 self.id().get(store).check_may_leave(caller)?;
3046
3047 let &CanonicalOptions {
3048 cancellable,
3049 instance: caller_instance,
3050 ..
3051 } = &self.id().get(store).component().env_component().options[options];
3052 let rep = store
3053 .handle_table(RuntimeInstance {
3054 instance: self.id().instance(),
3055 index: caller_instance,
3056 })
3057 .waitable_set_rep(set)?;
3058
3059 self.waitable_check(
3060 store,
3061 cancellable,
3062 WaitableCheck::Poll,
3063 WaitableCheckParams {
3064 set: TableId::new(rep),
3065 options,
3066 payload,
3067 },
3068 )
3069 }
3070
3071 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3073 let thread_id = store
3074 .concurrent_state_mut()
3075 .guest_thread
3076 .ok_or_else(|| anyhow!("no current thread"))?
3077 .thread;
3078 Ok(store
3080 .concurrent_state_mut()
3081 .get_mut(thread_id)?
3082 .instance_rep
3083 .unwrap())
3084 }
3085
3086 pub(crate) fn thread_new_indirect<T: 'static>(
3088 self,
3089 mut store: StoreContextMut<T>,
3090 runtime_instance: RuntimeComponentInstanceIndex,
3091 _func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex,
3093 start_func_idx: u32,
3094 context: i32,
3095 ) -> Result<u32> {
3096 self.id().get(store.0).check_may_leave(runtime_instance)?;
3097
3098 log::trace!("creating new thread");
3099
3100 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3101 let (instance, registry) = self.id().get_mut_and_registry(store.0);
3102 let callee = instance
3103 .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3104 .ok_or_else(|| {
3105 anyhow!("the start function index points to an uninitialized function")
3106 })?;
3107 if callee.type_index(store.0) != start_func_ty.type_index() {
3108 bail!(
3109 "start function does not match expected type (currently only `(i32) -> ()` is supported)"
3110 );
3111 }
3112
3113 let token = StoreToken::new(store.as_context_mut());
3114 let start_func = Box::new(
3115 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3116 let old_thread = store
3117 .concurrent_state_mut()
3118 .guest_thread
3119 .replace(guest_thread);
3120 log::trace!(
3121 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3122 );
3123
3124 store.maybe_push_call_context(guest_thread.task)?;
3125
3126 let mut store = token.as_context_mut(store);
3127 let mut params = [ValRaw::i32(context)];
3128 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3131
3132 store.0.maybe_pop_call_context(guest_thread.task)?;
3133
3134 self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3135 log::trace!("explicit thread {guest_thread:?} completed");
3136 let state = store.0.concurrent_state_mut();
3137 let task = state.get_mut(guest_thread.task)?;
3138 if task.threads.is_empty() && !task.returned_or_cancelled() {
3139 bail!(Trap::NoAsyncResult);
3140 }
3141 state.guest_thread = old_thread;
3142 old_thread
3143 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
3144 if state.get_mut(guest_thread.task)?.ready_to_delete() {
3145 Waitable::Guest(guest_thread.task).delete_from(state)?;
3146 }
3147 log::trace!("thread start: restored {old_thread:?} as current thread");
3148
3149 Ok(())
3150 },
3151 );
3152
3153 let state = store.0.concurrent_state_mut();
3154 let current_thread = state.guest_thread.unwrap();
3155 let parent_task = current_thread.task;
3156
3157 let new_thread = GuestThread::new_explicit(parent_task, start_func);
3158 let thread_id = state.push(new_thread)?;
3159 state.get_mut(parent_task)?.threads.insert(thread_id);
3160
3161 log::trace!("new thread with id {thread_id:?} created");
3162
3163 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3164 }
3165
3166 pub(crate) fn resume_suspended_thread(
3167 self,
3168 store: &mut StoreOpaque,
3169 runtime_instance: RuntimeComponentInstanceIndex,
3170 thread_idx: u32,
3171 high_priority: bool,
3172 ) -> Result<()> {
3173 let thread_id =
3174 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3175 let state = store.concurrent_state_mut();
3176 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3177 let thread = state.get_mut(guest_thread.thread)?;
3178
3179 match mem::replace(&mut thread.state, GuestThreadState::Running) {
3180 GuestThreadState::NotStartedExplicit(start_func) => {
3181 log::trace!("starting thread {guest_thread:?}");
3182 let guest_call = WorkItem::GuestCall(GuestCall {
3183 thread: guest_thread,
3184 kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3185 start_func(store, guest_thread)
3186 })),
3187 });
3188 store
3189 .concurrent_state_mut()
3190 .push_work_item(guest_call, high_priority);
3191 }
3192 GuestThreadState::Suspended(fiber) => {
3193 log::trace!("resuming thread {thread_id:?} that was suspended");
3194 store
3195 .concurrent_state_mut()
3196 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3197 }
3198 _ => {
3199 bail!("cannot resume thread which is not suspended");
3200 }
3201 }
3202 Ok(())
3203 }
3204
3205 fn add_guest_thread_to_instance_table(
3206 self,
3207 thread_id: TableId<GuestThread>,
3208 store: &mut StoreOpaque,
3209 runtime_instance: RuntimeComponentInstanceIndex,
3210 ) -> Result<u32> {
3211 let guest_id = store
3212 .handle_table(RuntimeInstance {
3213 instance: self.id().instance(),
3214 index: runtime_instance,
3215 })
3216 .guest_thread_insert(thread_id.rep())?;
3217 store
3218 .concurrent_state_mut()
3219 .get_mut(thread_id)?
3220 .instance_rep = Some(guest_id);
3221 Ok(guest_id)
3222 }
3223
3224 pub(crate) fn suspension_intrinsic(
3227 self,
3228 store: &mut StoreOpaque,
3229 caller: RuntimeComponentInstanceIndex,
3230 cancellable: bool,
3231 yielding: bool,
3232 to_thread: Option<u32>,
3233 ) -> Result<WaitResult> {
3234 self.id().get(store).check_may_leave(caller)?;
3235
3236 if to_thread.is_none() {
3237 let state = store.concurrent_state_mut();
3238 if yielding {
3239 if !state.may_block(state.guest_thread.unwrap().task) {
3241 return Ok(WaitResult::Completed);
3245 }
3246 } else {
3247 state.check_blocking()?;
3251 }
3252 }
3253
3254 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3256 return Ok(WaitResult::Cancelled);
3257 }
3258
3259 if let Some(thread) = to_thread {
3260 self.resume_suspended_thread(store, caller, thread, true)?;
3261 }
3262
3263 let state = store.concurrent_state_mut();
3264 let guest_thread = state.guest_thread.unwrap();
3265 let reason = if yielding {
3266 SuspendReason::Yielding {
3267 thread: guest_thread,
3268 }
3269 } else {
3270 SuspendReason::ExplicitlySuspending {
3271 thread: guest_thread,
3272 skip_may_block_check: to_thread.is_some(),
3276 }
3277 };
3278
3279 store.suspend(reason)?;
3280
3281 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3282 Ok(WaitResult::Cancelled)
3283 } else {
3284 Ok(WaitResult::Completed)
3285 }
3286 }
3287
3288 fn waitable_check(
3290 self,
3291 store: &mut StoreOpaque,
3292 cancellable: bool,
3293 check: WaitableCheck,
3294 params: WaitableCheckParams,
3295 ) -> Result<u32> {
3296 let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();
3297
3298 log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3299
3300 let state = store.concurrent_state_mut();
3301 let task = state.get_mut(guest_thread.task)?;
3302
3303 match &check {
3306 WaitableCheck::Wait => {
3307 let set = params.set;
3308
3309 if (task.event.is_none()
3310 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3311 && state.get_mut(set)?.ready.is_empty()
3312 {
3313 if cancellable {
3314 let old = state
3315 .get_mut(guest_thread.thread)?
3316 .wake_on_cancel
3317 .replace(set);
3318 assert!(old.is_none());
3319 }
3320
3321 store.suspend(SuspendReason::Waiting {
3322 set,
3323 thread: guest_thread,
3324 skip_may_block_check: false,
3325 })?;
3326 }
3327 }
3328 WaitableCheck::Poll => {}
3329 }
3330
3331 log::trace!(
3332 "waitable check for {guest_thread:?}; set {:?}, part two",
3333 params.set
3334 );
3335
3336 let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3338
3339 let (ordinal, handle, result) = match &check {
3340 WaitableCheck::Wait => {
3341 let (event, waitable) = event.unwrap();
3342 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3343 let (ordinal, result) = event.parts();
3344 (ordinal, handle, result)
3345 }
3346 WaitableCheck::Poll => {
3347 if let Some((event, waitable)) = event {
3348 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3349 let (ordinal, result) = event.parts();
3350 (ordinal, handle, result)
3351 } else {
3352 log::trace!(
3353 "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3354 guest_thread.task,
3355 params.set
3356 );
3357 let (ordinal, result) = Event::None.parts();
3358 (ordinal, 0, result)
3359 }
3360 }
3361 };
3362 let memory = self.options_memory_mut(store, params.options);
3363 let ptr = func::validate_inbounds_dynamic(
3364 &CanonicalAbiInfo::POINTER_PAIR,
3365 memory,
3366 &ValRaw::u32(params.payload),
3367 )?;
3368 memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3369 memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3370 Ok(ordinal)
3371 }
3372
    /// Implements the `subtask.cancel` intrinsic for the subtask handle
    /// `task_id` owned by `caller_instance`.
    ///
    /// Returns one of:
    /// * `Status::ReturnCancelled` if a host task was aborted via its join
    ///   handle,
    /// * `Status::StartCancelled` if a guest task had not yet lowered its
    ///   parameters and was removed from the pending queue,
    /// * `BLOCKED` if `async_` is set and the guest task has not yet returned
    ///   or acknowledged cancellation,
    /// * otherwise the terminal status (`Returned` or `ReturnCancelled`) taken
    ///   from the subtask's pending event.
    ///
    /// # Errors
    ///
    /// Fails if the instance may not leave, if a synchronous call is made
    /// where blocking is not allowed, if the handle is invalid, or if the
    /// subtask's terminal status was already delivered.
    ///
    /// # Panics
    ///
    /// Panics if the subtask's recorded caller instance differs from
    /// `caller_instance`, or if a guest subtask's caller is not a guest.
    pub(crate) fn subtask_cancel(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        async_: bool,
        task_id: u32,
    ) -> Result<u32> {
        self.id().get(store).check_may_leave(caller_instance)?;

        // A synchronous cancel may need to wait, so it must be allowed to
        // block.
        if !async_ {
            store.concurrent_state_mut().check_blocking()?;
        }

        // Resolve the guest handle to a table rep plus host/guest flavor.
        let (rep, is_host) = store
            .handle_table(RuntimeInstance {
                instance: self.id().instance(),
                index: caller_instance,
            })
            .subtask_rep(task_id)?;
        let (waitable, expected_caller_instance) = if is_host {
            let id = TableId::<HostTask>::new(rep);
            (
                Waitable::Host(id),
                store.concurrent_state_mut().get_mut(id)?.caller_instance,
            )
        } else {
            let id = TableId::<GuestTask>::new(rep);
            if let Caller::Guest { instance, .. } =
                &store.concurrent_state_mut().get_mut(id)?.caller
            {
                (Waitable::Guest(id), *instance)
            } else {
                unreachable!()
            }
        };
        // Sanity check: the subtask must have been created by the instance
        // that is now trying to cancel it.
        assert_eq!(expected_caller_instance, caller_instance);

        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

        let concurrent_state = store.concurrent_state_mut();
        if let Waitable::Host(host_task) = waitable {
            // Host task: abort it if it still has a join handle; otherwise
            // fall through to pick up its pending event below.
            if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
                handle.abort();
                return Ok(Status::ReturnCancelled as u32);
            }
        } else {
            let caller = concurrent_state.guest_thread.unwrap();
            let guest_task = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.already_lowered_parameters() {
                // Parameters were never lowered: drop the lowering and
                // lifting state and remove the task's threads from the
                // callee instance's pending queue.
                task.lower_params = None;
                task.lift_result = None;

                let instance = task.instance;
                let pending = &mut store.instance_state(instance).pending;
                let pending_count = pending.len();
                pending.retain(|thread, _| thread.task != guest_task);
                // Nothing was removed, so the task was not pending after all.
                if pending.len() == pending_count {
                    bail!("`subtask.cancel` called after terminal status delivered");
                }
                return Ok(Status::StartCancelled as u32);
            } else if !task.returned_or_cancelled() {
                // The task has lowered its parameters but not finished:
                // record the cancellation request as the task's event.
                task.cancel_sent = true;
                task.event = Some(Event::Cancelled);
                for thread in task.threads.clone() {
                    let thread = QualifiedThreadId {
                        task: guest_task,
                        thread,
                    };
                    // Only a thread that registered `wake_on_cancel` (i.e. is
                    // in a cancellable wait) can be woken here.
                    if let Some(set) = concurrent_state
                        .get_mut(thread.thread)
                        .unwrap()
                        .wake_on_cancel
                        .take()
                    {
                        let item = match concurrent_state
                            .get_mut(set)?
                            .waiting
                            .remove(&thread)
                            .unwrap()
                        {
                            WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
                            WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
                                thread,
                                kind: GuestCallKind::DeliverEvent {
                                    instance,
                                    set: None,
                                },
                            }),
                        };
                        concurrent_state.push_high_priority(item);

                        // Yield so the woken thread gets a chance to react to
                        // the cancellation; only the first such thread is
                        // woken.
                        store.suspend(SuspendReason::Yielding { thread: caller })?;
                        break;
                    }
                }

                // Re-fetch the task: the suspend above may have let it make
                // progress.
                let concurrent_state = store.concurrent_state_mut();
                let task = concurrent_state.get_mut(guest_task)?;
                if !task.returned_or_cancelled() {
                    if async_ {
                        return Ok(BLOCKED);
                    } else {
                        store.wait_for_event(Waitable::Guest(guest_task))?;
                    }
                }
            }
        }

        // At this point the subtask is expected to have a terminal event
        // pending; anything else means its status was already delivered.
        let event = waitable.take_event(store.concurrent_state_mut())?;
        if let Some(Event::Subtask {
            status: status @ (Status::Returned | Status::ReturnCancelled),
        }) = event
        {
            Ok(status as u32)
        } else {
            bail!("`subtask.cancel` called after terminal status delivered");
        }
    }
3507
3508 pub(crate) fn context_get(
3509 self,
3510 store: &mut StoreOpaque,
3511 caller: RuntimeComponentInstanceIndex,
3512 slot: u32,
3513 ) -> Result<u32> {
3514 self.id().get(store).check_may_leave(caller)?;
3515 store.concurrent_state_mut().context_get(slot)
3516 }
3517
3518 pub(crate) fn context_set(
3519 self,
3520 store: &mut StoreOpaque,
3521 caller: RuntimeComponentInstanceIndex,
3522 slot: u32,
3523 value: u32,
3524 ) -> Result<()> {
3525 self.id().get(store).check_may_leave(caller)?;
3526 store.concurrent_state_mut().context_set(slot, value)
3527 }
3528}
3529
/// Store-level entry points for the component-model async intrinsics.
///
/// Each method receives the `Instance` on whose behalf it runs. The
/// implementation for `StoreInner<T>` in this file forwards every method to
/// the corresponding `Instance` method, wrapping the store in a
/// `StoreContextMut`.
pub trait VMComponentAsyncStore {
    /// Prepares a call from `caller_instance` to `callee_instance`.
    ///
    /// In the `StoreInner<T>` implementation, `storage_len` values are copied
    /// out of `storage` as the call's parameters, and `result_count` is
    /// interpreted as follows: `PREPARE_ASYNC_NO_RESULT` and
    /// `PREPARE_ASYNC_WITH_RESULT` denote an async caller without/with a
    /// result, and any other value is a sync caller's result count.
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads of `storage_len` `ValRaw` items. The
    /// remaining raw pointers are passed through to `Instance::prepare_call`;
    /// their requirements are defined there (not visible in this chunk).
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: u8,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a call on behalf of a synchronous caller.
    ///
    /// The `StoreInner<T>` implementation forwards to `Instance::start_call`
    /// with a null post-return function and passes `storage` (of
    /// `storage_len` items) along; the value returned by `start_call` is
    /// discarded.
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads and writes of `storage_len` items.
    /// Requirements on `callback` and `callee` are those of
    /// `Instance::start_call` (not visible in this chunk).
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a call on behalf of an asynchronous caller.
    ///
    /// The `StoreInner<T>` implementation forwards all arguments to
    /// `Instance::start_call` without a storage buffer and returns its `u32`
    /// result unchanged.
    ///
    /// # Safety
    ///
    /// Requirements on the raw function pointers are those of
    /// `Instance::start_call` (not visible in this chunk).
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// Writes to the future identified by handle `future`, reading the value
    /// from guest address `address`; returns the encoded result code.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Reads from the future identified by handle `future` into guest address
    /// `address`; returns the encoded result code.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a future of type `ty`.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Writes up to `count` items from guest address `address` to the stream
    /// identified by handle `stream`; returns the encoded result code.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Reads up to `count` items from the stream identified by handle
    /// `stream` into guest address `address`; returns the encoded result code.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_write`, but additionally supplies the payload's size and
    /// alignment, which the implementation passes on as a `FlatAbi`.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_read`, but additionally supplies the payload's size and
    /// alignment, which the implementation passes on as a `FlatAbi`.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a stream of type `ty`.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Retrieves the debug message of the error context `err_ctx_handle`,
    /// targeting guest address `debug_msg_address`.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// Creates a new guest thread whose start function is found at
    /// `start_func_idx` in table `start_func_table_idx`; returns the new
    /// thread's handle.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
3700
3701impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3703 unsafe fn prepare_call(
3704 &mut self,
3705 instance: Instance,
3706 memory: *mut VMMemoryDefinition,
3707 start: *mut VMFuncRef,
3708 return_: *mut VMFuncRef,
3709 caller_instance: RuntimeComponentInstanceIndex,
3710 callee_instance: RuntimeComponentInstanceIndex,
3711 task_return_type: TypeTupleIndex,
3712 callee_async: bool,
3713 string_encoding: u8,
3714 result_count_or_max_if_async: u32,
3715 storage: *mut ValRaw,
3716 storage_len: usize,
3717 ) -> Result<()> {
3718 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3722
3723 unsafe {
3724 instance.prepare_call(
3725 StoreContextMut(self),
3726 start,
3727 return_,
3728 caller_instance,
3729 callee_instance,
3730 task_return_type,
3731 callee_async,
3732 memory,
3733 string_encoding,
3734 match result_count_or_max_if_async {
3735 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3736 params,
3737 has_result: false,
3738 },
3739 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3740 params,
3741 has_result: true,
3742 },
3743 result_count => CallerInfo::Sync {
3744 params,
3745 result_count,
3746 },
3747 },
3748 )
3749 }
3750 }
3751
3752 unsafe fn sync_start(
3753 &mut self,
3754 instance: Instance,
3755 callback: *mut VMFuncRef,
3756 callee: *mut VMFuncRef,
3757 param_count: u32,
3758 storage: *mut MaybeUninit<ValRaw>,
3759 storage_len: usize,
3760 ) -> Result<()> {
3761 unsafe {
3762 instance
3763 .start_call(
3764 StoreContextMut(self),
3765 callback,
3766 ptr::null_mut(),
3767 callee,
3768 param_count,
3769 1,
3770 START_FLAG_ASYNC_CALLEE,
3771 Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3775 )
3776 .map(drop)
3777 }
3778 }
3779
3780 unsafe fn async_start(
3781 &mut self,
3782 instance: Instance,
3783 callback: *mut VMFuncRef,
3784 post_return: *mut VMFuncRef,
3785 callee: *mut VMFuncRef,
3786 param_count: u32,
3787 result_count: u32,
3788 flags: u32,
3789 ) -> Result<u32> {
3790 unsafe {
3791 instance.start_call(
3792 StoreContextMut(self),
3793 callback,
3794 post_return,
3795 callee,
3796 param_count,
3797 result_count,
3798 flags,
3799 None,
3800 )
3801 }
3802 }
3803
3804 fn future_write(
3805 &mut self,
3806 instance: Instance,
3807 caller: RuntimeComponentInstanceIndex,
3808 ty: TypeFutureTableIndex,
3809 options: OptionsIndex,
3810 future: u32,
3811 address: u32,
3812 ) -> Result<u32> {
3813 instance.id().get(self).check_may_leave(caller)?;
3814 instance
3815 .guest_write(
3816 StoreContextMut(self),
3817 caller,
3818 TransmitIndex::Future(ty),
3819 options,
3820 None,
3821 future,
3822 address,
3823 1,
3824 )
3825 .map(|result| result.encode())
3826 }
3827
3828 fn future_read(
3829 &mut self,
3830 instance: Instance,
3831 caller: RuntimeComponentInstanceIndex,
3832 ty: TypeFutureTableIndex,
3833 options: OptionsIndex,
3834 future: u32,
3835 address: u32,
3836 ) -> Result<u32> {
3837 instance.id().get(self).check_may_leave(caller)?;
3838 instance
3839 .guest_read(
3840 StoreContextMut(self),
3841 caller,
3842 TransmitIndex::Future(ty),
3843 options,
3844 None,
3845 future,
3846 address,
3847 1,
3848 )
3849 .map(|result| result.encode())
3850 }
3851
3852 fn stream_write(
3853 &mut self,
3854 instance: Instance,
3855 caller: RuntimeComponentInstanceIndex,
3856 ty: TypeStreamTableIndex,
3857 options: OptionsIndex,
3858 stream: u32,
3859 address: u32,
3860 count: u32,
3861 ) -> Result<u32> {
3862 instance.id().get(self).check_may_leave(caller)?;
3863 instance
3864 .guest_write(
3865 StoreContextMut(self),
3866 caller,
3867 TransmitIndex::Stream(ty),
3868 options,
3869 None,
3870 stream,
3871 address,
3872 count,
3873 )
3874 .map(|result| result.encode())
3875 }
3876
3877 fn stream_read(
3878 &mut self,
3879 instance: Instance,
3880 caller: RuntimeComponentInstanceIndex,
3881 ty: TypeStreamTableIndex,
3882 options: OptionsIndex,
3883 stream: u32,
3884 address: u32,
3885 count: u32,
3886 ) -> Result<u32> {
3887 instance.id().get(self).check_may_leave(caller)?;
3888 instance
3889 .guest_read(
3890 StoreContextMut(self),
3891 caller,
3892 TransmitIndex::Stream(ty),
3893 options,
3894 None,
3895 stream,
3896 address,
3897 count,
3898 )
3899 .map(|result| result.encode())
3900 }
3901
3902 fn future_drop_writable(
3903 &mut self,
3904 instance: Instance,
3905 caller: RuntimeComponentInstanceIndex,
3906 ty: TypeFutureTableIndex,
3907 writer: u32,
3908 ) -> Result<()> {
3909 instance.id().get(self).check_may_leave(caller)?;
3910 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
3911 }
3912
3913 fn flat_stream_write(
3914 &mut self,
3915 instance: Instance,
3916 caller: RuntimeComponentInstanceIndex,
3917 ty: TypeStreamTableIndex,
3918 options: OptionsIndex,
3919 payload_size: u32,
3920 payload_align: u32,
3921 stream: u32,
3922 address: u32,
3923 count: u32,
3924 ) -> Result<u32> {
3925 instance.id().get(self).check_may_leave(caller)?;
3926 instance
3927 .guest_write(
3928 StoreContextMut(self),
3929 caller,
3930 TransmitIndex::Stream(ty),
3931 options,
3932 Some(FlatAbi {
3933 size: payload_size,
3934 align: payload_align,
3935 }),
3936 stream,
3937 address,
3938 count,
3939 )
3940 .map(|result| result.encode())
3941 }
3942
3943 fn flat_stream_read(
3944 &mut self,
3945 instance: Instance,
3946 caller: RuntimeComponentInstanceIndex,
3947 ty: TypeStreamTableIndex,
3948 options: OptionsIndex,
3949 payload_size: u32,
3950 payload_align: u32,
3951 stream: u32,
3952 address: u32,
3953 count: u32,
3954 ) -> Result<u32> {
3955 instance.id().get(self).check_may_leave(caller)?;
3956 instance
3957 .guest_read(
3958 StoreContextMut(self),
3959 caller,
3960 TransmitIndex::Stream(ty),
3961 options,
3962 Some(FlatAbi {
3963 size: payload_size,
3964 align: payload_align,
3965 }),
3966 stream,
3967 address,
3968 count,
3969 )
3970 .map(|result| result.encode())
3971 }
3972
3973 fn stream_drop_writable(
3974 &mut self,
3975 instance: Instance,
3976 caller: RuntimeComponentInstanceIndex,
3977 ty: TypeStreamTableIndex,
3978 writer: u32,
3979 ) -> Result<()> {
3980 instance.id().get(self).check_may_leave(caller)?;
3981 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
3982 }
3983
3984 fn error_context_debug_message(
3985 &mut self,
3986 instance: Instance,
3987 caller: RuntimeComponentInstanceIndex,
3988 ty: TypeComponentLocalErrorContextTableIndex,
3989 options: OptionsIndex,
3990 err_ctx_handle: u32,
3991 debug_msg_address: u32,
3992 ) -> Result<()> {
3993 instance.id().get(self).check_may_leave(caller)?;
3994 instance.error_context_debug_message(
3995 StoreContextMut(self),
3996 ty,
3997 options,
3998 err_ctx_handle,
3999 debug_msg_address,
4000 )
4001 }
4002
4003 fn thread_new_indirect(
4004 &mut self,
4005 instance: Instance,
4006 caller: RuntimeComponentInstanceIndex,
4007 func_ty_idx: TypeFuncIndex,
4008 start_func_table_idx: RuntimeTableIndex,
4009 start_func_idx: u32,
4010 context: i32,
4011 ) -> Result<u32> {
4012 instance.thread_new_indirect(
4013 StoreContextMut(self),
4014 caller,
4015 func_ty_idx,
4016 start_func_table_idx,
4017 start_func_idx,
4018 context,
4019 )
4020 }
4021}
4022
/// A pinned, boxed, `Send + 'static` future resolving to `Result<()>`.
// NOTE(review): presumably the future type driven for host tasks; its users
// are outside this chunk.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4024
/// Table entry representing a task that runs on the host.
struct HostTask {
    /// State shared by all waitable kinds (pending event, set membership,
    /// handle).
    common: WaitableCommon,
    /// The component instance that created this task; `subtask.cancel`
    /// asserts that it matches the cancelling instance.
    caller_instance: RuntimeComponentInstanceIndex,
    /// Handle used to abort the task; taken (leaving `None`) when the task is
    /// cancelled via `subtask.cancel`.
    join_handle: Option<JoinHandle>,
}
4031
4032impl HostTask {
4033 fn new(
4034 caller_instance: RuntimeComponentInstanceIndex,
4035 join_handle: Option<JoinHandle>,
4036 ) -> Self {
4037 Self {
4038 common: WaitableCommon::default(),
4039 caller_instance,
4040 join_handle,
4041 }
4042 }
4043}
4044
impl TableDebug for HostTask {
    /// Human-readable name of this table entry type.
    fn type_name() -> &'static str {
        "HostTask"
    }
}
4050
/// Boxed, thread-safe callback invoked with the store, a component instance
/// index, an `Event`, and a `u32`, producing a `u32` result.
// NOTE(review): the meaning of the trailing `u32` argument and of the returned
// `u32` is not visible in this chunk; presumably a handle and a callback
// return code respectively — confirm at the call site.
type CallbackFn = Box<
    dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
        + Send
        + Sync
        + 'static,
>;
4057
/// Identifies who called a guest task: the host or another guest thread.
enum Caller {
    /// The task was called by the host.
    Host {
        /// Channel on which a `LiftedResult` can be sent; `None` for subtasks
        /// reparented to the host in `GuestTask::dispose`.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// Shared sender that is cloned onto subtasks reparented to the host
        /// in `GuestTask::dispose`.
        // NOTE(review): presumably signals (by being dropped) that the call
        // and all its transitive subtasks have exited — confirm.
        exit_tx: Arc<oneshot::Sender<()>>,
        /// When `true`, a new `GuestTask` starts out with
        /// `HostFutureState::Live`.
        host_future_present: bool,
        /// Value reported by `GuestTask::call_post_return_automatically` for
        /// host-called tasks.
        call_post_return_automatically: bool,
    },
    /// The task was called by a guest thread.
    Guest {
        /// The calling thread, qualified by its task.
        thread: QualifiedThreadId,
        /// The calling component instance; `subtask.cancel` asserts that it
        /// matches the cancelling instance.
        instance: RuntimeComponentInstanceIndex,
    },
}
4089
/// Everything stored on a `GuestTask` in order to lift its result.
// NOTE(review): field roles below are inferred from their types; the code
// that consumes this struct is outside this chunk.
struct LiftResult {
    /// The raw lifting routine.
    lift: RawLift,
    /// Tuple type associated with the result.
    ty: TypeTupleIndex,
    /// Optional linear memory definition used while lifting.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding used while lifting.
    string_encoding: StringEncoding,
}
4098
/// A guest thread id paired with the id of the task that owns it.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
struct QualifiedThreadId {
    /// The task the thread belongs to.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
4108
4109impl QualifiedThreadId {
4110 fn qualify(
4111 state: &mut ConcurrentState,
4112 thread: TableId<GuestThread>,
4113 ) -> Result<QualifiedThreadId> {
4114 Ok(QualifiedThreadId {
4115 task: state.get_mut(thread)?.parent_task,
4116 thread,
4117 })
4118 }
4119}
4120
4121impl fmt::Debug for QualifiedThreadId {
4122 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4123 f.debug_tuple("QualifiedThreadId")
4124 .field(&self.task.rep())
4125 .field(&self.thread.rep())
4126 .finish()
4127 }
4128}
4129
/// Lifecycle state of a `GuestThread`.
enum GuestThreadState {
    /// Initial state of a thread created with `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state of a thread created with `GuestThread::new_explicit`;
    /// carries the start function to run when the thread is first resumed.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// The thread has been scheduled to run or is running.
    Running,
    /// The thread is suspended; carries the fiber to resume.
    Suspended(StoreFiber<'static>),
    // NOTE(review): transitions into `Pending` and `Completed` are outside
    // this chunk; neither state can be resumed by `resume_suspended_thread`.
    Pending,
    Completed,
}
/// Table entry representing a single thread of a guest task.
pub struct GuestThread {
    /// Two 32-bit context slots, initialized to zero.
    // NOTE(review): presumably the storage behind `context.get`/`context.set`
    // — confirm against `ConcurrentState::context_get`.
    context: [u32; 2],
    /// The task this thread belongs to.
    parent_task: TableId<GuestTask>,
    /// Set while the thread is in a cancellable wait on the given waitable
    /// set; `subtask.cancel` takes it to wake the thread.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// Current lifecycle state.
    state: GuestThreadState,
    /// Handle of this thread in its instance's handle table, once it has been
    /// inserted there.
    instance_rep: Option<u32>,
}
4155
4156impl GuestThread {
4157 fn from_instance(
4160 state: Pin<&mut ComponentInstance>,
4161 caller_instance: RuntimeComponentInstanceIndex,
4162 guest_thread: u32,
4163 ) -> Result<TableId<Self>> {
4164 let rep = state.instance_states().0[caller_instance]
4165 .handle_table()
4166 .guest_thread_rep(guest_thread)?;
4167 Ok(TableId::new(rep))
4168 }
4169
4170 fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4171 Self {
4172 context: [0; 2],
4173 parent_task,
4174 wake_on_cancel: None,
4175 state: GuestThreadState::NotStartedImplicit,
4176 instance_rep: None,
4177 }
4178 }
4179
4180 fn new_explicit(
4181 parent_task: TableId<GuestTask>,
4182 start_func: Box<
4183 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4184 >,
4185 ) -> Self {
4186 Self {
4187 context: [0; 2],
4188 parent_task,
4189 wake_on_cancel: None,
4190 state: GuestThreadState::NotStartedExplicit(start_func),
4191 instance_rep: None,
4192 }
4193 }
4194}
4195
impl TableDebug for GuestThread {
    /// Human-readable name of this table entry type.
    fn type_name() -> &'static str {
        "GuestThread"
    }
}
4201
/// Tracks the result of a synchronous call through its lifecycle.
enum SyncResult {
    /// No result has been produced yet.
    NotProduced,
    /// A result has been produced (the inner `Option` allows for a call that
    /// yields no value) and not yet taken.
    Produced(Option<ValRaw>),
    /// The result was already taken; taking it again is a bug.
    Taken,
}
4207
4208impl SyncResult {
4209 fn take(&mut self) -> Option<Option<ValRaw>> {
4210 match mem::replace(self, SyncResult::Taken) {
4211 SyncResult::NotProduced => None,
4212 SyncResult::Produced(val) => Some(val),
4213 SyncResult::Taken => {
4214 panic!("attempted to take a synchronous result that was already taken")
4215 }
4216 }
4217 }
4218}
4219
/// State of the host-side future associated with a guest task, if any.
#[derive(Debug)]
enum HostFutureState {
    /// There is no host future to track (guest callers, or host callers with
    /// `host_future_present` unset).
    NotApplicable,
    /// A host future exists; while in this state the task is not ready to be
    /// deleted.
    Live,
    // NOTE(review): presumably set once the host future has been dropped; the
    // transition is outside this chunk.
    Dropped,
}
4226
/// Table entry representing a call into a guest component.
pub(crate) struct GuestTask {
    /// State shared by all waitable kinds (pending event, set membership,
    /// handle).
    common: WaitableCommon,
    /// Parameter-lowering state; `None` is interpreted as "parameters already
    /// lowered" (see `already_lowered_parameters`).
    lower_params: Option<RawLower>,
    /// Result-lifting state; `None` is interpreted as "returned or cancelled"
    /// (see `returned_or_cancelled`).
    lift_result: Option<LiftResult>,
    /// The lifted result, if any has been stored; starts out as `None`.
    result: Option<LiftedResult>,
    /// Optional callback associated with this task.
    callback: Option<CallbackFn>,
    /// Who called this task.
    caller: Caller,
    /// Call context owned by this task; starts out as a default context.
    call_context: Option<CallContext>,
    /// Result of a synchronous call, if one has been produced.
    sync_result: SyncResult,
    /// Set once `subtask.cancel` has sent a cancellation to this task.
    cancel_sent: bool,
    // NOTE(review): presumably records that a "starting" status was reported
    // to the caller; its users are outside this chunk.
    starting_sent: bool,
    /// Subtasks of this task; reparented to this task's caller on `dispose`.
    subtasks: HashSet<TableId<GuestTask>>,
    /// Waitable set created together with the task and deleted in `dispose`.
    sync_call_set: TableId<WaitableSet>,
    /// The component instance this task runs in.
    instance: RuntimeInstance,
    /// Event pending for the task itself (e.g. `Event::Cancelled`), as opposed
    /// to `common.event`.
    event: Option<Event>,
    /// Export being called, if recorded; starts out as `None`.
    function_index: Option<ExportIndex>,
    /// Whether the task has exited; exited subtasks are deleted when their
    /// parent is disposed.
    exited: bool,
    /// Threads currently belonging to this task.
    threads: HashSet<TableId<GuestThread>>,
    /// State of the host future tracking this task, if any.
    host_future_state: HostFutureState,
    /// Value passed as `async_function` to `GuestTask::new`.
    async_function: bool,
}
4285
4286impl GuestTask {
4287 fn already_lowered_parameters(&self) -> bool {
4288 self.lower_params.is_none()
4290 }
4291
4292 fn returned_or_cancelled(&self) -> bool {
4293 self.lift_result.is_none()
4295 }
4296
4297 fn ready_to_delete(&self) -> bool {
4298 let threads_completed = self.threads.is_empty();
4299 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4300 let pending_completion_event = matches!(
4301 self.common.event,
4302 Some(Event::Subtask {
4303 status: Status::Returned | Status::ReturnCancelled
4304 })
4305 );
4306 let ready = threads_completed
4307 && !has_sync_result
4308 && !pending_completion_event
4309 && !matches!(self.host_future_state, HostFutureState::Live);
4310 log::trace!(
4311 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4312 threads_completed,
4313 has_sync_result,
4314 pending_completion_event,
4315 self.host_future_state
4316 );
4317 ready
4318 }
4319
4320 fn new(
4321 state: &mut ConcurrentState,
4322 lower_params: RawLower,
4323 lift_result: LiftResult,
4324 caller: Caller,
4325 callback: Option<CallbackFn>,
4326 component_instance: Instance,
4327 instance: RuntimeComponentInstanceIndex,
4328 async_function: bool,
4329 ) -> Result<Self> {
4330 let sync_call_set = state.push(WaitableSet::default())?;
4331 let host_future_state = match &caller {
4332 Caller::Guest { .. } => HostFutureState::NotApplicable,
4333 Caller::Host {
4334 host_future_present,
4335 ..
4336 } => {
4337 if *host_future_present {
4338 HostFutureState::Live
4339 } else {
4340 HostFutureState::NotApplicable
4341 }
4342 }
4343 };
4344 Ok(Self {
4345 common: WaitableCommon::default(),
4346 lower_params: Some(lower_params),
4347 lift_result: Some(lift_result),
4348 result: None,
4349 callback,
4350 caller,
4351 call_context: Some(CallContext::default()),
4352 sync_result: SyncResult::NotProduced,
4353 cancel_sent: false,
4354 starting_sent: false,
4355 subtasks: HashSet::new(),
4356 sync_call_set,
4357 instance: RuntimeInstance {
4358 instance: component_instance.id().instance(),
4359 index: instance,
4360 },
4361 event: None,
4362 function_index: None,
4363 exited: false,
4364 threads: HashSet::new(),
4365 host_future_state,
4366 async_function,
4367 })
4368 }
4369
4370 fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4373 for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4376 if let Some(Event::Subtask {
4377 status: Status::Returned | Status::ReturnCancelled,
4378 }) = waitable.common(state)?.event
4379 {
4380 waitable.delete_from(state)?;
4381 }
4382 }
4383
4384 assert!(self.threads.is_empty());
4385
4386 state.delete(self.sync_call_set)?;
4387
4388 match &self.caller {
4390 Caller::Guest {
4391 thread,
4392 instance: runtime_instance,
4393 } => {
4394 let task_mut = state.get_mut(thread.task)?;
4395 let present = task_mut.subtasks.remove(&me);
4396 assert!(present);
4397
4398 for subtask in &self.subtasks {
4399 task_mut.subtasks.insert(*subtask);
4400 }
4401
4402 for subtask in &self.subtasks {
4403 state.get_mut(*subtask)?.caller = Caller::Guest {
4404 thread: *thread,
4405 instance: *runtime_instance,
4406 };
4407 }
4408 }
4409 Caller::Host { exit_tx, .. } => {
4410 for subtask in &self.subtasks {
4411 state.get_mut(*subtask)?.caller = Caller::Host {
4412 tx: None,
4413 exit_tx: exit_tx.clone(),
4417 host_future_present: false,
4418 call_post_return_automatically: true,
4419 };
4420 }
4421 }
4422 }
4423
4424 for subtask in self.subtasks {
4425 if state.get_mut(subtask)?.exited {
4426 Waitable::Guest(subtask).delete_from(state)?;
4427 }
4428 }
4429
4430 Ok(())
4431 }
4432
4433 fn call_post_return_automatically(&self) -> bool {
4434 matches!(
4435 self.caller,
4436 Caller::Guest { .. }
4437 | Caller::Host {
4438 call_post_return_automatically: true,
4439 ..
4440 }
4441 )
4442 }
4443}
4444
impl TableDebug for GuestTask {
    /// Human-readable name of this table entry type.
    fn type_name() -> &'static str {
        "GuestTask"
    }
}
4450
/// State shared by every kind of waitable (host task, guest task, transmit
/// handle).
#[derive(Default)]
struct WaitableCommon {
    /// The event currently pending for this waitable, if any.
    event: Option<Event>,
    /// The waitable set this waitable has joined, if any.
    set: Option<TableId<WaitableSet>>,
    /// Guest-visible handle for this waitable, if one has been recorded.
    handle: Option<u32>,
}
4461
/// Typed id of something that can be waited on.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A task running on the host.
    Host(TableId<HostTask>),
    /// A task running in a guest.
    Guest(TableId<GuestTask>),
    /// One end of a stream or future.
    Transmit(TableId<TransmitHandle>),
}
4472
4473impl Waitable {
4474 fn from_instance(
4477 state: Pin<&mut ComponentInstance>,
4478 caller_instance: RuntimeComponentInstanceIndex,
4479 waitable: u32,
4480 ) -> Result<Self> {
4481 use crate::runtime::vm::component::Waitable;
4482
4483 let (waitable, kind) = state.instance_states().0[caller_instance]
4484 .handle_table()
4485 .waitable_rep(waitable)?;
4486
4487 Ok(match kind {
4488 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4489 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4490 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4491 })
4492 }
4493
4494 fn rep(&self) -> u32 {
4496 match self {
4497 Self::Host(id) => id.rep(),
4498 Self::Guest(id) => id.rep(),
4499 Self::Transmit(id) => id.rep(),
4500 }
4501 }
4502
4503 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4507 log::trace!("waitable {self:?} join set {set:?}",);
4508
4509 let old = mem::replace(&mut self.common(state)?.set, set);
4510
4511 if let Some(old) = old {
4512 match *self {
4513 Waitable::Host(id) => state.remove_child(id, old),
4514 Waitable::Guest(id) => state.remove_child(id, old),
4515 Waitable::Transmit(id) => state.remove_child(id, old),
4516 }?;
4517
4518 state.get_mut(old)?.ready.remove(self);
4519 }
4520
4521 if let Some(set) = set {
4522 match *self {
4523 Waitable::Host(id) => state.add_child(id, set),
4524 Waitable::Guest(id) => state.add_child(id, set),
4525 Waitable::Transmit(id) => state.add_child(id, set),
4526 }?;
4527
4528 if self.common(state)?.event.is_some() {
4529 self.mark_ready(state)?;
4530 }
4531 }
4532
4533 Ok(())
4534 }
4535
4536 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4538 Ok(match self {
4539 Self::Host(id) => &mut state.get_mut(*id)?.common,
4540 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4541 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4542 })
4543 }
4544
4545 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4549 log::trace!("set event for {self:?}: {event:?}");
4550 self.common(state)?.event = event;
4551 self.mark_ready(state)
4552 }
4553
4554 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4556 let common = self.common(state)?;
4557 let event = common.event.take();
4558 if let Some(set) = self.common(state)?.set {
4559 state.get_mut(set)?.ready.remove(self);
4560 }
4561
4562 Ok(event)
4563 }
4564
4565 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4569 if let Some(set) = self.common(state)?.set {
4570 state.get_mut(set)?.ready.insert(*self);
4571 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4572 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4573 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4574
4575 let item = match mode {
4576 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4577 WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4578 thread,
4579 kind: GuestCallKind::DeliverEvent {
4580 instance,
4581 set: Some(set),
4582 },
4583 }),
4584 };
4585 state.push_high_priority(item);
4586 }
4587 }
4588 Ok(())
4589 }
4590
4591 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4593 match self {
4594 Self::Host(task) => {
4595 log::trace!("delete host task {task:?}");
4596 state.delete(*task)?;
4597 }
4598 Self::Guest(task) => {
4599 log::trace!("delete guest task {task:?}");
4600 state.delete(*task)?.dispose(state, *task)?;
4601 }
4602 Self::Transmit(task) => {
4603 state.delete(*task)?;
4604 }
4605 }
4606
4607 Ok(())
4608 }
4609}
4610
4611impl fmt::Debug for Waitable {
4612 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4613 match self {
4614 Self::Host(id) => write!(f, "{id:?}"),
4615 Self::Guest(id) => write!(f, "{id:?}"),
4616 Self::Transmit(id) => write!(f, "{id:?}"),
4617 }
4618 }
4619}
4620
/// A set of waitables together with the threads waiting on it.
#[derive(Default)]
struct WaitableSet {
    /// Members that `Waitable::mark_ready` has flagged as ready; entries are
    /// removed again by `Waitable::take_event` and `Waitable::join`.
    ready: BTreeSet<Waitable>,
    /// Threads waiting on this set and how each should be woken;
    /// `Waitable::mark_ready` pops the first entry to wake it.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
}
4629
impl TableDebug for WaitableSet {
    /// Human-readable name reported for `WaitableSet` entries through the
    /// `TableDebug` trait.
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
4635
/// Boxed, type-erased one-shot closure that is given the store and a buffer
/// of not-yet-initialized `ValRaw` slots.
///
/// NOTE(review): presumably the closure writes the lowered call parameters
/// into the buffer (see the `lower_params` wrapper in `prepare_call`) —
/// confirm against `GuestTask::new`.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4639
/// Boxed, type-erased one-shot closure that is given the store and a slice of
/// raw values and produces a type-erased (`dyn Any`) result.
///
/// NOTE(review): presumably used to lift a call's raw results (see the
/// `lift_result` wrapper in `prepare_call`) — confirm against `LiftResult`.
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;
4644
/// Type-erased result of a call; `queue_call` downcasts values of this type
/// back to the caller's concrete result type.
type LiftedResult = Box<dyn Any + Send + Sync>;
4649
/// NOTE(review): presumably a placeholder value used where a `LiftedResult`
/// is required but no real result exists; it is not used in this part of the
/// file, so confirm against its users.
struct DummyResult;
4653
/// Concurrency-related state tracked per component instance.
#[derive(Default)]
pub struct ConcurrentInstanceState {
    /// NOTE(review): presumably a counter of outstanding backpressure
    /// requests for this instance — confirm against its users.
    backpressure: u16,
    /// NOTE(review): presumably set while the instance must not be entered —
    /// confirm against its users.
    do_not_enter: bool,
    /// Guest calls keyed by thread; `pending_is_empty` reports whether any
    /// remain. NOTE(review): presumably calls deferred until the instance
    /// may be entered — confirm.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
4665
impl ConcurrentInstanceState {
    /// Returns `true` when no guest calls are recorded in `pending`.
    pub fn pending_is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}
4671
/// Store-wide bookkeeping for concurrent component execution: the table of
/// tasks, threads and waitables plus the queues of pending work.
pub struct ConcurrentState {
    /// The guest thread that `context_get`, `context_set`, `check_blocking`
    /// and `take_pending_cancellation` operate on (presumably the one
    /// currently executing — confirm). Those methods panic when it is `None`.
    guest_thread: Option<QualifiedThreadId>,

    /// Host task futures. Wrapped in `Option` so that
    /// `take_fibers_and_futures` can move the whole collection out.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// Table holding every entry addressed by a `TableId` (see `push`,
    /// `get_mut` and `delete`).
    table: AlwaysMut<ResourceTable>,
    /// Work items queued through `push_high_priority`.
    high_priority: Vec<WorkItem>,
    /// Work items queued through `push_low_priority`.
    low_priority: Vec<WorkItem>,
    /// NOTE(review): presumably the reason the current fiber is suspending;
    /// not touched in this part of the file — confirm against its users.
    suspend_reason: Option<SuspendReason>,
    /// A worker fiber, if one exists; handed over to the caller by
    /// `take_fibers_and_futures`.
    worker: Option<StoreFiber<'static>>,
    /// NOTE(review): presumably the item the worker fiber should run next;
    /// not touched in this part of the file — confirm against its users.
    worker_item: Option<WorkerItem>,

    /// Reference counts keyed by global error-context table index.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
}
4714
4715impl Default for ConcurrentState {
4716 fn default() -> Self {
4717 Self {
4718 guest_thread: None,
4719 table: AlwaysMut::new(ResourceTable::new()),
4720 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4721 high_priority: Vec::new(),
4722 low_priority: Vec::new(),
4723 suspend_reason: None,
4724 worker: None,
4725 worker_item: None,
4726 global_error_context_ref_counts: BTreeMap::new(),
4727 }
4728 }
4729}
4730
4731impl ConcurrentState {
4732 pub(crate) fn take_fibers_and_futures(
4749 &mut self,
4750 fibers: &mut Vec<StoreFiber<'static>>,
4751 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4752 ) {
4753 for entry in self.table.get_mut().iter_mut() {
4754 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4755 for mode in mem::take(&mut set.waiting).into_values() {
4756 if let WaitMode::Fiber(fiber) = mode {
4757 fibers.push(fiber);
4758 }
4759 }
4760 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4761 if let GuestThreadState::Suspended(fiber) =
4762 mem::replace(&mut thread.state, GuestThreadState::Completed)
4763 {
4764 fibers.push(fiber);
4765 }
4766 }
4767 }
4768
4769 if let Some(fiber) = self.worker.take() {
4770 fibers.push(fiber);
4771 }
4772
4773 let mut take_items = |list| {
4774 for item in mem::take(list) {
4775 match item {
4776 WorkItem::ResumeFiber(fiber) => {
4777 fibers.push(fiber);
4778 }
4779 WorkItem::PushFuture(future) => {
4780 self.futures
4781 .get_mut()
4782 .as_mut()
4783 .unwrap()
4784 .push(future.into_inner());
4785 }
4786 _ => {}
4787 }
4788 }
4789 };
4790
4791 take_items(&mut self.high_priority);
4792 take_items(&mut self.low_priority);
4793
4794 if let Some(them) = self.futures.get_mut().take() {
4795 futures.push(them);
4796 }
4797 }
4798
4799 fn push<V: Send + Sync + 'static>(
4800 &mut self,
4801 value: V,
4802 ) -> Result<TableId<V>, ResourceTableError> {
4803 self.table.get_mut().push(value).map(TableId::from)
4804 }
4805
4806 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4807 self.table.get_mut().get_mut(&Resource::from(id))
4808 }
4809
4810 pub fn add_child<T: 'static, U: 'static>(
4811 &mut self,
4812 child: TableId<T>,
4813 parent: TableId<U>,
4814 ) -> Result<(), ResourceTableError> {
4815 self.table
4816 .get_mut()
4817 .add_child(Resource::from(child), Resource::from(parent))
4818 }
4819
4820 pub fn remove_child<T: 'static, U: 'static>(
4821 &mut self,
4822 child: TableId<T>,
4823 parent: TableId<U>,
4824 ) -> Result<(), ResourceTableError> {
4825 self.table
4826 .get_mut()
4827 .remove_child(Resource::from(child), Resource::from(parent))
4828 }
4829
4830 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4831 self.table.get_mut().delete(Resource::from(id))
4832 }
4833
4834 fn push_future(&mut self, future: HostTaskFuture) {
4835 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4842 }
4843
4844 fn push_high_priority(&mut self, item: WorkItem) {
4845 log::trace!("push high priority: {item:?}");
4846 self.high_priority.push(item);
4847 }
4848
4849 fn push_low_priority(&mut self, item: WorkItem) {
4850 log::trace!("push low priority: {item:?}");
4851 self.low_priority.push(item);
4852 }
4853
4854 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4855 if high_priority {
4856 self.push_high_priority(item);
4857 } else {
4858 self.push_low_priority(item);
4859 }
4860 }
4861
4862 fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4872 let guest_instance = self.get_mut(guest_task).unwrap().instance;
4873
4874 loop {
4882 let next_thread = match &self.get_mut(guest_task).unwrap().caller {
4883 Caller::Host { .. } => break true,
4884 Caller::Guest { thread, instance } => {
4885 if *instance == guest_instance.index {
4886 break false;
4887 } else {
4888 *thread
4889 }
4890 }
4891 };
4892 guest_task = next_thread.task;
4893 }
4894 }
4895
4896 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4898 let thread = self.guest_thread.unwrap();
4899 let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4900 log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4901 Ok(val)
4902 }
4903
4904 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4906 let thread = self.guest_thread.unwrap();
4907 log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4908 self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4909 Ok(())
4910 }
4911
4912 fn take_pending_cancellation(&mut self) -> bool {
4915 let thread = self.guest_thread.unwrap();
4916 if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4917 assert!(matches!(event, Event::Cancelled));
4918 true
4919 } else {
4920 false
4921 }
4922 }
4923
4924 fn check_blocking(&mut self) -> Result<()> {
4925 let task = self.guest_thread.unwrap().task;
4926 self.check_blocking_for(task)
4927 }
4928
4929 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
4930 if self.may_block(task) {
4931 Ok(())
4932 } else {
4933 Err(Trap::CannotBlockSyncTask.into())
4934 }
4935 }
4936
4937 fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
4938 let task = self.get_mut(task).unwrap();
4939 task.async_function || task.returned_or_cancelled()
4940 }
4941}
4942
4943fn for_any_lower<
4946 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4947>(
4948 fun: F,
4949) -> F {
4950 fun
4951}
4952
4953fn for_any_lift<
4955 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4956>(
4957 fun: F,
4958) -> F {
4959 fun
4960}
4961
4962fn checked<F: Future + Send + 'static>(
4967 id: StoreId,
4968 fut: F,
4969) -> impl Future<Output = F::Output> + Send + 'static {
4970 async move {
4971 let mut fut = pin!(fut);
4972 future::poll_fn(move |cx| {
4973 let message = "\
4974 `Future`s which depend on asynchronous component tasks, streams, or \
4975 futures to complete may only be polled from the event loop of the \
4976 store to which they belong. Please use \
4977 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
4978 ";
4979 tls::try_get(|store| {
4980 let matched = match store {
4981 tls::TryGet::Some(store) => store.id() == id,
4982 tls::TryGet::Taken | tls::TryGet::None => false,
4983 };
4984
4985 if !matched {
4986 panic!("{message}")
4987 }
4988 });
4989 fut.as_mut().poll(cx)
4990 })
4991 .await
4992 }
4993}
4994
4995fn check_recursive_run() {
4998 tls::try_get(|store| {
4999 if !matches!(store, tls::TryGet::None) {
5000 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5001 }
5002 });
5003}
5004
/// Splits a packed callback return value into `(low 4 bits, remaining upper
/// bits)`.
///
/// NOTE(review): presumably the first element is the callback code and the
/// second its associated payload (e.g. a waitable-set handle) — confirm
/// against the callers.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    let low_nibble = code & 0xF;
    let upper_bits = code >> 4;
    (low_nibble, upper_bits)
}
5008
/// Parameters for a waitable-set check (see `WaitableCheck`).
///
/// NOTE(review): field meanings below are inferred from names and types only;
/// confirm against the code that consumes this struct.
struct WaitableCheckParams {
    /// The waitable set being checked.
    set: TableId<WaitableSet>,
    /// Index of the canonical options in effect for the check.
    options: OptionsIndex,
    /// Presumably the guest address/value associated with the event payload —
    /// TODO confirm.
    payload: u32,
}
5017
/// The kind of check to perform on a waitable set.
///
/// NOTE(review): presumably `Wait` blocks until an event is available while
/// `Poll` only checks without blocking — confirm against the consumer.
enum WaitableCheck {
    Wait,
    Poll,
}
5024
/// A guest call that has been set up by `prepare_call` but not yet queued;
/// consumed by `queue_call`.
pub(crate) struct PreparedCall<R> {
    /// The function to call.
    handle: Func,
    /// The task and implicit thread created for this call.
    thread: QualifiedThreadId,
    /// The parameter count forwarded to `Instance::queue_call`.
    param_count: usize,
    /// Receives the type-erased result, which `queue_call` downcasts to `R`.
    rx: oneshot::Receiver<LiftedResult>,
    /// Receiver paired with the `exit_tx` stored in the task's
    /// `Caller::Host`; returned to the caller alongside the result.
    exit_rx: oneshot::Receiver<()>,
    /// Ties the prepared call to its result type `R` without storing one.
    _phantom: PhantomData<R>,
}
5041
5042impl<R> PreparedCall<R> {
5043 pub(crate) fn task_id(&self) -> TaskId {
5045 TaskId {
5046 task: self.thread.task,
5047 }
5048 }
5049}
5050
/// Handle to the guest task behind a prepared call (see
/// `PreparedCall::task_id`).
pub(crate) struct TaskId {
    /// Table id of the guest task.
    task: TableId<GuestTask>,
}
5055
5056impl TaskId {
5057 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5063 let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5064 if !task.already_lowered_parameters() {
5065 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5066 } else {
5067 task.host_future_state = HostFutureState::Dropped;
5068 if task.ready_to_delete() {
5069 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5070 }
5071 }
5072 Ok(())
5073 }
5074}
5075
5076pub(crate) fn prepare_call<T, R>(
5082 mut store: StoreContextMut<T>,
5083 handle: Func,
5084 param_count: usize,
5085 host_future_present: bool,
5086 call_post_return_automatically: bool,
5087 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5088 + Send
5089 + Sync
5090 + 'static,
5091 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5092 + Send
5093 + Sync
5094 + 'static,
5095) -> Result<PreparedCall<R>> {
5096 let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5097
5098 let instance = handle.instance().id().get(store.0);
5099 let options = &instance.component().env_component().options[options];
5100 let ty = &instance.component().types()[ty];
5101 let async_function = ty.async_;
5102 let task_return_type = ty.results;
5103 let component_instance = raw_options.instance;
5104 let callback = options.callback.map(|i| instance.runtime_callback(i));
5105 let memory = options
5106 .memory()
5107 .map(|i| instance.runtime_memory(i))
5108 .map(SendSyncPtr::new);
5109 let string_encoding = options.string_encoding;
5110 let token = StoreToken::new(store.as_context_mut());
5111 let state = store.0.concurrent_state_mut();
5112
5113 let (tx, rx) = oneshot::channel();
5114 let (exit_tx, exit_rx) = oneshot::channel();
5115
5116 let mut task = GuestTask::new(
5117 state,
5118 Box::new(for_any_lower(move |store, params| {
5119 lower_params(handle, token.as_context_mut(store), params)
5120 })),
5121 LiftResult {
5122 lift: Box::new(for_any_lift(move |store, result| {
5123 lift_result(handle, store, result)
5124 })),
5125 ty: task_return_type,
5126 memory,
5127 string_encoding,
5128 },
5129 Caller::Host {
5130 tx: Some(tx),
5131 exit_tx: Arc::new(exit_tx),
5132 host_future_present,
5133 call_post_return_automatically,
5134 },
5135 callback.map(|callback| {
5136 let callback = SendSyncPtr::new(callback);
5137 let instance = handle.instance();
5138 Box::new(
5139 move |store: &mut dyn VMStore, runtime_instance, event, handle| {
5140 let store = token.as_context_mut(store);
5141 unsafe {
5144 instance.call_callback(
5145 store,
5146 runtime_instance,
5147 callback,
5148 event,
5149 handle,
5150 call_post_return_automatically,
5151 )
5152 }
5153 },
5154 ) as CallbackFn
5155 }),
5156 handle.instance(),
5157 component_instance,
5158 async_function,
5159 )?;
5160 task.function_index = Some(handle.index());
5161
5162 let task = state.push(task)?;
5163 let thread = state.push(GuestThread::new_implicit(task))?;
5164 state.get_mut(task)?.threads.insert(thread);
5165
5166 Ok(PreparedCall {
5167 handle,
5168 thread: QualifiedThreadId { task, thread },
5169 param_count,
5170 rx,
5171 exit_rx,
5172 _phantom: PhantomData,
5173 })
5174}
5175
5176pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5183 mut store: StoreContextMut<T>,
5184 prepared: PreparedCall<R>,
5185) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5186 let PreparedCall {
5187 handle,
5188 thread,
5189 param_count,
5190 rx,
5191 exit_rx,
5192 ..
5193 } = prepared;
5194
5195 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5196
5197 Ok(checked(
5198 store.0.id(),
5199 rx.map(move |result| {
5200 result
5201 .map(|v| (*v.downcast().unwrap(), exit_rx))
5202 .map_err(anyhow::Error::from)
5203 }),
5204 ))
5205}
5206
5207fn queue_call0<T: 'static>(
5210 store: StoreContextMut<T>,
5211 handle: Func,
5212 guest_thread: QualifiedThreadId,
5213 param_count: usize,
5214) -> Result<()> {
5215 let (_options, flags, _ty, raw_options) = handle.abi_info(store.0);
5216 let is_concurrent = raw_options.async_;
5217 let callback = raw_options.callback;
5218 let instance = handle.instance();
5219 let callee = handle.lifted_core_func(store.0);
5220 let post_return = handle.post_return_core_func(store.0);
5221 let callback = callback.map(|i| {
5222 let instance = instance.id().get(store.0);
5223 SendSyncPtr::new(instance.runtime_callback(i))
5224 });
5225
5226 log::trace!("queueing call {guest_thread:?}");
5227
5228 let instance_flags = if callback.is_none() {
5229 None
5230 } else {
5231 Some(flags)
5232 };
5233
5234 unsafe {
5238 instance.queue_call(
5239 store,
5240 guest_thread,
5241 SendSyncPtr::new(callee),
5242 param_count,
5243 1,
5244 instance_flags,
5245 is_concurrent,
5246 callback,
5247 post_return.map(SendSyncPtr::new),
5248 )
5249 }
5250}