1use crate::component::func::{self, Func};
54use crate::component::{
55 HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
56};
57use crate::fiber::{self, StoreFiber, StoreFiberYield};
58use crate::prelude::*;
59use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
60use crate::vm::component::{CallContext, ComponentInstance, HandleTable, ResourceTables};
61use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
62use crate::{
63 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType,
64 bail,
65 error::{Context as _, format_err},
66};
67use error_contexts::GlobalErrorContextRefCount;
68use futures::channel::oneshot;
69use futures::future::{self, FutureExt};
70use futures::stream::{FuturesUnordered, StreamExt};
71use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
72use std::any::Any;
73use std::borrow::ToOwned;
74use std::boxed::Box;
75use std::cell::UnsafeCell;
76use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
77use std::fmt;
78use std::future::Future;
79use std::marker::PhantomData;
80use std::mem::{self, ManuallyDrop, MaybeUninit};
81use std::ops::DerefMut;
82use std::pin::{Pin, pin};
83use std::ptr::{self, NonNull};
84use std::slice;
85use std::sync::Arc;
86use std::task::{Context, Poll, Waker};
87use std::vec::Vec;
88use table::{TableDebug, TableId};
89use wasmtime_environ::Trap;
90use wasmtime_environ::component::{
91 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS,
92 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
93 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
94 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
95 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
96};
97use wasmtime_environ::packed_option::ReservedValue;
98
99pub use abort::JoinHandle;
100pub use future_stream_any::{FutureAny, StreamAny};
101pub use futures_and_streams::{
102 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
103 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
104 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
105};
106pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
107
108mod abort;
109mod error_contexts;
110mod future_stream_any;
111mod futures_and_streams;
112pub(crate) mod table;
113pub(crate) mod tls;
114
115const BLOCKED: u32 = 0xffff_ffff;
118
119#[derive(Clone, Copy, Eq, PartialEq, Debug)]
121pub enum Status {
122 Starting = 0,
123 Started = 1,
124 Returned = 2,
125 StartCancelled = 3,
126 ReturnCancelled = 4,
127}
128
129impl Status {
130 pub fn pack(self, waitable: Option<u32>) -> u32 {
136 assert!(matches!(self, Status::Returned) == waitable.is_none());
137 let waitable = waitable.unwrap_or(0);
138 assert!(waitable < (1 << 28));
139 (waitable << 4) | (self as u32)
140 }
141}
142
143#[derive(Clone, Copy, Debug)]
146enum Event {
147 None,
148 Cancelled,
149 Subtask {
150 status: Status,
151 },
152 StreamRead {
153 code: ReturnCode,
154 pending: Option<(TypeStreamTableIndex, u32)>,
155 },
156 StreamWrite {
157 code: ReturnCode,
158 pending: Option<(TypeStreamTableIndex, u32)>,
159 },
160 FutureRead {
161 code: ReturnCode,
162 pending: Option<(TypeFutureTableIndex, u32)>,
163 },
164 FutureWrite {
165 code: ReturnCode,
166 pending: Option<(TypeFutureTableIndex, u32)>,
167 },
168}
169
170impl Event {
171 fn parts(self) -> (u32, u32) {
176 const EVENT_NONE: u32 = 0;
177 const EVENT_SUBTASK: u32 = 1;
178 const EVENT_STREAM_READ: u32 = 2;
179 const EVENT_STREAM_WRITE: u32 = 3;
180 const EVENT_FUTURE_READ: u32 = 4;
181 const EVENT_FUTURE_WRITE: u32 = 5;
182 const EVENT_CANCELLED: u32 = 6;
183 match self {
184 Event::None => (EVENT_NONE, 0),
185 Event::Cancelled => (EVENT_CANCELLED, 0),
186 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
187 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
188 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
189 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
190 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
191 }
192 }
193}
194
195mod callback_code {
197 pub const EXIT: u32 = 0;
198 pub const YIELD: u32 = 1;
199 pub const WAIT: u32 = 2;
200}
201
202const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
206
207pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
213 store: StoreContextMut<'a, T>,
214 get_data: fn(&mut T) -> D::Data<'_>,
215}
216
217impl<'a, T, D> Access<'a, T, D>
218where
219 D: HasData + ?Sized,
220 T: 'static,
221{
222 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
224 Self { store, get_data }
225 }
226
227 pub fn data_mut(&mut self) -> &mut T {
229 self.store.data_mut()
230 }
231
232 pub fn get(&mut self) -> D::Data<'_> {
234 (self.get_data)(self.data_mut())
235 }
236
237 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
241 where
242 T: 'static,
243 {
244 let accessor = Accessor {
245 get_data: self.get_data,
246 token: StoreToken::new(self.store.as_context_mut()),
247 };
248 self.store
249 .as_context_mut()
250 .spawn_with_accessor(accessor, task)
251 }
252
253 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
256 self.get_data
257 }
258}
259
260impl<'a, T, D> AsContext for Access<'a, T, D>
261where
262 D: HasData + ?Sized,
263 T: 'static,
264{
265 type Data = T;
266
267 fn as_context(&self) -> StoreContext<'_, T> {
268 self.store.as_context()
269 }
270}
271
272impl<'a, T, D> AsContextMut for Access<'a, T, D>
273where
274 D: HasData + ?Sized,
275 T: 'static,
276{
277 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
278 self.store.as_context_mut()
279 }
280}
281
282pub struct Accessor<T: 'static, D = HasSelf<T>>
342where
343 D: HasData + ?Sized,
344{
345 token: StoreToken<T>,
346 get_data: fn(&mut T) -> D::Data<'_>,
347}
348
349pub trait AsAccessor {
366 type Data: 'static;
368
369 type AccessorData: HasData + ?Sized;
372
373 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
375}
376
377impl<T: AsAccessor + ?Sized> AsAccessor for &T {
378 type Data = T::Data;
379 type AccessorData = T::AccessorData;
380
381 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
382 T::as_accessor(self)
383 }
384}
385
386impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
387 type Data = T;
388 type AccessorData = D;
389
390 fn as_accessor(&self) -> &Accessor<T, D> {
391 self
392 }
393}
394
395const _: () = {
418 const fn assert<T: Send + Sync>() {}
419 assert::<Accessor<UnsafeCell<u32>>>();
420};
421
422impl<T> Accessor<T> {
423 pub(crate) fn new(token: StoreToken<T>) -> Self {
432 Self {
433 token,
434 get_data: |x| x,
435 }
436 }
437}
438
439impl<T, D> Accessor<T, D>
440where
441 D: HasData + ?Sized,
442{
443 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
461 tls::get(|vmstore| {
462 fun(Access {
463 store: self.token.as_context_mut(vmstore),
464 get_data: self.get_data,
465 })
466 })
467 }
468
469 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
472 self.get_data
473 }
474
475 pub fn with_getter<D2: HasData>(
492 &self,
493 get_data: fn(&mut T) -> D2::Data<'_>,
494 ) -> Accessor<T, D2> {
495 Accessor {
496 token: self.token,
497 get_data,
498 }
499 }
500
501 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
517 where
518 T: 'static,
519 {
520 let accessor = self.clone_for_spawn();
521 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
522 }
523
524 fn clone_for_spawn(&self) -> Self {
525 Self {
526 token: self.token,
527 get_data: self.get_data,
528 }
529 }
530}
531
532pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
544where
545 D: HasData + ?Sized,
546{
547 fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
549}
550
551enum CallerInfo {
554 Async {
556 params: Vec<ValRaw>,
557 has_result: bool,
558 },
559 Sync {
561 params: Vec<ValRaw>,
562 result_count: u32,
563 },
564}
565
566enum WaitMode {
568 Fiber(StoreFiber<'static>),
570 Callback(Instance),
573}
574
575#[derive(Debug)]
577enum SuspendReason {
578 Waiting {
581 set: TableId<WaitableSet>,
582 thread: QualifiedThreadId,
583 skip_may_block_check: bool,
584 },
585 NeedWork,
588 Yielding {
591 thread: QualifiedThreadId,
592 skip_may_block_check: bool,
593 },
594 ExplicitlySuspending {
596 thread: QualifiedThreadId,
597 skip_may_block_check: bool,
598 },
599}
600
601enum GuestCallKind {
603 DeliverEvent {
606 instance: Instance,
608 set: Option<TableId<WaitableSet>>,
613 },
614 StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
620 StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
621}
622
623impl fmt::Debug for GuestCallKind {
624 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
625 match self {
626 Self::DeliverEvent { instance, set } => f
627 .debug_struct("DeliverEvent")
628 .field("instance", instance)
629 .field("set", set)
630 .finish(),
631 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
632 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
633 }
634 }
635}
636
637#[derive(Debug)]
639struct GuestCall {
640 thread: QualifiedThreadId,
641 kind: GuestCallKind,
642}
643
644impl GuestCall {
645 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
655 let instance = store
656 .concurrent_state_mut()
657 .get_mut(self.thread.task)?
658 .instance;
659 let state = store.instance_state(instance);
660
661 let ready = match &self.kind {
662 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
663 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
664 GuestCallKind::StartExplicit(_) => true,
665 };
666 log::trace!(
667 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
668 state.do_not_enter,
669 state.backpressure
670 );
671 Ok(ready)
672 }
673}
674
675enum WorkerItem {
677 GuestCall(GuestCall),
678 Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
679}
680
681enum WorkItem {
684 PushFuture(AlwaysMut<HostTaskFuture>),
686 ResumeFiber(StoreFiber<'static>),
688 GuestCall(GuestCall),
690 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
692}
693
694impl fmt::Debug for WorkItem {
695 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
696 match self {
697 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
698 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
699 Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
700 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
701 }
702 }
703}
704
705#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
707pub(crate) enum WaitResult {
708 Cancelled,
709 Completed,
710}
711
712pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
720 store: &mut dyn VMStore,
721 future: impl Future<Output = Result<R>> + Send + 'static,
722 caller_instance: RuntimeInstance,
723) -> Result<R> {
724 store.check_may_leave(caller_instance)?;
725
726 let state = store.concurrent_state_mut();
727
728 let caller = state.guest_thread.unwrap();
729
730 let old_result = state
733 .get_mut(caller.task)
734 .with_context(|| format!("bad handle: {caller:?}"))?
735 .result
736 .take();
737
738 let task = state.push(HostTask::new(caller_instance, None))?;
742
743 log::trace!("new host task child of {caller:?}: {task:?}");
744
745 let mut future = Box::pin(async move {
749 let result = future.await?;
750 tls::get(move |store| {
751 let state = store.concurrent_state_mut();
752 state.get_mut(caller.task)?.result = Some(Box::new(result) as _);
753
754 Waitable::Host(task).set_event(
755 state,
756 Some(Event::Subtask {
757 status: Status::Returned,
758 }),
759 )?;
760
761 Ok(())
762 })
763 }) as HostTaskFuture;
764
765 let poll = tls::set(store, || {
769 future
770 .as_mut()
771 .poll(&mut Context::from_waker(&Waker::noop()))
772 });
773
774 match poll {
775 Poll::Ready(result) => {
776 result?;
778 log::trace!("delete host task {task:?} (already ready)");
779 store.concurrent_state_mut().delete(task)?;
780 }
781 Poll::Pending => {
782 let state = store.concurrent_state_mut();
787 state.push_future(future);
788
789 let set = state.get_mut(caller.task)?.sync_call_set;
790 Waitable::Host(task).join(state, Some(set))?;
791
792 store.suspend(SuspendReason::Waiting {
793 set,
794 thread: caller,
795 skip_may_block_check: false,
796 })?;
797 }
798 }
799
800 Ok(*mem::replace(
802 &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
803 old_result,
804 )
805 .unwrap()
806 .downcast()
807 .unwrap())
808}
809
810fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
812 let mut next = Some(call);
813 while let Some(call) = next.take() {
814 match call.kind {
815 GuestCallKind::DeliverEvent { instance, set } => {
816 let (event, waitable) = instance
817 .get_event(store, call.thread.task, set, true)?
818 .unwrap();
819 let state = store.concurrent_state_mut();
820 let task = state.get_mut(call.thread.task)?;
821 let runtime_instance = task.instance;
822 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
823
824 log::trace!(
825 "use callback to deliver event {event:?} to {:?} for {waitable:?}",
826 call.thread,
827 );
828
829 let old_thread = store.set_thread(Some(call.thread));
830 log::trace!(
831 "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
832 call.thread
833 );
834
835 store.maybe_push_call_context(call.thread.task)?;
836
837 store.enter_instance(runtime_instance);
838
839 let callback = store
840 .concurrent_state_mut()
841 .get_mut(call.thread.task)?
842 .callback
843 .take()
844 .unwrap();
845
846 let code = callback(store, event, handle)?;
847
848 store
849 .concurrent_state_mut()
850 .get_mut(call.thread.task)?
851 .callback = Some(callback);
852
853 store.exit_instance(runtime_instance)?;
854
855 store.maybe_pop_call_context(call.thread.task)?;
856
857 store.set_thread(old_thread);
858
859 next = instance.handle_callback_code(
860 store,
861 call.thread,
862 runtime_instance.index,
863 code,
864 )?;
865
866 log::trace!(
867 "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
868 );
869 }
870 GuestCallKind::StartImplicit(fun) => {
871 next = fun(store)?;
872 }
873 GuestCallKind::StartExplicit(fun) => {
874 fun(store)?;
875 }
876 }
877 }
878
879 Ok(())
880}
881
882impl<T> Store<T> {
883 pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
885 where
886 T: Send + 'static,
887 {
888 ensure!(
889 self.as_context().0.concurrency_support(),
890 "cannot use `run_concurrent` when Config::concurrency_support disabled",
891 );
892 self.as_context_mut().run_concurrent(fun).await
893 }
894
895 #[doc(hidden)]
896 pub fn assert_concurrent_state_empty(&mut self) {
897 self.as_context_mut().assert_concurrent_state_empty();
898 }
899
900 pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
902 where
903 T: 'static,
904 {
905 self.as_context_mut().spawn(task)
906 }
907}
908
909impl<T> StoreContextMut<'_, T> {
910 #[doc(hidden)]
919 pub fn assert_concurrent_state_empty(self) {
920 let store = self.0;
921 store
922 .store_data_mut()
923 .components
924 .assert_instance_states_empty();
925 let state = store.concurrent_state_mut();
926 assert!(
927 state.table.get_mut().is_empty(),
928 "non-empty table: {:?}",
929 state.table.get_mut()
930 );
931 assert!(state.high_priority.is_empty());
932 assert!(state.low_priority.is_empty());
933 assert!(state.guest_thread.is_none());
934 assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
935 assert!(state.global_error_context_ref_counts.is_empty());
936 }
937
938 pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
948 where
949 T: 'static,
950 {
951 let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
952 self.spawn_with_accessor(accessor, task)
953 }
954
955 fn spawn_with_accessor<D>(
958 self,
959 accessor: Accessor<T, D>,
960 task: impl AccessorTask<T, D>,
961 ) -> JoinHandle
962 where
963 T: 'static,
964 D: HasData + ?Sized,
965 {
966 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
970 self.0
971 .concurrent_state_mut()
972 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
973 handle
974 }
975
976 pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1060 where
1061 T: Send + 'static,
1062 {
1063 ensure!(
1064 self.0.concurrency_support(),
1065 "cannot use `run_concurrent` when Config::concurrency_support disabled",
1066 );
1067 self.do_run_concurrent(fun, false).await
1068 }
1069
1070 pub(super) async fn run_concurrent_trap_on_idle<R>(
1071 self,
1072 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1073 ) -> Result<R>
1074 where
1075 T: Send + 'static,
1076 {
1077 self.do_run_concurrent(fun, true).await
1078 }
1079
1080 async fn do_run_concurrent<R>(
1081 mut self,
1082 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1083 trap_on_idle: bool,
1084 ) -> Result<R>
1085 where
1086 T: Send + 'static,
1087 {
1088 debug_assert!(self.0.concurrency_support());
1089 check_recursive_run();
1090 let token = StoreToken::new(self.as_context_mut());
1091
1092 struct Dropper<'a, T: 'static, V> {
1093 store: StoreContextMut<'a, T>,
1094 value: ManuallyDrop<V>,
1095 }
1096
1097 impl<'a, T, V> Drop for Dropper<'a, T, V> {
1098 fn drop(&mut self) {
1099 tls::set(self.store.0, || {
1100 unsafe { ManuallyDrop::drop(&mut self.value) }
1105 });
1106 }
1107 }
1108
1109 let accessor = &Accessor::new(token);
1110 let dropper = &mut Dropper {
1111 store: self,
1112 value: ManuallyDrop::new(fun(accessor)),
1113 };
1114 let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1116
1117 dropper
1118 .store
1119 .as_context_mut()
1120 .poll_until(future, trap_on_idle)
1121 .await
1122 }
1123
1124 async fn poll_until<R>(
1130 mut self,
1131 mut future: Pin<&mut impl Future<Output = R>>,
1132 trap_on_idle: bool,
1133 ) -> Result<R>
1134 where
1135 T: Send + 'static,
1136 {
1137 struct Reset<'a, T: 'static> {
1138 store: StoreContextMut<'a, T>,
1139 futures: Option<FuturesUnordered<HostTaskFuture>>,
1140 }
1141
1142 impl<'a, T> Drop for Reset<'a, T> {
1143 fn drop(&mut self) {
1144 if let Some(futures) = self.futures.take() {
1145 *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1146 }
1147 }
1148 }
1149
1150 loop {
1151 let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1155 let mut reset = Reset {
1156 store: self.as_context_mut(),
1157 futures,
1158 };
1159 let mut next = pin!(reset.futures.as_mut().unwrap().next());
1160
1161 enum PollResult<R> {
1162 Complete(R),
1163 ProcessWork(Vec<WorkItem>),
1164 }
1165 let result = future::poll_fn(|cx| {
1166 if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1169 return Poll::Ready(Ok(PollResult::Complete(value)));
1170 }
1171
1172 let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1176 Poll::Ready(Some(output)) => {
1177 match output {
1178 Err(e) => return Poll::Ready(Err(e)),
1179 Ok(()) => {}
1180 }
1181 Poll::Ready(true)
1182 }
1183 Poll::Ready(None) => Poll::Ready(false),
1184 Poll::Pending => Poll::Pending,
1185 };
1186
1187 let state = reset.store.0.concurrent_state_mut();
1191 let ready = state.collect_work_items_to_run();
1192 if !ready.is_empty() {
1193 return Poll::Ready(Ok(PollResult::ProcessWork(ready)));
1194 }
1195
1196 return match next {
1200 Poll::Ready(true) => {
1201 Poll::Ready(Ok(PollResult::ProcessWork(Vec::new())))
1207 }
1208 Poll::Ready(false) => {
1209 if let Poll::Ready(value) =
1213 tls::set(reset.store.0, || future.as_mut().poll(cx))
1214 {
1215 Poll::Ready(Ok(PollResult::Complete(value)))
1216 } else {
1217 if trap_on_idle {
1223 Poll::Ready(Err(format_err!(crate::Trap::AsyncDeadlock)))
1226 } else {
1227 Poll::Pending
1231 }
1232 }
1233 }
1234 Poll::Pending => Poll::Pending,
1239 };
1240 })
1241 .await;
1242
1243 drop(reset);
1247
1248 match result? {
1249 PollResult::Complete(value) => break Ok(value),
1252 PollResult::ProcessWork(ready) => {
1255 struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1256 store: StoreContextMut<'a, T>,
1257 ready: I,
1258 }
1259
1260 impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1261 fn drop(&mut self) {
1262 while let Some(item) = self.ready.next() {
1263 match item {
1264 WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1265 WorkItem::PushFuture(future) => {
1266 tls::set(self.store.0, move || drop(future))
1267 }
1268 _ => {}
1269 }
1270 }
1271 }
1272 }
1273
1274 let mut dispose = Dispose {
1275 store: self.as_context_mut(),
1276 ready: ready.into_iter(),
1277 };
1278
1279 while let Some(item) = dispose.ready.next() {
1280 dispose
1281 .store
1282 .as_context_mut()
1283 .handle_work_item(item)
1284 .await?;
1285 }
1286 }
1287 }
1288 }
1289 }
1290
1291 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1293 where
1294 T: Send,
1295 {
1296 log::trace!("handle work item {item:?}");
1297 match item {
1298 WorkItem::PushFuture(future) => {
1299 self.0
1300 .concurrent_state_mut()
1301 .futures
1302 .get_mut()
1303 .as_mut()
1304 .unwrap()
1305 .push(future.into_inner());
1306 }
1307 WorkItem::ResumeFiber(fiber) => {
1308 self.0.resume_fiber(fiber).await?;
1309 }
1310 WorkItem::GuestCall(call) => {
1311 if call.is_ready(self.0)? {
1312 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1313 } else {
1314 let state = self.0.concurrent_state_mut();
1315 let task = state.get_mut(call.thread.task)?;
1316 if !task.starting_sent {
1317 task.starting_sent = true;
1318 if let GuestCallKind::StartImplicit(_) = &call.kind {
1319 Waitable::Guest(call.thread.task).set_event(
1320 state,
1321 Some(Event::Subtask {
1322 status: Status::Starting,
1323 }),
1324 )?;
1325 }
1326 }
1327
1328 let instance = state.get_mut(call.thread.task)?.instance;
1329 self.0
1330 .instance_state(instance)
1331 .pending
1332 .insert(call.thread, call.kind);
1333 }
1334 }
1335 WorkItem::WorkerFunction(fun) => {
1336 self.run_on_worker(WorkerItem::Function(fun)).await?;
1337 }
1338 }
1339
1340 Ok(())
1341 }
1342
1343 async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1345 where
1346 T: Send,
1347 {
1348 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1349 fiber
1350 } else {
1351 fiber::make_fiber(self.0, move |store| {
1352 loop {
1353 match store.concurrent_state_mut().worker_item.take().unwrap() {
1354 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1355 WorkerItem::Function(fun) => fun.into_inner()(store)?,
1356 }
1357
1358 store.suspend(SuspendReason::NeedWork)?;
1359 }
1360 })?
1361 };
1362
1363 let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1364 assert!(worker_item.is_none());
1365 *worker_item = Some(item);
1366
1367 self.0.resume_fiber(worker).await
1368 }
1369
1370 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1375 where
1376 T: 'static,
1377 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1378 + Send
1379 + Sync
1380 + 'static,
1381 R: Send + Sync + 'static,
1382 {
1383 let token = StoreToken::new(self);
1384 async move {
1385 let mut accessor = Accessor::new(token);
1386 closure(&mut accessor).await
1387 }
1388 }
1389}
1390
1391impl StoreOpaque {
1392 fn check_may_leave(&mut self, instance: RuntimeInstance) -> Result<()> {
1393 Instance::from_wasmtime(self, instance.instance)
1394 .id()
1395 .get(self)
1396 .check_may_leave(instance.index)?;
1397
1398 let state = self.concurrent_state_mut();
1401 let caller = state.guest_thread.unwrap();
1402 assert_eq!(state.get_mut(caller.task)?.instance, instance);
1403
1404 Ok(())
1405 }
1406
1407 pub(crate) fn enter_sync_call(
1414 &mut self,
1415 guest_caller: Option<RuntimeInstance>,
1416 callee_async: bool,
1417 callee: RuntimeInstance,
1418 ) -> Result<()> {
1419 log::trace!("enter sync call {callee:?}");
1420
1421 let state = self.concurrent_state_mut();
1422 let thread = state.guest_thread;
1423 let instance = if let Some(thread) = thread {
1424 Some(state.get_mut(thread.task)?.instance)
1425 } else {
1426 None
1427 };
1428 let task = GuestTask::new(
1429 state,
1430 Box::new(move |_, _| unreachable!()),
1431 LiftResult {
1432 lift: Box::new(move |_, _| unreachable!()),
1433 ty: TypeTupleIndex::reserved_value(),
1434 memory: None,
1435 string_encoding: StringEncoding::Utf8,
1436 },
1437 if let Some(caller) = guest_caller {
1438 assert_eq!(caller, instance.unwrap());
1439 Caller::Guest {
1440 thread: thread.unwrap(),
1441 }
1442 } else {
1443 Caller::Host {
1444 tx: None,
1445 exit_tx: Arc::new(oneshot::channel().0),
1446 host_future_present: false,
1447 call_post_return_automatically: false,
1448 caller: state.guest_thread,
1449 }
1450 },
1451 None,
1452 callee,
1453 callee_async,
1454 )?;
1455
1456 let guest_task = state.push(task)?;
1457 let new_thread = GuestThread::new_implicit(guest_task);
1458 let guest_thread = state.push(new_thread)?;
1459 Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1460 guest_thread,
1461 self,
1462 callee.index,
1463 )?;
1464
1465 let state = self.concurrent_state_mut();
1466 state.get_mut(guest_task)?.threads.insert(guest_thread);
1467 if guest_caller.is_some() {
1468 let thread = state.guest_thread.unwrap();
1469 state.get_mut(thread.task)?.subtasks.insert(guest_task);
1470 }
1471
1472 self.set_thread(Some(QualifiedThreadId {
1473 task: guest_task,
1474 thread: guest_thread,
1475 }));
1476
1477 Ok(())
1478 }
1479
1480 pub(crate) fn exit_sync_call(&mut self, guest_caller: bool) -> Result<()> {
1482 let thread = self.set_thread(None).unwrap();
1483 let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance;
1484 log::trace!("exit sync call {instance:?}");
1485 Instance::from_wasmtime(self, instance.instance).cleanup_thread(
1486 self,
1487 thread,
1488 instance.index,
1489 )?;
1490
1491 let state = self.concurrent_state_mut();
1492 let task = state.get_mut(thread.task)?;
1493 let caller = match &task.caller {
1494 &Caller::Guest { thread } => {
1495 assert!(guest_caller);
1496 Some(thread)
1497 }
1498 &Caller::Host { caller, .. } => {
1499 assert!(!guest_caller);
1500 caller
1501 }
1502 };
1503 self.set_thread(caller);
1504
1505 let state = self.concurrent_state_mut();
1506 let task = state.get_mut(thread.task)?;
1507 if task.ready_to_delete() {
1508 state.delete(thread.task)?.dispose(state, thread.task)?;
1509 }
1510
1511 Ok(())
1512 }
1513
1514 pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool {
1522 if !self.concurrency_support() {
1523 return self.may_enter_at_all(instance);
1524 }
1525 let state = self.concurrent_state_mut();
1526 if let Some(caller) = state.guest_thread {
1527 instance != state.get_mut(caller.task).unwrap().instance
1528 && self.may_enter_from_caller(caller.task, instance)
1529 } else {
1530 self.may_enter_at_all(instance)
1531 }
1532 }
1533
1534 fn may_enter_task(&mut self, task: TableId<GuestTask>) -> bool {
1537 let instance = self.concurrent_state_mut().get_mut(task).unwrap().instance;
1538 self.may_enter_from_caller(task, instance)
1539 }
1540
1541 fn may_enter_from_caller(
1544 &mut self,
1545 mut guest_task: TableId<GuestTask>,
1546 instance: RuntimeInstance,
1547 ) -> bool {
1548 self.may_enter_at_all(instance) && {
1549 let state = self.concurrent_state_mut();
1550 let guest_instance = instance.instance;
1551 loop {
1552 let next_thread = match &state.get_mut(guest_task).unwrap().caller {
1558 Caller::Host { caller: None, .. } => break true,
1559 &Caller::Host {
1560 caller: Some(caller),
1561 ..
1562 } => {
1563 let instance = state.get_mut(caller.task).unwrap().instance;
1564 if instance.instance == guest_instance {
1565 break false;
1566 } else {
1567 caller
1568 }
1569 }
1570 &Caller::Guest { thread } => {
1571 if state.get_mut(thread.task).unwrap().instance.instance == guest_instance {
1572 break false;
1573 } else {
1574 thread
1575 }
1576 }
1577 };
1578 guest_task = next_thread.task;
1579 }
1580 }
1581 }
1582
1583 fn instance_state(&mut self, instance: RuntimeInstance) -> &mut ConcurrentInstanceState {
1586 self.component_instance_mut(instance.instance)
1587 .instance_state(instance.index)
1588 .concurrent_state()
1589 }
1590
1591 fn handle_table(&mut self, instance: RuntimeInstance) -> &mut HandleTable {
1594 self.component_instance_mut(instance.instance)
1595 .instance_state(instance.index)
1596 .handle_table()
1597 }
1598
1599 fn set_thread(&mut self, thread: Option<QualifiedThreadId>) -> Option<QualifiedThreadId> {
1600 let state = self.concurrent_state_mut();
1605 let old_thread = state.guest_thread.take();
1606 if let Some(old_thread) = old_thread {
1607 let instance = state.get_mut(old_thread.task).unwrap().instance.instance;
1608 self.component_instance_mut(instance)
1609 .set_task_may_block(false)
1610 }
1611
1612 self.concurrent_state_mut().guest_thread = thread;
1613
1614 if thread.is_some() {
1617 self.set_task_may_block();
1618 }
1619
1620 old_thread
1621 }
1622
1623 fn set_task_may_block(&mut self) {
1626 let state = self.concurrent_state_mut();
1627 let guest_thread = state.guest_thread.unwrap();
1628 let instance = state.get_mut(guest_thread.task).unwrap().instance.instance;
1629 let may_block = self.concurrent_state_mut().may_block(guest_thread.task);
1630 self.component_instance_mut(instance)
1631 .set_task_may_block(may_block)
1632 }
1633
1634 pub(crate) fn check_blocking(&mut self) -> Result<()> {
1635 if !self.concurrency_support() {
1636 return Ok(());
1637 }
1638 let state = self.concurrent_state_mut();
1639 let task = state.guest_thread.unwrap().task;
1640 let instance = state.get_mut(task).unwrap().instance.instance;
1641 let task_may_block = self.component_instance(instance).get_task_may_block();
1642
1643 if task_may_block {
1644 Ok(())
1645 } else {
1646 Err(Trap::CannotBlockSyncTask.into())
1647 }
1648 }
1649
1650 fn enter_instance(&mut self, instance: RuntimeInstance) {
1654 log::trace!("enter {instance:?}");
1655 self.instance_state(instance).do_not_enter = true;
1656 }
1657
1658 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1662 log::trace!("exit {instance:?}");
1663 self.instance_state(instance).do_not_enter = false;
1664 self.partition_pending(instance)
1665 }
1666
1667 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1672 for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
1673 let call = GuestCall { thread, kind };
1674 if call.is_ready(self)? {
1675 self.concurrent_state_mut()
1676 .push_high_priority(WorkItem::GuestCall(call));
1677 } else {
1678 self.instance_state(instance)
1679 .pending
1680 .insert(call.thread, call.kind);
1681 }
1682 }
1683
1684 Ok(())
1685 }
1686
1687 pub(crate) fn backpressure_modify(
1689 &mut self,
1690 caller_instance: RuntimeInstance,
1691 modify: impl FnOnce(u16) -> Option<u16>,
1692 ) -> Result<()> {
1693 let state = self.instance_state(caller_instance);
1694 let old = state.backpressure;
1695 let new = modify(old).ok_or_else(|| format_err!("backpressure counter overflow"))?;
1696 state.backpressure = new;
1697
1698 if old > 0 && new == 0 {
1699 self.partition_pending(caller_instance)?;
1702 }
1703
1704 Ok(())
1705 }
1706
1707 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1710 let old_thread = self.concurrent_state_mut().guest_thread;
1711 log::trace!("resume_fiber: save current thread {old_thread:?}");
1712
1713 let fiber = fiber::resolve_or_release(self, fiber).await?;
1714
1715 self.set_thread(old_thread);
1716
1717 let state = self.concurrent_state_mut();
1718
1719 if let Some(ref ot) = old_thread {
1720 state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1721 }
1722 log::trace!("resume_fiber: restore current thread {old_thread:?}");
1723
1724 if let Some(mut fiber) = fiber {
1725 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1726 match state.suspend_reason.take().unwrap() {
1728 SuspendReason::NeedWork => {
1729 if state.worker.is_none() {
1730 state.worker = Some(fiber);
1731 } else {
1732 fiber.dispose(self);
1733 }
1734 }
1735 SuspendReason::Yielding { thread, .. } => {
1736 state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
1737 state.push_low_priority(WorkItem::ResumeFiber(fiber));
1738 }
1739 SuspendReason::ExplicitlySuspending { thread, .. } => {
1740 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1741 }
1742 SuspendReason::Waiting { set, thread, .. } => {
1743 let old = state
1744 .get_mut(set)?
1745 .waiting
1746 .insert(thread, WaitMode::Fiber(fiber));
1747 assert!(old.is_none());
1748 }
1749 };
1750 } else {
1751 log::trace!("resume_fiber: fiber has exited");
1752 }
1753
1754 Ok(())
1755 }
1756
1757 fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1763 log::trace!("suspend fiber: {reason:?}");
1764
1765 let task = match &reason {
1769 SuspendReason::Yielding { thread, .. }
1770 | SuspendReason::Waiting { thread, .. }
1771 | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1772 SuspendReason::NeedWork => None,
1773 };
1774
1775 let old_guest_thread = if let Some(task) = task {
1776 self.maybe_pop_call_context(task)?;
1777 self.concurrent_state_mut().guest_thread
1778 } else {
1779 None
1780 };
1781
1782 assert!(
1788 matches!(
1789 reason,
1790 SuspendReason::ExplicitlySuspending {
1791 skip_may_block_check: true,
1792 ..
1793 } | SuspendReason::Waiting {
1794 skip_may_block_check: true,
1795 ..
1796 } | SuspendReason::Yielding {
1797 skip_may_block_check: true,
1798 ..
1799 }
1800 ) || old_guest_thread
1801 .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1802 .unwrap_or(true)
1803 );
1804
1805 let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1806 assert!(suspend_reason.is_none());
1807 *suspend_reason = Some(reason);
1808
1809 self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1810
1811 if let Some(task) = task {
1812 self.set_thread(old_guest_thread);
1813 self.maybe_push_call_context(task)?;
1814 }
1815
1816 Ok(())
1817 }
1818
1819 fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1823 let task = self.concurrent_state_mut().get_mut(guest_task)?;
1824
1825 if !task.returned_or_cancelled() {
1826 log::trace!("push call context for {guest_task:?}");
1827 let call_context = task.call_context.take().unwrap();
1828 self.component_resource_state().0.push(call_context);
1829 }
1830 Ok(())
1831 }
1832
1833 fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1837 if !self
1838 .concurrent_state_mut()
1839 .get_mut(guest_task)?
1840 .returned_or_cancelled()
1841 {
1842 log::trace!("pop call context for {guest_task:?}");
1843 let call_context = Some(self.component_resource_state().0.pop().unwrap());
1844 self.concurrent_state_mut()
1845 .get_mut(guest_task)?
1846 .call_context = call_context;
1847 }
1848 Ok(())
1849 }
1850
1851 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1852 let state = self.concurrent_state_mut();
1853 let caller = state.guest_thread.unwrap();
1854 let old_set = waitable.common(state)?.set;
1855 let set = state.get_mut(caller.task)?.sync_call_set;
1856 waitable.join(state, Some(set))?;
1857 self.suspend(SuspendReason::Waiting {
1858 set,
1859 thread: caller,
1860 skip_may_block_check: false,
1861 })?;
1862 let state = self.concurrent_state_mut();
1863 waitable.join(state, old_set)
1864 }
1865}
1866
1867impl Instance {
1868 fn check_may_leave(
1869 self,
1870 store: &mut StoreOpaque,
1871 caller: RuntimeComponentInstanceIndex,
1872 ) -> Result<()> {
1873 store.check_may_leave(RuntimeInstance {
1874 instance: self.id().instance(),
1875 index: caller,
1876 })
1877 }
1878
1879 fn get_event(
1882 self,
1883 store: &mut StoreOpaque,
1884 guest_task: TableId<GuestTask>,
1885 set: Option<TableId<WaitableSet>>,
1886 cancellable: bool,
1887 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1888 let state = store.concurrent_state_mut();
1889
1890 if let Some(event) = state.get_mut(guest_task)?.event.take() {
1891 log::trace!("deliver event {event:?} to {guest_task:?}");
1892
1893 if cancellable || !matches!(event, Event::Cancelled) {
1894 return Ok(Some((event, None)));
1895 } else {
1896 state.get_mut(guest_task)?.event = Some(event);
1897 }
1898 }
1899
1900 Ok(
1901 if let Some((set, waitable)) = set
1902 .and_then(|set| {
1903 state
1904 .get_mut(set)
1905 .map(|v| v.ready.pop_first().map(|v| (set, v)))
1906 .transpose()
1907 })
1908 .transpose()?
1909 {
1910 let common = waitable.common(state)?;
1911 let handle = common.handle.unwrap();
1912 let event = common.event.take().unwrap();
1913
1914 log::trace!(
1915 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1916 );
1917
1918 waitable.on_delivery(store, self, event);
1919
1920 Some((event, Some((waitable, handle))))
1921 } else {
1922 None
1923 },
1924 )
1925 }
1926
1927 fn handle_callback_code(
1933 self,
1934 store: &mut StoreOpaque,
1935 guest_thread: QualifiedThreadId,
1936 runtime_instance: RuntimeComponentInstanceIndex,
1937 code: u32,
1938 ) -> Result<Option<GuestCall>> {
1939 let (code, set) = unpack_callback_code(code);
1940
1941 log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1942
1943 let state = store.concurrent_state_mut();
1944
1945 let get_set = |store: &mut StoreOpaque, handle| {
1946 if handle == 0 {
1947 bail!("invalid waitable-set handle");
1948 }
1949
1950 let set = store
1951 .handle_table(RuntimeInstance {
1952 instance: self.id().instance(),
1953 index: runtime_instance,
1954 })
1955 .waitable_set_rep(handle)?;
1956
1957 Ok(TableId::<WaitableSet>::new(set))
1958 };
1959
1960 Ok(match code {
1961 callback_code::EXIT => {
1962 log::trace!("implicit thread {guest_thread:?} completed");
1963 self.cleanup_thread(store, guest_thread, runtime_instance)?;
1964 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1965 if task.threads.is_empty() && !task.returned_or_cancelled() {
1966 bail!(Trap::NoAsyncResult);
1967 }
1968 match &task.caller {
1969 Caller::Host { .. } => {
1970 if task.ready_to_delete() {
1971 Waitable::Guest(guest_thread.task)
1972 .delete_from(store.concurrent_state_mut())?;
1973 }
1974 }
1975 Caller::Guest { .. } => {
1976 task.exited = true;
1977 task.callback = None;
1978 }
1979 }
1980 None
1981 }
1982 callback_code::YIELD => {
1983 let task = state.get_mut(guest_thread.task)?;
1984 if let Some(event) = task.event {
1989 assert!(matches!(event, Event::None | Event::Cancelled));
1990 } else {
1991 task.event = Some(Event::None);
1992 }
1993 let call = GuestCall {
1994 thread: guest_thread,
1995 kind: GuestCallKind::DeliverEvent {
1996 instance: self,
1997 set: None,
1998 },
1999 };
2000 if state.may_block(guest_thread.task) {
2001 state.push_low_priority(WorkItem::GuestCall(call));
2004 None
2005 } else {
2006 Some(call)
2010 }
2011 }
2012 callback_code::WAIT => {
2013 state.check_blocking_for(guest_thread.task)?;
2016
2017 let set = get_set(store, set)?;
2018 let state = store.concurrent_state_mut();
2019
2020 if state.get_mut(guest_thread.task)?.event.is_some()
2021 || !state.get_mut(set)?.ready.is_empty()
2022 {
2023 state.push_high_priority(WorkItem::GuestCall(GuestCall {
2025 thread: guest_thread,
2026 kind: GuestCallKind::DeliverEvent {
2027 instance: self,
2028 set: Some(set),
2029 },
2030 }));
2031 } else {
2032 let old = state
2040 .get_mut(guest_thread.thread)?
2041 .wake_on_cancel
2042 .replace(set);
2043 assert!(old.is_none());
2044 let old = state
2045 .get_mut(set)?
2046 .waiting
2047 .insert(guest_thread, WaitMode::Callback(self));
2048 assert!(old.is_none());
2049 }
2050 None
2051 }
2052 _ => bail!("unsupported callback code: {code}"),
2053 })
2054 }
2055
2056 fn cleanup_thread(
2057 self,
2058 store: &mut StoreOpaque,
2059 guest_thread: QualifiedThreadId,
2060 runtime_instance: RuntimeComponentInstanceIndex,
2061 ) -> Result<()> {
2062 let guest_id = store
2063 .concurrent_state_mut()
2064 .get_mut(guest_thread.thread)?
2065 .instance_rep;
2066 store
2067 .handle_table(RuntimeInstance {
2068 instance: self.id().instance(),
2069 index: runtime_instance,
2070 })
2071 .guest_thread_remove(guest_id.unwrap())?;
2072
2073 store.concurrent_state_mut().delete(guest_thread.thread)?;
2074 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2075 task.threads.remove(&guest_thread.thread);
2076 Ok(())
2077 }
2078
2079 unsafe fn queue_call<T: 'static>(
2086 self,
2087 mut store: StoreContextMut<T>,
2088 guest_thread: QualifiedThreadId,
2089 callee: SendSyncPtr<VMFuncRef>,
2090 param_count: usize,
2091 result_count: usize,
2092 async_: bool,
2093 callback: Option<SendSyncPtr<VMFuncRef>>,
2094 post_return: Option<SendSyncPtr<VMFuncRef>>,
2095 ) -> Result<()> {
2096 unsafe fn make_call<T: 'static>(
2111 store: StoreContextMut<T>,
2112 guest_thread: QualifiedThreadId,
2113 callee: SendSyncPtr<VMFuncRef>,
2114 param_count: usize,
2115 result_count: usize,
2116 ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2117 + Send
2118 + Sync
2119 + 'static
2120 + use<T> {
2121 let token = StoreToken::new(store);
2122 move |store: &mut dyn VMStore| {
2123 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2124
2125 store
2126 .concurrent_state_mut()
2127 .get_mut(guest_thread.thread)?
2128 .state = GuestThreadState::Running;
2129 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2130 let lower = task.lower_params.take().unwrap();
2131
2132 lower(store, &mut storage[..param_count])?;
2133
2134 let mut store = token.as_context_mut(store);
2135
2136 unsafe {
2139 crate::Func::call_unchecked_raw(
2140 &mut store,
2141 callee.as_non_null(),
2142 NonNull::new(
2143 &mut storage[..param_count.max(result_count)]
2144 as *mut [MaybeUninit<ValRaw>] as _,
2145 )
2146 .unwrap(),
2147 )?;
2148 }
2149
2150 Ok(storage)
2151 }
2152 }
2153
2154 let call = unsafe {
2158 make_call(
2159 store.as_context_mut(),
2160 guest_thread,
2161 callee,
2162 param_count,
2163 result_count,
2164 )
2165 };
2166
2167 let callee_instance = store
2168 .0
2169 .concurrent_state_mut()
2170 .get_mut(guest_thread.task)?
2171 .instance;
2172
2173 let fun = if callback.is_some() {
2174 assert!(async_);
2175
2176 Box::new(move |store: &mut dyn VMStore| {
2177 self.add_guest_thread_to_instance_table(
2178 guest_thread.thread,
2179 store,
2180 callee_instance.index,
2181 )?;
2182 let old_thread = store.set_thread(Some(guest_thread));
2183 log::trace!(
2184 "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2185 );
2186
2187 store.maybe_push_call_context(guest_thread.task)?;
2188
2189 store.enter_instance(callee_instance);
2190
2191 let storage = call(store)?;
2198
2199 store.exit_instance(callee_instance)?;
2200
2201 store.maybe_pop_call_context(guest_thread.task)?;
2202
2203 store.set_thread(old_thread);
2204 let state = store.concurrent_state_mut();
2205 old_thread
2206 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2207 log::trace!("stackless call: restored {old_thread:?} as current thread");
2208
2209 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2212
2213 self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2214 })
2215 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2216 } else {
2217 let token = StoreToken::new(store.as_context_mut());
2218 Box::new(move |store: &mut dyn VMStore| {
2219 self.add_guest_thread_to_instance_table(
2220 guest_thread.thread,
2221 store,
2222 callee_instance.index,
2223 )?;
2224 let old_thread = store.set_thread(Some(guest_thread));
2225 log::trace!(
2226 "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2227 );
2228 let mut flags = self.id().get(store).instance_flags(callee_instance.index);
2229
2230 store.maybe_push_call_context(guest_thread.task)?;
2231
2232 if !async_ {
2236 store.enter_instance(callee_instance);
2237 }
2238
2239 let storage = call(store)?;
2246
2247 self.cleanup_thread(store, guest_thread, callee_instance.index)?;
2249
2250 if async_ {
2251 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2252 if task.threads.is_empty() && !task.returned_or_cancelled() {
2253 bail!(Trap::NoAsyncResult);
2254 }
2255 } else {
2256 let lift = {
2262 store.exit_instance(callee_instance)?;
2263
2264 let state = store.concurrent_state_mut();
2265 assert!(state.get_mut(guest_thread.task)?.result.is_none());
2266
2267 state
2268 .get_mut(guest_thread.task)?
2269 .lift_result
2270 .take()
2271 .unwrap()
2272 };
2273
2274 let result = (lift.lift)(store, unsafe {
2277 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2278 &storage[..result_count],
2279 )
2280 })?;
2281
2282 let post_return_arg = match result_count {
2283 0 => ValRaw::i32(0),
2284 1 => unsafe { storage[0].assume_init() },
2287 _ => unreachable!(),
2288 };
2289
2290 if store
2291 .concurrent_state_mut()
2292 .get_mut(guest_thread.task)?
2293 .call_post_return_automatically()
2294 {
2295 unsafe {
2296 flags.set_may_leave(false);
2297 flags.set_needs_post_return(false);
2298 }
2299
2300 if let Some(func) = post_return {
2301 let mut store = token.as_context_mut(store);
2302
2303 unsafe {
2309 crate::Func::call_unchecked_raw(
2310 &mut store,
2311 func.as_non_null(),
2312 slice::from_ref(&post_return_arg).into(),
2313 )?;
2314 }
2315 }
2316
2317 unsafe {
2318 flags.set_may_leave(true);
2319 }
2320 }
2321
2322 self.task_complete(
2323 store,
2324 guest_thread.task,
2325 result,
2326 Status::Returned,
2327 post_return_arg,
2328 )?;
2329 }
2330
2331 store.set_thread(old_thread);
2332
2333 store.maybe_pop_call_context(guest_thread.task)?;
2334
2335 let state = store.concurrent_state_mut();
2336 let task = state.get_mut(guest_thread.task)?;
2337
2338 match &task.caller {
2339 Caller::Host { .. } => {
2340 if task.ready_to_delete() {
2341 Waitable::Guest(guest_thread.task).delete_from(state)?;
2342 }
2343 }
2344 Caller::Guest { .. } => {
2345 task.exited = true;
2346 }
2347 }
2348
2349 Ok(None)
2350 })
2351 };
2352
2353 store
2354 .0
2355 .concurrent_state_mut()
2356 .push_high_priority(WorkItem::GuestCall(GuestCall {
2357 thread: guest_thread,
2358 kind: GuestCallKind::StartImplicit(fun),
2359 }));
2360
2361 Ok(())
2362 }
2363
2364 unsafe fn prepare_call<T: 'static>(
2377 self,
2378 mut store: StoreContextMut<T>,
2379 start: *mut VMFuncRef,
2380 return_: *mut VMFuncRef,
2381 caller_instance: RuntimeComponentInstanceIndex,
2382 callee_instance: RuntimeComponentInstanceIndex,
2383 task_return_type: TypeTupleIndex,
2384 callee_async: bool,
2385 memory: *mut VMMemoryDefinition,
2386 string_encoding: u8,
2387 caller_info: CallerInfo,
2388 ) -> Result<()> {
2389 self.check_may_leave(store.0, caller_instance)?;
2390
2391 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2392 store.0.check_blocking()?;
2396 }
2397
2398 enum ResultInfo {
2399 Heap { results: u32 },
2400 Stack { result_count: u32 },
2401 }
2402
2403 let result_info = match &caller_info {
2404 CallerInfo::Async {
2405 has_result: true,
2406 params,
2407 } => ResultInfo::Heap {
2408 results: params.last().unwrap().get_u32(),
2409 },
2410 CallerInfo::Async {
2411 has_result: false, ..
2412 } => ResultInfo::Stack { result_count: 0 },
2413 CallerInfo::Sync {
2414 result_count,
2415 params,
2416 } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2417 results: params.last().unwrap().get_u32(),
2418 },
2419 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2420 result_count: *result_count,
2421 },
2422 };
2423
2424 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2425
2426 let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2430 let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2431 let token = StoreToken::new(store.as_context_mut());
2432 let state = store.0.concurrent_state_mut();
2433 let old_thread = state.guest_thread.unwrap();
2434
2435 assert_eq!(
2436 state.get_mut(old_thread.task)?.instance,
2437 RuntimeInstance {
2438 instance: self.id().instance(),
2439 index: caller_instance,
2440 }
2441 );
2442
2443 let new_task = GuestTask::new(
2444 state,
2445 Box::new(move |store, dst| {
2446 let mut store = token.as_context_mut(store);
2447 assert!(dst.len() <= MAX_FLAT_PARAMS);
2448 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2450 let count = match caller_info {
2451 CallerInfo::Async { params, has_result } => {
2455 let params = ¶ms[..params.len() - usize::from(has_result)];
2456 for (param, src) in params.iter().zip(&mut src) {
2457 src.write(*param);
2458 }
2459 params.len()
2460 }
2461
2462 CallerInfo::Sync { params, .. } => {
2464 for (param, src) in params.iter().zip(&mut src) {
2465 src.write(*param);
2466 }
2467 params.len()
2468 }
2469 };
2470 unsafe {
2477 crate::Func::call_unchecked_raw(
2478 &mut store,
2479 start.as_non_null(),
2480 NonNull::new(
2481 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2482 )
2483 .unwrap(),
2484 )?;
2485 }
2486 dst.copy_from_slice(&src[..dst.len()]);
2487 let state = store.0.concurrent_state_mut();
2488 Waitable::Guest(state.guest_thread.unwrap().task).set_event(
2489 state,
2490 Some(Event::Subtask {
2491 status: Status::Started,
2492 }),
2493 )?;
2494 Ok(())
2495 }),
2496 LiftResult {
2497 lift: Box::new(move |store, src| {
2498 let mut store = token.as_context_mut(store);
2501 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2503 my_src.push(ValRaw::u32(*results));
2504 }
2505 unsafe {
2512 crate::Func::call_unchecked_raw(
2513 &mut store,
2514 return_.as_non_null(),
2515 my_src.as_mut_slice().into(),
2516 )?;
2517 }
2518 let state = store.0.concurrent_state_mut();
2519 let thread = state.guest_thread.unwrap();
2520 if sync_caller {
2521 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2522 if let ResultInfo::Stack { result_count } = &result_info {
2523 match result_count {
2524 0 => None,
2525 1 => Some(my_src[0]),
2526 _ => unreachable!(),
2527 }
2528 } else {
2529 None
2530 },
2531 );
2532 }
2533 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2534 }),
2535 ty: task_return_type,
2536 memory: NonNull::new(memory).map(SendSyncPtr::new),
2537 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2538 },
2539 Caller::Guest { thread: old_thread },
2540 None,
2541 RuntimeInstance {
2542 instance: self.id().instance(),
2543 index: callee_instance,
2544 },
2545 callee_async,
2546 )?;
2547
2548 let guest_task = state.push(new_task)?;
2549 let new_thread = GuestThread::new_implicit(guest_task);
2550 let guest_thread = state.push(new_thread)?;
2551 state.get_mut(guest_task)?.threads.insert(guest_thread);
2552
2553 store
2554 .0
2555 .concurrent_state_mut()
2556 .get_mut(old_thread.task)?
2557 .subtasks
2558 .insert(guest_task);
2559
2560 store.0.set_thread(Some(QualifiedThreadId {
2563 task: guest_task,
2564 thread: guest_thread,
2565 }));
2566 log::trace!(
2567 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2568 );
2569
2570 Ok(())
2571 }
2572
2573 unsafe fn call_callback<T>(
2578 self,
2579 mut store: StoreContextMut<T>,
2580 function: SendSyncPtr<VMFuncRef>,
2581 event: Event,
2582 handle: u32,
2583 ) -> Result<u32> {
2584 let (ordinal, result) = event.parts();
2585 let params = &mut [
2586 ValRaw::u32(ordinal),
2587 ValRaw::u32(handle),
2588 ValRaw::u32(result),
2589 ];
2590 unsafe {
2595 crate::Func::call_unchecked_raw(
2596 &mut store,
2597 function.as_non_null(),
2598 params.as_mut_slice().into(),
2599 )?;
2600 }
2601 Ok(params[0].get_u32())
2602 }
2603
2604 unsafe fn start_call<T: 'static>(
2617 self,
2618 mut store: StoreContextMut<T>,
2619 callback: *mut VMFuncRef,
2620 post_return: *mut VMFuncRef,
2621 callee: *mut VMFuncRef,
2622 param_count: u32,
2623 result_count: u32,
2624 flags: u32,
2625 storage: Option<&mut [MaybeUninit<ValRaw>]>,
2626 ) -> Result<u32> {
2627 let token = StoreToken::new(store.as_context_mut());
2628 let async_caller = storage.is_none();
2629 let state = store.0.concurrent_state_mut();
2630 let guest_thread = state.guest_thread.unwrap();
2631 let callee_async = state.get_mut(guest_thread.task)?.async_function;
2632 let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2633 let param_count = usize::try_from(param_count).unwrap();
2634 assert!(param_count <= MAX_FLAT_PARAMS);
2635 let result_count = usize::try_from(result_count).unwrap();
2636 assert!(result_count <= MAX_FLAT_RESULTS);
2637
2638 let task = state.get_mut(guest_thread.task)?;
2639 if !callback.is_null() {
2640 let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2644 task.callback = Some(Box::new(move |store, event, handle| {
2645 let store = token.as_context_mut(store);
2646 unsafe { self.call_callback::<T>(store, callback, event, handle) }
2647 }));
2648 }
2649
2650 let Caller::Guest { thread: caller } = &task.caller else {
2651 unreachable!()
2654 };
2655 let caller = *caller;
2656 let caller_instance = state.get_mut(caller.task)?.instance;
2657
2658 unsafe {
2660 self.queue_call(
2661 store.as_context_mut(),
2662 guest_thread,
2663 callee,
2664 param_count,
2665 result_count,
2666 (flags & START_FLAG_ASYNC_CALLEE) != 0,
2667 NonNull::new(callback).map(SendSyncPtr::new),
2668 NonNull::new(post_return).map(SendSyncPtr::new),
2669 )?;
2670 }
2671
2672 let state = store.0.concurrent_state_mut();
2673
2674 let guest_waitable = Waitable::Guest(guest_thread.task);
2677 let old_set = guest_waitable.common(state)?.set;
2678 let set = state.get_mut(caller.task)?.sync_call_set;
2679 guest_waitable.join(state, Some(set))?;
2680
2681 let (status, waitable) = loop {
2697 store.0.suspend(SuspendReason::Waiting {
2698 set,
2699 thread: caller,
2700 skip_may_block_check: async_caller || !callee_async,
2708 })?;
2709
2710 let state = store.0.concurrent_state_mut();
2711
2712 log::trace!("taking event for {:?}", guest_thread.task);
2713 let event = guest_waitable.take_event(state)?;
2714 let Some(Event::Subtask { status }) = event else {
2715 unreachable!();
2716 };
2717
2718 log::trace!("status {status:?} for {:?}", guest_thread.task);
2719
2720 if status == Status::Returned {
2721 break (status, None);
2723 } else if async_caller {
2724 let handle = store
2728 .0
2729 .handle_table(caller_instance)
2730 .subtask_insert_guest(guest_thread.task.rep())?;
2731 store
2732 .0
2733 .concurrent_state_mut()
2734 .get_mut(guest_thread.task)?
2735 .common
2736 .handle = Some(handle);
2737 break (status, Some(handle));
2738 } else {
2739 }
2743 };
2744
2745 guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2746
2747 store.0.set_thread(Some(caller));
2749 store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2750 log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2751
2752 if let Some(storage) = storage {
2753 let state = store.0.concurrent_state_mut();
2757 let task = state.get_mut(guest_thread.task)?;
2758 if let Some(result) = task.sync_result.take() {
2759 if let Some(result) = result {
2760 storage[0] = MaybeUninit::new(result);
2761 }
2762
2763 if task.exited && task.ready_to_delete() {
2764 Waitable::Guest(guest_thread.task).delete_from(state)?;
2765 }
2766 }
2767 }
2768
2769 Ok(status.pack(waitable))
2770 }
2771
2772 pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2784 self,
2785 mut store: StoreContextMut<'_, T>,
2786 future: impl Future<Output = Result<R>> + Send + 'static,
2787 caller_instance: RuntimeComponentInstanceIndex,
2788 lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
2789 ) -> Result<Option<u32>> {
2790 let token = StoreToken::new(store.as_context_mut());
2791 let state = store.0.concurrent_state_mut();
2792 let caller = state.guest_thread.unwrap();
2793
2794 let (join_handle, future) = JoinHandle::run(async move {
2797 let mut future = pin!(future);
2798 let mut call_context = None;
2799 future::poll_fn(move |cx| {
2800 tls::get(|store| {
2803 if let Some(call_context) = call_context.take() {
2804 token
2805 .as_context_mut(store)
2806 .0
2807 .component_resource_state()
2808 .0
2809 .push(call_context);
2810 }
2811 });
2812
2813 let result = future.as_mut().poll(cx);
2814
2815 if result.is_pending() {
2816 tls::get(|store| {
2819 call_context = Some(
2820 token
2821 .as_context_mut(store)
2822 .0
2823 .component_resource_state()
2824 .0
2825 .pop()
2826 .unwrap(),
2827 );
2828 });
2829 }
2830 result
2831 })
2832 .await
2833 });
2834
2835 let task = state.push(HostTask::new(
2839 RuntimeInstance {
2840 instance: self.id().instance(),
2841 index: caller_instance,
2842 },
2843 Some(join_handle),
2844 ))?;
2845
2846 log::trace!("new host task child of {caller:?}: {task:?}");
2847
2848 let mut future = Box::pin(future);
2849
2850 let poll = tls::set(store.0, || {
2855 future
2856 .as_mut()
2857 .poll(&mut Context::from_waker(&Waker::noop()))
2858 });
2859
2860 Ok(match poll {
2861 Poll::Ready(None) => unreachable!(),
2862 Poll::Ready(Some(result)) => {
2863 lower(store.as_context_mut(), result?)?;
2866 log::trace!("delete host task {task:?} (already ready)");
2867 store.0.concurrent_state_mut().delete(task)?;
2868 None
2869 }
2870 Poll::Pending => {
2871 let future =
2879 Box::pin(async move {
2880 let result = match future.await {
2881 Some(result) => result?,
2882 None => return Ok(()),
2884 };
2885 tls::get(move |store| {
2886 store.concurrent_state_mut().push_high_priority(
2892 WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
2893 lower(token.as_context_mut(store), result)?;
2894 let state = store.concurrent_state_mut();
2895 state.get_mut(task)?.join_handle.take();
2896 Waitable::Host(task).set_event(
2897 state,
2898 Some(Event::Subtask {
2899 status: Status::Returned,
2900 }),
2901 )
2902 }))),
2903 );
2904 Ok(())
2905 })
2906 });
2907
2908 store.0.concurrent_state_mut().push_future(future);
2909 let handle = store
2910 .0
2911 .handle_table(RuntimeInstance {
2912 instance: self.id().instance(),
2913 index: caller_instance,
2914 })
2915 .subtask_insert_host(task.rep())?;
2916 store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2917 log::trace!(
2918 "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
2919 );
2920 Some(handle)
2921 }
2922 })
2923 }
2924
2925 pub(crate) fn task_return(
2928 self,
2929 store: &mut dyn VMStore,
2930 caller: RuntimeComponentInstanceIndex,
2931 ty: TypeTupleIndex,
2932 options: OptionsIndex,
2933 storage: &[ValRaw],
2934 ) -> Result<()> {
2935 self.check_may_leave(store, caller)?;
2936
2937 let state = store.concurrent_state_mut();
2938 let guest_thread = state.guest_thread.unwrap();
2939 let lift = state
2940 .get_mut(guest_thread.task)?
2941 .lift_result
2942 .take()
2943 .ok_or_else(|| {
2944 format_err!("`task.return` or `task.cancel` called more than once for current task")
2945 })?;
2946 assert!(state.get_mut(guest_thread.task)?.result.is_none());
2947
2948 let CanonicalOptions {
2949 string_encoding,
2950 data_model,
2951 ..
2952 } = &self.id().get(store).component().env_component().options[options];
2953
2954 let invalid = ty != lift.ty
2955 || string_encoding != &lift.string_encoding
2956 || match data_model {
2957 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2958 Some(memory) => {
2959 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2960 let actual = self.id().get(store).runtime_memory(memory);
2961 expected != actual.as_ptr()
2962 }
2963 None => false,
2966 },
2967 CanonicalOptionsDataModel::Gc { .. } => true,
2969 };
2970
2971 if invalid {
2972 bail!("invalid `task.return` signature and/or options for current task");
2973 }
2974
2975 log::trace!("task.return for {guest_thread:?}");
2976
2977 let result = (lift.lift)(store, storage)?;
2978 self.task_complete(
2979 store,
2980 guest_thread.task,
2981 result,
2982 Status::Returned,
2983 ValRaw::i32(0),
2984 )
2985 }
2986
2987 pub(crate) fn task_cancel(
2989 self,
2990 store: &mut StoreOpaque,
2991 caller: RuntimeComponentInstanceIndex,
2992 ) -> Result<()> {
2993 self.check_may_leave(store, caller)?;
2994
2995 let state = store.concurrent_state_mut();
2996 let guest_thread = state.guest_thread.unwrap();
2997 let task = state.get_mut(guest_thread.task)?;
2998 if !task.cancel_sent {
2999 bail!("`task.cancel` called by task which has not been cancelled")
3000 }
3001 _ = task.lift_result.take().ok_or_else(|| {
3002 format_err!("`task.return` or `task.cancel` called more than once for current task")
3003 })?;
3004
3005 assert!(task.result.is_none());
3006
3007 log::trace!("task.cancel for {guest_thread:?}");
3008
3009 self.task_complete(
3010 store,
3011 guest_thread.task,
3012 Box::new(DummyResult),
3013 Status::ReturnCancelled,
3014 ValRaw::i32(0),
3015 )
3016 }
3017
3018 fn task_complete(
3024 self,
3025 store: &mut StoreOpaque,
3026 guest_task: TableId<GuestTask>,
3027 result: Box<dyn Any + Send + Sync>,
3028 status: Status,
3029 post_return_arg: ValRaw,
3030 ) -> Result<()> {
3031 if store
3032 .concurrent_state_mut()
3033 .get_mut(guest_task)?
3034 .call_post_return_automatically()
3035 {
3036 let (calls, host_table, _, instance) =
3037 store.component_resource_state_with_instance(self);
3038 ResourceTables {
3039 calls,
3040 host_table: Some(host_table),
3041 guest: Some(instance.instance_states()),
3042 }
3043 .exit_call()?;
3044 } else {
3045 let function_index = store
3050 .concurrent_state_mut()
3051 .get_mut(guest_task)?
3052 .function_index
3053 .unwrap();
3054 self.id()
3055 .get_mut(store)
3056 .post_return_arg_set(function_index, post_return_arg);
3057 }
3058
3059 let state = store.concurrent_state_mut();
3060 let task = state.get_mut(guest_task)?;
3061
3062 if let Caller::Host { tx, .. } = &mut task.caller {
3063 if let Some(tx) = tx.take() {
3064 _ = tx.send(result);
3065 }
3066 } else {
3067 task.result = Some(result);
3068 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3069 }
3070
3071 Ok(())
3072 }
3073
3074 pub(crate) fn waitable_set_new(
3076 self,
3077 store: &mut StoreOpaque,
3078 caller_instance: RuntimeComponentInstanceIndex,
3079 ) -> Result<u32> {
3080 self.check_may_leave(store, caller_instance)?;
3081
3082 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
3083 let handle = store
3084 .handle_table(RuntimeInstance {
3085 instance: self.id().instance(),
3086 index: caller_instance,
3087 })
3088 .waitable_set_insert(set.rep())?;
3089 log::trace!("new waitable set {set:?} (handle {handle})");
3090 Ok(handle)
3091 }
3092
3093 pub(crate) fn waitable_set_drop(
3095 self,
3096 store: &mut StoreOpaque,
3097 caller_instance: RuntimeComponentInstanceIndex,
3098 set: u32,
3099 ) -> Result<()> {
3100 self.check_may_leave(store, caller_instance)?;
3101
3102 let rep = store
3103 .handle_table(RuntimeInstance {
3104 instance: self.id().instance(),
3105 index: caller_instance,
3106 })
3107 .waitable_set_remove(set)?;
3108
3109 log::trace!("drop waitable set {rep} (handle {set})");
3110
3111 let set = store
3112 .concurrent_state_mut()
3113 .delete(TableId::<WaitableSet>::new(rep))?;
3114
3115 if !set.waiting.is_empty() {
3116 bail!("cannot drop waitable set with waiters");
3117 }
3118
3119 Ok(())
3120 }
3121
3122 pub(crate) fn waitable_join(
3124 self,
3125 store: &mut StoreOpaque,
3126 caller_instance: RuntimeComponentInstanceIndex,
3127 waitable_handle: u32,
3128 set_handle: u32,
3129 ) -> Result<()> {
3130 self.check_may_leave(store, caller_instance)?;
3131
3132 let mut instance = self.id().get_mut(store);
3133 let waitable =
3134 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3135
3136 let set = if set_handle == 0 {
3137 None
3138 } else {
3139 let set = instance.instance_states().0[caller_instance]
3140 .handle_table()
3141 .waitable_set_rep(set_handle)?;
3142
3143 Some(TableId::<WaitableSet>::new(set))
3144 };
3145
3146 log::trace!(
3147 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3148 );
3149
3150 waitable.join(store.concurrent_state_mut(), set)
3151 }
3152
3153 pub(crate) fn subtask_drop(
3155 self,
3156 store: &mut StoreOpaque,
3157 caller_instance: RuntimeComponentInstanceIndex,
3158 task_id: u32,
3159 ) -> Result<()> {
3160 self.check_may_leave(store, caller_instance)?;
3161
3162 self.waitable_join(store, caller_instance, task_id, 0)?;
3163
3164 let (rep, is_host) = store
3165 .handle_table(RuntimeInstance {
3166 instance: self.id().instance(),
3167 index: caller_instance,
3168 })
3169 .subtask_remove(task_id)?;
3170
3171 let concurrent_state = store.concurrent_state_mut();
3172 let (waitable, expected_caller_instance, delete) = if is_host {
3173 let id = TableId::<HostTask>::new(rep);
3174 let task = concurrent_state.get_mut(id)?;
3175 if task.join_handle.is_some() {
3176 bail!("cannot drop a subtask which has not yet resolved");
3177 }
3178 (Waitable::Host(id), task.caller_instance, true)
3179 } else {
3180 let id = TableId::<GuestTask>::new(rep);
3181 let task = concurrent_state.get_mut(id)?;
3182 if task.lift_result.is_some() {
3183 bail!("cannot drop a subtask which has not yet resolved");
3184 }
3185 if let &Caller::Guest { thread } = &task.caller {
3186 (
3187 Waitable::Guest(id),
3188 concurrent_state.get_mut(thread.task)?.instance,
3189 concurrent_state.get_mut(id)?.exited,
3190 )
3191 } else {
3192 unreachable!()
3193 }
3194 };
3195
3196 waitable.common(concurrent_state)?.handle = None;
3197
3198 if waitable.take_event(concurrent_state)?.is_some() {
3199 bail!("cannot drop a subtask with an undelivered event");
3200 }
3201
3202 if delete {
3203 waitable.delete_from(concurrent_state)?;
3204 }
3205
3206 assert_eq!(
3210 expected_caller_instance,
3211 RuntimeInstance {
3212 instance: self.id().instance(),
3213 index: caller_instance
3214 }
3215 );
3216 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3217 Ok(())
3218 }
3219
3220 pub(crate) fn waitable_set_wait(
3222 self,
3223 store: &mut StoreOpaque,
3224 caller: RuntimeComponentInstanceIndex,
3225 options: OptionsIndex,
3226 set: u32,
3227 payload: u32,
3228 ) -> Result<u32> {
3229 self.check_may_leave(store, caller)?;
3230
3231 if !self.options(store, options).async_ {
3232 store.check_blocking()?;
3236 }
3237
3238 let &CanonicalOptions {
3239 cancellable,
3240 instance: caller_instance,
3241 ..
3242 } = &self.id().get(store).component().env_component().options[options];
3243 let rep = store
3244 .handle_table(RuntimeInstance {
3245 instance: self.id().instance(),
3246 index: caller_instance,
3247 })
3248 .waitable_set_rep(set)?;
3249
3250 self.waitable_check(
3251 store,
3252 cancellable,
3253 WaitableCheck::Wait,
3254 WaitableCheckParams {
3255 set: TableId::new(rep),
3256 options,
3257 payload,
3258 },
3259 )
3260 }
3261
3262 pub(crate) fn waitable_set_poll(
3264 self,
3265 store: &mut StoreOpaque,
3266 caller: RuntimeComponentInstanceIndex,
3267 options: OptionsIndex,
3268 set: u32,
3269 payload: u32,
3270 ) -> Result<u32> {
3271 self.check_may_leave(store, caller)?;
3272
3273 let &CanonicalOptions {
3274 cancellable,
3275 instance: caller_instance,
3276 ..
3277 } = &self.id().get(store).component().env_component().options[options];
3278 let rep = store
3279 .handle_table(RuntimeInstance {
3280 instance: self.id().instance(),
3281 index: caller_instance,
3282 })
3283 .waitable_set_rep(set)?;
3284
3285 self.waitable_check(
3286 store,
3287 cancellable,
3288 WaitableCheck::Poll,
3289 WaitableCheckParams {
3290 set: TableId::new(rep),
3291 options,
3292 payload,
3293 },
3294 )
3295 }
3296
3297 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3299 let thread_id = store.concurrent_state_mut().guest_thread.unwrap().thread;
3300 Ok(store
3302 .concurrent_state_mut()
3303 .get_mut(thread_id)?
3304 .instance_rep
3305 .unwrap())
3306 }
3307
3308 pub(crate) fn thread_new_indirect<T: 'static>(
3310 self,
3311 mut store: StoreContextMut<T>,
3312 runtime_instance: RuntimeComponentInstanceIndex,
3313 _func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex,
3315 start_func_idx: u32,
3316 context: i32,
3317 ) -> Result<u32> {
3318 self.check_may_leave(store.0, runtime_instance)?;
3319
3320 log::trace!("creating new thread");
3321
3322 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3323 let (instance, registry) = self.id().get_mut_and_registry(store.0);
3324 let callee = instance
3325 .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3326 .ok_or_else(|| {
3327 format_err!("the start function index points to an uninitialized function")
3328 })?;
3329 if callee.type_index(store.0) != start_func_ty.type_index() {
3330 bail!(
3331 "start function does not match expected type (currently only `(i32) -> ()` is supported)"
3332 );
3333 }
3334
3335 let token = StoreToken::new(store.as_context_mut());
3336 let start_func = Box::new(
3337 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3338 let old_thread = store.set_thread(Some(guest_thread));
3339 log::trace!(
3340 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3341 );
3342
3343 store.maybe_push_call_context(guest_thread.task)?;
3344
3345 let mut store = token.as_context_mut(store);
3346 let mut params = [ValRaw::i32(context)];
3347 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3350
3351 store.0.maybe_pop_call_context(guest_thread.task)?;
3352
3353 self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3354 log::trace!("explicit thread {guest_thread:?} completed");
3355 let state = store.0.concurrent_state_mut();
3356 let task = state.get_mut(guest_thread.task)?;
3357 if task.threads.is_empty() && !task.returned_or_cancelled() {
3358 bail!(Trap::NoAsyncResult);
3359 }
3360 store.0.set_thread(old_thread);
3361 let state = store.0.concurrent_state_mut();
3362 old_thread
3363 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
3364 if state.get_mut(guest_thread.task)?.ready_to_delete() {
3365 Waitable::Guest(guest_thread.task).delete_from(state)?;
3366 }
3367 log::trace!("thread start: restored {old_thread:?} as current thread");
3368
3369 Ok(())
3370 },
3371 );
3372
3373 let state = store.0.concurrent_state_mut();
3374 let current_thread = state.guest_thread.unwrap();
3375 let parent_task = current_thread.task;
3376
3377 let new_thread = GuestThread::new_explicit(parent_task, start_func);
3378 let thread_id = state.push(new_thread)?;
3379 state.get_mut(parent_task)?.threads.insert(thread_id);
3380
3381 log::trace!("new thread with id {thread_id:?} created");
3382
3383 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3384 }
3385
3386 pub(crate) fn resume_suspended_thread(
3387 self,
3388 store: &mut StoreOpaque,
3389 runtime_instance: RuntimeComponentInstanceIndex,
3390 thread_idx: u32,
3391 high_priority: bool,
3392 ) -> Result<()> {
3393 let thread_id =
3394 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3395 let state = store.concurrent_state_mut();
3396 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3397 let thread = state.get_mut(guest_thread.thread)?;
3398
3399 match mem::replace(&mut thread.state, GuestThreadState::Running) {
3400 GuestThreadState::NotStartedExplicit(start_func) => {
3401 log::trace!("starting thread {guest_thread:?}");
3402 let guest_call = WorkItem::GuestCall(GuestCall {
3403 thread: guest_thread,
3404 kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3405 start_func(store, guest_thread)
3406 })),
3407 });
3408 store
3409 .concurrent_state_mut()
3410 .push_work_item(guest_call, high_priority);
3411 }
3412 GuestThreadState::Suspended(fiber) => {
3413 log::trace!("resuming thread {thread_id:?} that was suspended");
3414 store
3415 .concurrent_state_mut()
3416 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3417 }
3418 _ => {
3419 bail!("cannot resume thread which is not suspended");
3420 }
3421 }
3422 Ok(())
3423 }
3424
3425 fn add_guest_thread_to_instance_table(
3426 self,
3427 thread_id: TableId<GuestThread>,
3428 store: &mut StoreOpaque,
3429 runtime_instance: RuntimeComponentInstanceIndex,
3430 ) -> Result<u32> {
3431 let guest_id = store
3432 .handle_table(RuntimeInstance {
3433 instance: self.id().instance(),
3434 index: runtime_instance,
3435 })
3436 .guest_thread_insert(thread_id.rep())?;
3437 store
3438 .concurrent_state_mut()
3439 .get_mut(thread_id)?
3440 .instance_rep = Some(guest_id);
3441 Ok(guest_id)
3442 }
3443
3444 pub(crate) fn suspension_intrinsic(
3447 self,
3448 store: &mut StoreOpaque,
3449 caller: RuntimeComponentInstanceIndex,
3450 cancellable: bool,
3451 yielding: bool,
3452 to_thread: Option<u32>,
3453 ) -> Result<WaitResult> {
3454 self.check_may_leave(store, caller)?;
3455
3456 if to_thread.is_none() {
3457 let state = store.concurrent_state_mut();
3458 if yielding {
3459 if !state.may_block(state.guest_thread.unwrap().task) {
3461 return Ok(WaitResult::Completed);
3465 }
3466 } else {
3467 store.check_blocking()?;
3471 }
3472 }
3473
3474 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3476 return Ok(WaitResult::Cancelled);
3477 }
3478
3479 if let Some(thread) = to_thread {
3480 self.resume_suspended_thread(store, caller, thread, true)?;
3481 }
3482
3483 let state = store.concurrent_state_mut();
3484 let guest_thread = state.guest_thread.unwrap();
3485 let reason = if yielding {
3486 SuspendReason::Yielding {
3487 thread: guest_thread,
3488 skip_may_block_check: to_thread.is_some(),
3492 }
3493 } else {
3494 SuspendReason::ExplicitlySuspending {
3495 thread: guest_thread,
3496 skip_may_block_check: to_thread.is_some(),
3500 }
3501 };
3502
3503 store.suspend(reason)?;
3504
3505 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3506 Ok(WaitResult::Cancelled)
3507 } else {
3508 Ok(WaitResult::Completed)
3509 }
3510 }
3511
3512 fn waitable_check(
3514 self,
3515 store: &mut StoreOpaque,
3516 cancellable: bool,
3517 check: WaitableCheck,
3518 params: WaitableCheckParams,
3519 ) -> Result<u32> {
3520 let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();
3521
3522 log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3523
3524 let state = store.concurrent_state_mut();
3525 let task = state.get_mut(guest_thread.task)?;
3526
3527 match &check {
3530 WaitableCheck::Wait => {
3531 let set = params.set;
3532
3533 if (task.event.is_none()
3534 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3535 && state.get_mut(set)?.ready.is_empty()
3536 {
3537 if cancellable {
3538 let old = state
3539 .get_mut(guest_thread.thread)?
3540 .wake_on_cancel
3541 .replace(set);
3542 assert!(old.is_none());
3543 }
3544
3545 store.suspend(SuspendReason::Waiting {
3546 set,
3547 thread: guest_thread,
3548 skip_may_block_check: false,
3549 })?;
3550 }
3551 }
3552 WaitableCheck::Poll => {}
3553 }
3554
3555 log::trace!(
3556 "waitable check for {guest_thread:?}; set {:?}, part two",
3557 params.set
3558 );
3559
3560 let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3562
3563 let (ordinal, handle, result) = match &check {
3564 WaitableCheck::Wait => {
3565 let (event, waitable) = event.unwrap();
3566 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3567 let (ordinal, result) = event.parts();
3568 (ordinal, handle, result)
3569 }
3570 WaitableCheck::Poll => {
3571 if let Some((event, waitable)) = event {
3572 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3573 let (ordinal, result) = event.parts();
3574 (ordinal, handle, result)
3575 } else {
3576 log::trace!(
3577 "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3578 guest_thread.task,
3579 params.set
3580 );
3581 let (ordinal, result) = Event::None.parts();
3582 (ordinal, 0, result)
3583 }
3584 }
3585 };
3586 let memory = self.options_memory_mut(store, params.options);
3587 let ptr = func::validate_inbounds_dynamic(
3588 &CanonicalAbiInfo::POINTER_PAIR,
3589 memory,
3590 &ValRaw::u32(params.payload),
3591 )?;
3592 memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3593 memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3594 Ok(ordinal)
3595 }
3596
3597 pub(crate) fn subtask_cancel(
3599 self,
3600 store: &mut StoreOpaque,
3601 caller_instance: RuntimeComponentInstanceIndex,
3602 async_: bool,
3603 task_id: u32,
3604 ) -> Result<u32> {
3605 self.check_may_leave(store, caller_instance)?;
3606
3607 if !async_ {
3608 store.check_blocking()?;
3612 }
3613
3614 let (rep, is_host) = store
3615 .handle_table(RuntimeInstance {
3616 instance: self.id().instance(),
3617 index: caller_instance,
3618 })
3619 .subtask_rep(task_id)?;
3620 let (waitable, expected_caller_instance) = if is_host {
3621 let id = TableId::<HostTask>::new(rep);
3622 (
3623 Waitable::Host(id),
3624 store.concurrent_state_mut().get_mut(id)?.caller_instance,
3625 )
3626 } else {
3627 let id = TableId::<GuestTask>::new(rep);
3628 if let &Caller::Guest { thread } = &store.concurrent_state_mut().get_mut(id)?.caller {
3629 (
3630 Waitable::Guest(id),
3631 store.concurrent_state_mut().get_mut(thread.task)?.instance,
3632 )
3633 } else {
3634 unreachable!()
3635 }
3636 };
3637 assert_eq!(
3641 expected_caller_instance,
3642 RuntimeInstance {
3643 instance: self.id().instance(),
3644 index: caller_instance
3645 }
3646 );
3647
3648 log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3649
3650 let concurrent_state = store.concurrent_state_mut();
3651 if let Waitable::Host(host_task) = waitable {
3652 if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
3653 handle.abort();
3654 return Ok(Status::ReturnCancelled as u32);
3655 }
3656 } else {
3657 let caller = concurrent_state.guest_thread.unwrap();
3658 let guest_task = TableId::<GuestTask>::new(rep);
3659 let task = concurrent_state.get_mut(guest_task)?;
3660 if !task.already_lowered_parameters() {
3661 task.lower_params = None;
3665 task.lift_result = None;
3666 task.exited = true;
3667
3668 let instance = task.instance;
3669
3670 assert_eq!(1, task.threads.len());
3671 let thread = mem::take(&mut task.threads).into_iter().next().unwrap();
3672 let concurrent_state = store.concurrent_state_mut();
3673 concurrent_state.delete(thread)?;
3674 assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());
3675
3676 let pending = &mut store.instance_state(instance).pending;
3678 let pending_count = pending.len();
3679 pending.retain(|thread, _| thread.task != guest_task);
3680 if pending.len() == pending_count {
3682 bail!("`subtask.cancel` called after terminal status delivered");
3683 }
3684 return Ok(Status::StartCancelled as u32);
3685 } else if !task.returned_or_cancelled() {
3686 task.cancel_sent = true;
3689 task.event = Some(Event::Cancelled);
3694 for thread in task.threads.clone() {
3695 let thread = QualifiedThreadId {
3696 task: guest_task,
3697 thread,
3698 };
3699 if let Some(set) = concurrent_state
3700 .get_mut(thread.thread)
3701 .unwrap()
3702 .wake_on_cancel
3703 .take()
3704 {
3705 let item = match concurrent_state
3706 .get_mut(set)?
3707 .waiting
3708 .remove(&thread)
3709 .unwrap()
3710 {
3711 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3712 WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
3713 thread,
3714 kind: GuestCallKind::DeliverEvent {
3715 instance,
3716 set: None,
3717 },
3718 }),
3719 };
3720 concurrent_state.push_high_priority(item);
3721
3722 store.suspend(SuspendReason::Yielding {
3723 thread: caller,
3724 skip_may_block_check: false,
3727 })?;
3728 break;
3729 }
3730 }
3731
3732 let concurrent_state = store.concurrent_state_mut();
3733 let task = concurrent_state.get_mut(guest_task)?;
3734 if !task.returned_or_cancelled() {
3735 if async_ {
3736 return Ok(BLOCKED);
3737 } else {
3738 store.wait_for_event(Waitable::Guest(guest_task))?;
3739 }
3740 }
3741 }
3742 }
3743
3744 let event = waitable.take_event(store.concurrent_state_mut())?;
3745 if let Some(Event::Subtask {
3746 status: status @ (Status::Returned | Status::ReturnCancelled),
3747 }) = event
3748 {
3749 Ok(status as u32)
3750 } else {
3751 bail!("`subtask.cancel` called after terminal status delivered");
3752 }
3753 }
3754
3755 pub(crate) fn context_get(
3756 self,
3757 store: &mut StoreOpaque,
3758 caller: RuntimeComponentInstanceIndex,
3759 slot: u32,
3760 ) -> Result<u32> {
3761 self.check_may_leave(store, caller)?;
3762
3763 store.concurrent_state_mut().context_get(slot)
3764 }
3765
3766 pub(crate) fn context_set(
3767 self,
3768 store: &mut StoreOpaque,
3769 caller: RuntimeComponentInstanceIndex,
3770 slot: u32,
3771 value: u32,
3772 ) -> Result<()> {
3773 self.check_may_leave(store, caller)?;
3774
3775 store.concurrent_state_mut().context_set(slot, value)
3776 }
3777}
3778
3779pub trait VMComponentAsyncStore {
3787 unsafe fn prepare_call(
3793 &mut self,
3794 instance: Instance,
3795 memory: *mut VMMemoryDefinition,
3796 start: *mut VMFuncRef,
3797 return_: *mut VMFuncRef,
3798 caller_instance: RuntimeComponentInstanceIndex,
3799 callee_instance: RuntimeComponentInstanceIndex,
3800 task_return_type: TypeTupleIndex,
3801 callee_async: bool,
3802 string_encoding: u8,
3803 result_count: u32,
3804 storage: *mut ValRaw,
3805 storage_len: usize,
3806 ) -> Result<()>;
3807
3808 unsafe fn sync_start(
3811 &mut self,
3812 instance: Instance,
3813 callback: *mut VMFuncRef,
3814 callee: *mut VMFuncRef,
3815 param_count: u32,
3816 storage: *mut MaybeUninit<ValRaw>,
3817 storage_len: usize,
3818 ) -> Result<()>;
3819
3820 unsafe fn async_start(
3823 &mut self,
3824 instance: Instance,
3825 callback: *mut VMFuncRef,
3826 post_return: *mut VMFuncRef,
3827 callee: *mut VMFuncRef,
3828 param_count: u32,
3829 result_count: u32,
3830 flags: u32,
3831 ) -> Result<u32>;
3832
3833 fn future_write(
3835 &mut self,
3836 instance: Instance,
3837 caller: RuntimeComponentInstanceIndex,
3838 ty: TypeFutureTableIndex,
3839 options: OptionsIndex,
3840 future: u32,
3841 address: u32,
3842 ) -> Result<u32>;
3843
3844 fn future_read(
3846 &mut self,
3847 instance: Instance,
3848 caller: RuntimeComponentInstanceIndex,
3849 ty: TypeFutureTableIndex,
3850 options: OptionsIndex,
3851 future: u32,
3852 address: u32,
3853 ) -> Result<u32>;
3854
3855 fn future_drop_writable(
3857 &mut self,
3858 instance: Instance,
3859 caller: RuntimeComponentInstanceIndex,
3860 ty: TypeFutureTableIndex,
3861 writer: u32,
3862 ) -> Result<()>;
3863
3864 fn stream_write(
3866 &mut self,
3867 instance: Instance,
3868 caller: RuntimeComponentInstanceIndex,
3869 ty: TypeStreamTableIndex,
3870 options: OptionsIndex,
3871 stream: u32,
3872 address: u32,
3873 count: u32,
3874 ) -> Result<u32>;
3875
3876 fn stream_read(
3878 &mut self,
3879 instance: Instance,
3880 caller: RuntimeComponentInstanceIndex,
3881 ty: TypeStreamTableIndex,
3882 options: OptionsIndex,
3883 stream: u32,
3884 address: u32,
3885 count: u32,
3886 ) -> Result<u32>;
3887
3888 fn flat_stream_write(
3891 &mut self,
3892 instance: Instance,
3893 caller: RuntimeComponentInstanceIndex,
3894 ty: TypeStreamTableIndex,
3895 options: OptionsIndex,
3896 payload_size: u32,
3897 payload_align: u32,
3898 stream: u32,
3899 address: u32,
3900 count: u32,
3901 ) -> Result<u32>;
3902
3903 fn flat_stream_read(
3906 &mut self,
3907 instance: Instance,
3908 caller: RuntimeComponentInstanceIndex,
3909 ty: TypeStreamTableIndex,
3910 options: OptionsIndex,
3911 payload_size: u32,
3912 payload_align: u32,
3913 stream: u32,
3914 address: u32,
3915 count: u32,
3916 ) -> Result<u32>;
3917
3918 fn stream_drop_writable(
3920 &mut self,
3921 instance: Instance,
3922 caller: RuntimeComponentInstanceIndex,
3923 ty: TypeStreamTableIndex,
3924 writer: u32,
3925 ) -> Result<()>;
3926
3927 fn error_context_debug_message(
3929 &mut self,
3930 instance: Instance,
3931 caller: RuntimeComponentInstanceIndex,
3932 ty: TypeComponentLocalErrorContextTableIndex,
3933 options: OptionsIndex,
3934 err_ctx_handle: u32,
3935 debug_msg_address: u32,
3936 ) -> Result<()>;
3937
3938 fn thread_new_indirect(
3940 &mut self,
3941 instance: Instance,
3942 caller: RuntimeComponentInstanceIndex,
3943 func_ty_idx: TypeFuncIndex,
3944 start_func_table_idx: RuntimeTableIndex,
3945 start_func_idx: u32,
3946 context: i32,
3947 ) -> Result<u32>;
3948}
3949
3950impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3952 unsafe fn prepare_call(
3953 &mut self,
3954 instance: Instance,
3955 memory: *mut VMMemoryDefinition,
3956 start: *mut VMFuncRef,
3957 return_: *mut VMFuncRef,
3958 caller_instance: RuntimeComponentInstanceIndex,
3959 callee_instance: RuntimeComponentInstanceIndex,
3960 task_return_type: TypeTupleIndex,
3961 callee_async: bool,
3962 string_encoding: u8,
3963 result_count_or_max_if_async: u32,
3964 storage: *mut ValRaw,
3965 storage_len: usize,
3966 ) -> Result<()> {
3967 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3971
3972 unsafe {
3973 instance.prepare_call(
3974 StoreContextMut(self),
3975 start,
3976 return_,
3977 caller_instance,
3978 callee_instance,
3979 task_return_type,
3980 callee_async,
3981 memory,
3982 string_encoding,
3983 match result_count_or_max_if_async {
3984 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3985 params,
3986 has_result: false,
3987 },
3988 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3989 params,
3990 has_result: true,
3991 },
3992 result_count => CallerInfo::Sync {
3993 params,
3994 result_count,
3995 },
3996 },
3997 )
3998 }
3999 }
4000
4001 unsafe fn sync_start(
4002 &mut self,
4003 instance: Instance,
4004 callback: *mut VMFuncRef,
4005 callee: *mut VMFuncRef,
4006 param_count: u32,
4007 storage: *mut MaybeUninit<ValRaw>,
4008 storage_len: usize,
4009 ) -> Result<()> {
4010 unsafe {
4011 instance
4012 .start_call(
4013 StoreContextMut(self),
4014 callback,
4015 ptr::null_mut(),
4016 callee,
4017 param_count,
4018 1,
4019 START_FLAG_ASYNC_CALLEE,
4020 Some(std::slice::from_raw_parts_mut(storage, storage_len)),
4024 )
4025 .map(drop)
4026 }
4027 }
4028
4029 unsafe fn async_start(
4030 &mut self,
4031 instance: Instance,
4032 callback: *mut VMFuncRef,
4033 post_return: *mut VMFuncRef,
4034 callee: *mut VMFuncRef,
4035 param_count: u32,
4036 result_count: u32,
4037 flags: u32,
4038 ) -> Result<u32> {
4039 unsafe {
4040 instance.start_call(
4041 StoreContextMut(self),
4042 callback,
4043 post_return,
4044 callee,
4045 param_count,
4046 result_count,
4047 flags,
4048 None,
4049 )
4050 }
4051 }
4052
4053 fn future_write(
4054 &mut self,
4055 instance: Instance,
4056 caller: RuntimeComponentInstanceIndex,
4057 ty: TypeFutureTableIndex,
4058 options: OptionsIndex,
4059 future: u32,
4060 address: u32,
4061 ) -> Result<u32> {
4062 instance.check_may_leave(self, caller)?;
4063
4064 instance
4065 .guest_write(
4066 StoreContextMut(self),
4067 caller,
4068 TransmitIndex::Future(ty),
4069 options,
4070 None,
4071 future,
4072 address,
4073 1,
4074 )
4075 .map(|result| result.encode())
4076 }
4077
4078 fn future_read(
4079 &mut self,
4080 instance: Instance,
4081 caller: RuntimeComponentInstanceIndex,
4082 ty: TypeFutureTableIndex,
4083 options: OptionsIndex,
4084 future: u32,
4085 address: u32,
4086 ) -> Result<u32> {
4087 instance.check_may_leave(self, caller)?;
4088
4089 instance
4090 .guest_read(
4091 StoreContextMut(self),
4092 caller,
4093 TransmitIndex::Future(ty),
4094 options,
4095 None,
4096 future,
4097 address,
4098 1,
4099 )
4100 .map(|result| result.encode())
4101 }
4102
4103 fn stream_write(
4104 &mut self,
4105 instance: Instance,
4106 caller: RuntimeComponentInstanceIndex,
4107 ty: TypeStreamTableIndex,
4108 options: OptionsIndex,
4109 stream: u32,
4110 address: u32,
4111 count: u32,
4112 ) -> Result<u32> {
4113 instance.check_may_leave(self, caller)?;
4114
4115 instance
4116 .guest_write(
4117 StoreContextMut(self),
4118 caller,
4119 TransmitIndex::Stream(ty),
4120 options,
4121 None,
4122 stream,
4123 address,
4124 count,
4125 )
4126 .map(|result| result.encode())
4127 }
4128
4129 fn stream_read(
4130 &mut self,
4131 instance: Instance,
4132 caller: RuntimeComponentInstanceIndex,
4133 ty: TypeStreamTableIndex,
4134 options: OptionsIndex,
4135 stream: u32,
4136 address: u32,
4137 count: u32,
4138 ) -> Result<u32> {
4139 instance.check_may_leave(self, caller)?;
4140
4141 instance
4142 .guest_read(
4143 StoreContextMut(self),
4144 caller,
4145 TransmitIndex::Stream(ty),
4146 options,
4147 None,
4148 stream,
4149 address,
4150 count,
4151 )
4152 .map(|result| result.encode())
4153 }
4154
4155 fn future_drop_writable(
4156 &mut self,
4157 instance: Instance,
4158 caller: RuntimeComponentInstanceIndex,
4159 ty: TypeFutureTableIndex,
4160 writer: u32,
4161 ) -> Result<()> {
4162 instance.check_may_leave(self, caller)?;
4163
4164 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4165 }
4166
4167 fn flat_stream_write(
4168 &mut self,
4169 instance: Instance,
4170 caller: RuntimeComponentInstanceIndex,
4171 ty: TypeStreamTableIndex,
4172 options: OptionsIndex,
4173 payload_size: u32,
4174 payload_align: u32,
4175 stream: u32,
4176 address: u32,
4177 count: u32,
4178 ) -> Result<u32> {
4179 instance.check_may_leave(self, caller)?;
4180
4181 instance
4182 .guest_write(
4183 StoreContextMut(self),
4184 caller,
4185 TransmitIndex::Stream(ty),
4186 options,
4187 Some(FlatAbi {
4188 size: payload_size,
4189 align: payload_align,
4190 }),
4191 stream,
4192 address,
4193 count,
4194 )
4195 .map(|result| result.encode())
4196 }
4197
4198 fn flat_stream_read(
4199 &mut self,
4200 instance: Instance,
4201 caller: RuntimeComponentInstanceIndex,
4202 ty: TypeStreamTableIndex,
4203 options: OptionsIndex,
4204 payload_size: u32,
4205 payload_align: u32,
4206 stream: u32,
4207 address: u32,
4208 count: u32,
4209 ) -> Result<u32> {
4210 instance.check_may_leave(self, caller)?;
4211
4212 instance
4213 .guest_read(
4214 StoreContextMut(self),
4215 caller,
4216 TransmitIndex::Stream(ty),
4217 options,
4218 Some(FlatAbi {
4219 size: payload_size,
4220 align: payload_align,
4221 }),
4222 stream,
4223 address,
4224 count,
4225 )
4226 .map(|result| result.encode())
4227 }
4228
4229 fn stream_drop_writable(
4230 &mut self,
4231 instance: Instance,
4232 caller: RuntimeComponentInstanceIndex,
4233 ty: TypeStreamTableIndex,
4234 writer: u32,
4235 ) -> Result<()> {
4236 instance.check_may_leave(self, caller)?;
4237
4238 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4239 }
4240
4241 fn error_context_debug_message(
4242 &mut self,
4243 instance: Instance,
4244 caller: RuntimeComponentInstanceIndex,
4245 ty: TypeComponentLocalErrorContextTableIndex,
4246 options: OptionsIndex,
4247 err_ctx_handle: u32,
4248 debug_msg_address: u32,
4249 ) -> Result<()> {
4250 instance.check_may_leave(self, caller)?;
4251
4252 instance.error_context_debug_message(
4253 StoreContextMut(self),
4254 ty,
4255 options,
4256 err_ctx_handle,
4257 debug_msg_address,
4258 )
4259 }
4260
4261 fn thread_new_indirect(
4262 &mut self,
4263 instance: Instance,
4264 caller: RuntimeComponentInstanceIndex,
4265 func_ty_idx: TypeFuncIndex,
4266 start_func_table_idx: RuntimeTableIndex,
4267 start_func_idx: u32,
4268 context: i32,
4269 ) -> Result<u32> {
4270 instance.thread_new_indirect(
4271 StoreContextMut(self),
4272 caller,
4273 func_ty_idx,
4274 start_func_table_idx,
4275 start_func_idx,
4276 context,
4277 )
4278 }
4279}
4280
4281type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4282
4283struct HostTask {
4285 common: WaitableCommon,
4286 caller_instance: RuntimeInstance,
4287 join_handle: Option<JoinHandle>,
4288}
4289
4290impl HostTask {
4291 fn new(caller_instance: RuntimeInstance, join_handle: Option<JoinHandle>) -> Self {
4292 Self {
4293 common: WaitableCommon::default(),
4294 caller_instance,
4295 join_handle,
4296 }
4297 }
4298}
4299
4300impl TableDebug for HostTask {
4301 fn type_name() -> &'static str {
4302 "HostTask"
4303 }
4304}
4305
4306type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4307
4308enum Caller {
4310 Host {
4312 tx: Option<oneshot::Sender<LiftedResult>>,
4314 exit_tx: Arc<oneshot::Sender<()>>,
4321 host_future_present: bool,
4324 call_post_return_automatically: bool,
4326 caller: Option<QualifiedThreadId>,
4331 },
4332 Guest {
4334 thread: QualifiedThreadId,
4336 },
4337}
4338
4339struct LiftResult {
4342 lift: RawLift,
4343 ty: TypeTupleIndex,
4344 memory: Option<SendSyncPtr<VMMemoryDefinition>>,
4345 string_encoding: StringEncoding,
4346}
4347
4348#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4353struct QualifiedThreadId {
4354 task: TableId<GuestTask>,
4355 thread: TableId<GuestThread>,
4356}
4357
4358impl QualifiedThreadId {
4359 fn qualify(
4360 state: &mut ConcurrentState,
4361 thread: TableId<GuestThread>,
4362 ) -> Result<QualifiedThreadId> {
4363 Ok(QualifiedThreadId {
4364 task: state.get_mut(thread)?.parent_task,
4365 thread,
4366 })
4367 }
4368}
4369
4370impl fmt::Debug for QualifiedThreadId {
4371 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4372 f.debug_tuple("QualifiedThreadId")
4373 .field(&self.task.rep())
4374 .field(&self.thread.rep())
4375 .finish()
4376 }
4377}
4378
4379enum GuestThreadState {
4380 NotStartedImplicit,
4381 NotStartedExplicit(
4382 Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4383 ),
4384 Running,
4385 Suspended(StoreFiber<'static>),
4386 Pending,
4387 Completed,
4388}
4389pub struct GuestThread {
4390 context: [u32; 2],
4393 parent_task: TableId<GuestTask>,
4395 wake_on_cancel: Option<TableId<WaitableSet>>,
4398 state: GuestThreadState,
4400 instance_rep: Option<u32>,
4403}
4404
4405impl GuestThread {
4406 fn from_instance(
4409 state: Pin<&mut ComponentInstance>,
4410 caller_instance: RuntimeComponentInstanceIndex,
4411 guest_thread: u32,
4412 ) -> Result<TableId<Self>> {
4413 let rep = state.instance_states().0[caller_instance]
4414 .handle_table()
4415 .guest_thread_rep(guest_thread)?;
4416 Ok(TableId::new(rep))
4417 }
4418
4419 fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4420 Self {
4421 context: [0; 2],
4422 parent_task,
4423 wake_on_cancel: None,
4424 state: GuestThreadState::NotStartedImplicit,
4425 instance_rep: None,
4426 }
4427 }
4428
4429 fn new_explicit(
4430 parent_task: TableId<GuestTask>,
4431 start_func: Box<
4432 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4433 >,
4434 ) -> Self {
4435 Self {
4436 context: [0; 2],
4437 parent_task,
4438 wake_on_cancel: None,
4439 state: GuestThreadState::NotStartedExplicit(start_func),
4440 instance_rep: None,
4441 }
4442 }
4443}
4444
4445impl TableDebug for GuestThread {
4446 fn type_name() -> &'static str {
4447 "GuestThread"
4448 }
4449}
4450
4451enum SyncResult {
4452 NotProduced,
4453 Produced(Option<ValRaw>),
4454 Taken,
4455}
4456
4457impl SyncResult {
4458 fn take(&mut self) -> Option<Option<ValRaw>> {
4459 match mem::replace(self, SyncResult::Taken) {
4460 SyncResult::NotProduced => None,
4461 SyncResult::Produced(val) => Some(val),
4462 SyncResult::Taken => {
4463 panic!("attempted to take a synchronous result that was already taken")
4464 }
4465 }
4466 }
4467}
4468
4469#[derive(Debug)]
4470enum HostFutureState {
4471 NotApplicable,
4472 Live,
4473 Dropped,
4474}
4475
4476pub(crate) struct GuestTask {
4478 common: WaitableCommon,
4480 lower_params: Option<RawLower>,
4482 lift_result: Option<LiftResult>,
4484 result: Option<LiftedResult>,
4487 callback: Option<CallbackFn>,
4490 caller: Caller,
4492 call_context: Option<CallContext>,
4495 sync_result: SyncResult,
4498 cancel_sent: bool,
4501 starting_sent: bool,
4504 subtasks: HashSet<TableId<GuestTask>>,
4509 sync_call_set: TableId<WaitableSet>,
4511 instance: RuntimeInstance,
4518 event: Option<Event>,
4521 function_index: Option<ExportIndex>,
4523 exited: bool,
4525 threads: HashSet<TableId<GuestThread>>,
4527 host_future_state: HostFutureState,
4530 async_function: bool,
4533}
4534
4535impl GuestTask {
4536 fn already_lowered_parameters(&self) -> bool {
4537 self.lower_params.is_none()
4539 }
4540
4541 fn returned_or_cancelled(&self) -> bool {
4542 self.lift_result.is_none()
4544 }
4545
4546 fn ready_to_delete(&self) -> bool {
4547 let threads_completed = self.threads.is_empty();
4548 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4549 let pending_completion_event = matches!(
4550 self.common.event,
4551 Some(Event::Subtask {
4552 status: Status::Returned | Status::ReturnCancelled
4553 })
4554 );
4555 let ready = threads_completed
4556 && !has_sync_result
4557 && !pending_completion_event
4558 && !matches!(self.host_future_state, HostFutureState::Live);
4559 log::trace!(
4560 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4561 threads_completed,
4562 has_sync_result,
4563 pending_completion_event,
4564 self.host_future_state
4565 );
4566 ready
4567 }
4568
4569 fn new(
4570 state: &mut ConcurrentState,
4571 lower_params: RawLower,
4572 lift_result: LiftResult,
4573 caller: Caller,
4574 callback: Option<CallbackFn>,
4575 instance: RuntimeInstance,
4576 async_function: bool,
4577 ) -> Result<Self> {
4578 let sync_call_set = state.push(WaitableSet::default())?;
4579 let host_future_state = match &caller {
4580 Caller::Guest { .. } => HostFutureState::NotApplicable,
4581 Caller::Host {
4582 host_future_present,
4583 ..
4584 } => {
4585 if *host_future_present {
4586 HostFutureState::Live
4587 } else {
4588 HostFutureState::NotApplicable
4589 }
4590 }
4591 };
4592 Ok(Self {
4593 common: WaitableCommon::default(),
4594 lower_params: Some(lower_params),
4595 lift_result: Some(lift_result),
4596 result: None,
4597 callback,
4598 caller,
4599 call_context: Some(CallContext::default()),
4600 sync_result: SyncResult::NotProduced,
4601 cancel_sent: false,
4602 starting_sent: false,
4603 subtasks: HashSet::new(),
4604 sync_call_set,
4605 instance,
4606 event: None,
4607 function_index: None,
4608 exited: false,
4609 threads: HashSet::new(),
4610 host_future_state,
4611 async_function,
4612 })
4613 }
4614
4615 fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4618 for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4621 if let Some(Event::Subtask {
4622 status: Status::Returned | Status::ReturnCancelled,
4623 }) = waitable.common(state)?.event
4624 {
4625 waitable.delete_from(state)?;
4626 }
4627 }
4628
4629 assert!(self.threads.is_empty());
4630
4631 state.delete(self.sync_call_set)?;
4632
4633 match &self.caller {
4635 Caller::Guest { thread } => {
4636 let task_mut = state.get_mut(thread.task)?;
4637 let present = task_mut.subtasks.remove(&me);
4638 assert!(present);
4639
4640 for subtask in &self.subtasks {
4641 task_mut.subtasks.insert(*subtask);
4642 }
4643
4644 for subtask in &self.subtasks {
4645 state.get_mut(*subtask)?.caller = Caller::Guest { thread: *thread };
4646 }
4647 }
4648 Caller::Host {
4649 exit_tx, caller, ..
4650 } => {
4651 for subtask in &self.subtasks {
4652 state.get_mut(*subtask)?.caller = Caller::Host {
4653 tx: None,
4654 exit_tx: exit_tx.clone(),
4658 host_future_present: false,
4659 call_post_return_automatically: true,
4660 caller: *caller,
4661 };
4662 }
4663 }
4664 }
4665
4666 for subtask in self.subtasks {
4667 let task = state.get_mut(subtask)?;
4668 if task.exited && task.ready_to_delete() {
4669 Waitable::Guest(subtask).delete_from(state)?;
4670 }
4671 }
4672
4673 Ok(())
4674 }
4675
4676 fn call_post_return_automatically(&self) -> bool {
4677 matches!(
4678 self.caller,
4679 Caller::Guest { .. }
4680 | Caller::Host {
4681 call_post_return_automatically: true,
4682 ..
4683 }
4684 )
4685 }
4686}
4687
4688impl TableDebug for GuestTask {
4689 fn type_name() -> &'static str {
4690 "GuestTask"
4691 }
4692}
4693
4694#[derive(Default)]
4696struct WaitableCommon {
4697 event: Option<Event>,
4699 set: Option<TableId<WaitableSet>>,
4701 handle: Option<u32>,
4703}
4704
4705#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4707enum Waitable {
4708 Host(TableId<HostTask>),
4710 Guest(TableId<GuestTask>),
4712 Transmit(TableId<TransmitHandle>),
4714}
4715
4716impl Waitable {
4717 fn from_instance(
4720 state: Pin<&mut ComponentInstance>,
4721 caller_instance: RuntimeComponentInstanceIndex,
4722 waitable: u32,
4723 ) -> Result<Self> {
4724 use crate::runtime::vm::component::Waitable;
4725
4726 let (waitable, kind) = state.instance_states().0[caller_instance]
4727 .handle_table()
4728 .waitable_rep(waitable)?;
4729
4730 Ok(match kind {
4731 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4732 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4733 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4734 })
4735 }
4736
4737 fn rep(&self) -> u32 {
4739 match self {
4740 Self::Host(id) => id.rep(),
4741 Self::Guest(id) => id.rep(),
4742 Self::Transmit(id) => id.rep(),
4743 }
4744 }
4745
4746 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4750 log::trace!("waitable {self:?} join set {set:?}",);
4751
4752 let old = mem::replace(&mut self.common(state)?.set, set);
4753
4754 if let Some(old) = old {
4755 match *self {
4756 Waitable::Host(id) => state.remove_child(id, old),
4757 Waitable::Guest(id) => state.remove_child(id, old),
4758 Waitable::Transmit(id) => state.remove_child(id, old),
4759 }?;
4760
4761 state.get_mut(old)?.ready.remove(self);
4762 }
4763
4764 if let Some(set) = set {
4765 match *self {
4766 Waitable::Host(id) => state.add_child(id, set),
4767 Waitable::Guest(id) => state.add_child(id, set),
4768 Waitable::Transmit(id) => state.add_child(id, set),
4769 }?;
4770
4771 if self.common(state)?.event.is_some() {
4772 self.mark_ready(state)?;
4773 }
4774 }
4775
4776 Ok(())
4777 }
4778
4779 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4781 Ok(match self {
4782 Self::Host(id) => &mut state.get_mut(*id)?.common,
4783 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4784 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4785 })
4786 }
4787
4788 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4792 log::trace!("set event for {self:?}: {event:?}");
4793 self.common(state)?.event = event;
4794 self.mark_ready(state)
4795 }
4796
4797 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4799 let common = self.common(state)?;
4800 let event = common.event.take();
4801 if let Some(set) = self.common(state)?.set {
4802 state.get_mut(set)?.ready.remove(self);
4803 }
4804
4805 Ok(event)
4806 }
4807
4808 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4812 if let Some(set) = self.common(state)?.set {
4813 state.get_mut(set)?.ready.insert(*self);
4814 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4815 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4816 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4817
4818 let item = match mode {
4819 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4820 WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4821 thread,
4822 kind: GuestCallKind::DeliverEvent {
4823 instance,
4824 set: Some(set),
4825 },
4826 }),
4827 };
4828 state.push_high_priority(item);
4829 }
4830 }
4831 Ok(())
4832 }
4833
4834 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4836 match self {
4837 Self::Host(task) => {
4838 log::trace!("delete host task {task:?}");
4839 state.delete(*task)?;
4840 }
4841 Self::Guest(task) => {
4842 log::trace!("delete guest task {task:?}");
4843 state.delete(*task)?.dispose(state, *task)?;
4844 }
4845 Self::Transmit(task) => {
4846 state.delete(*task)?;
4847 }
4848 }
4849
4850 Ok(())
4851 }
4852}
4853
4854impl fmt::Debug for Waitable {
4855 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4856 match self {
4857 Self::Host(id) => write!(f, "{id:?}"),
4858 Self::Guest(id) => write!(f, "{id:?}"),
4859 Self::Transmit(id) => write!(f, "{id:?}"),
4860 }
4861 }
4862}
4863
4864#[derive(Default)]
4866struct WaitableSet {
4867 ready: BTreeSet<Waitable>,
4869 waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4871}
4872
4873impl TableDebug for WaitableSet {
4874 fn type_name() -> &'static str {
4875 "WaitableSet"
4876 }
4877}
4878
4879type RawLower =
4881 Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4882
4883type RawLift = Box<
4885 dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4886>;
4887
4888type LiftedResult = Box<dyn Any + Send + Sync>;
4892
4893struct DummyResult;
4896
4897#[derive(Default)]
4899pub struct ConcurrentInstanceState {
4900 backpressure: u16,
4902 do_not_enter: bool,
4904 pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4907}
4908
4909impl ConcurrentInstanceState {
4910 pub fn pending_is_empty(&self) -> bool {
4911 self.pending.is_empty()
4912 }
4913}
4914
4915pub struct ConcurrentState {
4917 guest_thread: Option<QualifiedThreadId>,
4919
4920 futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4925 table: AlwaysMut<ResourceTable>,
4927 high_priority: Vec<WorkItem>,
4929 low_priority: VecDeque<WorkItem>,
4931 suspend_reason: Option<SuspendReason>,
4935 worker: Option<StoreFiber<'static>>,
4939 worker_item: Option<WorkerItem>,
4941
4942 global_error_context_ref_counts:
4955 BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4956}
4957
4958impl Default for ConcurrentState {
4959 fn default() -> Self {
4960 Self {
4961 guest_thread: None,
4962 table: AlwaysMut::new(ResourceTable::new()),
4963 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4964 high_priority: Vec::new(),
4965 low_priority: VecDeque::new(),
4966 suspend_reason: None,
4967 worker: None,
4968 worker_item: None,
4969 global_error_context_ref_counts: BTreeMap::new(),
4970 }
4971 }
4972}
4973
4974impl ConcurrentState {
4975 pub(crate) fn take_fibers_and_futures(
4992 &mut self,
4993 fibers: &mut Vec<StoreFiber<'static>>,
4994 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4995 ) {
4996 for entry in self.table.get_mut().iter_mut() {
4997 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4998 for mode in mem::take(&mut set.waiting).into_values() {
4999 if let WaitMode::Fiber(fiber) = mode {
5000 fibers.push(fiber);
5001 }
5002 }
5003 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
5004 if let GuestThreadState::Suspended(fiber) =
5005 mem::replace(&mut thread.state, GuestThreadState::Completed)
5006 {
5007 fibers.push(fiber);
5008 }
5009 }
5010 }
5011
5012 if let Some(fiber) = self.worker.take() {
5013 fibers.push(fiber);
5014 }
5015
5016 let mut handle_item = |item| match item {
5017 WorkItem::ResumeFiber(fiber) => {
5018 fibers.push(fiber);
5019 }
5020 WorkItem::PushFuture(future) => {
5021 self.futures
5022 .get_mut()
5023 .as_mut()
5024 .unwrap()
5025 .push(future.into_inner());
5026 }
5027 _ => {}
5028 };
5029
5030 for item in mem::take(&mut self.high_priority) {
5031 handle_item(item);
5032 }
5033 for item in mem::take(&mut self.low_priority) {
5034 handle_item(item);
5035 }
5036
5037 if let Some(them) = self.futures.get_mut().take() {
5038 futures.push(them);
5039 }
5040 }
5041
5042 fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> {
5046 let mut ready = mem::take(&mut self.high_priority);
5047 if ready.is_empty() {
5048 if let Some(item) = self.low_priority.pop_back() {
5049 ready.push(item);
5050 }
5051 }
5052 ready
5053 }
5054
5055 fn push<V: Send + Sync + 'static>(
5056 &mut self,
5057 value: V,
5058 ) -> Result<TableId<V>, ResourceTableError> {
5059 self.table.get_mut().push(value).map(TableId::from)
5060 }
5061
5062 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
5063 self.table.get_mut().get_mut(&Resource::from(id))
5064 }
5065
5066 pub fn add_child<T: 'static, U: 'static>(
5067 &mut self,
5068 child: TableId<T>,
5069 parent: TableId<U>,
5070 ) -> Result<(), ResourceTableError> {
5071 self.table
5072 .get_mut()
5073 .add_child(Resource::from(child), Resource::from(parent))
5074 }
5075
5076 pub fn remove_child<T: 'static, U: 'static>(
5077 &mut self,
5078 child: TableId<T>,
5079 parent: TableId<U>,
5080 ) -> Result<(), ResourceTableError> {
5081 self.table
5082 .get_mut()
5083 .remove_child(Resource::from(child), Resource::from(parent))
5084 }
5085
5086 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
5087 self.table.get_mut().delete(Resource::from(id))
5088 }
5089
5090 fn push_future(&mut self, future: HostTaskFuture) {
5091 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
5098 }
5099
5100 fn push_high_priority(&mut self, item: WorkItem) {
5101 log::trace!("push high priority: {item:?}");
5102 self.high_priority.push(item);
5103 }
5104
5105 fn push_low_priority(&mut self, item: WorkItem) {
5106 log::trace!("push low priority: {item:?}");
5107 self.low_priority.push_front(item);
5108 }
5109
5110 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5111 if high_priority {
5112 self.push_high_priority(item);
5113 } else {
5114 self.push_low_priority(item);
5115 }
5116 }
5117
5118 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
5120 let thread = self.guest_thread.unwrap();
5121 let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
5122 log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
5123 Ok(val)
5124 }
5125
5126 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
5128 let thread = self.guest_thread.unwrap();
5129 log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
5130 self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
5131 Ok(())
5132 }
5133
5134 fn take_pending_cancellation(&mut self) -> bool {
5137 let thread = self.guest_thread.unwrap();
5138 if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
5139 assert!(matches!(event, Event::Cancelled));
5140 true
5141 } else {
5142 false
5143 }
5144 }
5145
5146 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5147 if self.may_block(task) {
5148 Ok(())
5149 } else {
5150 Err(Trap::CannotBlockSyncTask.into())
5151 }
5152 }
5153
5154 fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
5155 let task = self.get_mut(task).unwrap();
5156 task.async_function || task.returned_or_cancelled()
5157 }
5158}
5159
5160fn for_any_lower<
5163 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5164>(
5165 fun: F,
5166) -> F {
5167 fun
5168}
5169
5170fn for_any_lift<
5172 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5173>(
5174 fun: F,
5175) -> F {
5176 fun
5177}
5178
5179fn checked<F: Future + Send + 'static>(
5184 id: StoreId,
5185 fut: F,
5186) -> impl Future<Output = F::Output> + Send + 'static {
5187 async move {
5188 let mut fut = pin!(fut);
5189 future::poll_fn(move |cx| {
5190 let message = "\
5191 `Future`s which depend on asynchronous component tasks, streams, or \
5192 futures to complete may only be polled from the event loop of the \
5193 store to which they belong. Please use \
5194 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5195 ";
5196 tls::try_get(|store| {
5197 let matched = match store {
5198 tls::TryGet::Some(store) => store.id() == id,
5199 tls::TryGet::Taken | tls::TryGet::None => false,
5200 };
5201
5202 if !matched {
5203 panic!("{message}")
5204 }
5205 });
5206 fut.as_mut().poll(cx)
5207 })
5208 .await
5209 }
5210}
5211
5212fn check_recursive_run() {
5215 tls::try_get(|store| {
5216 if !matches!(store, tls::TryGet::None) {
5217 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5218 }
5219 });
5220}
5221
5222fn unpack_callback_code(code: u32) -> (u32, u32) {
5223 (code & 0xF, code >> 4)
5224}
5225
5226struct WaitableCheckParams {
5230 set: TableId<WaitableSet>,
5231 options: OptionsIndex,
5232 payload: u32,
5233}
5234
5235enum WaitableCheck {
5238 Wait,
5239 Poll,
5240}
5241
5242pub(crate) struct PreparedCall<R> {
5244 handle: Func,
5246 thread: QualifiedThreadId,
5248 param_count: usize,
5250 rx: oneshot::Receiver<LiftedResult>,
5253 exit_rx: oneshot::Receiver<()>,
5256 _phantom: PhantomData<R>,
5257}
5258
5259impl<R> PreparedCall<R> {
5260 pub(crate) fn task_id(&self) -> TaskId {
5262 TaskId {
5263 task: self.thread.task,
5264 }
5265 }
5266}
5267
5268pub(crate) struct TaskId {
5270 task: TableId<GuestTask>,
5271}
5272
5273impl TaskId {
5274 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5280 let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5281 if !task.already_lowered_parameters() {
5282 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5283 } else {
5284 task.host_future_state = HostFutureState::Dropped;
5285 if task.ready_to_delete() {
5286 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5287 }
5288 }
5289 Ok(())
5290 }
5291}
5292
5293pub(crate) fn prepare_call<T, R>(
5299 mut store: StoreContextMut<T>,
5300 handle: Func,
5301 param_count: usize,
5302 host_future_present: bool,
5303 call_post_return_automatically: bool,
5304 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5305 + Send
5306 + Sync
5307 + 'static,
5308 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5309 + Send
5310 + Sync
5311 + 'static,
5312) -> Result<PreparedCall<R>> {
5313 let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5314
5315 let instance = handle.instance().id().get(store.0);
5316 let options = &instance.component().env_component().options[options];
5317 let ty = &instance.component().types()[ty];
5318 let async_function = ty.async_;
5319 let task_return_type = ty.results;
5320 let component_instance = raw_options.instance;
5321 let callback = options.callback.map(|i| instance.runtime_callback(i));
5322 let memory = options
5323 .memory()
5324 .map(|i| instance.runtime_memory(i))
5325 .map(SendSyncPtr::new);
5326 let string_encoding = options.string_encoding;
5327 let token = StoreToken::new(store.as_context_mut());
5328 let state = store.0.concurrent_state_mut();
5329
5330 let (tx, rx) = oneshot::channel();
5331 let (exit_tx, exit_rx) = oneshot::channel();
5332
5333 let caller = state.guest_thread;
5334 let mut task = GuestTask::new(
5335 state,
5336 Box::new(for_any_lower(move |store, params| {
5337 lower_params(handle, token.as_context_mut(store), params)
5338 })),
5339 LiftResult {
5340 lift: Box::new(for_any_lift(move |store, result| {
5341 lift_result(handle, store, result)
5342 })),
5343 ty: task_return_type,
5344 memory,
5345 string_encoding,
5346 },
5347 Caller::Host {
5348 tx: Some(tx),
5349 exit_tx: Arc::new(exit_tx),
5350 host_future_present,
5351 call_post_return_automatically,
5352 caller,
5353 },
5354 callback.map(|callback| {
5355 let callback = SendSyncPtr::new(callback);
5356 let instance = handle.instance();
5357 Box::new(move |store: &mut dyn VMStore, event, handle| {
5358 let store = token.as_context_mut(store);
5359 unsafe { instance.call_callback(store, callback, event, handle) }
5362 }) as CallbackFn
5363 }),
5364 RuntimeInstance {
5365 instance: handle.instance().id().instance(),
5366 index: component_instance,
5367 },
5368 async_function,
5369 )?;
5370 task.function_index = Some(handle.index());
5371
5372 let task = state.push(task)?;
5373 let thread = state.push(GuestThread::new_implicit(task))?;
5374 state.get_mut(task)?.threads.insert(thread);
5375
5376 if !store.0.may_enter_task(task) {
5377 bail!(crate::Trap::CannotEnterComponent);
5378 }
5379
5380 Ok(PreparedCall {
5381 handle,
5382 thread: QualifiedThreadId { task, thread },
5383 param_count,
5384 rx,
5385 exit_rx,
5386 _phantom: PhantomData,
5387 })
5388}
5389
5390pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5397 mut store: StoreContextMut<T>,
5398 prepared: PreparedCall<R>,
5399) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5400 let PreparedCall {
5401 handle,
5402 thread,
5403 param_count,
5404 rx,
5405 exit_rx,
5406 ..
5407 } = prepared;
5408
5409 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5410
5411 Ok(checked(
5412 store.0.id(),
5413 rx.map(move |result| {
5414 result
5415 .map(|v| (*v.downcast().unwrap(), exit_rx))
5416 .map_err(crate::Error::from)
5417 }),
5418 ))
5419}
5420
5421fn queue_call0<T: 'static>(
5424 store: StoreContextMut<T>,
5425 handle: Func,
5426 guest_thread: QualifiedThreadId,
5427 param_count: usize,
5428) -> Result<()> {
5429 let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5430 let is_concurrent = raw_options.async_;
5431 let callback = raw_options.callback;
5432 let instance = handle.instance();
5433 let callee = handle.lifted_core_func(store.0);
5434 let post_return = handle.post_return_core_func(store.0);
5435 let callback = callback.map(|i| {
5436 let instance = instance.id().get(store.0);
5437 SendSyncPtr::new(instance.runtime_callback(i))
5438 });
5439
5440 log::trace!("queueing call {guest_thread:?}");
5441
5442 unsafe {
5446 instance.queue_call(
5447 store,
5448 guest_thread,
5449 SendSyncPtr::new(callee),
5450 param_count,
5451 1,
5452 is_concurrent,
5453 callback,
5454 post_return.map(SendSyncPtr::new),
5455 )
5456 }
5457}