1use crate::component::func::{self, Func, call_post_return};
54use crate::component::{
55 HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
56};
57use crate::fiber::{self, StoreFiber, StoreFiberYield};
58use crate::prelude::*;
59use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
60use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
61use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
62use crate::{
63 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType,
64 bail, error::format_err,
65};
66use error_contexts::GlobalErrorContextRefCount;
67use futures::channel::oneshot;
68use futures::future::{self, FutureExt};
69use futures::stream::{FuturesUnordered, StreamExt};
70use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71use std::any::Any;
72use std::borrow::ToOwned;
73use std::boxed::Box;
74use std::cell::UnsafeCell;
75use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
76use std::fmt;
77use std::future::Future;
78use std::marker::PhantomData;
79use std::mem::{self, ManuallyDrop, MaybeUninit};
80use std::ops::DerefMut;
81use std::pin::{Pin, pin};
82use std::ptr::{self, NonNull};
83use std::sync::Arc;
84use std::task::{Context, Poll, Waker};
85use std::vec::Vec;
86use table::{TableDebug, TableId};
87use wasmtime_environ::Trap;
88use wasmtime_environ::component::{
89 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
90 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
91 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
92 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
93 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
94};
95use wasmtime_environ::packed_option::ReservedValue;
96
97pub use abort::JoinHandle;
98pub use future_stream_any::{FutureAny, StreamAny};
99pub use futures_and_streams::{
100 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
101 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
102 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
103};
104pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
105
106mod abort;
107mod error_contexts;
108mod future_stream_any;
109mod futures_and_streams;
110pub(crate) mod table;
111pub(crate) mod tls;
112
/// Sentinel `u32` with every bit set (`0xffff_ffff`).
// NOTE(review): presumably the "operation blocked" code returned to guests —
// confirm against the use sites, which are outside this view.
const BLOCKED: u32 = u32::MAX;
116
/// Status of a subtask, with the numeric code used when the status is packed
/// into a `u32` (see [`Status::pack`]).
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Packs this status together with an optional waitable handle.
    ///
    /// The status code occupies the low four bits and the handle the
    /// remaining 28 bits.
    ///
    /// # Panics
    ///
    /// Panics unless `waitable` is `None` exactly when `self` is
    /// [`Status::Returned`], or if the handle does not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        let is_returned = self == Status::Returned;
        assert!(is_returned == waitable.is_none());

        let handle = waitable.unwrap_or_default();
        assert!(handle < (1 << 28));

        (self as u32) | (handle << 4)
    }
}
140
141#[derive(Clone, Copy, Debug)]
144enum Event {
145 None,
146 Cancelled,
147 Subtask {
148 status: Status,
149 },
150 StreamRead {
151 code: ReturnCode,
152 pending: Option<(TypeStreamTableIndex, u32)>,
153 },
154 StreamWrite {
155 code: ReturnCode,
156 pending: Option<(TypeStreamTableIndex, u32)>,
157 },
158 FutureRead {
159 code: ReturnCode,
160 pending: Option<(TypeFutureTableIndex, u32)>,
161 },
162 FutureWrite {
163 code: ReturnCode,
164 pending: Option<(TypeFutureTableIndex, u32)>,
165 },
166}
167
168impl Event {
169 fn parts(self) -> (u32, u32) {
174 const EVENT_NONE: u32 = 0;
175 const EVENT_SUBTASK: u32 = 1;
176 const EVENT_STREAM_READ: u32 = 2;
177 const EVENT_STREAM_WRITE: u32 = 3;
178 const EVENT_FUTURE_READ: u32 = 4;
179 const EVENT_FUTURE_WRITE: u32 = 5;
180 const EVENT_CANCELLED: u32 = 6;
181 match self {
182 Event::None => (EVENT_NONE, 0),
183 Event::Cancelled => (EVENT_CANCELLED, 0),
184 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
185 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
186 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
187 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
188 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
189 }
190 }
191}
192
/// Numeric codes grouped under the `callback_code` namespace.
// NOTE(review): presumably the values a guest callback returns to say what it
// wants next — confirm against `handle_callback_code`, outside this view.
mod callback_code {
    /// Code `0`.
    pub const EXIT: u32 = 0;
    /// Code `1`.
    pub const YIELD: u32 = 1;
    /// Code `2`.
    pub const WAIT: u32 = 2;
}
199
200const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
204
205pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
211 store: StoreContextMut<'a, T>,
212 get_data: fn(&mut T) -> D::Data<'_>,
213}
214
215impl<'a, T, D> Access<'a, T, D>
216where
217 D: HasData + ?Sized,
218 T: 'static,
219{
220 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
222 Self { store, get_data }
223 }
224
225 pub fn data_mut(&mut self) -> &mut T {
227 self.store.data_mut()
228 }
229
230 pub fn get(&mut self) -> D::Data<'_> {
232 (self.get_data)(self.data_mut())
233 }
234
235 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
239 where
240 T: 'static,
241 {
242 let accessor = Accessor {
243 get_data: self.get_data,
244 token: StoreToken::new(self.store.as_context_mut()),
245 };
246 self.store
247 .as_context_mut()
248 .spawn_with_accessor(accessor, task)
249 }
250
251 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
254 self.get_data
255 }
256}
257
258impl<'a, T, D> AsContext for Access<'a, T, D>
259where
260 D: HasData + ?Sized,
261 T: 'static,
262{
263 type Data = T;
264
265 fn as_context(&self) -> StoreContext<'_, T> {
266 self.store.as_context()
267 }
268}
269
270impl<'a, T, D> AsContextMut for Access<'a, T, D>
271where
272 D: HasData + ?Sized,
273 T: 'static,
274{
275 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
276 self.store.as_context_mut()
277 }
278}
279
280pub struct Accessor<T: 'static, D = HasSelf<T>>
340where
341 D: HasData + ?Sized,
342{
343 token: StoreToken<T>,
344 get_data: fn(&mut T) -> D::Data<'_>,
345}
346
347pub trait AsAccessor {
364 type Data: 'static;
366
367 type AccessorData: HasData + ?Sized;
370
371 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
373}
374
375impl<T: AsAccessor + ?Sized> AsAccessor for &T {
376 type Data = T::Data;
377 type AccessorData = T::AccessorData;
378
379 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
380 T::as_accessor(self)
381 }
382}
383
384impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
385 type Data = T;
386 type AccessorData = D;
387
388 fn as_accessor(&self) -> &Accessor<T, D> {
389 self
390 }
391}
392
393const _: () = {
416 const fn assert<T: Send + Sync>() {}
417 assert::<Accessor<UnsafeCell<u32>>>();
418};
419
420impl<T> Accessor<T> {
421 pub(crate) fn new(token: StoreToken<T>) -> Self {
430 Self {
431 token,
432 get_data: |x| x,
433 }
434 }
435}
436
437impl<T, D> Accessor<T, D>
438where
439 D: HasData + ?Sized,
440{
441 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
459 tls::get(|vmstore| {
460 fun(Access {
461 store: self.token.as_context_mut(vmstore),
462 get_data: self.get_data,
463 })
464 })
465 }
466
467 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
470 self.get_data
471 }
472
473 pub fn with_getter<D2: HasData>(
490 &self,
491 get_data: fn(&mut T) -> D2::Data<'_>,
492 ) -> Accessor<T, D2> {
493 Accessor {
494 token: self.token,
495 get_data,
496 }
497 }
498
499 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
515 where
516 T: 'static,
517 {
518 let accessor = self.clone_for_spawn();
519 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
520 }
521
522 fn clone_for_spawn(&self) -> Self {
523 Self {
524 token: self.token,
525 get_data: self.get_data,
526 }
527 }
528}
529
530pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
542where
543 D: HasData + ?Sized,
544{
545 fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
547}
548
549enum CallerInfo {
552 Async {
554 params: Vec<ValRaw>,
555 has_result: bool,
556 },
557 Sync {
559 params: Vec<ValRaw>,
560 result_count: u32,
561 },
562}
563
564enum WaitMode {
566 Fiber(StoreFiber<'static>),
568 Callback(Instance),
571}
572
573#[derive(Debug)]
575enum SuspendReason {
576 Waiting {
579 set: TableId<WaitableSet>,
580 thread: QualifiedThreadId,
581 skip_may_block_check: bool,
582 },
583 NeedWork,
586 Yielding {
589 thread: QualifiedThreadId,
590 skip_may_block_check: bool,
591 },
592 ExplicitlySuspending {
594 thread: QualifiedThreadId,
595 skip_may_block_check: bool,
596 },
597}
598
599enum GuestCallKind {
601 DeliverEvent {
604 instance: Instance,
606 set: Option<TableId<WaitableSet>>,
611 },
612 StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
618 StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
619}
620
621impl fmt::Debug for GuestCallKind {
622 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
623 match self {
624 Self::DeliverEvent { instance, set } => f
625 .debug_struct("DeliverEvent")
626 .field("instance", instance)
627 .field("set", set)
628 .finish(),
629 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
630 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
631 }
632 }
633}
634
/// Optional target of a suspension, identified by a `u32`.
// NOTE(review): the distinction between `SomeSuspended` and `Some` is not
// visible in this chunk — confirm at the use sites.
#[derive(Copy, Clone, Debug)]
pub enum SuspensionTarget {
    SomeSuspended(u32),
    Some(u32),
    None,
}

impl SuspensionTarget {
    /// `true` only for [`SuspensionTarget::None`].
    fn is_none(&self) -> bool {
        match self {
            SuspensionTarget::None => true,
            SuspensionTarget::SomeSuspended(_) | SuspensionTarget::Some(_) => false,
        }
    }

    /// `true` for every variant that carries a target.
    fn is_some(&self) -> bool {
        match self {
            SuspensionTarget::SomeSuspended(_) | SuspensionTarget::Some(_) => true,
            SuspensionTarget::None => false,
        }
    }
}
651
652#[derive(Debug)]
654struct GuestCall {
655 thread: QualifiedThreadId,
656 kind: GuestCallKind,
657}
658
659impl GuestCall {
660 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
670 let instance = store
671 .concurrent_state_mut()
672 .get_mut(self.thread.task)?
673 .instance;
674 let state = store.instance_state(instance).concurrent_state();
675
676 let ready = match &self.kind {
677 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
678 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
679 GuestCallKind::StartExplicit(_) => true,
680 };
681 log::trace!(
682 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
683 state.do_not_enter,
684 state.backpressure
685 );
686 Ok(ready)
687 }
688}
689
690enum WorkerItem {
692 GuestCall(GuestCall),
693 Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
694}
695
696enum WorkItem {
699 PushFuture(AlwaysMut<HostTaskFuture>),
701 ResumeFiber(StoreFiber<'static>),
703 ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
705 GuestCall(RuntimeComponentInstanceIndex, GuestCall),
707 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
709}
710
711impl fmt::Debug for WorkItem {
712 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
713 match self {
714 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
715 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
716 Self::ResumeThread(instance, thread) => f
717 .debug_tuple("ResumeThread")
718 .field(instance)
719 .field(thread)
720 .finish(),
721 Self::GuestCall(instance, call) => f
722 .debug_tuple("GuestCall")
723 .field(instance)
724 .field(call)
725 .finish(),
726 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
727 }
728 }
729}
730
/// Outcome of a wait: either it was cancelled or it ran to completion.
///
/// The derived ordering follows declaration order, so
/// `Cancelled < Completed`. `Debug` is derived so values can appear in log
/// output and in `assert_eq!` failures.
#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    Cancelled,
    Completed,
}
737
738pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
746 store: &mut dyn VMStore,
747 future: impl Future<Output = Result<R>> + Send + 'static,
748) -> Result<R> {
749 let state = store.concurrent_state_mut();
750 let task = state.unwrap_current_host_thread();
751
752 let mut future = Box::pin(async move {
756 let result = future.await?;
757 tls::get(move |store| {
758 let state = store.concurrent_state_mut();
759 state.get_mut(task)?.result = Some(Box::new(result) as _);
760
761 Waitable::Host(task).set_event(
762 state,
763 Some(Event::Subtask {
764 status: Status::Returned,
765 }),
766 )?;
767
768 Ok(())
769 })
770 }) as HostTaskFuture;
771
772 let poll = tls::set(store, || {
776 future
777 .as_mut()
778 .poll(&mut Context::from_waker(&Waker::noop()))
779 });
780
781 match poll {
782 Poll::Ready(result) => result?,
784
785 Poll::Pending => {
790 let state = store.concurrent_state_mut();
791 state.push_future(future);
792
793 let caller = state.get_mut(task)?.caller;
794 let set = state.get_mut(caller.task)?.sync_call_set;
795 Waitable::Host(task).join(state, Some(set))?;
796
797 store.suspend(SuspendReason::Waiting {
798 set,
799 thread: caller,
800 skip_may_block_check: false,
801 })?;
802
803 Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
807 }
808 }
809
810 Ok(*store
812 .concurrent_state_mut()
813 .get_mut(task)?
814 .result
815 .take()
816 .unwrap()
817 .downcast()
818 .unwrap())
819}
820
821fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
823 let mut next = Some(call);
824 while let Some(call) = next.take() {
825 match call.kind {
826 GuestCallKind::DeliverEvent { instance, set } => {
827 let (event, waitable) = instance
828 .get_event(store, call.thread.task, set, true)?
829 .unwrap();
830 let state = store.concurrent_state_mut();
831 let task = state.get_mut(call.thread.task)?;
832 let runtime_instance = task.instance;
833 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
834
835 log::trace!(
836 "use callback to deliver event {event:?} to {:?} for {waitable:?}",
837 call.thread,
838 );
839
840 let old_thread = store.set_thread(call.thread);
841 log::trace!(
842 "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
843 call.thread
844 );
845
846 store.enter_instance(runtime_instance);
847
848 let callback = store
849 .concurrent_state_mut()
850 .get_mut(call.thread.task)?
851 .callback
852 .take()
853 .unwrap();
854
855 let code = callback(store, event, handle)?;
856
857 store
858 .concurrent_state_mut()
859 .get_mut(call.thread.task)?
860 .callback = Some(callback);
861
862 store.exit_instance(runtime_instance)?;
863
864 store.set_thread(old_thread);
865
866 next = instance.handle_callback_code(
867 store,
868 call.thread,
869 runtime_instance.index,
870 code,
871 )?;
872
873 log::trace!(
874 "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
875 );
876 }
877 GuestCallKind::StartImplicit(fun) => {
878 next = fun(store)?;
879 }
880 GuestCallKind::StartExplicit(fun) => {
881 fun(store)?;
882 }
883 }
884 }
885
886 Ok(())
887}
888
889impl<T> Store<T> {
890 pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
892 where
893 T: Send + 'static,
894 {
895 ensure!(
896 self.as_context().0.concurrency_support(),
897 "cannot use `run_concurrent` when Config::concurrency_support disabled",
898 );
899 self.as_context_mut().run_concurrent(fun).await
900 }
901
902 #[doc(hidden)]
903 pub fn assert_concurrent_state_empty(&mut self) {
904 self.as_context_mut().assert_concurrent_state_empty();
905 }
906
907 #[doc(hidden)]
908 pub fn concurrent_state_table_size(&mut self) -> usize {
909 self.as_context_mut().concurrent_state_table_size()
910 }
911
912 pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
914 where
915 T: 'static,
916 {
917 self.as_context_mut().spawn(task)
918 }
919}
920
921impl<T> StoreContextMut<'_, T> {
922 #[doc(hidden)]
933 pub fn assert_concurrent_state_empty(self) {
934 let store = self.0;
935 store
936 .store_data_mut()
937 .components
938 .assert_instance_states_empty();
939 let state = store.concurrent_state_mut();
940 assert!(
941 state.table.get_mut().is_empty(),
942 "non-empty table: {:?}",
943 state.table.get_mut()
944 );
945 assert!(state.high_priority.is_empty());
946 assert!(state.low_priority.is_empty());
947 assert!(state.current_thread.is_none());
948 assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
949 assert!(state.global_error_context_ref_counts.is_empty());
950 }
951
952 #[doc(hidden)]
957 pub fn concurrent_state_table_size(&mut self) -> usize {
958 self.0
959 .concurrent_state_mut()
960 .table
961 .get_mut()
962 .iter_mut()
963 .count()
964 }
965
966 pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
976 where
977 T: 'static,
978 {
979 let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
980 self.spawn_with_accessor(accessor, task)
981 }
982
983 fn spawn_with_accessor<D>(
986 self,
987 accessor: Accessor<T, D>,
988 task: impl AccessorTask<T, D>,
989 ) -> JoinHandle
990 where
991 T: 'static,
992 D: HasData + ?Sized,
993 {
994 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
998 self.0
999 .concurrent_state_mut()
1000 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1001 handle
1002 }
1003
1004 pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1088 where
1089 T: Send + 'static,
1090 {
1091 ensure!(
1092 self.0.concurrency_support(),
1093 "cannot use `run_concurrent` when Config::concurrency_support disabled",
1094 );
1095 self.do_run_concurrent(fun, false).await
1096 }
1097
1098 pub(super) async fn run_concurrent_trap_on_idle<R>(
1099 self,
1100 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1101 ) -> Result<R>
1102 where
1103 T: Send + 'static,
1104 {
1105 self.do_run_concurrent(fun, true).await
1106 }
1107
1108 async fn do_run_concurrent<R>(
1109 mut self,
1110 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1111 trap_on_idle: bool,
1112 ) -> Result<R>
1113 where
1114 T: Send + 'static,
1115 {
1116 debug_assert!(self.0.concurrency_support());
1117 check_recursive_run();
1118 let token = StoreToken::new(self.as_context_mut());
1119
1120 struct Dropper<'a, T: 'static, V> {
1121 store: StoreContextMut<'a, T>,
1122 value: ManuallyDrop<V>,
1123 }
1124
1125 impl<'a, T, V> Drop for Dropper<'a, T, V> {
1126 fn drop(&mut self) {
1127 tls::set(self.store.0, || {
1128 unsafe { ManuallyDrop::drop(&mut self.value) }
1133 });
1134 }
1135 }
1136
1137 let accessor = &Accessor::new(token);
1138 let dropper = &mut Dropper {
1139 store: self,
1140 value: ManuallyDrop::new(fun(accessor)),
1141 };
1142 let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1144
1145 dropper
1146 .store
1147 .as_context_mut()
1148 .poll_until(future, trap_on_idle)
1149 .await
1150 }
1151
1152 async fn poll_until<R>(
1158 mut self,
1159 mut future: Pin<&mut impl Future<Output = R>>,
1160 trap_on_idle: bool,
1161 ) -> Result<R>
1162 where
1163 T: Send + 'static,
1164 {
1165 struct Reset<'a, T: 'static> {
1166 store: StoreContextMut<'a, T>,
1167 futures: Option<FuturesUnordered<HostTaskFuture>>,
1168 }
1169
1170 impl<'a, T> Drop for Reset<'a, T> {
1171 fn drop(&mut self) {
1172 if let Some(futures) = self.futures.take() {
1173 *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1174 }
1175 }
1176 }
1177
1178 loop {
1179 let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1183 let mut reset = Reset {
1184 store: self.as_context_mut(),
1185 futures,
1186 };
1187 let mut next = pin!(reset.futures.as_mut().unwrap().next());
1188
1189 enum PollResult<R> {
1190 Complete(R),
1191 ProcessWork(Vec<WorkItem>),
1192 }
1193 let result = future::poll_fn(|cx| {
1194 if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1197 return Poll::Ready(Ok(PollResult::Complete(value)));
1198 }
1199
1200 let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1204 Poll::Ready(Some(output)) => {
1205 match output {
1206 Err(e) => return Poll::Ready(Err(e)),
1207 Ok(()) => {}
1208 }
1209 Poll::Ready(true)
1210 }
1211 Poll::Ready(None) => Poll::Ready(false),
1212 Poll::Pending => Poll::Pending,
1213 };
1214
1215 let state = reset.store.0.concurrent_state_mut();
1219 let ready = state.collect_work_items_to_run();
1220 if !ready.is_empty() {
1221 return Poll::Ready(Ok(PollResult::ProcessWork(ready)));
1222 }
1223
1224 return match next {
1228 Poll::Ready(true) => {
1229 Poll::Ready(Ok(PollResult::ProcessWork(Vec::new())))
1235 }
1236 Poll::Ready(false) => {
1237 if let Poll::Ready(value) =
1241 tls::set(reset.store.0, || future.as_mut().poll(cx))
1242 {
1243 Poll::Ready(Ok(PollResult::Complete(value)))
1244 } else {
1245 if trap_on_idle {
1251 Poll::Ready(Err(format_err!(crate::Trap::AsyncDeadlock)))
1254 } else {
1255 Poll::Pending
1259 }
1260 }
1261 }
1262 Poll::Pending => Poll::Pending,
1267 };
1268 })
1269 .await;
1270
1271 drop(reset);
1275
1276 match result? {
1277 PollResult::Complete(value) => break Ok(value),
1280 PollResult::ProcessWork(ready) => {
1283 struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1284 store: StoreContextMut<'a, T>,
1285 ready: I,
1286 }
1287
1288 impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1289 fn drop(&mut self) {
1290 while let Some(item) = self.ready.next() {
1291 match item {
1292 WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1293 WorkItem::PushFuture(future) => {
1294 tls::set(self.store.0, move || drop(future))
1295 }
1296 _ => {}
1297 }
1298 }
1299 }
1300 }
1301
1302 let mut dispose = Dispose {
1303 store: self.as_context_mut(),
1304 ready: ready.into_iter(),
1305 };
1306
1307 while let Some(item) = dispose.ready.next() {
1308 dispose
1309 .store
1310 .as_context_mut()
1311 .handle_work_item(item)
1312 .await?;
1313 }
1314 }
1315 }
1316 }
1317 }
1318
1319 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1321 where
1322 T: Send,
1323 {
1324 log::trace!("handle work item {item:?}");
1325 match item {
1326 WorkItem::PushFuture(future) => {
1327 self.0
1328 .concurrent_state_mut()
1329 .futures
1330 .get_mut()
1331 .as_mut()
1332 .unwrap()
1333 .push(future.into_inner());
1334 }
1335 WorkItem::ResumeFiber(fiber) => {
1336 self.0.resume_fiber(fiber).await?;
1337 }
1338 WorkItem::ResumeThread(_, thread) => {
1339 if let GuestThreadState::Ready(fiber) = mem::replace(
1340 &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
1341 GuestThreadState::Running,
1342 ) {
1343 self.0.resume_fiber(fiber).await?;
1344 } else {
1345 bail!("cannot resume non-pending thread {thread:?}");
1346 }
1347 }
1348 WorkItem::GuestCall(_, call) => {
1349 if call.is_ready(self.0)? {
1350 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1351 } else {
1352 let state = self.0.concurrent_state_mut();
1353 let task = state.get_mut(call.thread.task)?;
1354 if !task.starting_sent {
1355 task.starting_sent = true;
1356 if let GuestCallKind::StartImplicit(_) = &call.kind {
1357 Waitable::Guest(call.thread.task).set_event(
1358 state,
1359 Some(Event::Subtask {
1360 status: Status::Starting,
1361 }),
1362 )?;
1363 }
1364 }
1365
1366 let instance = state.get_mut(call.thread.task)?.instance;
1367 self.0
1368 .instance_state(instance)
1369 .concurrent_state()
1370 .pending
1371 .insert(call.thread, call.kind);
1372 }
1373 }
1374 WorkItem::WorkerFunction(fun) => {
1375 self.run_on_worker(WorkerItem::Function(fun)).await?;
1376 }
1377 }
1378
1379 Ok(())
1380 }
1381
1382 async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1384 where
1385 T: Send,
1386 {
1387 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1388 fiber
1389 } else {
1390 fiber::make_fiber(self.0, move |store| {
1391 loop {
1392 match store.concurrent_state_mut().worker_item.take().unwrap() {
1393 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1394 WorkerItem::Function(fun) => fun.into_inner()(store)?,
1395 }
1396
1397 store.suspend(SuspendReason::NeedWork)?;
1398 }
1399 })?
1400 };
1401
1402 let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1403 assert!(worker_item.is_none());
1404 *worker_item = Some(item);
1405
1406 self.0.resume_fiber(worker).await
1407 }
1408
1409 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1414 where
1415 T: 'static,
1416 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1417 + Send
1418 + Sync
1419 + 'static,
1420 R: Send + Sync + 'static,
1421 {
1422 let token = StoreToken::new(self);
1423 async move {
1424 let mut accessor = Accessor::new(token);
1425 closure(&mut accessor).await
1426 }
1427 }
1428}
1429
1430impl StoreOpaque {
1431 pub(crate) fn enter_guest_sync_call(
1438 &mut self,
1439 guest_caller: Option<RuntimeInstance>,
1440 callee_async: bool,
1441 callee: RuntimeInstance,
1442 ) -> Result<()> {
1443 log::trace!("enter sync call {callee:?}");
1444 if !self.concurrency_support() {
1445 return Ok(self.enter_call_not_concurrent());
1446 }
1447
1448 let state = self.concurrent_state_mut();
1449 let thread = state.current_thread;
1450 let instance = if let Some(thread) = thread.guest() {
1451 Some(state.get_mut(thread.task)?.instance)
1452 } else {
1453 None
1454 };
1455 let task = GuestTask::new(
1456 state,
1457 Box::new(move |_, _| unreachable!()),
1458 LiftResult {
1459 lift: Box::new(move |_, _| unreachable!()),
1460 ty: TypeTupleIndex::reserved_value(),
1461 memory: None,
1462 string_encoding: StringEncoding::Utf8,
1463 },
1464 if let Some(caller) = guest_caller {
1465 assert_eq!(caller, instance.unwrap());
1466 Caller::Guest {
1467 thread: *thread.guest().unwrap(),
1468 }
1469 } else {
1470 Caller::Host {
1471 tx: None,
1472 exit_tx: Arc::new(oneshot::channel().0),
1473 host_future_present: false,
1474 caller: thread,
1475 }
1476 },
1477 None,
1478 callee,
1479 callee_async,
1480 )?;
1481
1482 let guest_task = state.push(task)?;
1483 let new_thread = GuestThread::new_implicit(guest_task);
1484 let guest_thread = state.push(new_thread)?;
1485 Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1486 guest_thread,
1487 self,
1488 callee.index,
1489 )?;
1490
1491 let state = self.concurrent_state_mut();
1492 state.get_mut(guest_task)?.threads.insert(guest_thread);
1493 if guest_caller.is_some() {
1494 let thread = thread.guest().unwrap();
1495 state.get_mut(thread.task)?.subtasks.insert(guest_task);
1496 }
1497
1498 self.set_thread(QualifiedThreadId {
1499 task: guest_task,
1500 thread: guest_thread,
1501 });
1502
1503 Ok(())
1504 }
1505
1506 pub(crate) fn exit_guest_sync_call(&mut self, guest_caller: bool) -> Result<()> {
1508 if !self.concurrency_support() {
1509 return Ok(self.exit_call_not_concurrent());
1510 }
1511 let thread = *self.set_thread(CurrentThread::None).guest().unwrap();
1512 let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance;
1513 log::trace!("exit sync call {instance:?}");
1514 Instance::from_wasmtime(self, instance.instance).cleanup_thread(
1515 self,
1516 thread,
1517 instance.index,
1518 )?;
1519
1520 let state = self.concurrent_state_mut();
1521 let task = state.get_mut(thread.task)?;
1522 let caller = match &task.caller {
1523 &Caller::Guest { thread } => {
1524 assert!(guest_caller);
1525 thread.into()
1526 }
1527 &Caller::Host { caller, .. } => {
1528 assert!(!guest_caller);
1529 caller
1530 }
1531 };
1532 self.set_thread(caller);
1533
1534 let state = self.concurrent_state_mut();
1535 let task = state.get_mut(thread.task)?;
1536 if task.ready_to_delete() {
1537 state.delete(thread.task)?.dispose(state, thread.task)?;
1538 }
1539
1540 Ok(())
1541 }
1542
1543 pub fn enter_host_call(&mut self) -> Result<()> {
1551 let state = self.concurrent_state_mut();
1552 let caller = state.unwrap_current_guest_thread();
1553 let task = state.push(HostTask::new(caller))?;
1554 log::trace!("new host task {task:?}");
1555 self.set_thread(task);
1556 Ok(())
1557 }
1558
1559 pub fn exit_host_call(&mut self) -> Result<()> {
1566 let task = self.concurrent_state_mut().unwrap_current_host_thread();
1567 log::trace!("delete host task {task:?}");
1568 let task = self.concurrent_state_mut().delete(task)?;
1569 self.set_thread(task.caller);
1570 Ok(())
1571 }
1572
1573 pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool {
1581 if self.trapped() {
1582 return false;
1583 }
1584 if !self.concurrency_support() {
1585 return true;
1586 }
1587 let state = self.concurrent_state_mut();
1588 let mut cur = state.current_thread;
1589 loop {
1590 match cur {
1591 CurrentThread::None => break true,
1592 CurrentThread::Guest(thread) => {
1593 let task = state.get_mut(thread.task).unwrap();
1594
1595 if task.instance.instance == instance.instance {
1602 break false;
1603 }
1604 cur = match task.caller {
1605 Caller::Host { caller, .. } => caller,
1606 Caller::Guest { thread } => thread.into(),
1607 };
1608 }
1609 CurrentThread::Host(id) => {
1610 cur = state.get_mut(id).unwrap().caller.into();
1611 }
1612 }
1613 }
1614 }
1615
1616 fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1619 self.component_instance_mut(instance.instance)
1620 .instance_state(instance.index)
1621 }
1622
1623 fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> CurrentThread {
1624 let state = self.concurrent_state_mut();
1629 let old_thread = mem::replace(&mut state.current_thread, thread.into());
1630 if let Some(old_thread) = old_thread.guest() {
1631 let instance = state.get_mut(old_thread.task).unwrap().instance.instance;
1632 self.component_instance_mut(instance)
1633 .set_task_may_block(false)
1634 }
1635
1636 if self.concurrent_state_mut().current_thread.guest().is_some() {
1639 self.set_task_may_block();
1640 }
1641
1642 old_thread
1643 }
1644
1645 fn set_task_may_block(&mut self) {
1648 let state = self.concurrent_state_mut();
1649 let guest_thread = state.unwrap_current_guest_thread();
1650 let instance = state.get_mut(guest_thread.task).unwrap().instance.instance;
1651 let may_block = self.concurrent_state_mut().may_block(guest_thread.task);
1652 self.component_instance_mut(instance)
1653 .set_task_may_block(may_block)
1654 }
1655
1656 pub(crate) fn check_blocking(&mut self) -> Result<()> {
1657 if !self.concurrency_support() {
1658 return Ok(());
1659 }
1660 let state = self.concurrent_state_mut();
1661 let task = state.unwrap_current_guest_thread().task;
1662 let instance = state.get_mut(task).unwrap().instance.instance;
1663 let task_may_block = self.component_instance(instance).get_task_may_block();
1664
1665 if task_may_block {
1666 Ok(())
1667 } else {
1668 Err(Trap::CannotBlockSyncTask.into())
1669 }
1670 }
1671
1672 fn enter_instance(&mut self, instance: RuntimeInstance) {
1676 log::trace!("enter {instance:?}");
1677 self.instance_state(instance)
1678 .concurrent_state()
1679 .do_not_enter = true;
1680 }
1681
1682 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1686 log::trace!("exit {instance:?}");
1687 self.instance_state(instance)
1688 .concurrent_state()
1689 .do_not_enter = false;
1690 self.partition_pending(instance)
1691 }
1692
1693 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1698 for (thread, kind) in
1699 mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1700 {
1701 let call = GuestCall { thread, kind };
1702 if call.is_ready(self)? {
1703 self.concurrent_state_mut()
1704 .push_high_priority(WorkItem::GuestCall(instance.index, call));
1705 } else {
1706 self.instance_state(instance)
1707 .concurrent_state()
1708 .pending
1709 .insert(call.thread, call.kind);
1710 }
1711 }
1712
1713 Ok(())
1714 }
1715
1716 pub(crate) fn backpressure_modify(
1718 &mut self,
1719 caller_instance: RuntimeInstance,
1720 modify: impl FnOnce(u16) -> Option<u16>,
1721 ) -> Result<()> {
1722 let state = self.instance_state(caller_instance).concurrent_state();
1723 let old = state.backpressure;
1724 let new = modify(old).ok_or_else(|| format_err!("backpressure counter overflow"))?;
1725 state.backpressure = new;
1726
1727 if old > 0 && new == 0 {
1728 self.partition_pending(caller_instance)?;
1731 }
1732
1733 Ok(())
1734 }
1735
1736 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1739 let old_thread = self.concurrent_state_mut().current_thread;
1740 log::trace!("resume_fiber: save current thread {old_thread:?}");
1741
1742 let fiber = fiber::resolve_or_release(self, fiber).await?;
1743
1744 self.set_thread(old_thread);
1745
1746 let state = self.concurrent_state_mut();
1747
1748 if let Some(ot) = old_thread.guest() {
1749 state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1750 }
1751 log::trace!("resume_fiber: restore current thread {old_thread:?}");
1752
1753 if let Some(mut fiber) = fiber {
1754 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1755 match state.suspend_reason.take().unwrap() {
1757 SuspendReason::NeedWork => {
1758 if state.worker.is_none() {
1759 state.worker = Some(fiber);
1760 } else {
1761 fiber.dispose(self);
1762 }
1763 }
1764 SuspendReason::Yielding { thread, .. } => {
1765 state.get_mut(thread.thread)?.state = GuestThreadState::Ready(fiber);
1766 let instance = state.get_mut(thread.task)?.instance.index;
1767 state.push_low_priority(WorkItem::ResumeThread(instance, thread));
1768 }
1769 SuspendReason::ExplicitlySuspending { thread, .. } => {
1770 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1771 }
1772 SuspendReason::Waiting { set, thread, .. } => {
1773 let old = state
1774 .get_mut(set)?
1775 .waiting
1776 .insert(thread, WaitMode::Fiber(fiber));
1777 assert!(old.is_none());
1778 }
1779 };
1780 } else {
1781 log::trace!("resume_fiber: fiber has exited");
1782 }
1783
1784 Ok(())
1785 }
1786
1787 fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1793 log::trace!("suspend fiber: {reason:?}");
1794
1795 let task = match &reason {
1799 SuspendReason::Yielding { thread, .. }
1800 | SuspendReason::Waiting { thread, .. }
1801 | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1802 SuspendReason::NeedWork => None,
1803 };
1804
1805 let old_guest_thread = if task.is_some() {
1806 self.concurrent_state_mut().current_thread
1807 } else {
1808 CurrentThread::None
1809 };
1810
1811 assert!(
1817 matches!(
1818 reason,
1819 SuspendReason::ExplicitlySuspending {
1820 skip_may_block_check: true,
1821 ..
1822 } | SuspendReason::Waiting {
1823 skip_may_block_check: true,
1824 ..
1825 } | SuspendReason::Yielding {
1826 skip_may_block_check: true,
1827 ..
1828 }
1829 ) || old_guest_thread
1830 .guest()
1831 .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1832 .unwrap_or(true)
1833 );
1834
1835 let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1836 assert!(suspend_reason.is_none());
1837 *suspend_reason = Some(reason);
1838
1839 self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1840
1841 if task.is_some() {
1842 self.set_thread(old_guest_thread);
1843 }
1844
1845 Ok(())
1846 }
1847
1848 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1849 let state = self.concurrent_state_mut();
1850 let caller = state.unwrap_current_guest_thread();
1851 let old_set = waitable.common(state)?.set;
1852 let set = state.get_mut(caller.task)?.sync_call_set;
1853 waitable.join(state, Some(set))?;
1854 self.suspend(SuspendReason::Waiting {
1855 set,
1856 thread: caller,
1857 skip_may_block_check: false,
1858 })?;
1859 let state = self.concurrent_state_mut();
1860 waitable.join(state, old_set)
1861 }
1862}
1863
1864impl Instance {
1865 fn get_event(
1868 self,
1869 store: &mut StoreOpaque,
1870 guest_task: TableId<GuestTask>,
1871 set: Option<TableId<WaitableSet>>,
1872 cancellable: bool,
1873 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1874 let state = store.concurrent_state_mut();
1875
1876 if let Some(event) = state.get_mut(guest_task)?.event.take() {
1877 log::trace!("deliver event {event:?} to {guest_task:?}");
1878
1879 if cancellable || !matches!(event, Event::Cancelled) {
1880 return Ok(Some((event, None)));
1881 } else {
1882 state.get_mut(guest_task)?.event = Some(event);
1883 }
1884 }
1885
1886 Ok(
1887 if let Some((set, waitable)) = set
1888 .and_then(|set| {
1889 state
1890 .get_mut(set)
1891 .map(|v| v.ready.pop_first().map(|v| (set, v)))
1892 .transpose()
1893 })
1894 .transpose()?
1895 {
1896 let common = waitable.common(state)?;
1897 let handle = common.handle.unwrap();
1898 let event = common.event.take().unwrap();
1899
1900 log::trace!(
1901 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1902 );
1903
1904 waitable.on_delivery(store, self, event);
1905
1906 Some((event, Some((waitable, handle))))
1907 } else {
1908 None
1909 },
1910 )
1911 }
1912
1913 fn handle_callback_code(
1919 self,
1920 store: &mut StoreOpaque,
1921 guest_thread: QualifiedThreadId,
1922 runtime_instance: RuntimeComponentInstanceIndex,
1923 code: u32,
1924 ) -> Result<Option<GuestCall>> {
1925 let (code, set) = unpack_callback_code(code);
1926
1927 log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1928
1929 let state = store.concurrent_state_mut();
1930
1931 let get_set = |store: &mut StoreOpaque, handle| {
1932 if handle == 0 {
1933 bail!("invalid waitable-set handle");
1934 }
1935
1936 let set = store
1937 .instance_state(RuntimeInstance {
1938 instance: self.id().instance(),
1939 index: runtime_instance,
1940 })
1941 .handle_table()
1942 .waitable_set_rep(handle)?;
1943
1944 Ok(TableId::<WaitableSet>::new(set))
1945 };
1946
1947 Ok(match code {
1948 callback_code::EXIT => {
1949 log::trace!("implicit thread {guest_thread:?} completed");
1950 self.cleanup_thread(store, guest_thread, runtime_instance)?;
1951 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1952 if task.threads.is_empty() && !task.returned_or_cancelled() {
1953 bail!(Trap::NoAsyncResult);
1954 }
1955 match &task.caller {
1956 Caller::Host { .. } => {
1957 if task.ready_to_delete() {
1958 Waitable::Guest(guest_thread.task)
1959 .delete_from(store.concurrent_state_mut())?;
1960 }
1961 }
1962 Caller::Guest { .. } => {
1963 task.exited = true;
1964 task.callback = None;
1965 }
1966 }
1967 None
1968 }
1969 callback_code::YIELD => {
1970 let task = state.get_mut(guest_thread.task)?;
1971 if let Some(event) = task.event {
1976 assert!(matches!(event, Event::None | Event::Cancelled));
1977 } else {
1978 task.event = Some(Event::None);
1979 }
1980 let call = GuestCall {
1981 thread: guest_thread,
1982 kind: GuestCallKind::DeliverEvent {
1983 instance: self,
1984 set: None,
1985 },
1986 };
1987 if state.may_block(guest_thread.task) {
1988 state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
1991 None
1992 } else {
1993 Some(call)
1997 }
1998 }
1999 callback_code::WAIT => {
2000 state.check_blocking_for(guest_thread.task)?;
2003
2004 let set = get_set(store, set)?;
2005 let state = store.concurrent_state_mut();
2006
2007 if state.get_mut(guest_thread.task)?.event.is_some()
2008 || !state.get_mut(set)?.ready.is_empty()
2009 {
2010 state.push_high_priority(WorkItem::GuestCall(
2012 runtime_instance,
2013 GuestCall {
2014 thread: guest_thread,
2015 kind: GuestCallKind::DeliverEvent {
2016 instance: self,
2017 set: Some(set),
2018 },
2019 },
2020 ));
2021 } else {
2022 let old = state
2030 .get_mut(guest_thread.thread)?
2031 .wake_on_cancel
2032 .replace(set);
2033 assert!(old.is_none());
2034 let old = state
2035 .get_mut(set)?
2036 .waiting
2037 .insert(guest_thread, WaitMode::Callback(self));
2038 assert!(old.is_none());
2039 }
2040 None
2041 }
2042 _ => bail!("unsupported callback code: {code}"),
2043 })
2044 }
2045
2046 fn cleanup_thread(
2047 self,
2048 store: &mut StoreOpaque,
2049 guest_thread: QualifiedThreadId,
2050 runtime_instance: RuntimeComponentInstanceIndex,
2051 ) -> Result<()> {
2052 let guest_id = store
2053 .concurrent_state_mut()
2054 .get_mut(guest_thread.thread)?
2055 .instance_rep;
2056 store
2057 .instance_state(RuntimeInstance {
2058 instance: self.id().instance(),
2059 index: runtime_instance,
2060 })
2061 .thread_handle_table()
2062 .guest_thread_remove(guest_id.unwrap())?;
2063
2064 store.concurrent_state_mut().delete(guest_thread.thread)?;
2065 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2066 task.threads.remove(&guest_thread.thread);
2067 Ok(())
2068 }
2069
2070 unsafe fn queue_call<T: 'static>(
2077 self,
2078 mut store: StoreContextMut<T>,
2079 guest_thread: QualifiedThreadId,
2080 callee: SendSyncPtr<VMFuncRef>,
2081 param_count: usize,
2082 result_count: usize,
2083 async_: bool,
2084 callback: Option<SendSyncPtr<VMFuncRef>>,
2085 post_return: Option<SendSyncPtr<VMFuncRef>>,
2086 ) -> Result<()> {
2087 unsafe fn make_call<T: 'static>(
2102 store: StoreContextMut<T>,
2103 guest_thread: QualifiedThreadId,
2104 callee: SendSyncPtr<VMFuncRef>,
2105 param_count: usize,
2106 result_count: usize,
2107 ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2108 + Send
2109 + Sync
2110 + 'static
2111 + use<T> {
2112 let token = StoreToken::new(store);
2113 move |store: &mut dyn VMStore| {
2114 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2115
2116 store
2117 .concurrent_state_mut()
2118 .get_mut(guest_thread.thread)?
2119 .state = GuestThreadState::Running;
2120 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2121 let lower = task.lower_params.take().unwrap();
2122
2123 lower(store, &mut storage[..param_count])?;
2124
2125 let mut store = token.as_context_mut(store);
2126
2127 unsafe {
2130 crate::Func::call_unchecked_raw(
2131 &mut store,
2132 callee.as_non_null(),
2133 NonNull::new(
2134 &mut storage[..param_count.max(result_count)]
2135 as *mut [MaybeUninit<ValRaw>] as _,
2136 )
2137 .unwrap(),
2138 )?;
2139 }
2140
2141 Ok(storage)
2142 }
2143 }
2144
2145 let call = unsafe {
2149 make_call(
2150 store.as_context_mut(),
2151 guest_thread,
2152 callee,
2153 param_count,
2154 result_count,
2155 )
2156 };
2157
2158 let callee_instance = store
2159 .0
2160 .concurrent_state_mut()
2161 .get_mut(guest_thread.task)?
2162 .instance;
2163
2164 let fun = if callback.is_some() {
2165 assert!(async_);
2166
2167 Box::new(move |store: &mut dyn VMStore| {
2168 self.add_guest_thread_to_instance_table(
2169 guest_thread.thread,
2170 store,
2171 callee_instance.index,
2172 )?;
2173 let old_thread = store.set_thread(guest_thread);
2174 log::trace!(
2175 "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2176 );
2177
2178 store.enter_instance(callee_instance);
2179
2180 let storage = call(store)?;
2187
2188 store.exit_instance(callee_instance)?;
2189
2190 store.set_thread(old_thread);
2191 let state = store.concurrent_state_mut();
2192 old_thread
2193 .guest()
2194 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2195 log::trace!("stackless call: restored {old_thread:?} as current thread");
2196
2197 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2200
2201 self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2202 })
2203 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2204 } else {
2205 let token = StoreToken::new(store.as_context_mut());
2206 Box::new(move |store: &mut dyn VMStore| {
2207 self.add_guest_thread_to_instance_table(
2208 guest_thread.thread,
2209 store,
2210 callee_instance.index,
2211 )?;
2212 let old_thread = store.set_thread(guest_thread);
2213 log::trace!(
2214 "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2215 );
2216 let flags = self.id().get(store).instance_flags(callee_instance.index);
2217
2218 if !async_ {
2222 store.enter_instance(callee_instance);
2223 }
2224
2225 let storage = call(store)?;
2232
2233 if async_ {
2234 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2235 if task.threads.len() == 1 && !task.returned_or_cancelled() {
2236 bail!(Trap::NoAsyncResult);
2237 }
2238 } else {
2239 let lift = {
2245 store.exit_instance(callee_instance)?;
2246
2247 let state = store.concurrent_state_mut();
2248 assert!(state.get_mut(guest_thread.task)?.result.is_none());
2249
2250 state
2251 .get_mut(guest_thread.task)?
2252 .lift_result
2253 .take()
2254 .unwrap()
2255 };
2256
2257 let result = (lift.lift)(store, unsafe {
2260 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2261 &storage[..result_count],
2262 )
2263 })?;
2264
2265 let post_return_arg = match result_count {
2266 0 => ValRaw::i32(0),
2267 1 => unsafe { storage[0].assume_init() },
2270 _ => unreachable!(),
2271 };
2272
2273 unsafe {
2274 call_post_return(
2275 token.as_context_mut(store),
2276 post_return.map(|v| v.as_non_null()),
2277 post_return_arg,
2278 flags,
2279 )?;
2280 }
2281
2282 self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2283 }
2284
2285 self.cleanup_thread(store, guest_thread, callee_instance.index)?;
2287
2288 store.set_thread(old_thread);
2289
2290 let state = store.concurrent_state_mut();
2291 let task = state.get_mut(guest_thread.task)?;
2292
2293 match &task.caller {
2294 Caller::Host { .. } => {
2295 if task.ready_to_delete() {
2296 Waitable::Guest(guest_thread.task).delete_from(state)?;
2297 }
2298 }
2299 Caller::Guest { .. } => {
2300 task.exited = true;
2301 }
2302 }
2303
2304 Ok(None)
2305 })
2306 };
2307
2308 store
2309 .0
2310 .concurrent_state_mut()
2311 .push_high_priority(WorkItem::GuestCall(
2312 callee_instance.index,
2313 GuestCall {
2314 thread: guest_thread,
2315 kind: GuestCallKind::StartImplicit(fun),
2316 },
2317 ));
2318
2319 Ok(())
2320 }
2321
2322 unsafe fn prepare_call<T: 'static>(
2335 self,
2336 mut store: StoreContextMut<T>,
2337 start: *mut VMFuncRef,
2338 return_: *mut VMFuncRef,
2339 caller_instance: RuntimeComponentInstanceIndex,
2340 callee_instance: RuntimeComponentInstanceIndex,
2341 task_return_type: TypeTupleIndex,
2342 callee_async: bool,
2343 memory: *mut VMMemoryDefinition,
2344 string_encoding: u8,
2345 caller_info: CallerInfo,
2346 ) -> Result<()> {
2347 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2348 store.0.check_blocking()?;
2352 }
2353
2354 enum ResultInfo {
2355 Heap { results: u32 },
2356 Stack { result_count: u32 },
2357 }
2358
2359 let result_info = match &caller_info {
2360 CallerInfo::Async {
2361 has_result: true,
2362 params,
2363 } => ResultInfo::Heap {
2364 results: params.last().unwrap().get_u32(),
2365 },
2366 CallerInfo::Async {
2367 has_result: false, ..
2368 } => ResultInfo::Stack { result_count: 0 },
2369 CallerInfo::Sync {
2370 result_count,
2371 params,
2372 } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2373 results: params.last().unwrap().get_u32(),
2374 },
2375 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2376 result_count: *result_count,
2377 },
2378 };
2379
2380 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2381
2382 let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2386 let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2387 let token = StoreToken::new(store.as_context_mut());
2388 let state = store.0.concurrent_state_mut();
2389 let old_thread = state.unwrap_current_guest_thread();
2390
2391 assert_eq!(
2392 state.get_mut(old_thread.task)?.instance,
2393 RuntimeInstance {
2394 instance: self.id().instance(),
2395 index: caller_instance,
2396 }
2397 );
2398
2399 let new_task = GuestTask::new(
2400 state,
2401 Box::new(move |store, dst| {
2402 let mut store = token.as_context_mut(store);
2403 assert!(dst.len() <= MAX_FLAT_PARAMS);
2404 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2406 let count = match caller_info {
2407 CallerInfo::Async { params, has_result } => {
2411 let params = ¶ms[..params.len() - usize::from(has_result)];
2412 for (param, src) in params.iter().zip(&mut src) {
2413 src.write(*param);
2414 }
2415 params.len()
2416 }
2417
2418 CallerInfo::Sync { params, .. } => {
2420 for (param, src) in params.iter().zip(&mut src) {
2421 src.write(*param);
2422 }
2423 params.len()
2424 }
2425 };
2426 unsafe {
2433 crate::Func::call_unchecked_raw(
2434 &mut store,
2435 start.as_non_null(),
2436 NonNull::new(
2437 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2438 )
2439 .unwrap(),
2440 )?;
2441 }
2442 dst.copy_from_slice(&src[..dst.len()]);
2443 let state = store.0.concurrent_state_mut();
2444 Waitable::Guest(state.unwrap_current_guest_thread().task).set_event(
2445 state,
2446 Some(Event::Subtask {
2447 status: Status::Started,
2448 }),
2449 )?;
2450 Ok(())
2451 }),
2452 LiftResult {
2453 lift: Box::new(move |store, src| {
2454 let mut store = token.as_context_mut(store);
2457 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2459 my_src.push(ValRaw::u32(*results));
2460 }
2461 unsafe {
2468 crate::Func::call_unchecked_raw(
2469 &mut store,
2470 return_.as_non_null(),
2471 my_src.as_mut_slice().into(),
2472 )?;
2473 }
2474 let state = store.0.concurrent_state_mut();
2475 let thread = state.unwrap_current_guest_thread();
2476 if sync_caller {
2477 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2478 if let ResultInfo::Stack { result_count } = &result_info {
2479 match result_count {
2480 0 => None,
2481 1 => Some(my_src[0]),
2482 _ => unreachable!(),
2483 }
2484 } else {
2485 None
2486 },
2487 );
2488 }
2489 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2490 }),
2491 ty: task_return_type,
2492 memory: NonNull::new(memory).map(SendSyncPtr::new),
2493 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2494 },
2495 Caller::Guest { thread: old_thread },
2496 None,
2497 RuntimeInstance {
2498 instance: self.id().instance(),
2499 index: callee_instance,
2500 },
2501 callee_async,
2502 )?;
2503
2504 let guest_task = state.push(new_task)?;
2505 let new_thread = GuestThread::new_implicit(guest_task);
2506 let guest_thread = state.push(new_thread)?;
2507 state.get_mut(guest_task)?.threads.insert(guest_thread);
2508
2509 store
2510 .0
2511 .concurrent_state_mut()
2512 .get_mut(old_thread.task)?
2513 .subtasks
2514 .insert(guest_task);
2515
2516 store.0.set_thread(QualifiedThreadId {
2519 task: guest_task,
2520 thread: guest_thread,
2521 });
2522 log::trace!(
2523 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2524 );
2525
2526 Ok(())
2527 }
2528
2529 unsafe fn call_callback<T>(
2534 self,
2535 mut store: StoreContextMut<T>,
2536 function: SendSyncPtr<VMFuncRef>,
2537 event: Event,
2538 handle: u32,
2539 ) -> Result<u32> {
2540 let (ordinal, result) = event.parts();
2541 let params = &mut [
2542 ValRaw::u32(ordinal),
2543 ValRaw::u32(handle),
2544 ValRaw::u32(result),
2545 ];
2546 unsafe {
2551 crate::Func::call_unchecked_raw(
2552 &mut store,
2553 function.as_non_null(),
2554 params.as_mut_slice().into(),
2555 )?;
2556 }
2557 Ok(params[0].get_u32())
2558 }
2559
2560 unsafe fn start_call<T: 'static>(
2573 self,
2574 mut store: StoreContextMut<T>,
2575 callback: *mut VMFuncRef,
2576 post_return: *mut VMFuncRef,
2577 callee: *mut VMFuncRef,
2578 param_count: u32,
2579 result_count: u32,
2580 flags: u32,
2581 storage: Option<&mut [MaybeUninit<ValRaw>]>,
2582 ) -> Result<u32> {
2583 let token = StoreToken::new(store.as_context_mut());
2584 let async_caller = storage.is_none();
2585 let state = store.0.concurrent_state_mut();
2586 let guest_thread = state.unwrap_current_guest_thread();
2587 let callee_async = state.get_mut(guest_thread.task)?.async_function;
2588 let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2589 let param_count = usize::try_from(param_count).unwrap();
2590 assert!(param_count <= MAX_FLAT_PARAMS);
2591 let result_count = usize::try_from(result_count).unwrap();
2592 assert!(result_count <= MAX_FLAT_RESULTS);
2593
2594 let task = state.get_mut(guest_thread.task)?;
2595 if !callback.is_null() {
2596 let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2600 task.callback = Some(Box::new(move |store, event, handle| {
2601 let store = token.as_context_mut(store);
2602 unsafe { self.call_callback::<T>(store, callback, event, handle) }
2603 }));
2604 }
2605
2606 let Caller::Guest { thread: caller } = &task.caller else {
2607 unreachable!()
2610 };
2611 let caller = *caller;
2612 let caller_instance = state.get_mut(caller.task)?.instance;
2613
2614 unsafe {
2616 self.queue_call(
2617 store.as_context_mut(),
2618 guest_thread,
2619 callee,
2620 param_count,
2621 result_count,
2622 (flags & START_FLAG_ASYNC_CALLEE) != 0,
2623 NonNull::new(callback).map(SendSyncPtr::new),
2624 NonNull::new(post_return).map(SendSyncPtr::new),
2625 )?;
2626 }
2627
2628 let state = store.0.concurrent_state_mut();
2629
2630 let guest_waitable = Waitable::Guest(guest_thread.task);
2633 let old_set = guest_waitable.common(state)?.set;
2634 let set = state.get_mut(caller.task)?.sync_call_set;
2635 guest_waitable.join(state, Some(set))?;
2636
2637 let (status, waitable) = loop {
2653 store.0.suspend(SuspendReason::Waiting {
2654 set,
2655 thread: caller,
2656 skip_may_block_check: async_caller || !callee_async,
2664 })?;
2665
2666 let state = store.0.concurrent_state_mut();
2667
2668 log::trace!("taking event for {:?}", guest_thread.task);
2669 let event = guest_waitable.take_event(state)?;
2670 let Some(Event::Subtask { status }) = event else {
2671 unreachable!();
2672 };
2673
2674 log::trace!("status {status:?} for {:?}", guest_thread.task);
2675
2676 if status == Status::Returned {
2677 break (status, None);
2679 } else if async_caller {
2680 let handle = store
2684 .0
2685 .instance_state(caller_instance)
2686 .handle_table()
2687 .subtask_insert_guest(guest_thread.task.rep())?;
2688 store
2689 .0
2690 .concurrent_state_mut()
2691 .get_mut(guest_thread.task)?
2692 .common
2693 .handle = Some(handle);
2694 break (status, Some(handle));
2695 } else {
2696 }
2700 };
2701
2702 guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2703
2704 store.0.set_thread(caller);
2706 store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2707 log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2708
2709 if let Some(storage) = storage {
2710 let state = store.0.concurrent_state_mut();
2714 let task = state.get_mut(guest_thread.task)?;
2715 if let Some(result) = task.sync_result.take() {
2716 if let Some(result) = result {
2717 storage[0] = MaybeUninit::new(result);
2718 }
2719
2720 if task.exited && task.ready_to_delete() {
2721 Waitable::Guest(guest_thread.task).delete_from(state)?;
2722 }
2723 }
2724 }
2725
2726 Ok(status.pack(waitable))
2727 }
2728
2729 pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2741 self,
2742 mut store: StoreContextMut<'_, T>,
2743 future: impl Future<Output = Result<R>> + Send + 'static,
2744 lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
2745 ) -> Result<Option<u32>> {
2746 let token = StoreToken::new(store.as_context_mut());
2747 let state = store.0.concurrent_state_mut();
2748 let task = state.unwrap_current_host_thread();
2749
2750 let (join_handle, future) = JoinHandle::run(future);
2753 {
2754 let task = state.get_mut(task)?;
2755 assert!(task.join_handle.is_none());
2756 task.join_handle = Some(join_handle);
2757 }
2758
2759 let mut future = Box::pin(future);
2760
2761 let poll = tls::set(store.0, || {
2766 future
2767 .as_mut()
2768 .poll(&mut Context::from_waker(&Waker::noop()))
2769 });
2770
2771 match poll {
2772 Poll::Ready(Some(result)) => {
2774 lower(store.as_context_mut(), result?)?;
2775 return Ok(None);
2776 }
2777
2778 Poll::Ready(None) => unreachable!(),
2781
2782 Poll::Pending => {}
2784 }
2785
2786 let future = Box::pin(async move {
2794 let result = match future.await {
2795 Some(result) => result?,
2796 None => return Ok(()),
2798 };
2799 let on_complete = move |store: &mut dyn VMStore| {
2800 let mut store = token.as_context_mut(store);
2804 let state = store.0.concurrent_state_mut();
2805 assert!(state.current_thread.is_none());
2806 store.0.set_thread(task);
2807
2808 lower(store.as_context_mut(), result)?;
2809 let state = store.0.concurrent_state_mut();
2810 state.get_mut(task)?.join_handle.take();
2811 Waitable::Host(task).set_event(
2812 state,
2813 Some(Event::Subtask {
2814 status: Status::Returned,
2815 }),
2816 )?;
2817
2818 store.0.set_thread(CurrentThread::None);
2820 Ok(())
2821 };
2822
2823 tls::get(move |store| {
2828 store
2829 .concurrent_state_mut()
2830 .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2831 on_complete,
2832 ))));
2833 Ok(())
2834 })
2835 });
2836
2837 let state = store.0.concurrent_state_mut();
2840 state.push_future(future);
2841 let caller = state.get_mut(task)?.caller;
2842 let instance = state.get_mut(caller.task)?.instance;
2843 let handle = store
2844 .0
2845 .instance_state(instance)
2846 .handle_table()
2847 .subtask_insert_host(task.rep())?;
2848 store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2849 log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");
2850
2851 store.0.set_thread(caller);
2855 Ok(Some(handle))
2856 }
2857
2858 pub(crate) fn task_return(
2861 self,
2862 store: &mut dyn VMStore,
2863 ty: TypeTupleIndex,
2864 options: OptionsIndex,
2865 storage: &[ValRaw],
2866 ) -> Result<()> {
2867 let state = store.concurrent_state_mut();
2868 let guest_thread = state.unwrap_current_guest_thread();
2869 let lift = state
2870 .get_mut(guest_thread.task)?
2871 .lift_result
2872 .take()
2873 .ok_or_else(|| {
2874 format_err!("`task.return` or `task.cancel` called more than once for current task")
2875 })?;
2876 assert!(state.get_mut(guest_thread.task)?.result.is_none());
2877
2878 let CanonicalOptions {
2879 string_encoding,
2880 data_model,
2881 ..
2882 } = &self.id().get(store).component().env_component().options[options];
2883
2884 let invalid = ty != lift.ty
2885 || string_encoding != &lift.string_encoding
2886 || match data_model {
2887 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2888 Some(memory) => {
2889 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2890 let actual = self.id().get(store).runtime_memory(memory);
2891 expected != actual.as_ptr()
2892 }
2893 None => false,
2896 },
2897 CanonicalOptionsDataModel::Gc { .. } => true,
2899 };
2900
2901 if invalid {
2902 bail!("invalid `task.return` signature and/or options for current task");
2903 }
2904
2905 log::trace!("task.return for {guest_thread:?}");
2906
2907 let result = (lift.lift)(store, storage)?;
2908 self.task_complete(store, guest_thread.task, result, Status::Returned)
2909 }
2910
2911 pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
2913 let state = store.concurrent_state_mut();
2914 let guest_thread = state.unwrap_current_guest_thread();
2915 let task = state.get_mut(guest_thread.task)?;
2916 if !task.cancel_sent {
2917 bail!("`task.cancel` called by task which has not been cancelled")
2918 }
2919 _ = task.lift_result.take().ok_or_else(|| {
2920 format_err!("`task.return` or `task.cancel` called more than once for current task")
2921 })?;
2922
2923 assert!(task.result.is_none());
2924
2925 log::trace!("task.cancel for {guest_thread:?}");
2926
2927 self.task_complete(
2928 store,
2929 guest_thread.task,
2930 Box::new(DummyResult),
2931 Status::ReturnCancelled,
2932 )
2933 }
2934
2935 fn task_complete(
2941 self,
2942 store: &mut StoreOpaque,
2943 guest_task: TableId<GuestTask>,
2944 result: Box<dyn Any + Send + Sync>,
2945 status: Status,
2946 ) -> Result<()> {
2947 store
2948 .component_resource_tables(Some(self))
2949 .validate_scope_exit()?;
2950
2951 let state = store.concurrent_state_mut();
2952 let task = state.get_mut(guest_task)?;
2953
2954 if let Caller::Host { tx, .. } = &mut task.caller {
2955 if let Some(tx) = tx.take() {
2956 _ = tx.send(result);
2957 }
2958 } else {
2959 task.result = Some(result);
2960 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2961 }
2962
2963 Ok(())
2964 }
2965
2966 pub(crate) fn waitable_set_new(
2968 self,
2969 store: &mut StoreOpaque,
2970 caller_instance: RuntimeComponentInstanceIndex,
2971 ) -> Result<u32> {
2972 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2973 let handle = store
2974 .instance_state(RuntimeInstance {
2975 instance: self.id().instance(),
2976 index: caller_instance,
2977 })
2978 .handle_table()
2979 .waitable_set_insert(set.rep())?;
2980 log::trace!("new waitable set {set:?} (handle {handle})");
2981 Ok(handle)
2982 }
2983
2984 pub(crate) fn waitable_set_drop(
2986 self,
2987 store: &mut StoreOpaque,
2988 caller_instance: RuntimeComponentInstanceIndex,
2989 set: u32,
2990 ) -> Result<()> {
2991 let rep = store
2992 .instance_state(RuntimeInstance {
2993 instance: self.id().instance(),
2994 index: caller_instance,
2995 })
2996 .handle_table()
2997 .waitable_set_remove(set)?;
2998
2999 log::trace!("drop waitable set {rep} (handle {set})");
3000
3001 let set = store
3002 .concurrent_state_mut()
3003 .delete(TableId::<WaitableSet>::new(rep))?;
3004
3005 if !set.waiting.is_empty() {
3006 bail!("cannot drop waitable set with waiters");
3007 }
3008
3009 Ok(())
3010 }
3011
3012 pub(crate) fn waitable_join(
3014 self,
3015 store: &mut StoreOpaque,
3016 caller_instance: RuntimeComponentInstanceIndex,
3017 waitable_handle: u32,
3018 set_handle: u32,
3019 ) -> Result<()> {
3020 let mut instance = self.id().get_mut(store);
3021 let waitable =
3022 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3023
3024 let set = if set_handle == 0 {
3025 None
3026 } else {
3027 let set = instance.instance_states().0[caller_instance]
3028 .handle_table()
3029 .waitable_set_rep(set_handle)?;
3030
3031 Some(TableId::<WaitableSet>::new(set))
3032 };
3033
3034 log::trace!(
3035 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3036 );
3037
3038 waitable.join(store.concurrent_state_mut(), set)
3039 }
3040
3041 pub(crate) fn subtask_drop(
3043 self,
3044 store: &mut StoreOpaque,
3045 caller_instance: RuntimeComponentInstanceIndex,
3046 task_id: u32,
3047 ) -> Result<()> {
3048 self.waitable_join(store, caller_instance, task_id, 0)?;
3049
3050 let (rep, is_host) = store
3051 .instance_state(RuntimeInstance {
3052 instance: self.id().instance(),
3053 index: caller_instance,
3054 })
3055 .handle_table()
3056 .subtask_remove(task_id)?;
3057
3058 let concurrent_state = store.concurrent_state_mut();
3059 let (waitable, expected_caller, delete) = if is_host {
3060 let id = TableId::<HostTask>::new(rep);
3061 let task = concurrent_state.get_mut(id)?;
3062 if task.join_handle.is_some() {
3063 bail!("cannot drop a subtask which has not yet resolved");
3064 }
3065 (Waitable::Host(id), task.caller, true)
3066 } else {
3067 let id = TableId::<GuestTask>::new(rep);
3068 let task = concurrent_state.get_mut(id)?;
3069 if task.lift_result.is_some() {
3070 bail!("cannot drop a subtask which has not yet resolved");
3071 }
3072 if let Caller::Guest { thread } = task.caller {
3073 (
3074 Waitable::Guest(id),
3075 thread,
3076 concurrent_state.get_mut(id)?.exited,
3077 )
3078 } else {
3079 unreachable!()
3080 }
3081 };
3082
3083 waitable.common(concurrent_state)?.handle = None;
3084
3085 if waitable.take_event(concurrent_state)?.is_some() {
3086 bail!("cannot drop a subtask with an undelivered event");
3087 }
3088
3089 if delete {
3090 waitable.delete_from(concurrent_state)?;
3091 }
3092
3093 assert_eq!(
3097 expected_caller,
3098 concurrent_state.unwrap_current_guest_thread(),
3099 );
3100 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3101 Ok(())
3102 }
3103
3104 pub(crate) fn waitable_set_wait(
3106 self,
3107 store: &mut StoreOpaque,
3108 options: OptionsIndex,
3109 set: u32,
3110 payload: u32,
3111 ) -> Result<u32> {
3112 if !self.options(store, options).async_ {
3113 store.check_blocking()?;
3117 }
3118
3119 let &CanonicalOptions {
3120 cancellable,
3121 instance: caller_instance,
3122 ..
3123 } = &self.id().get(store).component().env_component().options[options];
3124 let rep = store
3125 .instance_state(RuntimeInstance {
3126 instance: self.id().instance(),
3127 index: caller_instance,
3128 })
3129 .handle_table()
3130 .waitable_set_rep(set)?;
3131
3132 self.waitable_check(
3133 store,
3134 cancellable,
3135 WaitableCheck::Wait,
3136 WaitableCheckParams {
3137 set: TableId::new(rep),
3138 options,
3139 payload,
3140 },
3141 )
3142 }
3143
3144 pub(crate) fn waitable_set_poll(
3146 self,
3147 store: &mut StoreOpaque,
3148 options: OptionsIndex,
3149 set: u32,
3150 payload: u32,
3151 ) -> Result<u32> {
3152 let &CanonicalOptions {
3153 cancellable,
3154 instance: caller_instance,
3155 ..
3156 } = &self.id().get(store).component().env_component().options[options];
3157 let rep = store
3158 .instance_state(RuntimeInstance {
3159 instance: self.id().instance(),
3160 index: caller_instance,
3161 })
3162 .handle_table()
3163 .waitable_set_rep(set)?;
3164
3165 self.waitable_check(
3166 store,
3167 cancellable,
3168 WaitableCheck::Poll,
3169 WaitableCheckParams {
3170 set: TableId::new(rep),
3171 options,
3172 payload,
3173 },
3174 )
3175 }
3176
3177 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3179 let thread_id = store
3180 .concurrent_state_mut()
3181 .unwrap_current_guest_thread()
3182 .thread;
3183 Ok(store
3185 .concurrent_state_mut()
3186 .get_mut(thread_id)?
3187 .instance_rep
3188 .unwrap())
3189 }
3190
3191 pub(crate) fn thread_new_indirect<T: 'static>(
3193 self,
3194 mut store: StoreContextMut<T>,
3195 runtime_instance: RuntimeComponentInstanceIndex,
3196 _func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex,
3198 start_func_idx: u32,
3199 context: i32,
3200 ) -> Result<u32> {
3201 log::trace!("creating new thread");
3202
3203 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3204 let (instance, registry) = self.id().get_mut_and_registry(store.0);
3205 let callee = instance
3206 .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3207 .ok_or_else(|| {
3208 format_err!("the start function index points to an uninitialized function")
3209 })?;
3210 if callee.type_index(store.0) != start_func_ty.type_index() {
3211 bail!(
3212 "start function does not match expected type (currently only `(i32) -> ()` is supported)"
3213 );
3214 }
3215
3216 let token = StoreToken::new(store.as_context_mut());
3217 let start_func = Box::new(
3218 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3219 let old_thread = store.set_thread(guest_thread);
3220 log::trace!(
3221 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3222 );
3223
3224 let mut store = token.as_context_mut(store);
3225 let mut params = [ValRaw::i32(context)];
3226 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3229
3230 self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3231 log::trace!("explicit thread {guest_thread:?} completed");
3232 let state = store.0.concurrent_state_mut();
3233 let task = state.get_mut(guest_thread.task)?;
3234 if task.threads.is_empty() && !task.returned_or_cancelled() {
3235 bail!(Trap::NoAsyncResult);
3236 }
3237 store.0.set_thread(old_thread);
3238 let state = store.0.concurrent_state_mut();
3239 old_thread
3240 .guest()
3241 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
3242 if state.get_mut(guest_thread.task)?.ready_to_delete() {
3243 Waitable::Guest(guest_thread.task).delete_from(state)?;
3244 }
3245 log::trace!("thread start: restored {old_thread:?} as current thread");
3246
3247 Ok(())
3248 },
3249 );
3250
3251 let state = store.0.concurrent_state_mut();
3252 let current_thread = state.unwrap_current_guest_thread();
3253 let parent_task = current_thread.task;
3254
3255 let new_thread = GuestThread::new_explicit(parent_task, start_func);
3256 let thread_id = state.push(new_thread)?;
3257 state.get_mut(parent_task)?.threads.insert(thread_id);
3258
3259 log::trace!("new thread with id {thread_id:?} created");
3260
3261 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3262 }
3263
/// Starts or resumes the guest thread with handle `thread_idx` in
/// `runtime_instance`'s thread handle table by queueing a work item for it.
///
/// - A thread created explicitly but not yet started is queued as a
///   `GuestCallKind::StartExplicit` guest call.
/// - A suspended thread has its fiber queued as `WorkItem::ResumeFiber`.
/// - A thread that is already `Ready` is accepted only when `allow_ready` is
///   set; its existing work item is then promoted rather than a new one pushed.
///
/// `high_priority` is forwarded to `push_work_item`.
///
/// # Errors
///
/// Fails if the handle or table entries cannot be resolved, or with "cannot
/// resume thread which is not suspended" for any other thread state; in that
/// case the thread's state is restored to what it was.
pub(crate) fn resume_thread(
    self,
    store: &mut StoreOpaque,
    runtime_instance: RuntimeComponentInstanceIndex,
    thread_idx: u32,
    high_priority: bool,
    allow_ready: bool,
) -> Result<()> {
    let thread_id =
        GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
    let state = store.concurrent_state_mut();
    let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
    let thread = state.get_mut(guest_thread.thread)?;

    // Optimistically mark the thread as running while taking ownership of its
    // previous state; arms that reject the request put the old state back.
    match mem::replace(&mut thread.state, GuestThreadState::Running) {
        GuestThreadState::NotStartedExplicit(start_func) => {
            log::trace!("starting thread {guest_thread:?}");
            let guest_call = WorkItem::GuestCall(
                runtime_instance,
                GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::StartExplicit(Box::new(move |store| {
                        start_func(store, guest_thread)
                    })),
                },
            );
            store
                .concurrent_state_mut()
                .push_work_item(guest_call, high_priority);
        }
        GuestThreadState::Suspended(fiber) => {
            log::trace!("resuming thread {thread_id:?} that was suspended");
            store
                .concurrent_state_mut()
                .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
        }
        GuestThreadState::Ready(fiber) if allow_ready => {
            log::trace!("resuming thread {thread_id:?} that was ready");
            // The thread is already queued: keep it `Ready` and only move its
            // existing work item forward.
            thread.state = GuestThreadState::Ready(fiber);
            store
                .concurrent_state_mut()
                .promote_thread_work_item(guest_thread);
        }
        other => {
            // Undo the optimistic state change before reporting the error.
            thread.state = other;
            bail!("cannot resume thread which is not suspended");
        }
    }
    Ok(())
}
3314
3315 fn add_guest_thread_to_instance_table(
3316 self,
3317 thread_id: TableId<GuestThread>,
3318 store: &mut StoreOpaque,
3319 runtime_instance: RuntimeComponentInstanceIndex,
3320 ) -> Result<u32> {
3321 let guest_id = store
3322 .instance_state(RuntimeInstance {
3323 instance: self.id().instance(),
3324 index: runtime_instance,
3325 })
3326 .thread_handle_table()
3327 .guest_thread_insert(thread_id.rep())?;
3328 store
3329 .concurrent_state_mut()
3330 .get_mut(thread_id)?
3331 .instance_rep = Some(guest_id);
3332 Ok(guest_id)
3333 }
3334
3335 pub(crate) fn suspension_intrinsic(
3338 self,
3339 store: &mut StoreOpaque,
3340 caller: RuntimeComponentInstanceIndex,
3341 cancellable: bool,
3342 yielding: bool,
3343 to_thread: SuspensionTarget,
3344 ) -> Result<WaitResult> {
3345 let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread();
3346 if to_thread.is_none() {
3347 let state = store.concurrent_state_mut();
3348 if yielding {
3349 if !state.may_block(guest_thread.task) {
3351 if !state.promote_instance_local_thread_work_item(caller) {
3354 return Ok(WaitResult::Completed);
3356 }
3357 }
3358 } else {
3359 store.check_blocking()?;
3363 }
3364 }
3365
3366 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3368 return Ok(WaitResult::Cancelled);
3369 }
3370
3371 match to_thread {
3372 SuspensionTarget::SomeSuspended(thread) => {
3373 self.resume_thread(store, caller, thread, true, false)?
3374 }
3375 SuspensionTarget::Some(thread) => {
3376 self.resume_thread(store, caller, thread, true, true)?
3377 }
3378 SuspensionTarget::None => { }
3379 }
3380
3381 let reason = if yielding {
3382 SuspendReason::Yielding {
3383 thread: guest_thread,
3384 skip_may_block_check: to_thread.is_some(),
3388 }
3389 } else {
3390 SuspendReason::ExplicitlySuspending {
3391 thread: guest_thread,
3392 skip_may_block_check: to_thread.is_some(),
3396 }
3397 };
3398
3399 store.suspend(reason)?;
3400
3401 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3402 Ok(WaitResult::Cancelled)
3403 } else {
3404 Ok(WaitResult::Completed)
3405 }
3406 }
3407
/// Shared implementation behind `waitable_set_wait` and `waitable_set_poll`.
///
/// In `Wait` mode the current guest thread is suspended first if there is
/// nothing to deliver yet; in `Poll` mode it never suspends. Afterwards the
/// next event (if any) is fetched, its handle and result are stored as two
/// little-endian `u32`s at `params.payload` in the memory selected by
/// `params.options`, and the event's ordinal is returned.
///
/// # Errors
///
/// Fails if a table lookup fails, if suspending fails, or if the payload
/// address is rejected by `validate_inbounds_dynamic`.
///
/// # Panics
///
/// Panics in `Wait` mode if no event is available after waking up, or if the
/// thread already had a `wake_on_cancel` set registered.
fn waitable_check(
    self,
    store: &mut StoreOpaque,
    cancellable: bool,
    check: WaitableCheck,
    params: WaitableCheckParams,
) -> Result<u32> {
    let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread();

    log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);

    let state = store.concurrent_state_mut();
    let task = state.get_mut(guest_thread.task)?;

    match &check {
        WaitableCheck::Wait => {
            let set = params.set;

            // Suspend only when the task has no deliverable event of its own
            // (a `Cancelled` event does not count unless the wait is
            // cancellable) and the set has nothing ready either.
            if (task.event.is_none()
                || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
                && state.get_mut(set)?.ready.is_empty()
            {
                if cancellable {
                    // Remember which set this thread waits on so that
                    // `subtask_cancel` can find and wake it.
                    let old = state
                        .get_mut(guest_thread.thread)?
                        .wake_on_cancel
                        .replace(set);
                    assert!(old.is_none());
                }

                store.suspend(SuspendReason::Waiting {
                    set,
                    thread: guest_thread,
                    skip_may_block_check: false,
                })?;
            }
        }
        WaitableCheck::Poll => {}
    }

    log::trace!(
        "waitable check for {guest_thread:?}; set {:?}, part two",
        params.set
    );

    let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;

    // A handle of 0 is used when the event is not tied to a waitable.
    let (ordinal, handle, result) = match &check {
        WaitableCheck::Wait => {
            let (event, waitable) = event.unwrap();
            let handle = waitable.map(|(_, v)| v).unwrap_or(0);
            let (ordinal, result) = event.parts();
            (ordinal, handle, result)
        }
        WaitableCheck::Poll => {
            if let Some((event, waitable)) = event {
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                let (ordinal, result) = event.parts();
                (ordinal, handle, result)
            } else {
                log::trace!(
                    "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
                    guest_thread.task,
                    params.set
                );
                // Nothing ready: report `Event::None` with a zero handle.
                let (ordinal, result) = Event::None.parts();
                (ordinal, 0, result)
            }
        }
    };
    let memory = self.options_memory_mut(store, params.options);
    let ptr = func::validate_inbounds_dynamic(
        &CanonicalAbiInfo::POINTER_PAIR,
        memory,
        &ValRaw::u32(params.payload),
    )?;
    // Payload layout: handle at offset 0, result at offset 4, both little-endian.
    memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
    memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
    Ok(ordinal)
}
3492
/// Handles a guest's request to cancel the subtask with handle `task_id`.
///
/// Returns a `Status` value (as `u32`) describing how the subtask ended, or
/// `BLOCKED` when `async_` is set and the subtask has not yet acknowledged
/// the cancellation.
///
/// # Errors
///
/// Fails if `async_` is false and `check_blocking` rejects the call, if the
/// handle cannot be resolved, or with "`subtask.cancel` called after terminal
/// status delivered" when there is no terminal event left to consume.
///
/// # Panics
///
/// Panics if the subtask's recorded caller is not the current guest thread.
pub(crate) fn subtask_cancel(
    self,
    store: &mut StoreOpaque,
    caller_instance: RuntimeComponentInstanceIndex,
    async_: bool,
    task_id: u32,
) -> Result<u32> {
    if !async_ {
        store.check_blocking()?;
    }

    // Resolve the handle without removing it from the table.
    let (rep, is_host) = store
        .instance_state(RuntimeInstance {
            instance: self.id().instance(),
            index: caller_instance,
        })
        .handle_table()
        .subtask_rep(task_id)?;
    let (waitable, expected_caller) = if is_host {
        let id = TableId::<HostTask>::new(rep);
        (
            Waitable::Host(id),
            store.concurrent_state_mut().get_mut(id)?.caller,
        )
    } else {
        let id = TableId::<GuestTask>::new(rep);
        if let Caller::Guest { thread } = store.concurrent_state_mut().get_mut(id)?.caller {
            (Waitable::Guest(id), thread)
        } else {
            unreachable!()
        }
    };
    let concurrent_state = store.concurrent_state_mut();
    // Sanity check: only the thread that created the subtask may cancel it.
    assert_eq!(
        expected_caller,
        concurrent_state.unwrap_current_guest_thread(),
    );

    log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

    if let Waitable::Host(host_task) = waitable {
        // A host task that still owns its join handle is still in flight:
        // abort it and report the cancellation right away. Otherwise fall
        // through and consume its terminal event below.
        if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
            handle.abort();
            return Ok(Status::ReturnCancelled as u32);
        }
    } else {
        let caller = concurrent_state.unwrap_current_guest_thread();
        let guest_task = TableId::<GuestTask>::new(rep);
        let task = concurrent_state.get_mut(guest_task)?;
        if !task.already_lowered_parameters() {
            // The callee never received its parameters, i.e. it has not
            // started: tear it down without running it.
            task.lower_params = None;
            task.lift_result = None;
            task.exited = true;

            let instance = task.instance;

            assert_eq!(1, task.threads.len());
            let thread = mem::take(&mut task.threads).into_iter().next().unwrap();
            let concurrent_state = store.concurrent_state_mut();
            concurrent_state.delete(thread)?;
            assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());

            // Drop the task's entry from the callee instance's pending queue;
            // if nothing was removed the task was not actually pending there.
            let pending = &mut store.instance_state(instance).concurrent_state().pending;
            let pending_count = pending.len();
            pending.retain(|thread, _| thread.task != guest_task);
            if pending.len() == pending_count {
                bail!("`subtask.cancel` called after terminal status delivered");
            }
            return Ok(Status::StartCancelled as u32);
        } else if !task.returned_or_cancelled() {
            // The callee is running: post a cancellation event for it.
            task.cancel_sent = true;
            task.event = Some(Event::Cancelled);
            let runtime_instance = task.instance.index;
            for thread in task.threads.clone() {
                let thread = QualifiedThreadId {
                    task: guest_task,
                    thread,
                };
                // Wake the first thread that registered a cancellable wait.
                if let Some(set) = concurrent_state
                    .get_mut(thread.thread)
                    .unwrap()
                    .wake_on_cancel
                    .take()
                {
                    let item = match concurrent_state
                        .get_mut(set)?
                        .waiting
                        .remove(&thread)
                        .unwrap()
                    {
                        WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
                        WaitMode::Callback(instance) => WorkItem::GuestCall(
                            runtime_instance,
                            GuestCall {
                                thread,
                                kind: GuestCallKind::DeliverEvent {
                                    instance,
                                    set: None,
                                },
                            },
                        ),
                    };
                    concurrent_state.push_high_priority(item);

                    // Yield so the woken thread gets a chance to observe the
                    // cancellation before this function re-checks the task.
                    store.suspend(SuspendReason::Yielding {
                        thread: caller,
                        skip_may_block_check: false,
                    })?;
                    break;
                }
            }

            let concurrent_state = store.concurrent_state_mut();
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.returned_or_cancelled() {
                if async_ {
                    return Ok(BLOCKED);
                } else {
                    store.wait_for_event(Waitable::Guest(guest_task))?;
                }
            }
        }
    }

    // Consume the terminal event; anything else means the terminal status was
    // already delivered to the guest.
    let event = waitable.take_event(store.concurrent_state_mut())?;
    if let Some(Event::Subtask {
        status: status @ (Status::Returned | Status::ReturnCancelled),
    }) = event
    {
        Ok(status as u32)
    } else {
        bail!("`subtask.cancel` called after terminal status delivered");
    }
}
3647
/// Reads context slot `slot` by delegating to `ConcurrentState::context_get`.
/// NOTE(review): presumably the slot belongs to the current guest thread
/// (see `GuestThread::context`) -- confirm in `ConcurrentState`.
pub(crate) fn context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32> {
    store.concurrent_state_mut().context_get(slot)
}
3651
/// Writes `value` to context slot `slot` by delegating to
/// `ConcurrentState::context_set`.
/// NOTE(review): presumably the slot belongs to the current guest thread
/// (see `GuestThread::context`) -- confirm in `ConcurrentState`.
pub(crate) fn context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()> {
    store.concurrent_state_mut().context_set(slot, value)
}
3655}
3656
/// Store-level entry points for the component-model async operations.
///
/// The implementation for `StoreInner<T>` forwards each method to the
/// matching `Instance` method, wrapping the store in a `StoreContextMut`
/// where one is required.
/// NOTE(review): presumably this exists so callers that only hold a
/// type-erased store can reach `T`-generic code -- confirm.
pub trait VMComponentAsyncStore {
    /// Prepares a call between `caller_instance` and `callee_instance`.
    ///
    /// `storage`/`storage_len` describe the already-lowered parameters. In
    /// the `StoreInner` implementation the `result_count` argument doubles as
    /// a mode selector: `PREPARE_ASYNC_NO_RESULT` and
    /// `PREPARE_ASYNC_WITH_RESULT` mark an async caller, any other value is a
    /// sync caller's result count.
    ///
    /// # Safety
    ///
    /// `storage` must point to `storage_len` initialized `ValRaw` values that
    /// are valid to read. Requirements on `memory`, `start` and `return_` are
    /// those of `Instance::prepare_call`, which is not shown here.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: u8,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of a synchronous caller,
    /// using `storage` as the caller-provided result buffer.
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads and writes of `storage_len`
    /// `MaybeUninit<ValRaw>` slots for the duration of the call. Requirements
    /// on `callback` and `callee` are those of `Instance::start_call`.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of an asynchronous caller
    /// and returns the `u32` produced by `Instance::start_call`.
    ///
    /// # Safety
    ///
    /// The function pointers are forwarded unchanged to
    /// `Instance::start_call`; they must satisfy that method's requirements.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// Writes to the future with handle `future` from guest memory at
    /// `address`; returns the encoded return code.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Reads from the future with handle `future` into guest memory at
    /// `address`; returns the encoded return code.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Drops the writable end of a future identified by handle `writer`.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Writes up to `count` items to the stream with handle `stream` from
    /// guest memory at `address`; returns the encoded return code.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Reads up to `count` items from the stream with handle `stream` into
    /// guest memory at `address`; returns the encoded return code.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_write`, but additionally passes the payload's size and
    /// alignment as a `FlatAbi`.
    /// NOTE(review): presumably a fast path for payloads that can be copied
    /// bytewise -- confirm.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_read`, but additionally passes the payload's size and
    /// alignment as a `FlatAbi`.
    /// NOTE(review): presumably a fast path for payloads that can be copied
    /// bytewise -- confirm.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Drops the writable end of a stream identified by handle `writer`.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Produces the debug message of the error context with handle
    /// `err_ctx_handle`, using guest address `debug_msg_address`.
    /// NOTE(review): the exact layout written at `debug_msg_address` is
    /// defined by `Instance::error_context_debug_message` -- confirm there.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// Creates a new guest thread whose start function is found at
    /// `start_func_idx` in table `start_func_table_idx`, to be called with
    /// `context`; returns the new thread's handle.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
3824
3825impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3827 unsafe fn prepare_call(
3828 &mut self,
3829 instance: Instance,
3830 memory: *mut VMMemoryDefinition,
3831 start: *mut VMFuncRef,
3832 return_: *mut VMFuncRef,
3833 caller_instance: RuntimeComponentInstanceIndex,
3834 callee_instance: RuntimeComponentInstanceIndex,
3835 task_return_type: TypeTupleIndex,
3836 callee_async: bool,
3837 string_encoding: u8,
3838 result_count_or_max_if_async: u32,
3839 storage: *mut ValRaw,
3840 storage_len: usize,
3841 ) -> Result<()> {
3842 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3846
3847 unsafe {
3848 instance.prepare_call(
3849 StoreContextMut(self),
3850 start,
3851 return_,
3852 caller_instance,
3853 callee_instance,
3854 task_return_type,
3855 callee_async,
3856 memory,
3857 string_encoding,
3858 match result_count_or_max_if_async {
3859 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3860 params,
3861 has_result: false,
3862 },
3863 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3864 params,
3865 has_result: true,
3866 },
3867 result_count => CallerInfo::Sync {
3868 params,
3869 result_count,
3870 },
3871 },
3872 )
3873 }
3874 }
3875
3876 unsafe fn sync_start(
3877 &mut self,
3878 instance: Instance,
3879 callback: *mut VMFuncRef,
3880 callee: *mut VMFuncRef,
3881 param_count: u32,
3882 storage: *mut MaybeUninit<ValRaw>,
3883 storage_len: usize,
3884 ) -> Result<()> {
3885 unsafe {
3886 instance
3887 .start_call(
3888 StoreContextMut(self),
3889 callback,
3890 ptr::null_mut(),
3891 callee,
3892 param_count,
3893 1,
3894 START_FLAG_ASYNC_CALLEE,
3895 Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3899 )
3900 .map(drop)
3901 }
3902 }
3903
3904 unsafe fn async_start(
3905 &mut self,
3906 instance: Instance,
3907 callback: *mut VMFuncRef,
3908 post_return: *mut VMFuncRef,
3909 callee: *mut VMFuncRef,
3910 param_count: u32,
3911 result_count: u32,
3912 flags: u32,
3913 ) -> Result<u32> {
3914 unsafe {
3915 instance.start_call(
3916 StoreContextMut(self),
3917 callback,
3918 post_return,
3919 callee,
3920 param_count,
3921 result_count,
3922 flags,
3923 None,
3924 )
3925 }
3926 }
3927
3928 fn future_write(
3929 &mut self,
3930 instance: Instance,
3931 caller: RuntimeComponentInstanceIndex,
3932 ty: TypeFutureTableIndex,
3933 options: OptionsIndex,
3934 future: u32,
3935 address: u32,
3936 ) -> Result<u32> {
3937 instance
3938 .guest_write(
3939 StoreContextMut(self),
3940 caller,
3941 TransmitIndex::Future(ty),
3942 options,
3943 None,
3944 future,
3945 address,
3946 1,
3947 )
3948 .map(|result| result.encode())
3949 }
3950
3951 fn future_read(
3952 &mut self,
3953 instance: Instance,
3954 caller: RuntimeComponentInstanceIndex,
3955 ty: TypeFutureTableIndex,
3956 options: OptionsIndex,
3957 future: u32,
3958 address: u32,
3959 ) -> Result<u32> {
3960 instance
3961 .guest_read(
3962 StoreContextMut(self),
3963 caller,
3964 TransmitIndex::Future(ty),
3965 options,
3966 None,
3967 future,
3968 address,
3969 1,
3970 )
3971 .map(|result| result.encode())
3972 }
3973
3974 fn stream_write(
3975 &mut self,
3976 instance: Instance,
3977 caller: RuntimeComponentInstanceIndex,
3978 ty: TypeStreamTableIndex,
3979 options: OptionsIndex,
3980 stream: u32,
3981 address: u32,
3982 count: u32,
3983 ) -> Result<u32> {
3984 instance
3985 .guest_write(
3986 StoreContextMut(self),
3987 caller,
3988 TransmitIndex::Stream(ty),
3989 options,
3990 None,
3991 stream,
3992 address,
3993 count,
3994 )
3995 .map(|result| result.encode())
3996 }
3997
3998 fn stream_read(
3999 &mut self,
4000 instance: Instance,
4001 caller: RuntimeComponentInstanceIndex,
4002 ty: TypeStreamTableIndex,
4003 options: OptionsIndex,
4004 stream: u32,
4005 address: u32,
4006 count: u32,
4007 ) -> Result<u32> {
4008 instance
4009 .guest_read(
4010 StoreContextMut(self),
4011 caller,
4012 TransmitIndex::Stream(ty),
4013 options,
4014 None,
4015 stream,
4016 address,
4017 count,
4018 )
4019 .map(|result| result.encode())
4020 }
4021
4022 fn future_drop_writable(
4023 &mut self,
4024 instance: Instance,
4025 ty: TypeFutureTableIndex,
4026 writer: u32,
4027 ) -> Result<()> {
4028 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4029 }
4030
4031 fn flat_stream_write(
4032 &mut self,
4033 instance: Instance,
4034 caller: RuntimeComponentInstanceIndex,
4035 ty: TypeStreamTableIndex,
4036 options: OptionsIndex,
4037 payload_size: u32,
4038 payload_align: u32,
4039 stream: u32,
4040 address: u32,
4041 count: u32,
4042 ) -> Result<u32> {
4043 instance
4044 .guest_write(
4045 StoreContextMut(self),
4046 caller,
4047 TransmitIndex::Stream(ty),
4048 options,
4049 Some(FlatAbi {
4050 size: payload_size,
4051 align: payload_align,
4052 }),
4053 stream,
4054 address,
4055 count,
4056 )
4057 .map(|result| result.encode())
4058 }
4059
4060 fn flat_stream_read(
4061 &mut self,
4062 instance: Instance,
4063 caller: RuntimeComponentInstanceIndex,
4064 ty: TypeStreamTableIndex,
4065 options: OptionsIndex,
4066 payload_size: u32,
4067 payload_align: u32,
4068 stream: u32,
4069 address: u32,
4070 count: u32,
4071 ) -> Result<u32> {
4072 instance
4073 .guest_read(
4074 StoreContextMut(self),
4075 caller,
4076 TransmitIndex::Stream(ty),
4077 options,
4078 Some(FlatAbi {
4079 size: payload_size,
4080 align: payload_align,
4081 }),
4082 stream,
4083 address,
4084 count,
4085 )
4086 .map(|result| result.encode())
4087 }
4088
4089 fn stream_drop_writable(
4090 &mut self,
4091 instance: Instance,
4092 ty: TypeStreamTableIndex,
4093 writer: u32,
4094 ) -> Result<()> {
4095 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4096 }
4097
4098 fn error_context_debug_message(
4099 &mut self,
4100 instance: Instance,
4101 ty: TypeComponentLocalErrorContextTableIndex,
4102 options: OptionsIndex,
4103 err_ctx_handle: u32,
4104 debug_msg_address: u32,
4105 ) -> Result<()> {
4106 instance.error_context_debug_message(
4107 StoreContextMut(self),
4108 ty,
4109 options,
4110 err_ctx_handle,
4111 debug_msg_address,
4112 )
4113 }
4114
4115 fn thread_new_indirect(
4116 &mut self,
4117 instance: Instance,
4118 caller: RuntimeComponentInstanceIndex,
4119 func_ty_idx: TypeFuncIndex,
4120 start_func_table_idx: RuntimeTableIndex,
4121 start_func_idx: u32,
4122 context: i32,
4123 ) -> Result<u32> {
4124 instance.thread_new_indirect(
4125 StoreContextMut(self),
4126 caller,
4127 func_ty_idx,
4128 start_func_table_idx,
4129 start_func_idx,
4130 context,
4131 )
4132 }
4133}
4134
/// A pinned, boxed, sendable `'static` future that resolves to `Result<()>`.
/// NOTE(review): judging by the name, this is the future driving a host
/// task -- confirm at the use sites.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4136
/// State tracked for a subtask that is backed by the host (the `is_host`
/// flavor of subtask handle; see `Waitable::Host`).
struct HostTask {
    /// Waitable bookkeeping shared by all waitable kinds.
    common: WaitableCommon,

    /// The guest thread that owns this subtask; `subtask_drop` and
    /// `subtask_cancel` assert that it is the thread currently running.
    caller: QualifiedThreadId,

    /// NOTE(review): presumably per-call borrow/resource tracking -- confirm
    /// against `CallContext`.
    call_context: CallContext,

    /// Handle for aborting the host work. While this is `Some` the task is
    /// treated as unresolved: `subtask_drop` refuses to drop it and
    /// `subtask_cancel` takes the handle and aborts.
    join_handle: Option<JoinHandle>,

    /// NOTE(review): presumably the lifted result once the host call has
    /// completed -- confirm at the use sites.
    result: Option<LiftedResult>,
}
4159
4160impl HostTask {
4161 fn new(caller: QualifiedThreadId) -> Self {
4162 Self {
4163 common: WaitableCommon::default(),
4164 call_context: CallContext::default(),
4165 caller,
4166 join_handle: None,
4167 result: None,
4168 }
4169 }
4170}
4171
impl TableDebug for HostTask {
    /// Name reported for `HostTask` table entries.
    fn type_name() -> &'static str {
        "HostTask"
    }
}
4177
/// A boxed, thread-safe callback taking the store, an `Event` and a `u32`,
/// and returning a `u32` code.
/// NOTE(review): the meaning of the `u32` argument and return value is not
/// visible here -- presumably the waitable handle and the callback's
/// continuation code; confirm at the call sites.
type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4179
/// Identifies who invoked a `GuestTask`.
enum Caller {
    /// The task was called by the host (or, after `GuestTask::dispose`
    /// re-parents subtasks, inherited from a host-called task).
    Host {
        /// NOTE(review): presumably the channel used to hand the lifted
        /// result back to the host; `None` for re-parented subtasks -- confirm.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// Shared sender that re-parented subtasks clone from their parent.
        /// NOTE(review): presumably signals task exit to the host -- confirm.
        exit_tx: Arc<oneshot::Sender<()>>,
        /// Whether a host-side future exists for this call; when `true`,
        /// `GuestTask::new` starts the task in `HostFutureState::Live`.
        host_future_present: bool,
        /// NOTE(review): presumably the thread that was current when the
        /// host made the call -- confirm.
        caller: CurrentThread,
    },
    /// The task was called by another guest thread.
    Guest {
        /// The calling guest thread.
        thread: QualifiedThreadId,
    },
}
4207
/// Everything stored on a `GuestTask` for lifting its result.
/// NOTE(review): field meanings below are inferred from names and types --
/// confirm at the use sites.
struct LiftResult {
    /// The raw lifting function.
    lift: RawLift,
    /// Tuple type of the values being returned.
    ty: TypeTupleIndex,
    /// Linear memory used while lifting, if any.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding used while lifting.
    string_encoding: StringEncoding,
}
4216
/// A guest thread id paired with the id of the task it belongs to.
///
/// The derived ordering compares `task` first and then `thread`, so the field
/// order is significant.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
struct QualifiedThreadId {
    /// The task that owns the thread.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
4226
4227impl QualifiedThreadId {
4228 fn qualify(
4229 state: &mut ConcurrentState,
4230 thread: TableId<GuestThread>,
4231 ) -> Result<QualifiedThreadId> {
4232 Ok(QualifiedThreadId {
4233 task: state.get_mut(thread)?.parent_task,
4234 thread,
4235 })
4236 }
4237}
4238
4239impl fmt::Debug for QualifiedThreadId {
4240 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4241 f.debug_tuple("QualifiedThreadId")
4242 .field(&self.task.rep())
4243 .field(&self.thread.rep())
4244 .finish()
4245 }
4246}
4247
/// Lifecycle state of a `GuestThread`.
enum GuestThreadState {
    /// Initial state of a thread created with `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state of a thread created with `GuestThread::new_explicit`;
    /// holds the start function that `resume_thread` queues to run it.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// The thread is running (or has been queued to run by `resume_thread`).
    Running,
    /// The thread is suspended on the given fiber and may be resumed.
    Suspended(StoreFiber<'static>),
    /// The thread is already queued to run; `resume_thread` accepts it only
    /// when `allow_ready` is set.
    Ready(StoreFiber<'static>),
    /// NOTE(review): presumably the thread has finished -- no use is visible
    /// in this part of the file; confirm.
    Completed,
}
/// A guest thread belonging to a `GuestTask`.
pub struct GuestThread {
    /// Two `u32` slots, zero-initialized.
    /// NOTE(review): presumably the storage behind `context_get` /
    /// `context_set` -- confirm in `ConcurrentState`.
    context: [u32; 2],
    /// The task this thread belongs to.
    parent_task: TableId<GuestTask>,
    /// Set while the thread is in a cancellable wait: the waitable set it is
    /// waiting on, so `subtask_cancel` can wake it.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// Current lifecycle state.
    state: GuestThreadState,
    /// Handle of this thread in its instance's thread handle table, once
    /// assigned by `add_guest_thread_to_instance_table`.
    instance_rep: Option<u32>,
}
4273
4274impl GuestThread {
4275 fn from_instance(
4278 state: Pin<&mut ComponentInstance>,
4279 caller_instance: RuntimeComponentInstanceIndex,
4280 guest_thread: u32,
4281 ) -> Result<TableId<Self>> {
4282 let rep = state.instance_states().0[caller_instance]
4283 .thread_handle_table()
4284 .guest_thread_rep(guest_thread)?;
4285 Ok(TableId::new(rep))
4286 }
4287
4288 fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4289 Self {
4290 context: [0; 2],
4291 parent_task,
4292 wake_on_cancel: None,
4293 state: GuestThreadState::NotStartedImplicit,
4294 instance_rep: None,
4295 }
4296 }
4297
4298 fn new_explicit(
4299 parent_task: TableId<GuestTask>,
4300 start_func: Box<
4301 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4302 >,
4303 ) -> Self {
4304 Self {
4305 context: [0; 2],
4306 parent_task,
4307 wake_on_cancel: None,
4308 state: GuestThreadState::NotStartedExplicit(start_func),
4309 instance_rep: None,
4310 }
4311 }
4312}
4313
impl TableDebug for GuestThread {
    /// Name reported for `GuestThread` table entries.
    fn type_name() -> &'static str {
        "GuestThread"
    }
}
4319
/// Tracks the result slot of a task across its three stages.
enum SyncResult {
    /// No result has been produced yet.
    NotProduced,
    /// A result has been produced and not yet taken; the inner `Option` is
    /// `None` when there is no value to carry.
    Produced(Option<ValRaw>),
    /// The result was already taken; taking it again panics.
    Taken,
}
4325
4326impl SyncResult {
4327 fn take(&mut self) -> Option<Option<ValRaw>> {
4328 match mem::replace(self, SyncResult::Taken) {
4329 SyncResult::NotProduced => None,
4330 SyncResult::Produced(val) => Some(val),
4331 SyncResult::Taken => {
4332 panic!("attempted to take a synchronous result that was already taken")
4333 }
4334 }
4335 }
4336}
4337
/// Whether a host-side future is associated with a `GuestTask`.
#[derive(Debug)]
enum HostFutureState {
    /// No host future is involved (guest caller, or host caller without a
    /// future present).
    NotApplicable,
    /// A host future exists; while in this state the task is not
    /// `ready_to_delete`.
    Live,
    /// NOTE(review): presumably set once the host future has been dropped --
    /// no use is visible in this part of the file; confirm.
    Dropped,
}
4344
/// State tracked for a task whose callee is a guest function.
pub(crate) struct GuestTask {
    /// Waitable bookkeeping shared by all waitable kinds.
    common: WaitableCommon,
    /// Parameter-lowering function; `None` once the parameters have been
    /// lowered (see `already_lowered_parameters`) or the start was cancelled.
    lower_params: Option<RawLower>,
    /// Result-lifting state; `None` once the task has returned or been
    /// cancelled (see `returned_or_cancelled`).
    lift_result: Option<LiftResult>,
    /// NOTE(review): presumably the lifted result, when it must be held for
    /// later delivery -- confirm at the use sites.
    result: Option<LiftedResult>,
    /// Optional callback supplied at construction.
    /// NOTE(review): presumably the async callback of the callee -- confirm.
    callback: Option<CallbackFn>,
    /// Who invoked this task.
    caller: Caller,
    /// NOTE(review): presumably per-call borrow/resource tracking -- confirm
    /// against `CallContext`.
    call_context: CallContext,
    /// Result slot; a `Produced` value blocks deletion until it is taken.
    sync_result: SyncResult,
    /// Set by `subtask_cancel` once cancellation has been requested of a
    /// task that had already started.
    cancel_sent: bool,
    /// NOTE(review): presumably whether a "starting" status has been
    /// reported to the caller -- confirm at the use sites.
    starting_sent: bool,
    /// Subtasks attributed to this task; on `dispose` they are re-parented to
    /// this task's own caller.
    subtasks: HashSet<TableId<GuestTask>>,
    /// Waitable set created together with the task and deleted by `dispose`.
    /// NOTE(review): presumably used for synchronous calls made by this
    /// task -- confirm.
    sync_call_set: TableId<WaitableSet>,
    /// The runtime instance the task runs in.
    instance: RuntimeInstance,
    /// Task-level pending event; `subtask_cancel` stores `Event::Cancelled`
    /// here and `waitable_check` inspects it before suspending.
    event: Option<Event>,
    /// Whether the task has exited; gates deletion in `subtask_drop` and
    /// `dispose`.
    exited: bool,
    /// Threads currently belonging to the task; must be empty at `dispose`.
    threads: HashSet<TableId<GuestThread>>,
    /// Liveness of the host future tied to this task, if any.
    host_future_state: HostFutureState,
    /// NOTE(review): presumably whether the callee is an async function --
    /// confirm at the use sites.
    async_function: bool,
}
4403
4404impl GuestTask {
4405 fn already_lowered_parameters(&self) -> bool {
4406 self.lower_params.is_none()
4408 }
4409
4410 fn returned_or_cancelled(&self) -> bool {
4411 self.lift_result.is_none()
4413 }
4414
4415 fn ready_to_delete(&self) -> bool {
4416 let threads_completed = self.threads.is_empty();
4417 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4418 let pending_completion_event = matches!(
4419 self.common.event,
4420 Some(Event::Subtask {
4421 status: Status::Returned | Status::ReturnCancelled
4422 })
4423 );
4424 let ready = threads_completed
4425 && !has_sync_result
4426 && !pending_completion_event
4427 && !matches!(self.host_future_state, HostFutureState::Live);
4428 log::trace!(
4429 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4430 threads_completed,
4431 has_sync_result,
4432 pending_completion_event,
4433 self.host_future_state
4434 );
4435 ready
4436 }
4437
4438 fn new(
4439 state: &mut ConcurrentState,
4440 lower_params: RawLower,
4441 lift_result: LiftResult,
4442 caller: Caller,
4443 callback: Option<CallbackFn>,
4444 instance: RuntimeInstance,
4445 async_function: bool,
4446 ) -> Result<Self> {
4447 let sync_call_set = state.push(WaitableSet::default())?;
4448 let host_future_state = match &caller {
4449 Caller::Guest { .. } => HostFutureState::NotApplicable,
4450 Caller::Host {
4451 host_future_present,
4452 ..
4453 } => {
4454 if *host_future_present {
4455 HostFutureState::Live
4456 } else {
4457 HostFutureState::NotApplicable
4458 }
4459 }
4460 };
4461 Ok(Self {
4462 common: WaitableCommon::default(),
4463 lower_params: Some(lower_params),
4464 lift_result: Some(lift_result),
4465 result: None,
4466 callback,
4467 caller,
4468 call_context: CallContext::default(),
4469 sync_result: SyncResult::NotProduced,
4470 cancel_sent: false,
4471 starting_sent: false,
4472 subtasks: HashSet::new(),
4473 sync_call_set,
4474 instance,
4475 event: None,
4476 exited: false,
4477 threads: HashSet::new(),
4478 host_future_state,
4479 async_function,
4480 })
4481 }
4482
4483 fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4486 for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4489 if let Some(Event::Subtask {
4490 status: Status::Returned | Status::ReturnCancelled,
4491 }) = waitable.common(state)?.event
4492 {
4493 waitable.delete_from(state)?;
4494 }
4495 }
4496
4497 assert!(self.threads.is_empty());
4498
4499 state.delete(self.sync_call_set)?;
4500
4501 match &self.caller {
4503 Caller::Guest { thread } => {
4504 let task_mut = state.get_mut(thread.task)?;
4505 let present = task_mut.subtasks.remove(&me);
4506 assert!(present);
4507
4508 for subtask in &self.subtasks {
4509 task_mut.subtasks.insert(*subtask);
4510 }
4511
4512 for subtask in &self.subtasks {
4513 state.get_mut(*subtask)?.caller = Caller::Guest { thread: *thread };
4514 }
4515 }
4516 Caller::Host {
4517 exit_tx, caller, ..
4518 } => {
4519 for subtask in &self.subtasks {
4520 state.get_mut(*subtask)?.caller = Caller::Host {
4521 tx: None,
4522 exit_tx: exit_tx.clone(),
4526 host_future_present: false,
4527 caller: *caller,
4528 };
4529 }
4530 }
4531 }
4532
4533 for subtask in self.subtasks {
4534 let task = state.get_mut(subtask)?;
4535 if task.exited && task.ready_to_delete() {
4536 Waitable::Guest(subtask).delete_from(state)?;
4537 }
4538 }
4539
4540 Ok(())
4541 }
4542}
4543
impl TableDebug for GuestTask {
    /// Name reported for table entries of this type.
    fn type_name() -> &'static str {
        "GuestTask"
    }
}
4549
/// State carried by every kind of waitable; `Waitable::common` hands out a
/// mutable reference to it for host tasks, guest tasks and transmit handles.
#[derive(Default)]
struct WaitableCommon {
    /// The event currently pending for this waitable, if any. Written by
    /// `Waitable::set_event` and cleared by `Waitable::take_event`.
    event: Option<Event>,
    /// The waitable set this waitable has joined, if any. Updated by
    /// `Waitable::join`.
    set: Option<TableId<WaitableSet>>,
    /// NOTE(review): presumably the guest-visible handle for this waitable;
    /// it is not read or written in this part of the file, so confirm.
    handle: Option<u32>,
}
4560
/// A typed reference to something that can produce events and join a
/// `WaitableSet`.
///
/// The ordering traits are derived because values of this type are stored in
/// `WaitableSet::ready`, a `BTreeSet`. The derived order depends on variant
/// order, so do not reorder the variants casually.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A subtask whose handle-table entry is marked `is_host: true`.
    Host(TableId<HostTask>),
    /// A subtask whose handle-table entry is marked `is_host: false`.
    Guest(TableId<GuestTask>),
    /// A stream or future handle.
    Transmit(TableId<TransmitHandle>),
}
4571
4572impl Waitable {
4573 fn from_instance(
4576 state: Pin<&mut ComponentInstance>,
4577 caller_instance: RuntimeComponentInstanceIndex,
4578 waitable: u32,
4579 ) -> Result<Self> {
4580 use crate::runtime::vm::component::Waitable;
4581
4582 let (waitable, kind) = state.instance_states().0[caller_instance]
4583 .handle_table()
4584 .waitable_rep(waitable)?;
4585
4586 Ok(match kind {
4587 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4588 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4589 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4590 })
4591 }
4592
4593 fn rep(&self) -> u32 {
4595 match self {
4596 Self::Host(id) => id.rep(),
4597 Self::Guest(id) => id.rep(),
4598 Self::Transmit(id) => id.rep(),
4599 }
4600 }
4601
4602 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4606 log::trace!("waitable {self:?} join set {set:?}",);
4607
4608 let old = mem::replace(&mut self.common(state)?.set, set);
4609
4610 if let Some(old) = old {
4611 match *self {
4612 Waitable::Host(id) => state.remove_child(id, old),
4613 Waitable::Guest(id) => state.remove_child(id, old),
4614 Waitable::Transmit(id) => state.remove_child(id, old),
4615 }?;
4616
4617 state.get_mut(old)?.ready.remove(self);
4618 }
4619
4620 if let Some(set) = set {
4621 match *self {
4622 Waitable::Host(id) => state.add_child(id, set),
4623 Waitable::Guest(id) => state.add_child(id, set),
4624 Waitable::Transmit(id) => state.add_child(id, set),
4625 }?;
4626
4627 if self.common(state)?.event.is_some() {
4628 self.mark_ready(state)?;
4629 }
4630 }
4631
4632 Ok(())
4633 }
4634
4635 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4637 Ok(match self {
4638 Self::Host(id) => &mut state.get_mut(*id)?.common,
4639 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4640 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4641 })
4642 }
4643
4644 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4648 log::trace!("set event for {self:?}: {event:?}");
4649 self.common(state)?.event = event;
4650 self.mark_ready(state)
4651 }
4652
4653 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4655 let common = self.common(state)?;
4656 let event = common.event.take();
4657 if let Some(set) = self.common(state)?.set {
4658 state.get_mut(set)?.ready.remove(self);
4659 }
4660
4661 Ok(event)
4662 }
4663
4664 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4668 if let Some(set) = self.common(state)?.set {
4669 state.get_mut(set)?.ready.insert(*self);
4670 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4671 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4672 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4673
4674 let item = match mode {
4675 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4676 WaitMode::Callback(instance) => WorkItem::GuestCall(
4677 state.get_mut(thread.task)?.instance.index,
4678 GuestCall {
4679 thread,
4680 kind: GuestCallKind::DeliverEvent {
4681 instance,
4682 set: Some(set),
4683 },
4684 },
4685 ),
4686 };
4687 state.push_high_priority(item);
4688 }
4689 }
4690 Ok(())
4691 }
4692
4693 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4695 match self {
4696 Self::Host(task) => {
4697 log::trace!("delete host task {task:?}");
4698 state.delete(*task)?;
4699 }
4700 Self::Guest(task) => {
4701 log::trace!("delete guest task {task:?}");
4702 state.delete(*task)?.dispose(state, *task)?;
4703 }
4704 Self::Transmit(task) => {
4705 state.delete(*task)?;
4706 }
4707 }
4708
4709 Ok(())
4710 }
4711}
4712
4713impl fmt::Debug for Waitable {
4714 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4715 match self {
4716 Self::Host(id) => write!(f, "{id:?}"),
4717 Self::Guest(id) => write!(f, "{id:?}"),
4718 Self::Transmit(id) => write!(f, "{id:?}"),
4719 }
4720 }
4721}
4722
/// A set of waitables together with the threads waiting on it.
#[derive(Default)]
struct WaitableSet {
    /// Members that currently have an event to deliver. Entries are added by
    /// `Waitable::mark_ready` and removed by `Waitable::take_event` and
    /// `Waitable::join`.
    ready: BTreeSet<Waitable>,
    /// Threads waiting for a member to become ready, and how each should be
    /// woken. `Waitable::mark_ready` wakes the first entry in key order.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
}
4731
impl TableDebug for WaitableSet {
    /// Name reported for table entries of this type.
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
4737
/// Boxed one-shot closure that writes a call's parameters into the supplied
/// (possibly uninitialized) `ValRaw` slice, given access to the store.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;

/// Boxed one-shot closure that turns a call's raw `ValRaw` results into a
/// type-erased value, given access to the store.
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;

/// Type-erased result of a lift; the same type a `RawLift` closure returns.
type LiftedResult = Box<dyn Any + Send + Sync>;
4751
/// Unit marker type.
// NOTE(review): presumably boxed as a placeholder `LiftedResult` when there is
// no real value to hand back; it is not used in this part of the file, so
// confirm against its call sites.
struct DummyResult;
4755
/// Per-instance bookkeeping for concurrent calls.
// NOTE(review): none of these fields is written in this part of the file; the
// field notes below are inferred from names and types and should be confirmed.
#[derive(Default)]
pub struct ConcurrentInstanceState {
    /// Presumably a backpressure counter (a `u16`, not a flag).
    backpressure: u16,
    /// Presumably set while the instance must not be entered.
    do_not_enter: bool,
    /// Guest calls held back for this instance, keyed by the thread that
    /// would run them.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
4767
impl ConcurrentInstanceState {
    /// Returns `true` when no calls are recorded in `pending`.
    pub fn pending_is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}
4773
/// What `ConcurrentState` considers to be running right now.
#[derive(Debug, Copy, Clone)]
enum CurrentThread {
    /// A guest thread, identified by its task and thread ids.
    Guest(QualifiedThreadId),
    /// A host task.
    Host(TableId<HostTask>),
    /// Nothing is current; this is the value `ConcurrentState::default` uses.
    None,
}
4780
4781impl CurrentThread {
4782 fn guest(&self) -> Option<&QualifiedThreadId> {
4783 match self {
4784 Self::Guest(id) => Some(id),
4785 _ => None,
4786 }
4787 }
4788
4789 fn host(&self) -> Option<TableId<HostTask>> {
4790 match self {
4791 Self::Host(id) => Some(*id),
4792 _ => None,
4793 }
4794 }
4795
4796 fn is_none(&self) -> bool {
4797 matches!(self, Self::None)
4798 }
4799}
4800
impl From<QualifiedThreadId> for CurrentThread {
    /// Wraps a guest thread id as `CurrentThread::Guest`.
    fn from(id: QualifiedThreadId) -> Self {
        Self::Guest(id)
    }
}
4806
impl From<TableId<HostTask>> for CurrentThread {
    /// Wraps a host task id as `CurrentThread::Host`.
    fn from(id: TableId<HostTask>) -> Self {
        Self::Host(id)
    }
}
4812
/// Bookkeeping for concurrent execution: the table of tasks, threads and
/// waitables, the work queues, and the set of in-flight host futures.
pub struct ConcurrentState {
    /// The thread (guest or host) currently considered to be running, if any.
    current_thread: CurrentThread,

    /// Host task futures. This is `Some` from construction until
    /// `take_fibers_and_futures` removes it.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// Backing table for everything addressed by a `TableId` (tasks, threads,
    /// waitable sets, ...).
    table: AlwaysMut<ResourceTable>,
    /// Work that `collect_work_items_to_run` hands out all at once, ahead of
    /// anything in `low_priority`.
    high_priority: Vec<WorkItem>,
    /// Work handed out one item at a time, oldest first, and only when
    /// `high_priority` is empty.
    low_priority: VecDeque<WorkItem>,
    /// NOTE(review): presumably the reason the most recently suspended fiber
    /// gave for suspending; not touched in this part of the file.
    suspend_reason: Option<SuspendReason>,
    /// A fiber owned directly by this state; `take_fibers_and_futures` hands
    /// it over together with the others.
    worker: Option<StoreFiber<'static>>,
    /// NOTE(review): presumably the item the worker fiber should process
    /// next; not touched in this part of the file.
    worker_item: Option<WorkerItem>,

    /// Reference counts for error contexts, keyed by global error-context
    /// table index.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
}
4855
4856impl Default for ConcurrentState {
4857 fn default() -> Self {
4858 Self {
4859 current_thread: CurrentThread::None,
4860 table: AlwaysMut::new(ResourceTable::new()),
4861 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4862 high_priority: Vec::new(),
4863 low_priority: VecDeque::new(),
4864 suspend_reason: None,
4865 worker: None,
4866 worker_item: None,
4867 global_error_context_ref_counts: BTreeMap::new(),
4868 }
4869 }
4870}
4871
4872impl ConcurrentState {
4873 pub(crate) fn take_fibers_and_futures(
4890 &mut self,
4891 fibers: &mut Vec<StoreFiber<'static>>,
4892 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4893 ) {
4894 for entry in self.table.get_mut().iter_mut() {
4895 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4896 for mode in mem::take(&mut set.waiting).into_values() {
4897 if let WaitMode::Fiber(fiber) = mode {
4898 fibers.push(fiber);
4899 }
4900 }
4901 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4902 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready(fiber) =
4903 mem::replace(&mut thread.state, GuestThreadState::Completed)
4904 {
4905 fibers.push(fiber);
4906 }
4907 }
4908 }
4909
4910 if let Some(fiber) = self.worker.take() {
4911 fibers.push(fiber);
4912 }
4913
4914 let mut handle_item = |item| match item {
4915 WorkItem::ResumeFiber(fiber) => {
4916 fibers.push(fiber);
4917 }
4918 WorkItem::PushFuture(future) => {
4919 self.futures
4920 .get_mut()
4921 .as_mut()
4922 .unwrap()
4923 .push(future.into_inner());
4924 }
4925 _ => {}
4926 };
4927
4928 for item in mem::take(&mut self.high_priority) {
4929 handle_item(item);
4930 }
4931 for item in mem::take(&mut self.low_priority) {
4932 handle_item(item);
4933 }
4934
4935 if let Some(them) = self.futures.get_mut().take() {
4936 futures.push(them);
4937 }
4938 }
4939
4940 fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> {
4944 let mut ready = mem::take(&mut self.high_priority);
4945 if ready.is_empty() {
4946 if let Some(item) = self.low_priority.pop_back() {
4947 ready.push(item);
4948 }
4949 }
4950 ready
4951 }
4952
4953 fn push<V: Send + Sync + 'static>(
4954 &mut self,
4955 value: V,
4956 ) -> Result<TableId<V>, ResourceTableError> {
4957 self.table.get_mut().push(value).map(TableId::from)
4958 }
4959
4960 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4961 self.table.get_mut().get_mut(&Resource::from(id))
4962 }
4963
4964 pub fn add_child<T: 'static, U: 'static>(
4965 &mut self,
4966 child: TableId<T>,
4967 parent: TableId<U>,
4968 ) -> Result<(), ResourceTableError> {
4969 self.table
4970 .get_mut()
4971 .add_child(Resource::from(child), Resource::from(parent))
4972 }
4973
4974 pub fn remove_child<T: 'static, U: 'static>(
4975 &mut self,
4976 child: TableId<T>,
4977 parent: TableId<U>,
4978 ) -> Result<(), ResourceTableError> {
4979 self.table
4980 .get_mut()
4981 .remove_child(Resource::from(child), Resource::from(parent))
4982 }
4983
4984 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4985 self.table.get_mut().delete(Resource::from(id))
4986 }
4987
4988 fn push_future(&mut self, future: HostTaskFuture) {
4989 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4996 }
4997
4998 fn push_high_priority(&mut self, item: WorkItem) {
4999 log::trace!("push high priority: {item:?}");
5000 self.high_priority.push(item);
5001 }
5002
5003 fn push_low_priority(&mut self, item: WorkItem) {
5004 log::trace!("push low priority: {item:?}");
5005 self.low_priority.push_front(item);
5006 }
5007
5008 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5009 if high_priority {
5010 self.push_high_priority(item);
5011 } else {
5012 self.push_low_priority(item);
5013 }
5014 }
5015
5016 fn promote_instance_local_thread_work_item(
5017 &mut self,
5018 current_instance: RuntimeComponentInstanceIndex,
5019 ) -> bool {
5020 self.promote_work_items_matching(|item: &WorkItem| match item {
5021 WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5022 *instance == current_instance
5023 }
5024 _ => false,
5025 })
5026 }
5027
5028 fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5029 self.promote_work_items_matching(|item: &WorkItem| match item {
5030 WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5031 *t == thread
5032 }
5033 _ => false,
5034 })
5035 }
5036
5037 fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5038 where
5039 F: FnMut(&WorkItem) -> bool,
5040 {
5041 if self.high_priority.iter().any(&mut predicate) {
5045 true
5046 }
5047 else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5050 let item = self.low_priority.remove(idx).unwrap();
5051 self.push_high_priority(item);
5052 true
5053 } else {
5054 false
5055 }
5056 }
5057
5058 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
5060 let thread = self.unwrap_current_guest_thread();
5061 let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
5062 log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
5063 Ok(val)
5064 }
5065
5066 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
5068 let thread = self.unwrap_current_guest_thread();
5069 log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
5070 self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
5071 Ok(())
5072 }
5073
5074 fn take_pending_cancellation(&mut self) -> bool {
5077 let thread = self.unwrap_current_guest_thread();
5078 if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
5079 assert!(matches!(event, Event::Cancelled));
5080 true
5081 } else {
5082 false
5083 }
5084 }
5085
5086 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5087 if self.may_block(task) {
5088 Ok(())
5089 } else {
5090 Err(Trap::CannotBlockSyncTask.into())
5091 }
5092 }
5093
5094 fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
5095 let task = self.get_mut(task).unwrap();
5096 task.async_function || task.returned_or_cancelled()
5097 }
5098
5099 pub fn call_context(&mut self, task: u32) -> &mut CallContext {
5105 let (task, is_host) = (task >> 1, task & 1 == 1);
5106 if is_host {
5107 let task: TableId<HostTask> = TableId::new(task);
5108 &mut self.get_mut(task).unwrap().call_context
5109 } else {
5110 let task: TableId<GuestTask> = TableId::new(task);
5111 &mut self.get_mut(task).unwrap().call_context
5112 }
5113 }
5114
5115 pub fn current_call_context_scope_id(&self) -> u32 {
5118 let (bits, is_host) = match self.current_thread {
5119 CurrentThread::Guest(id) => (id.task.rep(), false),
5120 CurrentThread::Host(id) => (id.rep(), true),
5121 CurrentThread::None => unreachable!(),
5122 };
5123 assert_eq!((bits << 1) >> 1, bits);
5124 (bits << 1) | u32::from(is_host)
5125 }
5126
5127 fn unwrap_current_guest_thread(&self) -> QualifiedThreadId {
5128 *self.current_thread.guest().unwrap()
5129 }
5130
5131 fn unwrap_current_host_thread(&self) -> TableId<HostTask> {
5132 self.current_thread.host().unwrap()
5133 }
5134}
5135
5136fn for_any_lower<
5139 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5140>(
5141 fun: F,
5142) -> F {
5143 fun
5144}
5145
5146fn for_any_lift<
5148 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5149>(
5150 fun: F,
5151) -> F {
5152 fun
5153}
5154
5155fn checked<F: Future + Send + 'static>(
5160 id: StoreId,
5161 fut: F,
5162) -> impl Future<Output = F::Output> + Send + 'static {
5163 async move {
5164 let mut fut = pin!(fut);
5165 future::poll_fn(move |cx| {
5166 let message = "\
5167 `Future`s which depend on asynchronous component tasks, streams, or \
5168 futures to complete may only be polled from the event loop of the \
5169 store to which they belong. Please use \
5170 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5171 ";
5172 tls::try_get(|store| {
5173 let matched = match store {
5174 tls::TryGet::Some(store) => store.id() == id,
5175 tls::TryGet::Taken | tls::TryGet::None => false,
5176 };
5177
5178 if !matched {
5179 panic!("{message}")
5180 }
5181 });
5182 fut.as_mut().poll(cx)
5183 })
5184 .await
5185 }
5186}
5187
5188fn check_recursive_run() {
5191 tls::try_get(|store| {
5192 if !matches!(store, tls::TryGet::None) {
5193 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5194 }
5195 });
5196}
5197
/// Split a packed callback return value into `(low four bits, remaining
/// upper bits shifted down by four)`.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    let low_nibble = code & 0xF;
    let upper_bits = code >> 4;
    (low_nibble, upper_bits)
}
5201
/// Arguments for a check on a waitable set.
// NOTE(review): this type is not used in this part of the file; the field
// notes are inferred from names and types and should be confirmed.
struct WaitableCheckParams {
    /// The waitable set to check.
    set: TableId<WaitableSet>,
    /// Index of the canonical options that apply to the check.
    options: OptionsIndex,
    /// Presumably a guest address where the result payload is written.
    payload: u32,
}
5210
/// The two flavours of waitable-set check.
// NOTE(review): not used in this part of the file; presumably `Wait` blocks
// until an event is available while `Poll` does not. Confirm at the use sites.
enum WaitableCheck {
    Wait,
    Poll,
}
5217
/// A guest call that `prepare_call` has set up but that has not been queued
/// yet; `queue_call` consumes it. `R` is the type the lifted result is
/// downcast to.
pub(crate) struct PreparedCall<R> {
    /// The function to call.
    handle: Func,
    /// The task and implicit thread created for the call.
    thread: QualifiedThreadId,
    /// Parameter count forwarded unchanged to the instance's `queue_call`.
    param_count: usize,
    /// Receives the type-erased lifted result.
    rx: oneshot::Receiver<LiftedResult>,
    /// Paired with the `exit_tx` stored in the task's `Caller::Host`; handed
    /// to the caller alongside the result.
    exit_rx: oneshot::Receiver<()>,
    /// Ties the otherwise unused `R` to this value.
    _phantom: PhantomData<R>,
}
5234
5235impl<R> PreparedCall<R> {
5236 pub(crate) fn task_id(&self) -> TaskId {
5238 TaskId {
5239 task: self.thread.task,
5240 }
5241 }
5242}
5243
/// Crate-visible handle to a guest task, obtained from
/// `PreparedCall::task_id`.
pub(crate) struct TaskId {
    /// Table id of the guest task.
    task: TableId<GuestTask>,
}
5248
5249impl TaskId {
5250 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5256 let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5257 if !task.already_lowered_parameters() {
5258 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5259 } else {
5260 task.host_future_state = HostFutureState::Dropped;
5261 if task.ready_to_delete() {
5262 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5263 }
5264 }
5265 Ok(())
5266 }
5267}
5268
/// Set up the task state for a host-to-guest call of `handle` without
/// queueing it yet.
///
/// A `GuestTask` and an implicit `GuestThread` are created and pushed into the
/// store's concurrent state. The task's caller is `Caller::Host`, carrying the
/// sending halves of the result and exit channels plus whatever thread was
/// current when this function was called.
///
/// * `param_count` is stored unchanged in the returned `PreparedCall`.
/// * `host_future_present` is stored in the task's `Caller::Host`.
/// * `lower_params` and `lift_result` are boxed into the task; each is passed
///   `handle` when it is eventually invoked.
///
/// # Errors
///
/// Fails if a table insertion fails, or with `Trap::CannotEnterComponent` if
/// the store reports that the target instance may not be entered. Note that
/// this check runs after the task and thread have already been pushed.
pub(crate) fn prepare_call<T, R>(
    mut store: StoreContextMut<T>,
    handle: Func,
    param_count: usize,
    host_future_present: bool,
    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
    + Send
    + Sync
    + 'static,
) -> Result<PreparedCall<R>> {
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);

    // Pull everything needed out of the component's metadata before
    // `store.0` is borrowed mutably for the concurrent state below.
    let instance = handle.instance().id().get(store.0);
    let options = &instance.component().env_component().options[options];
    let ty = &instance.component().types()[ty];
    let async_function = ty.async_;
    let task_return_type = ty.results;
    let component_instance = raw_options.instance;
    let callback = options.callback.map(|i| instance.runtime_callback(i));
    let memory = options
        .memory()
        .map(|i| instance.runtime_memory(i))
        .map(SendSyncPtr::new);
    let string_encoding = options.string_encoding;
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();

    // `tx`/`rx` carry the lifted result; `exit_tx`/`exit_rx` are the exit
    // channel stored in `Caller::Host` and `PreparedCall` respectively.
    let (tx, rx) = oneshot::channel();
    let (exit_tx, exit_rx) = oneshot::channel();

    let instance = RuntimeInstance {
        instance: handle.instance().id().instance(),
        index: component_instance,
    };
    // Whatever is running right now becomes the recorded caller of the task.
    let caller = state.current_thread;
    let task = GuestTask::new(
        state,
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            exit_tx: Arc::new(exit_tx),
            host_future_present,
            caller,
        },
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(move |store: &mut dyn VMStore, event, handle| {
                let store = token.as_context_mut(store);
                // NOTE(review): `callback` was obtained above from this
                // function's own instance via `runtime_callback`; confirm that
                // this is all `call_callback`'s safety contract requires.
                unsafe { instance.call_callback(store, callback, event, handle) }
            }) as CallbackFn
        }),
        instance,
        async_function,
    )?;

    // Register the task and its implicit thread, and link the two.
    let task = state.push(task)?;
    let thread = state.push(GuestThread::new_implicit(task))?;
    state.get_mut(task)?.threads.insert(thread);

    if !store.0.may_enter(instance) {
        bail!(crate::Trap::CannotEnterComponent);
    }

    Ok(PreparedCall {
        handle,
        thread: QualifiedThreadId { task, thread },
        param_count,
        rx,
        exit_rx,
        _phantom: PhantomData,
    })
}
5363
5364pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5371 mut store: StoreContextMut<T>,
5372 prepared: PreparedCall<R>,
5373) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5374 let PreparedCall {
5375 handle,
5376 thread,
5377 param_count,
5378 rx,
5379 exit_rx,
5380 ..
5381 } = prepared;
5382
5383 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5384
5385 Ok(checked(
5386 store.0.id(),
5387 rx.map(move |result| {
5388 result
5389 .map(|v| (*v.downcast().unwrap(), exit_rx))
5390 .map_err(crate::Error::from)
5391 }),
5392 ))
5393}
5394
5395fn queue_call0<T: 'static>(
5398 store: StoreContextMut<T>,
5399 handle: Func,
5400 guest_thread: QualifiedThreadId,
5401 param_count: usize,
5402) -> Result<()> {
5403 let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5404 let is_concurrent = raw_options.async_;
5405 let callback = raw_options.callback;
5406 let instance = handle.instance();
5407 let callee = handle.lifted_core_func(store.0);
5408 let post_return = handle.post_return_core_func(store.0);
5409 let callback = callback.map(|i| {
5410 let instance = instance.id().get(store.0);
5411 SendSyncPtr::new(instance.runtime_callback(i))
5412 });
5413
5414 log::trace!("queueing call {guest_thread:?}");
5415
5416 unsafe {
5420 instance.queue_call(
5421 store,
5422 guest_thread,
5423 SendSyncPtr::new(callee),
5424 param_count,
5425 1,
5426 is_concurrent,
5427 callback,
5428 post_return.map(SendSyncPtr::new),
5429 )
5430 }
5431}