1use crate::component::func::{self, Func, call_post_return};
54use crate::component::{
55 HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
56};
57use crate::fiber::{self, StoreFiber, StoreFiberYield};
58use crate::prelude::*;
59use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
60use crate::vm::component::{CallContext, ComponentInstance, InstanceState, ResourceTables};
61use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
62use crate::{
63 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType,
64 bail,
65 error::{Context as _, format_err},
66};
67use error_contexts::GlobalErrorContextRefCount;
68use futures::channel::oneshot;
69use futures::future::{self, FutureExt};
70use futures::stream::{FuturesUnordered, StreamExt};
71use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
72use std::any::Any;
73use std::borrow::ToOwned;
74use std::boxed::Box;
75use std::cell::UnsafeCell;
76use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
77use std::fmt;
78use std::future::Future;
79use std::marker::PhantomData;
80use std::mem::{self, ManuallyDrop, MaybeUninit};
81use std::ops::DerefMut;
82use std::pin::{Pin, pin};
83use std::ptr::{self, NonNull};
84use std::sync::Arc;
85use std::task::{Context, Poll, Waker};
86use std::vec::Vec;
87use table::{TableDebug, TableId};
88use wasmtime_environ::Trap;
89use wasmtime_environ::component::{
90 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS,
91 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
92 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
93 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
94 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
95};
96use wasmtime_environ::packed_option::ReservedValue;
97
98pub use abort::JoinHandle;
99pub use future_stream_any::{FutureAny, StreamAny};
100pub use futures_and_streams::{
101 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
102 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
103 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
104};
105pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
106
107mod abort;
108mod error_contexts;
109mod future_stream_any;
110mod futures_and_streams;
111pub(crate) mod table;
112pub(crate) mod tls;
113
/// Sentinel `u32` with every bit set (`0xffff_ffff`).
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// the code handed back to a guest when an operation could not complete
/// immediately -- confirm against the users of this constant.
const BLOCKED: u32 = u32::MAX;
117
/// State of a subtask as reported to its caller.
///
/// The numeric value of each variant is fixed by its explicit discriminant
/// and is what [`Status::pack`] stores in the low bits of the packed word.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Packs this status and an optional waitable handle into one `u32`.
    ///
    /// The status code occupies the low 4 bits and the handle the remaining
    /// upper 28 bits.  When `waitable` is `None` the handle bits are zero.
    ///
    /// # Panics
    ///
    /// * if `waitable` is `Some` while `self` is [`Status::Returned`], or
    ///   `None` for any other status;
    /// * if the handle does not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        // A handle accompanies every status except `Returned`.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = match waitable {
            Some(handle) => handle,
            None => 0,
        };
        assert!(waitable < (1 << 28));
        let code = self as u32;
        code | (waitable << 4)
    }
}
141
142#[derive(Clone, Copy, Debug)]
145enum Event {
146 None,
147 Cancelled,
148 Subtask {
149 status: Status,
150 },
151 StreamRead {
152 code: ReturnCode,
153 pending: Option<(TypeStreamTableIndex, u32)>,
154 },
155 StreamWrite {
156 code: ReturnCode,
157 pending: Option<(TypeStreamTableIndex, u32)>,
158 },
159 FutureRead {
160 code: ReturnCode,
161 pending: Option<(TypeFutureTableIndex, u32)>,
162 },
163 FutureWrite {
164 code: ReturnCode,
165 pending: Option<(TypeFutureTableIndex, u32)>,
166 },
167}
168
169impl Event {
170 fn parts(self) -> (u32, u32) {
175 const EVENT_NONE: u32 = 0;
176 const EVENT_SUBTASK: u32 = 1;
177 const EVENT_STREAM_READ: u32 = 2;
178 const EVENT_STREAM_WRITE: u32 = 3;
179 const EVENT_FUTURE_READ: u32 = 4;
180 const EVENT_FUTURE_WRITE: u32 = 5;
181 const EVENT_CANCELLED: u32 = 6;
182 match self {
183 Event::None => (EVENT_NONE, 0),
184 Event::Cancelled => (EVENT_CANCELLED, 0),
185 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
186 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
187 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
188 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
189 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
190 }
191 }
192}
193
/// Numeric codes named after the actions a guest callback can request.
///
/// NOTE(review): the code that decodes a callback's return value is outside
/// the visible part of this file; the meanings below are taken from the
/// constant names only -- confirm against that decoder.
mod callback_code {
    /// Presumably: the task is finished.
    pub const EXIT: u32 = 0;
    /// Presumably: the task yields and wants to be resumed later.
    pub const YIELD: u32 = 1;
    /// Presumably: the task waits for an event.
    pub const WAIT: u32 = 2;
}
200
/// `wasmtime_environ::component::START_FLAG_ASYNC_CALLEE` converted to `u32`
/// once, so that users in this module can work with a `u32` directly.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
205
206pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
212 store: StoreContextMut<'a, T>,
213 get_data: fn(&mut T) -> D::Data<'_>,
214}
215
216impl<'a, T, D> Access<'a, T, D>
217where
218 D: HasData + ?Sized,
219 T: 'static,
220{
221 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
223 Self { store, get_data }
224 }
225
226 pub fn data_mut(&mut self) -> &mut T {
228 self.store.data_mut()
229 }
230
231 pub fn get(&mut self) -> D::Data<'_> {
233 (self.get_data)(self.data_mut())
234 }
235
236 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
240 where
241 T: 'static,
242 {
243 let accessor = Accessor {
244 get_data: self.get_data,
245 token: StoreToken::new(self.store.as_context_mut()),
246 };
247 self.store
248 .as_context_mut()
249 .spawn_with_accessor(accessor, task)
250 }
251
252 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
255 self.get_data
256 }
257}
258
259impl<'a, T, D> AsContext for Access<'a, T, D>
260where
261 D: HasData + ?Sized,
262 T: 'static,
263{
264 type Data = T;
265
266 fn as_context(&self) -> StoreContext<'_, T> {
267 self.store.as_context()
268 }
269}
270
271impl<'a, T, D> AsContextMut for Access<'a, T, D>
272where
273 D: HasData + ?Sized,
274 T: 'static,
275{
276 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
277 self.store.as_context_mut()
278 }
279}
280
281pub struct Accessor<T: 'static, D = HasSelf<T>>
341where
342 D: HasData + ?Sized,
343{
344 token: StoreToken<T>,
345 get_data: fn(&mut T) -> D::Data<'_>,
346}
347
/// Conversion of a value into a borrowed [`Accessor`].
pub trait AsAccessor {
    /// The store data type `T` of the resulting accessor.
    type Data: 'static;

    /// The data view `D` of the resulting accessor.
    type AccessorData: HasData + ?Sized;

    /// Borrows `self` as an accessor.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
375
376impl<T: AsAccessor + ?Sized> AsAccessor for &T {
377 type Data = T::Data;
378 type AccessorData = T::AccessorData;
379
380 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
381 T::as_accessor(self)
382 }
383}
384
385impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
386 type Data = T;
387 type AccessorData = D;
388
389 fn as_accessor(&self) -> &Accessor<T, D> {
390 self
391 }
392}
393
394const _: () = {
417 const fn assert<T: Send + Sync>() {}
418 assert::<Accessor<UnsafeCell<u32>>>();
419};
420
421impl<T> Accessor<T> {
422 pub(crate) fn new(token: StoreToken<T>) -> Self {
431 Self {
432 token,
433 get_data: |x| x,
434 }
435 }
436}
437
438impl<T, D> Accessor<T, D>
439where
440 D: HasData + ?Sized,
441{
442 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
460 tls::get(|vmstore| {
461 fun(Access {
462 store: self.token.as_context_mut(vmstore),
463 get_data: self.get_data,
464 })
465 })
466 }
467
468 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
471 self.get_data
472 }
473
474 pub fn with_getter<D2: HasData>(
491 &self,
492 get_data: fn(&mut T) -> D2::Data<'_>,
493 ) -> Accessor<T, D2> {
494 Accessor {
495 token: self.token,
496 get_data,
497 }
498 }
499
500 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
516 where
517 T: 'static,
518 {
519 let accessor = self.clone_for_spawn();
520 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
521 }
522
523 fn clone_for_spawn(&self) -> Self {
524 Self {
525 token: self.token,
526 get_data: self.get_data,
527 }
528 }
529}
530
531pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
543where
544 D: HasData + ?Sized,
545{
546 fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
548}
549
/// Arguments supplied by a caller, split by calling convention.
///
/// NOTE(review): this type is not used in the visible part of the file; the
/// field notes below restate names and types only -- confirm semantics at
/// the use sites.
enum CallerInfo {
    /// The call was made asynchronously.
    Async {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Whether the call has a result.
        has_result: bool,
    },
    /// The call was made synchronously.
    Sync {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Number of results.
        result_count: u32,
    },
}
564
/// How a thread waiting on a waitable set is to be resumed.
enum WaitMode {
    /// A suspended fiber; `resume_fiber` stores the fiber here when it
    /// suspended with `SuspendReason::Waiting`.
    Fiber(StoreFiber<'static>),
    /// NOTE(review): presumably the waiter is resumed by invoking its
    /// callback on this instance -- not shown in the visible code; confirm.
    Callback(Instance),
}
573
/// Why a fiber suspended itself; recorded by `suspend` and consumed by
/// `resume_fiber`, which decides where to park the fiber.
///
/// In every variant that has it, `skip_may_block_check: true` exempts the
/// suspension from the "current task may block" assertion in `suspend`.
#[derive(Debug)]
enum SuspendReason {
    /// Waiting on `set`: the fiber is parked in that set's `waiting` map
    /// under `thread`.
    Waiting {
        set: TableId<WaitableSet>,
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
    /// The worker fiber has finished its item: it is kept as the idle worker,
    /// or disposed if an idle worker is already stored.
    NeedWork,
    /// The thread yields: it is marked `Pending` and its fiber is queued as
    /// low-priority work.
    Yielding {
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
    /// The thread suspends explicitly: its fiber is stored in the thread's
    /// `Suspended` state.
    ExplicitlySuspending {
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
}
599
/// The different kinds of pending call into guest code.
enum GuestCallKind {
    /// Deliver an event to the task's callback (see `handle_guest_call`):
    /// the event is fetched with `instance.get_event(.., set, ..)`.
    DeliverEvent {
        instance: Instance,
        set: Option<TableId<WaitableSet>>,
    },
    /// Start a call; the closure may return a follow-up `GuestCall`, which
    /// `handle_guest_call` runs next.  Only ready when the instance is
    /// enterable and has no backpressure (see `GuestCall::is_ready`).
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
    /// Start a call with no follow-up; always considered ready.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
621
622impl fmt::Debug for GuestCallKind {
623 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
624 match self {
625 Self::DeliverEvent { instance, set } => f
626 .debug_struct("DeliverEvent")
627 .field("instance", instance)
628 .field("set", set)
629 .finish(),
630 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
631 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
632 }
633 }
634}
635
/// A pending call into guest code on behalf of a specific thread.
#[derive(Debug)]
struct GuestCall {
    /// The thread (and its task) the call runs as.
    thread: QualifiedThreadId,
    /// What the call does.
    kind: GuestCallKind,
}
642
643impl GuestCall {
644 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
654 let instance = store
655 .concurrent_state_mut()
656 .get_mut(self.thread.task)?
657 .instance;
658 let state = store.instance_state(instance).concurrent_state();
659
660 let ready = match &self.kind {
661 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
662 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
663 GuestCallKind::StartExplicit(_) => true,
664 };
665 log::trace!(
666 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
667 state.do_not_enter,
668 state.backpressure
669 );
670 Ok(ready)
671 }
672}
673
/// A job handed to the worker fiber by `run_on_worker`.
enum WorkerItem {
    /// Run a guest call via `handle_guest_call`.
    GuestCall(GuestCall),
    /// Run an arbitrary closure with the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
679
/// A unit of work processed by `handle_work_item`.
enum WorkItem {
    /// Add a host future to the store's set of futures being polled.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// Resume a suspended fiber.
    ResumeFiber(StoreFiber<'static>),
    /// Run a guest call on the worker fiber if it is ready, otherwise park
    /// it in its instance's `pending` map.
    GuestCall(GuestCall),
    /// Run a closure on the worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
692
693impl fmt::Debug for WorkItem {
694 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
695 match self {
696 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
697 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
698 Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
699 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
700 }
701 }
702}
703
/// Outcome of a wait.
///
/// The derived ordering follows declaration order: `Cancelled < Completed`.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) enum WaitResult {
    /// The wait was cancelled.
    Cancelled,
    /// The wait completed.
    Completed,
}
710
711pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
719 store: &mut dyn VMStore,
720 future: impl Future<Output = Result<R>> + Send + 'static,
721 caller_instance: RuntimeInstance,
722) -> Result<R> {
723 let state = store.concurrent_state_mut();
724
725 let caller = state.guest_thread.unwrap();
726
727 let old_result = state
730 .get_mut(caller.task)
731 .with_context(|| format!("bad handle: {caller:?}"))?
732 .result
733 .take();
734
735 let task = state.push(HostTask::new(caller_instance, None))?;
739
740 log::trace!("new host task child of {caller:?}: {task:?}");
741
742 let mut future = Box::pin(async move {
746 let result = future.await?;
747 tls::get(move |store| {
748 let state = store.concurrent_state_mut();
749 state.get_mut(caller.task)?.result = Some(Box::new(result) as _);
750
751 Waitable::Host(task).set_event(
752 state,
753 Some(Event::Subtask {
754 status: Status::Returned,
755 }),
756 )?;
757
758 Ok(())
759 })
760 }) as HostTaskFuture;
761
762 let poll = tls::set(store, || {
766 future
767 .as_mut()
768 .poll(&mut Context::from_waker(&Waker::noop()))
769 });
770
771 match poll {
772 Poll::Ready(result) => {
773 result?;
775 log::trace!("delete host task {task:?} (already ready)");
776 store.concurrent_state_mut().delete(task)?;
777 }
778 Poll::Pending => {
779 let state = store.concurrent_state_mut();
784 state.push_future(future);
785
786 let set = state.get_mut(caller.task)?.sync_call_set;
787 Waitable::Host(task).join(state, Some(set))?;
788
789 store.suspend(SuspendReason::Waiting {
790 set,
791 thread: caller,
792 skip_may_block_check: false,
793 })?;
794 }
795 }
796
797 Ok(*mem::replace(
799 &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
800 old_result,
801 )
802 .unwrap()
803 .downcast()
804 .unwrap())
805}
806
807fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
809 let mut next = Some(call);
810 while let Some(call) = next.take() {
811 match call.kind {
812 GuestCallKind::DeliverEvent { instance, set } => {
813 let (event, waitable) = instance
814 .get_event(store, call.thread.task, set, true)?
815 .unwrap();
816 let state = store.concurrent_state_mut();
817 let task = state.get_mut(call.thread.task)?;
818 let runtime_instance = task.instance;
819 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
820
821 log::trace!(
822 "use callback to deliver event {event:?} to {:?} for {waitable:?}",
823 call.thread,
824 );
825
826 let old_thread = store.set_thread(Some(call.thread));
827 log::trace!(
828 "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
829 call.thread
830 );
831
832 store.maybe_push_call_context(call.thread.task)?;
833
834 store.enter_instance(runtime_instance);
835
836 let callback = store
837 .concurrent_state_mut()
838 .get_mut(call.thread.task)?
839 .callback
840 .take()
841 .unwrap();
842
843 let code = callback(store, event, handle)?;
844
845 store
846 .concurrent_state_mut()
847 .get_mut(call.thread.task)?
848 .callback = Some(callback);
849
850 store.exit_instance(runtime_instance)?;
851
852 store.maybe_pop_call_context(call.thread.task)?;
853
854 store.set_thread(old_thread);
855
856 next = instance.handle_callback_code(
857 store,
858 call.thread,
859 runtime_instance.index,
860 code,
861 )?;
862
863 log::trace!(
864 "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
865 );
866 }
867 GuestCallKind::StartImplicit(fun) => {
868 next = fun(store)?;
869 }
870 GuestCallKind::StartExplicit(fun) => {
871 fun(store)?;
872 }
873 }
874 }
875
876 Ok(())
877}
878
879impl<T> Store<T> {
880 pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
882 where
883 T: Send + 'static,
884 {
885 ensure!(
886 self.as_context().0.concurrency_support(),
887 "cannot use `run_concurrent` when Config::concurrency_support disabled",
888 );
889 self.as_context_mut().run_concurrent(fun).await
890 }
891
892 #[doc(hidden)]
893 pub fn assert_concurrent_state_empty(&mut self) {
894 self.as_context_mut().assert_concurrent_state_empty();
895 }
896
897 pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
899 where
900 T: 'static,
901 {
902 self.as_context_mut().spawn(task)
903 }
904}
905
906impl<T> StoreContextMut<'_, T> {
907 #[doc(hidden)]
916 pub fn assert_concurrent_state_empty(self) {
917 let store = self.0;
918 store
919 .store_data_mut()
920 .components
921 .assert_instance_states_empty();
922 let state = store.concurrent_state_mut();
923 assert!(
924 state.table.get_mut().is_empty(),
925 "non-empty table: {:?}",
926 state.table.get_mut()
927 );
928 assert!(state.high_priority.is_empty());
929 assert!(state.low_priority.is_empty());
930 assert!(state.guest_thread.is_none());
931 assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
932 assert!(state.global_error_context_ref_counts.is_empty());
933 }
934
935 pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
945 where
946 T: 'static,
947 {
948 let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
949 self.spawn_with_accessor(accessor, task)
950 }
951
952 fn spawn_with_accessor<D>(
955 self,
956 accessor: Accessor<T, D>,
957 task: impl AccessorTask<T, D>,
958 ) -> JoinHandle
959 where
960 T: 'static,
961 D: HasData + ?Sized,
962 {
963 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
967 self.0
968 .concurrent_state_mut()
969 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
970 handle
971 }
972
973 pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1057 where
1058 T: Send + 'static,
1059 {
1060 ensure!(
1061 self.0.concurrency_support(),
1062 "cannot use `run_concurrent` when Config::concurrency_support disabled",
1063 );
1064 self.do_run_concurrent(fun, false).await
1065 }
1066
1067 pub(super) async fn run_concurrent_trap_on_idle<R>(
1068 self,
1069 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1070 ) -> Result<R>
1071 where
1072 T: Send + 'static,
1073 {
1074 self.do_run_concurrent(fun, true).await
1075 }
1076
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Calls `fun` with a fresh `Accessor` for this store and drives the
    /// returned future with `poll_until`, forwarding `trap_on_idle`.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        debug_assert!(self.0.concurrency_support());
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Owns the store context together with the future produced by `fun`,
        // so that the future's destructor runs inside `tls::set` (see `Drop`
        // below).
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                // Drop the value with the store installed in TLS.
                // NOTE(review): presumably because destructors inside the
                // future may reach the store through `tls::get` -- confirm.
                tls::set(self.store.0, || {
                    // SAFETY (review): this is the only place in the visible
                    // code where `value` is dropped, and `drop` runs at most
                    // once per `Dropper`, so the value is not dropped twice.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY (review): the future stays inside `dropper` for the rest of
        // this function; the visible code never moves it out and only drops
        // it in place via `Dropper::drop` -- confirm no other path moves it.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1120
    /// Event loop: polls `future` together with the store's host futures and
    /// processes queued work items until `future` completes.
    ///
    /// Returns the output of `future`.  Fails if a host future or a work item
    /// fails, or -- when `trap_on_idle` is set -- with `Trap::AsyncDeadlock`
    /// if `future` is still pending while there are no host futures and no
    /// work items left.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Guard that puts the set of host futures back into the concurrent
        // state when dropped (including when this future is dropped early).
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take the host futures out of the state for the duration of the
            // poll; `reset` restores them afterwards.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = pin!(reset.futures.as_mut().unwrap().next());

            enum PollResult<R> {
                // The main future finished with this value.
                Complete(R),
                // Work items to process before polling again (may be empty).
                ProcessWork(Vec<WorkItem>),
            }
            let result = future::poll_fn(|cx| {
                // The main future always gets the first chance to finish.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(PollResult::Complete(value)));
                }

                // Poll the host futures: `Ready(true)` means one completed,
                // `Ready(false)` means there are none left.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Queued work items take precedence over the result of
                // polling the host futures.
                let state = reset.store.0.concurrent_state_mut();
                let ready = state.collect_work_items_to_run();
                if !ready.is_empty() {
                    return Poll::Ready(Ok(PollResult::ProcessWork(ready)));
                }

                return match next {
                    Poll::Ready(true) => {
                        // A host future completed but queued no work: return
                        // an empty batch so the outer loop polls again.
                        Poll::Ready(Ok(PollResult::ProcessWork(Vec::new())))
                    }
                    Poll::Ready(false) => {
                        // Nothing left to run: give the main future one more
                        // poll before deciding the store is idle.
                        if let Poll::Ready(value) =
                            tls::set(reset.store.0, || future.as_mut().poll(cx))
                        {
                            Poll::Ready(Ok(PollResult::Complete(value)))
                        } else {
                            if trap_on_idle {
                                Poll::Ready(Err(format_err!(crate::Trap::AsyncDeadlock)))
                            } else {
                                Poll::Pending
                            }
                        }
                    }
                    Poll::Pending => Poll::Pending,
                };
            })
            .await;

            // Put the host futures back before processing work items, which
            // may push new futures onto the set.
            drop(reset);

            match result? {
                PollResult::Complete(value) => break Ok(value),
                PollResult::ProcessWork(ready) => {
                    // Guard that cleans up work items that were not handled,
                    // e.g. when handling an earlier item failed.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    // Fibers are disposed explicitly; futures
                                    // are dropped with the store set in TLS.
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1287
1288 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1290 where
1291 T: Send,
1292 {
1293 log::trace!("handle work item {item:?}");
1294 match item {
1295 WorkItem::PushFuture(future) => {
1296 self.0
1297 .concurrent_state_mut()
1298 .futures
1299 .get_mut()
1300 .as_mut()
1301 .unwrap()
1302 .push(future.into_inner());
1303 }
1304 WorkItem::ResumeFiber(fiber) => {
1305 self.0.resume_fiber(fiber).await?;
1306 }
1307 WorkItem::GuestCall(call) => {
1308 if call.is_ready(self.0)? {
1309 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1310 } else {
1311 let state = self.0.concurrent_state_mut();
1312 let task = state.get_mut(call.thread.task)?;
1313 if !task.starting_sent {
1314 task.starting_sent = true;
1315 if let GuestCallKind::StartImplicit(_) = &call.kind {
1316 Waitable::Guest(call.thread.task).set_event(
1317 state,
1318 Some(Event::Subtask {
1319 status: Status::Starting,
1320 }),
1321 )?;
1322 }
1323 }
1324
1325 let instance = state.get_mut(call.thread.task)?.instance;
1326 self.0
1327 .instance_state(instance)
1328 .concurrent_state()
1329 .pending
1330 .insert(call.thread, call.kind);
1331 }
1332 }
1333 WorkItem::WorkerFunction(fun) => {
1334 self.run_on_worker(WorkerItem::Function(fun)).await?;
1335 }
1336 }
1337
1338 Ok(())
1339 }
1340
1341 async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1343 where
1344 T: Send,
1345 {
1346 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1347 fiber
1348 } else {
1349 fiber::make_fiber(self.0, move |store| {
1350 loop {
1351 match store.concurrent_state_mut().worker_item.take().unwrap() {
1352 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1353 WorkerItem::Function(fun) => fun.into_inner()(store)?,
1354 }
1355
1356 store.suspend(SuspendReason::NeedWork)?;
1357 }
1358 })?
1359 };
1360
1361 let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1362 assert!(worker_item.is_none());
1363 *worker_item = Some(item);
1364
1365 self.0.resume_fiber(worker).await
1366 }
1367
1368 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1373 where
1374 T: 'static,
1375 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1376 + Send
1377 + Sync
1378 + 'static,
1379 R: Send + Sync + 'static,
1380 {
1381 let token = StoreToken::new(self);
1382 async move {
1383 let mut accessor = Accessor::new(token);
1384 closure(&mut accessor).await
1385 }
1386 }
1387}
1388
1389impl StoreOpaque {
1390 pub(crate) fn enter_sync_call(
1397 &mut self,
1398 guest_caller: Option<RuntimeInstance>,
1399 callee_async: bool,
1400 callee: RuntimeInstance,
1401 ) -> Result<()> {
1402 log::trace!("enter sync call {callee:?}");
1403
1404 let state = self.concurrent_state_mut();
1405 let thread = state.guest_thread;
1406 let instance = if let Some(thread) = thread {
1407 Some(state.get_mut(thread.task)?.instance)
1408 } else {
1409 None
1410 };
1411 let task = GuestTask::new(
1412 state,
1413 Box::new(move |_, _| unreachable!()),
1414 LiftResult {
1415 lift: Box::new(move |_, _| unreachable!()),
1416 ty: TypeTupleIndex::reserved_value(),
1417 memory: None,
1418 string_encoding: StringEncoding::Utf8,
1419 },
1420 if let Some(caller) = guest_caller {
1421 assert_eq!(caller, instance.unwrap());
1422 Caller::Guest {
1423 thread: thread.unwrap(),
1424 }
1425 } else {
1426 Caller::Host {
1427 tx: None,
1428 exit_tx: Arc::new(oneshot::channel().0),
1429 host_future_present: false,
1430 caller: state.guest_thread,
1431 }
1432 },
1433 None,
1434 callee,
1435 callee_async,
1436 )?;
1437
1438 let guest_task = state.push(task)?;
1439 let new_thread = GuestThread::new_implicit(guest_task);
1440 let guest_thread = state.push(new_thread)?;
1441 Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1442 guest_thread,
1443 self,
1444 callee.index,
1445 )?;
1446
1447 let state = self.concurrent_state_mut();
1448 state.get_mut(guest_task)?.threads.insert(guest_thread);
1449 if guest_caller.is_some() {
1450 let thread = state.guest_thread.unwrap();
1451 state.get_mut(thread.task)?.subtasks.insert(guest_task);
1452 }
1453
1454 self.set_thread(Some(QualifiedThreadId {
1455 task: guest_task,
1456 thread: guest_thread,
1457 }));
1458
1459 Ok(())
1460 }
1461
1462 pub(crate) fn exit_sync_call(&mut self, guest_caller: bool) -> Result<()> {
1464 let thread = self.set_thread(None).unwrap();
1465 let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance;
1466 log::trace!("exit sync call {instance:?}");
1467 Instance::from_wasmtime(self, instance.instance).cleanup_thread(
1468 self,
1469 thread,
1470 instance.index,
1471 )?;
1472
1473 let state = self.concurrent_state_mut();
1474 let task = state.get_mut(thread.task)?;
1475 let caller = match &task.caller {
1476 &Caller::Guest { thread } => {
1477 assert!(guest_caller);
1478 Some(thread)
1479 }
1480 &Caller::Host { caller, .. } => {
1481 assert!(!guest_caller);
1482 caller
1483 }
1484 };
1485 self.set_thread(caller);
1486
1487 let state = self.concurrent_state_mut();
1488 let task = state.get_mut(thread.task)?;
1489 if task.ready_to_delete() {
1490 state.delete(thread.task)?.dispose(state, thread.task)?;
1491 }
1492
1493 Ok(())
1494 }
1495
1496 pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool {
1504 if !self.concurrency_support() {
1505 return !self.trapped();
1506 }
1507 let state = self.concurrent_state_mut();
1508 if let Some(caller) = state.guest_thread {
1509 instance != state.get_mut(caller.task).unwrap().instance
1510 && self.may_enter_from_caller(caller.task, instance)
1511 } else {
1512 !self.trapped()
1513 }
1514 }
1515
1516 fn may_enter_task(&mut self, task: TableId<GuestTask>) -> bool {
1519 let instance = self.concurrent_state_mut().get_mut(task).unwrap().instance;
1520 self.may_enter_from_caller(task, instance)
1521 }
1522
1523 fn may_enter_from_caller(
1526 &mut self,
1527 mut guest_task: TableId<GuestTask>,
1528 instance: RuntimeInstance,
1529 ) -> bool {
1530 !self.trapped() && {
1531 let state = self.concurrent_state_mut();
1532 let guest_instance = instance.instance;
1533 loop {
1534 let next_thread = match &state.get_mut(guest_task).unwrap().caller {
1540 Caller::Host { caller: None, .. } => break true,
1541 &Caller::Host {
1542 caller: Some(caller),
1543 ..
1544 } => {
1545 let instance = state.get_mut(caller.task).unwrap().instance;
1546 if instance.instance == guest_instance {
1547 break false;
1548 } else {
1549 caller
1550 }
1551 }
1552 &Caller::Guest { thread } => {
1553 if state.get_mut(thread.task).unwrap().instance.instance == guest_instance {
1554 break false;
1555 } else {
1556 thread
1557 }
1558 }
1559 };
1560 guest_task = next_thread.task;
1561 }
1562 }
1563 }
1564
1565 fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1568 self.component_instance_mut(instance.instance)
1569 .instance_state(instance.index)
1570 }
1571
1572 fn set_thread(&mut self, thread: Option<QualifiedThreadId>) -> Option<QualifiedThreadId> {
1573 let state = self.concurrent_state_mut();
1578 let old_thread = state.guest_thread.take();
1579 if let Some(old_thread) = old_thread {
1580 let instance = state.get_mut(old_thread.task).unwrap().instance.instance;
1581 self.component_instance_mut(instance)
1582 .set_task_may_block(false)
1583 }
1584
1585 self.concurrent_state_mut().guest_thread = thread;
1586
1587 if thread.is_some() {
1590 self.set_task_may_block();
1591 }
1592
1593 old_thread
1594 }
1595
1596 fn set_task_may_block(&mut self) {
1599 let state = self.concurrent_state_mut();
1600 let guest_thread = state.guest_thread.unwrap();
1601 let instance = state.get_mut(guest_thread.task).unwrap().instance.instance;
1602 let may_block = self.concurrent_state_mut().may_block(guest_thread.task);
1603 self.component_instance_mut(instance)
1604 .set_task_may_block(may_block)
1605 }
1606
1607 pub(crate) fn check_blocking(&mut self) -> Result<()> {
1608 if !self.concurrency_support() {
1609 return Ok(());
1610 }
1611 let state = self.concurrent_state_mut();
1612 let task = state.guest_thread.unwrap().task;
1613 let instance = state.get_mut(task).unwrap().instance.instance;
1614 let task_may_block = self.component_instance(instance).get_task_may_block();
1615
1616 if task_may_block {
1617 Ok(())
1618 } else {
1619 Err(Trap::CannotBlockSyncTask.into())
1620 }
1621 }
1622
1623 fn enter_instance(&mut self, instance: RuntimeInstance) {
1627 log::trace!("enter {instance:?}");
1628 self.instance_state(instance)
1629 .concurrent_state()
1630 .do_not_enter = true;
1631 }
1632
1633 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1637 log::trace!("exit {instance:?}");
1638 self.instance_state(instance)
1639 .concurrent_state()
1640 .do_not_enter = false;
1641 self.partition_pending(instance)
1642 }
1643
1644 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1649 for (thread, kind) in
1650 mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1651 {
1652 let call = GuestCall { thread, kind };
1653 if call.is_ready(self)? {
1654 self.concurrent_state_mut()
1655 .push_high_priority(WorkItem::GuestCall(call));
1656 } else {
1657 self.instance_state(instance)
1658 .concurrent_state()
1659 .pending
1660 .insert(call.thread, call.kind);
1661 }
1662 }
1663
1664 Ok(())
1665 }
1666
1667 pub(crate) fn backpressure_modify(
1669 &mut self,
1670 caller_instance: RuntimeInstance,
1671 modify: impl FnOnce(u16) -> Option<u16>,
1672 ) -> Result<()> {
1673 let state = self.instance_state(caller_instance).concurrent_state();
1674 let old = state.backpressure;
1675 let new = modify(old).ok_or_else(|| format_err!("backpressure counter overflow"))?;
1676 state.backpressure = new;
1677
1678 if old > 0 && new == 0 {
1679 self.partition_pending(caller_instance)?;
1682 }
1683
1684 Ok(())
1685 }
1686
1687 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1690 let old_thread = self.concurrent_state_mut().guest_thread;
1691 log::trace!("resume_fiber: save current thread {old_thread:?}");
1692
1693 let fiber = fiber::resolve_or_release(self, fiber).await?;
1694
1695 self.set_thread(old_thread);
1696
1697 let state = self.concurrent_state_mut();
1698
1699 if let Some(ref ot) = old_thread {
1700 state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1701 }
1702 log::trace!("resume_fiber: restore current thread {old_thread:?}");
1703
1704 if let Some(mut fiber) = fiber {
1705 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1706 match state.suspend_reason.take().unwrap() {
1708 SuspendReason::NeedWork => {
1709 if state.worker.is_none() {
1710 state.worker = Some(fiber);
1711 } else {
1712 fiber.dispose(self);
1713 }
1714 }
1715 SuspendReason::Yielding { thread, .. } => {
1716 state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
1717 state.push_low_priority(WorkItem::ResumeFiber(fiber));
1718 }
1719 SuspendReason::ExplicitlySuspending { thread, .. } => {
1720 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1721 }
1722 SuspendReason::Waiting { set, thread, .. } => {
1723 let old = state
1724 .get_mut(set)?
1725 .waiting
1726 .insert(thread, WaitMode::Fiber(fiber));
1727 assert!(old.is_none());
1728 }
1729 };
1730 } else {
1731 log::trace!("resume_fiber: fiber has exited");
1732 }
1733
1734 Ok(())
1735 }
1736
1737 fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1743 log::trace!("suspend fiber: {reason:?}");
1744
1745 let task = match &reason {
1749 SuspendReason::Yielding { thread, .. }
1750 | SuspendReason::Waiting { thread, .. }
1751 | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1752 SuspendReason::NeedWork => None,
1753 };
1754
1755 let old_guest_thread = if let Some(task) = task {
1756 self.maybe_pop_call_context(task)?;
1757 self.concurrent_state_mut().guest_thread
1758 } else {
1759 None
1760 };
1761
1762 assert!(
1768 matches!(
1769 reason,
1770 SuspendReason::ExplicitlySuspending {
1771 skip_may_block_check: true,
1772 ..
1773 } | SuspendReason::Waiting {
1774 skip_may_block_check: true,
1775 ..
1776 } | SuspendReason::Yielding {
1777 skip_may_block_check: true,
1778 ..
1779 }
1780 ) || old_guest_thread
1781 .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1782 .unwrap_or(true)
1783 );
1784
1785 let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1786 assert!(suspend_reason.is_none());
1787 *suspend_reason = Some(reason);
1788
1789 self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1790
1791 if let Some(task) = task {
1792 self.set_thread(old_guest_thread);
1793 self.maybe_push_call_context(task)?;
1794 }
1795
1796 Ok(())
1797 }
1798
1799 fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1803 let task = self.concurrent_state_mut().get_mut(guest_task)?;
1804
1805 if !task.returned_or_cancelled() {
1806 log::trace!("push call context for {guest_task:?}");
1807 let call_context = task.call_context.take().unwrap();
1808 self.component_resource_state().0.push(call_context);
1809 }
1810 Ok(())
1811 }
1812
1813 fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1817 if !self
1818 .concurrent_state_mut()
1819 .get_mut(guest_task)?
1820 .returned_or_cancelled()
1821 {
1822 log::trace!("pop call context for {guest_task:?}");
1823 let call_context = Some(self.component_resource_state().0.pop().unwrap());
1824 self.concurrent_state_mut()
1825 .get_mut(guest_task)?
1826 .call_context = call_context;
1827 }
1828 Ok(())
1829 }
1830
1831 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1832 let state = self.concurrent_state_mut();
1833 let caller = state.guest_thread.unwrap();
1834 let old_set = waitable.common(state)?.set;
1835 let set = state.get_mut(caller.task)?.sync_call_set;
1836 waitable.join(state, Some(set))?;
1837 self.suspend(SuspendReason::Waiting {
1838 set,
1839 thread: caller,
1840 skip_may_block_check: false,
1841 })?;
1842 let state = self.concurrent_state_mut();
1843 waitable.join(state, old_set)
1844 }
1845}
1846
1847impl Instance {
1848 fn get_event(
1851 self,
1852 store: &mut StoreOpaque,
1853 guest_task: TableId<GuestTask>,
1854 set: Option<TableId<WaitableSet>>,
1855 cancellable: bool,
1856 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1857 let state = store.concurrent_state_mut();
1858
1859 if let Some(event) = state.get_mut(guest_task)?.event.take() {
1860 log::trace!("deliver event {event:?} to {guest_task:?}");
1861
1862 if cancellable || !matches!(event, Event::Cancelled) {
1863 return Ok(Some((event, None)));
1864 } else {
1865 state.get_mut(guest_task)?.event = Some(event);
1866 }
1867 }
1868
1869 Ok(
1870 if let Some((set, waitable)) = set
1871 .and_then(|set| {
1872 state
1873 .get_mut(set)
1874 .map(|v| v.ready.pop_first().map(|v| (set, v)))
1875 .transpose()
1876 })
1877 .transpose()?
1878 {
1879 let common = waitable.common(state)?;
1880 let handle = common.handle.unwrap();
1881 let event = common.event.take().unwrap();
1882
1883 log::trace!(
1884 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1885 );
1886
1887 waitable.on_delivery(store, self, event);
1888
1889 Some((event, Some((waitable, handle))))
1890 } else {
1891 None
1892 },
1893 )
1894 }
1895
    /// Act on the packed callback `code` produced on behalf of `guest_thread`.
    ///
    /// `code` is split by `unpack_callback_code` into a callback code and a
    /// waitable-set handle. The recognized codes are:
    ///
    /// * `callback_code::EXIT` — the thread is cleaned up. If it was the
    ///   task's last thread and the task neither returned nor was cancelled,
    ///   this fails with `Trap::NoAsyncResult`. A host-called task is deleted
    ///   once `ready_to_delete`; a guest-called task is marked `exited` and
    ///   its callback is cleared.
    /// * `callback_code::YIELD` — an `Event::None` is stored on the task
    ///   unless an event is already pending, and a `DeliverEvent` call is
    ///   built. If the task may block, the call is queued at low priority;
    ///   otherwise it is returned to the caller.
    /// * `callback_code::WAIT` — after `check_blocking_for` passes, the set
    ///   handle is resolved. If an event is already available the delivery is
    ///   queued at high priority; otherwise the thread is registered as a
    ///   waiter on the set.
    ///
    /// Returns `Some(call)` only in the non-blocking `YIELD` case.
    ///
    /// # Errors
    ///
    /// Fails with "unsupported callback code" for any other code, and with
    /// "invalid waitable-set handle" when `WAIT` is given handle 0.
    fn handle_callback_code(
        self,
        store: &mut StoreOpaque,
        guest_thread: QualifiedThreadId,
        runtime_instance: RuntimeComponentInstanceIndex,
        code: u32,
    ) -> Result<Option<GuestCall>> {
        let (code, set) = unpack_callback_code(code);

        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");

        let state = store.concurrent_state_mut();

        // Resolve a guest-visible waitable-set handle to its table id; handle
        // 0 is rejected outright.
        let get_set = |store: &mut StoreOpaque, handle| {
            if handle == 0 {
                bail!("invalid waitable-set handle");
            }

            let set = store
                .instance_state(RuntimeInstance {
                    instance: self.id().instance(),
                    index: runtime_instance,
                })
                .handle_table()
                .waitable_set_rep(handle)?;

            Ok(TableId::<WaitableSet>::new(set))
        };

        Ok(match code {
            callback_code::EXIT => {
                log::trace!("implicit thread {guest_thread:?} completed");
                self.cleanup_thread(store, guest_thread, runtime_instance)?;
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                // The last thread exiting without a result is a trap.
                if task.threads.is_empty() && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
                match &task.caller {
                    Caller::Host { .. } => {
                        if task.ready_to_delete() {
                            Waitable::Guest(guest_thread.task)
                                .delete_from(store.concurrent_state_mut())?;
                        }
                    }
                    Caller::Guest { .. } => {
                        task.exited = true;
                        task.callback = None;
                    }
                }
                None
            }
            callback_code::YIELD => {
                let task = state.get_mut(guest_thread.task)?;
                // Any event already pending on the task must be `None` or
                // `Cancelled`; if nothing is pending, deliver `Event::None`.
                if let Some(event) = task.event {
                    assert!(matches!(event, Event::None | Event::Cancelled));
                } else {
                    task.event = Some(Event::None);
                }
                let call = GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::DeliverEvent {
                        instance: self,
                        set: None,
                    },
                };
                if state.may_block(guest_thread.task) {
                    // Queue at low priority and return nothing to run now.
                    state.push_low_priority(WorkItem::GuestCall(call));
                    None
                } else {
                    // The task may not block: hand the call straight back to
                    // the caller instead of queueing it.
                    Some(call)
                }
            }
            callback_code::WAIT => {
                state.check_blocking_for(guest_thread.task)?;

                let set = get_set(store, set)?;
                let state = store.concurrent_state_mut();

                if state.get_mut(guest_thread.task)?.event.is_some()
                    || !state.get_mut(set)?.ready.is_empty()
                {
                    // Something is already deliverable; schedule it now.
                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
                        thread: guest_thread,
                        kind: GuestCallKind::DeliverEvent {
                            instance: self,
                            set: Some(set),
                        },
                    }));
                } else {
                    // Nothing to deliver yet: record the set on the thread's
                    // `wake_on_cancel` slot and register the thread as a
                    // callback-mode waiter on the set.
                    let old = state
                        .get_mut(guest_thread.thread)?
                        .wake_on_cancel
                        .replace(set);
                    assert!(old.is_none());
                    let old = state
                        .get_mut(set)?
                        .waiting
                        .insert(guest_thread, WaitMode::Callback(self));
                    assert!(old.is_none());
                }
                None
            }
            _ => bail!("unsupported callback code: {code}"),
        })
    }
2025
2026 fn cleanup_thread(
2027 self,
2028 store: &mut StoreOpaque,
2029 guest_thread: QualifiedThreadId,
2030 runtime_instance: RuntimeComponentInstanceIndex,
2031 ) -> Result<()> {
2032 let guest_id = store
2033 .concurrent_state_mut()
2034 .get_mut(guest_thread.thread)?
2035 .instance_rep;
2036 store
2037 .instance_state(RuntimeInstance {
2038 instance: self.id().instance(),
2039 index: runtime_instance,
2040 })
2041 .thread_handle_table()
2042 .guest_thread_remove(guest_id.unwrap())?;
2043
2044 store.concurrent_state_mut().delete(guest_thread.thread)?;
2045 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2046 task.threads.remove(&guest_thread.thread);
2047 Ok(())
2048 }
2049
2050 unsafe fn queue_call<T: 'static>(
2057 self,
2058 mut store: StoreContextMut<T>,
2059 guest_thread: QualifiedThreadId,
2060 callee: SendSyncPtr<VMFuncRef>,
2061 param_count: usize,
2062 result_count: usize,
2063 async_: bool,
2064 callback: Option<SendSyncPtr<VMFuncRef>>,
2065 post_return: Option<SendSyncPtr<VMFuncRef>>,
2066 ) -> Result<()> {
2067 unsafe fn make_call<T: 'static>(
2082 store: StoreContextMut<T>,
2083 guest_thread: QualifiedThreadId,
2084 callee: SendSyncPtr<VMFuncRef>,
2085 param_count: usize,
2086 result_count: usize,
2087 ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2088 + Send
2089 + Sync
2090 + 'static
2091 + use<T> {
2092 let token = StoreToken::new(store);
2093 move |store: &mut dyn VMStore| {
2094 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2095
2096 store
2097 .concurrent_state_mut()
2098 .get_mut(guest_thread.thread)?
2099 .state = GuestThreadState::Running;
2100 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2101 let lower = task.lower_params.take().unwrap();
2102
2103 lower(store, &mut storage[..param_count])?;
2104
2105 let mut store = token.as_context_mut(store);
2106
2107 unsafe {
2110 crate::Func::call_unchecked_raw(
2111 &mut store,
2112 callee.as_non_null(),
2113 NonNull::new(
2114 &mut storage[..param_count.max(result_count)]
2115 as *mut [MaybeUninit<ValRaw>] as _,
2116 )
2117 .unwrap(),
2118 )?;
2119 }
2120
2121 Ok(storage)
2122 }
2123 }
2124
2125 let call = unsafe {
2129 make_call(
2130 store.as_context_mut(),
2131 guest_thread,
2132 callee,
2133 param_count,
2134 result_count,
2135 )
2136 };
2137
2138 let callee_instance = store
2139 .0
2140 .concurrent_state_mut()
2141 .get_mut(guest_thread.task)?
2142 .instance;
2143
2144 let fun = if callback.is_some() {
2145 assert!(async_);
2146
2147 Box::new(move |store: &mut dyn VMStore| {
2148 self.add_guest_thread_to_instance_table(
2149 guest_thread.thread,
2150 store,
2151 callee_instance.index,
2152 )?;
2153 let old_thread = store.set_thread(Some(guest_thread));
2154 log::trace!(
2155 "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2156 );
2157
2158 store.maybe_push_call_context(guest_thread.task)?;
2159
2160 store.enter_instance(callee_instance);
2161
2162 let storage = call(store)?;
2169
2170 store.exit_instance(callee_instance)?;
2171
2172 store.maybe_pop_call_context(guest_thread.task)?;
2173
2174 store.set_thread(old_thread);
2175 let state = store.concurrent_state_mut();
2176 old_thread
2177 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2178 log::trace!("stackless call: restored {old_thread:?} as current thread");
2179
2180 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2183
2184 self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2185 })
2186 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2187 } else {
2188 let token = StoreToken::new(store.as_context_mut());
2189 Box::new(move |store: &mut dyn VMStore| {
2190 self.add_guest_thread_to_instance_table(
2191 guest_thread.thread,
2192 store,
2193 callee_instance.index,
2194 )?;
2195 let old_thread = store.set_thread(Some(guest_thread));
2196 log::trace!(
2197 "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2198 );
2199 let flags = self.id().get(store).instance_flags(callee_instance.index);
2200
2201 store.maybe_push_call_context(guest_thread.task)?;
2202
2203 if !async_ {
2207 store.enter_instance(callee_instance);
2208 }
2209
2210 let storage = call(store)?;
2217
2218 if async_ {
2219 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2220 if task.threads.len() == 1 && !task.returned_or_cancelled() {
2221 bail!(Trap::NoAsyncResult);
2222 }
2223 } else {
2224 let lift = {
2230 store.exit_instance(callee_instance)?;
2231
2232 let state = store.concurrent_state_mut();
2233 assert!(state.get_mut(guest_thread.task)?.result.is_none());
2234
2235 state
2236 .get_mut(guest_thread.task)?
2237 .lift_result
2238 .take()
2239 .unwrap()
2240 };
2241
2242 let result = (lift.lift)(store, unsafe {
2245 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2246 &storage[..result_count],
2247 )
2248 })?;
2249
2250 let post_return_arg = match result_count {
2251 0 => ValRaw::i32(0),
2252 1 => unsafe { storage[0].assume_init() },
2255 _ => unreachable!(),
2256 };
2257
2258 unsafe {
2259 call_post_return(
2260 token.as_context_mut(store),
2261 post_return.map(|v| v.as_non_null()),
2262 post_return_arg,
2263 flags,
2264 )?;
2265 }
2266
2267 self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2268 }
2269
2270 self.cleanup_thread(store, guest_thread, callee_instance.index)?;
2272
2273 store.set_thread(old_thread);
2274
2275 store.maybe_pop_call_context(guest_thread.task)?;
2276
2277 let state = store.concurrent_state_mut();
2278 let task = state.get_mut(guest_thread.task)?;
2279
2280 match &task.caller {
2281 Caller::Host { .. } => {
2282 if task.ready_to_delete() {
2283 Waitable::Guest(guest_thread.task).delete_from(state)?;
2284 }
2285 }
2286 Caller::Guest { .. } => {
2287 task.exited = true;
2288 }
2289 }
2290
2291 Ok(None)
2292 })
2293 };
2294
2295 store
2296 .0
2297 .concurrent_state_mut()
2298 .push_high_priority(WorkItem::GuestCall(GuestCall {
2299 thread: guest_thread,
2300 kind: GuestCallKind::StartImplicit(fun),
2301 }));
2302
2303 Ok(())
2304 }
2305
2306 unsafe fn prepare_call<T: 'static>(
2319 self,
2320 mut store: StoreContextMut<T>,
2321 start: *mut VMFuncRef,
2322 return_: *mut VMFuncRef,
2323 caller_instance: RuntimeComponentInstanceIndex,
2324 callee_instance: RuntimeComponentInstanceIndex,
2325 task_return_type: TypeTupleIndex,
2326 callee_async: bool,
2327 memory: *mut VMMemoryDefinition,
2328 string_encoding: u8,
2329 caller_info: CallerInfo,
2330 ) -> Result<()> {
2331 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2332 store.0.check_blocking()?;
2336 }
2337
2338 enum ResultInfo {
2339 Heap { results: u32 },
2340 Stack { result_count: u32 },
2341 }
2342
2343 let result_info = match &caller_info {
2344 CallerInfo::Async {
2345 has_result: true,
2346 params,
2347 } => ResultInfo::Heap {
2348 results: params.last().unwrap().get_u32(),
2349 },
2350 CallerInfo::Async {
2351 has_result: false, ..
2352 } => ResultInfo::Stack { result_count: 0 },
2353 CallerInfo::Sync {
2354 result_count,
2355 params,
2356 } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2357 results: params.last().unwrap().get_u32(),
2358 },
2359 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2360 result_count: *result_count,
2361 },
2362 };
2363
2364 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2365
2366 let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2370 let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2371 let token = StoreToken::new(store.as_context_mut());
2372 let state = store.0.concurrent_state_mut();
2373 let old_thread = state.guest_thread.unwrap();
2374
2375 assert_eq!(
2376 state.get_mut(old_thread.task)?.instance,
2377 RuntimeInstance {
2378 instance: self.id().instance(),
2379 index: caller_instance,
2380 }
2381 );
2382
2383 let new_task = GuestTask::new(
2384 state,
2385 Box::new(move |store, dst| {
2386 let mut store = token.as_context_mut(store);
2387 assert!(dst.len() <= MAX_FLAT_PARAMS);
2388 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2390 let count = match caller_info {
2391 CallerInfo::Async { params, has_result } => {
2395 let params = ¶ms[..params.len() - usize::from(has_result)];
2396 for (param, src) in params.iter().zip(&mut src) {
2397 src.write(*param);
2398 }
2399 params.len()
2400 }
2401
2402 CallerInfo::Sync { params, .. } => {
2404 for (param, src) in params.iter().zip(&mut src) {
2405 src.write(*param);
2406 }
2407 params.len()
2408 }
2409 };
2410 unsafe {
2417 crate::Func::call_unchecked_raw(
2418 &mut store,
2419 start.as_non_null(),
2420 NonNull::new(
2421 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2422 )
2423 .unwrap(),
2424 )?;
2425 }
2426 dst.copy_from_slice(&src[..dst.len()]);
2427 let state = store.0.concurrent_state_mut();
2428 Waitable::Guest(state.guest_thread.unwrap().task).set_event(
2429 state,
2430 Some(Event::Subtask {
2431 status: Status::Started,
2432 }),
2433 )?;
2434 Ok(())
2435 }),
2436 LiftResult {
2437 lift: Box::new(move |store, src| {
2438 let mut store = token.as_context_mut(store);
2441 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2443 my_src.push(ValRaw::u32(*results));
2444 }
2445 unsafe {
2452 crate::Func::call_unchecked_raw(
2453 &mut store,
2454 return_.as_non_null(),
2455 my_src.as_mut_slice().into(),
2456 )?;
2457 }
2458 let state = store.0.concurrent_state_mut();
2459 let thread = state.guest_thread.unwrap();
2460 if sync_caller {
2461 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2462 if let ResultInfo::Stack { result_count } = &result_info {
2463 match result_count {
2464 0 => None,
2465 1 => Some(my_src[0]),
2466 _ => unreachable!(),
2467 }
2468 } else {
2469 None
2470 },
2471 );
2472 }
2473 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2474 }),
2475 ty: task_return_type,
2476 memory: NonNull::new(memory).map(SendSyncPtr::new),
2477 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2478 },
2479 Caller::Guest { thread: old_thread },
2480 None,
2481 RuntimeInstance {
2482 instance: self.id().instance(),
2483 index: callee_instance,
2484 },
2485 callee_async,
2486 )?;
2487
2488 let guest_task = state.push(new_task)?;
2489 let new_thread = GuestThread::new_implicit(guest_task);
2490 let guest_thread = state.push(new_thread)?;
2491 state.get_mut(guest_task)?.threads.insert(guest_thread);
2492
2493 store
2494 .0
2495 .concurrent_state_mut()
2496 .get_mut(old_thread.task)?
2497 .subtasks
2498 .insert(guest_task);
2499
2500 store.0.set_thread(Some(QualifiedThreadId {
2503 task: guest_task,
2504 thread: guest_thread,
2505 }));
2506 log::trace!(
2507 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2508 );
2509
2510 Ok(())
2511 }
2512
2513 unsafe fn call_callback<T>(
2518 self,
2519 mut store: StoreContextMut<T>,
2520 function: SendSyncPtr<VMFuncRef>,
2521 event: Event,
2522 handle: u32,
2523 ) -> Result<u32> {
2524 let (ordinal, result) = event.parts();
2525 let params = &mut [
2526 ValRaw::u32(ordinal),
2527 ValRaw::u32(handle),
2528 ValRaw::u32(result),
2529 ];
2530 unsafe {
2535 crate::Func::call_unchecked_raw(
2536 &mut store,
2537 function.as_non_null(),
2538 params.as_mut_slice().into(),
2539 )?;
2540 }
2541 Ok(params[0].get_u32())
2542 }
2543
    /// Start the call described by the current guest thread's task and block
    /// the calling thread until the callee reports progress.
    ///
    /// The current guest thread is taken to be the callee's thread; its
    /// task's `caller` must be `Caller::Guest`. The call is queued via
    /// `queue_call`, after which the caller is suspended on its
    /// `sync_call_set` until a `Subtask` event arrives:
    ///
    /// * `Status::Returned` ends the wait with no handle.
    /// * For an async caller (`storage` is `None`), any other status ends
    ///   the wait after a subtask handle has been inserted into the caller
    ///   instance's handle table.
    /// * For a sync caller, any other status causes another suspension.
    ///
    /// Afterwards the caller becomes the current, `Running` thread again. For
    /// a sync caller a produced `sync_result` is written to `storage[0]`, and
    /// the callee task is deleted if it has exited and is ready to delete.
    ///
    /// Returns `status.pack(waitable)`, where `waitable` is the subtask
    /// handle, if one was created.
    ///
    /// # Safety
    ///
    /// NOTE(review): the caller's obligations are not visible in this block.
    /// Presumably `callback`, `post_return` and `callee` must be null (where
    /// allowed) or valid function references for `store` — confirm.
    ///
    /// # Panics
    ///
    /// Panics if `callee` is null, if there is no current guest thread, if
    /// `param_count` exceeds `MAX_FLAT_PARAMS` or `result_count` exceeds
    /// `MAX_FLAT_RESULTS`.
    unsafe fn start_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
        storage: Option<&mut [MaybeUninit<ValRaw>]>,
    ) -> Result<u32> {
        let token = StoreToken::new(store.as_context_mut());
        // No result storage means the caller is async.
        let async_caller = storage.is_none();
        let state = store.0.concurrent_state_mut();
        let guest_thread = state.guest_thread.unwrap();
        let callee_async = state.get_mut(guest_thread.task)?.async_function;
        let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
        let param_count = usize::try_from(param_count).unwrap();
        assert!(param_count <= MAX_FLAT_PARAMS);
        let result_count = usize::try_from(result_count).unwrap();
        assert!(result_count <= MAX_FLAT_RESULTS);

        let task = state.get_mut(guest_thread.task)?;
        if !callback.is_null() {
            let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
            // Install a closure on the task that forwards events to the
            // guest's callback function.
            task.callback = Some(Box::new(move |store, event, handle| {
                let store = token.as_context_mut(store);
                unsafe { self.call_callback::<T>(store, callback, event, handle) }
            }));
        }

        // This function handles guest callers only; any other caller kind
        // hits `unreachable!()`.
        let Caller::Guest { thread: caller } = &task.caller else {
            unreachable!()
        };
        let caller = *caller;
        let caller_instance = state.get_mut(caller.task)?.instance;

        unsafe {
            self.queue_call(
                store.as_context_mut(),
                guest_thread,
                callee,
                param_count,
                result_count,
                (flags & START_FLAG_ASYNC_CALLEE) != 0,
                NonNull::new(callback).map(SendSyncPtr::new),
                NonNull::new(post_return).map(SendSyncPtr::new),
            )?;
        }

        let state = store.0.concurrent_state_mut();

        // Temporarily move the callee task's waitable into the caller's
        // `sync_call_set`, remembering the set it was in before.
        let guest_waitable = Waitable::Guest(guest_thread.task);
        let old_set = guest_waitable.common(state)?.set;
        let set = state.get_mut(caller.task)?.sync_call_set;
        guest_waitable.join(state, Some(set))?;

        let (status, waitable) = loop {
            // The may-block check is skipped unless a sync caller is calling
            // an async callee.
            store.0.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                skip_may_block_check: async_caller || !callee_async,
            })?;

            let state = store.0.concurrent_state_mut();

            log::trace!("taking event for {:?}", guest_thread.task);
            let event = guest_waitable.take_event(state)?;
            let Some(Event::Subtask { status }) = event else {
                unreachable!();
            };

            log::trace!("status {status:?} for {:?}", guest_thread.task);

            if status == Status::Returned {
                // The callee is done; no subtask handle is needed.
                break (status, None);
            } else if async_caller {
                // The callee has not returned yet: give the async caller a
                // subtask handle to track it.
                let handle = store
                    .0
                    .instance_state(caller_instance)
                    .handle_table()
                    .subtask_insert_guest(guest_thread.task.rep())?;
                store
                    .0
                    .concurrent_state_mut()
                    .get_mut(guest_thread.task)?
                    .common
                    .handle = Some(handle);
                break (status, Some(handle));
            } else {
                // Sync caller and the callee has not returned yet: go around
                // the loop and suspend again.
            }
        };

        guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;

        // Make the caller the current, running thread again.
        store.0.set_thread(Some(caller));
        store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");

        if let Some(storage) = storage {
            let state = store.0.concurrent_state_mut();
            let task = state.get_mut(guest_thread.task)?;
            if let Some(result) = task.sync_result.take() {
                // Hand the produced flat result, if any, back to the caller.
                if let Some(result) = result {
                    storage[0] = MaybeUninit::new(result);
                }

                if task.exited && task.ready_to_delete() {
                    Waitable::Guest(guest_thread.task).delete_from(state)?;
                }
            }
        }

        Ok(status.pack(waitable))
    }
2712
    /// Poll `future` once on behalf of the current guest thread and either
    /// finish the host call immediately or register it as a subtask.
    ///
    /// `future` is wrapped so that, around every poll, a call context saved
    /// from a previous pending poll is pushed back onto the store's
    /// call-context stack, and the top context is popped and saved again if
    /// the poll returns pending. A `HostTask` is created for the call and the
    /// wrapped future is polled once with a no-op waker:
    ///
    /// * Ready: `lower` is invoked with the result, the host task is deleted,
    ///   and `Ok(None)` is returned.
    /// * Pending: the future is handed to `push_future`. When it completes
    ///   with a result, a high-priority worker function is queued that
    ///   calls `lower`, clears the task's join handle, and sets
    ///   `Event::Subtask { status: Status::Returned }`. A subtask handle is
    ///   inserted into the caller instance's handle table and returned as
    ///   `Ok(Some(handle))`.
    ///
    /// # Panics
    ///
    /// Panics if there is no current guest thread.
    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
        self,
        mut store: StoreContextMut<'_, T>,
        future: impl Future<Output = Result<R>> + Send + 'static,
        caller_instance: RuntimeComponentInstanceIndex,
        lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
    ) -> Result<Option<u32>> {
        let token = StoreToken::new(store.as_context_mut());
        let state = store.0.concurrent_state_mut();
        let caller = state.guest_thread.unwrap();

        let (join_handle, future) = JoinHandle::run(async move {
            let mut future = pin!(future);
            // Call context saved while the future is not being polled.
            let mut call_context = None;
            future::poll_fn(move |cx| {
                tls::get(|store| {
                    // Restore the context saved by the previous pending poll.
                    if let Some(call_context) = call_context.take() {
                        token
                            .as_context_mut(store)
                            .0
                            .component_resource_state()
                            .0
                            .push(call_context);
                    }
                });

                let result = future.as_mut().poll(cx);

                if result.is_pending() {
                    tls::get(|store| {
                        // Not finished: take the context back off the stack
                        // until the next poll.
                        call_context = Some(
                            token
                                .as_context_mut(store)
                                .0
                                .component_resource_state()
                                .0
                                .pop()
                                .unwrap(),
                        );
                    });
                }
                result
            })
            .await
        });

        let task = state.push(HostTask::new(
            RuntimeInstance {
                instance: self.id().instance(),
                index: caller_instance,
            },
            Some(join_handle),
        ))?;

        log::trace!("new host task child of {caller:?}: {task:?}");

        let mut future = Box::pin(future);

        // Poll once, with the store made available through `tls`, using a
        // waker that does nothing.
        let poll = tls::set(store.0, || {
            future
                .as_mut()
                .poll(&mut Context::from_waker(&Waker::noop()))
        });

        Ok(match poll {
            // NOTE(review): `None` presumably means the task was aborted
            // through its join handle, which cannot have happened yet —
            // confirm.
            Poll::Ready(None) => unreachable!(),
            Poll::Ready(Some(result)) => {
                // Completed on the first poll: lower the result right away.
                lower(store.as_context_mut(), result?)?;
                log::trace!("delete host task {task:?} (already ready)");
                store.0.concurrent_state_mut().delete(task)?;
                None
            }
            Poll::Pending => {
                let future =
                    Box::pin(async move {
                        let result = match future.await {
                            Some(result) => result?,
                            // No result was produced, so there is nothing to
                            // lower.
                            None => return Ok(()),
                        };
                        tls::get(move |store| {
                            // Lowering is deferred to a worker function
                            // queued at high priority.
                            store.concurrent_state_mut().push_high_priority(
                                WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
                                    lower(token.as_context_mut(store), result)?;
                                    let state = store.concurrent_state_mut();
                                    state.get_mut(task)?.join_handle.take();
                                    Waitable::Host(task).set_event(
                                        state,
                                        Some(Event::Subtask {
                                            status: Status::Returned,
                                        }),
                                    )
                                }))),
                            );
                            Ok(())
                        })
                    });

                store.0.concurrent_state_mut().push_future(future);
                let handle = store
                    .0
                    .instance_state(RuntimeInstance {
                        instance: self.id().instance(),
                        index: caller_instance,
                    })
                    .handle_table()
                    .subtask_insert_host(task.rep())?;
                store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
                log::trace!(
                    "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
                );
                Some(handle)
            }
        })
    }
2866
2867 pub(crate) fn task_return(
2870 self,
2871 store: &mut dyn VMStore,
2872 ty: TypeTupleIndex,
2873 options: OptionsIndex,
2874 storage: &[ValRaw],
2875 ) -> Result<()> {
2876 let state = store.concurrent_state_mut();
2877 let guest_thread = state.guest_thread.unwrap();
2878 let lift = state
2879 .get_mut(guest_thread.task)?
2880 .lift_result
2881 .take()
2882 .ok_or_else(|| {
2883 format_err!("`task.return` or `task.cancel` called more than once for current task")
2884 })?;
2885 assert!(state.get_mut(guest_thread.task)?.result.is_none());
2886
2887 let CanonicalOptions {
2888 string_encoding,
2889 data_model,
2890 ..
2891 } = &self.id().get(store).component().env_component().options[options];
2892
2893 let invalid = ty != lift.ty
2894 || string_encoding != &lift.string_encoding
2895 || match data_model {
2896 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2897 Some(memory) => {
2898 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2899 let actual = self.id().get(store).runtime_memory(memory);
2900 expected != actual.as_ptr()
2901 }
2902 None => false,
2905 },
2906 CanonicalOptionsDataModel::Gc { .. } => true,
2908 };
2909
2910 if invalid {
2911 bail!("invalid `task.return` signature and/or options for current task");
2912 }
2913
2914 log::trace!("task.return for {guest_thread:?}");
2915
2916 let result = (lift.lift)(store, storage)?;
2917 self.task_complete(store, guest_thread.task, result, Status::Returned)
2918 }
2919
2920 pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
2922 let state = store.concurrent_state_mut();
2923 let guest_thread = state.guest_thread.unwrap();
2924 let task = state.get_mut(guest_thread.task)?;
2925 if !task.cancel_sent {
2926 bail!("`task.cancel` called by task which has not been cancelled")
2927 }
2928 _ = task.lift_result.take().ok_or_else(|| {
2929 format_err!("`task.return` or `task.cancel` called more than once for current task")
2930 })?;
2931
2932 assert!(task.result.is_none());
2933
2934 log::trace!("task.cancel for {guest_thread:?}");
2935
2936 self.task_complete(
2937 store,
2938 guest_thread.task,
2939 Box::new(DummyResult),
2940 Status::ReturnCancelled,
2941 )
2942 }
2943
2944 fn task_complete(
2950 self,
2951 store: &mut StoreOpaque,
2952 guest_task: TableId<GuestTask>,
2953 result: Box<dyn Any + Send + Sync>,
2954 status: Status,
2955 ) -> Result<()> {
2956 let (calls, host_table, _, instance) = store.component_resource_state_with_instance(self);
2957 ResourceTables {
2958 calls,
2959 host_table: Some(host_table),
2960 guest: Some(instance.instance_states()),
2961 }
2962 .exit_call()?;
2963
2964 let state = store.concurrent_state_mut();
2965 let task = state.get_mut(guest_task)?;
2966
2967 if let Caller::Host { tx, .. } = &mut task.caller {
2968 if let Some(tx) = tx.take() {
2969 _ = tx.send(result);
2970 }
2971 } else {
2972 task.result = Some(result);
2973 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2974 }
2975
2976 Ok(())
2977 }
2978
2979 pub(crate) fn waitable_set_new(
2981 self,
2982 store: &mut StoreOpaque,
2983 caller_instance: RuntimeComponentInstanceIndex,
2984 ) -> Result<u32> {
2985 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2986 let handle = store
2987 .instance_state(RuntimeInstance {
2988 instance: self.id().instance(),
2989 index: caller_instance,
2990 })
2991 .handle_table()
2992 .waitable_set_insert(set.rep())?;
2993 log::trace!("new waitable set {set:?} (handle {handle})");
2994 Ok(handle)
2995 }
2996
2997 pub(crate) fn waitable_set_drop(
2999 self,
3000 store: &mut StoreOpaque,
3001 caller_instance: RuntimeComponentInstanceIndex,
3002 set: u32,
3003 ) -> Result<()> {
3004 let rep = store
3005 .instance_state(RuntimeInstance {
3006 instance: self.id().instance(),
3007 index: caller_instance,
3008 })
3009 .handle_table()
3010 .waitable_set_remove(set)?;
3011
3012 log::trace!("drop waitable set {rep} (handle {set})");
3013
3014 let set = store
3015 .concurrent_state_mut()
3016 .delete(TableId::<WaitableSet>::new(rep))?;
3017
3018 if !set.waiting.is_empty() {
3019 bail!("cannot drop waitable set with waiters");
3020 }
3021
3022 Ok(())
3023 }
3024
3025 pub(crate) fn waitable_join(
3027 self,
3028 store: &mut StoreOpaque,
3029 caller_instance: RuntimeComponentInstanceIndex,
3030 waitable_handle: u32,
3031 set_handle: u32,
3032 ) -> Result<()> {
3033 let mut instance = self.id().get_mut(store);
3034 let waitable =
3035 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3036
3037 let set = if set_handle == 0 {
3038 None
3039 } else {
3040 let set = instance.instance_states().0[caller_instance]
3041 .handle_table()
3042 .waitable_set_rep(set_handle)?;
3043
3044 Some(TableId::<WaitableSet>::new(set))
3045 };
3046
3047 log::trace!(
3048 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3049 );
3050
3051 waitable.join(store.concurrent_state_mut(), set)
3052 }
3053
    /// Drops the subtask handle `task_id` owned by `caller_instance`.
    ///
    /// The subtask is first joined to "no set" (set handle `0`), then its
    /// handle is removed from the caller's handle table.  Host subtasks are
    /// always deleted from the concurrent state; guest subtasks are deleted
    /// only if their `exited` flag is already set.
    ///
    /// # Errors
    ///
    /// Fails if the handle cannot be resolved, if the subtask has not yet
    /// resolved (a host task which still holds a `join_handle`, or a guest
    /// task which still holds a `lift_result`), or if the subtask still has an
    /// undelivered event.  Note that the handle has already been removed from
    /// the handle table by the time these checks run.
    ///
    /// # Panics
    ///
    /// Panics if the caller recorded for the subtask does not match
    /// `caller_instance`, or (via `unreachable!`) if a guest subtask's caller
    /// is not `Caller::Guest`.
    pub(crate) fn subtask_drop(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        task_id: u32,
    ) -> Result<()> {
        // Set handle `0` makes `waitable_join` pass `None` as the set.
        self.waitable_join(store, caller_instance, task_id, 0)?;

        let (rep, is_host) = store
            .instance_state(RuntimeInstance {
                instance: self.id().instance(),
                index: caller_instance,
            })
            .handle_table()
            .subtask_remove(task_id)?;

        let concurrent_state = store.concurrent_state_mut();
        // Besides the waitable itself, work out which instance we expect to
        // own it and whether it should be deleted right away.
        let (waitable, expected_caller_instance, delete) = if is_host {
            let id = TableId::<HostTask>::new(rep);
            let task = concurrent_state.get_mut(id)?;
            // A host task which still has its join handle is still running.
            if task.join_handle.is_some() {
                bail!("cannot drop a subtask which has not yet resolved");
            }
            (Waitable::Host(id), task.caller_instance, true)
        } else {
            let id = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get_mut(id)?;
            // `lift_result` still being present is what
            // `GuestTask::returned_or_cancelled` treats as "not yet resolved".
            if task.lift_result.is_some() {
                bail!("cannot drop a subtask which has not yet resolved");
            }
            if let &Caller::Guest { thread } = &task.caller {
                (
                    Waitable::Guest(id),
                    concurrent_state.get_mut(thread.task)?.instance,
                    // Only delete the guest task now if it has already exited.
                    concurrent_state.get_mut(id)?.exited,
                )
            } else {
                unreachable!()
            }
        };

        // The handle no longer exists in the caller's table.
        waitable.common(concurrent_state)?.handle = None;

        if waitable.take_event(concurrent_state)?.is_some() {
            bail!("cannot drop a subtask with an undelivered event");
        }

        if delete {
            waitable.delete_from(concurrent_state)?;
        }

        // Sanity check: the subtask must belong to the instance dropping it.
        assert_eq!(
            expected_caller_instance,
            RuntimeInstance {
                instance: self.id().instance(),
                index: caller_instance
            }
        );
        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
        Ok(())
    }
3119
3120 pub(crate) fn waitable_set_wait(
3122 self,
3123 store: &mut StoreOpaque,
3124 options: OptionsIndex,
3125 set: u32,
3126 payload: u32,
3127 ) -> Result<u32> {
3128 if !self.options(store, options).async_ {
3129 store.check_blocking()?;
3133 }
3134
3135 let &CanonicalOptions {
3136 cancellable,
3137 instance: caller_instance,
3138 ..
3139 } = &self.id().get(store).component().env_component().options[options];
3140 let rep = store
3141 .instance_state(RuntimeInstance {
3142 instance: self.id().instance(),
3143 index: caller_instance,
3144 })
3145 .handle_table()
3146 .waitable_set_rep(set)?;
3147
3148 self.waitable_check(
3149 store,
3150 cancellable,
3151 WaitableCheck::Wait,
3152 WaitableCheckParams {
3153 set: TableId::new(rep),
3154 options,
3155 payload,
3156 },
3157 )
3158 }
3159
3160 pub(crate) fn waitable_set_poll(
3162 self,
3163 store: &mut StoreOpaque,
3164 options: OptionsIndex,
3165 set: u32,
3166 payload: u32,
3167 ) -> Result<u32> {
3168 let &CanonicalOptions {
3169 cancellable,
3170 instance: caller_instance,
3171 ..
3172 } = &self.id().get(store).component().env_component().options[options];
3173 let rep = store
3174 .instance_state(RuntimeInstance {
3175 instance: self.id().instance(),
3176 index: caller_instance,
3177 })
3178 .handle_table()
3179 .waitable_set_rep(set)?;
3180
3181 self.waitable_check(
3182 store,
3183 cancellable,
3184 WaitableCheck::Poll,
3185 WaitableCheckParams {
3186 set: TableId::new(rep),
3187 options,
3188 payload,
3189 },
3190 )
3191 }
3192
3193 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3195 let thread_id = store.concurrent_state_mut().guest_thread.unwrap().thread;
3196 Ok(store
3198 .concurrent_state_mut()
3199 .get_mut(thread_id)?
3200 .instance_rep
3201 .unwrap())
3202 }
3203
3204 pub(crate) fn thread_new_indirect<T: 'static>(
3206 self,
3207 mut store: StoreContextMut<T>,
3208 runtime_instance: RuntimeComponentInstanceIndex,
3209 _func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex,
3211 start_func_idx: u32,
3212 context: i32,
3213 ) -> Result<u32> {
3214 log::trace!("creating new thread");
3215
3216 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3217 let (instance, registry) = self.id().get_mut_and_registry(store.0);
3218 let callee = instance
3219 .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3220 .ok_or_else(|| {
3221 format_err!("the start function index points to an uninitialized function")
3222 })?;
3223 if callee.type_index(store.0) != start_func_ty.type_index() {
3224 bail!(
3225 "start function does not match expected type (currently only `(i32) -> ()` is supported)"
3226 );
3227 }
3228
3229 let token = StoreToken::new(store.as_context_mut());
3230 let start_func = Box::new(
3231 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3232 let old_thread = store.set_thread(Some(guest_thread));
3233 log::trace!(
3234 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3235 );
3236
3237 store.maybe_push_call_context(guest_thread.task)?;
3238
3239 let mut store = token.as_context_mut(store);
3240 let mut params = [ValRaw::i32(context)];
3241 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3244
3245 store.0.maybe_pop_call_context(guest_thread.task)?;
3246
3247 self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3248 log::trace!("explicit thread {guest_thread:?} completed");
3249 let state = store.0.concurrent_state_mut();
3250 let task = state.get_mut(guest_thread.task)?;
3251 if task.threads.is_empty() && !task.returned_or_cancelled() {
3252 bail!(Trap::NoAsyncResult);
3253 }
3254 store.0.set_thread(old_thread);
3255 let state = store.0.concurrent_state_mut();
3256 old_thread
3257 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
3258 if state.get_mut(guest_thread.task)?.ready_to_delete() {
3259 Waitable::Guest(guest_thread.task).delete_from(state)?;
3260 }
3261 log::trace!("thread start: restored {old_thread:?} as current thread");
3262
3263 Ok(())
3264 },
3265 );
3266
3267 let state = store.0.concurrent_state_mut();
3268 let current_thread = state.guest_thread.unwrap();
3269 let parent_task = current_thread.task;
3270
3271 let new_thread = GuestThread::new_explicit(parent_task, start_func);
3272 let thread_id = state.push(new_thread)?;
3273 state.get_mut(parent_task)?.threads.insert(thread_id);
3274
3275 log::trace!("new thread with id {thread_id:?} created");
3276
3277 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3278 }
3279
    /// Schedules the guest thread with handle `thread_idx` (resolved in
    /// `runtime_instance`'s thread handle table) to run.
    ///
    /// * A thread in the `NotStartedExplicit` state has its start function
    ///   queued as a `GuestCallKind::StartExplicit` work item.
    /// * A thread in the `Suspended` state has its fiber queued as a
    ///   `WorkItem::ResumeFiber` work item.
    ///
    /// `high_priority` is forwarded to `push_work_item` in both cases.
    ///
    /// # Errors
    ///
    /// Fails if the handle cannot be resolved, or if the thread is in any
    /// other state.  Note that the thread's state has already been overwritten
    /// with `Running` by the time that error is reported.
    pub(crate) fn resume_suspended_thread(
        self,
        store: &mut StoreOpaque,
        runtime_instance: RuntimeComponentInstanceIndex,
        thread_idx: u32,
        high_priority: bool,
    ) -> Result<()> {
        let thread_id =
            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
        let state = store.concurrent_state_mut();
        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
        let thread = state.get_mut(guest_thread.thread)?;

        // Take ownership of the previous state (which may own a start closure
        // or a fiber), leaving `Running` in its place.
        match mem::replace(&mut thread.state, GuestThreadState::Running) {
            GuestThreadState::NotStartedExplicit(start_func) => {
                log::trace!("starting thread {guest_thread:?}");
                let guest_call = WorkItem::GuestCall(GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::StartExplicit(Box::new(move |store| {
                        start_func(store, guest_thread)
                    })),
                });
                store
                    .concurrent_state_mut()
                    .push_work_item(guest_call, high_priority);
            }
            GuestThreadState::Suspended(fiber) => {
                log::trace!("resuming thread {thread_id:?} that was suspended");
                store
                    .concurrent_state_mut()
                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
            }
            _ => {
                bail!("cannot resume thread which is not suspended");
            }
        }
        Ok(())
    }
3318
3319 fn add_guest_thread_to_instance_table(
3320 self,
3321 thread_id: TableId<GuestThread>,
3322 store: &mut StoreOpaque,
3323 runtime_instance: RuntimeComponentInstanceIndex,
3324 ) -> Result<u32> {
3325 let guest_id = store
3326 .instance_state(RuntimeInstance {
3327 instance: self.id().instance(),
3328 index: runtime_instance,
3329 })
3330 .thread_handle_table()
3331 .guest_thread_insert(thread_id.rep())?;
3332 store
3333 .concurrent_state_mut()
3334 .get_mut(thread_id)?
3335 .instance_rep = Some(guest_id);
3336 Ok(guest_id)
3337 }
3338
    /// Shared implementation of the intrinsics which suspend the current guest
    /// thread.
    ///
    /// * `caller` - instance whose thread handle table is used to resolve
    ///   `to_thread`.
    /// * `cancellable` - whether a pending cancellation may be consumed and
    ///   reported as `WaitResult::Cancelled`.
    /// * `yielding` - selects `SuspendReason::Yielding` when `true` and
    ///   `SuspendReason::ExplicitlySuspending` otherwise.
    /// * `to_thread` - optional handle of a thread to resume (at high
    ///   priority) before the current thread suspends.
    ///
    /// Returns `WaitResult::Cancelled` if `cancellable` is set and a pending
    /// cancellation was taken either before or after suspending; otherwise
    /// returns `WaitResult::Completed`.
    ///
    /// # Panics
    ///
    /// Panics if the store has no current guest thread.
    pub(crate) fn suspension_intrinsic(
        self,
        store: &mut StoreOpaque,
        caller: RuntimeComponentInstanceIndex,
        cancellable: bool,
        yielding: bool,
        to_thread: Option<u32>,
    ) -> Result<WaitResult> {
        // The blocking checks only apply when no target thread was named.
        if to_thread.is_none() {
            let state = store.concurrent_state_mut();
            if yielding {
                // Yielding when the current task may not block is a no-op: we
                // return immediately without suspending.
                if !state.may_block(state.guest_thread.unwrap().task) {
                    return Ok(WaitResult::Completed);
                }
            } else {
                // An explicit suspend requires that blocking be permitted.
                store.check_blocking()?;
            }
        }

        // A cancellation may already be pending; if so, report it without
        // suspending (and without resuming `to_thread`).
        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
            return Ok(WaitResult::Cancelled);
        }

        if let Some(thread) = to_thread {
            self.resume_suspended_thread(store, caller, thread, true)?;
        }

        let state = store.concurrent_state_mut();
        let guest_thread = state.guest_thread.unwrap();
        // When a target thread was named, the may-block check is skipped.
        let reason = if yielding {
            SuspendReason::Yielding {
                thread: guest_thread,
                skip_may_block_check: to_thread.is_some(),
            }
        } else {
            SuspendReason::ExplicitlySuspending {
                thread: guest_thread,
                skip_may_block_check: to_thread.is_some(),
            }
        };

        store.suspend(reason)?;

        // A cancellation may have arrived while this thread was suspended.
        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
            Ok(WaitResult::Cancelled)
        } else {
            Ok(WaitResult::Completed)
        }
    }
3404
    /// Shared implementation of `waitable_set_wait` and `waitable_set_poll`.
    ///
    /// In `WaitableCheck::Wait` mode the current thread is suspended first if
    /// there is nothing to deliver yet: the task has no event (or only an
    /// `Event::Cancelled` which may not be delivered because `cancellable` is
    /// `false`) and the set's `ready` collection is empty.  In
    /// `WaitableCheck::Poll` mode no suspension happens.
    ///
    /// The event obtained from `get_event` (or `Event::None` when polling
    /// finds nothing) is then split into an ordinal and a result.  The
    /// waitable's handle (or `0` if there is none) and the result are written
    /// to guest memory at `params.payload` as two consecutive little-endian
    /// 32-bit values, and the ordinal is returned.
    ///
    /// # Errors
    ///
    /// Fails if suspending fails, if `get_event` fails, or if
    /// `params.payload` does not pass `validate_inbounds_dynamic`.
    ///
    /// # Panics
    ///
    /// Panics if the store has no current guest thread, if a cancellable wait
    /// finds `wake_on_cancel` already set for the thread, or if `get_event`
    /// yields no event in `Wait` mode.
    fn waitable_check(
        self,
        store: &mut StoreOpaque,
        cancellable: bool,
        check: WaitableCheck,
        params: WaitableCheckParams,
    ) -> Result<u32> {
        let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();

        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);

        let state = store.concurrent_state_mut();
        let task = state.get_mut(guest_thread.task)?;

        match &check {
            WaitableCheck::Wait => {
                let set = params.set;

                // Suspend only when there is nothing deliverable: no task
                // event (a `Cancelled` event does not count unless this wait
                // is cancellable) and nothing ready in the set.
                if (task.event.is_none()
                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
                    && state.get_mut(set)?.ready.is_empty()
                {
                    if cancellable {
                        // Record which set this thread is waiting on so that
                        // `subtask_cancel` can wake it.
                        let old = state
                            .get_mut(guest_thread.thread)?
                            .wake_on_cancel
                            .replace(set);
                        assert!(old.is_none());
                    }

                    store.suspend(SuspendReason::Waiting {
                        set,
                        thread: guest_thread,
                        skip_may_block_check: false,
                    })?;
                }
            }
            WaitableCheck::Poll => {}
        }

        log::trace!(
            "waitable check for {guest_thread:?}; set {:?}, part two",
            params.set
        );

        let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;

        let (ordinal, handle, result) = match &check {
            WaitableCheck::Wait => {
                // In `Wait` mode an event is expected to be available here.
                let (event, waitable) = event.unwrap();
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                let (ordinal, result) = event.parts();
                (ordinal, handle, result)
            }
            WaitableCheck::Poll => {
                if let Some((event, waitable)) = event {
                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                    let (ordinal, result) = event.parts();
                    (ordinal, handle, result)
                } else {
                    log::trace!(
                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
                        guest_thread.task,
                        params.set
                    );
                    let (ordinal, result) = Event::None.parts();
                    (ordinal, 0, result)
                }
            }
        };
        let memory = self.options_memory_mut(store, params.options);
        let ptr = func::validate_inbounds_dynamic(
            &CanonicalAbiInfo::POINTER_PAIR,
            memory,
            &ValRaw::u32(params.payload),
        )?;
        // Payload layout: handle at offset 0, result at offset 4, both
        // little-endian.
        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
        Ok(ordinal)
    }
3489
    /// Requests cancellation of the subtask with handle `task_id` owned by
    /// `caller_instance`.
    ///
    /// When `async_` is `false` the store's `check_blocking` must succeed
    /// first.
    ///
    /// Outcomes, in the order they are checked:
    ///
    /// * Host subtask which still has a join handle: the handle is aborted and
    ///   `Status::ReturnCancelled` is returned.
    /// * Guest subtask whose parameters have not been lowered yet: the task is
    ///   marked as exited, its single thread is deleted, it is removed from
    ///   the instance's pending collection, and `Status::StartCancelled` is
    ///   returned.
    /// * Guest subtask which has started but not returned or been cancelled:
    ///   an `Event::Cancelled` is recorded on the task and, if one of its
    ///   threads registered a `wake_on_cancel` set, that thread is queued at
    ///   high priority while the caller yields.  If the task still has not
    ///   resolved afterwards, `BLOCKED` is returned when `async_` is set;
    ///   otherwise the caller waits for the subtask's event.
    /// * Otherwise the subtask's pending event is taken; it must be a
    ///   `Returned` or `ReturnCancelled` subtask status, which is returned.
    ///
    /// # Errors
    ///
    /// Fails if the handle cannot be resolved, or with "`subtask.cancel`
    /// called after terminal status delivered" when no cancellable work or
    /// terminal event remains.
    ///
    /// # Panics
    ///
    /// Panics if the caller recorded for the subtask does not match
    /// `caller_instance`, if the store has no current guest thread (guest
    /// subtasks only), or if internal invariants checked by the assertions and
    /// `unwrap`s below do not hold.
    pub(crate) fn subtask_cancel(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        async_: bool,
        task_id: u32,
    ) -> Result<u32> {
        if !async_ {
            store.check_blocking()?;
        }

        let (rep, is_host) = store
            .instance_state(RuntimeInstance {
                instance: self.id().instance(),
                index: caller_instance,
            })
            .handle_table()
            .subtask_rep(task_id)?;
        let (waitable, expected_caller_instance) = if is_host {
            let id = TableId::<HostTask>::new(rep);
            (
                Waitable::Host(id),
                store.concurrent_state_mut().get_mut(id)?.caller_instance,
            )
        } else {
            let id = TableId::<GuestTask>::new(rep);
            if let &Caller::Guest { thread } = &store.concurrent_state_mut().get_mut(id)?.caller {
                (
                    Waitable::Guest(id),
                    store.concurrent_state_mut().get_mut(thread.task)?.instance,
                )
            } else {
                unreachable!()
            }
        };
        // Sanity check: the subtask must belong to the instance cancelling it.
        assert_eq!(
            expected_caller_instance,
            RuntimeInstance {
                instance: self.id().instance(),
                index: caller_instance
            }
        );

        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

        let concurrent_state = store.concurrent_state_mut();
        if let Waitable::Host(host_task) = waitable {
            // Taking the join handle means a later cancel or drop sees the
            // host task as resolved.
            if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
                handle.abort();
                return Ok(Status::ReturnCancelled as u32);
            }
        } else {
            let caller = concurrent_state.guest_thread.unwrap();
            let guest_task = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.already_lowered_parameters() {
                // Parameters were never lowered.  Clear the lowering/lifting
                // state and mark the task as exited.
                task.lower_params = None;
                task.lift_result = None;
                task.exited = true;

                let instance = task.instance;

                // A task in this state is expected to own exactly one thread.
                assert_eq!(1, task.threads.len());
                let thread = mem::take(&mut task.threads).into_iter().next().unwrap();
                let concurrent_state = store.concurrent_state_mut();
                concurrent_state.delete(thread)?;
                assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());

                // Remove the task's entries from the instance's pending
                // collection; finding none is reported as an error.
                let pending = &mut store.instance_state(instance).concurrent_state().pending;
                let pending_count = pending.len();
                pending.retain(|thread, _| thread.task != guest_task);
                if pending.len() == pending_count {
                    bail!("`subtask.cancel` called after terminal status delivered");
                }
                return Ok(Status::StartCancelled as u32);
            } else if !task.returned_or_cancelled() {
                // Started but not yet resolved: record the cancellation
                // request.  This replaces any event previously stored in
                // `task.event`.
                task.cancel_sent = true;
                task.event = Some(Event::Cancelled);
                for thread in task.threads.clone() {
                    let thread = QualifiedThreadId {
                        task: guest_task,
                        thread,
                    };
                    // Only a thread which registered a set to be woken on
                    // cancellation is woken here.
                    if let Some(set) = concurrent_state
                        .get_mut(thread.thread)
                        .unwrap()
                        .wake_on_cancel
                        .take()
                    {
                        let item = match concurrent_state
                            .get_mut(set)?
                            .waiting
                            .remove(&thread)
                            .unwrap()
                        {
                            WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
                            WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
                                thread,
                                kind: GuestCallKind::DeliverEvent {
                                    instance,
                                    set: None,
                                },
                            }),
                        };
                        concurrent_state.push_high_priority(item);

                        // Yield so the woken thread gets a chance to observe
                        // the cancellation.
                        store.suspend(SuspendReason::Yielding {
                            thread: caller,
                            skip_may_block_check: false,
                        })?;
                        // At most one thread is woken.
                        break;
                    }
                }

                let concurrent_state = store.concurrent_state_mut();
                let task = concurrent_state.get_mut(guest_task)?;
                if !task.returned_or_cancelled() {
                    if async_ {
                        return Ok(BLOCKED);
                    } else {
                        store.wait_for_event(Waitable::Guest(guest_task))?;
                    }
                }
            }
        }

        // Whatever happened above, the subtask should now have a terminal
        // event pending; anything else is an error.
        let event = waitable.take_event(store.concurrent_state_mut())?;
        if let Some(Event::Subtask {
            status: status @ (Status::Returned | Status::ReturnCancelled),
        }) = event
        {
            Ok(status as u32)
        } else {
            bail!("`subtask.cancel` called after terminal status delivered");
        }
    }
3646
3647 pub(crate) fn context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32> {
3648 store.concurrent_state_mut().context_get(slot)
3649 }
3650
3651 pub(crate) fn context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()> {
3652 store.concurrent_state_mut().context_set(slot, value)
3653 }
3654}
3655
/// Store-level entry points for the component-model async operations declared
/// below.
///
/// The implementation for `StoreInner<T>` forwards each method to the
/// corresponding operation on `Instance`, passing the store along as a
/// `StoreContextMut`.
pub trait VMComponentAsyncStore {
    /// Prepares a call from `caller_instance` to `callee_instance`.
    ///
    /// The final integer argument is either a result count or one of the
    /// `PREPARE_ASYNC_NO_RESULT` / `PREPARE_ASYNC_WITH_RESULT` sentinels,
    /// which select an async caller.
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads of `storage_len` initialized
    /// `ValRaw` values.
    /// NOTE(review): the remaining raw pointers presumably have validity
    /// requirements of their own - confirm against `Instance::prepare_call`.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: u8,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of a synchronous caller,
    /// using `storage` as the caller-provided buffer.
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads and writes of `storage_len`
    /// `MaybeUninit<ValRaw>` slots.
    /// NOTE(review): `callback` and `callee` presumably must be valid (or
    /// null where permitted) - confirm against `Instance::start_call`.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of an async caller and
    /// returns the `u32` produced by `Instance::start_call`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the raw function pointers presumably must be valid (or
    /// null where permitted) - confirm against `Instance::start_call`.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// Writes to the future with handle `future`, using `address` as the
    /// guest address; returns the encoded result code.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Reads from the future with handle `future`, using `address` as the
    /// guest address; returns the encoded result code.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a future of type `ty`.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Writes `count` items to the stream with handle `stream`, using
    /// `address` as the guest address; returns the encoded result code.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Reads `count` items from the stream with handle `stream`, using
    /// `address` as the guest address; returns the encoded result code.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_write`, but additionally supplies the payload's size and
    /// alignment as a `FlatAbi`.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_read`, but additionally supplies the payload's size and
    /// alignment as a `FlatAbi`.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a stream of type `ty`.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Handles a debug-message request for the error context with handle
    /// `err_ctx_handle`, with `debug_msg_address` as the guest address.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// Creates a new guest thread whose start function is element
    /// `start_func_idx` of table `start_func_table_idx`; `context` is the
    /// argument the start function will receive.  Returns the new thread's
    /// handle.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
3823
// Every method below is a thin adapter: it wraps `self` in a
// `StoreContextMut` (or passes it directly) and forwards to the method of the
// same purpose on `Instance`.
impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
    /// Copies the parameters out of `storage` and forwards to
    /// `Instance::prepare_call`, translating the final integer argument into a
    /// `CallerInfo`.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: u8,
        result_count_or_max_if_async: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()> {
        // SAFETY: relies on the caller's guarantee that `storage` is valid for
        // reads of `storage_len` initialized `ValRaw`s.  The values are copied
        // into an owned `Vec`, so the slice does not outlive this statement.
        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();

        // SAFETY: NOTE(review): relies on the caller upholding whatever
        // `Instance::prepare_call` requires of the raw pointers - confirm.
        unsafe {
            instance.prepare_call(
                StoreContextMut(self),
                start,
                return_,
                caller_instance,
                callee_instance,
                task_return_type,
                callee_async,
                memory,
                string_encoding,
                // The two sentinel values select an async caller (without or
                // with a result); any other value is a synchronous caller's
                // result count.
                match result_count_or_max_if_async {
                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
                        params,
                        has_result: false,
                    },
                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
                        params,
                        has_result: true,
                    },
                    result_count => CallerInfo::Sync {
                        params,
                        result_count,
                    },
                },
            )
        }
    }

    /// Forwards to `Instance::start_call` with a null post-return function, a
    /// result count of `1`, the `START_FLAG_ASYNC_CALLEE` flag, and `storage`
    /// as the caller's buffer; the value returned by `start_call` is
    /// discarded.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()> {
        // SAFETY: relies on the caller's guarantee that `storage` is valid for
        // reads and writes of `storage_len` slots.
        // NOTE(review): also relies on whatever `Instance::start_call`
        // requires of the function pointers - confirm.
        unsafe {
            instance
                .start_call(
                    StoreContextMut(self),
                    callback,
                    ptr::null_mut(),
                    callee,
                    param_count,
                    1,
                    START_FLAG_ASYNC_CALLEE,
                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
                )
                .map(drop)
        }
    }

    /// Forwards to `Instance::start_call` without a storage buffer and
    /// returns its result unchanged.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32> {
        // SAFETY: NOTE(review): relies on the caller upholding whatever
        // `Instance::start_call` requires of the function pointers - confirm.
        unsafe {
            instance.start_call(
                StoreContextMut(self),
                callback,
                post_return,
                callee,
                param_count,
                result_count,
                flags,
                None,
            )
        }
    }

    /// Forwards to `Instance::guest_write` for a future, with no `FlatAbi`
    /// and a count of `1`; the result is encoded to a `u32`.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                1,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_read` for a future, with no `FlatAbi` and
    /// a count of `1`; the result is encoded to a `u32`.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                1,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_write` for a stream, with no `FlatAbi`;
    /// the result is encoded to a `u32`.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_read` for a stream, with no `FlatAbi`;
    /// the result is encoded to a `u32`.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_drop_writable` for a future.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
    }

    /// Forwards to `Instance::guest_write` for a stream, passing the payload
    /// size and alignment as a `FlatAbi`; the result is encoded to a `u32`.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_read` for a stream, passing the payload
    /// size and alignment as a `FlatAbi`; the result is encoded to a `u32`.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_drop_writable` for a stream.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
    }

    /// Forwards to `Instance::error_context_debug_message`.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()> {
        instance.error_context_debug_message(
            StoreContextMut(self),
            ty,
            options,
            err_ctx_handle,
            debug_msg_address,
        )
    }

    /// Forwards to `Instance::thread_new_indirect`.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32> {
        instance.thread_new_indirect(
            StoreContextMut(self),
            caller,
            func_ty_idx,
            start_func_table_idx,
            start_func_idx,
            context,
        )
    }
}
4133
/// A pinned, boxed, `Send + 'static` future which resolves to `Result<()>`.
///
/// NOTE(review): presumably the type of future driven on behalf of a host
/// task - confirm against the code which constructs it.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4135
/// State tracked for a subtask which is backed by a host task
/// (`Waitable::Host`).
struct HostTask {
    /// State shared by all kinds of waitable.
    common: WaitableCommon,
    /// The instance which owns this subtask; `subtask_drop` and
    /// `subtask_cancel` assert that it matches the instance operating on the
    /// handle.
    caller_instance: RuntimeInstance,
    /// Handle used to abort the host task.  While this is `Some`, the task is
    /// treated as not yet resolved: `subtask_drop` refuses to drop it and
    /// `subtask_cancel` takes the handle and aborts the task.
    join_handle: Option<JoinHandle>,
}
4142
4143impl HostTask {
4144 fn new(caller_instance: RuntimeInstance, join_handle: Option<JoinHandle>) -> Self {
4145 Self {
4146 common: WaitableCommon::default(),
4147 caller_instance,
4148 join_handle,
4149 }
4150 }
4151}
4152
impl TableDebug for HostTask {
    /// Name reported for this type by the `TableDebug` trait.
    fn type_name() -> &'static str {
        "HostTask"
    }
}
4158
/// Boxed callback stored in `GuestTask::callback`.
///
/// It is invoked with the store, an `Event`, and a `u32`, and returns a `u32`
/// on success.
/// NOTE(review): the meaning of the `u32` argument and return value is not
/// visible here - confirm at the call site.
type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4160
/// Identifies who called a `GuestTask`.
enum Caller {
    /// The task was called by the host.
    Host {
        /// Optional channel carrying a `LiftedResult`.  Set to `None` when
        /// `GuestTask::dispose` reparents subtasks onto a host caller.
        /// NOTE(review): presumably used to deliver the task's result to the
        /// host - confirm.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// Shared sender which is cloned onto reparented subtasks by
        /// `GuestTask::dispose`.
        /// NOTE(review): presumably signals task exit - confirm.
        exit_tx: Arc<oneshot::Sender<()>>,
        /// When `true`, `GuestTask::new` starts the task in
        /// `HostFutureState::Live`; otherwise in
        /// `HostFutureState::NotApplicable`.
        host_future_present: bool,
        /// Optional guest thread associated with the host call; copied onto
        /// reparented subtasks by `GuestTask::dispose`.
        caller: Option<QualifiedThreadId>,
    },
    /// The task was called by another guest task.
    Guest {
        /// The calling thread; `thread.task` is the parent task whose
        /// `subtasks` set contains this task.
        thread: QualifiedThreadId,
    },
}
4189
/// Stored in `GuestTask::lift_result`.
///
/// NOTE(review): presumably everything needed to lift a task's result once it
/// returns - confirm against the code which consumes it.
struct LiftResult {
    /// The lifting routine itself.
    lift: RawLift,
    /// Tuple type index associated with the result.
    ty: TypeTupleIndex,
    /// Optional linear memory.
    /// NOTE(review): presumably the memory used while lifting - confirm.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding.
    /// NOTE(review): presumably used while lifting - confirm.
    string_encoding: StringEncoding,
}
4198
/// A guest thread identifier paired with the identifier of the task it
/// belongs to.
///
/// The derived ordering compares `task` first and then `thread`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
struct QualifiedThreadId {
    /// The task the thread belongs to.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
4208
4209impl QualifiedThreadId {
4210 fn qualify(
4211 state: &mut ConcurrentState,
4212 thread: TableId<GuestThread>,
4213 ) -> Result<QualifiedThreadId> {
4214 Ok(QualifiedThreadId {
4215 task: state.get_mut(thread)?.parent_task,
4216 thread,
4217 })
4218 }
4219}
4220
4221impl fmt::Debug for QualifiedThreadId {
4222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4223 f.debug_tuple("QualifiedThreadId")
4224 .field(&self.task.rep())
4225 .field(&self.thread.rep())
4226 .finish()
4227 }
4228}
4229
/// Lifecycle state of a `GuestThread`.
enum GuestThreadState {
    /// Initial state of a thread created by `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state of a thread created by `GuestThread::new_explicit`,
    /// holding the closure to run when the thread is started.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// The thread has been started or resumed.
    Running,
    /// The thread is suspended; the fiber is what gets queued to resume it.
    Suspended(StoreFiber<'static>),
    /// NOTE(review): not set anywhere in the code visible here - presumably
    /// a thread waiting to be scheduled; confirm.
    Pending,
    /// NOTE(review): not set anywhere in the code visible here - presumably
    /// a thread which has finished; confirm.
    Completed,
}
/// State tracked for a single guest thread.
pub struct GuestThread {
    /// Two `u32` slots, zero-initialized by the constructors.
    /// NOTE(review): presumably the storage behind `context_get` /
    /// `context_set` - confirm.
    context: [u32; 2],
    /// The task this thread belongs to.
    parent_task: TableId<GuestTask>,
    /// Set by a cancellable wait to the waitable set being waited on, so that
    /// `subtask_cancel` can find and wake this thread.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// Current lifecycle state.
    state: GuestThreadState,
    /// This thread's handle in its instance's thread handle table, once it has
    /// been inserted there.
    instance_rep: Option<u32>,
}
4255
4256impl GuestThread {
4257 fn from_instance(
4260 state: Pin<&mut ComponentInstance>,
4261 caller_instance: RuntimeComponentInstanceIndex,
4262 guest_thread: u32,
4263 ) -> Result<TableId<Self>> {
4264 let rep = state.instance_states().0[caller_instance]
4265 .thread_handle_table()
4266 .guest_thread_rep(guest_thread)?;
4267 Ok(TableId::new(rep))
4268 }
4269
4270 fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4271 Self {
4272 context: [0; 2],
4273 parent_task,
4274 wake_on_cancel: None,
4275 state: GuestThreadState::NotStartedImplicit,
4276 instance_rep: None,
4277 }
4278 }
4279
4280 fn new_explicit(
4281 parent_task: TableId<GuestTask>,
4282 start_func: Box<
4283 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4284 >,
4285 ) -> Self {
4286 Self {
4287 context: [0; 2],
4288 parent_task,
4289 wake_on_cancel: None,
4290 state: GuestThreadState::NotStartedExplicit(start_func),
4291 instance_rep: None,
4292 }
4293 }
4294}
4295
impl TableDebug for GuestThread {
    /// Name reported for this type by the `TableDebug` trait.
    fn type_name() -> &'static str {
        "GuestThread"
    }
}
4301
/// Tracks the result of a guest task for a synchronous caller.
enum SyncResult {
    /// No result has been produced yet.
    NotProduced,
    /// A result has been produced; the payload is an optional raw value.
    /// While a task is in this state, `GuestTask::ready_to_delete` returns
    /// `false`.
    Produced(Option<ValRaw>),
    /// `take` has already been called.
    Taken,
}
4307
4308impl SyncResult {
4309 fn take(&mut self) -> Option<Option<ValRaw>> {
4310 match mem::replace(self, SyncResult::Taken) {
4311 SyncResult::NotProduced => None,
4312 SyncResult::Produced(val) => Some(val),
4313 SyncResult::Taken => {
4314 panic!("attempted to take a synchronous result that was already taken")
4315 }
4316 }
4317 }
4318}
4319
/// Tracks the host-side future associated with a guest task, if any.
#[derive(Debug)]
enum HostFutureState {
    /// Used for guest callers and for host callers without
    /// `host_future_present`.
    NotApplicable,
    /// Used for host callers with `host_future_present`.  While a task is in
    /// this state, `GuestTask::ready_to_delete` returns `false`.
    Live,
    /// NOTE(review): not set anywhere in the code visible here - presumably
    /// the host future has been dropped; confirm.
    Dropped,
}
4326
/// State tracked for a task running in a guest component instance.
pub(crate) struct GuestTask {
    /// State shared by all kinds of waitable.
    common: WaitableCommon,
    /// Lowering routine for the parameters.  `None` is interpreted by
    /// `already_lowered_parameters` as "parameters already lowered".
    lower_params: Option<RawLower>,
    /// Lifting state for the result.  `None` is interpreted by
    /// `returned_or_cancelled` as "the task has returned or been cancelled".
    lift_result: Option<LiftResult>,
    /// Lifted result, initially `None`.
    /// NOTE(review): presumably populated when the task returns - confirm.
    result: Option<LiftedResult>,
    /// Optional callback supplied at construction.
    callback: Option<CallbackFn>,
    /// Who called this task.
    caller: Caller,
    /// Call context, initialized to `Some(CallContext::default())`.
    call_context: Option<CallContext>,
    /// Result tracking for a synchronous caller.
    sync_result: SyncResult,
    /// Set by `subtask_cancel` when it records a cancellation request for a
    /// task which has already started.
    cancel_sent: bool,
    /// Initially `false`.
    /// NOTE(review): presumably whether a "starting" status has been
    /// delivered to the caller - confirm.
    starting_sent: bool,
    /// Subtasks of this task; they are reparented to this task's caller by
    /// `dispose`.
    subtasks: HashSet<TableId<GuestTask>>,
    /// Waitable set created together with the task and deleted by `dispose`.
    /// NOTE(review): presumably used while waiting on synchronous calls -
    /// confirm.
    sync_call_set: TableId<WaitableSet>,
    /// The instance this task runs in.
    instance: RuntimeInstance,
    /// Event pending for this task itself; `subtask_cancel` stores
    /// `Event::Cancelled` here.
    event: Option<Event>,
    /// Initially `None`.
    /// NOTE(review): presumably the export this task is executing - confirm.
    function_index: Option<ExportIndex>,
    /// When set, the task may be deleted as soon as `ready_to_delete` holds
    /// (see `dispose` and `subtask_drop`).
    exited: bool,
    /// Threads currently belonging to this task.
    threads: HashSet<TableId<GuestThread>>,
    /// State of the host-side future, if any.
    host_future_state: HostFutureState,
    /// Stored as passed to `new`.
    /// NOTE(review): presumably whether the function is an async function -
    /// confirm.
    async_function: bool,
}
4385
impl GuestTask {
    /// Whether the parameters have already been lowered, which is tracked by
    /// `lower_params` having been cleared.
    fn already_lowered_parameters(&self) -> bool {
        self.lower_params.is_none()
    }

    /// Whether the task has returned or been cancelled, which is tracked by
    /// `lift_result` having been cleared.
    fn returned_or_cancelled(&self) -> bool {
        self.lift_result.is_none()
    }

    /// Whether nothing refers to this task any more, so that it may be
    /// deleted.
    ///
    /// This requires all of: no remaining threads, no produced-but-untaken
    /// synchronous result, no undelivered `Returned` / `ReturnCancelled`
    /// subtask event, and no live host future.
    fn ready_to_delete(&self) -> bool {
        let threads_completed = self.threads.is_empty();
        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
        let pending_completion_event = matches!(
            self.common.event,
            Some(Event::Subtask {
                status: Status::Returned | Status::ReturnCancelled
            })
        );
        let ready = threads_completed
            && !has_sync_result
            && !pending_completion_event
            && !matches!(self.host_future_state, HostFutureState::Live);
        log::trace!(
            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
            threads_completed,
            has_sync_result,
            pending_completion_event,
            self.host_future_state
        );
        ready
    }

    /// Creates a new task which has not yet lowered its parameters or
    /// produced a result.
    ///
    /// A fresh `WaitableSet` is pushed into `state` to serve as the task's
    /// `sync_call_set`.  The initial `host_future_state` is `Live` only for a
    /// host caller with `host_future_present` set.
    fn new(
        state: &mut ConcurrentState,
        lower_params: RawLower,
        lift_result: LiftResult,
        caller: Caller,
        callback: Option<CallbackFn>,
        instance: RuntimeInstance,
        async_function: bool,
    ) -> Result<Self> {
        let sync_call_set = state.push(WaitableSet::default())?;
        let host_future_state = match &caller {
            Caller::Guest { .. } => HostFutureState::NotApplicable,
            Caller::Host {
                host_future_present,
                ..
            } => {
                if *host_future_present {
                    HostFutureState::Live
                } else {
                    HostFutureState::NotApplicable
                }
            }
        };
        Ok(Self {
            common: WaitableCommon::default(),
            lower_params: Some(lower_params),
            lift_result: Some(lift_result),
            result: None,
            callback,
            caller,
            call_context: Some(CallContext::default()),
            sync_result: SyncResult::NotProduced,
            cancel_sent: false,
            starting_sent: false,
            subtasks: HashSet::new(),
            sync_call_set,
            instance,
            event: None,
            function_index: None,
            exited: false,
            threads: HashSet::new(),
            host_future_state,
            async_function,
        })
    }

    /// Releases everything owned by this task once it has been removed from
    /// the table; `me` is the id the task was stored under.
    ///
    /// # Panics
    ///
    /// Panics if the task still has threads, or if a guest caller's task does
    /// not list `me` among its subtasks.
    fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
        // Delete any waitables still sitting in our sync-call set whose
        // completion event was never delivered.
        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
            if let Some(Event::Subtask {
                status: Status::Returned | Status::ReturnCancelled,
            }) = waitable.common(state)?.event
            {
                waitable.delete_from(state)?;
            }
        }

        assert!(self.threads.is_empty());

        state.delete(self.sync_call_set)?;

        // Reparent our subtasks to our own caller.
        match &self.caller {
            Caller::Guest { thread } => {
                let task_mut = state.get_mut(thread.task)?;
                let present = task_mut.subtasks.remove(&me);
                assert!(present);

                for subtask in &self.subtasks {
                    task_mut.subtasks.insert(*subtask);
                }

                // This is a separate loop because `task_mut` above keeps
                // `state` mutably borrowed.
                for subtask in &self.subtasks {
                    state.get_mut(*subtask)?.caller = Caller::Guest { thread: *thread };
                }
            }
            Caller::Host {
                exit_tx, caller, ..
            } => {
                for subtask in &self.subtasks {
                    // Reparented subtasks get no result channel and no host
                    // future of their own.
                    state.get_mut(*subtask)?.caller = Caller::Host {
                        tx: None,
                        exit_tx: exit_tx.clone(),
                        host_future_present: false,
                        caller: *caller,
                    };
                }
            }
        }

        // Subtasks which have already exited and have nothing else keeping
        // them alive can be deleted now.
        for subtask in self.subtasks {
            let task = state.get_mut(subtask)?;
            if task.exited && task.ready_to_delete() {
                Waitable::Guest(subtask).delete_from(state)?;
            }
        }

        Ok(())
    }
}
4526
/// Gives `GuestTask` table entries the name `"GuestTask"`.
impl TableDebug for GuestTask {
    /// Returns the fixed name `"GuestTask"`.
    fn type_name() -> &'static str {
        "GuestTask"
    }
}
4532
/// State shared by every kind of waitable; reached through
/// `Waitable::common`.
#[derive(Default)]
struct WaitableCommon {
    /// The event currently pending for this waitable, if any (written by
    /// `Waitable::set_event`, cleared by `Waitable::take_event`).
    event: Option<Event>,
    /// The set this waitable currently belongs to, if any (maintained by
    /// `Waitable::join`).
    set: Option<TableId<WaitableSet>>,
    /// NOTE(review): presumably the guest-visible handle for this waitable;
    /// it is not read or written in this part of the file — confirm.
    handle: Option<u32>,
}
4543
/// A typed reference to a table entry that can carry a pending event and
/// belong to a `WaitableSet`.
///
/// The derived `Ord` is what orders waitables inside `WaitableSet::ready`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task.
    Host(TableId<HostTask>),
    /// A guest task.
    Guest(TableId<GuestTask>),
    /// A transmit handle; `from_instance` maps both stream and future
    /// handles to this variant.
    Transmit(TableId<TransmitHandle>),
}
4554
4555impl Waitable {
4556 fn from_instance(
4559 state: Pin<&mut ComponentInstance>,
4560 caller_instance: RuntimeComponentInstanceIndex,
4561 waitable: u32,
4562 ) -> Result<Self> {
4563 use crate::runtime::vm::component::Waitable;
4564
4565 let (waitable, kind) = state.instance_states().0[caller_instance]
4566 .handle_table()
4567 .waitable_rep(waitable)?;
4568
4569 Ok(match kind {
4570 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4571 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4572 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4573 })
4574 }
4575
4576 fn rep(&self) -> u32 {
4578 match self {
4579 Self::Host(id) => id.rep(),
4580 Self::Guest(id) => id.rep(),
4581 Self::Transmit(id) => id.rep(),
4582 }
4583 }
4584
4585 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4589 log::trace!("waitable {self:?} join set {set:?}",);
4590
4591 let old = mem::replace(&mut self.common(state)?.set, set);
4592
4593 if let Some(old) = old {
4594 match *self {
4595 Waitable::Host(id) => state.remove_child(id, old),
4596 Waitable::Guest(id) => state.remove_child(id, old),
4597 Waitable::Transmit(id) => state.remove_child(id, old),
4598 }?;
4599
4600 state.get_mut(old)?.ready.remove(self);
4601 }
4602
4603 if let Some(set) = set {
4604 match *self {
4605 Waitable::Host(id) => state.add_child(id, set),
4606 Waitable::Guest(id) => state.add_child(id, set),
4607 Waitable::Transmit(id) => state.add_child(id, set),
4608 }?;
4609
4610 if self.common(state)?.event.is_some() {
4611 self.mark_ready(state)?;
4612 }
4613 }
4614
4615 Ok(())
4616 }
4617
4618 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4620 Ok(match self {
4621 Self::Host(id) => &mut state.get_mut(*id)?.common,
4622 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4623 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4624 })
4625 }
4626
4627 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4631 log::trace!("set event for {self:?}: {event:?}");
4632 self.common(state)?.event = event;
4633 self.mark_ready(state)
4634 }
4635
4636 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4638 let common = self.common(state)?;
4639 let event = common.event.take();
4640 if let Some(set) = self.common(state)?.set {
4641 state.get_mut(set)?.ready.remove(self);
4642 }
4643
4644 Ok(event)
4645 }
4646
4647 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4651 if let Some(set) = self.common(state)?.set {
4652 state.get_mut(set)?.ready.insert(*self);
4653 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4654 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4655 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4656
4657 let item = match mode {
4658 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4659 WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4660 thread,
4661 kind: GuestCallKind::DeliverEvent {
4662 instance,
4663 set: Some(set),
4664 },
4665 }),
4666 };
4667 state.push_high_priority(item);
4668 }
4669 }
4670 Ok(())
4671 }
4672
4673 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4675 match self {
4676 Self::Host(task) => {
4677 log::trace!("delete host task {task:?}");
4678 state.delete(*task)?;
4679 }
4680 Self::Guest(task) => {
4681 log::trace!("delete guest task {task:?}");
4682 state.delete(*task)?.dispose(state, *task)?;
4683 }
4684 Self::Transmit(task) => {
4685 state.delete(*task)?;
4686 }
4687 }
4688
4689 Ok(())
4690 }
4691}
4692
/// Debug output is just the wrapped table id's own `Debug` rendering; the
/// variant name is not printed separately.
impl fmt::Debug for Waitable {
    /// Writes the `{:?}` rendering of the wrapped id to `f`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::Host(id) => write!(f, "{id:?}"),
            Self::Guest(id) => write!(f, "{id:?}"),
            Self::Transmit(id) => write!(f, "{id:?}"),
        }
    }
}
4702
/// A group of waitables, plus the threads waiting for one of them to become
/// ready.
#[derive(Default)]
struct WaitableSet {
    /// Member waitables that have been marked ready (see
    /// `Waitable::mark_ready`) and whose event has not yet been taken.
    ready: BTreeSet<Waitable>,
    /// Threads waiting on this set, with how each one should be woken.
    /// `Waitable::mark_ready` wakes the first entry in key order.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
}
4711
/// Gives `WaitableSet` table entries the name `"WaitableSet"`.
impl TableDebug for WaitableSet {
    /// Returns the fixed name `"WaitableSet"`.
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
4717
/// Boxed one-shot closure that writes a call's parameters into a slice of
/// (possibly uninitialized) raw core values, given access to the store.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4721
/// Boxed one-shot closure that turns a slice of raw core values into a
/// type-erased boxed result, given access to the store.
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;
4726
/// Type-erased result value, as produced by a `RawLift` closure.
type LiftedResult = Box<dyn Any + Send + Sync>;
4731
/// Unit marker type.
///
/// NOTE(review): presumably boxed as a `LiftedResult` placeholder when there
/// is no real lifted value to hand back; it is not used in this part of the
/// file — confirm at the use sites.
struct DummyResult;
4735
/// Per-instance bookkeeping for concurrent execution.
#[derive(Default)]
pub struct ConcurrentInstanceState {
    /// NOTE(review): presumably a counter, with backpressure in effect while
    /// it is non-zero; not read in this part of the file — confirm.
    backpressure: u16,
    /// NOTE(review): presumably set while the instance must not be entered;
    /// not read in this part of the file — confirm.
    do_not_enter: bool,
    /// Guest calls recorded for this instance, keyed by the thread that will
    /// run them. NOTE(review): presumably calls deferred until the instance
    /// can be entered — confirm.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
4747
impl ConcurrentInstanceState {
    /// Returns `true` when no guest calls are recorded in `pending`.
    pub fn pending_is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}
4753
/// Store-wide state for concurrent execution: the table of tasks, threads and
/// waitable sets, the queued work items, and the host futures being driven.
pub struct ConcurrentState {
    /// The guest thread the `context_get`, `context_set` and
    /// `take_pending_cancellation` operations act on. Those methods panic
    /// when this is `None`.
    guest_thread: Option<QualifiedThreadId>,

    /// Host task futures. `take_fibers_and_futures` replaces this with
    /// `None` when it hands the collection to its caller.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// Table holding the values addressed by `TableId`s (tasks, threads,
    /// waitable sets, ...); accessed through `push`/`get_mut`/`delete`.
    table: AlwaysMut<ResourceTable>,
    /// Work items that `collect_work_items_to_run` returns all at once.
    high_priority: Vec<WorkItem>,
    /// Work items returned one at a time, oldest first, and only when
    /// `high_priority` is empty.
    low_priority: VecDeque<WorkItem>,
    /// NOTE(review): presumably the reason the current fiber is suspending;
    /// not used in this part of the file — confirm.
    suspend_reason: Option<SuspendReason>,
    /// A fiber owned by this state; `take_fibers_and_futures` collects it
    /// along with the others. NOTE(review): presumably a reusable worker
    /// fiber — confirm.
    worker: Option<StoreFiber<'static>>,
    /// NOTE(review): presumably the item handed to `worker`; not used in this
    /// part of the file — confirm.
    worker_item: Option<WorkerItem>,

    /// Reference counts for error contexts, keyed by global error-context
    /// table index.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
}
4796
4797impl Default for ConcurrentState {
4798 fn default() -> Self {
4799 Self {
4800 guest_thread: None,
4801 table: AlwaysMut::new(ResourceTable::new()),
4802 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4803 high_priority: Vec::new(),
4804 low_priority: VecDeque::new(),
4805 suspend_reason: None,
4806 worker: None,
4807 worker_item: None,
4808 global_error_context_ref_counts: BTreeMap::new(),
4809 }
4810 }
4811}
4812
4813impl ConcurrentState {
4814 pub(crate) fn take_fibers_and_futures(
4831 &mut self,
4832 fibers: &mut Vec<StoreFiber<'static>>,
4833 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4834 ) {
4835 for entry in self.table.get_mut().iter_mut() {
4836 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4837 for mode in mem::take(&mut set.waiting).into_values() {
4838 if let WaitMode::Fiber(fiber) = mode {
4839 fibers.push(fiber);
4840 }
4841 }
4842 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4843 if let GuestThreadState::Suspended(fiber) =
4844 mem::replace(&mut thread.state, GuestThreadState::Completed)
4845 {
4846 fibers.push(fiber);
4847 }
4848 }
4849 }
4850
4851 if let Some(fiber) = self.worker.take() {
4852 fibers.push(fiber);
4853 }
4854
4855 let mut handle_item = |item| match item {
4856 WorkItem::ResumeFiber(fiber) => {
4857 fibers.push(fiber);
4858 }
4859 WorkItem::PushFuture(future) => {
4860 self.futures
4861 .get_mut()
4862 .as_mut()
4863 .unwrap()
4864 .push(future.into_inner());
4865 }
4866 _ => {}
4867 };
4868
4869 for item in mem::take(&mut self.high_priority) {
4870 handle_item(item);
4871 }
4872 for item in mem::take(&mut self.low_priority) {
4873 handle_item(item);
4874 }
4875
4876 if let Some(them) = self.futures.get_mut().take() {
4877 futures.push(them);
4878 }
4879 }
4880
4881 fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> {
4885 let mut ready = mem::take(&mut self.high_priority);
4886 if ready.is_empty() {
4887 if let Some(item) = self.low_priority.pop_back() {
4888 ready.push(item);
4889 }
4890 }
4891 ready
4892 }
4893
4894 fn push<V: Send + Sync + 'static>(
4895 &mut self,
4896 value: V,
4897 ) -> Result<TableId<V>, ResourceTableError> {
4898 self.table.get_mut().push(value).map(TableId::from)
4899 }
4900
4901 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4902 self.table.get_mut().get_mut(&Resource::from(id))
4903 }
4904
4905 pub fn add_child<T: 'static, U: 'static>(
4906 &mut self,
4907 child: TableId<T>,
4908 parent: TableId<U>,
4909 ) -> Result<(), ResourceTableError> {
4910 self.table
4911 .get_mut()
4912 .add_child(Resource::from(child), Resource::from(parent))
4913 }
4914
4915 pub fn remove_child<T: 'static, U: 'static>(
4916 &mut self,
4917 child: TableId<T>,
4918 parent: TableId<U>,
4919 ) -> Result<(), ResourceTableError> {
4920 self.table
4921 .get_mut()
4922 .remove_child(Resource::from(child), Resource::from(parent))
4923 }
4924
4925 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4926 self.table.get_mut().delete(Resource::from(id))
4927 }
4928
4929 fn push_future(&mut self, future: HostTaskFuture) {
4930 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4937 }
4938
4939 fn push_high_priority(&mut self, item: WorkItem) {
4940 log::trace!("push high priority: {item:?}");
4941 self.high_priority.push(item);
4942 }
4943
4944 fn push_low_priority(&mut self, item: WorkItem) {
4945 log::trace!("push low priority: {item:?}");
4946 self.low_priority.push_front(item);
4947 }
4948
4949 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4950 if high_priority {
4951 self.push_high_priority(item);
4952 } else {
4953 self.push_low_priority(item);
4954 }
4955 }
4956
4957 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4959 let thread = self.guest_thread.unwrap();
4960 let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4961 log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4962 Ok(val)
4963 }
4964
4965 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4967 let thread = self.guest_thread.unwrap();
4968 log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4969 self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4970 Ok(())
4971 }
4972
4973 fn take_pending_cancellation(&mut self) -> bool {
4976 let thread = self.guest_thread.unwrap();
4977 if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4978 assert!(matches!(event, Event::Cancelled));
4979 true
4980 } else {
4981 false
4982 }
4983 }
4984
4985 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
4986 if self.may_block(task) {
4987 Ok(())
4988 } else {
4989 Err(Trap::CannotBlockSyncTask.into())
4990 }
4991 }
4992
4993 fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
4994 let task = self.get_mut(task).unwrap();
4995 task.async_function || task.returned_or_cancelled()
4996 }
4997}
4998
4999fn for_any_lower<
5002 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5003>(
5004 fun: F,
5005) -> F {
5006 fun
5007}
5008
5009fn for_any_lift<
5011 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5012>(
5013 fun: F,
5014) -> F {
5015 fun
5016}
5017
5018fn checked<F: Future + Send + 'static>(
5023 id: StoreId,
5024 fut: F,
5025) -> impl Future<Output = F::Output> + Send + 'static {
5026 async move {
5027 let mut fut = pin!(fut);
5028 future::poll_fn(move |cx| {
5029 let message = "\
5030 `Future`s which depend on asynchronous component tasks, streams, or \
5031 futures to complete may only be polled from the event loop of the \
5032 store to which they belong. Please use \
5033 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5034 ";
5035 tls::try_get(|store| {
5036 let matched = match store {
5037 tls::TryGet::Some(store) => store.id() == id,
5038 tls::TryGet::Taken | tls::TryGet::None => false,
5039 };
5040
5041 if !matched {
5042 panic!("{message}")
5043 }
5044 });
5045 fut.as_mut().poll(cx)
5046 })
5047 .await
5048 }
5049}
5050
5051fn check_recursive_run() {
5054 tls::try_get(|store| {
5055 if !matches!(store, tls::TryGet::None) {
5056 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5057 }
5058 });
5059}
5060
/// Split a packed callback return value into its two fields.
///
/// Returns `(low, high)`, where `low` is the low four bits of `code` and
/// `high` is the remaining 28 bits shifted down.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    const LOW_BITS: u32 = 4;
    const LOW_MASK: u32 = (1 << LOW_BITS) - 1;

    let low = code & LOW_MASK;
    let high = code >> LOW_BITS;
    (low, high)
}
5064
/// Arguments bundled for a waitable-set check.
///
/// NOTE(review): the consumer of this struct is outside this part of the
/// file; the field descriptions below are inferred from names and types —
/// confirm.
struct WaitableCheckParams {
    /// The set being checked.
    set: TableId<WaitableSet>,
    /// Index of the canonical options in effect for the check.
    options: OptionsIndex,
    /// NOTE(review): presumably a guest address where the event payload is
    /// written — confirm.
    payload: u32,
}
5073
/// The two flavours of waitable-set check.
///
/// NOTE(review): the variants are not matched in this part of the file; the
/// descriptions below are inferred from the names — confirm.
enum WaitableCheck {
    /// Presumably: block until an event is available.
    Wait,
    /// Presumably: check for an event without blocking.
    Poll,
}
5080
/// A host-to-guest call that `prepare_call` has set up but `queue_call` has
/// not yet queued. `R` is the type `queue_call` downcasts the lifted result
/// to.
pub(crate) struct PreparedCall<R> {
    /// The function to be called.
    handle: Func,
    /// The task created for this call and its implicit thread.
    thread: QualifiedThreadId,
    /// Parameter count passed through to `queue_call0`.
    param_count: usize,
    /// Receiving end of the channel whose sender was stored in the task's
    /// `Caller::Host::tx`; yields the type-erased lifted result.
    rx: oneshot::Receiver<LiftedResult>,
    /// Receiving end of the channel whose sender was stored in the task's
    /// `Caller::Host::exit_tx`; `queue_call` returns it alongside the result.
    exit_rx: oneshot::Receiver<()>,
    /// Ties the otherwise unused result type `R` to this value.
    _phantom: PhantomData<R>,
}
5097
5098impl<R> PreparedCall<R> {
5099 pub(crate) fn task_id(&self) -> TaskId {
5101 TaskId {
5102 task: self.thread.task,
5103 }
5104 }
5105}
5106
/// Identifies the guest task behind a `PreparedCall` (see
/// `PreparedCall::task_id`).
pub(crate) struct TaskId {
    /// Table id of the guest task.
    task: TableId<GuestTask>,
}
5111
5112impl TaskId {
5113 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5119 let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5120 if !task.already_lowered_parameters() {
5121 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5122 } else {
5123 task.host_future_state = HostFutureState::Dropped;
5124 if task.ready_to_delete() {
5125 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5126 }
5127 }
5128 Ok(())
5129 }
5130}
5131
/// Set up, but do not yet queue, a host-to-guest call of `handle`.
///
/// A `GuestTask` with a `Caller::Host` caller is created and pushed into the
/// store's concurrent state, together with an implicit `GuestThread` for it.
/// The returned `PreparedCall` carries the receiving ends of the result and
/// exit channels and is meant to be passed to `queue_call`.
///
/// * `param_count` is stored in the `PreparedCall` unchanged.
/// * `host_future_present` is recorded in the task's `Caller::Host`.
/// * `lower_params` and `lift_result` are wrapped so that they receive
///   `handle` (and, for `lower_params`, a typed store context) when invoked.
///
/// # Errors
///
/// Fails if the task or thread cannot be pushed into the table, or with
/// `Trap::CannotEnterComponent` when `may_enter_task` rejects the new task.
pub(crate) fn prepare_call<T, R>(
    mut store: StoreContextMut<T>,
    handle: Func,
    param_count: usize,
    host_future_present: bool,
    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
    + Send
    + Sync
    + 'static,
) -> Result<PreparedCall<R>> {
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);

    // Gather everything needed from the function's type and canonical
    // options before taking a mutable borrow of the concurrent state.
    let instance = handle.instance().id().get(store.0);
    let options = &instance.component().env_component().options[options];
    let ty = &instance.component().types()[ty];
    let async_function = ty.async_;
    let task_return_type = ty.results;
    let component_instance = raw_options.instance;
    let callback = options.callback.map(|i| instance.runtime_callback(i));
    let memory = options
        .memory()
        .map(|i| instance.runtime_memory(i))
        .map(SendSyncPtr::new);
    let string_encoding = options.string_encoding;
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();

    // `tx`/`rx` carry the lifted result; `exit_tx`/`exit_rx` form the exit
    // channel returned to the host by `queue_call`.
    let (tx, rx) = oneshot::channel();
    let (exit_tx, exit_rx) = oneshot::channel();

    // The guest thread that is current at preparation time, if any, is
    // recorded as the host caller's own caller.
    let caller = state.guest_thread;
    let mut task = GuestTask::new(
        state,
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            exit_tx: Arc::new(exit_tx),
            host_future_present,
            caller,
        },
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(move |store: &mut dyn VMStore, event, handle| {
                let store = token.as_context_mut(store);
                // NOTE(review): the soundness of this `unsafe` call presumably
                // rests on `callback` having been obtained from `handle`'s own
                // instance above — confirm against `call_callback`'s contract.
                unsafe { instance.call_callback(store, callback, event, handle) }
            }) as CallbackFn
        }),
        RuntimeInstance {
            instance: handle.instance().id().instance(),
            index: component_instance,
        },
        async_function,
    )?;
    task.function_index = Some(handle.index());

    // Register the task and its implicit thread, and link the two.
    let task = state.push(task)?;
    let thread = state.push(GuestThread::new_implicit(task))?;
    state.get_mut(task)?.threads.insert(thread);

    if !store.0.may_enter_task(task) {
        bail!(crate::Trap::CannotEnterComponent);
    }

    Ok(PreparedCall {
        handle,
        thread: QualifiedThreadId { task, thread },
        param_count,
        rx,
        exit_rx,
        _phantom: PhantomData,
    })
}
5226
5227pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5234 mut store: StoreContextMut<T>,
5235 prepared: PreparedCall<R>,
5236) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5237 let PreparedCall {
5238 handle,
5239 thread,
5240 param_count,
5241 rx,
5242 exit_rx,
5243 ..
5244 } = prepared;
5245
5246 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5247
5248 Ok(checked(
5249 store.0.id(),
5250 rx.map(move |result| {
5251 result
5252 .map(|v| (*v.downcast().unwrap(), exit_rx))
5253 .map_err(crate::Error::from)
5254 }),
5255 ))
5256}
5257
5258fn queue_call0<T: 'static>(
5261 store: StoreContextMut<T>,
5262 handle: Func,
5263 guest_thread: QualifiedThreadId,
5264 param_count: usize,
5265) -> Result<()> {
5266 let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5267 let is_concurrent = raw_options.async_;
5268 let callback = raw_options.callback;
5269 let instance = handle.instance();
5270 let callee = handle.lifted_core_func(store.0);
5271 let post_return = handle.post_return_core_func(store.0);
5272 let callback = callback.map(|i| {
5273 let instance = instance.id().get(store.0);
5274 SendSyncPtr::new(instance.runtime_callback(i))
5275 });
5276
5277 log::trace!("queueing call {guest_thread:?}");
5278
5279 unsafe {
5283 instance.queue_call(
5284 store,
5285 guest_thread,
5286 SendSyncPtr::new(callee),
5287 param_count,
5288 1,
5289 is_concurrent,
5290 callback,
5291 post_return.map(SendSyncPtr::new),
5292 )
5293 }
5294}