1use crate::component::func::{self, Func};
54use crate::component::store::StoreComponentInstanceId;
55use crate::component::{
56 ComponentInstanceId, HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError,
57};
58use crate::fiber::{self, StoreFiber, StoreFiberYield};
59use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
60use crate::vm::component::{
61 CallContext, ComponentInstance, HandleTable, InstanceFlags, ResourceTables,
62};
63use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
64use crate::{AsContext, AsContextMut, FuncType, StoreContext, StoreContextMut, ValRaw, ValType};
65use anyhow::{Context as _, Result, anyhow, bail};
66use error_contexts::GlobalErrorContextRefCount;
67use futures::channel::oneshot;
68use futures::future::{self, Either, FutureExt};
69use futures::stream::{FuturesUnordered, StreamExt};
70use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71use std::any::Any;
72use std::borrow::ToOwned;
73use std::boxed::Box;
74use std::cell::UnsafeCell;
75use std::collections::{BTreeMap, BTreeSet, HashSet};
76use std::fmt;
77use std::future::Future;
78use std::marker::PhantomData;
79use std::mem::{self, ManuallyDrop, MaybeUninit};
80use std::ops::DerefMut;
81use std::pin::{Pin, pin};
82use std::ptr::{self, NonNull};
83use std::slice;
84use std::sync::Arc;
85use std::task::{Context, Poll, Waker};
86use std::vec::Vec;
87use table::{TableDebug, TableId};
88use wasmtime_environ::Trap;
89use wasmtime_environ::component::{
90 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS,
91 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
92 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
93 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
94 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
95};
96
97pub use abort::JoinHandle;
98pub use future_stream_any::{FutureAny, StreamAny};
99pub use futures_and_streams::{
100 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
101 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
102 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
103};
104pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
105
106mod abort;
107mod error_contexts;
108mod future_stream_any;
109mod futures_and_streams;
110pub(crate) mod table;
111pub(crate) mod tls;
112
/// Sentinel `u32` with every bit set (`0xffff_ffff`).
///
/// NOTE(review): presumably the value reported to a guest when an operation
/// cannot complete immediately; its uses are outside this chunk — confirm.
const BLOCKED: u32 = 0xffff_ffff;
116
/// Progress of a subtask, as carried by `Event::Subtask`.
///
/// The explicit discriminants are part of the encoding: both [`Status::pack`]
/// and `Event::parts` expose them as raw `u32` values.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    /// Reported when an implicitly-started call had to be queued because its
    /// instance was not ready to be entered.
    Starting = 0,
    Started = 1,
    /// Reported once the call has produced its result.
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Packs this status and an optional waitable handle into one `u32`.
    ///
    /// The status occupies the low four bits and the handle the remaining
    /// upper 28 bits.
    ///
    /// # Panics
    ///
    /// Panics unless `waitable` is `None` exactly when `self` is
    /// [`Status::Returned`], or if the handle does not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = waitable.unwrap_or_default();
        assert!(waitable < (1 << 28));
        let status_bits = self as u32;
        let handle_bits = waitable << 4;
        handle_bits | status_bits
    }
}
140
141#[derive(Clone, Copy, Debug)]
144enum Event {
145 None,
146 Cancelled,
147 Subtask {
148 status: Status,
149 },
150 StreamRead {
151 code: ReturnCode,
152 pending: Option<(TypeStreamTableIndex, u32)>,
153 },
154 StreamWrite {
155 code: ReturnCode,
156 pending: Option<(TypeStreamTableIndex, u32)>,
157 },
158 FutureRead {
159 code: ReturnCode,
160 pending: Option<(TypeFutureTableIndex, u32)>,
161 },
162 FutureWrite {
163 code: ReturnCode,
164 pending: Option<(TypeFutureTableIndex, u32)>,
165 },
166}
167
168impl Event {
169 fn parts(self) -> (u32, u32) {
174 const EVENT_NONE: u32 = 0;
175 const EVENT_SUBTASK: u32 = 1;
176 const EVENT_STREAM_READ: u32 = 2;
177 const EVENT_STREAM_WRITE: u32 = 3;
178 const EVENT_FUTURE_READ: u32 = 4;
179 const EVENT_FUTURE_WRITE: u32 = 5;
180 const EVENT_CANCELLED: u32 = 6;
181 match self {
182 Event::None => (EVENT_NONE, 0),
183 Event::Cancelled => (EVENT_CANCELLED, 0),
184 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
185 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
186 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
187 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
188 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
189 }
190 }
191}
192
/// Numeric codes a guest callback returns to say what should happen next.
///
/// `Instance::handle_callback_code` dispatches on these values after
/// splitting the raw return value with `unpack_callback_code`.
mod callback_code {
    /// The callback's thread has completed.
    pub const EXIT: u32 = 0;
    /// The callback wants to be called again later; an `Event::None` is queued
    /// for it.
    pub const YIELD: u32 = 1;
    // NOTE(review): handling of WAIT and POLL is outside this chunk;
    // presumably both refer to the waitable set unpacked alongside the code.
    pub const WAIT: u32 = 2;
    pub const POLL: u32 = 3;
}
200
/// `wasmtime_environ::component::START_FLAG_ASYNC_CALLEE` converted to `u32`
/// once, so that users in this module need no cast.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
205
206pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
212 store: StoreContextMut<'a, T>,
213 get_data: fn(&mut T) -> D::Data<'_>,
214}
215
216impl<'a, T, D> Access<'a, T, D>
217where
218 D: HasData + ?Sized,
219 T: 'static,
220{
221 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
223 Self { store, get_data }
224 }
225
226 pub fn data_mut(&mut self) -> &mut T {
228 self.store.data_mut()
229 }
230
231 pub fn get(&mut self) -> D::Data<'_> {
233 (self.get_data)(self.data_mut())
234 }
235
236 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
240 where
241 T: 'static,
242 {
243 let accessor = Accessor {
244 get_data: self.get_data,
245 token: StoreToken::new(self.store.as_context_mut()),
246 };
247 self.store
248 .as_context_mut()
249 .spawn_with_accessor(accessor, task)
250 }
251
252 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
255 self.get_data
256 }
257}
258
259impl<'a, T, D> AsContext for Access<'a, T, D>
260where
261 D: HasData + ?Sized,
262 T: 'static,
263{
264 type Data = T;
265
266 fn as_context(&self) -> StoreContext<'_, T> {
267 self.store.as_context()
268 }
269}
270
271impl<'a, T, D> AsContextMut for Access<'a, T, D>
272where
273 D: HasData + ?Sized,
274 T: 'static,
275{
276 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
277 self.store.as_context_mut()
278 }
279}
280
/// Handle through which asynchronous code reaches the store.
///
/// It does not hold the store itself: it holds a `StoreToken<T>` plus the
/// data projection, and `Accessor::with` exchanges the token for a
/// `StoreContextMut` obtained through `tls::get`.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    /// Token converted into a `StoreContextMut` in `with`.
    token: StoreToken<T>,
    /// Projection from the store's `T` to the `D::Data` view.
    get_data: fn(&mut T) -> D::Data<'_>,
}
347
/// Types that can hand out a reference to an [`Accessor`].
///
/// Implemented in this file for `Accessor` itself and for `&T` where
/// `T: AsAccessor`.
pub trait AsAccessor {
    /// The store data type `T` of the underlying accessor.
    type Data: 'static;

    /// The `HasData` projection type `D` of the underlying accessor.
    type AccessorData: HasData + ?Sized;

    /// Returns the underlying accessor.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
375
376impl<T: AsAccessor + ?Sized> AsAccessor for &T {
377 type Data = T::Data;
378 type AccessorData = T::AccessorData;
379
380 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
381 T::as_accessor(self)
382 }
383}
384
/// An [`Accessor`] trivially provides itself.
impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;

    /// Returns `self` unchanged.
    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}
393
// Compile-time check that `Accessor<T>` is `Send + Sync` even when `T` is
// not `Sync`: `UnsafeCell<u32>` is `!Sync`, so this only type-checks if the
// accessor's auto traits do not depend on `T`'s.
const _: () = {
    const fn assert<T: Send + Sync>() {}
    assert::<Accessor<UnsafeCell<u32>>>();
};
420
421impl<T> Accessor<T> {
422 pub(crate) fn new(token: StoreToken<T>) -> Self {
431 Self {
432 token,
433 get_data: |x| x,
434 }
435 }
436}
437
438impl<T, D> Accessor<T, D>
439where
440 D: HasData + ?Sized,
441{
442 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
460 tls::get(|vmstore| {
461 fun(Access {
462 store: self.token.as_context_mut(vmstore),
463 get_data: self.get_data,
464 })
465 })
466 }
467
468 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
471 self.get_data
472 }
473
474 pub fn with_getter<D2: HasData>(
491 &self,
492 get_data: fn(&mut T) -> D2::Data<'_>,
493 ) -> Accessor<T, D2> {
494 Accessor {
495 token: self.token,
496 get_data,
497 }
498 }
499
500 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
516 where
517 T: 'static,
518 {
519 let accessor = self.clone_for_spawn();
520 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
521 }
522
523 fn clone_for_spawn(&self) -> Self {
524 Self {
525 token: self.token,
526 get_data: self.get_data,
527 }
528 }
529}
530
/// A unit of work that can be spawned onto a store (see the `spawn` methods
/// in this file) and that runs with an [`Accessor`].
///
/// Implementors must be `Send + 'static`.
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Consumes the task and returns the future that performs its work.
    ///
    /// The future must be `Send`; its `Result` is what the spawned task
    /// reports on completion.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
549
/// Call parameters captured for a caller, split by calling convention.
///
/// NOTE(review): this type is not used within this chunk; field meanings
/// below are read off the names and types — confirm at the use sites.
enum CallerInfo {
    /// Asynchronous caller.
    Async {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Whether a result is expected.
        has_result: bool,
    },
    /// Synchronous caller.
    Sync {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Number of results expected.
        result_count: u32,
    },
}
564
/// How a thread waiting on a `WaitableSet` is to be woken.
enum WaitMode {
    /// The waiter is a suspended fiber; `resume_fiber` stores it here when
    /// the fiber suspends with `SuspendReason::Waiting`.
    Fiber(StoreFiber<'static>),
    /// The waiter is driven by a callback belonging to this instance.
    /// NOTE(review): the construction site is outside this chunk — confirm.
    Callback(Instance),
}
573
/// Why a fiber handed control back via `StoreOpaque::suspend`.
///
/// `StoreOpaque::resume_fiber` reads this to decide where to park the fiber.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber is waiting on `set`; it is stored in that set's `waiting`
    /// map under `thread`.
    Waiting {
        set: TableId<WaitableSet>,
        thread: QualifiedThreadId,
        /// When `true`, `suspend` does not assert that the current task may
        /// block.
        skip_may_block_check: bool,
    },
    /// The worker fiber finished its item and wants another; it is kept as
    /// the store's `worker` (or disposed if one is already stored).
    NeedWork,
    /// The thread yielded; it is marked `Pending` and its fiber is queued as
    /// a low-priority `ResumeFiber` work item.
    Yielding { thread: QualifiedThreadId },
    /// The thread suspended itself; its fiber is stored in the thread's
    /// `Suspended` state.
    ExplicitlySuspending {
        thread: QualifiedThreadId,
        /// When `true`, `suspend` does not assert that the current task may
        /// block.
        skip_may_block_check: bool,
    },
}
596
597enum GuestCallKind {
599 DeliverEvent {
602 instance: Instance,
604 set: Option<TableId<WaitableSet>>,
609 },
610 StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
616 StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
617}
618
619impl fmt::Debug for GuestCallKind {
620 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
621 match self {
622 Self::DeliverEvent { instance, set } => f
623 .debug_struct("DeliverEvent")
624 .field("instance", instance)
625 .field("set", set)
626 .finish(),
627 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
628 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
629 }
630 }
631}
632
633#[derive(Debug)]
635struct GuestCall {
636 thread: QualifiedThreadId,
637 kind: GuestCallKind,
638}
639
640impl GuestCall {
641 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
651 let instance = store
652 .concurrent_state_mut()
653 .get_mut(self.thread.task)?
654 .instance;
655 let state = store.instance_state(instance);
656
657 let ready = match &self.kind {
658 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
659 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
660 GuestCallKind::StartExplicit(_) => true,
661 };
662 log::trace!(
663 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
664 state.do_not_enter,
665 state.backpressure
666 );
667 Ok(ready)
668 }
669}
670
/// A job handed to the worker fiber by `run_on_worker`.
enum WorkerItem {
    /// Run a guest call via `handle_guest_call`.
    GuestCall(GuestCall),
    /// Run an arbitrary closure against the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
676
/// Arguments of a `WorkItem::Poll`: which thread is polling which waitable
/// set, and the instance used to deliver the resulting event.
#[derive(Debug)]
struct PollParams {
    /// Instance placed in the resulting `GuestCallKind::DeliverEvent`.
    instance: Instance,
    /// Thread that will receive the event.
    thread: QualifiedThreadId,
    /// Waitable set whose `ready` list is checked.
    set: TableId<WaitableSet>,
}
688
689enum WorkItem {
692 PushFuture(AlwaysMut<HostTaskFuture>),
694 ResumeFiber(StoreFiber<'static>),
696 GuestCall(GuestCall),
698 Poll(PollParams),
700 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
702}
703
704impl fmt::Debug for WorkItem {
705 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
706 match self {
707 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
708 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
709 Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
710 Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
711 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
712 }
713 }
714}
715
/// Outcome of a wait.
///
/// The derived ordering follows declaration order, so
/// `Cancelled < Completed`; do not reorder the variants.
/// NOTE(review): this type is not used within this chunk.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    Cancelled,
    Completed,
}
722
723pub(crate) fn check_blocking(store: &mut dyn VMStore) -> Result<()> {
726 store.concurrent_state_mut().check_blocking()
727}
728
/// Drives `future` to completion on behalf of the current guest thread,
/// suspending the current fiber while the future is pending.
///
/// * `store` - the store whose concurrent state tracks the call.
/// * `future` - the host work to run.
/// * `caller_instance` - instance recorded on the `HostTask` created for the
///   call.
///
/// Returns the future's output. The result travels through the calling
/// task's `result` slot; whatever was stored there beforehand is restored
/// before returning.
///
/// # Panics
///
/// Panics if there is no current guest thread and the future is not
/// immediately ready, or if the stored result is missing or of the wrong
/// type.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
    caller_instance: RuntimeComponentInstanceIndex,
) -> Result<R> {
    let state = store.concurrent_state_mut();

    let Some(caller) = state.guest_thread else {
        // No guest thread is current, so there is nothing to suspend: poll
        // once with a no-op waker and require an immediate answer.
        // NOTE(review): why such a future can never be pending is not
        // visible in this chunk.
        return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
            Poll::Ready(result) => result,
            Poll::Pending => {
                unreachable!()
            }
        };
    };

    // Set aside whatever is in the caller's `result` slot; it is put back at
    // the end.
    let old_result = state
        .get_mut(caller.task)
        .with_context(|| format!("bad handle: {caller:?}"))?
        .result
        .take();

    let task = state.push(HostTask::new(caller_instance, None))?;

    log::trace!("new host task child of {caller:?}: {task:?}");

    // Wrap the future so that, on completion, it stores its output in the
    // caller's `result` slot and marks the host task as returned.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            state.get_mut(caller.task)?.result = Some(Box::new(result) as _);

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // Poll once with the store installed in TLS, since the wrapper above
    // reaches the store through `tls::get`.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        Poll::Ready(result) => {
            result?;
            log::trace!("delete host task {task:?} (already ready)");
            store.concurrent_state_mut().delete(task)?;
        }
        Poll::Pending => {
            let state = store.concurrent_state_mut();
            // Let the store keep driving the future.
            state.push_future(future);

            // Wait on the caller's `sync_call_set` until the host task
            // signals completion.
            let set = state.get_mut(caller.task)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                skip_may_block_check: false,
            })?;
        }
    }

    // Take the result out of the slot, restoring the previous contents.
    Ok(*mem::replace(
        &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
        old_result,
    )
    .unwrap()
    .downcast()
    .unwrap())
}
834
/// Executes `call`, then any follow-up calls it produces, until none remain.
///
/// `DeliverEvent` invokes the task's callback with the next event and may
/// produce a follow-up call from the callback's return code;
/// `StartImplicit` runs its closure, which may also return a follow-up;
/// `StartExplicit` runs its closure and ends the chain.
///
/// # Panics
///
/// Panics if a `DeliverEvent` finds no event to deliver or the task has no
/// callback installed.
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
    let mut next = Some(call);
    while let Some(call) = next.take() {
        match call.kind {
            GuestCallKind::DeliverEvent { instance, set } => {
                let (event, waitable) = instance
                    .get_event(store, call.thread.task, set, true)?
                    .unwrap();
                let state = store.concurrent_state_mut();
                let task = state.get_mut(call.thread.task)?;
                let runtime_instance = task.instance;
                // A handle of 0 is passed when the event is not tied to a
                // waitable.
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);

                log::trace!(
                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
                    call.thread,
                );

                // Make the receiving thread current for the duration of the
                // callback; the previous one is restored below.
                let old_thread = state.guest_thread.replace(call.thread);
                log::trace!(
                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
                    call.thread
                );

                store.maybe_push_call_context(call.thread.task)?;

                store.enter_instance(runtime_instance);

                // Temporarily take the callback out of the task so it can be
                // called with mutable access to the store.
                let callback = store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback
                    .take()
                    .unwrap();

                let code = callback(store, runtime_instance.index, event, handle)?;

                store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback = Some(callback);

                store.exit_instance(runtime_instance)?;

                store.maybe_pop_call_context(call.thread.task)?;

                // The callback's return code decides what, if anything, runs
                // next.
                next = instance.handle_callback_code(
                    store,
                    call.thread,
                    runtime_instance.index,
                    code,
                )?;

                store.concurrent_state_mut().guest_thread = old_thread;
                log::trace!(
                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
                );
            }
            GuestCallKind::StartImplicit(fun) => {
                next = fun(store)?;
            }
            GuestCallKind::StartExplicit(fun) => {
                fun(store)?;
            }
        }
    }

    Ok(())
}
905
906impl<T> Store<T> {
907 pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
909 where
910 T: Send + 'static,
911 {
912 self.as_context_mut().run_concurrent(fun).await
913 }
914
915 #[doc(hidden)]
916 pub fn assert_concurrent_state_empty(&mut self) {
917 self.as_context_mut().assert_concurrent_state_empty();
918 }
919
920 pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
922 where
923 T: 'static,
924 {
925 self.as_context_mut().spawn(task)
926 }
927}
928
929impl<T> StoreContextMut<'_, T> {
    /// Asserts that the store holds no leftover concurrent state.
    ///
    /// Checks per-instance state, the state table, both work queues, the
    /// current guest thread, the in-flight host futures and the global
    /// error-context reference counts.
    ///
    /// # Panics
    ///
    /// Panics if any of those is non-empty, or if the `futures` collection
    /// is currently taken out of the state.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(self) {
        let store = self.0;
        store
            .store_data_mut()
            .components
            .assert_instance_states_empty();
        let state = store.concurrent_state_mut();
        assert!(
            state.table.get_mut().is_empty(),
            "non-empty table: {:?}",
            state.table.get_mut()
        );
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        assert!(state.guest_thread.is_none());
        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
        assert!(state.global_error_context_ref_counts.is_empty());
    }
957
958 pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
968 where
969 T: 'static,
970 {
971 let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
972 self.spawn_with_accessor(accessor, task)
973 }
974
975 fn spawn_with_accessor<D>(
978 self,
979 accessor: Accessor<T, D>,
980 task: impl AccessorTask<T, D>,
981 ) -> JoinHandle
982 where
983 T: 'static,
984 D: HasData + ?Sized,
985 {
986 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
990 self.0
991 .concurrent_state_mut()
992 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
993 handle
994 }
995
996 pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1077 where
1078 T: Send + 'static,
1079 {
1080 self.do_run_concurrent(fun, false).await
1081 }
1082
1083 pub(super) async fn run_concurrent_trap_on_idle<R>(
1084 self,
1085 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1086 ) -> Result<R>
1087 where
1088 T: Send + 'static,
1089 {
1090 self.do_run_concurrent(fun, true).await
1091 }
1092
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Creates the future for `fun`, then drives it with `poll_until`.
    /// `trap_on_idle` is passed through unchanged.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // NOTE(review): defined outside this chunk; by its name it rejects
        // nested `run_concurrent` calls — confirm.
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Owns the user's future and makes sure it is dropped while the
        // store is installed via `tls::set`, so destructors that reach the
        // store through TLS keep working.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: this is the only place `value` is dropped, and
                    // `drop` runs at most once.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: the future lives inside `dropper` and is never moved out
        // of it within this function; it is only dropped in place by
        // `Dropper::drop`.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1135
    /// Drives the store's event loop until `future` completes.
    ///
    /// Each pass polls `future`, then the store's host futures, then
    /// collects ready work items (high priority first, low priority only if
    /// there are none) and processes them one by one.
    ///
    /// When there is no work and no host futures remain, the behaviour
    /// depends on `trap_on_idle`: fail with `Trap::AsyncDeadlock` if set,
    /// otherwise stay pending.
    ///
    /// # Errors
    ///
    /// Propagates errors from host futures and from work item handling.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Returns the host futures to the concurrent state when dropped,
        // including on early exit.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take the host futures out of the state while they are polled,
            // so the polled code can use the store freely.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = pin!(reset.futures.as_mut().unwrap().next());

            let result = future::poll_fn(|cx| {
                // The caller's future comes first; if it is done, so are we.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(Either::Left(value)));
                }

                // `Ready(true)`: a host future finished; `Ready(false)`: no
                // host futures remain; `Pending`: some are still running.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                let state = reset.store.0.concurrent_state_mut();
                let ready = mem::take(&mut state.high_priority);
                let ready = if ready.is_empty() {
                    // Low-priority work only runs when no high-priority work
                    // is queued.
                    let ready = mem::take(&mut state.low_priority);
                    if ready.is_empty() {
                        return match next {
                            Poll::Ready(true) => {
                                // A host future completed but queued no
                                // work; go round the loop again.
                                Poll::Ready(Ok(Either::Right(Vec::new())))
                            }
                            Poll::Ready(false) => {
                                // Nothing left to run: give the caller's
                                // future one more chance before deciding.
                                if let Poll::Ready(value) =
                                    tls::set(reset.store.0, || future.as_mut().poll(cx))
                                {
                                    Poll::Ready(Ok(Either::Left(value)))
                                } else {
                                    if trap_on_idle {
                                        Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
                                    } else {
                                        Poll::Pending
                                    }
                                }
                            }
                            Poll::Pending => Poll::Pending,
                        };
                    } else {
                        ready
                    }
                } else {
                    ready
                };

                Poll::Ready(Ok(Either::Right(ready)))
            })
            .await;

            // Put the host futures back before handling work items.
            drop(reset);

            match result? {
                Either::Left(value) => break Ok(value),
                Either::Right(ready) => {
                    // If handling stops early, dispose of the fibers and
                    // futures still in the batch; futures are dropped with
                    // the store installed in TLS.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1303
1304 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1306 where
1307 T: Send,
1308 {
1309 log::trace!("handle work item {item:?}");
1310 match item {
1311 WorkItem::PushFuture(future) => {
1312 self.0
1313 .concurrent_state_mut()
1314 .futures
1315 .get_mut()
1316 .as_mut()
1317 .unwrap()
1318 .push(future.into_inner());
1319 }
1320 WorkItem::ResumeFiber(fiber) => {
1321 self.0.resume_fiber(fiber).await?;
1322 }
1323 WorkItem::GuestCall(call) => {
1324 if call.is_ready(self.0)? {
1325 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1326 } else {
1327 let state = self.0.concurrent_state_mut();
1328 let task = state.get_mut(call.thread.task)?;
1329 if !task.starting_sent {
1330 task.starting_sent = true;
1331 if let GuestCallKind::StartImplicit(_) = &call.kind {
1332 Waitable::Guest(call.thread.task).set_event(
1333 state,
1334 Some(Event::Subtask {
1335 status: Status::Starting,
1336 }),
1337 )?;
1338 }
1339 }
1340
1341 let instance = state.get_mut(call.thread.task)?.instance;
1342 self.0
1343 .instance_state(instance)
1344 .pending
1345 .insert(call.thread, call.kind);
1346 }
1347 }
1348 WorkItem::Poll(params) => {
1349 let state = self.0.concurrent_state_mut();
1350 if state.get_mut(params.thread.task)?.event.is_some()
1351 || !state.get_mut(params.set)?.ready.is_empty()
1352 {
1353 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1356 thread: params.thread,
1357 kind: GuestCallKind::DeliverEvent {
1358 instance: params.instance,
1359 set: Some(params.set),
1360 },
1361 }));
1362 } else {
1363 state.get_mut(params.thread.task)?.event = Some(Event::None);
1366 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1367 thread: params.thread,
1368 kind: GuestCallKind::DeliverEvent {
1369 instance: params.instance,
1370 set: Some(params.set),
1371 },
1372 }));
1373 }
1374 }
1375 WorkItem::WorkerFunction(fun) => {
1376 self.run_on_worker(WorkerItem::Function(fun)).await?;
1377 }
1378 }
1379
1380 Ok(())
1381 }
1382
1383 async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1385 where
1386 T: Send,
1387 {
1388 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1389 fiber
1390 } else {
1391 fiber::make_fiber(self.0, move |store| {
1392 loop {
1393 match store.concurrent_state_mut().worker_item.take().unwrap() {
1394 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1395 WorkerItem::Function(fun) => fun.into_inner()(store)?,
1396 }
1397
1398 store.suspend(SuspendReason::NeedWork)?;
1399 }
1400 })?
1401 };
1402
1403 let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1404 assert!(worker_item.is_none());
1405 *worker_item = Some(item);
1406
1407 self.0.resume_fiber(worker).await
1408 }
1409
1410 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1415 where
1416 T: 'static,
1417 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1418 + Send
1419 + Sync
1420 + 'static,
1421 R: Send + Sync + 'static,
1422 {
1423 let token = StoreToken::new(self);
1424 async move {
1425 let mut accessor = Accessor::new(token);
1426 closure(&mut accessor).await
1427 }
1428 }
1429}
1430
/// Identifies one runtime component instance within a store: the component
/// instance it belongs to, plus its index within that instance.
#[derive(Debug, Copy, Clone)]
pub struct RuntimeInstance {
    /// The owning component instance.
    pub instance: ComponentInstanceId,
    /// Index of the runtime instance within `instance`.
    pub index: RuntimeComponentInstanceIndex,
}
1436
1437impl StoreOpaque {
1438 fn instance_state(&mut self, instance: RuntimeInstance) -> &mut ConcurrentInstanceState {
1441 StoreComponentInstanceId::new(self.id(), instance.instance)
1442 .get_mut(self)
1443 .instance_state(instance.index)
1444 .unwrap()
1445 .concurrent_state()
1446 }
1447
1448 fn handle_table(&mut self, instance: RuntimeInstance) -> &mut HandleTable {
1451 StoreComponentInstanceId::new(self.id(), instance.instance)
1452 .get_mut(self)
1453 .instance_state(instance.index)
1454 .unwrap()
1455 .handle_table()
1456 }
1457
1458 fn enter_instance(&mut self, instance: RuntimeInstance) {
1462 log::trace!("enter {instance:?}");
1463 self.instance_state(instance).do_not_enter = true;
1464 }
1465
1466 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1470 log::trace!("exit {instance:?}");
1471 self.instance_state(instance).do_not_enter = false;
1472 self.partition_pending(instance)
1473 }
1474
1475 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1480 for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
1481 let call = GuestCall { thread, kind };
1482 if call.is_ready(self)? {
1483 self.concurrent_state_mut()
1484 .push_high_priority(WorkItem::GuestCall(call));
1485 } else {
1486 self.instance_state(instance)
1487 .pending
1488 .insert(call.thread, call.kind);
1489 }
1490 }
1491
1492 Ok(())
1493 }
1494
1495 pub(crate) fn backpressure_modify(
1497 &mut self,
1498 caller_instance: RuntimeInstance,
1499 modify: impl FnOnce(u16) -> Option<u16>,
1500 ) -> Result<()> {
1501 let state = self.instance_state(caller_instance);
1502 let old = state.backpressure;
1503 let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?;
1504 state.backpressure = new;
1505
1506 if old > 0 && new == 0 {
1507 self.partition_pending(caller_instance)?;
1510 }
1511
1512 Ok(())
1513 }
1514
    /// Resumes `fiber` and, if it suspends again rather than exiting, parks
    /// it according to the recorded `SuspendReason`.
    ///
    /// The current guest thread is saved beforehand and restored afterwards,
    /// and that thread (if any) is marked `Running` again.
    ///
    /// # Panics
    ///
    /// Panics if the fiber suspended without recording a reason, or if a
    /// waiter is already registered for the same thread on the waitable set.
    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
        let old_thread = self.concurrent_state_mut().guest_thread;
        log::trace!("resume_fiber: save current thread {old_thread:?}");

        // `None` means the fiber ran to completion.
        let fiber = fiber::resolve_or_release(self, fiber).await?;

        let state = self.concurrent_state_mut();

        state.guest_thread = old_thread;
        if let Some(ref ot) = old_thread {
            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
        }
        log::trace!("resume_fiber: restore current thread {old_thread:?}");

        if let Some(mut fiber) = fiber {
            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
            match state.suspend_reason.take().unwrap() {
                SuspendReason::NeedWork => {
                    // Keep at most one idle worker fiber.
                    if state.worker.is_none() {
                        state.worker = Some(fiber);
                    } else {
                        fiber.dispose(self);
                    }
                }
                SuspendReason::Yielding { thread, .. } => {
                    // Resume later, behind any high-priority work.
                    state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
                    state.push_low_priority(WorkItem::ResumeFiber(fiber));
                }
                SuspendReason::ExplicitlySuspending { thread, .. } => {
                    // The thread's state keeps the fiber.
                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
                }
                SuspendReason::Waiting { set, thread, .. } => {
                    // Register the fiber as the thread's waiter on the set.
                    let old = state
                        .get_mut(set)?
                        .waiting
                        .insert(thread, WaitMode::Fiber(fiber));
                    assert!(old.is_none());
                }
            };
        } else {
            log::trace!("resume_fiber: fiber has exited");
        }

        Ok(())
    }
1563
    /// Suspends the current fiber, recording `reason` so that `resume_fiber`
    /// knows where to park it.
    ///
    /// For every reason except `NeedWork`, the task's call context is popped
    /// before suspending; once the fiber resumes, the saved guest thread is
    /// reinstated and the call context pushed back.
    ///
    /// # Panics
    ///
    /// Panics if a suspend reason is already recorded, or if the current
    /// task may not block and the reason does not set
    /// `skip_may_block_check`.
    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
        log::trace!("suspend fiber: {reason:?}");

        // `NeedWork` comes from the worker fiber and has no task.
        let task = match &reason {
            SuspendReason::Yielding { thread, .. }
            | SuspendReason::Waiting { thread, .. }
            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
            SuspendReason::NeedWork => None,
        };

        let old_guest_thread = if let Some(task) = task {
            self.maybe_pop_call_context(task)?;
            self.concurrent_state_mut().guest_thread
        } else {
            None
        };

        // Either the reason opts out of the check, or there is no current
        // thread, or the current thread's task is allowed to block.
        assert!(
            matches!(
                reason,
                SuspendReason::ExplicitlySuspending {
                    skip_may_block_check: true,
                    ..
                } | SuspendReason::Waiting {
                    skip_may_block_check: true,
                    ..
                }
            ) || old_guest_thread
                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
                .unwrap_or(true)
        );

        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
        assert!(suspend_reason.is_none());
        *suspend_reason = Some(reason);

        // Yield to whoever resumed this fiber, releasing the store.
        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;

        // Resumed: restore the state set aside above.
        if let Some(task) = task {
            self.concurrent_state_mut().guest_thread = old_guest_thread;
            self.maybe_push_call_context(task)?;
        }

        Ok(())
    }
1622
1623 fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1627 let task = self.concurrent_state_mut().get_mut(guest_task)?;
1628
1629 if !task.returned_or_cancelled() {
1630 log::trace!("push call context for {guest_task:?}");
1631 let call_context = task.call_context.take().unwrap();
1632 self.component_resource_state().0.push(call_context);
1633 }
1634 Ok(())
1635 }
1636
1637 fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1641 if !self
1642 .concurrent_state_mut()
1643 .get_mut(guest_task)?
1644 .returned_or_cancelled()
1645 {
1646 log::trace!("pop call context for {guest_task:?}");
1647 let call_context = Some(self.component_resource_state().0.pop().unwrap());
1648 self.concurrent_state_mut()
1649 .get_mut(guest_task)?
1650 .call_context = call_context;
1651 }
1652 Ok(())
1653 }
1654
1655 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1656 let state = self.concurrent_state_mut();
1657 let caller = state.guest_thread.unwrap();
1658 let old_set = waitable.common(state)?.set;
1659 let set = state.get_mut(caller.task)?.sync_call_set;
1660 waitable.join(state, Some(set))?;
1661 self.suspend(SuspendReason::Waiting {
1662 set,
1663 thread: caller,
1664 skip_may_block_check: false,
1665 })?;
1666 let state = self.concurrent_state_mut();
1667 waitable.join(state, old_set)
1668 }
1669}
1670
1671impl Instance {
    /// Fetches the next event for `guest_task`, if there is one.
    ///
    /// An event stored directly on the task takes precedence and is returned
    /// without a waitable. An `Event::Cancelled` is only handed out when
    /// `cancellable` is true; otherwise it is put back and the waitable set
    /// is consulted. If `set` is given, its first ready waitable is popped
    /// and returned together with its event and handle.
    ///
    /// Returns `None` when neither source has an event.
    ///
    /// # Panics
    ///
    /// Panics if a ready waitable has no handle or no event.
    fn get_event(
        self,
        store: &mut StoreOpaque,
        guest_task: TableId<GuestTask>,
        set: Option<TableId<WaitableSet>>,
        cancellable: bool,
    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
        let state = store.concurrent_state_mut();

        if let Some(event) = state.get_mut(guest_task)?.event.take() {
            log::trace!("deliver event {event:?} to {guest_task:?}");

            if cancellable || !matches!(event, Event::Cancelled) {
                return Ok(Some((event, None)));
            } else {
                // Not deliverable here: leave the cancellation for later.
                state.get_mut(guest_task)?.event = Some(event);
            }
        }

        Ok(
            // Pop the first ready waitable from the set, if a set was given;
            // a failed set lookup is propagated as an error.
            if let Some((set, waitable)) = set
                .and_then(|set| {
                    state
                        .get_mut(set)
                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
                        .transpose()
                })
                .transpose()?
            {
                let common = waitable.common(state)?;
                let handle = common.handle.unwrap();
                let event = common.event.take().unwrap();

                log::trace!(
                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
                );

                waitable.on_delivery(store, self, event);

                Some((event, Some((waitable, handle))))
            } else {
                None
            },
        )
    }
1719
/// Act on the packed callback code produced by a callback-based guest call.
///
/// `code` is split by `unpack_callback_code` into a callback code and a
/// waitable-set handle; the callback code then selects what happens next:
///
/// - `callback_code::EXIT`: the thread is cleaned up. Fails with
///   `Trap::NoAsyncResult` if the task then has no threads left and has
///   neither returned nor been cancelled.
/// - `callback_code::YIELD`: `Event::None` is recorded on the task and a
///   `DeliverEvent` call is either queued at low priority or handed back to
///   the caller, depending on `may_block`.
/// - `callback_code::WAIT` / `callback_code::POLL`: the handle is resolved to
///   a waitable set and the thread is scheduled for immediate delivery,
///   queued for a poll, or registered as a waiter on the set.
///
/// Any other code is reported as an error.
///
/// Returns `Some(call)` only in the `YIELD` case where the call was not
/// queued; every other successful path returns `None`.
fn handle_callback_code(
    self,
    store: &mut StoreOpaque,
    guest_thread: QualifiedThreadId,
    runtime_instance: RuntimeComponentInstanceIndex,
    code: u32,
) -> Result<Option<GuestCall>> {
    let (code, set) = unpack_callback_code(code);

    log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");

    let state = store.concurrent_state_mut();

    // Resolve a guest-supplied waitable-set handle via the handle table of
    // `runtime_instance`. Handle 0 is rejected up front.
    let get_set = |store: &mut StoreOpaque, handle| {
        if handle == 0 {
            bail!("invalid waitable-set handle");
        }

        let set = store
            .handle_table(RuntimeInstance {
                instance: self.id().instance(),
                index: runtime_instance,
            })
            .waitable_set_rep(handle)?;

        Ok(TableId::<WaitableSet>::new(set))
    };

    Ok(match code {
        callback_code::EXIT => {
            log::trace!("implicit thread {guest_thread:?} completed");
            self.cleanup_thread(store, guest_thread, runtime_instance)?;
            let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
            // Exiting the last thread without having produced a result (or a
            // cancellation) is a trap.
            if task.threads.is_empty() && !task.returned_or_cancelled() {
                bail!(Trap::NoAsyncResult);
            }
            match &task.caller {
                Caller::Host { .. } => {
                    // Host-called tasks are deleted here once they report
                    // being ready for deletion.
                    if task.ready_to_delete() {
                        Waitable::Guest(guest_thread.task)
                            .delete_from(store.concurrent_state_mut())?;
                    }
                }
                Caller::Guest { .. } => {
                    // Guest-called tasks are only marked as exited here;
                    // the callback is dropped since it is no longer needed.
                    task.exited = true;
                    task.callback = None;
                }
            }
            None
        }
        callback_code::YIELD => {
            let task = state.get_mut(guest_thread.task)?;
            // No event may already be pending when a yield is requested.
            assert!(task.event.is_none());
            task.event = Some(Event::None);
            let call = GuestCall {
                thread: guest_thread,
                kind: GuestCallKind::DeliverEvent {
                    instance: self,
                    set: None,
                },
            };
            if state.may_block(guest_thread.task) {
                // Defer the delivery by placing it on the low-priority queue.
                state.push_low_priority(WorkItem::GuestCall(call));
                None
            } else {
                // Hand the call back to the caller instead of queueing it.
                Some(call)
            }
        }
        callback_code::WAIT | callback_code::POLL => {
            state.check_blocking_for(guest_thread.task)?;

            let set = get_set(store, set)?;
            let state = store.concurrent_state_mut();

            if state.get_mut(guest_thread.task)?.event.is_some()
                || !state.get_mut(set)?.ready.is_empty()
            {
                // Something is already deliverable: schedule the delivery on
                // the high-priority queue.
                state.push_high_priority(WorkItem::GuestCall(GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::DeliverEvent {
                        instance: self,
                        set: Some(set),
                    },
                }));
            } else {
                // Nothing is deliverable yet.
                match code {
                    callback_code::POLL => {
                        // Queue a low-priority poll of the set.
                        state.push_low_priority(WorkItem::Poll(PollParams {
                            instance: self,
                            thread: guest_thread,
                            set,
                        }));
                    }
                    callback_code::WAIT => {
                        // Record the set in the thread's `wake_on_cancel`
                        // and register the thread as a waiter on the set.
                        // Neither slot may already be occupied.
                        let old = state
                            .get_mut(guest_thread.thread)?
                            .wake_on_cancel
                            .replace(set);
                        assert!(old.is_none());
                        let old = state
                            .get_mut(set)?
                            .waiting
                            .insert(guest_thread, WaitMode::Callback(self));
                        assert!(old.is_none());
                    }
                    // The enclosing arm only matches `WAIT` and `POLL`.
                    _ => unreachable!(),
                }
            }
            None
        }
        _ => bail!("unsupported callback code: {code}"),
    })
}
1855
1856 fn cleanup_thread(
1857 self,
1858 store: &mut StoreOpaque,
1859 guest_thread: QualifiedThreadId,
1860 runtime_instance: RuntimeComponentInstanceIndex,
1861 ) -> Result<()> {
1862 let guest_id = store
1863 .concurrent_state_mut()
1864 .get_mut(guest_thread.thread)?
1865 .instance_rep;
1866 store
1867 .handle_table(RuntimeInstance {
1868 instance: self.id().instance(),
1869 index: runtime_instance,
1870 })
1871 .guest_thread_remove(guest_id.unwrap())?;
1872
1873 store.concurrent_state_mut().delete(guest_thread.thread)?;
1874 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1875 task.threads.remove(&guest_thread.thread);
1876 Ok(())
1877 }
1878
/// Queue a high-priority `GuestCallKind::StartImplicit` work item which,
/// when run, invokes `callee` on behalf of `guest_thread`.
///
/// Two shapes of work item are built:
///
/// - When `callback` is `Some` (which requires `async_`), the call is made,
///   the first result slot is read as a callback code, and that code is
///   passed to `handle_callback_code`.
/// - Otherwise the call is made and the thread is cleaned up immediately
///   afterwards. For `async_` calls this only checks that a result was
///   produced; for non-`async_` calls the result is lifted, `post_return`
///   (if any) is optionally invoked, and the task is completed.
///
/// `param_count` and `result_count` are the number of flat core values
/// passed to and returned from `callee`. `flags`, when present, has its
/// `may_enter` bit cleared for the duration of the call.
///
/// # Safety
///
/// NOTE(review): the safety contract is not stated in the visible code.
/// `callee` and `post_return` are passed to `Func::call_unchecked_raw`, so
/// presumably they must be valid functions for this store whose signatures
/// agree with `param_count`/`result_count` — confirm against callers.
unsafe fn queue_call<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    guest_thread: QualifiedThreadId,
    callee: SendSyncPtr<VMFuncRef>,
    param_count: usize,
    result_count: usize,
    flags: Option<InstanceFlags>,
    async_: bool,
    callback: Option<SendSyncPtr<VMFuncRef>>,
    post_return: Option<SendSyncPtr<VMFuncRef>>,
) -> Result<()> {
    /// Build the closure that lowers the task's parameters into a flat
    /// buffer, calls `callee` with it, and returns the buffer (now holding
    /// any results).
    ///
    /// NOTE(review): `unsafe` for the same presumed reason as the enclosing
    /// function — `callee` is invoked via `call_unchecked_raw`.
    unsafe fn make_call<T: 'static>(
        store: StoreContextMut<T>,
        guest_thread: QualifiedThreadId,
        callee: SendSyncPtr<VMFuncRef>,
        param_count: usize,
        result_count: usize,
        flags: Option<InstanceFlags>,
    ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
    + Send
    + Sync
    + 'static
    + use<T> {
        let token = StoreToken::new(store);
        move |store: &mut dyn VMStore| {
            let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];

            store
                .concurrent_state_mut()
                .get_mut(guest_thread.thread)?
                .state = GuestThreadState::Running;
            let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
            let may_enter_after_call = task.call_post_return_automatically();
            // The lowering closure is single-use: it is taken out of the task.
            let lower = task.lower_params.take().unwrap();

            lower(store, &mut storage[..param_count])?;

            let mut store = token.as_context_mut(store);

            unsafe {
                // Clear `may_enter` while the callee runs ...
                if let Some(mut flags) = flags {
                    flags.set_may_enter(false);
                }
                // The same buffer carries parameters in and results out, so
                // it is sized for whichever count is larger.
                crate::Func::call_unchecked_raw(
                    &mut store,
                    callee.as_non_null(),
                    NonNull::new(
                        &mut storage[..param_count.max(result_count)]
                            as *mut [MaybeUninit<ValRaw>] as _,
                    )
                    .unwrap(),
                )?;
                // ... and afterwards set it according to whether post-return
                // is called automatically for this task.
                if let Some(mut flags) = flags {
                    flags.set_may_enter(may_enter_after_call);
                }
            }

            Ok(storage)
        }
    }

    let call = unsafe {
        make_call(
            store.as_context_mut(),
            guest_thread,
            callee,
            param_count,
            result_count,
            flags,
        )
    };

    let callee_instance = store
        .0
        .concurrent_state_mut()
        .get_mut(guest_thread.task)?
        .instance;

    let fun = if callback.is_some() {
        // A callback is only supplied for async calls.
        assert!(async_);

        Box::new(move |store: &mut dyn VMStore| {
            self.add_guest_thread_to_instance_table(
                guest_thread.thread,
                store,
                callee_instance.index,
            )?;
            // Make `guest_thread` the current thread, remembering the
            // previous one so it can be restored after the call.
            let old_thread = store
                .concurrent_state_mut()
                .guest_thread
                .replace(guest_thread);
            log::trace!(
                "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
            );

            store.maybe_push_call_context(guest_thread.task)?;

            store.enter_instance(callee_instance);

            let storage = call(store)?;

            store.exit_instance(callee_instance)?;

            store.maybe_pop_call_context(guest_thread.task)?;

            // Restore the previous current thread (if any) and mark it as
            // running again.
            let state = store.concurrent_state_mut();
            state.guest_thread = old_thread;
            old_thread
                .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
            log::trace!("stackless call: restored {old_thread:?} as current thread");

            // The first result slot is interpreted as the packed callback
            // code.
            let code = unsafe { storage[0].assume_init() }.get_i32() as u32;

            self.handle_callback_code(store, guest_thread, callee_instance.index, code)
        })
            as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
    } else {
        let token = StoreToken::new(store.as_context_mut());
        Box::new(move |store: &mut dyn VMStore| {
            self.add_guest_thread_to_instance_table(
                guest_thread.thread,
                store,
                callee_instance.index,
            )?;
            let old_thread = store
                .concurrent_state_mut()
                .guest_thread
                .replace(guest_thread);
            log::trace!(
                "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
            );
            let mut flags = self.id().get(store).instance_flags(callee_instance.index);

            store.maybe_push_call_context(guest_thread.task)?;

            // Only non-async calls enter the instance here; the matching
            // `exit_instance` is in the non-async branch below.
            if !async_ {
                store.enter_instance(callee_instance);
            }

            let storage = call(store)?;

            // Without a callback the thread is finished as soon as the call
            // returns.
            self.cleanup_thread(store, guest_thread, callee_instance.index)?;

            if async_ {
                // An async callee must have produced a result (or been
                // cancelled) by the time its last thread exits.
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                if task.threads.is_empty() && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
            } else {
                // Non-async callee: lift the result from the flat return
                // values, run post-return if applicable, and complete the
                // task.
                let lift = {
                    store.exit_instance(callee_instance)?;

                    let state = store.concurrent_state_mut();
                    assert!(state.get_mut(guest_thread.task)?.result.is_none());

                    state
                        .get_mut(guest_thread.task)?
                        .lift_result
                        .take()
                        .unwrap()
                };

                // NOTE(review): the transmute treats the first `result_count`
                // slots as initialized; presumably the callee wrote them —
                // confirm.
                let result = (lift.lift)(store, unsafe {
                    mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
                        &storage[..result_count],
                    )
                })?;

                // Post-return receives the single flat result, or a zero
                // when there is none.
                let post_return_arg = match result_count {
                    0 => ValRaw::i32(0),
                    1 => unsafe { storage[0].assume_init() },
                    _ => unreachable!(),
                };

                if store
                    .concurrent_state_mut()
                    .get_mut(guest_thread.task)?
                    .call_post_return_automatically()
                {
                    // `may_leave` is cleared while post-return runs.
                    unsafe {
                        flags.set_may_leave(false);
                        flags.set_needs_post_return(false);
                    }

                    if let Some(func) = post_return {
                        let mut store = token.as_context_mut(store);

                        unsafe {
                            crate::Func::call_unchecked_raw(
                                &mut store,
                                func.as_non_null(),
                                slice::from_ref(&post_return_arg).into(),
                            )?;
                        }
                    }

                    unsafe {
                        flags.set_may_leave(true);
                        flags.set_may_enter(true);
                    }
                }

                self.task_complete(
                    store,
                    guest_thread.task,
                    result,
                    Status::Returned,
                    post_return_arg,
                )?;
            }

            store.maybe_pop_call_context(guest_thread.task)?;

            let state = store.concurrent_state_mut();
            let task = state.get_mut(guest_thread.task)?;

            match &task.caller {
                Caller::Host { .. } => {
                    // Host-called tasks are deleted once ready.
                    if task.ready_to_delete() {
                        Waitable::Guest(guest_thread.task).delete_from(state)?;
                    }
                }
                Caller::Guest { .. } => {
                    // Guest-called tasks are only marked as exited here.
                    task.exited = true;
                }
            }

            Ok(None)
        })
    };

    store
        .0
        .concurrent_state_mut()
        .push_high_priority(WorkItem::GuestCall(GuestCall {
            thread: guest_thread,
            kind: GuestCallKind::StartImplicit(fun),
        }));

    Ok(())
}
2178
2179 unsafe fn prepare_call<T: 'static>(
2192 self,
2193 mut store: StoreContextMut<T>,
2194 start: *mut VMFuncRef,
2195 return_: *mut VMFuncRef,
2196 caller_instance: RuntimeComponentInstanceIndex,
2197 callee_instance: RuntimeComponentInstanceIndex,
2198 task_return_type: TypeTupleIndex,
2199 callee_async: bool,
2200 memory: *mut VMMemoryDefinition,
2201 string_encoding: u8,
2202 caller_info: CallerInfo,
2203 ) -> Result<()> {
2204 self.id().get(store.0).check_may_leave(caller_instance)?;
2205
2206 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2207 store.0.concurrent_state_mut().check_blocking()?;
2211 }
2212
2213 enum ResultInfo {
2214 Heap { results: u32 },
2215 Stack { result_count: u32 },
2216 }
2217
2218 let result_info = match &caller_info {
2219 CallerInfo::Async {
2220 has_result: true,
2221 params,
2222 } => ResultInfo::Heap {
2223 results: params.last().unwrap().get_u32(),
2224 },
2225 CallerInfo::Async {
2226 has_result: false, ..
2227 } => ResultInfo::Stack { result_count: 0 },
2228 CallerInfo::Sync {
2229 result_count,
2230 params,
2231 } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2232 results: params.last().unwrap().get_u32(),
2233 },
2234 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2235 result_count: *result_count,
2236 },
2237 };
2238
2239 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2240
2241 let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2245 let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2246 let token = StoreToken::new(store.as_context_mut());
2247 let state = store.0.concurrent_state_mut();
2248 let old_thread = state.guest_thread.take();
2249 let new_task = GuestTask::new(
2250 state,
2251 Box::new(move |store, dst| {
2252 let mut store = token.as_context_mut(store);
2253 assert!(dst.len() <= MAX_FLAT_PARAMS);
2254 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2256 let count = match caller_info {
2257 CallerInfo::Async { params, has_result } => {
2261 let params = ¶ms[..params.len() - usize::from(has_result)];
2262 for (param, src) in params.iter().zip(&mut src) {
2263 src.write(*param);
2264 }
2265 params.len()
2266 }
2267
2268 CallerInfo::Sync { params, .. } => {
2270 for (param, src) in params.iter().zip(&mut src) {
2271 src.write(*param);
2272 }
2273 params.len()
2274 }
2275 };
2276 unsafe {
2283 crate::Func::call_unchecked_raw(
2284 &mut store,
2285 start.as_non_null(),
2286 NonNull::new(
2287 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2288 )
2289 .unwrap(),
2290 )?;
2291 }
2292 dst.copy_from_slice(&src[..dst.len()]);
2293 let state = store.0.concurrent_state_mut();
2294 Waitable::Guest(state.guest_thread.unwrap().task).set_event(
2295 state,
2296 Some(Event::Subtask {
2297 status: Status::Started,
2298 }),
2299 )?;
2300 Ok(())
2301 }),
2302 LiftResult {
2303 lift: Box::new(move |store, src| {
2304 let mut store = token.as_context_mut(store);
2307 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2309 my_src.push(ValRaw::u32(*results));
2310 }
2311 unsafe {
2318 crate::Func::call_unchecked_raw(
2319 &mut store,
2320 return_.as_non_null(),
2321 my_src.as_mut_slice().into(),
2322 )?;
2323 }
2324 let state = store.0.concurrent_state_mut();
2325 let thread = state.guest_thread.unwrap();
2326 if sync_caller {
2327 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2328 if let ResultInfo::Stack { result_count } = &result_info {
2329 match result_count {
2330 0 => None,
2331 1 => Some(my_src[0]),
2332 _ => unreachable!(),
2333 }
2334 } else {
2335 None
2336 },
2337 );
2338 }
2339 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2340 }),
2341 ty: task_return_type,
2342 memory: NonNull::new(memory).map(SendSyncPtr::new),
2343 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2344 },
2345 Caller::Guest {
2346 thread: old_thread.unwrap(),
2347 instance: caller_instance,
2348 },
2349 None,
2350 self,
2351 callee_instance,
2352 callee_async,
2353 )?;
2354
2355 let guest_task = state.push(new_task)?;
2356 let new_thread = GuestThread::new_implicit(guest_task);
2357 let guest_thread = state.push(new_thread)?;
2358 state.get_mut(guest_task)?.threads.insert(guest_thread);
2359
2360 let state = store.0.concurrent_state_mut();
2361 if let Some(old_thread) = old_thread {
2362 if !state.may_enter(guest_task) {
2363 bail!(crate::Trap::CannotEnterComponent);
2364 }
2365
2366 state.get_mut(old_thread.task)?.subtasks.insert(guest_task);
2367 };
2368
2369 state.guest_thread = Some(QualifiedThreadId {
2372 task: guest_task,
2373 thread: guest_thread,
2374 });
2375 log::trace!(
2376 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2377 );
2378
2379 Ok(())
2380 }
2381
2382 unsafe fn call_callback<T>(
2387 self,
2388 mut store: StoreContextMut<T>,
2389 callee_instance: RuntimeComponentInstanceIndex,
2390 function: SendSyncPtr<VMFuncRef>,
2391 event: Event,
2392 handle: u32,
2393 may_enter_after_call: bool,
2394 ) -> Result<u32> {
2395 let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2396
2397 let (ordinal, result) = event.parts();
2398 let params = &mut [
2399 ValRaw::u32(ordinal),
2400 ValRaw::u32(handle),
2401 ValRaw::u32(result),
2402 ];
2403 unsafe {
2408 flags.set_may_enter(false);
2409 crate::Func::call_unchecked_raw(
2410 &mut store,
2411 function.as_non_null(),
2412 params.as_mut_slice().into(),
2413 )?;
2414 flags.set_may_enter(may_enter_after_call);
2415 }
2416 Ok(params[0].get_u32())
2417 }
2418
/// Start the call belonging to the current guest thread and wait, on the
/// caller's behalf, until it has at least started (async caller) or
/// returned (sync caller).
///
/// `storage` is `Some` for a sync caller, in which case a single flat result
/// (if one was produced) is written to `storage[0]`; `None` means the caller
/// is async. `callback` and `post_return` may be null. `flags` is tested for
/// `START_FLAG_ASYNC_CALLEE`.
///
/// Returns `status.pack(waitable)`, where `waitable` is a newly inserted
/// subtask handle when an async caller's callee has not yet returned and
/// `None` otherwise.
///
/// # Safety
///
/// NOTE(review): the safety contract is not stated in the visible code.
/// The raw pointers are forwarded to `queue_call`, so presumably they must
/// be valid functions for this store (or null where permitted) — confirm.
///
/// # Panics
///
/// Panics if `callee` is null, if there is no current guest thread, if the
/// counts exceed `MAX_FLAT_PARAMS`/`MAX_FLAT_RESULTS`, or if the current
/// task's caller is not a guest.
unsafe fn start_call<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    callback: *mut VMFuncRef,
    post_return: *mut VMFuncRef,
    callee: *mut VMFuncRef,
    param_count: u32,
    result_count: u32,
    flags: u32,
    storage: Option<&mut [MaybeUninit<ValRaw>]>,
) -> Result<u32> {
    let token = StoreToken::new(store.as_context_mut());
    // No result storage means the caller is async.
    let async_caller = storage.is_none();
    let state = store.0.concurrent_state_mut();
    let guest_thread = state.guest_thread.unwrap();
    let callee_async = state.get_mut(guest_thread.task)?.async_function;
    let may_enter_after_call = state
        .get_mut(guest_thread.task)?
        .call_post_return_automatically();
    let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
    let param_count = usize::try_from(param_count).unwrap();
    assert!(param_count <= MAX_FLAT_PARAMS);
    let result_count = usize::try_from(result_count).unwrap();
    assert!(result_count <= MAX_FLAT_RESULTS);

    let task = state.get_mut(guest_thread.task)?;
    if !callback.is_null() {
        // Store a closure on the task which invokes the guest callback via
        // `call_callback`.
        let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
        task.callback = Some(Box::new(move |store, runtime_instance, event, handle| {
            let store = token.as_context_mut(store);
            unsafe {
                self.call_callback::<T>(
                    store,
                    runtime_instance,
                    callback,
                    event,
                    handle,
                    may_enter_after_call,
                )
            }
        }));
    }

    let Caller::Guest {
        thread: caller,
        instance: runtime_instance,
    } = &task.caller
    else {
        // This function expects the current task to have a guest caller.
        unreachable!()
    };
    let caller = *caller;
    let caller_instance = *runtime_instance;

    let callee_instance = task.instance;

    // Instance flags are only passed along when there is a callback.
    let instance_flags = if callback.is_null() {
        None
    } else {
        Some(self.id().get(store.0).instance_flags(callee_instance.index))
    };

    unsafe {
        self.queue_call(
            store.as_context_mut(),
            guest_thread,
            callee,
            param_count,
            result_count,
            instance_flags,
            (flags & START_FLAG_ASYNC_CALLEE) != 0,
            NonNull::new(callback).map(SendSyncPtr::new),
            NonNull::new(post_return).map(SendSyncPtr::new),
        )?;
    }

    let state = store.0.concurrent_state_mut();

    // Temporarily join the callee task's waitable to the caller's
    // `sync_call_set`, remembering its previous set so it can be restored.
    let guest_waitable = Waitable::Guest(guest_thread.task);
    let old_set = guest_waitable.common(state)?.set;
    let set = state.get_mut(caller.task)?.sync_call_set;
    guest_waitable.join(state, Some(set))?;

    let (status, waitable) = loop {
        store.0.suspend(SuspendReason::Waiting {
            set,
            thread: caller,
            // The may-block check is skipped for async callers and for
            // callees which are not async functions.
            skip_may_block_check: async_caller || !callee_async,
        })?;

        let state = store.0.concurrent_state_mut();

        log::trace!("taking event for {:?}", guest_thread.task);
        let event = guest_waitable.take_event(state)?;
        // Only subtask events are expected on this waitable here.
        let Some(Event::Subtask { status }) = event else {
            unreachable!();
        };

        log::trace!("status {status:?} for {:?}", guest_thread.task);

        if status == Status::Returned {
            // The callee has returned; no handle is needed.
            break (status, None);
        } else if async_caller {
            // Not returned yet and the caller is async: allocate a subtask
            // handle in the caller's instance and report the status.
            let handle = store
                .0
                .handle_table(RuntimeInstance {
                    instance: self.id().instance(),
                    index: caller_instance,
                })
                .subtask_insert_guest(guest_thread.task.rep())?;
            store
                .0
                .concurrent_state_mut()
                .get_mut(guest_thread.task)?
                .common
                .handle = Some(handle);
            break (status, Some(handle));
        } else {
            // Not returned yet and the caller is sync: loop and suspend
            // again.
        }
    };

    let state = store.0.concurrent_state_mut();

    // Restore the waitable's previous set membership.
    guest_waitable.join(state, old_set)?;

    if let Some(storage) = storage {
        // Sync caller: pick up whatever was recorded in `sync_result`.
        let task = state.get_mut(guest_thread.task)?;
        if let Some(result) = task.sync_result.take() {
            if let Some(result) = result {
                storage[0] = MaybeUninit::new(result);
            }

            if task.exited {
                if task.ready_to_delete() {
                    Waitable::Guest(guest_thread.task).delete_from(state)?;
                }
            }
        }
    }

    // Hand control back to the caller thread.
    state.guest_thread = Some(caller);
    state.get_mut(caller.thread)?.state = GuestThreadState::Running;
    log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");

    Ok(status.pack(waitable))
}
2617
/// Poll `future` once on behalf of the current guest thread, registering it
/// as a host task.
///
/// If the future completes immediately its output is passed to `lower`, the
/// host task is deleted, and `Ok(None)` is returned. Otherwise the future is
/// handed to the concurrent state for later polling, a subtask handle is
/// inserted into `caller_instance`'s handle table, and `Ok(Some(handle))` is
/// returned; `lower` then runs later from a queued worker function.
///
/// # Panics
///
/// Panics if there is no current guest thread.
pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
    self,
    mut store: StoreContextMut<'_, T>,
    future: impl Future<Output = Result<R>> + Send + 'static,
    caller_instance: RuntimeComponentInstanceIndex,
    lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
) -> Result<Option<u32>> {
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();
    let caller = state.guest_thread.unwrap();

    // Wrap the future so that each poll re-installs the call context saved
    // by the previous pending poll, and saves it again if still pending.
    let (join_handle, future) = JoinHandle::run(async move {
        let mut future = pin!(future);
        let mut call_context = None;
        future::poll_fn(move |cx| {
            tls::get(|store| {
                // Push back the context popped after the last pending poll
                // (nothing to do on the very first poll).
                if let Some(call_context) = call_context.take() {
                    token
                        .as_context_mut(store)
                        .0
                        .component_resource_state()
                        .0
                        .push(call_context);
                }
            });

            let result = future.as_mut().poll(cx);

            if result.is_pending() {
                tls::get(|store| {
                    // Still pending: pop the context and hold on to it until
                    // the next poll.
                    call_context = Some(
                        token
                            .as_context_mut(store)
                            .0
                            .component_resource_state()
                            .0
                            .pop()
                            .unwrap(),
                    );
                });
            }
            result
        })
        .await
    });

    let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;

    log::trace!("new host task child of {caller:?}: {task:?}");

    let mut future = Box::pin(future);

    // Poll once with a no-op waker; if this returns pending, the future is
    // handed to `push_future` below.
    let poll = tls::set(store.0, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    Ok(match poll {
        // NOTE(review): `None` presumably means the task was aborted via its
        // `JoinHandle`, which cannot have happened yet — confirm.
        Poll::Ready(None) => unreachable!(),
        Poll::Ready(Some(result)) => {
            // Completed immediately: lower the result and drop the task.
            lower(store.as_context_mut(), result?)?;
            log::trace!("delete host task {task:?} (already ready)");
            store.0.concurrent_state_mut().delete(task)?;
            None
        }
        Poll::Pending => {
            // Wrap the future so that, once it resolves, a worker function
            // is queued which lowers the result and publishes a
            // `Status::Returned` subtask event.
            let future =
                Box::pin(async move {
                    let result = match future.await {
                        Some(result) => result?,
                        // No output to lower; nothing further to do.
                        None => return Ok(()),
                    };
                    tls::get(move |store| {
                        store.concurrent_state_mut().push_high_priority(
                            WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
                                lower(token.as_context_mut(store), result)?;
                                let state = store.concurrent_state_mut();
                                // Clear the join handle now that the result
                                // has been lowered.
                                state.get_mut(task)?.join_handle.take();
                                Waitable::Host(task).set_event(
                                    state,
                                    Some(Event::Subtask {
                                        status: Status::Returned,
                                    }),
                                )
                            }))),
                        );
                        Ok(())
                    })
                });

            store.0.concurrent_state_mut().push_future(future);
            // Allocate a subtask handle in the caller's instance and record
            // it on the task.
            let handle = store
                .0
                .handle_table(RuntimeInstance {
                    instance: self.id().instance(),
                    index: caller_instance,
                })
                .subtask_insert_host(task.rep())?;
            store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
            log::trace!(
                "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
            );
            Some(handle)
        }
    })
}
2764
2765 pub(crate) fn task_return(
2768 self,
2769 store: &mut dyn VMStore,
2770 caller: RuntimeComponentInstanceIndex,
2771 ty: TypeTupleIndex,
2772 options: OptionsIndex,
2773 storage: &[ValRaw],
2774 ) -> Result<()> {
2775 self.id().get(store).check_may_leave(caller)?;
2776 let state = store.concurrent_state_mut();
2777 let guest_thread = state.guest_thread.unwrap();
2778 let lift = state
2779 .get_mut(guest_thread.task)?
2780 .lift_result
2781 .take()
2782 .ok_or_else(|| {
2783 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2784 })?;
2785 assert!(state.get_mut(guest_thread.task)?.result.is_none());
2786
2787 let CanonicalOptions {
2788 string_encoding,
2789 data_model,
2790 ..
2791 } = &self.id().get(store).component().env_component().options[options];
2792
2793 let invalid = ty != lift.ty
2794 || string_encoding != &lift.string_encoding
2795 || match data_model {
2796 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2797 Some(memory) => {
2798 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2799 let actual = self.id().get(store).runtime_memory(memory);
2800 expected != actual.as_ptr()
2801 }
2802 None => false,
2805 },
2806 CanonicalOptionsDataModel::Gc { .. } => true,
2808 };
2809
2810 if invalid {
2811 bail!("invalid `task.return` signature and/or options for current task");
2812 }
2813
2814 log::trace!("task.return for {guest_thread:?}");
2815
2816 let result = (lift.lift)(store, storage)?;
2817 self.task_complete(
2818 store,
2819 guest_thread.task,
2820 result,
2821 Status::Returned,
2822 ValRaw::i32(0),
2823 )
2824 }
2825
2826 pub(crate) fn task_cancel(
2828 self,
2829 store: &mut StoreOpaque,
2830 caller: RuntimeComponentInstanceIndex,
2831 ) -> Result<()> {
2832 self.id().get(store).check_may_leave(caller)?;
2833 let state = store.concurrent_state_mut();
2834 let guest_thread = state.guest_thread.unwrap();
2835 let task = state.get_mut(guest_thread.task)?;
2836 if !task.cancel_sent {
2837 bail!("`task.cancel` called by task which has not been cancelled")
2838 }
2839 _ = task.lift_result.take().ok_or_else(|| {
2840 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2841 })?;
2842
2843 assert!(task.result.is_none());
2844
2845 log::trace!("task.cancel for {guest_thread:?}");
2846
2847 self.task_complete(
2848 store,
2849 guest_thread.task,
2850 Box::new(DummyResult),
2851 Status::ReturnCancelled,
2852 ValRaw::i32(0),
2853 )
2854 }
2855
2856 fn task_complete(
2862 self,
2863 store: &mut StoreOpaque,
2864 guest_task: TableId<GuestTask>,
2865 result: Box<dyn Any + Send + Sync>,
2866 status: Status,
2867 post_return_arg: ValRaw,
2868 ) -> Result<()> {
2869 if store
2870 .concurrent_state_mut()
2871 .get_mut(guest_task)?
2872 .call_post_return_automatically()
2873 {
2874 let (calls, host_table, _, instance) =
2875 store.component_resource_state_with_instance(self);
2876 ResourceTables {
2877 calls,
2878 host_table: Some(host_table),
2879 guest: Some(instance.instance_states()),
2880 }
2881 .exit_call()?;
2882 } else {
2883 let function_index = store
2888 .concurrent_state_mut()
2889 .get_mut(guest_task)?
2890 .function_index
2891 .unwrap();
2892 self.id()
2893 .get_mut(store)
2894 .post_return_arg_set(function_index, post_return_arg);
2895 }
2896
2897 let state = store.concurrent_state_mut();
2898 let task = state.get_mut(guest_task)?;
2899
2900 if let Caller::Host { tx, .. } = &mut task.caller {
2901 if let Some(tx) = tx.take() {
2902 _ = tx.send(result);
2903 }
2904 } else {
2905 task.result = Some(result);
2906 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2907 }
2908
2909 Ok(())
2910 }
2911
2912 pub(crate) fn waitable_set_new(
2914 self,
2915 store: &mut StoreOpaque,
2916 caller_instance: RuntimeComponentInstanceIndex,
2917 ) -> Result<u32> {
2918 self.id().get_mut(store).check_may_leave(caller_instance)?;
2919 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2920 let handle = store
2921 .handle_table(RuntimeInstance {
2922 instance: self.id().instance(),
2923 index: caller_instance,
2924 })
2925 .waitable_set_insert(set.rep())?;
2926 log::trace!("new waitable set {set:?} (handle {handle})");
2927 Ok(handle)
2928 }
2929
2930 pub(crate) fn waitable_set_drop(
2932 self,
2933 store: &mut StoreOpaque,
2934 caller_instance: RuntimeComponentInstanceIndex,
2935 set: u32,
2936 ) -> Result<()> {
2937 self.id().get_mut(store).check_may_leave(caller_instance)?;
2938 let rep = store
2939 .handle_table(RuntimeInstance {
2940 instance: self.id().instance(),
2941 index: caller_instance,
2942 })
2943 .waitable_set_remove(set)?;
2944
2945 log::trace!("drop waitable set {rep} (handle {set})");
2946
2947 let set = store
2948 .concurrent_state_mut()
2949 .delete(TableId::<WaitableSet>::new(rep))?;
2950
2951 if !set.waiting.is_empty() {
2952 bail!("cannot drop waitable set with waiters");
2953 }
2954
2955 Ok(())
2956 }
2957
2958 pub(crate) fn waitable_join(
2960 self,
2961 store: &mut StoreOpaque,
2962 caller_instance: RuntimeComponentInstanceIndex,
2963 waitable_handle: u32,
2964 set_handle: u32,
2965 ) -> Result<()> {
2966 let mut instance = self.id().get_mut(store);
2967 instance.check_may_leave(caller_instance)?;
2968 let waitable =
2969 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2970
2971 let set = if set_handle == 0 {
2972 None
2973 } else {
2974 let set = instance.instance_states().0[caller_instance]
2975 .handle_table()
2976 .waitable_set_rep(set_handle)?;
2977
2978 Some(TableId::<WaitableSet>::new(set))
2979 };
2980
2981 log::trace!(
2982 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2983 );
2984
2985 waitable.join(store.concurrent_state_mut(), set)
2986 }
2987
2988 pub(crate) fn subtask_drop(
2990 self,
2991 store: &mut StoreOpaque,
2992 caller_instance: RuntimeComponentInstanceIndex,
2993 task_id: u32,
2994 ) -> Result<()> {
2995 self.id().get_mut(store).check_may_leave(caller_instance)?;
2996 self.waitable_join(store, caller_instance, task_id, 0)?;
2997
2998 let (rep, is_host) = store
2999 .handle_table(RuntimeInstance {
3000 instance: self.id().instance(),
3001 index: caller_instance,
3002 })
3003 .subtask_remove(task_id)?;
3004
3005 let concurrent_state = store.concurrent_state_mut();
3006 let (waitable, expected_caller_instance, delete) = if is_host {
3007 let id = TableId::<HostTask>::new(rep);
3008 let task = concurrent_state.get_mut(id)?;
3009 if task.join_handle.is_some() {
3010 bail!("cannot drop a subtask which has not yet resolved");
3011 }
3012 (Waitable::Host(id), task.caller_instance, true)
3013 } else {
3014 let id = TableId::<GuestTask>::new(rep);
3015 let task = concurrent_state.get_mut(id)?;
3016 if task.lift_result.is_some() {
3017 bail!("cannot drop a subtask which has not yet resolved");
3018 }
3019 if let Caller::Guest { instance, .. } = &task.caller {
3020 (Waitable::Guest(id), *instance, task.exited)
3021 } else {
3022 unreachable!()
3023 }
3024 };
3025
3026 waitable.common(concurrent_state)?.handle = None;
3027
3028 if waitable.take_event(concurrent_state)?.is_some() {
3029 bail!("cannot drop a subtask with an undelivered event");
3030 }
3031
3032 if delete {
3033 waitable.delete_from(concurrent_state)?;
3034 }
3035
3036 assert_eq!(expected_caller_instance, caller_instance);
3040 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3041 Ok(())
3042 }
3043
3044 pub(crate) fn waitable_set_wait(
3046 self,
3047 store: &mut StoreOpaque,
3048 caller: RuntimeComponentInstanceIndex,
3049 options: OptionsIndex,
3050 set: u32,
3051 payload: u32,
3052 ) -> Result<u32> {
3053 self.id().get(store).check_may_leave(caller)?;
3054
3055 if !self.options(store, options).async_ {
3056 store.concurrent_state_mut().check_blocking()?;
3060 }
3061
3062 let &CanonicalOptions {
3063 cancellable,
3064 instance: caller_instance,
3065 ..
3066 } = &self.id().get(store).component().env_component().options[options];
3067 let rep = store
3068 .handle_table(RuntimeInstance {
3069 instance: self.id().instance(),
3070 index: caller_instance,
3071 })
3072 .waitable_set_rep(set)?;
3073
3074 self.waitable_check(
3075 store,
3076 cancellable,
3077 WaitableCheck::Wait(WaitableCheckParams {
3078 set: TableId::new(rep),
3079 options,
3080 payload,
3081 }),
3082 )
3083 }
3084
3085 pub(crate) fn waitable_set_poll(
3087 self,
3088 store: &mut StoreOpaque,
3089 caller: RuntimeComponentInstanceIndex,
3090 options: OptionsIndex,
3091 set: u32,
3092 payload: u32,
3093 ) -> Result<u32> {
3094 self.id().get(store).check_may_leave(caller)?;
3095
3096 if !self.options(store, options).async_ {
3097 store.concurrent_state_mut().check_blocking()?;
3101 }
3102
3103 let &CanonicalOptions {
3104 cancellable,
3105 instance: caller_instance,
3106 ..
3107 } = &self.id().get(store).component().env_component().options[options];
3108 let rep = store
3109 .handle_table(RuntimeInstance {
3110 instance: self.id().instance(),
3111 index: caller_instance,
3112 })
3113 .waitable_set_rep(set)?;
3114
3115 self.waitable_check(
3116 store,
3117 cancellable,
3118 WaitableCheck::Poll(WaitableCheckParams {
3119 set: TableId::new(rep),
3120 options,
3121 payload,
3122 }),
3123 )
3124 }
3125
3126 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3128 let thread_id = store
3129 .concurrent_state_mut()
3130 .guest_thread
3131 .ok_or_else(|| anyhow!("no current thread"))?
3132 .thread;
3133 Ok(store
3135 .concurrent_state_mut()
3136 .get_mut(thread_id)?
3137 .instance_rep
3138 .unwrap())
3139 }
3140
3141 pub(crate) fn thread_new_indirect<T: 'static>(
3143 self,
3144 mut store: StoreContextMut<T>,
3145 runtime_instance: RuntimeComponentInstanceIndex,
3146 _func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex,
3148 start_func_idx: u32,
3149 context: i32,
3150 ) -> Result<u32> {
3151 self.id().get(store.0).check_may_leave(runtime_instance)?;
3152
3153 log::trace!("creating new thread");
3154
3155 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3156 let (instance, registry) = self.id().get_mut_and_registry(store.0);
3157 let callee = instance
3158 .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3159 .ok_or_else(|| {
3160 anyhow!("the start function index points to an uninitialized function")
3161 })?;
3162 if callee.type_index(store.0) != start_func_ty.type_index() {
3163 bail!(
3164 "start function does not match expected type (currently only `(i32) -> ()` is supported)"
3165 );
3166 }
3167
3168 let token = StoreToken::new(store.as_context_mut());
3169 let start_func = Box::new(
3170 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3171 let old_thread = store
3172 .concurrent_state_mut()
3173 .guest_thread
3174 .replace(guest_thread);
3175 log::trace!(
3176 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3177 );
3178
3179 store.maybe_push_call_context(guest_thread.task)?;
3180
3181 let mut store = token.as_context_mut(store);
3182 let mut params = [ValRaw::i32(context)];
3183 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3186
3187 store.0.maybe_pop_call_context(guest_thread.task)?;
3188
3189 self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3190 log::trace!("explicit thread {guest_thread:?} completed");
3191 let state = store.0.concurrent_state_mut();
3192 let task = state.get_mut(guest_thread.task)?;
3193 if task.threads.is_empty() && !task.returned_or_cancelled() {
3194 bail!(Trap::NoAsyncResult);
3195 }
3196 state.guest_thread = old_thread;
3197 old_thread
3198 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
3199 if state.get_mut(guest_thread.task)?.ready_to_delete() {
3200 Waitable::Guest(guest_thread.task).delete_from(state)?;
3201 }
3202 log::trace!("thread start: restored {old_thread:?} as current thread");
3203
3204 Ok(())
3205 },
3206 );
3207
3208 let state = store.0.concurrent_state_mut();
3209 let current_thread = state.guest_thread.unwrap();
3210 let parent_task = current_thread.task;
3211
3212 let new_thread = GuestThread::new_explicit(parent_task, start_func);
3213 let thread_id = state.push(new_thread)?;
3214 state.get_mut(parent_task)?.threads.insert(thread_id);
3215
3216 log::trace!("new thread with id {thread_id:?} created");
3217
3218 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3219 }
3220
3221 pub(crate) fn resume_suspended_thread(
3222 self,
3223 store: &mut StoreOpaque,
3224 runtime_instance: RuntimeComponentInstanceIndex,
3225 thread_idx: u32,
3226 high_priority: bool,
3227 ) -> Result<()> {
3228 let thread_id =
3229 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3230 let state = store.concurrent_state_mut();
3231 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3232 let thread = state.get_mut(guest_thread.thread)?;
3233
3234 match mem::replace(&mut thread.state, GuestThreadState::Running) {
3235 GuestThreadState::NotStartedExplicit(start_func) => {
3236 log::trace!("starting thread {guest_thread:?}");
3237 let guest_call = WorkItem::GuestCall(GuestCall {
3238 thread: guest_thread,
3239 kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3240 start_func(store, guest_thread)
3241 })),
3242 });
3243 store
3244 .concurrent_state_mut()
3245 .push_work_item(guest_call, high_priority);
3246 }
3247 GuestThreadState::Suspended(fiber) => {
3248 log::trace!("resuming thread {thread_id:?} that was suspended");
3249 store
3250 .concurrent_state_mut()
3251 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3252 }
3253 _ => {
3254 bail!("cannot resume thread which is not suspended");
3255 }
3256 }
3257 Ok(())
3258 }
3259
3260 fn add_guest_thread_to_instance_table(
3261 self,
3262 thread_id: TableId<GuestThread>,
3263 store: &mut StoreOpaque,
3264 runtime_instance: RuntimeComponentInstanceIndex,
3265 ) -> Result<u32> {
3266 let guest_id = store
3267 .handle_table(RuntimeInstance {
3268 instance: self.id().instance(),
3269 index: runtime_instance,
3270 })
3271 .guest_thread_insert(thread_id.rep())?;
3272 store
3273 .concurrent_state_mut()
3274 .get_mut(thread_id)?
3275 .instance_rep = Some(guest_id);
3276 Ok(guest_id)
3277 }
3278
3279 pub(crate) fn suspension_intrinsic(
3282 self,
3283 store: &mut StoreOpaque,
3284 caller: RuntimeComponentInstanceIndex,
3285 cancellable: bool,
3286 yielding: bool,
3287 to_thread: Option<u32>,
3288 ) -> Result<WaitResult> {
3289 self.id().get(store).check_may_leave(caller)?;
3290
3291 if to_thread.is_none() {
3292 let state = store.concurrent_state_mut();
3293 if yielding {
3294 if !state.may_block(state.guest_thread.unwrap().task) {
3296 return Ok(WaitResult::Completed);
3300 }
3301 } else {
3302 state.check_blocking()?;
3306 }
3307 }
3308
3309 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3311 return Ok(WaitResult::Cancelled);
3312 }
3313
3314 if let Some(thread) = to_thread {
3315 self.resume_suspended_thread(store, caller, thread, true)?;
3316 }
3317
3318 let state = store.concurrent_state_mut();
3319 let guest_thread = state.guest_thread.unwrap();
3320 let reason = if yielding {
3321 SuspendReason::Yielding {
3322 thread: guest_thread,
3323 }
3324 } else {
3325 SuspendReason::ExplicitlySuspending {
3326 thread: guest_thread,
3327 skip_may_block_check: to_thread.is_some(),
3331 }
3332 };
3333
3334 store.suspend(reason)?;
3335
3336 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3337 Ok(WaitResult::Cancelled)
3338 } else {
3339 Ok(WaitResult::Completed)
3340 }
3341 }
3342
3343 fn waitable_check(
3345 self,
3346 store: &mut StoreOpaque,
3347 cancellable: bool,
3348 check: WaitableCheck,
3349 ) -> Result<u32> {
3350 let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();
3351
3352 let (wait, set) = match &check {
3353 WaitableCheck::Wait(params) => (true, Some(params.set)),
3354 WaitableCheck::Poll(params) => (false, Some(params.set)),
3355 };
3356
3357 log::trace!("waitable check for {guest_thread:?}; set {set:?}");
3358 store.suspend(SuspendReason::Yielding {
3360 thread: guest_thread,
3361 })?;
3362
3363 log::trace!("waitable check for {guest_thread:?}; set {set:?}");
3364
3365 let state = store.concurrent_state_mut();
3366 let task = state.get_mut(guest_thread.task)?;
3367
3368 if wait {
3371 let set = set.unwrap();
3372
3373 if (task.event.is_none()
3374 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3375 && state.get_mut(set)?.ready.is_empty()
3376 {
3377 if cancellable {
3378 let old = state
3379 .get_mut(guest_thread.thread)?
3380 .wake_on_cancel
3381 .replace(set);
3382 assert!(old.is_none());
3383 }
3384
3385 store.suspend(SuspendReason::Waiting {
3386 set,
3387 thread: guest_thread,
3388 skip_may_block_check: false,
3389 })?;
3390 }
3391 }
3392
3393 log::trace!("waitable check for {guest_thread:?}; set {set:?}, part two");
3394
3395 let result = match check {
3396 WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => {
3398 let event =
3399 self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3400
3401 let (ordinal, handle, result) = if wait {
3402 let (event, waitable) = event.unwrap();
3403 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3404 let (ordinal, result) = event.parts();
3405 (ordinal, handle, result)
3406 } else {
3407 if let Some((event, waitable)) = event {
3408 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3409 let (ordinal, result) = event.parts();
3410 (ordinal, handle, result)
3411 } else {
3412 log::trace!(
3413 "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3414 guest_thread.task,
3415 params.set
3416 );
3417 let (ordinal, result) = Event::None.parts();
3418 (ordinal, 0, result)
3419 }
3420 };
3421 let memory = self.options_memory_mut(store, params.options);
3422 let ptr = func::validate_inbounds_dynamic(
3423 &CanonicalAbiInfo::POINTER_PAIR,
3424 memory,
3425 &ValRaw::u32(params.payload),
3426 )?;
3427 memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3428 memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3429 Ok(ordinal)
3430 }
3431 };
3432
3433 result
3434 }
3435
/// Cancels the subtask behind handle `task_id` in `caller_instance`'s handle
/// table.
///
/// Returns a status code as `u32`:
/// * `Status::ReturnCancelled` for a host subtask whose join handle was still
///   present (the handle is aborted);
/// * `Status::StartCancelled` for a guest subtask whose parameters were not
///   yet lowered (it is removed from the instance's pending list);
/// * `BLOCKED` when `async_` is set and the guest subtask has still not
///   returned or been cancelled after being notified;
/// * otherwise the `Returned` / `ReturnCancelled` status of the subtask's
///   pending event.
///
/// # Errors
///
/// Fails when `caller_instance` may not leave, when `async_` is false and the
/// blocking check rejects the call, when the handle cannot be resolved, or
/// with "`subtask.cancel` called after terminal status delivered" when there
/// is nothing left to cancel.
///
/// # Panics
///
/// Panics if the subtask's recorded caller instance differs from
/// `caller_instance`, if a guest subtask's caller is not `Caller::Guest`, or
/// if there is no current guest thread when cancelling a guest subtask.
pub(crate) fn subtask_cancel(
    self,
    store: &mut StoreOpaque,
    caller_instance: RuntimeComponentInstanceIndex,
    async_: bool,
    task_id: u32,
) -> Result<u32> {
    self.id().get(store).check_may_leave(caller_instance)?;

    // A synchronous cancel may block, so it must pass the blocking check.
    if !async_ {
        store.concurrent_state_mut().check_blocking()?;
    }

    let (rep, is_host) = store
        .handle_table(RuntimeInstance {
            instance: self.id().instance(),
            index: caller_instance,
        })
        .subtask_rep(task_id)?;
    // Resolve the handle to a typed waitable plus the instance recorded as
    // the subtask's caller.
    let (waitable, expected_caller_instance) = if is_host {
        let id = TableId::<HostTask>::new(rep);
        (
            Waitable::Host(id),
            store.concurrent_state_mut().get_mut(id)?.caller_instance,
        )
    } else {
        let id = TableId::<GuestTask>::new(rep);
        if let Caller::Guest { instance, .. } =
            &store.concurrent_state_mut().get_mut(id)?.caller
        {
            (Waitable::Guest(id), *instance)
        } else {
            unreachable!()
        }
    };
    assert_eq!(expected_caller_instance, caller_instance);

    log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

    let concurrent_state = store.concurrent_state_mut();
    if let Waitable::Host(host_task) = waitable {
        // Host subtask: abort it if its join handle is still around. With no
        // handle left we fall through to the pending-event check below.
        if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
            handle.abort();
            return Ok(Status::ReturnCancelled as u32);
        }
    } else {
        let caller = concurrent_state.guest_thread.unwrap();
        let guest_task = TableId::<GuestTask>::new(rep);
        let task = concurrent_state.get_mut(guest_task)?;
        if !task.already_lowered_parameters() {
            // Parameters not lowered yet: drop the lower/lift state so the
            // task can never start or return.
            task.lower_params = None;
            task.lift_result = None;

            let instance = task.instance;
            // Remove every pending entry belonging to this task.
            let pending = &mut store.instance_state(instance).pending;
            let pending_count = pending.len();
            pending.retain(|thread, _| thread.task != guest_task);
            // Nothing removed means the task was not pending at all.
            if pending.len() == pending_count {
                bail!("`subtask.cancel` called after terminal status delivered");
            }
            return Ok(Status::StartCancelled as u32);
        } else if !task.returned_or_cancelled() {
            // Parameters lowered but no result yet: record the cancellation
            // as the task's pending event.
            task.cancel_sent = true;
            task.event = Some(Event::Cancelled);
            // `threads` is cloned because the loop body needs
            // `concurrent_state` mutably.
            for thread in task.threads.clone() {
                let thread = QualifiedThreadId {
                    task: guest_task,
                    thread,
                };
                // Only a thread that registered `wake_on_cancel` can be woken
                // here; the first such thread is woken and the loop stops.
                if let Some(set) = concurrent_state
                    .get_mut(thread.thread)
                    .unwrap()
                    .wake_on_cancel
                    .take()
                {
                    let item = match concurrent_state
                        .get_mut(set)?
                        .waiting
                        .remove(&thread)
                        .unwrap()
                    {
                        WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
                        WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
                            thread,
                            kind: GuestCallKind::DeliverEvent {
                                instance,
                                set: None,
                            },
                        }),
                    };
                    concurrent_state.push_high_priority(item);

                    // Yield so the queued high-priority item can run before
                    // we re-check the task below.
                    store.suspend(SuspendReason::Yielding { thread: caller })?;
                    break;
                }
            }

            // Re-borrow: the state may have changed across the suspension.
            let concurrent_state = store.concurrent_state_mut();
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.returned_or_cancelled() {
                if async_ {
                    return Ok(BLOCKED);
                } else {
                    store.wait_for_event(Waitable::Guest(guest_task))?;
                }
            }
        }
    }

    // Only a pending `Returned` / `ReturnCancelled` subtask event is an
    // acceptable outcome at this point.
    let event = waitable.take_event(store.concurrent_state_mut())?;
    if let Some(Event::Subtask {
        status: status @ (Status::Returned | Status::ReturnCancelled),
    }) = event
    {
        Ok(status as u32)
    } else {
        bail!("`subtask.cancel` called after terminal status delivered");
    }
}
3570
3571 pub(crate) fn context_get(
3572 self,
3573 store: &mut StoreOpaque,
3574 caller: RuntimeComponentInstanceIndex,
3575 slot: u32,
3576 ) -> Result<u32> {
3577 self.id().get(store).check_may_leave(caller)?;
3578 store.concurrent_state_mut().context_get(slot)
3579 }
3580
3581 pub(crate) fn context_set(
3582 self,
3583 store: &mut StoreOpaque,
3584 caller: RuntimeComponentInstanceIndex,
3585 slot: u32,
3586 value: u32,
3587 ) -> Result<()> {
3588 self.id().get(store).check_may_leave(caller)?;
3589 store.concurrent_state_mut().context_set(slot, value)
3590 }
3591}
3592
/// Store-level entry points for the component-model async operations.
///
/// The implementation for `StoreInner<T>` forwards each method to the
/// corresponding operation on the `instance` argument.
pub trait VMComponentAsyncStore {
    /// Prepares a call from `caller_instance` to `callee_instance`.
    ///
    /// In the `StoreInner` implementation, `result_count` is compared against
    /// `PREPARE_ASYNC_NO_RESULT` / `PREPARE_ASYNC_WITH_RESULT` to detect an
    /// async caller; any other value is the result count of a sync caller.
    /// The `storage_len` values at `storage` are copied as the parameters.
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads of `storage_len` `ValRaw` values.
    /// NOTE(review): the requirements on `memory`, `start` and `return_` are
    /// not visible in this part of the file — confirm against the callers.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: u8,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of a synchronous caller,
    /// using `storage` as the buffer handed to the call.
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads and writes of `storage_len`
    /// elements. NOTE(review): requirements on `callback` and `callee` are
    /// not visible here — confirm against the callers.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of an asynchronous caller
    /// and returns the `u32` produced by starting it.
    ///
    /// # Safety
    ///
    /// NOTE(review): the requirements on `callback`, `post_return` and
    /// `callee` are not visible here — confirm against the callers.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// Writes to the future handle `future` of type `ty` from guest address
    /// `address`; returns the encoded result code.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Reads from the future handle `future` of type `ty` into guest address
    /// `address`; returns the encoded result code.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a future of type `ty`.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Writes `count` items to the stream handle `stream` of type `ty` from
    /// guest address `address`; returns the encoded result code.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Reads up to `count` items from the stream handle `stream` of type `ty`
    /// into guest address `address`; returns the encoded result code.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_write`, but additionally passes the payload's flat layout
    /// (`payload_size`, `payload_align`) along with the write.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_read`, but additionally passes the payload's flat layout
    /// (`payload_size`, `payload_align`) along with the read.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a stream of type `ty`.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Handles a debug-message request for error context `err_ctx_handle`,
    /// with `debug_msg_address` as the guest address involved.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// Creates a new guest thread whose start function is taken from slot
    /// `start_func_idx` of table `start_func_table_idx`; `context` is the
    /// argument it will be started with. Returns the new thread's handle.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
3763
3764impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3766 unsafe fn prepare_call(
3767 &mut self,
3768 instance: Instance,
3769 memory: *mut VMMemoryDefinition,
3770 start: *mut VMFuncRef,
3771 return_: *mut VMFuncRef,
3772 caller_instance: RuntimeComponentInstanceIndex,
3773 callee_instance: RuntimeComponentInstanceIndex,
3774 task_return_type: TypeTupleIndex,
3775 callee_async: bool,
3776 string_encoding: u8,
3777 result_count_or_max_if_async: u32,
3778 storage: *mut ValRaw,
3779 storage_len: usize,
3780 ) -> Result<()> {
3781 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3785
3786 unsafe {
3787 instance.prepare_call(
3788 StoreContextMut(self),
3789 start,
3790 return_,
3791 caller_instance,
3792 callee_instance,
3793 task_return_type,
3794 callee_async,
3795 memory,
3796 string_encoding,
3797 match result_count_or_max_if_async {
3798 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3799 params,
3800 has_result: false,
3801 },
3802 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3803 params,
3804 has_result: true,
3805 },
3806 result_count => CallerInfo::Sync {
3807 params,
3808 result_count,
3809 },
3810 },
3811 )
3812 }
3813 }
3814
3815 unsafe fn sync_start(
3816 &mut self,
3817 instance: Instance,
3818 callback: *mut VMFuncRef,
3819 callee: *mut VMFuncRef,
3820 param_count: u32,
3821 storage: *mut MaybeUninit<ValRaw>,
3822 storage_len: usize,
3823 ) -> Result<()> {
3824 unsafe {
3825 instance
3826 .start_call(
3827 StoreContextMut(self),
3828 callback,
3829 ptr::null_mut(),
3830 callee,
3831 param_count,
3832 1,
3833 START_FLAG_ASYNC_CALLEE,
3834 Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3838 )
3839 .map(drop)
3840 }
3841 }
3842
3843 unsafe fn async_start(
3844 &mut self,
3845 instance: Instance,
3846 callback: *mut VMFuncRef,
3847 post_return: *mut VMFuncRef,
3848 callee: *mut VMFuncRef,
3849 param_count: u32,
3850 result_count: u32,
3851 flags: u32,
3852 ) -> Result<u32> {
3853 unsafe {
3854 instance.start_call(
3855 StoreContextMut(self),
3856 callback,
3857 post_return,
3858 callee,
3859 param_count,
3860 result_count,
3861 flags,
3862 None,
3863 )
3864 }
3865 }
3866
3867 fn future_write(
3868 &mut self,
3869 instance: Instance,
3870 caller: RuntimeComponentInstanceIndex,
3871 ty: TypeFutureTableIndex,
3872 options: OptionsIndex,
3873 future: u32,
3874 address: u32,
3875 ) -> Result<u32> {
3876 instance.id().get(self).check_may_leave(caller)?;
3877 instance
3878 .guest_write(
3879 StoreContextMut(self),
3880 caller,
3881 TransmitIndex::Future(ty),
3882 options,
3883 None,
3884 future,
3885 address,
3886 1,
3887 )
3888 .map(|result| result.encode())
3889 }
3890
3891 fn future_read(
3892 &mut self,
3893 instance: Instance,
3894 caller: RuntimeComponentInstanceIndex,
3895 ty: TypeFutureTableIndex,
3896 options: OptionsIndex,
3897 future: u32,
3898 address: u32,
3899 ) -> Result<u32> {
3900 instance.id().get(self).check_may_leave(caller)?;
3901 instance
3902 .guest_read(
3903 StoreContextMut(self),
3904 caller,
3905 TransmitIndex::Future(ty),
3906 options,
3907 None,
3908 future,
3909 address,
3910 1,
3911 )
3912 .map(|result| result.encode())
3913 }
3914
3915 fn stream_write(
3916 &mut self,
3917 instance: Instance,
3918 caller: RuntimeComponentInstanceIndex,
3919 ty: TypeStreamTableIndex,
3920 options: OptionsIndex,
3921 stream: u32,
3922 address: u32,
3923 count: u32,
3924 ) -> Result<u32> {
3925 instance.id().get(self).check_may_leave(caller)?;
3926 instance
3927 .guest_write(
3928 StoreContextMut(self),
3929 caller,
3930 TransmitIndex::Stream(ty),
3931 options,
3932 None,
3933 stream,
3934 address,
3935 count,
3936 )
3937 .map(|result| result.encode())
3938 }
3939
3940 fn stream_read(
3941 &mut self,
3942 instance: Instance,
3943 caller: RuntimeComponentInstanceIndex,
3944 ty: TypeStreamTableIndex,
3945 options: OptionsIndex,
3946 stream: u32,
3947 address: u32,
3948 count: u32,
3949 ) -> Result<u32> {
3950 instance.id().get(self).check_may_leave(caller)?;
3951 instance
3952 .guest_read(
3953 StoreContextMut(self),
3954 caller,
3955 TransmitIndex::Stream(ty),
3956 options,
3957 None,
3958 stream,
3959 address,
3960 count,
3961 )
3962 .map(|result| result.encode())
3963 }
3964
3965 fn future_drop_writable(
3966 &mut self,
3967 instance: Instance,
3968 caller: RuntimeComponentInstanceIndex,
3969 ty: TypeFutureTableIndex,
3970 writer: u32,
3971 ) -> Result<()> {
3972 instance.id().get(self).check_may_leave(caller)?;
3973 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
3974 }
3975
3976 fn flat_stream_write(
3977 &mut self,
3978 instance: Instance,
3979 caller: RuntimeComponentInstanceIndex,
3980 ty: TypeStreamTableIndex,
3981 options: OptionsIndex,
3982 payload_size: u32,
3983 payload_align: u32,
3984 stream: u32,
3985 address: u32,
3986 count: u32,
3987 ) -> Result<u32> {
3988 instance.id().get(self).check_may_leave(caller)?;
3989 instance
3990 .guest_write(
3991 StoreContextMut(self),
3992 caller,
3993 TransmitIndex::Stream(ty),
3994 options,
3995 Some(FlatAbi {
3996 size: payload_size,
3997 align: payload_align,
3998 }),
3999 stream,
4000 address,
4001 count,
4002 )
4003 .map(|result| result.encode())
4004 }
4005
4006 fn flat_stream_read(
4007 &mut self,
4008 instance: Instance,
4009 caller: RuntimeComponentInstanceIndex,
4010 ty: TypeStreamTableIndex,
4011 options: OptionsIndex,
4012 payload_size: u32,
4013 payload_align: u32,
4014 stream: u32,
4015 address: u32,
4016 count: u32,
4017 ) -> Result<u32> {
4018 instance.id().get(self).check_may_leave(caller)?;
4019 instance
4020 .guest_read(
4021 StoreContextMut(self),
4022 caller,
4023 TransmitIndex::Stream(ty),
4024 options,
4025 Some(FlatAbi {
4026 size: payload_size,
4027 align: payload_align,
4028 }),
4029 stream,
4030 address,
4031 count,
4032 )
4033 .map(|result| result.encode())
4034 }
4035
4036 fn stream_drop_writable(
4037 &mut self,
4038 instance: Instance,
4039 caller: RuntimeComponentInstanceIndex,
4040 ty: TypeStreamTableIndex,
4041 writer: u32,
4042 ) -> Result<()> {
4043 instance.id().get(self).check_may_leave(caller)?;
4044 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4045 }
4046
4047 fn error_context_debug_message(
4048 &mut self,
4049 instance: Instance,
4050 caller: RuntimeComponentInstanceIndex,
4051 ty: TypeComponentLocalErrorContextTableIndex,
4052 options: OptionsIndex,
4053 err_ctx_handle: u32,
4054 debug_msg_address: u32,
4055 ) -> Result<()> {
4056 instance.id().get(self).check_may_leave(caller)?;
4057 instance.error_context_debug_message(
4058 StoreContextMut(self),
4059 ty,
4060 options,
4061 err_ctx_handle,
4062 debug_msg_address,
4063 )
4064 }
4065
4066 fn thread_new_indirect(
4067 &mut self,
4068 instance: Instance,
4069 caller: RuntimeComponentInstanceIndex,
4070 func_ty_idx: TypeFuncIndex,
4071 start_func_table_idx: RuntimeTableIndex,
4072 start_func_idx: u32,
4073 context: i32,
4074 ) -> Result<u32> {
4075 instance.thread_new_indirect(
4076 StoreContextMut(self),
4077 caller,
4078 func_ty_idx,
4079 start_func_table_idx,
4080 start_func_idx,
4081 context,
4082 )
4083 }
4084}
4085
/// A boxed, pinned, `Send + 'static` future that resolves to `Result<()>`.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4087
/// Bookkeeping for a host-side subtask (`Waitable::Host`).
struct HostTask {
    /// Waitable state shared by all waitable kinds.
    common: WaitableCommon,
    /// Instance recorded as the caller; `subtask_cancel` asserts that it
    /// matches the instance issuing the cancel.
    caller_instance: RuntimeComponentInstanceIndex,
    /// Handle used to abort the task; `subtask_cancel` takes it out and calls
    /// `abort` on it.
    join_handle: Option<JoinHandle>,
}
4094
4095impl HostTask {
4096 fn new(
4097 caller_instance: RuntimeComponentInstanceIndex,
4098 join_handle: Option<JoinHandle>,
4099 ) -> Self {
4100 Self {
4101 common: WaitableCommon::default(),
4102 caller_instance,
4103 join_handle,
4104 }
4105 }
4106}
4107
impl TableDebug for HostTask {
    // Type label for `HostTask` table entries (presumably used in the
    // table's debug output — confirm against `TableDebug`'s users).
    fn type_name() -> &'static str {
        "HostTask"
    }
}
4113
/// Boxed, thread-safe callback taking the store, a runtime component instance
/// index, an `Event` and a `u32`, and returning a `u32`.
///
/// NOTE(review): the meaning of the two `u32`s is not visible in this part of
/// the file — confirm at the call site.
type CallbackFn = Box<
    dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
        + Send
        + Sync
        + 'static,
>;
4120
/// Who started a `GuestTask`.
enum Caller {
    /// The task was started by the host.
    Host {
        /// One-shot sender carrying a `LiftedResult`. `GuestTask::dispose`
        /// sets it to `None` on subtasks it reparents to the host.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// Shared sender that `GuestTask::dispose` clones into every
        /// reparented subtask.
        exit_tx: Arc<oneshot::Sender<()>>,
        /// When true, `GuestTask::new` starts the task with
        /// `HostFutureState::Live`.
        host_future_present: bool,
        /// Value reported by `GuestTask::call_post_return_automatically` for
        /// host callers.
        call_post_return_automatically: bool,
    },
    /// The task was started by another guest task.
    Guest {
        /// The calling thread, qualified with its task.
        thread: QualifiedThreadId,
        /// The calling instance; `subtask_cancel` asserts that a cancel comes
        /// from this instance.
        instance: RuntimeComponentInstanceIndex,
    },
}
4152
/// Everything stored with a `GuestTask` for lifting its result.
///
/// NOTE(review): how these fields are consumed is not visible in this part of
/// the file; the descriptions below follow the field types only.
struct LiftResult {
    /// The lifting routine.
    lift: RawLift,
    /// Tuple type index associated with the result.
    ty: TypeTupleIndex,
    /// Optional pointer to a memory definition.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding to use.
    string_encoding: StringEncoding,
}
4161
/// A guest thread id paired with the id of the task it belongs to.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
struct QualifiedThreadId {
    /// The task that owns `thread`.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
4171
4172impl QualifiedThreadId {
4173 fn qualify(
4174 state: &mut ConcurrentState,
4175 thread: TableId<GuestThread>,
4176 ) -> Result<QualifiedThreadId> {
4177 Ok(QualifiedThreadId {
4178 task: state.get_mut(thread)?.parent_task,
4179 thread,
4180 })
4181 }
4182}
4183
4184impl fmt::Debug for QualifiedThreadId {
4185 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4186 f.debug_tuple("QualifiedThreadId")
4187 .field(&self.task.rep())
4188 .field(&self.thread.rep())
4189 .finish()
4190 }
4191}
4192
/// Lifecycle state of a `GuestThread`.
enum GuestThreadState {
    /// Initial state of a thread built with `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state of a thread built with `GuestThread::new_explicit`; it
    /// holds the start function that `resume_suspended_thread` schedules.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// The thread is running (or has been scheduled to run).
    Running,
    /// The thread is suspended; the fiber is what `resume_suspended_thread`
    /// queues for resumption.
    Suspended(StoreFiber<'static>),
    // NOTE(review): the exact meaning of `Pending` is not visible in this
    // part of the file.
    Pending,
    // NOTE(review): presumably the thread has finished — confirm.
    Completed,
}
/// Per-thread state of a guest thread.
pub struct GuestThread {
    /// Two `u32` slots, zero-initialized (presumably the storage behind
    /// `context_get` / `context_set` — confirm).
    context: [u32; 2],
    /// The task this thread belongs to.
    parent_task: TableId<GuestTask>,
    /// Waitable set recorded by a cancellable wait in `waitable_check`;
    /// `subtask_cancel` takes it to find and wake the thread.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// Current lifecycle state.
    state: GuestThreadState,
    /// Handle of this thread in its instance's handle table, set by
    /// `add_guest_thread_to_instance_table`.
    instance_rep: Option<u32>,
}
4218
4219impl GuestThread {
4220 fn from_instance(
4223 state: Pin<&mut ComponentInstance>,
4224 caller_instance: RuntimeComponentInstanceIndex,
4225 guest_thread: u32,
4226 ) -> Result<TableId<Self>> {
4227 let rep = state.instance_states().0[caller_instance]
4228 .handle_table()
4229 .guest_thread_rep(guest_thread)?;
4230 Ok(TableId::new(rep))
4231 }
4232
4233 fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4234 Self {
4235 context: [0; 2],
4236 parent_task,
4237 wake_on_cancel: None,
4238 state: GuestThreadState::NotStartedImplicit,
4239 instance_rep: None,
4240 }
4241 }
4242
4243 fn new_explicit(
4244 parent_task: TableId<GuestTask>,
4245 start_func: Box<
4246 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4247 >,
4248 ) -> Self {
4249 Self {
4250 context: [0; 2],
4251 parent_task,
4252 wake_on_cancel: None,
4253 state: GuestThreadState::NotStartedExplicit(start_func),
4254 instance_rep: None,
4255 }
4256 }
4257}
4258
impl TableDebug for GuestThread {
    // Type label for `GuestThread` table entries (presumably used in the
    // table's debug output — confirm against `TableDebug`'s users).
    fn type_name() -> &'static str {
        "GuestThread"
    }
}
4264
4265enum SyncResult {
4266 NotProduced,
4267 Produced(Option<ValRaw>),
4268 Taken,
4269}
4270
4271impl SyncResult {
4272 fn take(&mut self) -> Option<Option<ValRaw>> {
4273 match mem::replace(self, SyncResult::Taken) {
4274 SyncResult::NotProduced => None,
4275 SyncResult::Produced(val) => Some(val),
4276 SyncResult::Taken => {
4277 panic!("attempted to take a synchronous result that was already taken")
4278 }
4279 }
4280 }
4281}
4282
/// State of the host-side future associated with a `GuestTask`, if any.
#[derive(Debug)]
enum HostFutureState {
    /// Used for guest callers and for host callers with no host future.
    NotApplicable,
    /// A host future is present; `GuestTask::ready_to_delete` returns false
    /// while in this state.
    Live,
    // NOTE(review): presumably set once the host future is dropped — the
    // transition is not visible in this part of the file.
    Dropped,
}
4289
/// State of a single guest task.
pub(crate) struct GuestTask {
    /// Waitable state shared by all waitable kinds; `ready_to_delete` checks
    /// its `event` for an undelivered completion event.
    common: WaitableCommon,
    /// `Some` at creation. `already_lowered_parameters` reports true once
    /// this is `None`; `subtask_cancel` also clears it when cancelling a task
    /// before it started.
    lower_params: Option<RawLower>,
    /// `Some` at creation. `returned_or_cancelled` reports true once this is
    /// `None`.
    lift_result: Option<LiftResult>,
    /// Starts as `None` (presumably filled with the lifted result — confirm).
    result: Option<LiftedResult>,
    /// Optional callback supplied at construction.
    callback: Option<CallbackFn>,
    /// Who started this task; `dispose` rewrites it on subtasks it reparents.
    caller: Caller,
    /// Starts as `Some(CallContext::default())`.
    call_context: Option<CallContext>,
    /// Starts as `NotProduced`; a `Produced` value keeps `ready_to_delete`
    /// false.
    sync_result: SyncResult,
    /// Set by `subtask_cancel` when it stores `Event::Cancelled` for the
    /// task.
    cancel_sent: bool,
    /// Starts as `false`. NOTE(review): where it is set is not visible here.
    starting_sent: bool,
    /// Subtasks of this task; `dispose` hands them over to this task's
    /// caller.
    subtasks: HashSet<TableId<GuestTask>>,
    /// Waitable set created together with the task and deleted by `dispose`.
    sync_call_set: TableId<WaitableSet>,
    /// The runtime instance the task belongs to.
    instance: RuntimeInstance,
    /// Event pending for the task itself, e.g. `Event::Cancelled` stored by
    /// `subtask_cancel`.
    event: Option<Event>,
    /// Starts as `None`. NOTE(review): where it is set is not visible here.
    function_index: Option<ExportIndex>,
    /// `dispose` deletes reparented subtasks for which this is already true.
    exited: bool,
    /// Threads belonging to this task; must be empty when `dispose` runs.
    threads: HashSet<TableId<GuestThread>>,
    /// State of the host future, derived from the caller at construction.
    host_future_state: HostFutureState,
    /// Stored from the constructor's `async_function` argument.
    async_function: bool,
}
4348
4349impl GuestTask {
4350 fn already_lowered_parameters(&self) -> bool {
4351 self.lower_params.is_none()
4353 }
4354
4355 fn returned_or_cancelled(&self) -> bool {
4356 self.lift_result.is_none()
4358 }
4359
4360 fn ready_to_delete(&self) -> bool {
4361 let threads_completed = self.threads.is_empty();
4362 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4363 let pending_completion_event = matches!(
4364 self.common.event,
4365 Some(Event::Subtask {
4366 status: Status::Returned | Status::ReturnCancelled
4367 })
4368 );
4369 let ready = threads_completed
4370 && !has_sync_result
4371 && !pending_completion_event
4372 && !matches!(self.host_future_state, HostFutureState::Live);
4373 log::trace!(
4374 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4375 threads_completed,
4376 has_sync_result,
4377 pending_completion_event,
4378 self.host_future_state
4379 );
4380 ready
4381 }
4382
4383 fn new(
4384 state: &mut ConcurrentState,
4385 lower_params: RawLower,
4386 lift_result: LiftResult,
4387 caller: Caller,
4388 callback: Option<CallbackFn>,
4389 component_instance: Instance,
4390 instance: RuntimeComponentInstanceIndex,
4391 async_function: bool,
4392 ) -> Result<Self> {
4393 let sync_call_set = state.push(WaitableSet::default())?;
4394 let host_future_state = match &caller {
4395 Caller::Guest { .. } => HostFutureState::NotApplicable,
4396 Caller::Host {
4397 host_future_present,
4398 ..
4399 } => {
4400 if *host_future_present {
4401 HostFutureState::Live
4402 } else {
4403 HostFutureState::NotApplicable
4404 }
4405 }
4406 };
4407 Ok(Self {
4408 common: WaitableCommon::default(),
4409 lower_params: Some(lower_params),
4410 lift_result: Some(lift_result),
4411 result: None,
4412 callback,
4413 caller,
4414 call_context: Some(CallContext::default()),
4415 sync_result: SyncResult::NotProduced,
4416 cancel_sent: false,
4417 starting_sent: false,
4418 subtasks: HashSet::new(),
4419 sync_call_set,
4420 instance: RuntimeInstance {
4421 instance: component_instance.id().instance(),
4422 index: instance,
4423 },
4424 event: None,
4425 function_index: None,
4426 exited: false,
4427 threads: HashSet::new(),
4428 host_future_state,
4429 async_function,
4430 })
4431 }
4432
4433 fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4436 for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4439 if let Some(Event::Subtask {
4440 status: Status::Returned | Status::ReturnCancelled,
4441 }) = waitable.common(state)?.event
4442 {
4443 waitable.delete_from(state)?;
4444 }
4445 }
4446
4447 assert!(self.threads.is_empty());
4448
4449 state.delete(self.sync_call_set)?;
4450
4451 match &self.caller {
4453 Caller::Guest {
4454 thread,
4455 instance: runtime_instance,
4456 } => {
4457 let task_mut = state.get_mut(thread.task)?;
4458 let present = task_mut.subtasks.remove(&me);
4459 assert!(present);
4460
4461 for subtask in &self.subtasks {
4462 task_mut.subtasks.insert(*subtask);
4463 }
4464
4465 for subtask in &self.subtasks {
4466 state.get_mut(*subtask)?.caller = Caller::Guest {
4467 thread: *thread,
4468 instance: *runtime_instance,
4469 };
4470 }
4471 }
4472 Caller::Host { exit_tx, .. } => {
4473 for subtask in &self.subtasks {
4474 state.get_mut(*subtask)?.caller = Caller::Host {
4475 tx: None,
4476 exit_tx: exit_tx.clone(),
4480 host_future_present: false,
4481 call_post_return_automatically: true,
4482 };
4483 }
4484 }
4485 }
4486
4487 for subtask in self.subtasks {
4488 if state.get_mut(subtask)?.exited {
4489 Waitable::Guest(subtask).delete_from(state)?;
4490 }
4491 }
4492
4493 Ok(())
4494 }
4495
4496 fn call_post_return_automatically(&self) -> bool {
4497 matches!(
4498 self.caller,
4499 Caller::Guest { .. }
4500 | Caller::Host {
4501 call_post_return_automatically: true,
4502 ..
4503 }
4504 )
4505 }
4506}
4507
impl TableDebug for GuestTask {
    /// Name reported for `GuestTask` table entries.
    fn type_name() -> &'static str {
        "GuestTask"
    }
}
4513
/// State shared by every kind of waitable (host task, guest task, or transmit
/// handle); reached through `Waitable::common`.
#[derive(Default)]
struct WaitableCommon {
    /// The event set for this waitable which has not yet been taken, if any.
    event: Option<Event>,
    /// The waitable set this waitable currently belongs to, if any; updated
    /// by `Waitable::join`.
    set: Option<TableId<WaitableSet>>,
    /// NOTE(review): presumably the guest-visible handle for this waitable;
    /// it is not read or written in this part of the file — confirm.
    handle: Option<u32>,
}
4524
/// Typed reference to a waitable entry stored in the `ConcurrentState` table.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task (a subtask handle with `is_host: true`).
    Host(TableId<HostTask>),
    /// A guest task (a subtask handle with `is_host: false`).
    Guest(TableId<GuestTask>),
    /// A stream or future transmit handle.
    Transmit(TableId<TransmitHandle>),
}
4535
4536impl Waitable {
4537 fn from_instance(
4540 state: Pin<&mut ComponentInstance>,
4541 caller_instance: RuntimeComponentInstanceIndex,
4542 waitable: u32,
4543 ) -> Result<Self> {
4544 use crate::runtime::vm::component::Waitable;
4545
4546 let (waitable, kind) = state.instance_states().0[caller_instance]
4547 .handle_table()
4548 .waitable_rep(waitable)?;
4549
4550 Ok(match kind {
4551 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4552 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4553 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4554 })
4555 }
4556
4557 fn rep(&self) -> u32 {
4559 match self {
4560 Self::Host(id) => id.rep(),
4561 Self::Guest(id) => id.rep(),
4562 Self::Transmit(id) => id.rep(),
4563 }
4564 }
4565
4566 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4570 log::trace!("waitable {self:?} join set {set:?}",);
4571
4572 let old = mem::replace(&mut self.common(state)?.set, set);
4573
4574 if let Some(old) = old {
4575 match *self {
4576 Waitable::Host(id) => state.remove_child(id, old),
4577 Waitable::Guest(id) => state.remove_child(id, old),
4578 Waitable::Transmit(id) => state.remove_child(id, old),
4579 }?;
4580
4581 state.get_mut(old)?.ready.remove(self);
4582 }
4583
4584 if let Some(set) = set {
4585 match *self {
4586 Waitable::Host(id) => state.add_child(id, set),
4587 Waitable::Guest(id) => state.add_child(id, set),
4588 Waitable::Transmit(id) => state.add_child(id, set),
4589 }?;
4590
4591 if self.common(state)?.event.is_some() {
4592 self.mark_ready(state)?;
4593 }
4594 }
4595
4596 Ok(())
4597 }
4598
4599 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4601 Ok(match self {
4602 Self::Host(id) => &mut state.get_mut(*id)?.common,
4603 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4604 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4605 })
4606 }
4607
4608 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4612 log::trace!("set event for {self:?}: {event:?}");
4613 self.common(state)?.event = event;
4614 self.mark_ready(state)
4615 }
4616
4617 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4619 let common = self.common(state)?;
4620 let event = common.event.take();
4621 if let Some(set) = self.common(state)?.set {
4622 state.get_mut(set)?.ready.remove(self);
4623 }
4624
4625 Ok(event)
4626 }
4627
4628 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4632 if let Some(set) = self.common(state)?.set {
4633 state.get_mut(set)?.ready.insert(*self);
4634 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4635 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4636 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4637
4638 let item = match mode {
4639 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4640 WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4641 thread,
4642 kind: GuestCallKind::DeliverEvent {
4643 instance,
4644 set: Some(set),
4645 },
4646 }),
4647 };
4648 state.push_high_priority(item);
4649 }
4650 }
4651 Ok(())
4652 }
4653
4654 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4656 match self {
4657 Self::Host(task) => {
4658 log::trace!("delete host task {task:?}");
4659 state.delete(*task)?;
4660 }
4661 Self::Guest(task) => {
4662 log::trace!("delete guest task {task:?}");
4663 state.delete(*task)?.dispose(state, *task)?;
4664 }
4665 Self::Transmit(task) => {
4666 state.delete(*task)?;
4667 }
4668 }
4669
4670 Ok(())
4671 }
4672}
4673
4674impl fmt::Debug for Waitable {
4675 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4676 match self {
4677 Self::Host(id) => write!(f, "{id:?}"),
4678 Self::Guest(id) => write!(f, "{id:?}"),
4679 Self::Transmit(id) => write!(f, "{id:?}"),
4680 }
4681 }
4682}
4683
/// A set of waitables together with the threads waiting on it.
#[derive(Default)]
struct WaitableSet {
    /// Members which have been marked ready (see `Waitable::mark_ready`) and
    /// whose event has not yet been taken.
    ready: BTreeSet<Waitable>,
    /// Threads waiting for a member to become ready, each with the way it
    /// should be woken.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
}
4692
impl TableDebug for WaitableSet {
    /// Name reported for `WaitableSet` table entries.
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
4698
/// Type-erased, one-shot closure which lowers a call's parameters: given the
/// store, it fills the provided buffer of raw values.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4702
/// Type-erased, one-shot closure which lifts a call's result: given the store
/// and the raw result values, it produces a boxed `Any` value.
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;
4707
/// A lifted result with its concrete type erased; `queue_call` downcasts it
/// back to the caller's expected type.
type LiftedResult = Box<dyn Any + Send + Sync>;
4712
/// Unit marker type.
///
/// NOTE(review): presumably a placeholder `LiftedResult` for calls with no
/// meaningful value — it is not used in this part of the file; confirm.
struct DummyResult;
4716
/// Per-instance concurrency bookkeeping.
///
/// NOTE(review): none of these fields is written in this part of the file;
/// the field descriptions below are inferred from names and types — confirm.
#[derive(Default)]
pub struct ConcurrentInstanceState {
    /// Presumably a backpressure counter for the instance — TODO confirm.
    backpressure: u16,
    /// Presumably set while the instance must not be entered — TODO confirm.
    do_not_enter: bool,
    /// Guest calls keyed by thread; presumably calls deferred until the
    /// instance may be entered — TODO confirm.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
4728
impl ConcurrentInstanceState {
    /// Returns `true` when the `pending` map has no entries.
    pub fn pending_is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}
4734
/// Store-wide state for concurrent (async) component execution.
pub struct ConcurrentState {
    /// The guest thread on whose behalf context slots, cancellation, and
    /// blocking checks are resolved; several methods panic when it is `None`.
    guest_thread: Option<QualifiedThreadId>,

    /// Host task futures. Held in an `Option` so the whole collection can be
    /// moved out by `take_fibers_and_futures`.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// Table holding tasks, threads, waitable sets, and transmit handles,
    /// addressed by `TableId`.
    table: AlwaysMut<ResourceTable>,
    /// Queue of high-priority work items.
    high_priority: Vec<WorkItem>,
    /// Queue of low-priority work items.
    low_priority: Vec<WorkItem>,
    /// NOTE(review): presumably why the current fiber is suspending — not
    /// used in this part of the file; confirm.
    suspend_reason: Option<SuspendReason>,
    /// A worker fiber, if one exists; collected by
    /// `take_fibers_and_futures`.
    worker: Option<StoreFiber<'static>>,
    /// NOTE(review): presumably the item handed to the worker fiber — not
    /// used in this part of the file; confirm.
    worker_item: Option<WorkerItem>,

    /// Reference counts keyed by global error-context table index.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
}
4777
4778impl Default for ConcurrentState {
4779 fn default() -> Self {
4780 Self {
4781 guest_thread: None,
4782 table: AlwaysMut::new(ResourceTable::new()),
4783 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4784 high_priority: Vec::new(),
4785 low_priority: Vec::new(),
4786 suspend_reason: None,
4787 worker: None,
4788 worker_item: None,
4789 global_error_context_ref_counts: BTreeMap::new(),
4790 }
4791 }
4792}
4793
4794impl ConcurrentState {
4795 pub(crate) fn take_fibers_and_futures(
4812 &mut self,
4813 fibers: &mut Vec<StoreFiber<'static>>,
4814 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4815 ) {
4816 for entry in self.table.get_mut().iter_mut() {
4817 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4818 for mode in mem::take(&mut set.waiting).into_values() {
4819 if let WaitMode::Fiber(fiber) = mode {
4820 fibers.push(fiber);
4821 }
4822 }
4823 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4824 if let GuestThreadState::Suspended(fiber) =
4825 mem::replace(&mut thread.state, GuestThreadState::Completed)
4826 {
4827 fibers.push(fiber);
4828 }
4829 }
4830 }
4831
4832 if let Some(fiber) = self.worker.take() {
4833 fibers.push(fiber);
4834 }
4835
4836 let mut take_items = |list| {
4837 for item in mem::take(list) {
4838 match item {
4839 WorkItem::ResumeFiber(fiber) => {
4840 fibers.push(fiber);
4841 }
4842 WorkItem::PushFuture(future) => {
4843 self.futures
4844 .get_mut()
4845 .as_mut()
4846 .unwrap()
4847 .push(future.into_inner());
4848 }
4849 _ => {}
4850 }
4851 }
4852 };
4853
4854 take_items(&mut self.high_priority);
4855 take_items(&mut self.low_priority);
4856
4857 if let Some(them) = self.futures.get_mut().take() {
4858 futures.push(them);
4859 }
4860 }
4861
4862 fn push<V: Send + Sync + 'static>(
4863 &mut self,
4864 value: V,
4865 ) -> Result<TableId<V>, ResourceTableError> {
4866 self.table.get_mut().push(value).map(TableId::from)
4867 }
4868
4869 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4870 self.table.get_mut().get_mut(&Resource::from(id))
4871 }
4872
4873 pub fn add_child<T: 'static, U: 'static>(
4874 &mut self,
4875 child: TableId<T>,
4876 parent: TableId<U>,
4877 ) -> Result<(), ResourceTableError> {
4878 self.table
4879 .get_mut()
4880 .add_child(Resource::from(child), Resource::from(parent))
4881 }
4882
4883 pub fn remove_child<T: 'static, U: 'static>(
4884 &mut self,
4885 child: TableId<T>,
4886 parent: TableId<U>,
4887 ) -> Result<(), ResourceTableError> {
4888 self.table
4889 .get_mut()
4890 .remove_child(Resource::from(child), Resource::from(parent))
4891 }
4892
4893 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4894 self.table.get_mut().delete(Resource::from(id))
4895 }
4896
4897 fn push_future(&mut self, future: HostTaskFuture) {
4898 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4905 }
4906
4907 fn push_high_priority(&mut self, item: WorkItem) {
4908 log::trace!("push high priority: {item:?}");
4909 self.high_priority.push(item);
4910 }
4911
4912 fn push_low_priority(&mut self, item: WorkItem) {
4913 log::trace!("push low priority: {item:?}");
4914 self.low_priority.push(item);
4915 }
4916
4917 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4918 if high_priority {
4919 self.push_high_priority(item);
4920 } else {
4921 self.push_low_priority(item);
4922 }
4923 }
4924
4925 fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4935 let guest_instance = self.get_mut(guest_task).unwrap().instance;
4936
4937 loop {
4945 let next_thread = match &self.get_mut(guest_task).unwrap().caller {
4946 Caller::Host { .. } => break true,
4947 Caller::Guest { thread, instance } => {
4948 if *instance == guest_instance.index {
4949 break false;
4950 } else {
4951 *thread
4952 }
4953 }
4954 };
4955 guest_task = next_thread.task;
4956 }
4957 }
4958
4959 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4961 let thread = self.guest_thread.unwrap();
4962 let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4963 log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4964 Ok(val)
4965 }
4966
4967 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4969 let thread = self.guest_thread.unwrap();
4970 log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4971 self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4972 Ok(())
4973 }
4974
4975 fn take_pending_cancellation(&mut self) -> bool {
4978 let thread = self.guest_thread.unwrap();
4979 if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4980 assert!(matches!(event, Event::Cancelled));
4981 true
4982 } else {
4983 false
4984 }
4985 }
4986
4987 fn check_blocking(&mut self) -> Result<()> {
4988 let task = self.guest_thread.unwrap().task;
4989 self.check_blocking_for(task)
4990 }
4991
4992 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
4993 if self.may_block(task) {
4994 Ok(())
4995 } else {
4996 Err(Trap::CannotBlockSyncTask.into())
4997 }
4998 }
4999
5000 fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
5001 let task = self.get_mut(task).unwrap();
5002 task.async_function || task.returned_or_cancelled()
5003 }
5004}
5005
5006fn for_any_lower<
5009 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5010>(
5011 fun: F,
5012) -> F {
5013 fun
5014}
5015
5016fn for_any_lift<
5018 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5019>(
5020 fun: F,
5021) -> F {
5022 fun
5023}
5024
5025fn checked<F: Future + Send + 'static>(
5030 id: StoreId,
5031 fut: F,
5032) -> impl Future<Output = F::Output> + Send + 'static {
5033 async move {
5034 let mut fut = pin!(fut);
5035 future::poll_fn(move |cx| {
5036 let message = "\
5037 `Future`s which depend on asynchronous component tasks, streams, or \
5038 futures to complete may only be polled from the event loop of the \
5039 store to which they belong. Please use \
5040 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5041 ";
5042 tls::try_get(|store| {
5043 let matched = match store {
5044 tls::TryGet::Some(store) => store.id() == id,
5045 tls::TryGet::Taken | tls::TryGet::None => false,
5046 };
5047
5048 if !matched {
5049 panic!("{message}")
5050 }
5051 });
5052 fut.as_mut().poll(cx)
5053 })
5054 .await
5055 }
5056}
5057
5058fn check_recursive_run() {
5061 tls::try_get(|store| {
5062 if !matches!(store, tls::TryGet::None) {
5063 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5064 }
5065 });
5066}
5067
/// Split a packed callback return value into `(low 4 bits, remaining upper
/// bits)`.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    const LOW_BITS: u32 = 4;
    const LOW_MASK: u32 = (1 << LOW_BITS) - 1;

    let low = code & LOW_MASK;
    let high = code >> LOW_BITS;
    (low, high)
}
5071
/// Parameters shared by both `WaitableCheck` variants.
///
/// NOTE(review): not used in this part of the file; the field descriptions
/// are inferred from names and types — confirm.
struct WaitableCheckParams {
    /// The waitable set to check.
    set: TableId<WaitableSet>,
    /// Canonical options index; presumably identifies the memory used to
    /// deliver the event payload — TODO confirm.
    options: OptionsIndex,
    /// Presumably the guest address the event payload is written to — TODO
    /// confirm.
    payload: u32,
}
5080
/// The two ways of checking a waitable set.
///
/// NOTE(review): presumably `Wait` blocks until an event is available while
/// `Poll` does not — confirm against the code that consumes this enum.
enum WaitableCheck {
    /// Wait on the set described by the parameters.
    Wait(WaitableCheckParams),
    /// Poll the set described by the parameters.
    Poll(WaitableCheckParams),
}
5086
/// A host-to-guest call which has been set up by `prepare_call` but not yet
/// queued; `R` is the type the lifted result is downcast to in `queue_call`.
pub(crate) struct PreparedCall<R> {
    /// The function to call.
    handle: Func,
    /// The task and implicit thread created for this call.
    thread: QualifiedThreadId,
    /// The parameter count forwarded to `Instance::queue_call`.
    param_count: usize,
    /// Receives the lifted result; its sender is stored in the task's
    /// `Caller::Host::tx`.
    rx: oneshot::Receiver<LiftedResult>,
    /// Paired with the sender stored in `Caller::Host::exit_tx`; handed back
    /// to the caller alongside the result by `queue_call`.
    exit_rx: oneshot::Receiver<()>,
    /// Ties the otherwise unused type parameter `R` to this struct.
    _phantom: PhantomData<R>,
}
5103
5104impl<R> PreparedCall<R> {
5105 pub(crate) fn task_id(&self) -> TaskId {
5107 TaskId {
5108 task: self.thread.task,
5109 }
5110 }
5111}
5112
/// Identifies the guest task behind a `PreparedCall`.
pub(crate) struct TaskId {
    /// Table id of the guest task.
    task: TableId<GuestTask>,
}
5117
5118impl TaskId {
5119 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5125 let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5126 if !task.already_lowered_parameters() {
5127 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5128 } else {
5129 task.host_future_state = HostFutureState::Dropped;
5130 if task.ready_to_delete() {
5131 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5132 }
5133 }
5134 Ok(())
5135 }
5136}
5137
/// Create the guest task and implicit thread for a host-to-guest call of
/// `handle`, without queueing it.
///
/// * `param_count` is stored in the returned `PreparedCall` and later
///   forwarded by `queue_call`.
/// * `host_future_present` and `call_post_return_automatically` are recorded
///   in the task's `Caller::Host`.
/// * `lower_params` and `lift_result` are boxed into the task and run with
///   `handle` as their first argument.
///
/// # Errors
///
/// Propagates failures from `GuestTask::new` and from pushing the task or
/// thread into the table.
pub(crate) fn prepare_call<T, R>(
    mut store: StoreContextMut<T>,
    handle: Func,
    param_count: usize,
    host_future_present: bool,
    call_post_return_automatically: bool,
    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
    + Send
    + Sync
    + 'static,
) -> Result<PreparedCall<R>> {
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);

    // Gather everything needed from the instance before borrowing the
    // concurrent state below.
    let instance = handle.instance().id().get(store.0);
    let options = &instance.component().env_component().options[options];
    let ty = &instance.component().types()[ty];
    let async_function = ty.async_;
    let task_return_type = ty.results;
    let component_instance = raw_options.instance;
    let callback = options.callback.map(|i| instance.runtime_callback(i));
    let memory = options
        .memory()
        .map(|i| instance.runtime_memory(i))
        .map(SendSyncPtr::new);
    let string_encoding = options.string_encoding;
    // `token` lets the boxed closures turn the `&mut dyn VMStore` they are
    // given back into a `StoreContextMut<T>`.
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();

    // `tx`/`rx` carry the lifted result. `exit_tx`/`exit_rx` form a second
    // channel whose sender is kept in the task's `Caller::Host`.
    let (tx, rx) = oneshot::channel();
    let (exit_tx, exit_rx) = oneshot::channel();

    let mut task = GuestTask::new(
        state,
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            exit_tx: Arc::new(exit_tx),
            host_future_present,
            call_post_return_automatically,
        },
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(
                move |store: &mut dyn VMStore, runtime_instance, event, handle| {
                    let store = token.as_context_mut(store);
                    // SAFETY: NOTE(review): relies on `callback` being the
                    // runtime callback obtained from this function's own
                    // instance above — confirm against the contract of
                    // `Instance::call_callback`.
                    unsafe {
                        instance.call_callback(
                            store,
                            runtime_instance,
                            callback,
                            event,
                            handle,
                            call_post_return_automatically,
                        )
                    }
                },
            ) as CallbackFn
        }),
        handle.instance(),
        component_instance,
        async_function,
    )?;
    task.function_index = Some(handle.index());

    // Register the task, then its implicit thread, and link the two.
    let task = state.push(task)?;
    let thread = state.push(GuestThread::new_implicit(task))?;
    state.get_mut(task)?.threads.insert(thread);

    Ok(PreparedCall {
        handle,
        thread: QualifiedThreadId { task, thread },
        param_count,
        rx,
        exit_rx,
        _phantom: PhantomData,
    })
}
5237
5238pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5245 mut store: StoreContextMut<T>,
5246 prepared: PreparedCall<R>,
5247) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5248 let PreparedCall {
5249 handle,
5250 thread,
5251 param_count,
5252 rx,
5253 exit_rx,
5254 ..
5255 } = prepared;
5256
5257 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5258
5259 Ok(checked(
5260 store.0.id(),
5261 rx.map(move |result| {
5262 result
5263 .map(|v| (*v.downcast().unwrap(), exit_rx))
5264 .map_err(anyhow::Error::from)
5265 }),
5266 ))
5267}
5268
5269fn queue_call0<T: 'static>(
5272 store: StoreContextMut<T>,
5273 handle: Func,
5274 guest_thread: QualifiedThreadId,
5275 param_count: usize,
5276) -> Result<()> {
5277 let (_options, flags, _ty, raw_options) = handle.abi_info(store.0);
5278 let is_concurrent = raw_options.async_;
5279 let callback = raw_options.callback;
5280 let instance = handle.instance();
5281 let callee = handle.lifted_core_func(store.0);
5282 let post_return = handle.post_return_core_func(store.0);
5283 let callback = callback.map(|i| {
5284 let instance = instance.id().get(store.0);
5285 SendSyncPtr::new(instance.runtime_callback(i))
5286 });
5287
5288 log::trace!("queueing call {guest_thread:?}");
5289
5290 let instance_flags = if callback.is_none() {
5291 None
5292 } else {
5293 Some(flags)
5294 };
5295
5296 unsafe {
5300 instance.queue_call(
5301 store,
5302 guest_thread,
5303 SendSyncPtr::new(callee),
5304 param_count,
5305 1,
5306 instance_flags,
5307 is_concurrent,
5308 callback,
5309 post_return.map(SendSyncPtr::new),
5310 )
5311 }
5312}