1use crate::component::func::{self, Func};
54use crate::component::store::StoreComponentInstanceId;
55use crate::component::{
56 ComponentInstanceId, HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError,
57};
58use crate::fiber::{self, StoreFiber, StoreFiberYield};
59use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
60use crate::vm::component::{
61 CallContext, ComponentInstance, HandleTable, InstanceFlags, ResourceTables,
62};
63use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
64use crate::{
65 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType,
66 bail,
67 error::{Context as _, format_err},
68};
69use error_contexts::GlobalErrorContextRefCount;
70use futures::channel::oneshot;
71use futures::future::{self, Either, FutureExt};
72use futures::stream::{FuturesUnordered, StreamExt};
73use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
74use std::any::Any;
75use std::borrow::ToOwned;
76use std::boxed::Box;
77use std::cell::UnsafeCell;
78use std::collections::{BTreeMap, BTreeSet, HashSet};
79use std::fmt;
80use std::future::Future;
81use std::marker::PhantomData;
82use std::mem::{self, ManuallyDrop, MaybeUninit};
83use std::ops::DerefMut;
84use std::pin::{Pin, pin};
85use std::ptr::{self, NonNull};
86use std::slice;
87use std::sync::Arc;
88use std::task::{Context, Poll, Waker};
89use std::vec::Vec;
90use table::{TableDebug, TableId};
91use wasmtime_environ::Trap;
92use wasmtime_environ::component::{
93 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS,
94 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
95 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
96 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
97 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
98};
99
100pub use abort::JoinHandle;
101pub use future_stream_any::{FutureAny, StreamAny};
102pub use futures_and_streams::{
103 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
104 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
105 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
106};
107pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
108
109mod abort;
110mod error_contexts;
111mod future_stream_any;
112mod futures_and_streams;
113pub(crate) mod table;
114pub(crate) mod tls;
115
/// Sentinel with every bit of a `u32` set (`0xffff_ffff`).
///
/// NOTE(review): presumably the value reported to a guest when an operation
/// could not complete immediately; no use is visible in this part of the file,
/// so confirm against the call sites.
const BLOCKED: u32 = u32::MAX;
119
/// Lifecycle state of a subtask, with explicit integer discriminants.
///
/// The discriminants matter: `pack` places them in the low four bits of a
/// `u32`, and they are also used as the payload of subtask events.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    /// Encoded as `0`.
    Starting = 0,
    /// Encoded as `1`.
    Started = 1,
    /// Encoded as `2`. This is the only status that is packed without a
    /// waitable handle.
    Returned = 2,
    /// Encoded as `3`.
    StartCancelled = 3,
    /// Encoded as `4`.
    ReturnCancelled = 4,
}

impl Status {
    /// Packs this status and an optional waitable handle into a single `u32`.
    ///
    /// The status occupies the low four bits and the handle the upper 28 bits;
    /// with no handle the upper bits are zero.
    ///
    /// # Panics
    ///
    /// Panics if a handle is supplied for [`Status::Returned`], if no handle is
    /// supplied for any other status, or if the handle does not fit in 28
    /// bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        let status_bits = self as u32;

        // A handle must be present exactly when the subtask has not returned.
        assert!(matches!(self, Status::Returned) == waitable.is_none());

        let waitable = waitable.unwrap_or(0);
        // Shifting left by four below must not lose any handle bits.
        assert!(waitable < (1 << 28));

        status_bits | (waitable << 4)
    }
}
143
144#[derive(Clone, Copy, Debug)]
147enum Event {
148 None,
149 Cancelled,
150 Subtask {
151 status: Status,
152 },
153 StreamRead {
154 code: ReturnCode,
155 pending: Option<(TypeStreamTableIndex, u32)>,
156 },
157 StreamWrite {
158 code: ReturnCode,
159 pending: Option<(TypeStreamTableIndex, u32)>,
160 },
161 FutureRead {
162 code: ReturnCode,
163 pending: Option<(TypeFutureTableIndex, u32)>,
164 },
165 FutureWrite {
166 code: ReturnCode,
167 pending: Option<(TypeFutureTableIndex, u32)>,
168 },
169}
170
171impl Event {
172 fn parts(self) -> (u32, u32) {
177 const EVENT_NONE: u32 = 0;
178 const EVENT_SUBTASK: u32 = 1;
179 const EVENT_STREAM_READ: u32 = 2;
180 const EVENT_STREAM_WRITE: u32 = 3;
181 const EVENT_FUTURE_READ: u32 = 4;
182 const EVENT_FUTURE_WRITE: u32 = 5;
183 const EVENT_CANCELLED: u32 = 6;
184 match self {
185 Event::None => (EVENT_NONE, 0),
186 Event::Cancelled => (EVENT_CANCELLED, 0),
187 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
188 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
189 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
190 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
191 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
192 }
193 }
194}
195
/// Codes that the (unpacked) return value of a guest callback is matched
/// against.
mod callback_code {
    /// The callback is finished with this thread.
    pub const EXIT: u32 = 0;
    /// NOTE(review): presumably "yield and call me again"; the handling code is
    /// outside this part of the file.
    pub const YIELD: u32 = 1;
    /// NOTE(review): presumably "wait on the waitable set packed with this
    /// code"; the handling code is outside this part of the file.
    pub const WAIT: u32 = 2;
}
202
203const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
207
208pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
214 store: StoreContextMut<'a, T>,
215 get_data: fn(&mut T) -> D::Data<'_>,
216}
217
218impl<'a, T, D> Access<'a, T, D>
219where
220 D: HasData + ?Sized,
221 T: 'static,
222{
223 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
225 Self { store, get_data }
226 }
227
228 pub fn data_mut(&mut self) -> &mut T {
230 self.store.data_mut()
231 }
232
233 pub fn get(&mut self) -> D::Data<'_> {
235 (self.get_data)(self.data_mut())
236 }
237
238 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
242 where
243 T: 'static,
244 {
245 let accessor = Accessor {
246 get_data: self.get_data,
247 token: StoreToken::new(self.store.as_context_mut()),
248 };
249 self.store
250 .as_context_mut()
251 .spawn_with_accessor(accessor, task)
252 }
253
254 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
257 self.get_data
258 }
259}
260
261impl<'a, T, D> AsContext for Access<'a, T, D>
262where
263 D: HasData + ?Sized,
264 T: 'static,
265{
266 type Data = T;
267
268 fn as_context(&self) -> StoreContext<'_, T> {
269 self.store.as_context()
270 }
271}
272
273impl<'a, T, D> AsContextMut for Access<'a, T, D>
274where
275 D: HasData + ?Sized,
276 T: 'static,
277{
278 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
279 self.store.as_context_mut()
280 }
281}
282
283pub struct Accessor<T: 'static, D = HasSelf<T>>
343where
344 D: HasData + ?Sized,
345{
346 token: StoreToken<T>,
347 get_data: fn(&mut T) -> D::Data<'_>,
348}
349
350pub trait AsAccessor {
367 type Data: 'static;
369
370 type AccessorData: HasData + ?Sized;
373
374 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
376}
377
378impl<T: AsAccessor + ?Sized> AsAccessor for &T {
379 type Data = T::Data;
380 type AccessorData = T::AccessorData;
381
382 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
383 T::as_accessor(self)
384 }
385}
386
387impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
388 type Data = T;
389 type AccessorData = D;
390
391 fn as_accessor(&self) -> &Accessor<T, D> {
392 self
393 }
394}
395
396const _: () = {
419 const fn assert<T: Send + Sync>() {}
420 assert::<Accessor<UnsafeCell<u32>>>();
421};
422
423impl<T> Accessor<T> {
424 pub(crate) fn new(token: StoreToken<T>) -> Self {
433 Self {
434 token,
435 get_data: |x| x,
436 }
437 }
438}
439
440impl<T, D> Accessor<T, D>
441where
442 D: HasData + ?Sized,
443{
444 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
462 tls::get(|vmstore| {
463 fun(Access {
464 store: self.token.as_context_mut(vmstore),
465 get_data: self.get_data,
466 })
467 })
468 }
469
470 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
473 self.get_data
474 }
475
476 pub fn with_getter<D2: HasData>(
493 &self,
494 get_data: fn(&mut T) -> D2::Data<'_>,
495 ) -> Accessor<T, D2> {
496 Accessor {
497 token: self.token,
498 get_data,
499 }
500 }
501
502 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
518 where
519 T: 'static,
520 {
521 let accessor = self.clone_for_spawn();
522 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
523 }
524
525 fn clone_for_spawn(&self) -> Self {
526 Self {
527 token: self.token,
528 get_data: self.get_data,
529 }
530 }
531}
532
533pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
545where
546 D: HasData + ?Sized,
547{
548 fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
550}
551
552enum CallerInfo {
555 Async {
557 params: Vec<ValRaw>,
558 has_result: bool,
559 },
560 Sync {
562 params: Vec<ValRaw>,
563 result_count: u32,
564 },
565}
566
567enum WaitMode {
569 Fiber(StoreFiber<'static>),
571 Callback(Instance),
574}
575
576#[derive(Debug)]
578enum SuspendReason {
579 Waiting {
582 set: TableId<WaitableSet>,
583 thread: QualifiedThreadId,
584 skip_may_block_check: bool,
585 },
586 NeedWork,
589 Yielding {
592 thread: QualifiedThreadId,
593 skip_may_block_check: bool,
594 },
595 ExplicitlySuspending {
597 thread: QualifiedThreadId,
598 skip_may_block_check: bool,
599 },
600}
601
602enum GuestCallKind {
604 DeliverEvent {
607 instance: Instance,
609 set: Option<TableId<WaitableSet>>,
614 },
615 StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
621 StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
622}
623
624impl fmt::Debug for GuestCallKind {
625 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
626 match self {
627 Self::DeliverEvent { instance, set } => f
628 .debug_struct("DeliverEvent")
629 .field("instance", instance)
630 .field("set", set)
631 .finish(),
632 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
633 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
634 }
635 }
636}
637
638#[derive(Debug)]
640struct GuestCall {
641 thread: QualifiedThreadId,
642 kind: GuestCallKind,
643}
644
645impl GuestCall {
646 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
656 let instance = store
657 .concurrent_state_mut()
658 .get_mut(self.thread.task)?
659 .instance;
660 let state = store.instance_state(instance);
661
662 let ready = match &self.kind {
663 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
664 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
665 GuestCallKind::StartExplicit(_) => true,
666 };
667 log::trace!(
668 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
669 state.do_not_enter,
670 state.backpressure
671 );
672 Ok(ready)
673 }
674}
675
676enum WorkerItem {
678 GuestCall(GuestCall),
679 Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
680}
681
682enum WorkItem {
685 PushFuture(AlwaysMut<HostTaskFuture>),
687 ResumeFiber(StoreFiber<'static>),
689 GuestCall(GuestCall),
691 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
693}
694
695impl fmt::Debug for WorkItem {
696 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
697 match self {
698 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
699 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
700 Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
701 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
702 }
703 }
704}
705
/// Outcome of a wait.
///
/// The derived ordering follows declaration order, so `Cancelled` sorts before
/// `Completed`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    /// The wait ended because of cancellation.
    Cancelled,
    /// The wait ended because the awaited thing completed.
    Completed,
}
712
713pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
721 store: &mut dyn VMStore,
722 future: impl Future<Output = Result<R>> + Send + 'static,
723 caller_instance: RuntimeComponentInstanceIndex,
724) -> Result<R> {
725 let state = store.concurrent_state_mut();
726
727 let Some(caller) = state.guest_thread else {
731 return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
732 Poll::Ready(result) => result,
733 Poll::Pending => {
734 unreachable!()
735 }
736 };
737 };
738
739 let old_result = state
742 .get_mut(caller.task)
743 .with_context(|| format!("bad handle: {caller:?}"))?
744 .result
745 .take();
746
747 let task = state.push(HostTask::new(caller_instance, None))?;
751
752 log::trace!("new host task child of {caller:?}: {task:?}");
753
754 let mut future = Box::pin(async move {
758 let result = future.await?;
759 tls::get(move |store| {
760 let state = store.concurrent_state_mut();
761 state.get_mut(caller.task)?.result = Some(Box::new(result) as _);
762
763 Waitable::Host(task).set_event(
764 state,
765 Some(Event::Subtask {
766 status: Status::Returned,
767 }),
768 )?;
769
770 Ok(())
771 })
772 }) as HostTaskFuture;
773
774 let poll = tls::set(store, || {
778 future
779 .as_mut()
780 .poll(&mut Context::from_waker(&Waker::noop()))
781 });
782
783 match poll {
784 Poll::Ready(result) => {
785 result?;
787 log::trace!("delete host task {task:?} (already ready)");
788 store.concurrent_state_mut().delete(task)?;
789 }
790 Poll::Pending => {
791 let state = store.concurrent_state_mut();
796 state.push_future(future);
797
798 let set = state.get_mut(caller.task)?.sync_call_set;
799 Waitable::Host(task).join(state, Some(set))?;
800
801 store.suspend(SuspendReason::Waiting {
802 set,
803 thread: caller,
804 skip_may_block_check: false,
805 })?;
806 }
807 }
808
809 Ok(*mem::replace(
811 &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
812 old_result,
813 )
814 .unwrap()
815 .downcast()
816 .unwrap())
817}
818
819fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
821 let mut next = Some(call);
822 while let Some(call) = next.take() {
823 match call.kind {
824 GuestCallKind::DeliverEvent { instance, set } => {
825 let (event, waitable) = instance
826 .get_event(store, call.thread.task, set, true)?
827 .unwrap();
828 let state = store.concurrent_state_mut();
829 let task = state.get_mut(call.thread.task)?;
830 let runtime_instance = task.instance;
831 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
832
833 log::trace!(
834 "use callback to deliver event {event:?} to {:?} for {waitable:?}",
835 call.thread,
836 );
837
838 let old_thread = store.set_thread(Some(call.thread));
839 log::trace!(
840 "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
841 call.thread
842 );
843
844 store.maybe_push_call_context(call.thread.task)?;
845
846 store.enter_instance(runtime_instance);
847
848 let callback = store
849 .concurrent_state_mut()
850 .get_mut(call.thread.task)?
851 .callback
852 .take()
853 .unwrap();
854
855 let code = callback(store, runtime_instance.index, event, handle)?;
856
857 store
858 .concurrent_state_mut()
859 .get_mut(call.thread.task)?
860 .callback = Some(callback);
861
862 store.exit_instance(runtime_instance)?;
863
864 store.maybe_pop_call_context(call.thread.task)?;
865
866 store.set_thread(old_thread);
867
868 next = instance.handle_callback_code(
869 store,
870 call.thread,
871 runtime_instance.index,
872 code,
873 )?;
874
875 log::trace!(
876 "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
877 );
878 }
879 GuestCallKind::StartImplicit(fun) => {
880 next = fun(store)?;
881 }
882 GuestCallKind::StartExplicit(fun) => {
883 fun(store)?;
884 }
885 }
886 }
887
888 Ok(())
889}
890
891impl<T> Store<T> {
892 pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
894 where
895 T: Send + 'static,
896 {
897 self.as_context_mut().run_concurrent(fun).await
898 }
899
900 #[doc(hidden)]
901 pub fn assert_concurrent_state_empty(&mut self) {
902 self.as_context_mut().assert_concurrent_state_empty();
903 }
904
905 pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
907 where
908 T: 'static,
909 {
910 self.as_context_mut().spawn(task)
911 }
912}
913
914impl<T> StoreContextMut<'_, T> {
915 #[doc(hidden)]
924 pub fn assert_concurrent_state_empty(self) {
925 let store = self.0;
926 store
927 .store_data_mut()
928 .components
929 .assert_instance_states_empty();
930 let state = store.concurrent_state_mut();
931 assert!(
932 state.table.get_mut().is_empty(),
933 "non-empty table: {:?}",
934 state.table.get_mut()
935 );
936 assert!(state.high_priority.is_empty());
937 assert!(state.low_priority.is_empty());
938 assert!(state.guest_thread.is_none());
939 assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
940 assert!(state.global_error_context_ref_counts.is_empty());
941 }
942
943 pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
953 where
954 T: 'static,
955 {
956 let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
957 self.spawn_with_accessor(accessor, task)
958 }
959
960 fn spawn_with_accessor<D>(
963 self,
964 accessor: Accessor<T, D>,
965 task: impl AccessorTask<T, D>,
966 ) -> JoinHandle
967 where
968 T: 'static,
969 D: HasData + ?Sized,
970 {
971 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
975 self.0
976 .concurrent_state_mut()
977 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
978 handle
979 }
980
981 pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1062 where
1063 T: Send + 'static,
1064 {
1065 self.do_run_concurrent(fun, false).await
1066 }
1067
1068 pub(super) async fn run_concurrent_trap_on_idle<R>(
1069 self,
1070 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1071 ) -> Result<R>
1072 where
1073 T: Send + 'static,
1074 {
1075 self.do_run_concurrent(fun, true).await
1076 }
1077
1078 async fn do_run_concurrent<R>(
1079 mut self,
1080 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1081 trap_on_idle: bool,
1082 ) -> Result<R>
1083 where
1084 T: Send + 'static,
1085 {
1086 check_recursive_run();
1087 let token = StoreToken::new(self.as_context_mut());
1088
1089 struct Dropper<'a, T: 'static, V> {
1090 store: StoreContextMut<'a, T>,
1091 value: ManuallyDrop<V>,
1092 }
1093
1094 impl<'a, T, V> Drop for Dropper<'a, T, V> {
1095 fn drop(&mut self) {
1096 tls::set(self.store.0, || {
1097 unsafe { ManuallyDrop::drop(&mut self.value) }
1102 });
1103 }
1104 }
1105
1106 let accessor = &Accessor::new(token);
1107 let dropper = &mut Dropper {
1108 store: self,
1109 value: ManuallyDrop::new(fun(accessor)),
1110 };
1111 let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1113
1114 dropper
1115 .store
1116 .as_context_mut()
1117 .poll_until(future, trap_on_idle)
1118 .await
1119 }
1120
1121 async fn poll_until<R>(
1127 mut self,
1128 mut future: Pin<&mut impl Future<Output = R>>,
1129 trap_on_idle: bool,
1130 ) -> Result<R>
1131 where
1132 T: Send + 'static,
1133 {
1134 struct Reset<'a, T: 'static> {
1135 store: StoreContextMut<'a, T>,
1136 futures: Option<FuturesUnordered<HostTaskFuture>>,
1137 }
1138
1139 impl<'a, T> Drop for Reset<'a, T> {
1140 fn drop(&mut self) {
1141 if let Some(futures) = self.futures.take() {
1142 *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1143 }
1144 }
1145 }
1146
1147 loop {
1148 let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1152 let mut reset = Reset {
1153 store: self.as_context_mut(),
1154 futures,
1155 };
1156 let mut next = pin!(reset.futures.as_mut().unwrap().next());
1157
1158 let result = future::poll_fn(|cx| {
1159 if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1162 return Poll::Ready(Ok(Either::Left(value)));
1163 }
1164
1165 let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1169 Poll::Ready(Some(output)) => {
1170 match output {
1171 Err(e) => return Poll::Ready(Err(e)),
1172 Ok(()) => {}
1173 }
1174 Poll::Ready(true)
1175 }
1176 Poll::Ready(None) => Poll::Ready(false),
1177 Poll::Pending => Poll::Pending,
1178 };
1179
1180 let state = reset.store.0.concurrent_state_mut();
1183 let ready = mem::take(&mut state.high_priority);
1184 let ready = if ready.is_empty() {
1185 let ready = mem::take(&mut state.low_priority);
1188 if ready.is_empty() {
1189 return match next {
1190 Poll::Ready(true) => {
1191 Poll::Ready(Ok(Either::Right(Vec::new())))
1197 }
1198 Poll::Ready(false) => {
1199 if let Poll::Ready(value) =
1203 tls::set(reset.store.0, || future.as_mut().poll(cx))
1204 {
1205 Poll::Ready(Ok(Either::Left(value)))
1206 } else {
1207 if trap_on_idle {
1213 Poll::Ready(Err(format_err!(crate::Trap::AsyncDeadlock)))
1216 } else {
1217 Poll::Pending
1221 }
1222 }
1223 }
1224 Poll::Pending => Poll::Pending,
1229 };
1230 } else {
1231 ready
1232 }
1233 } else {
1234 ready
1235 };
1236
1237 Poll::Ready(Ok(Either::Right(ready)))
1238 })
1239 .await;
1240
1241 drop(reset);
1245
1246 match result? {
1247 Either::Left(value) => break Ok(value),
1250 Either::Right(ready) => {
1253 struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1254 store: StoreContextMut<'a, T>,
1255 ready: I,
1256 }
1257
1258 impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1259 fn drop(&mut self) {
1260 while let Some(item) = self.ready.next() {
1261 match item {
1262 WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1263 WorkItem::PushFuture(future) => {
1264 tls::set(self.store.0, move || drop(future))
1265 }
1266 _ => {}
1267 }
1268 }
1269 }
1270 }
1271
1272 let mut dispose = Dispose {
1273 store: self.as_context_mut(),
1274 ready: ready.into_iter(),
1275 };
1276
1277 while let Some(item) = dispose.ready.next() {
1278 dispose
1279 .store
1280 .as_context_mut()
1281 .handle_work_item(item)
1282 .await?;
1283 }
1284 }
1285 }
1286 }
1287 }
1288
1289 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1291 where
1292 T: Send,
1293 {
1294 log::trace!("handle work item {item:?}");
1295 match item {
1296 WorkItem::PushFuture(future) => {
1297 self.0
1298 .concurrent_state_mut()
1299 .futures
1300 .get_mut()
1301 .as_mut()
1302 .unwrap()
1303 .push(future.into_inner());
1304 }
1305 WorkItem::ResumeFiber(fiber) => {
1306 self.0.resume_fiber(fiber).await?;
1307 }
1308 WorkItem::GuestCall(call) => {
1309 if call.is_ready(self.0)? {
1310 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1311 } else {
1312 let state = self.0.concurrent_state_mut();
1313 let task = state.get_mut(call.thread.task)?;
1314 if !task.starting_sent {
1315 task.starting_sent = true;
1316 if let GuestCallKind::StartImplicit(_) = &call.kind {
1317 Waitable::Guest(call.thread.task).set_event(
1318 state,
1319 Some(Event::Subtask {
1320 status: Status::Starting,
1321 }),
1322 )?;
1323 }
1324 }
1325
1326 let instance = state.get_mut(call.thread.task)?.instance;
1327 self.0
1328 .instance_state(instance)
1329 .pending
1330 .insert(call.thread, call.kind);
1331 }
1332 }
1333 WorkItem::WorkerFunction(fun) => {
1334 self.run_on_worker(WorkerItem::Function(fun)).await?;
1335 }
1336 }
1337
1338 Ok(())
1339 }
1340
1341 async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1343 where
1344 T: Send,
1345 {
1346 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1347 fiber
1348 } else {
1349 fiber::make_fiber(self.0, move |store| {
1350 loop {
1351 match store.concurrent_state_mut().worker_item.take().unwrap() {
1352 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1353 WorkerItem::Function(fun) => fun.into_inner()(store)?,
1354 }
1355
1356 store.suspend(SuspendReason::NeedWork)?;
1357 }
1358 })?
1359 };
1360
1361 let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1362 assert!(worker_item.is_none());
1363 *worker_item = Some(item);
1364
1365 self.0.resume_fiber(worker).await
1366 }
1367
1368 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1373 where
1374 T: 'static,
1375 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1376 + Send
1377 + Sync
1378 + 'static,
1379 R: Send + Sync + 'static,
1380 {
1381 let token = StoreToken::new(self);
1382 async move {
1383 let mut accessor = Accessor::new(token);
1384 closure(&mut accessor).await
1385 }
1386 }
1387}
1388
1389#[derive(Debug, Copy, Clone)]
1390pub struct RuntimeInstance {
1391 pub instance: ComponentInstanceId,
1392 pub index: RuntimeComponentInstanceIndex,
1393}
1394
1395impl StoreOpaque {
1396 fn instance_state(&mut self, instance: RuntimeInstance) -> &mut ConcurrentInstanceState {
1399 StoreComponentInstanceId::new(self.id(), instance.instance)
1400 .get_mut(self)
1401 .instance_state(instance.index)
1402 .concurrent_state()
1403 }
1404
1405 fn handle_table(&mut self, instance: RuntimeInstance) -> &mut HandleTable {
1408 StoreComponentInstanceId::new(self.id(), instance.instance)
1409 .get_mut(self)
1410 .instance_state(instance.index)
1411 .handle_table()
1412 }
1413
1414 fn set_thread(&mut self, thread: Option<QualifiedThreadId>) -> Option<QualifiedThreadId> {
1415 let state = self.concurrent_state_mut();
1420 let old_thread = state.guest_thread.take();
1421 if let Some(old_thread) = old_thread {
1422 let instance = state.get_mut(old_thread.task).unwrap().instance.instance;
1423 StoreComponentInstanceId::new(self.id(), instance)
1424 .get_mut(self)
1425 .set_task_may_block(false)
1426 }
1427
1428 self.concurrent_state_mut().guest_thread = thread;
1429
1430 if thread.is_some() {
1433 self.set_task_may_block();
1434 }
1435
1436 old_thread
1437 }
1438
1439 fn set_task_may_block(&mut self) {
1442 let state = self.concurrent_state_mut();
1443 let guest_thread = state.guest_thread.unwrap();
1444 let instance = state.get_mut(guest_thread.task).unwrap().instance.instance;
1445 let may_block = self.concurrent_state_mut().may_block(guest_thread.task);
1446 StoreComponentInstanceId::new(self.id(), instance)
1447 .get_mut(self)
1448 .set_task_may_block(may_block)
1449 }
1450
1451 pub(crate) fn check_blocking(&mut self) -> Result<()> {
1452 let state = self.concurrent_state_mut();
1453 let task = state.guest_thread.unwrap().task;
1454 let instance = state.get_mut(task).unwrap().instance.instance;
1455 let task_may_block = StoreComponentInstanceId::new(self.id(), instance)
1456 .get_mut(self)
1457 .get_task_may_block();
1458
1459 if task_may_block {
1460 Ok(())
1461 } else {
1462 Err(Trap::CannotBlockSyncTask.into())
1463 }
1464 }
1465
1466 fn enter_instance(&mut self, instance: RuntimeInstance) {
1470 log::trace!("enter {instance:?}");
1471 self.instance_state(instance).do_not_enter = true;
1472 }
1473
1474 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1478 log::trace!("exit {instance:?}");
1479 self.instance_state(instance).do_not_enter = false;
1480 self.partition_pending(instance)
1481 }
1482
1483 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1488 for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
1489 let call = GuestCall { thread, kind };
1490 if call.is_ready(self)? {
1491 self.concurrent_state_mut()
1492 .push_high_priority(WorkItem::GuestCall(call));
1493 } else {
1494 self.instance_state(instance)
1495 .pending
1496 .insert(call.thread, call.kind);
1497 }
1498 }
1499
1500 Ok(())
1501 }
1502
1503 pub(crate) fn backpressure_modify(
1505 &mut self,
1506 caller_instance: RuntimeInstance,
1507 modify: impl FnOnce(u16) -> Option<u16>,
1508 ) -> Result<()> {
1509 let state = self.instance_state(caller_instance);
1510 let old = state.backpressure;
1511 let new = modify(old).ok_or_else(|| format_err!("backpressure counter overflow"))?;
1512 state.backpressure = new;
1513
1514 if old > 0 && new == 0 {
1515 self.partition_pending(caller_instance)?;
1518 }
1519
1520 Ok(())
1521 }
1522
1523 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1526 let old_thread = self.concurrent_state_mut().guest_thread;
1527 log::trace!("resume_fiber: save current thread {old_thread:?}");
1528
1529 let fiber = fiber::resolve_or_release(self, fiber).await?;
1530
1531 self.set_thread(old_thread);
1532
1533 let state = self.concurrent_state_mut();
1534
1535 if let Some(ref ot) = old_thread {
1536 state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1537 }
1538 log::trace!("resume_fiber: restore current thread {old_thread:?}");
1539
1540 if let Some(mut fiber) = fiber {
1541 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1542 match state.suspend_reason.take().unwrap() {
1544 SuspendReason::NeedWork => {
1545 if state.worker.is_none() {
1546 state.worker = Some(fiber);
1547 } else {
1548 fiber.dispose(self);
1549 }
1550 }
1551 SuspendReason::Yielding { thread, .. } => {
1552 state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
1553 state.push_low_priority(WorkItem::ResumeFiber(fiber));
1554 }
1555 SuspendReason::ExplicitlySuspending { thread, .. } => {
1556 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1557 }
1558 SuspendReason::Waiting { set, thread, .. } => {
1559 let old = state
1560 .get_mut(set)?
1561 .waiting
1562 .insert(thread, WaitMode::Fiber(fiber));
1563 assert!(old.is_none());
1564 }
1565 };
1566 } else {
1567 log::trace!("resume_fiber: fiber has exited");
1568 }
1569
1570 Ok(())
1571 }
1572
1573 fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1579 log::trace!("suspend fiber: {reason:?}");
1580
1581 let task = match &reason {
1585 SuspendReason::Yielding { thread, .. }
1586 | SuspendReason::Waiting { thread, .. }
1587 | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1588 SuspendReason::NeedWork => None,
1589 };
1590
1591 let old_guest_thread = if let Some(task) = task {
1592 self.maybe_pop_call_context(task)?;
1593 self.concurrent_state_mut().guest_thread
1594 } else {
1595 None
1596 };
1597
1598 assert!(
1604 matches!(
1605 reason,
1606 SuspendReason::ExplicitlySuspending {
1607 skip_may_block_check: true,
1608 ..
1609 } | SuspendReason::Waiting {
1610 skip_may_block_check: true,
1611 ..
1612 } | SuspendReason::Yielding {
1613 skip_may_block_check: true,
1614 ..
1615 }
1616 ) || old_guest_thread
1617 .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1618 .unwrap_or(true)
1619 );
1620
1621 let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1622 assert!(suspend_reason.is_none());
1623 *suspend_reason = Some(reason);
1624
1625 self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1626
1627 if let Some(task) = task {
1628 self.set_thread(old_guest_thread);
1629 self.maybe_push_call_context(task)?;
1630 }
1631
1632 Ok(())
1633 }
1634
1635 fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1639 let task = self.concurrent_state_mut().get_mut(guest_task)?;
1640
1641 if !task.returned_or_cancelled() {
1642 log::trace!("push call context for {guest_task:?}");
1643 let call_context = task.call_context.take().unwrap();
1644 self.component_resource_state().0.push(call_context);
1645 }
1646 Ok(())
1647 }
1648
1649 fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1653 if !self
1654 .concurrent_state_mut()
1655 .get_mut(guest_task)?
1656 .returned_or_cancelled()
1657 {
1658 log::trace!("pop call context for {guest_task:?}");
1659 let call_context = Some(self.component_resource_state().0.pop().unwrap());
1660 self.concurrent_state_mut()
1661 .get_mut(guest_task)?
1662 .call_context = call_context;
1663 }
1664 Ok(())
1665 }
1666
1667 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1668 let state = self.concurrent_state_mut();
1669 let caller = state.guest_thread.unwrap();
1670 let old_set = waitable.common(state)?.set;
1671 let set = state.get_mut(caller.task)?.sync_call_set;
1672 waitable.join(state, Some(set))?;
1673 self.suspend(SuspendReason::Waiting {
1674 set,
1675 thread: caller,
1676 skip_may_block_check: false,
1677 })?;
1678 let state = self.concurrent_state_mut();
1679 waitable.join(state, old_set)
1680 }
1681}
1682
1683impl Instance {
1684 fn get_event(
1687 self,
1688 store: &mut StoreOpaque,
1689 guest_task: TableId<GuestTask>,
1690 set: Option<TableId<WaitableSet>>,
1691 cancellable: bool,
1692 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1693 let state = store.concurrent_state_mut();
1694
1695 if let Some(event) = state.get_mut(guest_task)?.event.take() {
1696 log::trace!("deliver event {event:?} to {guest_task:?}");
1697
1698 if cancellable || !matches!(event, Event::Cancelled) {
1699 return Ok(Some((event, None)));
1700 } else {
1701 state.get_mut(guest_task)?.event = Some(event);
1702 }
1703 }
1704
1705 Ok(
1706 if let Some((set, waitable)) = set
1707 .and_then(|set| {
1708 state
1709 .get_mut(set)
1710 .map(|v| v.ready.pop_first().map(|v| (set, v)))
1711 .transpose()
1712 })
1713 .transpose()?
1714 {
1715 let common = waitable.common(state)?;
1716 let handle = common.handle.unwrap();
1717 let event = common.event.take().unwrap();
1718
1719 log::trace!(
1720 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1721 );
1722
1723 waitable.on_delivery(store, self, event);
1724
1725 Some((event, Some((waitable, handle))))
1726 } else {
1727 None
1728 },
1729 )
1730 }
1731
    /// Interpret the packed `code` returned by a guest callback for
    /// `guest_thread` and act on it.
    ///
    /// `code` is split by `unpack_callback_code` into a callback code and a
    /// waitable-set handle:
    ///
    /// * `EXIT` cleans up the thread and either deletes the task (host
    ///   caller, once `ready_to_delete()`) or marks it exited (guest caller).
    /// * `YIELD` prepares a `DeliverEvent` call with no set; it is queued at
    ///   low priority when `may_block` is true and returned otherwise.
    /// * `WAIT` resolves the set handle and either queues an immediate
    ///   delivery or registers the thread as a waiter on the set.
    ///
    /// Returns `Ok(Some(call))` only from the `YIELD` path when `may_block`
    /// is false; every other successful path returns `Ok(None)`.  Unknown
    /// codes, a zero set handle for `WAIT`, and an `EXIT` that leaves the
    /// task with no threads and no result all produce errors.
    fn handle_callback_code(
        self,
        store: &mut StoreOpaque,
        guest_thread: QualifiedThreadId,
        runtime_instance: RuntimeComponentInstanceIndex,
        code: u32,
    ) -> Result<Option<GuestCall>> {
        let (code, set) = unpack_callback_code(code);

        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");

        let state = store.concurrent_state_mut();

        // Translate a guest-visible waitable-set handle into a table id,
        // rejecting the zero handle.
        let get_set = |store: &mut StoreOpaque, handle| {
            if handle == 0 {
                bail!("invalid waitable-set handle");
            }

            let set = store
                .handle_table(RuntimeInstance {
                    instance: self.id().instance(),
                    index: runtime_instance,
                })
                .waitable_set_rep(handle)?;

            Ok(TableId::<WaitableSet>::new(set))
        };

        Ok(match code {
            callback_code::EXIT => {
                log::trace!("implicit thread {guest_thread:?} completed");
                self.cleanup_thread(store, guest_thread, runtime_instance)?;
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                // The last thread of a task exiting without the task having
                // returned or been cancelled is a trap.
                if task.threads.is_empty() && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
                match &task.caller {
                    Caller::Host { .. } => {
                        if task.ready_to_delete() {
                            Waitable::Guest(guest_thread.task)
                                .delete_from(store.concurrent_state_mut())?;
                        }
                    }
                    Caller::Guest { .. } => {
                        // Only mark the task as exited and drop its callback
                        // here; it is not deleted on this path.
                        task.exited = true;
                        task.callback = None;
                    }
                }
                None
            }
            callback_code::YIELD => {
                let task = state.get_mut(guest_thread.task)?;
                // An already-pending event is asserted to be `Event::None`
                // or `Event::Cancelled` and is left in place; otherwise
                // `Event::None` is queued for delivery.
                if let Some(event) = task.event {
                    assert!(matches!(event, Event::None | Event::Cancelled));
                } else {
                    task.event = Some(Event::None);
                }
                let call = GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::DeliverEvent {
                        instance: self,
                        set: None,
                    },
                };
                if state.may_block(guest_thread.task) {
                    // Defer the delivery via the low-priority queue.
                    state.push_low_priority(WorkItem::GuestCall(call));
                    None
                } else {
                    // Hand the call back to the caller instead of queueing it.
                    Some(call)
                }
            }
            callback_code::WAIT => {
                // Propagates whatever error `check_blocking_for` reports for
                // this task.
                state.check_blocking_for(guest_thread.task)?;

                let set = get_set(store, set)?;
                let state = store.concurrent_state_mut();

                if state.get_mut(guest_thread.task)?.event.is_some()
                    || !state.get_mut(set)?.ready.is_empty()
                {
                    // Something is already deliverable: queue the delivery at
                    // high priority.
                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
                        thread: guest_thread,
                        kind: GuestCallKind::DeliverEvent {
                            instance: self,
                            set: Some(set),
                        },
                    }));
                } else {
                    // Nothing to deliver yet: record the set on the thread's
                    // `wake_on_cancel` and register the thread as a callback
                    // waiter on the set.  Both slots must have been empty.
                    let old = state
                        .get_mut(guest_thread.thread)?
                        .wake_on_cancel
                        .replace(set);
                    assert!(old.is_none());
                    let old = state
                        .get_mut(set)?
                        .waiting
                        .insert(guest_thread, WaitMode::Callback(self));
                    assert!(old.is_none());
                }
                None
            }
            _ => bail!("unsupported callback code: {code}"),
        })
    }
1860
1861 fn cleanup_thread(
1862 self,
1863 store: &mut StoreOpaque,
1864 guest_thread: QualifiedThreadId,
1865 runtime_instance: RuntimeComponentInstanceIndex,
1866 ) -> Result<()> {
1867 let guest_id = store
1868 .concurrent_state_mut()
1869 .get_mut(guest_thread.thread)?
1870 .instance_rep;
1871 store
1872 .handle_table(RuntimeInstance {
1873 instance: self.id().instance(),
1874 index: runtime_instance,
1875 })
1876 .guest_thread_remove(guest_id.unwrap())?;
1877
1878 store.concurrent_state_mut().delete(guest_thread.thread)?;
1879 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1880 task.threads.remove(&guest_thread.thread);
1881 Ok(())
1882 }
1883
    /// Queue `callee` to be run as the implicit thread `guest_thread`.
    ///
    /// A closure wrapping the call is pushed onto the high-priority work
    /// queue as a `GuestCallKind::StartImplicit` item; nothing is executed
    /// here.  When `callback` is present the closure performs a stackless
    /// call and feeds the returned code to `handle_callback_code`; otherwise
    /// it performs the call directly and, for non-async callees, lifts the
    /// result, optionally runs `post_return`, and completes the task.
    ///
    /// # Safety
    ///
    /// `callee` and `post_return` are passed to
    /// `Func::call_unchecked_raw`; presumably they must be valid function
    /// references with the expected signatures for this store — confirm
    /// against callers.
    unsafe fn queue_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        guest_thread: QualifiedThreadId,
        callee: SendSyncPtr<VMFuncRef>,
        param_count: usize,
        result_count: usize,
        flags: Option<InstanceFlags>,
        async_: bool,
        callback: Option<SendSyncPtr<VMFuncRef>>,
        post_return: Option<SendSyncPtr<VMFuncRef>>,
    ) -> Result<()> {
        /// Build a closure which lowers the task's parameters into a flat
        /// buffer, calls `callee` with it, and returns the buffer so the
        /// caller can read the results.
        ///
        /// # Safety
        ///
        /// `callee` is invoked through `Func::call_unchecked_raw` when the
        /// returned closure runs; NOTE(review): it presumably must remain a
        /// valid function reference until then.
        unsafe fn make_call<T: 'static>(
            store: StoreContextMut<T>,
            guest_thread: QualifiedThreadId,
            callee: SendSyncPtr<VMFuncRef>,
            param_count: usize,
            result_count: usize,
            flags: Option<InstanceFlags>,
        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
        + Send
        + Sync
        + 'static
        + use<T> {
            let token = StoreToken::new(store);
            move |store: &mut dyn VMStore| {
                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];

                store
                    .concurrent_state_mut()
                    .get_mut(guest_thread.thread)?
                    .state = GuestThreadState::Running;
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                let may_enter_after_call = task.call_post_return_automatically();
                // The lowering closure is consumed here; it must still be
                // present on the task.
                let lower = task.lower_params.take().unwrap();

                lower(store, &mut storage[..param_count])?;

                let mut store = token.as_context_mut(store);

                unsafe {
                    // `may_enter` is cleared for the duration of the call and
                    // restored to `may_enter_after_call` afterwards (only
                    // when instance flags were supplied).
                    if let Some(mut flags) = flags {
                        flags.set_may_enter(false);
                    }
                    // The same buffer carries parameters in and results out,
                    // so it is sized to the larger of the two counts.
                    crate::Func::call_unchecked_raw(
                        &mut store,
                        callee.as_non_null(),
                        NonNull::new(
                            &mut storage[..param_count.max(result_count)]
                                as *mut [MaybeUninit<ValRaw>] as _,
                        )
                        .unwrap(),
                    )?;
                    if let Some(mut flags) = flags {
                        flags.set_may_enter(may_enter_after_call);
                    }
                }

                Ok(storage)
            }
        }

        let call = unsafe {
            make_call(
                store.as_context_mut(),
                guest_thread,
                callee,
                param_count,
                result_count,
                flags,
            )
        };

        let callee_instance = store
            .0
            .concurrent_state_mut()
            .get_mut(guest_thread.task)?
            .instance;

        let fun = if callback.is_some() {
            // A callback implies an async callee.
            assert!(async_);

            Box::new(move |store: &mut dyn VMStore| {
                self.add_guest_thread_to_instance_table(
                    guest_thread.thread,
                    store,
                    callee_instance.index,
                )?;
                let old_thread = store.set_thread(Some(guest_thread));
                log::trace!(
                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
                );

                store.maybe_push_call_context(guest_thread.task)?;

                store.enter_instance(callee_instance);

                let storage = call(store)?;

                store.exit_instance(callee_instance)?;

                store.maybe_pop_call_context(guest_thread.task)?;

                // Restore the previous current thread, marking it running
                // again if there was one.
                store.set_thread(old_thread);
                let state = store.concurrent_state_mut();
                old_thread
                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
                log::trace!("stackless call: restored {old_thread:?} as current thread");

                // The first result slot is read as the `i32` callback code.
                // NOTE(review): relies on the callee having written slot 0.
                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;

                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
            })
                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
        } else {
            let token = StoreToken::new(store.as_context_mut());
            Box::new(move |store: &mut dyn VMStore| {
                self.add_guest_thread_to_instance_table(
                    guest_thread.thread,
                    store,
                    callee_instance.index,
                )?;
                let old_thread = store.set_thread(Some(guest_thread));
                log::trace!(
                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
                );
                let mut flags = self.id().get(store).instance_flags(callee_instance.index);

                store.maybe_push_call_context(guest_thread.task)?;

                // Only non-async callees enter the instance here; the
                // matching `exit_instance` is in the non-async branch below.
                if !async_ {
                    store.enter_instance(callee_instance);
                }

                let storage = call(store)?;

                // Without a callback, the implicit thread is finished as soon
                // as the call returns.
                self.cleanup_thread(store, guest_thread, callee_instance.index)?;

                if async_ {
                    let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                    if task.threads.is_empty() && !task.returned_or_cancelled() {
                        bail!(Trap::NoAsyncResult);
                    }
                } else {
                    // Non-async callee: lift the result now, optionally run
                    // post-return, then complete the task.
                    let lift = {
                        store.exit_instance(callee_instance)?;

                        let state = store.concurrent_state_mut();
                        assert!(state.get_mut(guest_thread.task)?.result.is_none());

                        state
                            .get_mut(guest_thread.task)?
                            .lift_result
                            .take()
                            .unwrap()
                    };

                    // The first `result_count` slots are reinterpreted as
                    // initialized values.  NOTE(review): assumes the callee
                    // wrote exactly that many results.
                    let result = (lift.lift)(store, unsafe {
                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
                            &storage[..result_count],
                        )
                    })?;

                    // At most one flat result is supported on this path.
                    let post_return_arg = match result_count {
                        0 => ValRaw::i32(0),
                        1 => unsafe { storage[0].assume_init() },
                        _ => unreachable!(),
                    };

                    if store
                        .concurrent_state_mut()
                        .get_mut(guest_thread.task)?
                        .call_post_return_automatically()
                    {
                        unsafe {
                            flags.set_may_leave(false);
                            flags.set_needs_post_return(false);
                        }

                        if let Some(func) = post_return {
                            let mut store = token.as_context_mut(store);

                            unsafe {
                                crate::Func::call_unchecked_raw(
                                    &mut store,
                                    func.as_non_null(),
                                    slice::from_ref(&post_return_arg).into(),
                                )?;
                            }
                        }

                        unsafe {
                            flags.set_may_leave(true);
                            flags.set_may_enter(true);
                        }
                    }

                    self.task_complete(
                        store,
                        guest_thread.task,
                        result,
                        Status::Returned,
                        post_return_arg,
                    )?;
                }

                store.set_thread(old_thread);

                store.maybe_pop_call_context(guest_thread.task)?;

                let state = store.concurrent_state_mut();
                let task = state.get_mut(guest_thread.task)?;

                match &task.caller {
                    Caller::Host { .. } => {
                        if task.ready_to_delete() {
                            Waitable::Guest(guest_thread.task).delete_from(state)?;
                        }
                    }
                    Caller::Guest { .. } => {
                        // Only mark the task as exited here; it is not
                        // deleted on this path.
                        task.exited = true;
                    }
                }

                Ok(None)
            })
        };

        store
            .0
            .concurrent_state_mut()
            .push_high_priority(WorkItem::GuestCall(GuestCall {
                thread: guest_thread,
                kind: GuestCallKind::StartImplicit(fun),
            }));

        Ok(())
    }
2179
2180 unsafe fn prepare_call<T: 'static>(
2193 self,
2194 mut store: StoreContextMut<T>,
2195 start: *mut VMFuncRef,
2196 return_: *mut VMFuncRef,
2197 caller_instance: RuntimeComponentInstanceIndex,
2198 callee_instance: RuntimeComponentInstanceIndex,
2199 task_return_type: TypeTupleIndex,
2200 callee_async: bool,
2201 memory: *mut VMMemoryDefinition,
2202 string_encoding: u8,
2203 caller_info: CallerInfo,
2204 ) -> Result<()> {
2205 self.id().get(store.0).check_may_leave(caller_instance)?;
2206
2207 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2208 store.0.check_blocking()?;
2212 }
2213
2214 enum ResultInfo {
2215 Heap { results: u32 },
2216 Stack { result_count: u32 },
2217 }
2218
2219 let result_info = match &caller_info {
2220 CallerInfo::Async {
2221 has_result: true,
2222 params,
2223 } => ResultInfo::Heap {
2224 results: params.last().unwrap().get_u32(),
2225 },
2226 CallerInfo::Async {
2227 has_result: false, ..
2228 } => ResultInfo::Stack { result_count: 0 },
2229 CallerInfo::Sync {
2230 result_count,
2231 params,
2232 } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2233 results: params.last().unwrap().get_u32(),
2234 },
2235 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2236 result_count: *result_count,
2237 },
2238 };
2239
2240 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2241
2242 let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2246 let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2247 let token = StoreToken::new(store.as_context_mut());
2248 let state = store.0.concurrent_state_mut();
2249 let old_thread = state.guest_thread.take();
2250 let new_task = GuestTask::new(
2251 state,
2252 Box::new(move |store, dst| {
2253 let mut store = token.as_context_mut(store);
2254 assert!(dst.len() <= MAX_FLAT_PARAMS);
2255 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2257 let count = match caller_info {
2258 CallerInfo::Async { params, has_result } => {
2262 let params = ¶ms[..params.len() - usize::from(has_result)];
2263 for (param, src) in params.iter().zip(&mut src) {
2264 src.write(*param);
2265 }
2266 params.len()
2267 }
2268
2269 CallerInfo::Sync { params, .. } => {
2271 for (param, src) in params.iter().zip(&mut src) {
2272 src.write(*param);
2273 }
2274 params.len()
2275 }
2276 };
2277 unsafe {
2284 crate::Func::call_unchecked_raw(
2285 &mut store,
2286 start.as_non_null(),
2287 NonNull::new(
2288 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2289 )
2290 .unwrap(),
2291 )?;
2292 }
2293 dst.copy_from_slice(&src[..dst.len()]);
2294 let state = store.0.concurrent_state_mut();
2295 Waitable::Guest(state.guest_thread.unwrap().task).set_event(
2296 state,
2297 Some(Event::Subtask {
2298 status: Status::Started,
2299 }),
2300 )?;
2301 Ok(())
2302 }),
2303 LiftResult {
2304 lift: Box::new(move |store, src| {
2305 let mut store = token.as_context_mut(store);
2308 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2310 my_src.push(ValRaw::u32(*results));
2311 }
2312 unsafe {
2319 crate::Func::call_unchecked_raw(
2320 &mut store,
2321 return_.as_non_null(),
2322 my_src.as_mut_slice().into(),
2323 )?;
2324 }
2325 let state = store.0.concurrent_state_mut();
2326 let thread = state.guest_thread.unwrap();
2327 if sync_caller {
2328 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2329 if let ResultInfo::Stack { result_count } = &result_info {
2330 match result_count {
2331 0 => None,
2332 1 => Some(my_src[0]),
2333 _ => unreachable!(),
2334 }
2335 } else {
2336 None
2337 },
2338 );
2339 }
2340 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2341 }),
2342 ty: task_return_type,
2343 memory: NonNull::new(memory).map(SendSyncPtr::new),
2344 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2345 },
2346 Caller::Guest {
2347 thread: old_thread.unwrap(),
2348 instance: caller_instance,
2349 },
2350 None,
2351 self,
2352 callee_instance,
2353 callee_async,
2354 )?;
2355
2356 let guest_task = state.push(new_task)?;
2357 let new_thread = GuestThread::new_implicit(guest_task);
2358 let guest_thread = state.push(new_thread)?;
2359 state.get_mut(guest_task)?.threads.insert(guest_thread);
2360
2361 let state = store.0.concurrent_state_mut();
2362 if let Some(old_thread) = old_thread {
2363 if !state.may_enter(guest_task) {
2364 bail!(crate::Trap::CannotEnterComponent);
2365 }
2366
2367 state.get_mut(old_thread.task)?.subtasks.insert(guest_task);
2368 };
2369
2370 store.0.set_thread(Some(QualifiedThreadId {
2373 task: guest_task,
2374 thread: guest_thread,
2375 }));
2376 log::trace!(
2377 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2378 );
2379
2380 Ok(())
2381 }
2382
2383 unsafe fn call_callback<T>(
2388 self,
2389 mut store: StoreContextMut<T>,
2390 callee_instance: RuntimeComponentInstanceIndex,
2391 function: SendSyncPtr<VMFuncRef>,
2392 event: Event,
2393 handle: u32,
2394 may_enter_after_call: bool,
2395 ) -> Result<u32> {
2396 let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2397
2398 let (ordinal, result) = event.parts();
2399 let params = &mut [
2400 ValRaw::u32(ordinal),
2401 ValRaw::u32(handle),
2402 ValRaw::u32(result),
2403 ];
2404 unsafe {
2409 flags.set_may_enter(false);
2410 crate::Func::call_unchecked_raw(
2411 &mut store,
2412 function.as_non_null(),
2413 params.as_mut_slice().into(),
2414 )?;
2415 flags.set_may_enter(may_enter_after_call);
2416 }
2417 Ok(params[0].get_u32())
2418 }
2419
    /// Start the guest-to-guest call whose task is the current guest thread.
    ///
    /// The call is queued via `queue_call`, then the calling thread is
    /// suspended until the callee's task produces a subtask event.  A caller
    /// that passed no `storage` (treated here as an async caller) stops
    /// waiting after the first event and receives a new subtask handle if
    /// the callee has not yet returned.  A caller that passed `storage`
    /// keeps waiting until the status is `Status::Returned`, and then
    /// receives any stashed sync result in `storage[0]`.
    ///
    /// Returns the final status packed together with the optional subtask
    /// handle.
    ///
    /// # Panics
    ///
    /// Panics if there is no current guest thread, if `callee` is null, if
    /// the counts exceed `MAX_FLAT_PARAMS` / `MAX_FLAT_RESULTS`, or if the
    /// current task's caller is not a guest.
    ///
    /// # Safety
    ///
    /// `callback`, `post_return` and `callee` end up being called through
    /// `Func::call_unchecked_raw`.  NOTE(review): they presumably must be
    /// valid function references (or null where permitted) — confirm
    /// against the call site.
    unsafe fn start_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
        storage: Option<&mut [MaybeUninit<ValRaw>]>,
    ) -> Result<u32> {
        let token = StoreToken::new(store.as_context_mut());
        // No result storage is how an async caller is recognized here.
        let async_caller = storage.is_none();
        let state = store.0.concurrent_state_mut();
        let guest_thread = state.guest_thread.unwrap();
        let callee_async = state.get_mut(guest_thread.task)?.async_function;
        let may_enter_after_call = state
            .get_mut(guest_thread.task)?
            .call_post_return_automatically();
        let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
        let param_count = usize::try_from(param_count).unwrap();
        assert!(param_count <= MAX_FLAT_PARAMS);
        let result_count = usize::try_from(result_count).unwrap();
        assert!(result_count <= MAX_FLAT_RESULTS);

        let task = state.get_mut(guest_thread.task)?;
        if !callback.is_null() {
            let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
            // Install a closure on the task which forwards events to the
            // guest callback via `call_callback`.
            task.callback = Some(Box::new(move |store, runtime_instance, event, handle| {
                let store = token.as_context_mut(store);
                unsafe {
                    self.call_callback::<T>(
                        store,
                        runtime_instance,
                        callback,
                        event,
                        handle,
                        may_enter_after_call,
                    )
                }
            }));
        }

        let Caller::Guest {
            thread: caller,
            instance: runtime_instance,
        } = &task.caller
        else {
            // Any other kind of caller is treated as impossible here.
            unreachable!()
        };
        let caller = *caller;
        let caller_instance = *runtime_instance;

        let callee_instance = task.instance;

        // Instance flags are only passed along for callback-based callees.
        let instance_flags = if callback.is_null() {
            None
        } else {
            Some(self.id().get(store.0).instance_flags(callee_instance.index))
        };

        unsafe {
            self.queue_call(
                store.as_context_mut(),
                guest_thread,
                callee,
                param_count,
                result_count,
                instance_flags,
                (flags & START_FLAG_ASYNC_CALLEE) != 0,
                NonNull::new(callback).map(SendSyncPtr::new),
                NonNull::new(post_return).map(SendSyncPtr::new),
            )?;
        }

        let state = store.0.concurrent_state_mut();

        // Temporarily join the callee's task to the caller's
        // `sync_call_set`, remembering the previous set so it can be
        // restored once waiting is over.
        let guest_waitable = Waitable::Guest(guest_thread.task);
        let old_set = guest_waitable.common(state)?.set;
        let set = state.get_mut(caller.task)?.sync_call_set;
        guest_waitable.join(state, Some(set))?;

        let (status, waitable) = loop {
            store.0.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                // The may-block check is skipped for async callers and for
                // callees that are not async functions.
                skip_may_block_check: async_caller || !callee_async,
            })?;

            let state = store.0.concurrent_state_mut();

            log::trace!("taking event for {:?}", guest_thread.task);
            let event = guest_waitable.take_event(state)?;
            // Only subtask events are expected on this waitable.
            let Some(Event::Subtask { status }) = event else {
                unreachable!();
            };

            log::trace!("status {status:?} for {:?}", guest_thread.task);

            if status == Status::Returned {
                // The callee has returned; no handle is needed.
                break (status, None);
            } else if async_caller {
                // Not yet returned: allocate a subtask handle in the
                // caller's handle table and record it on the task.
                let handle = store
                    .0
                    .handle_table(RuntimeInstance {
                        instance: self.id().instance(),
                        index: caller_instance,
                    })
                    .subtask_insert_guest(guest_thread.task.rep())?;
                store
                    .0
                    .concurrent_state_mut()
                    .get_mut(guest_thread.task)?
                    .common
                    .handle = Some(handle);
                break (status, Some(handle));
            } else {
                // Caller supplied result storage and the callee has not
                // returned yet: keep waiting.
            }
        };

        guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;

        // Make the caller the current thread again.
        store.0.set_thread(Some(caller));
        store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");

        if let Some(storage) = storage {
            let state = store.0.concurrent_state_mut();
            let task = state.get_mut(guest_thread.task)?;
            // Hand over the result stashed in `sync_result`, if any.
            if let Some(result) = task.sync_result.take() {
                if let Some(result) = result {
                    storage[0] = MaybeUninit::new(result);
                }

                if task.exited && task.ready_to_delete() {
                    Waitable::Guest(guest_thread.task).delete_from(state)?;
                }
            }
        }

        Ok(status.pack(waitable))
    }
2615
    /// Poll `future` once on behalf of the current guest thread, creating a
    /// host subtask if it does not complete immediately.
    ///
    /// If the future is ready on the first poll, its result is passed to
    /// `lower`, the freshly created host task is deleted, and `Ok(None)` is
    /// returned.  Otherwise the future is handed to the concurrent state to
    /// be driven later, a subtask handle is allocated in `caller_instance`'s
    /// handle table, and `Ok(Some(handle))` is returned.  When the future
    /// eventually completes, a high-priority work item lowers the result and
    /// sets a `Status::Returned` subtask event on the host task.
    ///
    /// Panics if there is no current guest thread.
    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
        self,
        mut store: StoreContextMut<'_, T>,
        future: impl Future<Output = Result<R>> + Send + 'static,
        caller_instance: RuntimeComponentInstanceIndex,
        lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
    ) -> Result<Option<u32>> {
        let token = StoreToken::new(store.as_context_mut());
        let state = store.0.concurrent_state_mut();
        let caller = state.guest_thread.unwrap();

        // Wrap the future so that a call context saved by an earlier poll is
        // pushed back before each poll, and the top call context is popped
        // and saved whenever the poll returns `Pending`.
        let (join_handle, future) = JoinHandle::run(async move {
            let mut future = pin!(future);
            // Starts out empty, so nothing is pushed before the first poll.
            let mut call_context = None;
            future::poll_fn(move |cx| {
                tls::get(|store| {
                    if let Some(call_context) = call_context.take() {
                        token
                            .as_context_mut(store)
                            .0
                            .component_resource_state()
                            .0
                            .push(call_context);
                    }
                });

                let result = future.as_mut().poll(cx);

                if result.is_pending() {
                    tls::get(|store| {
                        call_context = Some(
                            token
                                .as_context_mut(store)
                                .0
                                .component_resource_state()
                                .0
                                .pop()
                                .unwrap(),
                        );
                    });
                }
                result
            })
            .await
        });

        let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;

        log::trace!("new host task child of {caller:?}: {task:?}");

        let mut future = Box::pin(future);

        // Poll once with a no-op waker while the store is installed via
        // `tls::set`, which the `tls::get` calls in the wrapper above
        // presumably read from — TODO confirm.
        let poll = tls::set(store.0, || {
            future
                .as_mut()
                .poll(&mut Context::from_waker(&Waker::noop()))
        });

        Ok(match poll {
            // NOTE(review): `None` presumably means the join handle aborted
            // the future, which cannot have happened yet — confirm.
            Poll::Ready(None) => unreachable!(),
            Poll::Ready(Some(result)) => {
                // Completed immediately: lower the result and drop the task.
                lower(store.as_context_mut(), result?)?;
                log::trace!("delete host task {task:?} (already ready)");
                store.0.concurrent_state_mut().delete(task)?;
                None
            }
            Poll::Pending => {
                let future =
                    Box::pin(async move {
                        let result = match future.await {
                            Some(result) => result?,
                            // No result was produced; nothing to lower.
                            None => return Ok(()),
                        };
                        tls::get(move |store| {
                            // Lowering is deferred to a work item rather than
                            // performed inline here.
                            store.concurrent_state_mut().push_high_priority(
                                WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
                                    lower(token.as_context_mut(store), result)?;
                                    let state = store.concurrent_state_mut();
                                    // Clearing the join handle is what
                                    // `subtask_drop` checks to decide that a
                                    // host subtask has resolved.
                                    state.get_mut(task)?.join_handle.take();
                                    Waitable::Host(task).set_event(
                                        state,
                                        Some(Event::Subtask {
                                            status: Status::Returned,
                                        }),
                                    )
                                }))),
                            );
                            Ok(())
                        })
                    });

                store.0.concurrent_state_mut().push_future(future);
                let handle = store
                    .0
                    .handle_table(RuntimeInstance {
                        instance: self.id().instance(),
                        index: caller_instance,
                    })
                    .subtask_insert_host(task.rep())?;
                store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
                log::trace!(
                    "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
                );
                Some(handle)
            }
        })
    }
2762
2763 pub(crate) fn task_return(
2766 self,
2767 store: &mut dyn VMStore,
2768 caller: RuntimeComponentInstanceIndex,
2769 ty: TypeTupleIndex,
2770 options: OptionsIndex,
2771 storage: &[ValRaw],
2772 ) -> Result<()> {
2773 self.id().get(store).check_may_leave(caller)?;
2774 let state = store.concurrent_state_mut();
2775 let guest_thread = state.guest_thread.unwrap();
2776 let lift = state
2777 .get_mut(guest_thread.task)?
2778 .lift_result
2779 .take()
2780 .ok_or_else(|| {
2781 format_err!("`task.return` or `task.cancel` called more than once for current task")
2782 })?;
2783 assert!(state.get_mut(guest_thread.task)?.result.is_none());
2784
2785 let CanonicalOptions {
2786 string_encoding,
2787 data_model,
2788 ..
2789 } = &self.id().get(store).component().env_component().options[options];
2790
2791 let invalid = ty != lift.ty
2792 || string_encoding != &lift.string_encoding
2793 || match data_model {
2794 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2795 Some(memory) => {
2796 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2797 let actual = self.id().get(store).runtime_memory(memory);
2798 expected != actual.as_ptr()
2799 }
2800 None => false,
2803 },
2804 CanonicalOptionsDataModel::Gc { .. } => true,
2806 };
2807
2808 if invalid {
2809 bail!("invalid `task.return` signature and/or options for current task");
2810 }
2811
2812 log::trace!("task.return for {guest_thread:?}");
2813
2814 let result = (lift.lift)(store, storage)?;
2815 self.task_complete(
2816 store,
2817 guest_thread.task,
2818 result,
2819 Status::Returned,
2820 ValRaw::i32(0),
2821 )
2822 }
2823
2824 pub(crate) fn task_cancel(
2826 self,
2827 store: &mut StoreOpaque,
2828 caller: RuntimeComponentInstanceIndex,
2829 ) -> Result<()> {
2830 self.id().get(store).check_may_leave(caller)?;
2831 let state = store.concurrent_state_mut();
2832 let guest_thread = state.guest_thread.unwrap();
2833 let task = state.get_mut(guest_thread.task)?;
2834 if !task.cancel_sent {
2835 bail!("`task.cancel` called by task which has not been cancelled")
2836 }
2837 _ = task.lift_result.take().ok_or_else(|| {
2838 format_err!("`task.return` or `task.cancel` called more than once for current task")
2839 })?;
2840
2841 assert!(task.result.is_none());
2842
2843 log::trace!("task.cancel for {guest_thread:?}");
2844
2845 self.task_complete(
2846 store,
2847 guest_thread.task,
2848 Box::new(DummyResult),
2849 Status::ReturnCancelled,
2850 ValRaw::i32(0),
2851 )
2852 }
2853
2854 fn task_complete(
2860 self,
2861 store: &mut StoreOpaque,
2862 guest_task: TableId<GuestTask>,
2863 result: Box<dyn Any + Send + Sync>,
2864 status: Status,
2865 post_return_arg: ValRaw,
2866 ) -> Result<()> {
2867 if store
2868 .concurrent_state_mut()
2869 .get_mut(guest_task)?
2870 .call_post_return_automatically()
2871 {
2872 let (calls, host_table, _, instance) =
2873 store.component_resource_state_with_instance(self);
2874 ResourceTables {
2875 calls,
2876 host_table: Some(host_table),
2877 guest: Some(instance.instance_states()),
2878 }
2879 .exit_call()?;
2880 } else {
2881 let function_index = store
2886 .concurrent_state_mut()
2887 .get_mut(guest_task)?
2888 .function_index
2889 .unwrap();
2890 self.id()
2891 .get_mut(store)
2892 .post_return_arg_set(function_index, post_return_arg);
2893 }
2894
2895 let state = store.concurrent_state_mut();
2896 let task = state.get_mut(guest_task)?;
2897
2898 if let Caller::Host { tx, .. } = &mut task.caller {
2899 if let Some(tx) = tx.take() {
2900 _ = tx.send(result);
2901 }
2902 } else {
2903 task.result = Some(result);
2904 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2905 }
2906
2907 Ok(())
2908 }
2909
2910 pub(crate) fn waitable_set_new(
2912 self,
2913 store: &mut StoreOpaque,
2914 caller_instance: RuntimeComponentInstanceIndex,
2915 ) -> Result<u32> {
2916 self.id().get_mut(store).check_may_leave(caller_instance)?;
2917 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2918 let handle = store
2919 .handle_table(RuntimeInstance {
2920 instance: self.id().instance(),
2921 index: caller_instance,
2922 })
2923 .waitable_set_insert(set.rep())?;
2924 log::trace!("new waitable set {set:?} (handle {handle})");
2925 Ok(handle)
2926 }
2927
2928 pub(crate) fn waitable_set_drop(
2930 self,
2931 store: &mut StoreOpaque,
2932 caller_instance: RuntimeComponentInstanceIndex,
2933 set: u32,
2934 ) -> Result<()> {
2935 self.id().get_mut(store).check_may_leave(caller_instance)?;
2936 let rep = store
2937 .handle_table(RuntimeInstance {
2938 instance: self.id().instance(),
2939 index: caller_instance,
2940 })
2941 .waitable_set_remove(set)?;
2942
2943 log::trace!("drop waitable set {rep} (handle {set})");
2944
2945 let set = store
2946 .concurrent_state_mut()
2947 .delete(TableId::<WaitableSet>::new(rep))?;
2948
2949 if !set.waiting.is_empty() {
2950 bail!("cannot drop waitable set with waiters");
2951 }
2952
2953 Ok(())
2954 }
2955
2956 pub(crate) fn waitable_join(
2958 self,
2959 store: &mut StoreOpaque,
2960 caller_instance: RuntimeComponentInstanceIndex,
2961 waitable_handle: u32,
2962 set_handle: u32,
2963 ) -> Result<()> {
2964 let mut instance = self.id().get_mut(store);
2965 instance.check_may_leave(caller_instance)?;
2966 let waitable =
2967 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2968
2969 let set = if set_handle == 0 {
2970 None
2971 } else {
2972 let set = instance.instance_states().0[caller_instance]
2973 .handle_table()
2974 .waitable_set_rep(set_handle)?;
2975
2976 Some(TableId::<WaitableSet>::new(set))
2977 };
2978
2979 log::trace!(
2980 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2981 );
2982
2983 waitable.join(store.concurrent_state_mut(), set)
2984 }
2985
2986 pub(crate) fn subtask_drop(
2988 self,
2989 store: &mut StoreOpaque,
2990 caller_instance: RuntimeComponentInstanceIndex,
2991 task_id: u32,
2992 ) -> Result<()> {
2993 self.id().get_mut(store).check_may_leave(caller_instance)?;
2994 self.waitable_join(store, caller_instance, task_id, 0)?;
2995
2996 let (rep, is_host) = store
2997 .handle_table(RuntimeInstance {
2998 instance: self.id().instance(),
2999 index: caller_instance,
3000 })
3001 .subtask_remove(task_id)?;
3002
3003 let concurrent_state = store.concurrent_state_mut();
3004 let (waitable, expected_caller_instance, delete) = if is_host {
3005 let id = TableId::<HostTask>::new(rep);
3006 let task = concurrent_state.get_mut(id)?;
3007 if task.join_handle.is_some() {
3008 bail!("cannot drop a subtask which has not yet resolved");
3009 }
3010 (Waitable::Host(id), task.caller_instance, true)
3011 } else {
3012 let id = TableId::<GuestTask>::new(rep);
3013 let task = concurrent_state.get_mut(id)?;
3014 if task.lift_result.is_some() {
3015 bail!("cannot drop a subtask which has not yet resolved");
3016 }
3017 if let Caller::Guest { instance, .. } = &task.caller {
3018 (Waitable::Guest(id), *instance, task.exited)
3019 } else {
3020 unreachable!()
3021 }
3022 };
3023
3024 waitable.common(concurrent_state)?.handle = None;
3025
3026 if waitable.take_event(concurrent_state)?.is_some() {
3027 bail!("cannot drop a subtask with an undelivered event");
3028 }
3029
3030 if delete {
3031 waitable.delete_from(concurrent_state)?;
3032 }
3033
3034 assert_eq!(expected_caller_instance, caller_instance);
3038 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3039 Ok(())
3040 }
3041
3042 pub(crate) fn waitable_set_wait(
3044 self,
3045 store: &mut StoreOpaque,
3046 caller: RuntimeComponentInstanceIndex,
3047 options: OptionsIndex,
3048 set: u32,
3049 payload: u32,
3050 ) -> Result<u32> {
3051 self.id().get(store).check_may_leave(caller)?;
3052
3053 if !self.options(store, options).async_ {
3054 store.check_blocking()?;
3058 }
3059
3060 let &CanonicalOptions {
3061 cancellable,
3062 instance: caller_instance,
3063 ..
3064 } = &self.id().get(store).component().env_component().options[options];
3065 let rep = store
3066 .handle_table(RuntimeInstance {
3067 instance: self.id().instance(),
3068 index: caller_instance,
3069 })
3070 .waitable_set_rep(set)?;
3071
3072 self.waitable_check(
3073 store,
3074 cancellable,
3075 WaitableCheck::Wait,
3076 WaitableCheckParams {
3077 set: TableId::new(rep),
3078 options,
3079 payload,
3080 },
3081 )
3082 }
3083
3084 pub(crate) fn waitable_set_poll(
3086 self,
3087 store: &mut StoreOpaque,
3088 caller: RuntimeComponentInstanceIndex,
3089 options: OptionsIndex,
3090 set: u32,
3091 payload: u32,
3092 ) -> Result<u32> {
3093 self.id().get(store).check_may_leave(caller)?;
3094
3095 let &CanonicalOptions {
3096 cancellable,
3097 instance: caller_instance,
3098 ..
3099 } = &self.id().get(store).component().env_component().options[options];
3100 let rep = store
3101 .handle_table(RuntimeInstance {
3102 instance: self.id().instance(),
3103 index: caller_instance,
3104 })
3105 .waitable_set_rep(set)?;
3106
3107 self.waitable_check(
3108 store,
3109 cancellable,
3110 WaitableCheck::Poll,
3111 WaitableCheckParams {
3112 set: TableId::new(rep),
3113 options,
3114 payload,
3115 },
3116 )
3117 }
3118
3119 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3121 let thread_id = store
3122 .concurrent_state_mut()
3123 .guest_thread
3124 .ok_or_else(|| format_err!("no current thread"))?
3125 .thread;
3126 Ok(store
3128 .concurrent_state_mut()
3129 .get_mut(thread_id)?
3130 .instance_rep
3131 .unwrap())
3132 }
3133
3134 pub(crate) fn thread_new_indirect<T: 'static>(
3136 self,
3137 mut store: StoreContextMut<T>,
3138 runtime_instance: RuntimeComponentInstanceIndex,
3139 _func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex,
3141 start_func_idx: u32,
3142 context: i32,
3143 ) -> Result<u32> {
3144 self.id().get(store.0).check_may_leave(runtime_instance)?;
3145
3146 log::trace!("creating new thread");
3147
3148 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3149 let (instance, registry) = self.id().get_mut_and_registry(store.0);
3150 let callee = instance
3151 .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3152 .ok_or_else(|| {
3153 format_err!("the start function index points to an uninitialized function")
3154 })?;
3155 if callee.type_index(store.0) != start_func_ty.type_index() {
3156 bail!(
3157 "start function does not match expected type (currently only `(i32) -> ()` is supported)"
3158 );
3159 }
3160
3161 let token = StoreToken::new(store.as_context_mut());
3162 let start_func = Box::new(
3163 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3164 let old_thread = store.set_thread(Some(guest_thread));
3165 log::trace!(
3166 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3167 );
3168
3169 store.maybe_push_call_context(guest_thread.task)?;
3170
3171 let mut store = token.as_context_mut(store);
3172 let mut params = [ValRaw::i32(context)];
3173 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3176
3177 store.0.maybe_pop_call_context(guest_thread.task)?;
3178
3179 self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3180 log::trace!("explicit thread {guest_thread:?} completed");
3181 let state = store.0.concurrent_state_mut();
3182 let task = state.get_mut(guest_thread.task)?;
3183 if task.threads.is_empty() && !task.returned_or_cancelled() {
3184 bail!(Trap::NoAsyncResult);
3185 }
3186 store.0.set_thread(old_thread);
3187 let state = store.0.concurrent_state_mut();
3188 old_thread
3189 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
3190 if state.get_mut(guest_thread.task)?.ready_to_delete() {
3191 Waitable::Guest(guest_thread.task).delete_from(state)?;
3192 }
3193 log::trace!("thread start: restored {old_thread:?} as current thread");
3194
3195 Ok(())
3196 },
3197 );
3198
3199 let state = store.0.concurrent_state_mut();
3200 let current_thread = state.guest_thread.unwrap();
3201 let parent_task = current_thread.task;
3202
3203 let new_thread = GuestThread::new_explicit(parent_task, start_func);
3204 let thread_id = state.push(new_thread)?;
3205 state.get_mut(parent_task)?.threads.insert(thread_id);
3206
3207 log::trace!("new thread with id {thread_id:?} created");
3208
3209 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3210 }
3211
3212 pub(crate) fn resume_suspended_thread(
3213 self,
3214 store: &mut StoreOpaque,
3215 runtime_instance: RuntimeComponentInstanceIndex,
3216 thread_idx: u32,
3217 high_priority: bool,
3218 ) -> Result<()> {
3219 let thread_id =
3220 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3221 let state = store.concurrent_state_mut();
3222 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3223 let thread = state.get_mut(guest_thread.thread)?;
3224
3225 match mem::replace(&mut thread.state, GuestThreadState::Running) {
3226 GuestThreadState::NotStartedExplicit(start_func) => {
3227 log::trace!("starting thread {guest_thread:?}");
3228 let guest_call = WorkItem::GuestCall(GuestCall {
3229 thread: guest_thread,
3230 kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3231 start_func(store, guest_thread)
3232 })),
3233 });
3234 store
3235 .concurrent_state_mut()
3236 .push_work_item(guest_call, high_priority);
3237 }
3238 GuestThreadState::Suspended(fiber) => {
3239 log::trace!("resuming thread {thread_id:?} that was suspended");
3240 store
3241 .concurrent_state_mut()
3242 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3243 }
3244 _ => {
3245 bail!("cannot resume thread which is not suspended");
3246 }
3247 }
3248 Ok(())
3249 }
3250
3251 fn add_guest_thread_to_instance_table(
3252 self,
3253 thread_id: TableId<GuestThread>,
3254 store: &mut StoreOpaque,
3255 runtime_instance: RuntimeComponentInstanceIndex,
3256 ) -> Result<u32> {
3257 let guest_id = store
3258 .handle_table(RuntimeInstance {
3259 instance: self.id().instance(),
3260 index: runtime_instance,
3261 })
3262 .guest_thread_insert(thread_id.rep())?;
3263 store
3264 .concurrent_state_mut()
3265 .get_mut(thread_id)?
3266 .instance_rep = Some(guest_id);
3267 Ok(guest_id)
3268 }
3269
3270 pub(crate) fn suspension_intrinsic(
3273 self,
3274 store: &mut StoreOpaque,
3275 caller: RuntimeComponentInstanceIndex,
3276 cancellable: bool,
3277 yielding: bool,
3278 to_thread: Option<u32>,
3279 ) -> Result<WaitResult> {
3280 self.id().get(store).check_may_leave(caller)?;
3281
3282 if to_thread.is_none() {
3283 let state = store.concurrent_state_mut();
3284 if yielding {
3285 if !state.may_block(state.guest_thread.unwrap().task) {
3287 return Ok(WaitResult::Completed);
3291 }
3292 } else {
3293 store.check_blocking()?;
3297 }
3298 }
3299
3300 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3302 return Ok(WaitResult::Cancelled);
3303 }
3304
3305 if let Some(thread) = to_thread {
3306 self.resume_suspended_thread(store, caller, thread, true)?;
3307 }
3308
3309 let state = store.concurrent_state_mut();
3310 let guest_thread = state.guest_thread.unwrap();
3311 let reason = if yielding {
3312 SuspendReason::Yielding {
3313 thread: guest_thread,
3314 skip_may_block_check: to_thread.is_some(),
3318 }
3319 } else {
3320 SuspendReason::ExplicitlySuspending {
3321 thread: guest_thread,
3322 skip_may_block_check: to_thread.is_some(),
3326 }
3327 };
3328
3329 store.suspend(reason)?;
3330
3331 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3332 Ok(WaitResult::Cancelled)
3333 } else {
3334 Ok(WaitResult::Completed)
3335 }
3336 }
3337
    /// Shared implementation of `waitable_set_wait` and `waitable_set_poll`.
    ///
    /// In `WaitableCheck::Wait` mode the current guest thread is suspended
    /// (with `SuspendReason::Waiting`) when there is nothing to deliver yet:
    /// the task has no pending event -- or only a `Cancelled` event that this
    /// non-cancellable call may not consume -- and the set's `ready`
    /// collection is empty. `WaitableCheck::Poll` never suspends here.
    ///
    /// Afterwards the next event is fetched with `get_event`. The waitable
    /// handle (or `0` when the event has none) and the event's result word are
    /// written to guest memory at `params.payload` as two consecutive
    /// little-endian 4-byte values, and the event's ordinal is returned. In
    /// poll mode with no event available, the parts of `Event::None` are
    /// reported instead.
    ///
    /// # Panics
    ///
    /// Panics if there is no current guest thread, or if `get_event` yields
    /// nothing in wait mode.
    fn waitable_check(
        self,
        store: &mut StoreOpaque,
        cancellable: bool,
        check: WaitableCheck,
        params: WaitableCheckParams,
    ) -> Result<u32> {
        let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();

        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);

        let state = store.concurrent_state_mut();
        let task = state.get_mut(guest_thread.task)?;

        match &check {
            WaitableCheck::Wait => {
                let set = params.set;

                // Only block when nothing is deliverable right now. A pending
                // `Cancelled` event does not count as deliverable unless this
                // call is cancellable.
                if (task.event.is_none()
                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
                    && state.get_mut(set)?.ready.is_empty()
                {
                    if cancellable {
                        // Remember which set this thread is parked on so that
                        // `subtask_cancel` can wake it.
                        let old = state
                            .get_mut(guest_thread.thread)?
                            .wake_on_cancel
                            .replace(set);
                        assert!(old.is_none());
                    }

                    store.suspend(SuspendReason::Waiting {
                        set,
                        thread: guest_thread,
                        skip_may_block_check: false,
                    })?;
                }
            }
            WaitableCheck::Poll => {}
        }

        log::trace!(
            "waitable check for {guest_thread:?}; set {:?}, part two",
            params.set
        );

        let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;

        let (ordinal, handle, result) = match &check {
            WaitableCheck::Wait => {
                // In wait mode an event is expected to be available by now.
                let (event, waitable) = event.unwrap();
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                let (ordinal, result) = event.parts();
                (ordinal, handle, result)
            }
            WaitableCheck::Poll => {
                if let Some((event, waitable)) = event {
                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                    let (ordinal, result) = event.parts();
                    (ordinal, handle, result)
                } else {
                    log::trace!(
                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
                        guest_thread.task,
                        params.set
                    );
                    let (ordinal, result) = Event::None.parts();
                    (ordinal, 0, result)
                }
            }
        };
        // Validate the destination before writing the (handle, result) pair.
        let memory = self.options_memory_mut(store, params.options);
        let ptr = func::validate_inbounds_dynamic(
            &CanonicalAbiInfo::POINTER_PAIR,
            memory,
            &ValRaw::u32(params.payload),
        )?;
        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
        Ok(ordinal)
    }
3422
    /// Requests cancellation of the subtask behind handle `task_id` in
    /// `caller_instance`'s table and returns the resulting status as a `u32`.
    ///
    /// Outcomes, in the order they are checked:
    ///
    /// * Host subtask with a join handle: the handle is aborted and
    ///   `Status::ReturnCancelled` is returned.
    /// * Guest subtask whose parameters have not been lowered yet: its pending
    ///   entry is removed and `Status::StartCancelled` is returned.
    /// * Guest subtask that has started but not returned or been cancelled: a
    ///   `Cancelled` event is posted to it, one of its threads parked with
    ///   `wake_on_cancel` (if any) is woken and the caller yields to it. If
    ///   the subtask still has not finished, an `async_` caller gets
    ///   `BLOCKED`, while a synchronous caller waits for the subtask's event.
    /// * Otherwise the subtask's pending event must be a `Returned` or
    ///   `ReturnCancelled` status, which is consumed and returned.
    ///
    /// # Errors
    ///
    /// Fails if the caller may not leave its instance, if a synchronous call
    /// is not allowed to block, if the handle is invalid, or if the subtask's
    /// terminal status has already been delivered.
    ///
    /// # Panics
    ///
    /// Panics if the subtask was not created by `caller_instance`.
    pub(crate) fn subtask_cancel(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        async_: bool,
        task_id: u32,
    ) -> Result<u32> {
        self.id().get(store).check_may_leave(caller_instance)?;

        if !async_ {
            store.check_blocking()?;
        }

        // Resolve the guest handle to a host- or guest-task table entry and
        // find out which instance originally made the call.
        let (rep, is_host) = store
            .handle_table(RuntimeInstance {
                instance: self.id().instance(),
                index: caller_instance,
            })
            .subtask_rep(task_id)?;
        let (waitable, expected_caller_instance) = if is_host {
            let id = TableId::<HostTask>::new(rep);
            (
                Waitable::Host(id),
                store.concurrent_state_mut().get_mut(id)?.caller_instance,
            )
        } else {
            let id = TableId::<GuestTask>::new(rep);
            if let Caller::Guest { instance, .. } =
                &store.concurrent_state_mut().get_mut(id)?.caller
            {
                (Waitable::Guest(id), *instance)
            } else {
                unreachable!()
            }
        };
        assert_eq!(expected_caller_instance, caller_instance);

        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

        let concurrent_state = store.concurrent_state_mut();
        if let Waitable::Host(host_task) = waitable {
            // A host task is cancelled by aborting its join handle, if it
            // still has one; otherwise fall through to the event check below.
            if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
                handle.abort();
                return Ok(Status::ReturnCancelled as u32);
            }
        } else {
            let caller = concurrent_state.guest_thread.unwrap();
            let guest_task = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.already_lowered_parameters() {
                // The callee has not consumed its parameters yet: drop both
                // the lowering and lifting state and remove it from the
                // callee instance's pending queue.
                task.lower_params = None;
                task.lift_result = None;

                let instance = task.instance;
                let pending = &mut store.instance_state(instance).pending;
                let pending_count = pending.len();
                pending.retain(|thread, _| thread.task != guest_task);
                // Nothing was removed, so the task was not actually pending.
                if pending.len() == pending_count {
                    bail!("`subtask.cancel` called after terminal status delivered");
                }
                return Ok(Status::StartCancelled as u32);
            } else if !task.returned_or_cancelled() {
                // The callee is running: record the request and post a
                // `Cancelled` event for it to observe.
                task.cancel_sent = true;
                task.event = Some(Event::Cancelled);
                for thread in task.threads.clone() {
                    let thread = QualifiedThreadId {
                        task: guest_task,
                        thread,
                    };
                    // Only threads parked in a cancellable wait have
                    // `wake_on_cancel` set; wake the first such thread.
                    if let Some(set) = concurrent_state
                        .get_mut(thread.thread)
                        .unwrap()
                        .wake_on_cancel
                        .take()
                    {
                        let item = match concurrent_state
                            .get_mut(set)?
                            .waiting
                            .remove(&thread)
                            .unwrap()
                        {
                            WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
                            WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
                                thread,
                                kind: GuestCallKind::DeliverEvent {
                                    instance,
                                    set: None,
                                },
                            }),
                        };
                        concurrent_state.push_high_priority(item);

                        // Yield so the woken thread gets a chance to react to
                        // the cancellation before we inspect the task again.
                        store.suspend(SuspendReason::Yielding {
                            thread: caller,
                            skip_may_block_check: false,
                        })?;
                        break;
                    }
                }

                let concurrent_state = store.concurrent_state_mut();
                let task = concurrent_state.get_mut(guest_task)?;
                if !task.returned_or_cancelled() {
                    if async_ {
                        return Ok(BLOCKED);
                    } else {
                        store.wait_for_event(Waitable::Guest(guest_task))?;
                    }
                }
            }
        }

        // At this point the subtask should have a terminal status event
        // waiting; consume it and report it to the caller.
        let event = waitable.take_event(store.concurrent_state_mut())?;
        if let Some(Event::Subtask {
            status: status @ (Status::Returned | Status::ReturnCancelled),
        }) = event
        {
            Ok(status as u32)
        } else {
            bail!("`subtask.cancel` called after terminal status delivered");
        }
    }
3562
3563 pub(crate) fn context_get(
3564 self,
3565 store: &mut StoreOpaque,
3566 caller: RuntimeComponentInstanceIndex,
3567 slot: u32,
3568 ) -> Result<u32> {
3569 self.id().get(store).check_may_leave(caller)?;
3570 store.concurrent_state_mut().context_get(slot)
3571 }
3572
3573 pub(crate) fn context_set(
3574 self,
3575 store: &mut StoreOpaque,
3576 caller: RuntimeComponentInstanceIndex,
3577 slot: u32,
3578 value: u32,
3579 ) -> Result<()> {
3580 self.id().get(store).check_may_leave(caller)?;
3581 store.concurrent_state_mut().context_set(slot, value)
3582 }
3583}
3584
/// Store-level entry points for the component-model async intrinsics.
///
/// Every method takes the `Instance` the call originates from. The
/// implementation for `StoreInner<T>` in this file forwards each call to the
/// corresponding `Instance` method with a `StoreContextMut` built from the
/// store.
pub trait VMComponentAsyncStore {
    /// Prepares a guest-to-guest call.
    ///
    /// `storage` holds `storage_len` flattened parameter values.
    /// `result_count` is either the caller's result count (synchronous caller)
    /// or one of the sentinels `PREPARE_ASYNC_NO_RESULT` /
    /// `PREPARE_ASYNC_WITH_RESULT` (asynchronous caller).
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads of `storage_len` `ValRaw` values.
    /// NOTE(review): requirements on `memory`, `start` and `return_` are not
    /// visible here -- confirm against `Instance::prepare_call`.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: u8,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of a synchronous caller,
    /// using `storage` (of `storage_len` slots) as the parameter/result area.
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads and writes of `storage_len` slots.
    /// NOTE(review): requirements on `callback` and `callee` are not visible
    /// here -- confirm against `Instance::start_call`.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of an asynchronous caller
    /// and returns the `u32` produced by `Instance::start_call`.
    ///
    /// # Safety
    ///
    /// NOTE(review): requirements on the function pointers are not visible
    /// here -- confirm against `Instance::start_call`.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// Writes the value at `address` to the future handle `future`, returning
    /// the encoded result code.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Reads from the future handle `future` into `address`, returning the
    /// encoded result code.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a future.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Writes up to `count` items starting at `address` to the stream handle
    /// `stream`, returning the encoded result code.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Reads up to `count` items from the stream handle `stream` into
    /// `address`, returning the encoded result code.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_write`, but additionally supplies the payload's size and
    /// alignment (passed on as a `FlatAbi`).
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_read`, but additionally supplies the payload's size and
    /// alignment (passed on as a `FlatAbi`).
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a stream.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Retrieves the debug message of the error context `err_ctx_handle`,
    /// with `debug_msg_address` as the guest destination address.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// Creates a new guest thread whose start function is looked up in table
    /// `start_func_table_idx` at `start_func_idx`; `context` is the argument
    /// the start function receives. Returns the new thread's handle.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
3755
3756impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3758 unsafe fn prepare_call(
3759 &mut self,
3760 instance: Instance,
3761 memory: *mut VMMemoryDefinition,
3762 start: *mut VMFuncRef,
3763 return_: *mut VMFuncRef,
3764 caller_instance: RuntimeComponentInstanceIndex,
3765 callee_instance: RuntimeComponentInstanceIndex,
3766 task_return_type: TypeTupleIndex,
3767 callee_async: bool,
3768 string_encoding: u8,
3769 result_count_or_max_if_async: u32,
3770 storage: *mut ValRaw,
3771 storage_len: usize,
3772 ) -> Result<()> {
3773 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3777
3778 unsafe {
3779 instance.prepare_call(
3780 StoreContextMut(self),
3781 start,
3782 return_,
3783 caller_instance,
3784 callee_instance,
3785 task_return_type,
3786 callee_async,
3787 memory,
3788 string_encoding,
3789 match result_count_or_max_if_async {
3790 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3791 params,
3792 has_result: false,
3793 },
3794 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3795 params,
3796 has_result: true,
3797 },
3798 result_count => CallerInfo::Sync {
3799 params,
3800 result_count,
3801 },
3802 },
3803 )
3804 }
3805 }
3806
3807 unsafe fn sync_start(
3808 &mut self,
3809 instance: Instance,
3810 callback: *mut VMFuncRef,
3811 callee: *mut VMFuncRef,
3812 param_count: u32,
3813 storage: *mut MaybeUninit<ValRaw>,
3814 storage_len: usize,
3815 ) -> Result<()> {
3816 unsafe {
3817 instance
3818 .start_call(
3819 StoreContextMut(self),
3820 callback,
3821 ptr::null_mut(),
3822 callee,
3823 param_count,
3824 1,
3825 START_FLAG_ASYNC_CALLEE,
3826 Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3830 )
3831 .map(drop)
3832 }
3833 }
3834
3835 unsafe fn async_start(
3836 &mut self,
3837 instance: Instance,
3838 callback: *mut VMFuncRef,
3839 post_return: *mut VMFuncRef,
3840 callee: *mut VMFuncRef,
3841 param_count: u32,
3842 result_count: u32,
3843 flags: u32,
3844 ) -> Result<u32> {
3845 unsafe {
3846 instance.start_call(
3847 StoreContextMut(self),
3848 callback,
3849 post_return,
3850 callee,
3851 param_count,
3852 result_count,
3853 flags,
3854 None,
3855 )
3856 }
3857 }
3858
3859 fn future_write(
3860 &mut self,
3861 instance: Instance,
3862 caller: RuntimeComponentInstanceIndex,
3863 ty: TypeFutureTableIndex,
3864 options: OptionsIndex,
3865 future: u32,
3866 address: u32,
3867 ) -> Result<u32> {
3868 instance.id().get(self).check_may_leave(caller)?;
3869 instance
3870 .guest_write(
3871 StoreContextMut(self),
3872 caller,
3873 TransmitIndex::Future(ty),
3874 options,
3875 None,
3876 future,
3877 address,
3878 1,
3879 )
3880 .map(|result| result.encode())
3881 }
3882
3883 fn future_read(
3884 &mut self,
3885 instance: Instance,
3886 caller: RuntimeComponentInstanceIndex,
3887 ty: TypeFutureTableIndex,
3888 options: OptionsIndex,
3889 future: u32,
3890 address: u32,
3891 ) -> Result<u32> {
3892 instance.id().get(self).check_may_leave(caller)?;
3893 instance
3894 .guest_read(
3895 StoreContextMut(self),
3896 caller,
3897 TransmitIndex::Future(ty),
3898 options,
3899 None,
3900 future,
3901 address,
3902 1,
3903 )
3904 .map(|result| result.encode())
3905 }
3906
3907 fn stream_write(
3908 &mut self,
3909 instance: Instance,
3910 caller: RuntimeComponentInstanceIndex,
3911 ty: TypeStreamTableIndex,
3912 options: OptionsIndex,
3913 stream: u32,
3914 address: u32,
3915 count: u32,
3916 ) -> Result<u32> {
3917 instance.id().get(self).check_may_leave(caller)?;
3918 instance
3919 .guest_write(
3920 StoreContextMut(self),
3921 caller,
3922 TransmitIndex::Stream(ty),
3923 options,
3924 None,
3925 stream,
3926 address,
3927 count,
3928 )
3929 .map(|result| result.encode())
3930 }
3931
3932 fn stream_read(
3933 &mut self,
3934 instance: Instance,
3935 caller: RuntimeComponentInstanceIndex,
3936 ty: TypeStreamTableIndex,
3937 options: OptionsIndex,
3938 stream: u32,
3939 address: u32,
3940 count: u32,
3941 ) -> Result<u32> {
3942 instance.id().get(self).check_may_leave(caller)?;
3943 instance
3944 .guest_read(
3945 StoreContextMut(self),
3946 caller,
3947 TransmitIndex::Stream(ty),
3948 options,
3949 None,
3950 stream,
3951 address,
3952 count,
3953 )
3954 .map(|result| result.encode())
3955 }
3956
3957 fn future_drop_writable(
3958 &mut self,
3959 instance: Instance,
3960 caller: RuntimeComponentInstanceIndex,
3961 ty: TypeFutureTableIndex,
3962 writer: u32,
3963 ) -> Result<()> {
3964 instance.id().get(self).check_may_leave(caller)?;
3965 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
3966 }
3967
3968 fn flat_stream_write(
3969 &mut self,
3970 instance: Instance,
3971 caller: RuntimeComponentInstanceIndex,
3972 ty: TypeStreamTableIndex,
3973 options: OptionsIndex,
3974 payload_size: u32,
3975 payload_align: u32,
3976 stream: u32,
3977 address: u32,
3978 count: u32,
3979 ) -> Result<u32> {
3980 instance.id().get(self).check_may_leave(caller)?;
3981 instance
3982 .guest_write(
3983 StoreContextMut(self),
3984 caller,
3985 TransmitIndex::Stream(ty),
3986 options,
3987 Some(FlatAbi {
3988 size: payload_size,
3989 align: payload_align,
3990 }),
3991 stream,
3992 address,
3993 count,
3994 )
3995 .map(|result| result.encode())
3996 }
3997
3998 fn flat_stream_read(
3999 &mut self,
4000 instance: Instance,
4001 caller: RuntimeComponentInstanceIndex,
4002 ty: TypeStreamTableIndex,
4003 options: OptionsIndex,
4004 payload_size: u32,
4005 payload_align: u32,
4006 stream: u32,
4007 address: u32,
4008 count: u32,
4009 ) -> Result<u32> {
4010 instance.id().get(self).check_may_leave(caller)?;
4011 instance
4012 .guest_read(
4013 StoreContextMut(self),
4014 caller,
4015 TransmitIndex::Stream(ty),
4016 options,
4017 Some(FlatAbi {
4018 size: payload_size,
4019 align: payload_align,
4020 }),
4021 stream,
4022 address,
4023 count,
4024 )
4025 .map(|result| result.encode())
4026 }
4027
4028 fn stream_drop_writable(
4029 &mut self,
4030 instance: Instance,
4031 caller: RuntimeComponentInstanceIndex,
4032 ty: TypeStreamTableIndex,
4033 writer: u32,
4034 ) -> Result<()> {
4035 instance.id().get(self).check_may_leave(caller)?;
4036 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4037 }
4038
4039 fn error_context_debug_message(
4040 &mut self,
4041 instance: Instance,
4042 caller: RuntimeComponentInstanceIndex,
4043 ty: TypeComponentLocalErrorContextTableIndex,
4044 options: OptionsIndex,
4045 err_ctx_handle: u32,
4046 debug_msg_address: u32,
4047 ) -> Result<()> {
4048 instance.id().get(self).check_may_leave(caller)?;
4049 instance.error_context_debug_message(
4050 StoreContextMut(self),
4051 ty,
4052 options,
4053 err_ctx_handle,
4054 debug_msg_address,
4055 )
4056 }
4057
4058 fn thread_new_indirect(
4059 &mut self,
4060 instance: Instance,
4061 caller: RuntimeComponentInstanceIndex,
4062 func_ty_idx: TypeFuncIndex,
4063 start_func_table_idx: RuntimeTableIndex,
4064 start_func_idx: u32,
4065 context: i32,
4066 ) -> Result<u32> {
4067 instance.thread_new_indirect(
4068 StoreContextMut(self),
4069 caller,
4070 func_ty_idx,
4071 start_func_table_idx,
4072 start_func_idx,
4073 context,
4074 )
4075 }
4076}
4077
/// A pinned, boxed, `Send + 'static` future resolving to `Result<()>`.
///
/// NOTE(review): presumably the future driving a host task -- confirm at the
/// use sites, which are outside this part of the file.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4079
/// Table entry describing a subtask that is implemented by the host.
struct HostTask {
    /// Waitable bookkeeping (pending event, owning set, guest handle).
    common: WaitableCommon,
    /// The component instance that made the call; `subtask_cancel` asserts
    /// that it matches the instance asking for cancellation.
    caller_instance: RuntimeComponentInstanceIndex,
    /// Handle used to abort the task; taken (and aborted) by `subtask_cancel`.
    join_handle: Option<JoinHandle>,
}
4086
4087impl HostTask {
4088 fn new(
4089 caller_instance: RuntimeComponentInstanceIndex,
4090 join_handle: Option<JoinHandle>,
4091 ) -> Self {
4092 Self {
4093 common: WaitableCommon::default(),
4094 caller_instance,
4095 join_handle,
4096 }
4097 }
4098}
4099
impl TableDebug for HostTask {
    /// Human-readable name of this table entry type.
    fn type_name() -> &'static str {
        "HostTask"
    }
}
4105
/// Boxed, thread-safe callback stored on a `GuestTask`.
///
/// It receives the store, a component instance index, an `Event`, and a
/// `u32`, and returns a `u32` on success.
/// NOTE(review): presumably the `u32` argument is the waitable handle tied to
/// the event and the return value is the callback's result code -- confirm at
/// the call sites, which are outside this part of the file.
type CallbackFn = Box<
    dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
        + Send
        + Sync
        + 'static,
>;
4112
/// Identifies who started a `GuestTask`.
enum Caller {
    /// The task was started by the host.
    Host {
        /// Channel for the lifted result, if one is still expected.
        /// NOTE(review): presumably consumed when the result is sent --
        /// confirm at the use site.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// Shared sender handed down to subtasks when this task is disposed.
        /// NOTE(review): presumably signals task exit -- confirm.
        exit_tx: Arc<oneshot::Sender<()>>,
        /// Whether a host-side future is associated with the call; when true,
        /// `GuestTask::new` starts the task in `HostFutureState::Live`.
        host_future_present: bool,
        /// Value reported by `GuestTask::call_post_return_automatically` for
        /// host callers.
        call_post_return_automatically: bool,
    },
    /// The task was started by another guest task.
    Guest {
        /// The calling thread (and thereby the calling task).
        thread: QualifiedThreadId,
        /// The component instance the call was made from.
        instance: RuntimeComponentInstanceIndex,
    },
}
4144
/// Everything needed to lift a task's result.
///
/// A `GuestTask` holds this until it has returned or been cancelled (see
/// `GuestTask::returned_or_cancelled`).
struct LiftResult {
    /// The lifting routine itself.
    lift: RawLift,
    /// Tuple type of the result.
    ty: TypeTupleIndex,
    /// Linear memory to lift from, if any.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding to use when lifting.
    string_encoding: StringEncoding,
}
4153
/// A guest thread id paired with the id of the task it belongs to.
///
/// The derived ordering compares `task` first and `thread` second.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
struct QualifiedThreadId {
    /// The task that owns the thread.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
4163
4164impl QualifiedThreadId {
4165 fn qualify(
4166 state: &mut ConcurrentState,
4167 thread: TableId<GuestThread>,
4168 ) -> Result<QualifiedThreadId> {
4169 Ok(QualifiedThreadId {
4170 task: state.get_mut(thread)?.parent_task,
4171 thread,
4172 })
4173 }
4174}
4175
4176impl fmt::Debug for QualifiedThreadId {
4177 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4178 f.debug_tuple("QualifiedThreadId")
4179 .field(&self.task.rep())
4180 .field(&self.thread.rep())
4181 .finish()
4182 }
4183}
4184
/// Lifecycle state of a `GuestThread`.
enum GuestThreadState {
    /// Initial state of a thread created with `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state of a thread created with `GuestThread::new_explicit`;
    /// holds the start function that `resume_suspended_thread` schedules.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// The thread is running (or has been scheduled to run).
    Running,
    /// The thread is suspended; holds the fiber to resume.
    Suspended(StoreFiber<'static>),
    /// NOTE(review): meaning not visible in this part of the file.
    Pending,
    /// NOTE(review): presumably the thread has finished -- confirm.
    Completed,
}
/// Table entry describing a single thread of a guest task.
pub struct GuestThread {
    /// Two context slots, zero-initialized.
    /// NOTE(review): presumably what `context_get`/`context_set` operate on --
    /// confirm.
    context: [u32; 2],
    /// The task this thread belongs to.
    parent_task: TableId<GuestTask>,
    /// Set while the thread is parked in a cancellable wait on this waitable
    /// set, so that `subtask_cancel` can wake it.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// Current lifecycle state.
    state: GuestThreadState,
    /// Handle of this thread in its instance's handle table, once registered.
    instance_rep: Option<u32>,
}
4210
4211impl GuestThread {
4212 fn from_instance(
4215 state: Pin<&mut ComponentInstance>,
4216 caller_instance: RuntimeComponentInstanceIndex,
4217 guest_thread: u32,
4218 ) -> Result<TableId<Self>> {
4219 let rep = state.instance_states().0[caller_instance]
4220 .handle_table()
4221 .guest_thread_rep(guest_thread)?;
4222 Ok(TableId::new(rep))
4223 }
4224
4225 fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4226 Self {
4227 context: [0; 2],
4228 parent_task,
4229 wake_on_cancel: None,
4230 state: GuestThreadState::NotStartedImplicit,
4231 instance_rep: None,
4232 }
4233 }
4234
4235 fn new_explicit(
4236 parent_task: TableId<GuestTask>,
4237 start_func: Box<
4238 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4239 >,
4240 ) -> Self {
4241 Self {
4242 context: [0; 2],
4243 parent_task,
4244 wake_on_cancel: None,
4245 state: GuestThreadState::NotStartedExplicit(start_func),
4246 instance_rep: None,
4247 }
4248 }
4249}
4250
impl TableDebug for GuestThread {
    /// Human-readable name of this table entry type.
    fn type_name() -> &'static str {
        "GuestThread"
    }
}
4256
/// Tracks the result of a task for a synchronous caller.
enum SyncResult {
    /// No result has been produced yet.
    NotProduced,
    /// A result was produced and has not been taken; the payload is the
    /// optional raw value.
    Produced(Option<ValRaw>),
    /// The result has already been taken; taking it again panics.
    Taken,
}
4262
4263impl SyncResult {
4264 fn take(&mut self) -> Option<Option<ValRaw>> {
4265 match mem::replace(self, SyncResult::Taken) {
4266 SyncResult::NotProduced => None,
4267 SyncResult::Produced(val) => Some(val),
4268 SyncResult::Taken => {
4269 panic!("attempted to take a synchronous result that was already taken")
4270 }
4271 }
4272 }
4273}
4274
/// Tracks the host-side future associated with a host-initiated task, if any.
#[derive(Debug)]
enum HostFutureState {
    /// The task has no associated host future (guest caller, or a host caller
    /// with `host_future_present == false`).
    NotApplicable,
    /// A host future exists; `GuestTask::ready_to_delete` returns false while
    /// in this state.
    Live,
    /// NOTE(review): presumably the host future has been dropped -- confirm
    /// where this is set.
    Dropped,
}
4281
/// Table entry describing a task running in (or about to enter) a guest
/// component instance.
pub(crate) struct GuestTask {
    /// Waitable bookkeeping (pending event, owning set, guest handle).
    common: WaitableCommon,
    /// Parameter-lowering routine; `None` once the parameters have been
    /// lowered (see `already_lowered_parameters`) or the call was cancelled
    /// before starting.
    lower_params: Option<RawLower>,
    /// Result-lifting state; `None` once the task has returned or been
    /// cancelled (see `returned_or_cancelled`).
    lift_result: Option<LiftResult>,
    /// The lifted result, if any. NOTE(review): lifecycle not visible here.
    result: Option<LiftedResult>,
    /// Optional callback supplied at construction.
    callback: Option<CallbackFn>,
    /// Who started this task.
    caller: Caller,
    /// Call context; initialized to `Some(CallContext::default())` by `new`.
    call_context: Option<CallContext>,
    /// Result slot for a synchronous caller.
    sync_result: SyncResult,
    /// Set by `subtask_cancel` once cancellation of the running task has been
    /// requested.
    cancel_sent: bool,
    /// NOTE(review): presumably whether a "starting" status was reported to
    /// the caller -- confirm.
    starting_sent: bool,
    /// Tasks started by this task; re-parented to this task's caller when it
    /// is disposed.
    subtasks: HashSet<TableId<GuestTask>>,
    /// Waitable set created together with the task and deleted in `dispose`.
    sync_call_set: TableId<WaitableSet>,
    /// The runtime instance the task executes in.
    instance: RuntimeInstance,
    /// Pending event for the task itself (e.g. `Event::Cancelled`, set by
    /// `subtask_cancel`).
    event: Option<Event>,
    /// NOTE(review): presumably the export being called -- confirm.
    function_index: Option<ExportIndex>,
    /// Whether the task has exited; checked when its parent is disposed.
    exited: bool,
    /// Threads belonging to this task; must be empty when it is disposed.
    threads: HashSet<TableId<GuestThread>>,
    /// State of the associated host future, if any.
    host_future_state: HostFutureState,
    /// Flag supplied at construction. NOTE(review): presumably whether the
    /// callee is an async function -- confirm.
    async_function: bool,
}
4340
4341impl GuestTask {
4342 fn already_lowered_parameters(&self) -> bool {
4343 self.lower_params.is_none()
4345 }
4346
4347 fn returned_or_cancelled(&self) -> bool {
4348 self.lift_result.is_none()
4350 }
4351
4352 fn ready_to_delete(&self) -> bool {
4353 let threads_completed = self.threads.is_empty();
4354 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4355 let pending_completion_event = matches!(
4356 self.common.event,
4357 Some(Event::Subtask {
4358 status: Status::Returned | Status::ReturnCancelled
4359 })
4360 );
4361 let ready = threads_completed
4362 && !has_sync_result
4363 && !pending_completion_event
4364 && !matches!(self.host_future_state, HostFutureState::Live);
4365 log::trace!(
4366 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4367 threads_completed,
4368 has_sync_result,
4369 pending_completion_event,
4370 self.host_future_state
4371 );
4372 ready
4373 }
4374
4375 fn new(
4376 state: &mut ConcurrentState,
4377 lower_params: RawLower,
4378 lift_result: LiftResult,
4379 caller: Caller,
4380 callback: Option<CallbackFn>,
4381 component_instance: Instance,
4382 instance: RuntimeComponentInstanceIndex,
4383 async_function: bool,
4384 ) -> Result<Self> {
4385 let sync_call_set = state.push(WaitableSet::default())?;
4386 let host_future_state = match &caller {
4387 Caller::Guest { .. } => HostFutureState::NotApplicable,
4388 Caller::Host {
4389 host_future_present,
4390 ..
4391 } => {
4392 if *host_future_present {
4393 HostFutureState::Live
4394 } else {
4395 HostFutureState::NotApplicable
4396 }
4397 }
4398 };
4399 Ok(Self {
4400 common: WaitableCommon::default(),
4401 lower_params: Some(lower_params),
4402 lift_result: Some(lift_result),
4403 result: None,
4404 callback,
4405 caller,
4406 call_context: Some(CallContext::default()),
4407 sync_result: SyncResult::NotProduced,
4408 cancel_sent: false,
4409 starting_sent: false,
4410 subtasks: HashSet::new(),
4411 sync_call_set,
4412 instance: RuntimeInstance {
4413 instance: component_instance.id().instance(),
4414 index: instance,
4415 },
4416 event: None,
4417 function_index: None,
4418 exited: false,
4419 threads: HashSet::new(),
4420 host_future_state,
4421 async_function,
4422 })
4423 }
4424
4425 fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4428 for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4431 if let Some(Event::Subtask {
4432 status: Status::Returned | Status::ReturnCancelled,
4433 }) = waitable.common(state)?.event
4434 {
4435 waitable.delete_from(state)?;
4436 }
4437 }
4438
4439 assert!(self.threads.is_empty());
4440
4441 state.delete(self.sync_call_set)?;
4442
4443 match &self.caller {
4445 Caller::Guest {
4446 thread,
4447 instance: runtime_instance,
4448 } => {
4449 let task_mut = state.get_mut(thread.task)?;
4450 let present = task_mut.subtasks.remove(&me);
4451 assert!(present);
4452
4453 for subtask in &self.subtasks {
4454 task_mut.subtasks.insert(*subtask);
4455 }
4456
4457 for subtask in &self.subtasks {
4458 state.get_mut(*subtask)?.caller = Caller::Guest {
4459 thread: *thread,
4460 instance: *runtime_instance,
4461 };
4462 }
4463 }
4464 Caller::Host { exit_tx, .. } => {
4465 for subtask in &self.subtasks {
4466 state.get_mut(*subtask)?.caller = Caller::Host {
4467 tx: None,
4468 exit_tx: exit_tx.clone(),
4472 host_future_present: false,
4473 call_post_return_automatically: true,
4474 };
4475 }
4476 }
4477 }
4478
4479 for subtask in self.subtasks {
4480 if state.get_mut(subtask)?.exited {
4481 Waitable::Guest(subtask).delete_from(state)?;
4482 }
4483 }
4484
4485 Ok(())
4486 }
4487
4488 fn call_post_return_automatically(&self) -> bool {
4489 matches!(
4490 self.caller,
4491 Caller::Guest { .. }
4492 | Caller::Host {
4493 call_post_return_automatically: true,
4494 ..
4495 }
4496 )
4497 }
4498}
4499
// Gives `GuestTask` table entries a human-readable type name.
impl TableDebug for GuestTask {
    /// Returns the literal name `"GuestTask"`.
    fn type_name() -> &'static str {
        "GuestTask"
    }
}
4505
/// State shared by every kind of waitable (accessed through
/// `Waitable::common`).
#[derive(Default)]
struct WaitableCommon {
    /// The event pending for this waitable, if any.  Written by
    /// `Waitable::set_event` and consumed by `Waitable::take_event`.
    event: Option<Event>,
    /// The waitable set this waitable currently belongs to, if any.
    /// Updated by `Waitable::join`.
    set: Option<TableId<WaitableSet>>,
    /// NOTE(review): presumably the guest-visible handle for this waitable,
    /// if one has been assigned -- confirm against the code that sets it.
    handle: Option<u32>,
}
4516
/// A typed reference to something which can be waited on.
///
/// The derived `Ord` lets values be stored in ordered collections such as
/// `WaitableSet::ready`; it depends on the declaration order of the variants.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A subtask implemented by the host.
    Host(TableId<HostTask>),
    /// A subtask implemented by a guest.
    Guest(TableId<GuestTask>),
    /// A stream or future handle.
    Transmit(TableId<TransmitHandle>),
}
4527
4528impl Waitable {
4529 fn from_instance(
4532 state: Pin<&mut ComponentInstance>,
4533 caller_instance: RuntimeComponentInstanceIndex,
4534 waitable: u32,
4535 ) -> Result<Self> {
4536 use crate::runtime::vm::component::Waitable;
4537
4538 let (waitable, kind) = state.instance_states().0[caller_instance]
4539 .handle_table()
4540 .waitable_rep(waitable)?;
4541
4542 Ok(match kind {
4543 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4544 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4545 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4546 })
4547 }
4548
4549 fn rep(&self) -> u32 {
4551 match self {
4552 Self::Host(id) => id.rep(),
4553 Self::Guest(id) => id.rep(),
4554 Self::Transmit(id) => id.rep(),
4555 }
4556 }
4557
4558 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4562 log::trace!("waitable {self:?} join set {set:?}",);
4563
4564 let old = mem::replace(&mut self.common(state)?.set, set);
4565
4566 if let Some(old) = old {
4567 match *self {
4568 Waitable::Host(id) => state.remove_child(id, old),
4569 Waitable::Guest(id) => state.remove_child(id, old),
4570 Waitable::Transmit(id) => state.remove_child(id, old),
4571 }?;
4572
4573 state.get_mut(old)?.ready.remove(self);
4574 }
4575
4576 if let Some(set) = set {
4577 match *self {
4578 Waitable::Host(id) => state.add_child(id, set),
4579 Waitable::Guest(id) => state.add_child(id, set),
4580 Waitable::Transmit(id) => state.add_child(id, set),
4581 }?;
4582
4583 if self.common(state)?.event.is_some() {
4584 self.mark_ready(state)?;
4585 }
4586 }
4587
4588 Ok(())
4589 }
4590
4591 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4593 Ok(match self {
4594 Self::Host(id) => &mut state.get_mut(*id)?.common,
4595 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4596 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4597 })
4598 }
4599
4600 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4604 log::trace!("set event for {self:?}: {event:?}");
4605 self.common(state)?.event = event;
4606 self.mark_ready(state)
4607 }
4608
4609 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4611 let common = self.common(state)?;
4612 let event = common.event.take();
4613 if let Some(set) = self.common(state)?.set {
4614 state.get_mut(set)?.ready.remove(self);
4615 }
4616
4617 Ok(event)
4618 }
4619
4620 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4624 if let Some(set) = self.common(state)?.set {
4625 state.get_mut(set)?.ready.insert(*self);
4626 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4627 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4628 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4629
4630 let item = match mode {
4631 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4632 WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4633 thread,
4634 kind: GuestCallKind::DeliverEvent {
4635 instance,
4636 set: Some(set),
4637 },
4638 }),
4639 };
4640 state.push_high_priority(item);
4641 }
4642 }
4643 Ok(())
4644 }
4645
4646 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4648 match self {
4649 Self::Host(task) => {
4650 log::trace!("delete host task {task:?}");
4651 state.delete(*task)?;
4652 }
4653 Self::Guest(task) => {
4654 log::trace!("delete guest task {task:?}");
4655 state.delete(*task)?.dispose(state, *task)?;
4656 }
4657 Self::Transmit(task) => {
4658 state.delete(*task)?;
4659 }
4660 }
4661
4662 Ok(())
4663 }
4664}
4665
4666impl fmt::Debug for Waitable {
4667 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4668 match self {
4669 Self::Host(id) => write!(f, "{id:?}"),
4670 Self::Guest(id) => write!(f, "{id:?}"),
4671 Self::Transmit(id) => write!(f, "{id:?}"),
4672 }
4673 }
4674}
4675
/// A set of waitables together with the threads waiting on it.
#[derive(Default)]
struct WaitableSet {
    /// Members of the set which have been marked ready (see
    /// `Waitable::mark_ready`) and not yet consumed.
    ready: BTreeSet<Waitable>,
    /// Threads currently waiting on this set, keyed by thread id, along
    /// with how each one is to be woken.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
}
4684
// Gives `WaitableSet` table entries a human-readable type name.
impl TableDebug for WaitableSet {
    /// Returns the literal name `"WaitableSet"`.
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
4690
/// Type-erased, one-shot closure which lowers a call's parameters: given the
/// store, it fills in the provided (initially uninitialized) `ValRaw` slots.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4694
/// Type-erased, one-shot closure which lifts a call's result: given the
/// store and the raw result values, it produces a boxed `Any`.
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;
4699
/// A lifted result whose concrete type has been erased; the receiver is
/// expected to downcast it back (as `queue_call` does).
type LiftedResult = Box<dyn Any + Send + Sync>;
4704
/// Unit marker type.
///
/// NOTE(review): presumably used as a placeholder where a lifted result is
/// required but no real value exists -- confirm against its use sites.
struct DummyResult;
4708
/// Per-instance concurrency bookkeeping.
#[derive(Default)]
pub struct ConcurrentInstanceState {
    /// NOTE(review): presumably a count of outstanding backpressure
    /// requests for this instance -- confirm against the code updating it.
    backpressure: u16,
    /// NOTE(review): presumably set while the instance must not be entered
    /// -- confirm against the code updating it.
    do_not_enter: bool,
    /// Guest calls recorded for this instance, keyed by the thread they
    /// belong to.  NOTE(review): presumably calls deferred until the
    /// instance can be entered -- confirm.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
4720
impl ConcurrentInstanceState {
    /// Returns `true` when no guest calls are recorded in `pending`.
    pub fn pending_is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}
4726
/// Store-wide state for concurrent component execution.
pub struct ConcurrentState {
    /// The guest thread on whose behalf `context_get`, `context_set` and
    /// `take_pending_cancellation` operate; those methods panic when this is
    /// `None`.  NOTE(review): presumably the currently running guest thread
    /// -- confirm.
    guest_thread: Option<QualifiedThreadId>,

    /// The set of host task futures.  This becomes `None` once
    /// `take_fibers_and_futures` has moved the set out.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// Table holding tasks, threads, waitable sets and other entries,
    /// addressed through `TableId`s.
    table: AlwaysMut<ResourceTable>,
    /// Queue of work items pushed with high priority.
    high_priority: Vec<WorkItem>,
    /// Queue of work items pushed with low priority.
    low_priority: Vec<WorkItem>,
    /// NOTE(review): presumably the reason the current fiber is suspending,
    /// handed from the fiber to the event loop -- confirm.
    suspend_reason: Option<SuspendReason>,
    /// A fiber kept for running work items, if one exists.
    /// NOTE(review): exact reuse policy not visible here -- confirm.
    worker: Option<StoreFiber<'static>>,
    /// NOTE(review): presumably the item the worker fiber should run next
    /// -- confirm.
    worker_item: Option<WorkerItem>,

    /// Reference counts for global error contexts, keyed by their global
    /// table index.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
}
4769
4770impl Default for ConcurrentState {
4771 fn default() -> Self {
4772 Self {
4773 guest_thread: None,
4774 table: AlwaysMut::new(ResourceTable::new()),
4775 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4776 high_priority: Vec::new(),
4777 low_priority: Vec::new(),
4778 suspend_reason: None,
4779 worker: None,
4780 worker_item: None,
4781 global_error_context_ref_counts: BTreeMap::new(),
4782 }
4783 }
4784}
4785
4786impl ConcurrentState {
4787 pub(crate) fn take_fibers_and_futures(
4804 &mut self,
4805 fibers: &mut Vec<StoreFiber<'static>>,
4806 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4807 ) {
4808 for entry in self.table.get_mut().iter_mut() {
4809 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4810 for mode in mem::take(&mut set.waiting).into_values() {
4811 if let WaitMode::Fiber(fiber) = mode {
4812 fibers.push(fiber);
4813 }
4814 }
4815 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4816 if let GuestThreadState::Suspended(fiber) =
4817 mem::replace(&mut thread.state, GuestThreadState::Completed)
4818 {
4819 fibers.push(fiber);
4820 }
4821 }
4822 }
4823
4824 if let Some(fiber) = self.worker.take() {
4825 fibers.push(fiber);
4826 }
4827
4828 let mut take_items = |list| {
4829 for item in mem::take(list) {
4830 match item {
4831 WorkItem::ResumeFiber(fiber) => {
4832 fibers.push(fiber);
4833 }
4834 WorkItem::PushFuture(future) => {
4835 self.futures
4836 .get_mut()
4837 .as_mut()
4838 .unwrap()
4839 .push(future.into_inner());
4840 }
4841 _ => {}
4842 }
4843 }
4844 };
4845
4846 take_items(&mut self.high_priority);
4847 take_items(&mut self.low_priority);
4848
4849 if let Some(them) = self.futures.get_mut().take() {
4850 futures.push(them);
4851 }
4852 }
4853
4854 fn push<V: Send + Sync + 'static>(
4855 &mut self,
4856 value: V,
4857 ) -> Result<TableId<V>, ResourceTableError> {
4858 self.table.get_mut().push(value).map(TableId::from)
4859 }
4860
4861 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4862 self.table.get_mut().get_mut(&Resource::from(id))
4863 }
4864
4865 pub fn add_child<T: 'static, U: 'static>(
4866 &mut self,
4867 child: TableId<T>,
4868 parent: TableId<U>,
4869 ) -> Result<(), ResourceTableError> {
4870 self.table
4871 .get_mut()
4872 .add_child(Resource::from(child), Resource::from(parent))
4873 }
4874
4875 pub fn remove_child<T: 'static, U: 'static>(
4876 &mut self,
4877 child: TableId<T>,
4878 parent: TableId<U>,
4879 ) -> Result<(), ResourceTableError> {
4880 self.table
4881 .get_mut()
4882 .remove_child(Resource::from(child), Resource::from(parent))
4883 }
4884
4885 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4886 self.table.get_mut().delete(Resource::from(id))
4887 }
4888
4889 fn push_future(&mut self, future: HostTaskFuture) {
4890 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4897 }
4898
4899 fn push_high_priority(&mut self, item: WorkItem) {
4900 log::trace!("push high priority: {item:?}");
4901 self.high_priority.push(item);
4902 }
4903
4904 fn push_low_priority(&mut self, item: WorkItem) {
4905 log::trace!("push low priority: {item:?}");
4906 self.low_priority.push(item);
4907 }
4908
4909 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4910 if high_priority {
4911 self.push_high_priority(item);
4912 } else {
4913 self.push_low_priority(item);
4914 }
4915 }
4916
4917 fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4927 let guest_instance = self.get_mut(guest_task).unwrap().instance;
4928
4929 loop {
4937 let next_thread = match &self.get_mut(guest_task).unwrap().caller {
4938 Caller::Host { .. } => break true,
4939 Caller::Guest { thread, instance } => {
4940 if *instance == guest_instance.index {
4941 break false;
4942 } else {
4943 *thread
4944 }
4945 }
4946 };
4947 guest_task = next_thread.task;
4948 }
4949 }
4950
4951 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4953 let thread = self.guest_thread.unwrap();
4954 let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4955 log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4956 Ok(val)
4957 }
4958
4959 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4961 let thread = self.guest_thread.unwrap();
4962 log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4963 self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4964 Ok(())
4965 }
4966
4967 fn take_pending_cancellation(&mut self) -> bool {
4970 let thread = self.guest_thread.unwrap();
4971 if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4972 assert!(matches!(event, Event::Cancelled));
4973 true
4974 } else {
4975 false
4976 }
4977 }
4978
4979 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
4980 if self.may_block(task) {
4981 Ok(())
4982 } else {
4983 Err(Trap::CannotBlockSyncTask.into())
4984 }
4985 }
4986
4987 fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
4988 let task = self.get_mut(task).unwrap();
4989 task.async_function || task.returned_or_cancelled()
4990 }
4991}
4992
4993fn for_any_lower<
4996 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4997>(
4998 fun: F,
4999) -> F {
5000 fun
5001}
5002
5003fn for_any_lift<
5005 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5006>(
5007 fun: F,
5008) -> F {
5009 fun
5010}
5011
5012fn checked<F: Future + Send + 'static>(
5017 id: StoreId,
5018 fut: F,
5019) -> impl Future<Output = F::Output> + Send + 'static {
5020 async move {
5021 let mut fut = pin!(fut);
5022 future::poll_fn(move |cx| {
5023 let message = "\
5024 `Future`s which depend on asynchronous component tasks, streams, or \
5025 futures to complete may only be polled from the event loop of the \
5026 store to which they belong. Please use \
5027 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5028 ";
5029 tls::try_get(|store| {
5030 let matched = match store {
5031 tls::TryGet::Some(store) => store.id() == id,
5032 tls::TryGet::Taken | tls::TryGet::None => false,
5033 };
5034
5035 if !matched {
5036 panic!("{message}")
5037 }
5038 });
5039 fut.as_mut().poll(cx)
5040 })
5041 .await
5042 }
5043}
5044
5045fn check_recursive_run() {
5048 tls::try_get(|store| {
5049 if !matches!(store, tls::TryGet::None) {
5050 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5051 }
5052 });
5053}
5054
/// Split a packed callback return value into its two components: the low
/// four bits first, then the remaining upper 28 bits shifted down.
///
/// NOTE(review): presumably the low nibble is the callback code proper and
/// the rest is its argument (e.g. a waitable set) -- confirm with callers.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    const LOW_NIBBLE: u32 = 0xF;
    const NIBBLE_BITS: u32 = 4;

    let low = code & LOW_NIBBLE;
    let high = code >> NIBBLE_BITS;
    (low, high)
}
5058
/// Parameters for a waitable-set check.
///
/// NOTE(review): the field meanings below are inferred from names and types
/// only -- confirm against the code consuming this struct.
struct WaitableCheckParams {
    /// The waitable set being checked.
    set: TableId<WaitableSet>,
    /// Index of the canonical options in effect for the check.
    options: OptionsIndex,
    /// Presumably a guest address where the event payload is to be written.
    payload: u32,
}
5067
/// The kind of waitable-set check being performed.
///
/// NOTE(review): presumably `Wait` blocks until an event is available while
/// `Poll` returns immediately -- confirm against the code matching on this.
enum WaitableCheck {
    Wait,
    Poll,
}
5074
/// A call which has been set up by `prepare_call` and is ready to be handed
/// to `queue_call`.  `R` is the type the lifted result is downcast to.
pub(crate) struct PreparedCall<R> {
    /// The function to be called.
    handle: Func,
    /// The task and (implicit) thread created for this call.
    thread: QualifiedThreadId,
    /// Number of parameter values, forwarded when the call is queued.
    param_count: usize,
    /// Receives the type-erased lifted result of the call.
    rx: oneshot::Receiver<LiftedResult>,
    /// Paired with the `exit_tx` sender stored in the task's `Caller::Host`;
    /// handed back to the caller together with the result.
    exit_rx: oneshot::Receiver<()>,
    /// Ties the struct to `R` without storing a value of that type.
    _phantom: PhantomData<R>,
}
5091
5092impl<R> PreparedCall<R> {
5093 pub(crate) fn task_id(&self) -> TaskId {
5095 TaskId {
5096 task: self.thread.task,
5097 }
5098 }
5099}
5100
/// Crate-visible handle identifying the guest task behind a prepared call
/// (see `PreparedCall::task_id`).
pub(crate) struct TaskId {
    /// Table id of the guest task.
    task: TableId<GuestTask>,
}
5105
5106impl TaskId {
5107 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5113 let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5114 if !task.already_lowered_parameters() {
5115 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5116 } else {
5117 task.host_future_state = HostFutureState::Dropped;
5118 if task.ready_to_delete() {
5119 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5120 }
5121 }
5122 Ok(())
5123 }
5124}
5125
5126pub(crate) fn prepare_call<T, R>(
5132 mut store: StoreContextMut<T>,
5133 handle: Func,
5134 param_count: usize,
5135 host_future_present: bool,
5136 call_post_return_automatically: bool,
5137 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5138 + Send
5139 + Sync
5140 + 'static,
5141 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5142 + Send
5143 + Sync
5144 + 'static,
5145) -> Result<PreparedCall<R>> {
5146 let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5147
5148 let instance = handle.instance().id().get(store.0);
5149 let options = &instance.component().env_component().options[options];
5150 let ty = &instance.component().types()[ty];
5151 let async_function = ty.async_;
5152 let task_return_type = ty.results;
5153 let component_instance = raw_options.instance;
5154 let callback = options.callback.map(|i| instance.runtime_callback(i));
5155 let memory = options
5156 .memory()
5157 .map(|i| instance.runtime_memory(i))
5158 .map(SendSyncPtr::new);
5159 let string_encoding = options.string_encoding;
5160 let token = StoreToken::new(store.as_context_mut());
5161 let state = store.0.concurrent_state_mut();
5162
5163 let (tx, rx) = oneshot::channel();
5164 let (exit_tx, exit_rx) = oneshot::channel();
5165
5166 let mut task = GuestTask::new(
5167 state,
5168 Box::new(for_any_lower(move |store, params| {
5169 lower_params(handle, token.as_context_mut(store), params)
5170 })),
5171 LiftResult {
5172 lift: Box::new(for_any_lift(move |store, result| {
5173 lift_result(handle, store, result)
5174 })),
5175 ty: task_return_type,
5176 memory,
5177 string_encoding,
5178 },
5179 Caller::Host {
5180 tx: Some(tx),
5181 exit_tx: Arc::new(exit_tx),
5182 host_future_present,
5183 call_post_return_automatically,
5184 },
5185 callback.map(|callback| {
5186 let callback = SendSyncPtr::new(callback);
5187 let instance = handle.instance();
5188 Box::new(
5189 move |store: &mut dyn VMStore, runtime_instance, event, handle| {
5190 let store = token.as_context_mut(store);
5191 unsafe {
5194 instance.call_callback(
5195 store,
5196 runtime_instance,
5197 callback,
5198 event,
5199 handle,
5200 call_post_return_automatically,
5201 )
5202 }
5203 },
5204 ) as CallbackFn
5205 }),
5206 handle.instance(),
5207 component_instance,
5208 async_function,
5209 )?;
5210 task.function_index = Some(handle.index());
5211
5212 let task = state.push(task)?;
5213 let thread = state.push(GuestThread::new_implicit(task))?;
5214 state.get_mut(task)?.threads.insert(thread);
5215
5216 Ok(PreparedCall {
5217 handle,
5218 thread: QualifiedThreadId { task, thread },
5219 param_count,
5220 rx,
5221 exit_rx,
5222 _phantom: PhantomData,
5223 })
5224}
5225
5226pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5233 mut store: StoreContextMut<T>,
5234 prepared: PreparedCall<R>,
5235) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5236 let PreparedCall {
5237 handle,
5238 thread,
5239 param_count,
5240 rx,
5241 exit_rx,
5242 ..
5243 } = prepared;
5244
5245 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5246
5247 Ok(checked(
5248 store.0.id(),
5249 rx.map(move |result| {
5250 result
5251 .map(|v| (*v.downcast().unwrap(), exit_rx))
5252 .map_err(crate::Error::from)
5253 }),
5254 ))
5255}
5256
5257fn queue_call0<T: 'static>(
5260 store: StoreContextMut<T>,
5261 handle: Func,
5262 guest_thread: QualifiedThreadId,
5263 param_count: usize,
5264) -> Result<()> {
5265 let (_options, flags, _ty, raw_options) = handle.abi_info(store.0);
5266 let is_concurrent = raw_options.async_;
5267 let callback = raw_options.callback;
5268 let instance = handle.instance();
5269 let callee = handle.lifted_core_func(store.0);
5270 let post_return = handle.post_return_core_func(store.0);
5271 let callback = callback.map(|i| {
5272 let instance = instance.id().get(store.0);
5273 SendSyncPtr::new(instance.runtime_callback(i))
5274 });
5275
5276 log::trace!("queueing call {guest_thread:?}");
5277
5278 let instance_flags = if callback.is_none() {
5279 None
5280 } else {
5281 Some(flags)
5282 };
5283
5284 unsafe {
5288 instance.queue_call(
5289 store,
5290 guest_thread,
5291 SendSyncPtr::new(callee),
5292 param_count,
5293 1,
5294 instance_flags,
5295 is_concurrent,
5296 callback,
5297 post_return.map(SendSyncPtr::new),
5298 )
5299 }
5300}