1use crate::component::func::{self, Func, call_post_return};
54use crate::component::{
55 HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
56};
57use crate::fiber::{self, StoreFiber, StoreFiberYield};
58use crate::prelude::*;
59use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
60use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
61use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
62use crate::{
63 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType,
64 bail, error::format_err,
65};
66use error_contexts::GlobalErrorContextRefCount;
67use futures::channel::oneshot;
68use futures::future::{self, FutureExt};
69use futures::stream::{FuturesUnordered, StreamExt};
70use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71use std::any::Any;
72use std::borrow::ToOwned;
73use std::boxed::Box;
74use std::cell::UnsafeCell;
75use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
76use std::fmt;
77use std::future::Future;
78use std::marker::PhantomData;
79use std::mem::{self, ManuallyDrop, MaybeUninit};
80use std::ops::DerefMut;
81use std::pin::{Pin, pin};
82use std::ptr::{self, NonNull};
83use std::sync::Arc;
84use std::task::{Context, Poll, Waker};
85use std::vec::Vec;
86use table::{TableDebug, TableId};
87use wasmtime_environ::Trap;
88use wasmtime_environ::component::{
89 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
90 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
91 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
92 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
93 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
94};
95use wasmtime_environ::packed_option::ReservedValue;
96
97pub use abort::JoinHandle;
98pub use future_stream_any::{FutureAny, StreamAny};
99pub use futures_and_streams::{
100 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
101 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
102 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
103};
104pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
105
106mod abort;
107mod error_contexts;
108mod future_stream_any;
109mod futures_and_streams;
110pub(crate) mod table;
111pub(crate) mod tls;
112
/// Sentinel `u32` with every bit set (`0xffff_ffff`).
///
/// NOTE(review): the name suggests this is the value reported when an
/// operation would block; the use sites are outside this view — confirm.
const BLOCKED: u32 = u32::MAX;
116
/// Lifecycle state of a subtask.
///
/// The explicit discriminants are the numeric values produced by
/// `self as u32`, which is what [`Status::pack`] places in the low bits of
/// its result.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Combines this status with an optional waitable handle into one `u32`.
    ///
    /// The status discriminant occupies the low four bits and the handle (or
    /// zero when absent) is stored shifted left by four.
    ///
    /// # Panics
    ///
    /// Panics unless `waitable` is `None` exactly when `self` is
    /// [`Status::Returned`], and also when the handle does not fit in 28
    /// bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = match waitable {
            Some(handle) => handle,
            None => 0,
        };
        assert!(waitable < (1 << 28));
        let status_bits = self as u32;
        status_bits | (waitable << 4)
    }
}
140
141#[derive(Clone, Copy, Debug)]
144enum Event {
145 None,
146 Cancelled,
147 Subtask {
148 status: Status,
149 },
150 StreamRead {
151 code: ReturnCode,
152 pending: Option<(TypeStreamTableIndex, u32)>,
153 },
154 StreamWrite {
155 code: ReturnCode,
156 pending: Option<(TypeStreamTableIndex, u32)>,
157 },
158 FutureRead {
159 code: ReturnCode,
160 pending: Option<(TypeFutureTableIndex, u32)>,
161 },
162 FutureWrite {
163 code: ReturnCode,
164 pending: Option<(TypeFutureTableIndex, u32)>,
165 },
166}
167
168impl Event {
169 fn parts(self) -> (u32, u32) {
174 const EVENT_NONE: u32 = 0;
175 const EVENT_SUBTASK: u32 = 1;
176 const EVENT_STREAM_READ: u32 = 2;
177 const EVENT_STREAM_WRITE: u32 = 3;
178 const EVENT_FUTURE_READ: u32 = 4;
179 const EVENT_FUTURE_WRITE: u32 = 5;
180 const EVENT_CANCELLED: u32 = 6;
181 match self {
182 Event::None => (EVENT_NONE, 0),
183 Event::Cancelled => (EVENT_CANCELLED, 0),
184 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
185 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
186 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
187 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
188 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
189 }
190 }
191}
192
/// Numeric codes grouped under the `callback_code` namespace.
///
/// NOTE(review): these appear to be the values a guest callback returns to
/// say what should happen next (exit, yield, or wait); the code interpreting
/// them is outside this view — confirm.
mod callback_code {
    pub const EXIT: u32 = 0;
    pub const YIELD: u32 = 1;
    pub const WAIT: u32 = 2;
}
199
/// `wasmtime_environ::component::START_FLAG_ASYNC_CALLEE` converted to `u32`
/// once so it can be used directly in `u32` flag arithmetic.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
204
205pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
211 store: StoreContextMut<'a, T>,
212 get_data: fn(&mut T) -> D::Data<'_>,
213}
214
215impl<'a, T, D> Access<'a, T, D>
216where
217 D: HasData + ?Sized,
218 T: 'static,
219{
220 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
222 Self { store, get_data }
223 }
224
225 pub fn data_mut(&mut self) -> &mut T {
227 self.store.data_mut()
228 }
229
230 pub fn get(&mut self) -> D::Data<'_> {
232 (self.get_data)(self.data_mut())
233 }
234
235 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
239 where
240 T: 'static,
241 {
242 let accessor = Accessor {
243 get_data: self.get_data,
244 token: StoreToken::new(self.store.as_context_mut()),
245 };
246 self.store
247 .as_context_mut()
248 .spawn_with_accessor(accessor, task)
249 }
250
251 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
254 self.get_data
255 }
256}
257
258impl<'a, T, D> AsContext for Access<'a, T, D>
259where
260 D: HasData + ?Sized,
261 T: 'static,
262{
263 type Data = T;
264
265 fn as_context(&self) -> StoreContext<'_, T> {
266 self.store.as_context()
267 }
268}
269
270impl<'a, T, D> AsContextMut for Access<'a, T, D>
271where
272 D: HasData + ?Sized,
273 T: 'static,
274{
275 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
276 self.store.as_context_mut()
277 }
278}
279
280pub struct Accessor<T: 'static, D = HasSelf<T>>
340where
341 D: HasData + ?Sized,
342{
343 token: StoreToken<T>,
344 get_data: fn(&mut T) -> D::Data<'_>,
345}
346
347pub trait AsAccessor {
364 type Data: 'static;
366
367 type AccessorData: HasData + ?Sized;
370
371 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
373}
374
375impl<T: AsAccessor + ?Sized> AsAccessor for &T {
376 type Data = T::Data;
377 type AccessorData = T::AccessorData;
378
379 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
380 T::as_accessor(self)
381 }
382}
383
384impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
385 type Data = T;
386 type AccessorData = D;
387
388 fn as_accessor(&self) -> &Accessor<T, D> {
389 self
390 }
391}
392
393const _: () = {
416 const fn assert<T: Send + Sync>() {}
417 assert::<Accessor<UnsafeCell<u32>>>();
418};
419
420impl<T> Accessor<T> {
421 pub(crate) fn new(token: StoreToken<T>) -> Self {
430 Self {
431 token,
432 get_data: |x| x,
433 }
434 }
435}
436
437impl<T, D> Accessor<T, D>
438where
439 D: HasData + ?Sized,
440{
441 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
459 tls::get(|vmstore| {
460 fun(Access {
461 store: self.token.as_context_mut(vmstore),
462 get_data: self.get_data,
463 })
464 })
465 }
466
467 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
470 self.get_data
471 }
472
473 pub fn with_getter<D2: HasData>(
490 &self,
491 get_data: fn(&mut T) -> D2::Data<'_>,
492 ) -> Accessor<T, D2> {
493 Accessor {
494 token: self.token,
495 get_data,
496 }
497 }
498
499 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
515 where
516 T: 'static,
517 {
518 let accessor = self.clone_for_spawn();
519 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
520 }
521
522 fn clone_for_spawn(&self) -> Self {
523 Self {
524 token: self.token,
525 get_data: self.get_data,
526 }
527 }
528}
529
/// A unit of work that can be spawned and is driven with access to an
/// [`Accessor`].
///
/// Implementors must be `Send + 'static`.
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Runs the task to completion using `accessor` for store access.
    ///
    /// The returned future must be `Send` and resolves to the task's result.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
548
/// Caller-side information for a call, distinguished by whether the caller is
/// async or sync.
///
/// NOTE(review): the consumers of this type are outside this view; the
/// field descriptions below are inferred from the names — confirm.
enum CallerInfo {
    /// Caller was lowered as async.
    Async {
        // presumably the raw (lowered) parameters of the call
        params: Vec<ValRaw>,
        // presumably whether the callee produces a result
        has_result: bool,
    },
    /// Caller was lowered as sync.
    Sync {
        // presumably the raw (lowered) parameters of the call
        params: Vec<ValRaw>,
        // presumably the number of raw result values expected
        result_count: u32,
    },
}
563
/// How a thread waiting on a waitable set is to be resumed.
enum WaitMode {
    /// The waiter is a suspended fiber; `resume_fiber` stores the fiber here
    /// when it suspended with `SuspendReason::Waiting`.
    Fiber(StoreFiber<'static>),
    /// NOTE(review): presumably the waiter is resumed by invoking a callback
    /// on the given instance — confirm at the use sites outside this view.
    Callback(Instance),
}
572
/// Why a fiber suspended itself; recorded by `suspend` and consumed by
/// `resume_fiber` to decide what to do with the suspended fiber.
///
/// For the variants carrying `skip_may_block_check`, a value of `true` makes
/// `suspend` skip its assertion that the current task may block.
#[derive(Debug)]
enum SuspendReason {
    /// The thread is waiting on `set`; `resume_fiber` parks the fiber in that
    /// set's `waiting` map under `thread`.
    Waiting {
        set: TableId<WaitableSet>,
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
    /// A worker fiber finished its item and wants more work; `resume_fiber`
    /// keeps it as the cached worker, or disposes it if one is already cached.
    NeedWork,
    /// The thread yielded; `resume_fiber` marks it pending and queues the
    /// fiber as a low-priority work item.
    Yielding {
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
    /// The thread suspended itself explicitly; `resume_fiber` stores the
    /// fiber in the thread's `Suspended` state.
    ExplicitlySuspending {
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
}
598
599enum GuestCallKind {
601 DeliverEvent {
604 instance: Instance,
606 set: Option<TableId<WaitableSet>>,
611 },
612 StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
618 StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
619}
620
621impl fmt::Debug for GuestCallKind {
622 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
623 match self {
624 Self::DeliverEvent { instance, set } => f
625 .debug_struct("DeliverEvent")
626 .field("instance", instance)
627 .field("set", set)
628 .finish(),
629 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
630 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
631 }
632 }
633}
634
635#[derive(Debug)]
637struct GuestCall {
638 thread: QualifiedThreadId,
639 kind: GuestCallKind,
640}
641
642impl GuestCall {
643 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
653 let instance = store
654 .concurrent_state_mut()
655 .get_mut(self.thread.task)?
656 .instance;
657 let state = store.instance_state(instance).concurrent_state();
658
659 let ready = match &self.kind {
660 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
661 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
662 GuestCallKind::StartExplicit(_) => true,
663 };
664 log::trace!(
665 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
666 state.do_not_enter,
667 state.backpressure
668 );
669 Ok(ready)
670 }
671}
672
/// A job handed to the worker fiber (see `run_on_worker`).
enum WorkerItem {
    /// Run a guest call via `handle_guest_call`.
    GuestCall(GuestCall),
    /// Run an arbitrary closure against the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
678
679enum WorkItem {
682 PushFuture(AlwaysMut<HostTaskFuture>),
684 ResumeFiber(StoreFiber<'static>),
686 GuestCall(GuestCall),
688 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
690}
691
692impl fmt::Debug for WorkItem {
693 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
694 match self {
695 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
696 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
697 Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
698 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
699 }
700 }
701}
702
/// Outcome of a wait: it was either cancelled or ran to completion.
///
/// The derived ordering places `Cancelled` before `Completed`.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) enum WaitResult {
    Cancelled,
    Completed,
}
709
710pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
718 store: &mut dyn VMStore,
719 future: impl Future<Output = Result<R>> + Send + 'static,
720) -> Result<R> {
721 let state = store.concurrent_state_mut();
722 let task = state.unwrap_current_host_thread();
723
724 let mut future = Box::pin(async move {
728 let result = future.await?;
729 tls::get(move |store| {
730 let state = store.concurrent_state_mut();
731 state.get_mut(task)?.result = Some(Box::new(result) as _);
732
733 Waitable::Host(task).set_event(
734 state,
735 Some(Event::Subtask {
736 status: Status::Returned,
737 }),
738 )?;
739
740 Ok(())
741 })
742 }) as HostTaskFuture;
743
744 let poll = tls::set(store, || {
748 future
749 .as_mut()
750 .poll(&mut Context::from_waker(&Waker::noop()))
751 });
752
753 match poll {
754 Poll::Ready(result) => result?,
756
757 Poll::Pending => {
762 let state = store.concurrent_state_mut();
763 state.push_future(future);
764
765 let caller = state.get_mut(task)?.caller;
766 let set = state.get_mut(caller.task)?.sync_call_set;
767 Waitable::Host(task).join(state, Some(set))?;
768
769 store.suspend(SuspendReason::Waiting {
770 set,
771 thread: caller,
772 skip_may_block_check: false,
773 })?;
774
775 Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
779 }
780 }
781
782 Ok(*store
784 .concurrent_state_mut()
785 .get_mut(task)?
786 .result
787 .take()
788 .unwrap()
789 .downcast()
790 .unwrap())
791}
792
793fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
795 let mut next = Some(call);
796 while let Some(call) = next.take() {
797 match call.kind {
798 GuestCallKind::DeliverEvent { instance, set } => {
799 let (event, waitable) = instance
800 .get_event(store, call.thread.task, set, true)?
801 .unwrap();
802 let state = store.concurrent_state_mut();
803 let task = state.get_mut(call.thread.task)?;
804 let runtime_instance = task.instance;
805 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
806
807 log::trace!(
808 "use callback to deliver event {event:?} to {:?} for {waitable:?}",
809 call.thread,
810 );
811
812 let old_thread = store.set_thread(call.thread);
813 log::trace!(
814 "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
815 call.thread
816 );
817
818 store.enter_instance(runtime_instance);
819
820 let callback = store
821 .concurrent_state_mut()
822 .get_mut(call.thread.task)?
823 .callback
824 .take()
825 .unwrap();
826
827 let code = callback(store, event, handle)?;
828
829 store
830 .concurrent_state_mut()
831 .get_mut(call.thread.task)?
832 .callback = Some(callback);
833
834 store.exit_instance(runtime_instance)?;
835
836 store.set_thread(old_thread);
837
838 next = instance.handle_callback_code(
839 store,
840 call.thread,
841 runtime_instance.index,
842 code,
843 )?;
844
845 log::trace!(
846 "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
847 );
848 }
849 GuestCallKind::StartImplicit(fun) => {
850 next = fun(store)?;
851 }
852 GuestCallKind::StartExplicit(fun) => {
853 fun(store)?;
854 }
855 }
856 }
857
858 Ok(())
859}
860
861impl<T> Store<T> {
862 pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
864 where
865 T: Send + 'static,
866 {
867 ensure!(
868 self.as_context().0.concurrency_support(),
869 "cannot use `run_concurrent` when Config::concurrency_support disabled",
870 );
871 self.as_context_mut().run_concurrent(fun).await
872 }
873
874 #[doc(hidden)]
875 pub fn assert_concurrent_state_empty(&mut self) {
876 self.as_context_mut().assert_concurrent_state_empty();
877 }
878
879 #[doc(hidden)]
880 pub fn concurrent_state_table_size(&mut self) -> usize {
881 self.as_context_mut().concurrent_state_table_size()
882 }
883
884 pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
886 where
887 T: 'static,
888 {
889 self.as_context_mut().spawn(task)
890 }
891}
892
893impl<T> StoreContextMut<'_, T> {
894 #[doc(hidden)]
905 pub fn assert_concurrent_state_empty(self) {
906 let store = self.0;
907 store
908 .store_data_mut()
909 .components
910 .assert_instance_states_empty();
911 let state = store.concurrent_state_mut();
912 assert!(
913 state.table.get_mut().is_empty(),
914 "non-empty table: {:?}",
915 state.table.get_mut()
916 );
917 assert!(state.high_priority.is_empty());
918 assert!(state.low_priority.is_empty());
919 assert!(state.current_thread.is_none());
920 assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
921 assert!(state.global_error_context_ref_counts.is_empty());
922 }
923
924 #[doc(hidden)]
929 pub fn concurrent_state_table_size(&mut self) -> usize {
930 self.0
931 .concurrent_state_mut()
932 .table
933 .get_mut()
934 .iter_mut()
935 .count()
936 }
937
938 pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
948 where
949 T: 'static,
950 {
951 let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
952 self.spawn_with_accessor(accessor, task)
953 }
954
955 fn spawn_with_accessor<D>(
958 self,
959 accessor: Accessor<T, D>,
960 task: impl AccessorTask<T, D>,
961 ) -> JoinHandle
962 where
963 T: 'static,
964 D: HasData + ?Sized,
965 {
966 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
970 self.0
971 .concurrent_state_mut()
972 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
973 handle
974 }
975
976 pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1060 where
1061 T: Send + 'static,
1062 {
1063 ensure!(
1064 self.0.concurrency_support(),
1065 "cannot use `run_concurrent` when Config::concurrency_support disabled",
1066 );
1067 self.do_run_concurrent(fun, false).await
1068 }
1069
1070 pub(super) async fn run_concurrent_trap_on_idle<R>(
1071 self,
1072 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1073 ) -> Result<R>
1074 where
1075 T: Send + 'static,
1076 {
1077 self.do_run_concurrent(fun, true).await
1078 }
1079
1080 async fn do_run_concurrent<R>(
1081 mut self,
1082 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1083 trap_on_idle: bool,
1084 ) -> Result<R>
1085 where
1086 T: Send + 'static,
1087 {
1088 debug_assert!(self.0.concurrency_support());
1089 check_recursive_run();
1090 let token = StoreToken::new(self.as_context_mut());
1091
1092 struct Dropper<'a, T: 'static, V> {
1093 store: StoreContextMut<'a, T>,
1094 value: ManuallyDrop<V>,
1095 }
1096
1097 impl<'a, T, V> Drop for Dropper<'a, T, V> {
1098 fn drop(&mut self) {
1099 tls::set(self.store.0, || {
1100 unsafe { ManuallyDrop::drop(&mut self.value) }
1105 });
1106 }
1107 }
1108
1109 let accessor = &Accessor::new(token);
1110 let dropper = &mut Dropper {
1111 store: self,
1112 value: ManuallyDrop::new(fun(accessor)),
1113 };
1114 let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1116
1117 dropper
1118 .store
1119 .as_context_mut()
1120 .poll_until(future, trap_on_idle)
1121 .await
1122 }
1123
1124 async fn poll_until<R>(
1130 mut self,
1131 mut future: Pin<&mut impl Future<Output = R>>,
1132 trap_on_idle: bool,
1133 ) -> Result<R>
1134 where
1135 T: Send + 'static,
1136 {
1137 struct Reset<'a, T: 'static> {
1138 store: StoreContextMut<'a, T>,
1139 futures: Option<FuturesUnordered<HostTaskFuture>>,
1140 }
1141
1142 impl<'a, T> Drop for Reset<'a, T> {
1143 fn drop(&mut self) {
1144 if let Some(futures) = self.futures.take() {
1145 *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1146 }
1147 }
1148 }
1149
1150 loop {
1151 let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1155 let mut reset = Reset {
1156 store: self.as_context_mut(),
1157 futures,
1158 };
1159 let mut next = pin!(reset.futures.as_mut().unwrap().next());
1160
1161 enum PollResult<R> {
1162 Complete(R),
1163 ProcessWork(Vec<WorkItem>),
1164 }
1165 let result = future::poll_fn(|cx| {
1166 if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1169 return Poll::Ready(Ok(PollResult::Complete(value)));
1170 }
1171
1172 let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1176 Poll::Ready(Some(output)) => {
1177 match output {
1178 Err(e) => return Poll::Ready(Err(e)),
1179 Ok(()) => {}
1180 }
1181 Poll::Ready(true)
1182 }
1183 Poll::Ready(None) => Poll::Ready(false),
1184 Poll::Pending => Poll::Pending,
1185 };
1186
1187 let state = reset.store.0.concurrent_state_mut();
1191 let ready = state.collect_work_items_to_run();
1192 if !ready.is_empty() {
1193 return Poll::Ready(Ok(PollResult::ProcessWork(ready)));
1194 }
1195
1196 return match next {
1200 Poll::Ready(true) => {
1201 Poll::Ready(Ok(PollResult::ProcessWork(Vec::new())))
1207 }
1208 Poll::Ready(false) => {
1209 if let Poll::Ready(value) =
1213 tls::set(reset.store.0, || future.as_mut().poll(cx))
1214 {
1215 Poll::Ready(Ok(PollResult::Complete(value)))
1216 } else {
1217 if trap_on_idle {
1223 Poll::Ready(Err(format_err!(crate::Trap::AsyncDeadlock)))
1226 } else {
1227 Poll::Pending
1231 }
1232 }
1233 }
1234 Poll::Pending => Poll::Pending,
1239 };
1240 })
1241 .await;
1242
1243 drop(reset);
1247
1248 match result? {
1249 PollResult::Complete(value) => break Ok(value),
1252 PollResult::ProcessWork(ready) => {
1255 struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1256 store: StoreContextMut<'a, T>,
1257 ready: I,
1258 }
1259
1260 impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1261 fn drop(&mut self) {
1262 while let Some(item) = self.ready.next() {
1263 match item {
1264 WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1265 WorkItem::PushFuture(future) => {
1266 tls::set(self.store.0, move || drop(future))
1267 }
1268 _ => {}
1269 }
1270 }
1271 }
1272 }
1273
1274 let mut dispose = Dispose {
1275 store: self.as_context_mut(),
1276 ready: ready.into_iter(),
1277 };
1278
1279 while let Some(item) = dispose.ready.next() {
1280 dispose
1281 .store
1282 .as_context_mut()
1283 .handle_work_item(item)
1284 .await?;
1285 }
1286 }
1287 }
1288 }
1289 }
1290
1291 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1293 where
1294 T: Send,
1295 {
1296 log::trace!("handle work item {item:?}");
1297 match item {
1298 WorkItem::PushFuture(future) => {
1299 self.0
1300 .concurrent_state_mut()
1301 .futures
1302 .get_mut()
1303 .as_mut()
1304 .unwrap()
1305 .push(future.into_inner());
1306 }
1307 WorkItem::ResumeFiber(fiber) => {
1308 self.0.resume_fiber(fiber).await?;
1309 }
1310 WorkItem::GuestCall(call) => {
1311 if call.is_ready(self.0)? {
1312 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1313 } else {
1314 let state = self.0.concurrent_state_mut();
1315 let task = state.get_mut(call.thread.task)?;
1316 if !task.starting_sent {
1317 task.starting_sent = true;
1318 if let GuestCallKind::StartImplicit(_) = &call.kind {
1319 Waitable::Guest(call.thread.task).set_event(
1320 state,
1321 Some(Event::Subtask {
1322 status: Status::Starting,
1323 }),
1324 )?;
1325 }
1326 }
1327
1328 let instance = state.get_mut(call.thread.task)?.instance;
1329 self.0
1330 .instance_state(instance)
1331 .concurrent_state()
1332 .pending
1333 .insert(call.thread, call.kind);
1334 }
1335 }
1336 WorkItem::WorkerFunction(fun) => {
1337 self.run_on_worker(WorkerItem::Function(fun)).await?;
1338 }
1339 }
1340
1341 Ok(())
1342 }
1343
1344 async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1346 where
1347 T: Send,
1348 {
1349 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1350 fiber
1351 } else {
1352 fiber::make_fiber(self.0, move |store| {
1353 loop {
1354 match store.concurrent_state_mut().worker_item.take().unwrap() {
1355 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1356 WorkerItem::Function(fun) => fun.into_inner()(store)?,
1357 }
1358
1359 store.suspend(SuspendReason::NeedWork)?;
1360 }
1361 })?
1362 };
1363
1364 let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1365 assert!(worker_item.is_none());
1366 *worker_item = Some(item);
1367
1368 self.0.resume_fiber(worker).await
1369 }
1370
1371 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1376 where
1377 T: 'static,
1378 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1379 + Send
1380 + Sync
1381 + 'static,
1382 R: Send + Sync + 'static,
1383 {
1384 let token = StoreToken::new(self);
1385 async move {
1386 let mut accessor = Accessor::new(token);
1387 closure(&mut accessor).await
1388 }
1389 }
1390}
1391
1392impl StoreOpaque {
1393 pub(crate) fn enter_guest_sync_call(
1400 &mut self,
1401 guest_caller: Option<RuntimeInstance>,
1402 callee_async: bool,
1403 callee: RuntimeInstance,
1404 ) -> Result<()> {
1405 log::trace!("enter sync call {callee:?}");
1406 if !self.concurrency_support() {
1407 return Ok(self.enter_call_not_concurrent());
1408 }
1409
1410 let state = self.concurrent_state_mut();
1411 let thread = state.current_thread;
1412 let instance = if let Some(thread) = thread.guest() {
1413 Some(state.get_mut(thread.task)?.instance)
1414 } else {
1415 None
1416 };
1417 let task = GuestTask::new(
1418 state,
1419 Box::new(move |_, _| unreachable!()),
1420 LiftResult {
1421 lift: Box::new(move |_, _| unreachable!()),
1422 ty: TypeTupleIndex::reserved_value(),
1423 memory: None,
1424 string_encoding: StringEncoding::Utf8,
1425 },
1426 if let Some(caller) = guest_caller {
1427 assert_eq!(caller, instance.unwrap());
1428 Caller::Guest {
1429 thread: *thread.guest().unwrap(),
1430 }
1431 } else {
1432 Caller::Host {
1433 tx: None,
1434 exit_tx: Arc::new(oneshot::channel().0),
1435 host_future_present: false,
1436 caller: thread,
1437 }
1438 },
1439 None,
1440 callee,
1441 callee_async,
1442 )?;
1443
1444 let guest_task = state.push(task)?;
1445 let new_thread = GuestThread::new_implicit(guest_task);
1446 let guest_thread = state.push(new_thread)?;
1447 Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1448 guest_thread,
1449 self,
1450 callee.index,
1451 )?;
1452
1453 let state = self.concurrent_state_mut();
1454 state.get_mut(guest_task)?.threads.insert(guest_thread);
1455 if guest_caller.is_some() {
1456 let thread = thread.guest().unwrap();
1457 state.get_mut(thread.task)?.subtasks.insert(guest_task);
1458 }
1459
1460 self.set_thread(QualifiedThreadId {
1461 task: guest_task,
1462 thread: guest_thread,
1463 });
1464
1465 Ok(())
1466 }
1467
1468 pub(crate) fn exit_guest_sync_call(&mut self, guest_caller: bool) -> Result<()> {
1470 if !self.concurrency_support() {
1471 return Ok(self.exit_call_not_concurrent());
1472 }
1473 let thread = *self.set_thread(CurrentThread::None).guest().unwrap();
1474 let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance;
1475 log::trace!("exit sync call {instance:?}");
1476 Instance::from_wasmtime(self, instance.instance).cleanup_thread(
1477 self,
1478 thread,
1479 instance.index,
1480 )?;
1481
1482 let state = self.concurrent_state_mut();
1483 let task = state.get_mut(thread.task)?;
1484 let caller = match &task.caller {
1485 &Caller::Guest { thread } => {
1486 assert!(guest_caller);
1487 thread.into()
1488 }
1489 &Caller::Host { caller, .. } => {
1490 assert!(!guest_caller);
1491 caller
1492 }
1493 };
1494 self.set_thread(caller);
1495
1496 let state = self.concurrent_state_mut();
1497 let task = state.get_mut(thread.task)?;
1498 if task.ready_to_delete() {
1499 state.delete(thread.task)?.dispose(state, thread.task)?;
1500 }
1501
1502 Ok(())
1503 }
1504
1505 pub fn enter_host_call(&mut self) -> Result<()> {
1513 let state = self.concurrent_state_mut();
1514 let caller = state.unwrap_current_guest_thread();
1515 let task = state.push(HostTask::new(caller))?;
1516 log::trace!("new host task {task:?}");
1517 self.set_thread(task);
1518 Ok(())
1519 }
1520
1521 pub fn exit_host_call(&mut self) -> Result<()> {
1528 let task = self.concurrent_state_mut().unwrap_current_host_thread();
1529 log::trace!("delete host task {task:?}");
1530 let task = self.concurrent_state_mut().delete(task)?;
1531 self.set_thread(task.caller);
1532 Ok(())
1533 }
1534
1535 pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool {
1543 if self.trapped() {
1544 return false;
1545 }
1546 if !self.concurrency_support() {
1547 return true;
1548 }
1549 let state = self.concurrent_state_mut();
1550 let mut cur = state.current_thread;
1551 loop {
1552 match cur {
1553 CurrentThread::None => break true,
1554 CurrentThread::Guest(thread) => {
1555 let task = state.get_mut(thread.task).unwrap();
1556
1557 if task.instance.instance == instance.instance {
1564 break false;
1565 }
1566 cur = match task.caller {
1567 Caller::Host { caller, .. } => caller,
1568 Caller::Guest { thread } => thread.into(),
1569 };
1570 }
1571 CurrentThread::Host(id) => {
1572 cur = state.get_mut(id).unwrap().caller.into();
1573 }
1574 }
1575 }
1576 }
1577
1578 fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1581 self.component_instance_mut(instance.instance)
1582 .instance_state(instance.index)
1583 }
1584
1585 fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> CurrentThread {
1586 let state = self.concurrent_state_mut();
1591 let old_thread = mem::replace(&mut state.current_thread, thread.into());
1592 if let Some(old_thread) = old_thread.guest() {
1593 let instance = state.get_mut(old_thread.task).unwrap().instance.instance;
1594 self.component_instance_mut(instance)
1595 .set_task_may_block(false)
1596 }
1597
1598 if self.concurrent_state_mut().current_thread.guest().is_some() {
1601 self.set_task_may_block();
1602 }
1603
1604 old_thread
1605 }
1606
1607 fn set_task_may_block(&mut self) {
1610 let state = self.concurrent_state_mut();
1611 let guest_thread = state.unwrap_current_guest_thread();
1612 let instance = state.get_mut(guest_thread.task).unwrap().instance.instance;
1613 let may_block = self.concurrent_state_mut().may_block(guest_thread.task);
1614 self.component_instance_mut(instance)
1615 .set_task_may_block(may_block)
1616 }
1617
1618 pub(crate) fn check_blocking(&mut self) -> Result<()> {
1619 if !self.concurrency_support() {
1620 return Ok(());
1621 }
1622 let state = self.concurrent_state_mut();
1623 let task = state.unwrap_current_guest_thread().task;
1624 let instance = state.get_mut(task).unwrap().instance.instance;
1625 let task_may_block = self.component_instance(instance).get_task_may_block();
1626
1627 if task_may_block {
1628 Ok(())
1629 } else {
1630 Err(Trap::CannotBlockSyncTask.into())
1631 }
1632 }
1633
1634 fn enter_instance(&mut self, instance: RuntimeInstance) {
1638 log::trace!("enter {instance:?}");
1639 self.instance_state(instance)
1640 .concurrent_state()
1641 .do_not_enter = true;
1642 }
1643
1644 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1648 log::trace!("exit {instance:?}");
1649 self.instance_state(instance)
1650 .concurrent_state()
1651 .do_not_enter = false;
1652 self.partition_pending(instance)
1653 }
1654
1655 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1660 for (thread, kind) in
1661 mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1662 {
1663 let call = GuestCall { thread, kind };
1664 if call.is_ready(self)? {
1665 self.concurrent_state_mut()
1666 .push_high_priority(WorkItem::GuestCall(call));
1667 } else {
1668 self.instance_state(instance)
1669 .concurrent_state()
1670 .pending
1671 .insert(call.thread, call.kind);
1672 }
1673 }
1674
1675 Ok(())
1676 }
1677
1678 pub(crate) fn backpressure_modify(
1680 &mut self,
1681 caller_instance: RuntimeInstance,
1682 modify: impl FnOnce(u16) -> Option<u16>,
1683 ) -> Result<()> {
1684 let state = self.instance_state(caller_instance).concurrent_state();
1685 let old = state.backpressure;
1686 let new = modify(old).ok_or_else(|| format_err!("backpressure counter overflow"))?;
1687 state.backpressure = new;
1688
1689 if old > 0 && new == 0 {
1690 self.partition_pending(caller_instance)?;
1693 }
1694
1695 Ok(())
1696 }
1697
1698 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1701 let old_thread = self.concurrent_state_mut().current_thread;
1702 log::trace!("resume_fiber: save current thread {old_thread:?}");
1703
1704 let fiber = fiber::resolve_or_release(self, fiber).await?;
1705
1706 self.set_thread(old_thread);
1707
1708 let state = self.concurrent_state_mut();
1709
1710 if let Some(ot) = old_thread.guest() {
1711 state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1712 }
1713 log::trace!("resume_fiber: restore current thread {old_thread:?}");
1714
1715 if let Some(mut fiber) = fiber {
1716 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1717 match state.suspend_reason.take().unwrap() {
1719 SuspendReason::NeedWork => {
1720 if state.worker.is_none() {
1721 state.worker = Some(fiber);
1722 } else {
1723 fiber.dispose(self);
1724 }
1725 }
1726 SuspendReason::Yielding { thread, .. } => {
1727 state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
1728 state.push_low_priority(WorkItem::ResumeFiber(fiber));
1729 }
1730 SuspendReason::ExplicitlySuspending { thread, .. } => {
1731 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1732 }
1733 SuspendReason::Waiting { set, thread, .. } => {
1734 let old = state
1735 .get_mut(set)?
1736 .waiting
1737 .insert(thread, WaitMode::Fiber(fiber));
1738 assert!(old.is_none());
1739 }
1740 };
1741 } else {
1742 log::trace!("resume_fiber: fiber has exited");
1743 }
1744
1745 Ok(())
1746 }
1747
1748 fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1754 log::trace!("suspend fiber: {reason:?}");
1755
1756 let task = match &reason {
1760 SuspendReason::Yielding { thread, .. }
1761 | SuspendReason::Waiting { thread, .. }
1762 | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1763 SuspendReason::NeedWork => None,
1764 };
1765
1766 let old_guest_thread = if task.is_some() {
1767 self.concurrent_state_mut().current_thread
1768 } else {
1769 CurrentThread::None
1770 };
1771
1772 assert!(
1778 matches!(
1779 reason,
1780 SuspendReason::ExplicitlySuspending {
1781 skip_may_block_check: true,
1782 ..
1783 } | SuspendReason::Waiting {
1784 skip_may_block_check: true,
1785 ..
1786 } | SuspendReason::Yielding {
1787 skip_may_block_check: true,
1788 ..
1789 }
1790 ) || old_guest_thread
1791 .guest()
1792 .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1793 .unwrap_or(true)
1794 );
1795
1796 let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1797 assert!(suspend_reason.is_none());
1798 *suspend_reason = Some(reason);
1799
1800 self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1801
1802 if task.is_some() {
1803 self.set_thread(old_guest_thread);
1804 }
1805
1806 Ok(())
1807 }
1808
1809 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1810 let state = self.concurrent_state_mut();
1811 let caller = state.unwrap_current_guest_thread();
1812 let old_set = waitable.common(state)?.set;
1813 let set = state.get_mut(caller.task)?.sync_call_set;
1814 waitable.join(state, Some(set))?;
1815 self.suspend(SuspendReason::Waiting {
1816 set,
1817 thread: caller,
1818 skip_may_block_check: false,
1819 })?;
1820 let state = self.concurrent_state_mut();
1821 waitable.join(state, old_set)
1822 }
1823}
1824
1825impl Instance {
1826 fn get_event(
1829 self,
1830 store: &mut StoreOpaque,
1831 guest_task: TableId<GuestTask>,
1832 set: Option<TableId<WaitableSet>>,
1833 cancellable: bool,
1834 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1835 let state = store.concurrent_state_mut();
1836
1837 if let Some(event) = state.get_mut(guest_task)?.event.take() {
1838 log::trace!("deliver event {event:?} to {guest_task:?}");
1839
1840 if cancellable || !matches!(event, Event::Cancelled) {
1841 return Ok(Some((event, None)));
1842 } else {
1843 state.get_mut(guest_task)?.event = Some(event);
1844 }
1845 }
1846
1847 Ok(
1848 if let Some((set, waitable)) = set
1849 .and_then(|set| {
1850 state
1851 .get_mut(set)
1852 .map(|v| v.ready.pop_first().map(|v| (set, v)))
1853 .transpose()
1854 })
1855 .transpose()?
1856 {
1857 let common = waitable.common(state)?;
1858 let handle = common.handle.unwrap();
1859 let event = common.event.take().unwrap();
1860
1861 log::trace!(
1862 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1863 );
1864
1865 waitable.on_delivery(store, self, event);
1866
1867 Some((event, Some((waitable, handle))))
1868 } else {
1869 None
1870 },
1871 )
1872 }
1873
1874 fn handle_callback_code(
1880 self,
1881 store: &mut StoreOpaque,
1882 guest_thread: QualifiedThreadId,
1883 runtime_instance: RuntimeComponentInstanceIndex,
1884 code: u32,
1885 ) -> Result<Option<GuestCall>> {
1886 let (code, set) = unpack_callback_code(code);
1887
1888 log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1889
1890 let state = store.concurrent_state_mut();
1891
1892 let get_set = |store: &mut StoreOpaque, handle| {
1893 if handle == 0 {
1894 bail!("invalid waitable-set handle");
1895 }
1896
1897 let set = store
1898 .instance_state(RuntimeInstance {
1899 instance: self.id().instance(),
1900 index: runtime_instance,
1901 })
1902 .handle_table()
1903 .waitable_set_rep(handle)?;
1904
1905 Ok(TableId::<WaitableSet>::new(set))
1906 };
1907
1908 Ok(match code {
1909 callback_code::EXIT => {
1910 log::trace!("implicit thread {guest_thread:?} completed");
1911 self.cleanup_thread(store, guest_thread, runtime_instance)?;
1912 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1913 if task.threads.is_empty() && !task.returned_or_cancelled() {
1914 bail!(Trap::NoAsyncResult);
1915 }
1916 match &task.caller {
1917 Caller::Host { .. } => {
1918 if task.ready_to_delete() {
1919 Waitable::Guest(guest_thread.task)
1920 .delete_from(store.concurrent_state_mut())?;
1921 }
1922 }
1923 Caller::Guest { .. } => {
1924 task.exited = true;
1925 task.callback = None;
1926 }
1927 }
1928 None
1929 }
1930 callback_code::YIELD => {
1931 let task = state.get_mut(guest_thread.task)?;
1932 if let Some(event) = task.event {
1937 assert!(matches!(event, Event::None | Event::Cancelled));
1938 } else {
1939 task.event = Some(Event::None);
1940 }
1941 let call = GuestCall {
1942 thread: guest_thread,
1943 kind: GuestCallKind::DeliverEvent {
1944 instance: self,
1945 set: None,
1946 },
1947 };
1948 if state.may_block(guest_thread.task) {
1949 state.push_low_priority(WorkItem::GuestCall(call));
1952 None
1953 } else {
1954 Some(call)
1958 }
1959 }
1960 callback_code::WAIT => {
1961 state.check_blocking_for(guest_thread.task)?;
1964
1965 let set = get_set(store, set)?;
1966 let state = store.concurrent_state_mut();
1967
1968 if state.get_mut(guest_thread.task)?.event.is_some()
1969 || !state.get_mut(set)?.ready.is_empty()
1970 {
1971 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1973 thread: guest_thread,
1974 kind: GuestCallKind::DeliverEvent {
1975 instance: self,
1976 set: Some(set),
1977 },
1978 }));
1979 } else {
1980 let old = state
1988 .get_mut(guest_thread.thread)?
1989 .wake_on_cancel
1990 .replace(set);
1991 assert!(old.is_none());
1992 let old = state
1993 .get_mut(set)?
1994 .waiting
1995 .insert(guest_thread, WaitMode::Callback(self));
1996 assert!(old.is_none());
1997 }
1998 None
1999 }
2000 _ => bail!("unsupported callback code: {code}"),
2001 })
2002 }
2003
2004 fn cleanup_thread(
2005 self,
2006 store: &mut StoreOpaque,
2007 guest_thread: QualifiedThreadId,
2008 runtime_instance: RuntimeComponentInstanceIndex,
2009 ) -> Result<()> {
2010 let guest_id = store
2011 .concurrent_state_mut()
2012 .get_mut(guest_thread.thread)?
2013 .instance_rep;
2014 store
2015 .instance_state(RuntimeInstance {
2016 instance: self.id().instance(),
2017 index: runtime_instance,
2018 })
2019 .thread_handle_table()
2020 .guest_thread_remove(guest_id.unwrap())?;
2021
2022 store.concurrent_state_mut().delete(guest_thread.thread)?;
2023 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2024 task.threads.remove(&guest_thread.thread);
2025 Ok(())
2026 }
2027
2028 unsafe fn queue_call<T: 'static>(
2035 self,
2036 mut store: StoreContextMut<T>,
2037 guest_thread: QualifiedThreadId,
2038 callee: SendSyncPtr<VMFuncRef>,
2039 param_count: usize,
2040 result_count: usize,
2041 async_: bool,
2042 callback: Option<SendSyncPtr<VMFuncRef>>,
2043 post_return: Option<SendSyncPtr<VMFuncRef>>,
2044 ) -> Result<()> {
2045 unsafe fn make_call<T: 'static>(
2060 store: StoreContextMut<T>,
2061 guest_thread: QualifiedThreadId,
2062 callee: SendSyncPtr<VMFuncRef>,
2063 param_count: usize,
2064 result_count: usize,
2065 ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2066 + Send
2067 + Sync
2068 + 'static
2069 + use<T> {
2070 let token = StoreToken::new(store);
2071 move |store: &mut dyn VMStore| {
2072 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2073
2074 store
2075 .concurrent_state_mut()
2076 .get_mut(guest_thread.thread)?
2077 .state = GuestThreadState::Running;
2078 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2079 let lower = task.lower_params.take().unwrap();
2080
2081 lower(store, &mut storage[..param_count])?;
2082
2083 let mut store = token.as_context_mut(store);
2084
2085 unsafe {
2088 crate::Func::call_unchecked_raw(
2089 &mut store,
2090 callee.as_non_null(),
2091 NonNull::new(
2092 &mut storage[..param_count.max(result_count)]
2093 as *mut [MaybeUninit<ValRaw>] as _,
2094 )
2095 .unwrap(),
2096 )?;
2097 }
2098
2099 Ok(storage)
2100 }
2101 }
2102
2103 let call = unsafe {
2107 make_call(
2108 store.as_context_mut(),
2109 guest_thread,
2110 callee,
2111 param_count,
2112 result_count,
2113 )
2114 };
2115
2116 let callee_instance = store
2117 .0
2118 .concurrent_state_mut()
2119 .get_mut(guest_thread.task)?
2120 .instance;
2121
2122 let fun = if callback.is_some() {
2123 assert!(async_);
2124
2125 Box::new(move |store: &mut dyn VMStore| {
2126 self.add_guest_thread_to_instance_table(
2127 guest_thread.thread,
2128 store,
2129 callee_instance.index,
2130 )?;
2131 let old_thread = store.set_thread(guest_thread);
2132 log::trace!(
2133 "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2134 );
2135
2136 store.enter_instance(callee_instance);
2137
2138 let storage = call(store)?;
2145
2146 store.exit_instance(callee_instance)?;
2147
2148 store.set_thread(old_thread);
2149 let state = store.concurrent_state_mut();
2150 old_thread
2151 .guest()
2152 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2153 log::trace!("stackless call: restored {old_thread:?} as current thread");
2154
2155 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2158
2159 self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2160 })
2161 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2162 } else {
2163 let token = StoreToken::new(store.as_context_mut());
2164 Box::new(move |store: &mut dyn VMStore| {
2165 self.add_guest_thread_to_instance_table(
2166 guest_thread.thread,
2167 store,
2168 callee_instance.index,
2169 )?;
2170 let old_thread = store.set_thread(guest_thread);
2171 log::trace!(
2172 "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2173 );
2174 let flags = self.id().get(store).instance_flags(callee_instance.index);
2175
2176 if !async_ {
2180 store.enter_instance(callee_instance);
2181 }
2182
2183 let storage = call(store)?;
2190
2191 if async_ {
2192 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2193 if task.threads.len() == 1 && !task.returned_or_cancelled() {
2194 bail!(Trap::NoAsyncResult);
2195 }
2196 } else {
2197 let lift = {
2203 store.exit_instance(callee_instance)?;
2204
2205 let state = store.concurrent_state_mut();
2206 assert!(state.get_mut(guest_thread.task)?.result.is_none());
2207
2208 state
2209 .get_mut(guest_thread.task)?
2210 .lift_result
2211 .take()
2212 .unwrap()
2213 };
2214
2215 let result = (lift.lift)(store, unsafe {
2218 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2219 &storage[..result_count],
2220 )
2221 })?;
2222
2223 let post_return_arg = match result_count {
2224 0 => ValRaw::i32(0),
2225 1 => unsafe { storage[0].assume_init() },
2228 _ => unreachable!(),
2229 };
2230
2231 unsafe {
2232 call_post_return(
2233 token.as_context_mut(store),
2234 post_return.map(|v| v.as_non_null()),
2235 post_return_arg,
2236 flags,
2237 )?;
2238 }
2239
2240 self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2241 }
2242
2243 self.cleanup_thread(store, guest_thread, callee_instance.index)?;
2245
2246 store.set_thread(old_thread);
2247
2248 let state = store.concurrent_state_mut();
2249 let task = state.get_mut(guest_thread.task)?;
2250
2251 match &task.caller {
2252 Caller::Host { .. } => {
2253 if task.ready_to_delete() {
2254 Waitable::Guest(guest_thread.task).delete_from(state)?;
2255 }
2256 }
2257 Caller::Guest { .. } => {
2258 task.exited = true;
2259 }
2260 }
2261
2262 Ok(None)
2263 })
2264 };
2265
2266 store
2267 .0
2268 .concurrent_state_mut()
2269 .push_high_priority(WorkItem::GuestCall(GuestCall {
2270 thread: guest_thread,
2271 kind: GuestCallKind::StartImplicit(fun),
2272 }));
2273
2274 Ok(())
2275 }
2276
2277 unsafe fn prepare_call<T: 'static>(
2290 self,
2291 mut store: StoreContextMut<T>,
2292 start: *mut VMFuncRef,
2293 return_: *mut VMFuncRef,
2294 caller_instance: RuntimeComponentInstanceIndex,
2295 callee_instance: RuntimeComponentInstanceIndex,
2296 task_return_type: TypeTupleIndex,
2297 callee_async: bool,
2298 memory: *mut VMMemoryDefinition,
2299 string_encoding: u8,
2300 caller_info: CallerInfo,
2301 ) -> Result<()> {
2302 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2303 store.0.check_blocking()?;
2307 }
2308
2309 enum ResultInfo {
2310 Heap { results: u32 },
2311 Stack { result_count: u32 },
2312 }
2313
2314 let result_info = match &caller_info {
2315 CallerInfo::Async {
2316 has_result: true,
2317 params,
2318 } => ResultInfo::Heap {
2319 results: params.last().unwrap().get_u32(),
2320 },
2321 CallerInfo::Async {
2322 has_result: false, ..
2323 } => ResultInfo::Stack { result_count: 0 },
2324 CallerInfo::Sync {
2325 result_count,
2326 params,
2327 } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2328 results: params.last().unwrap().get_u32(),
2329 },
2330 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2331 result_count: *result_count,
2332 },
2333 };
2334
2335 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2336
2337 let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2341 let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2342 let token = StoreToken::new(store.as_context_mut());
2343 let state = store.0.concurrent_state_mut();
2344 let old_thread = state.unwrap_current_guest_thread();
2345
2346 assert_eq!(
2347 state.get_mut(old_thread.task)?.instance,
2348 RuntimeInstance {
2349 instance: self.id().instance(),
2350 index: caller_instance,
2351 }
2352 );
2353
2354 let new_task = GuestTask::new(
2355 state,
2356 Box::new(move |store, dst| {
2357 let mut store = token.as_context_mut(store);
2358 assert!(dst.len() <= MAX_FLAT_PARAMS);
2359 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2361 let count = match caller_info {
2362 CallerInfo::Async { params, has_result } => {
2366 let params = ¶ms[..params.len() - usize::from(has_result)];
2367 for (param, src) in params.iter().zip(&mut src) {
2368 src.write(*param);
2369 }
2370 params.len()
2371 }
2372
2373 CallerInfo::Sync { params, .. } => {
2375 for (param, src) in params.iter().zip(&mut src) {
2376 src.write(*param);
2377 }
2378 params.len()
2379 }
2380 };
2381 unsafe {
2388 crate::Func::call_unchecked_raw(
2389 &mut store,
2390 start.as_non_null(),
2391 NonNull::new(
2392 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2393 )
2394 .unwrap(),
2395 )?;
2396 }
2397 dst.copy_from_slice(&src[..dst.len()]);
2398 let state = store.0.concurrent_state_mut();
2399 Waitable::Guest(state.unwrap_current_guest_thread().task).set_event(
2400 state,
2401 Some(Event::Subtask {
2402 status: Status::Started,
2403 }),
2404 )?;
2405 Ok(())
2406 }),
2407 LiftResult {
2408 lift: Box::new(move |store, src| {
2409 let mut store = token.as_context_mut(store);
2412 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2414 my_src.push(ValRaw::u32(*results));
2415 }
2416 unsafe {
2423 crate::Func::call_unchecked_raw(
2424 &mut store,
2425 return_.as_non_null(),
2426 my_src.as_mut_slice().into(),
2427 )?;
2428 }
2429 let state = store.0.concurrent_state_mut();
2430 let thread = state.unwrap_current_guest_thread();
2431 if sync_caller {
2432 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2433 if let ResultInfo::Stack { result_count } = &result_info {
2434 match result_count {
2435 0 => None,
2436 1 => Some(my_src[0]),
2437 _ => unreachable!(),
2438 }
2439 } else {
2440 None
2441 },
2442 );
2443 }
2444 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2445 }),
2446 ty: task_return_type,
2447 memory: NonNull::new(memory).map(SendSyncPtr::new),
2448 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2449 },
2450 Caller::Guest { thread: old_thread },
2451 None,
2452 RuntimeInstance {
2453 instance: self.id().instance(),
2454 index: callee_instance,
2455 },
2456 callee_async,
2457 )?;
2458
2459 let guest_task = state.push(new_task)?;
2460 let new_thread = GuestThread::new_implicit(guest_task);
2461 let guest_thread = state.push(new_thread)?;
2462 state.get_mut(guest_task)?.threads.insert(guest_thread);
2463
2464 store
2465 .0
2466 .concurrent_state_mut()
2467 .get_mut(old_thread.task)?
2468 .subtasks
2469 .insert(guest_task);
2470
2471 store.0.set_thread(QualifiedThreadId {
2474 task: guest_task,
2475 thread: guest_thread,
2476 });
2477 log::trace!(
2478 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2479 );
2480
2481 Ok(())
2482 }
2483
2484 unsafe fn call_callback<T>(
2489 self,
2490 mut store: StoreContextMut<T>,
2491 function: SendSyncPtr<VMFuncRef>,
2492 event: Event,
2493 handle: u32,
2494 ) -> Result<u32> {
2495 let (ordinal, result) = event.parts();
2496 let params = &mut [
2497 ValRaw::u32(ordinal),
2498 ValRaw::u32(handle),
2499 ValRaw::u32(result),
2500 ];
2501 unsafe {
2506 crate::Func::call_unchecked_raw(
2507 &mut store,
2508 function.as_non_null(),
2509 params.as_mut_slice().into(),
2510 )?;
2511 }
2512 Ok(params[0].get_u32())
2513 }
2514
2515 unsafe fn start_call<T: 'static>(
2528 self,
2529 mut store: StoreContextMut<T>,
2530 callback: *mut VMFuncRef,
2531 post_return: *mut VMFuncRef,
2532 callee: *mut VMFuncRef,
2533 param_count: u32,
2534 result_count: u32,
2535 flags: u32,
2536 storage: Option<&mut [MaybeUninit<ValRaw>]>,
2537 ) -> Result<u32> {
2538 let token = StoreToken::new(store.as_context_mut());
2539 let async_caller = storage.is_none();
2540 let state = store.0.concurrent_state_mut();
2541 let guest_thread = state.unwrap_current_guest_thread();
2542 let callee_async = state.get_mut(guest_thread.task)?.async_function;
2543 let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2544 let param_count = usize::try_from(param_count).unwrap();
2545 assert!(param_count <= MAX_FLAT_PARAMS);
2546 let result_count = usize::try_from(result_count).unwrap();
2547 assert!(result_count <= MAX_FLAT_RESULTS);
2548
2549 let task = state.get_mut(guest_thread.task)?;
2550 if !callback.is_null() {
2551 let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2555 task.callback = Some(Box::new(move |store, event, handle| {
2556 let store = token.as_context_mut(store);
2557 unsafe { self.call_callback::<T>(store, callback, event, handle) }
2558 }));
2559 }
2560
2561 let Caller::Guest { thread: caller } = &task.caller else {
2562 unreachable!()
2565 };
2566 let caller = *caller;
2567 let caller_instance = state.get_mut(caller.task)?.instance;
2568
2569 unsafe {
2571 self.queue_call(
2572 store.as_context_mut(),
2573 guest_thread,
2574 callee,
2575 param_count,
2576 result_count,
2577 (flags & START_FLAG_ASYNC_CALLEE) != 0,
2578 NonNull::new(callback).map(SendSyncPtr::new),
2579 NonNull::new(post_return).map(SendSyncPtr::new),
2580 )?;
2581 }
2582
2583 let state = store.0.concurrent_state_mut();
2584
2585 let guest_waitable = Waitable::Guest(guest_thread.task);
2588 let old_set = guest_waitable.common(state)?.set;
2589 let set = state.get_mut(caller.task)?.sync_call_set;
2590 guest_waitable.join(state, Some(set))?;
2591
2592 let (status, waitable) = loop {
2608 store.0.suspend(SuspendReason::Waiting {
2609 set,
2610 thread: caller,
2611 skip_may_block_check: async_caller || !callee_async,
2619 })?;
2620
2621 let state = store.0.concurrent_state_mut();
2622
2623 log::trace!("taking event for {:?}", guest_thread.task);
2624 let event = guest_waitable.take_event(state)?;
2625 let Some(Event::Subtask { status }) = event else {
2626 unreachable!();
2627 };
2628
2629 log::trace!("status {status:?} for {:?}", guest_thread.task);
2630
2631 if status == Status::Returned {
2632 break (status, None);
2634 } else if async_caller {
2635 let handle = store
2639 .0
2640 .instance_state(caller_instance)
2641 .handle_table()
2642 .subtask_insert_guest(guest_thread.task.rep())?;
2643 store
2644 .0
2645 .concurrent_state_mut()
2646 .get_mut(guest_thread.task)?
2647 .common
2648 .handle = Some(handle);
2649 break (status, Some(handle));
2650 } else {
2651 }
2655 };
2656
2657 guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2658
2659 store.0.set_thread(caller);
2661 store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2662 log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2663
2664 if let Some(storage) = storage {
2665 let state = store.0.concurrent_state_mut();
2669 let task = state.get_mut(guest_thread.task)?;
2670 if let Some(result) = task.sync_result.take() {
2671 if let Some(result) = result {
2672 storage[0] = MaybeUninit::new(result);
2673 }
2674
2675 if task.exited && task.ready_to_delete() {
2676 Waitable::Guest(guest_thread.task).delete_from(state)?;
2677 }
2678 }
2679 }
2680
2681 Ok(status.pack(waitable))
2682 }
2683
2684 pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2696 self,
2697 mut store: StoreContextMut<'_, T>,
2698 future: impl Future<Output = Result<R>> + Send + 'static,
2699 lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
2700 ) -> Result<Option<u32>> {
2701 let token = StoreToken::new(store.as_context_mut());
2702 let state = store.0.concurrent_state_mut();
2703 let task = state.unwrap_current_host_thread();
2704
2705 let (join_handle, future) = JoinHandle::run(future);
2708 {
2709 let task = state.get_mut(task)?;
2710 assert!(task.join_handle.is_none());
2711 task.join_handle = Some(join_handle);
2712 }
2713
2714 let mut future = Box::pin(future);
2715
2716 let poll = tls::set(store.0, || {
2721 future
2722 .as_mut()
2723 .poll(&mut Context::from_waker(&Waker::noop()))
2724 });
2725
2726 match poll {
2727 Poll::Ready(Some(result)) => {
2729 lower(store.as_context_mut(), result?)?;
2730 return Ok(None);
2731 }
2732
2733 Poll::Ready(None) => unreachable!(),
2736
2737 Poll::Pending => {}
2739 }
2740
2741 let future = Box::pin(async move {
2749 let result = match future.await {
2750 Some(result) => result?,
2751 None => return Ok(()),
2753 };
2754 let on_complete = move |store: &mut dyn VMStore| {
2755 let mut store = token.as_context_mut(store);
2759 let state = store.0.concurrent_state_mut();
2760 assert!(state.current_thread.is_none());
2761 store.0.set_thread(task);
2762
2763 lower(store.as_context_mut(), result)?;
2764 let state = store.0.concurrent_state_mut();
2765 state.get_mut(task)?.join_handle.take();
2766 Waitable::Host(task).set_event(
2767 state,
2768 Some(Event::Subtask {
2769 status: Status::Returned,
2770 }),
2771 )?;
2772
2773 store.0.set_thread(CurrentThread::None);
2775 Ok(())
2776 };
2777
2778 tls::get(move |store| {
2783 store
2784 .concurrent_state_mut()
2785 .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2786 on_complete,
2787 ))));
2788 Ok(())
2789 })
2790 });
2791
2792 let state = store.0.concurrent_state_mut();
2795 state.push_future(future);
2796 let caller = state.get_mut(task)?.caller;
2797 let instance = state.get_mut(caller.task)?.instance;
2798 let handle = store
2799 .0
2800 .instance_state(instance)
2801 .handle_table()
2802 .subtask_insert_host(task.rep())?;
2803 store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2804 log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");
2805
2806 store.0.set_thread(caller);
2810 Ok(Some(handle))
2811 }
2812
2813 pub(crate) fn task_return(
2816 self,
2817 store: &mut dyn VMStore,
2818 ty: TypeTupleIndex,
2819 options: OptionsIndex,
2820 storage: &[ValRaw],
2821 ) -> Result<()> {
2822 let state = store.concurrent_state_mut();
2823 let guest_thread = state.unwrap_current_guest_thread();
2824 let lift = state
2825 .get_mut(guest_thread.task)?
2826 .lift_result
2827 .take()
2828 .ok_or_else(|| {
2829 format_err!("`task.return` or `task.cancel` called more than once for current task")
2830 })?;
2831 assert!(state.get_mut(guest_thread.task)?.result.is_none());
2832
2833 let CanonicalOptions {
2834 string_encoding,
2835 data_model,
2836 ..
2837 } = &self.id().get(store).component().env_component().options[options];
2838
2839 let invalid = ty != lift.ty
2840 || string_encoding != &lift.string_encoding
2841 || match data_model {
2842 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2843 Some(memory) => {
2844 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2845 let actual = self.id().get(store).runtime_memory(memory);
2846 expected != actual.as_ptr()
2847 }
2848 None => false,
2851 },
2852 CanonicalOptionsDataModel::Gc { .. } => true,
2854 };
2855
2856 if invalid {
2857 bail!("invalid `task.return` signature and/or options for current task");
2858 }
2859
2860 log::trace!("task.return for {guest_thread:?}");
2861
2862 let result = (lift.lift)(store, storage)?;
2863 self.task_complete(store, guest_thread.task, result, Status::Returned)
2864 }
2865
2866 pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
2868 let state = store.concurrent_state_mut();
2869 let guest_thread = state.unwrap_current_guest_thread();
2870 let task = state.get_mut(guest_thread.task)?;
2871 if !task.cancel_sent {
2872 bail!("`task.cancel` called by task which has not been cancelled")
2873 }
2874 _ = task.lift_result.take().ok_or_else(|| {
2875 format_err!("`task.return` or `task.cancel` called more than once for current task")
2876 })?;
2877
2878 assert!(task.result.is_none());
2879
2880 log::trace!("task.cancel for {guest_thread:?}");
2881
2882 self.task_complete(
2883 store,
2884 guest_thread.task,
2885 Box::new(DummyResult),
2886 Status::ReturnCancelled,
2887 )
2888 }
2889
2890 fn task_complete(
2896 self,
2897 store: &mut StoreOpaque,
2898 guest_task: TableId<GuestTask>,
2899 result: Box<dyn Any + Send + Sync>,
2900 status: Status,
2901 ) -> Result<()> {
2902 store
2903 .component_resource_tables(Some(self))
2904 .validate_scope_exit()?;
2905
2906 let state = store.concurrent_state_mut();
2907 let task = state.get_mut(guest_task)?;
2908
2909 if let Caller::Host { tx, .. } = &mut task.caller {
2910 if let Some(tx) = tx.take() {
2911 _ = tx.send(result);
2912 }
2913 } else {
2914 task.result = Some(result);
2915 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2916 }
2917
2918 Ok(())
2919 }
2920
2921 pub(crate) fn waitable_set_new(
2923 self,
2924 store: &mut StoreOpaque,
2925 caller_instance: RuntimeComponentInstanceIndex,
2926 ) -> Result<u32> {
2927 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2928 let handle = store
2929 .instance_state(RuntimeInstance {
2930 instance: self.id().instance(),
2931 index: caller_instance,
2932 })
2933 .handle_table()
2934 .waitable_set_insert(set.rep())?;
2935 log::trace!("new waitable set {set:?} (handle {handle})");
2936 Ok(handle)
2937 }
2938
2939 pub(crate) fn waitable_set_drop(
2941 self,
2942 store: &mut StoreOpaque,
2943 caller_instance: RuntimeComponentInstanceIndex,
2944 set: u32,
2945 ) -> Result<()> {
2946 let rep = store
2947 .instance_state(RuntimeInstance {
2948 instance: self.id().instance(),
2949 index: caller_instance,
2950 })
2951 .handle_table()
2952 .waitable_set_remove(set)?;
2953
2954 log::trace!("drop waitable set {rep} (handle {set})");
2955
2956 let set = store
2957 .concurrent_state_mut()
2958 .delete(TableId::<WaitableSet>::new(rep))?;
2959
2960 if !set.waiting.is_empty() {
2961 bail!("cannot drop waitable set with waiters");
2962 }
2963
2964 Ok(())
2965 }
2966
2967 pub(crate) fn waitable_join(
2969 self,
2970 store: &mut StoreOpaque,
2971 caller_instance: RuntimeComponentInstanceIndex,
2972 waitable_handle: u32,
2973 set_handle: u32,
2974 ) -> Result<()> {
2975 let mut instance = self.id().get_mut(store);
2976 let waitable =
2977 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2978
2979 let set = if set_handle == 0 {
2980 None
2981 } else {
2982 let set = instance.instance_states().0[caller_instance]
2983 .handle_table()
2984 .waitable_set_rep(set_handle)?;
2985
2986 Some(TableId::<WaitableSet>::new(set))
2987 };
2988
2989 log::trace!(
2990 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2991 );
2992
2993 waitable.join(store.concurrent_state_mut(), set)
2994 }
2995
2996 pub(crate) fn subtask_drop(
2998 self,
2999 store: &mut StoreOpaque,
3000 caller_instance: RuntimeComponentInstanceIndex,
3001 task_id: u32,
3002 ) -> Result<()> {
3003 self.waitable_join(store, caller_instance, task_id, 0)?;
3004
3005 let (rep, is_host) = store
3006 .instance_state(RuntimeInstance {
3007 instance: self.id().instance(),
3008 index: caller_instance,
3009 })
3010 .handle_table()
3011 .subtask_remove(task_id)?;
3012
3013 let concurrent_state = store.concurrent_state_mut();
3014 let (waitable, expected_caller, delete) = if is_host {
3015 let id = TableId::<HostTask>::new(rep);
3016 let task = concurrent_state.get_mut(id)?;
3017 if task.join_handle.is_some() {
3018 bail!("cannot drop a subtask which has not yet resolved");
3019 }
3020 (Waitable::Host(id), task.caller, true)
3021 } else {
3022 let id = TableId::<GuestTask>::new(rep);
3023 let task = concurrent_state.get_mut(id)?;
3024 if task.lift_result.is_some() {
3025 bail!("cannot drop a subtask which has not yet resolved");
3026 }
3027 if let Caller::Guest { thread } = task.caller {
3028 (
3029 Waitable::Guest(id),
3030 thread,
3031 concurrent_state.get_mut(id)?.exited,
3032 )
3033 } else {
3034 unreachable!()
3035 }
3036 };
3037
3038 waitable.common(concurrent_state)?.handle = None;
3039
3040 if waitable.take_event(concurrent_state)?.is_some() {
3041 bail!("cannot drop a subtask with an undelivered event");
3042 }
3043
3044 if delete {
3045 waitable.delete_from(concurrent_state)?;
3046 }
3047
3048 assert_eq!(
3052 expected_caller,
3053 concurrent_state.unwrap_current_guest_thread(),
3054 );
3055 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3056 Ok(())
3057 }
3058
3059 pub(crate) fn waitable_set_wait(
3061 self,
3062 store: &mut StoreOpaque,
3063 options: OptionsIndex,
3064 set: u32,
3065 payload: u32,
3066 ) -> Result<u32> {
3067 if !self.options(store, options).async_ {
3068 store.check_blocking()?;
3072 }
3073
3074 let &CanonicalOptions {
3075 cancellable,
3076 instance: caller_instance,
3077 ..
3078 } = &self.id().get(store).component().env_component().options[options];
3079 let rep = store
3080 .instance_state(RuntimeInstance {
3081 instance: self.id().instance(),
3082 index: caller_instance,
3083 })
3084 .handle_table()
3085 .waitable_set_rep(set)?;
3086
3087 self.waitable_check(
3088 store,
3089 cancellable,
3090 WaitableCheck::Wait,
3091 WaitableCheckParams {
3092 set: TableId::new(rep),
3093 options,
3094 payload,
3095 },
3096 )
3097 }
3098
3099 pub(crate) fn waitable_set_poll(
3101 self,
3102 store: &mut StoreOpaque,
3103 options: OptionsIndex,
3104 set: u32,
3105 payload: u32,
3106 ) -> Result<u32> {
3107 let &CanonicalOptions {
3108 cancellable,
3109 instance: caller_instance,
3110 ..
3111 } = &self.id().get(store).component().env_component().options[options];
3112 let rep = store
3113 .instance_state(RuntimeInstance {
3114 instance: self.id().instance(),
3115 index: caller_instance,
3116 })
3117 .handle_table()
3118 .waitable_set_rep(set)?;
3119
3120 self.waitable_check(
3121 store,
3122 cancellable,
3123 WaitableCheck::Poll,
3124 WaitableCheckParams {
3125 set: TableId::new(rep),
3126 options,
3127 payload,
3128 },
3129 )
3130 }
3131
3132 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3134 let thread_id = store
3135 .concurrent_state_mut()
3136 .unwrap_current_guest_thread()
3137 .thread;
3138 Ok(store
3140 .concurrent_state_mut()
3141 .get_mut(thread_id)?
3142 .instance_rep
3143 .unwrap())
3144 }
3145
3146 pub(crate) fn thread_new_indirect<T: 'static>(
3148 self,
3149 mut store: StoreContextMut<T>,
3150 runtime_instance: RuntimeComponentInstanceIndex,
3151 _func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex,
3153 start_func_idx: u32,
3154 context: i32,
3155 ) -> Result<u32> {
3156 log::trace!("creating new thread");
3157
3158 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3159 let (instance, registry) = self.id().get_mut_and_registry(store.0);
3160 let callee = instance
3161 .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3162 .ok_or_else(|| {
3163 format_err!("the start function index points to an uninitialized function")
3164 })?;
3165 if callee.type_index(store.0) != start_func_ty.type_index() {
3166 bail!(
3167 "start function does not match expected type (currently only `(i32) -> ()` is supported)"
3168 );
3169 }
3170
3171 let token = StoreToken::new(store.as_context_mut());
3172 let start_func = Box::new(
3173 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3174 let old_thread = store.set_thread(guest_thread);
3175 log::trace!(
3176 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3177 );
3178
3179 let mut store = token.as_context_mut(store);
3180 let mut params = [ValRaw::i32(context)];
3181 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3184
3185 self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3186 log::trace!("explicit thread {guest_thread:?} completed");
3187 let state = store.0.concurrent_state_mut();
3188 let task = state.get_mut(guest_thread.task)?;
3189 if task.threads.is_empty() && !task.returned_or_cancelled() {
3190 bail!(Trap::NoAsyncResult);
3191 }
3192 store.0.set_thread(old_thread);
3193 let state = store.0.concurrent_state_mut();
3194 old_thread
3195 .guest()
3196 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
3197 if state.get_mut(guest_thread.task)?.ready_to_delete() {
3198 Waitable::Guest(guest_thread.task).delete_from(state)?;
3199 }
3200 log::trace!("thread start: restored {old_thread:?} as current thread");
3201
3202 Ok(())
3203 },
3204 );
3205
3206 let state = store.0.concurrent_state_mut();
3207 let current_thread = state.unwrap_current_guest_thread();
3208 let parent_task = current_thread.task;
3209
3210 let new_thread = GuestThread::new_explicit(parent_task, start_func);
3211 let thread_id = state.push(new_thread)?;
3212 state.get_mut(parent_task)?.threads.insert(thread_id);
3213
3214 log::trace!("new thread with id {thread_id:?} created");
3215
3216 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3217 }
3218
3219 pub(crate) fn resume_suspended_thread(
3220 self,
3221 store: &mut StoreOpaque,
3222 runtime_instance: RuntimeComponentInstanceIndex,
3223 thread_idx: u32,
3224 high_priority: bool,
3225 ) -> Result<()> {
3226 let thread_id =
3227 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3228 let state = store.concurrent_state_mut();
3229 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3230 let thread = state.get_mut(guest_thread.thread)?;
3231
3232 match mem::replace(&mut thread.state, GuestThreadState::Running) {
3233 GuestThreadState::NotStartedExplicit(start_func) => {
3234 log::trace!("starting thread {guest_thread:?}");
3235 let guest_call = WorkItem::GuestCall(GuestCall {
3236 thread: guest_thread,
3237 kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3238 start_func(store, guest_thread)
3239 })),
3240 });
3241 store
3242 .concurrent_state_mut()
3243 .push_work_item(guest_call, high_priority);
3244 }
3245 GuestThreadState::Suspended(fiber) => {
3246 log::trace!("resuming thread {thread_id:?} that was suspended");
3247 store
3248 .concurrent_state_mut()
3249 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3250 }
3251 _ => {
3252 bail!("cannot resume thread which is not suspended");
3253 }
3254 }
3255 Ok(())
3256 }
3257
3258 fn add_guest_thread_to_instance_table(
3259 self,
3260 thread_id: TableId<GuestThread>,
3261 store: &mut StoreOpaque,
3262 runtime_instance: RuntimeComponentInstanceIndex,
3263 ) -> Result<u32> {
3264 let guest_id = store
3265 .instance_state(RuntimeInstance {
3266 instance: self.id().instance(),
3267 index: runtime_instance,
3268 })
3269 .thread_handle_table()
3270 .guest_thread_insert(thread_id.rep())?;
3271 store
3272 .concurrent_state_mut()
3273 .get_mut(thread_id)?
3274 .instance_rep = Some(guest_id);
3275 Ok(guest_id)
3276 }
3277
3278 pub(crate) fn suspension_intrinsic(
3281 self,
3282 store: &mut StoreOpaque,
3283 caller: RuntimeComponentInstanceIndex,
3284 cancellable: bool,
3285 yielding: bool,
3286 to_thread: Option<u32>,
3287 ) -> Result<WaitResult> {
3288 let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread();
3289 if to_thread.is_none() {
3290 let state = store.concurrent_state_mut();
3291 if yielding {
3292 if !state.may_block(guest_thread.task) {
3294 return Ok(WaitResult::Completed);
3298 }
3299 } else {
3300 store.check_blocking()?;
3304 }
3305 }
3306
3307 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3309 return Ok(WaitResult::Cancelled);
3310 }
3311
3312 if let Some(thread) = to_thread {
3313 self.resume_suspended_thread(store, caller, thread, true)?;
3314 }
3315
3316 let reason = if yielding {
3317 SuspendReason::Yielding {
3318 thread: guest_thread,
3319 skip_may_block_check: to_thread.is_some(),
3323 }
3324 } else {
3325 SuspendReason::ExplicitlySuspending {
3326 thread: guest_thread,
3327 skip_may_block_check: to_thread.is_some(),
3331 }
3332 };
3333
3334 store.suspend(reason)?;
3335
3336 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3337 Ok(WaitResult::Cancelled)
3338 } else {
3339 Ok(WaitResult::Completed)
3340 }
3341 }
3342
3343 fn waitable_check(
3345 self,
3346 store: &mut StoreOpaque,
3347 cancellable: bool,
3348 check: WaitableCheck,
3349 params: WaitableCheckParams,
3350 ) -> Result<u32> {
3351 let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread();
3352
3353 log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3354
3355 let state = store.concurrent_state_mut();
3356 let task = state.get_mut(guest_thread.task)?;
3357
3358 match &check {
3361 WaitableCheck::Wait => {
3362 let set = params.set;
3363
3364 if (task.event.is_none()
3365 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3366 && state.get_mut(set)?.ready.is_empty()
3367 {
3368 if cancellable {
3369 let old = state
3370 .get_mut(guest_thread.thread)?
3371 .wake_on_cancel
3372 .replace(set);
3373 assert!(old.is_none());
3374 }
3375
3376 store.suspend(SuspendReason::Waiting {
3377 set,
3378 thread: guest_thread,
3379 skip_may_block_check: false,
3380 })?;
3381 }
3382 }
3383 WaitableCheck::Poll => {}
3384 }
3385
3386 log::trace!(
3387 "waitable check for {guest_thread:?}; set {:?}, part two",
3388 params.set
3389 );
3390
3391 let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3393
3394 let (ordinal, handle, result) = match &check {
3395 WaitableCheck::Wait => {
3396 let (event, waitable) = event.unwrap();
3397 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3398 let (ordinal, result) = event.parts();
3399 (ordinal, handle, result)
3400 }
3401 WaitableCheck::Poll => {
3402 if let Some((event, waitable)) = event {
3403 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3404 let (ordinal, result) = event.parts();
3405 (ordinal, handle, result)
3406 } else {
3407 log::trace!(
3408 "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3409 guest_thread.task,
3410 params.set
3411 );
3412 let (ordinal, result) = Event::None.parts();
3413 (ordinal, 0, result)
3414 }
3415 }
3416 };
3417 let memory = self.options_memory_mut(store, params.options);
3418 let ptr = func::validate_inbounds_dynamic(
3419 &CanonicalAbiInfo::POINTER_PAIR,
3420 memory,
3421 &ValRaw::u32(params.payload),
3422 )?;
3423 memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3424 memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3425 Ok(ordinal)
3426 }
3427
    /// Implements the `subtask.cancel` intrinsic for the subtask identified
    /// by the handle `task_id` in `caller_instance`'s handle table.
    ///
    /// Returns a `Status` value cast to `u32` (`StartCancelled`,
    /// `ReturnCancelled` or `Returned`), or `BLOCKED` when `async_` is set
    /// and the guest subtask has not yet returned or acknowledged the
    /// cancellation.
    ///
    /// # Errors
    ///
    /// Fails if the handle cannot be resolved, if a blocking check fails for
    /// a synchronous cancel, or with "`subtask.cancel` called after terminal
    /// status delivered" when there is no terminal event left to report.
    ///
    /// # Panics
    ///
    /// Panics if the subtask's recorded caller is not the current guest
    /// thread, or if internal invariants (exactly one thread for an
    /// unstarted task, a waiting entry for every `wake_on_cancel` set) are
    /// violated.
    pub(crate) fn subtask_cancel(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        async_: bool,
        task_id: u32,
    ) -> Result<u32> {
        // A synchronous cancel goes through `check_blocking` first.
        if !async_ {
            store.check_blocking()?;
        }

        // Resolve the guest-visible handle to a table rep and learn whether
        // the subtask is a host task or a guest task.
        let (rep, is_host) = store
            .instance_state(RuntimeInstance {
                instance: self.id().instance(),
                index: caller_instance,
            })
            .handle_table()
            .subtask_rep(task_id)?;
        let (waitable, expected_caller) = if is_host {
            let id = TableId::<HostTask>::new(rep);
            (
                Waitable::Host(id),
                store.concurrent_state_mut().get_mut(id)?.caller,
            )
        } else {
            let id = TableId::<GuestTask>::new(rep);
            if let Caller::Guest { thread } = store.concurrent_state_mut().get_mut(id)?.caller {
                (Waitable::Guest(id), thread)
            } else {
                // A guest subtask reachable through a handle table is treated
                // as always having a guest caller.
                unreachable!()
            }
        };
        let concurrent_state = store.concurrent_state_mut();
        // Only the thread recorded as the subtask's caller may cancel it.
        assert_eq!(
            expected_caller,
            concurrent_state.unwrap_current_guest_thread(),
        );

        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

        if let Waitable::Host(host_task) = waitable {
            // Host task: if its join handle is still present, abort it and
            // report the cancellation immediately. Otherwise fall through to
            // the event check at the bottom.
            if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
                handle.abort();
                return Ok(Status::ReturnCancelled as u32);
            }
        } else {
            let caller = concurrent_state.unwrap_current_guest_thread();
            let guest_task = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.already_lowered_parameters() {
                // Parameters were never lowered: discard the pending
                // lower/lift state and mark the task as exited.
                task.lower_params = None;
                task.lift_result = None;
                task.exited = true;

                let instance = task.instance;

                // Such a task is expected to own exactly one thread.
                assert_eq!(1, task.threads.len());
                let thread = mem::take(&mut task.threads).into_iter().next().unwrap();
                let concurrent_state = store.concurrent_state_mut();
                concurrent_state.delete(thread)?;
                assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());

                // Drop the task's entries from the callee instance's pending
                // map; if none were present the cancel is rejected.
                let pending = &mut store.instance_state(instance).concurrent_state().pending;
                let pending_count = pending.len();
                pending.retain(|thread, _| thread.task != guest_task);
                if pending.len() == pending_count {
                    bail!("`subtask.cancel` called after terminal status delivered");
                }
                return Ok(Status::StartCancelled as u32);
            } else if !task.returned_or_cancelled() {
                // Already started but not finished: record the cancellation
                // request as an event on the task.
                task.cancel_sent = true;
                task.event = Some(Event::Cancelled);
                // Find the first thread that registered a `wake_on_cancel`
                // set, schedule it at high priority, and yield so it can run.
                for thread in task.threads.clone() {
                    let thread = QualifiedThreadId {
                        task: guest_task,
                        thread,
                    };
                    if let Some(set) = concurrent_state
                        .get_mut(thread.thread)
                        .unwrap()
                        .wake_on_cancel
                        .take()
                    {
                        let item = match concurrent_state
                            .get_mut(set)?
                            .waiting
                            .remove(&thread)
                            .unwrap()
                        {
                            WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
                            WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
                                thread,
                                kind: GuestCallKind::DeliverEvent {
                                    instance,
                                    set: None,
                                },
                            }),
                        };
                        concurrent_state.push_high_priority(item);

                        store.suspend(SuspendReason::Yielding {
                            thread: caller,
                            skip_may_block_check: false,
                        })?;
                        break;
                    }
                }

                // Re-fetch the task: state may have changed while the caller
                // was suspended above.
                let concurrent_state = store.concurrent_state_mut();
                let task = concurrent_state.get_mut(guest_task)?;
                if !task.returned_or_cancelled() {
                    if async_ {
                        return Ok(BLOCKED);
                    } else {
                        store.wait_for_event(Waitable::Guest(guest_task))?;
                    }
                }
            }
        }

        // Report the subtask's terminal status; anything else means there is
        // no terminal event left to deliver through this call.
        let event = waitable.take_event(store.concurrent_state_mut())?;
        if let Some(Event::Subtask {
            status: status @ (Status::Returned | Status::ReturnCancelled),
        }) = event
        {
            Ok(status as u32)
        } else {
            bail!("`subtask.cancel` called after terminal status delivered");
        }
    }
3578
3579 pub(crate) fn context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32> {
3580 store.concurrent_state_mut().context_get(slot)
3581 }
3582
3583 pub(crate) fn context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()> {
3584 store.concurrent_state_mut().context_set(slot, value)
3585 }
3586}
3587
/// Store-level entry points for component-model async operations.
///
/// The implementation for `StoreInner<T>` in this file forwards every method
/// to the corresponding operation on `Instance`, wrapping the store in a
/// `StoreContextMut` where the callee requires one.
pub trait VMComponentAsyncStore {
    /// Prepares a call from `caller_instance` to `callee_instance`.
    ///
    /// The `StoreInner` implementation copies `storage_len` values from
    /// `storage` as the call's parameters and interprets `result_count` as
    /// either a synchronous result count or one of the
    /// `PREPARE_ASYNC_NO_RESULT` / `PREPARE_ASYNC_WITH_RESULT` sentinels.
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads of `storage_len` `ValRaw` values.
    /// NOTE(review): the remaining raw pointers presumably must satisfy the
    /// requirements of `Instance::prepare_call`, which is not visible here.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: u8,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a call synchronously, using `storage` as the in/out value
    /// buffer.
    ///
    /// The `StoreInner` implementation forwards to `Instance::start_call`
    /// with a null post-return function, a result count of `1`, and the
    /// `START_FLAG_ASYNC_CALLEE` flag, discarding the returned status.
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads and writes of `storage_len`
    /// elements. NOTE(review): the function pointers presumably must satisfy
    /// the requirements of `Instance::start_call`, which is not visible here.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a call asynchronously and returns the `u32` produced by
    /// `Instance::start_call`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the function pointers presumably must satisfy the
    /// requirements of `Instance::start_call`, which is not visible here.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// Writes to the future handle `future` from guest memory at `address`,
    /// returning the encoded result code.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Reads from the future handle `future` into guest memory at `address`,
    /// returning the encoded result code.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Drops the writable end of the future identified by `writer`.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Writes up to `count` items to the stream handle `stream` from guest
    /// memory at `address`, returning the encoded result code.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Reads up to `count` items from the stream handle `stream` into guest
    /// memory at `address`, returning the encoded result code.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Variant of `stream_write` that additionally supplies the payload's
    /// size and alignment; the `StoreInner` implementation passes them on as
    /// a `FlatAbi`.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Variant of `stream_read` that additionally supplies the payload's
    /// size and alignment; the `StoreInner` implementation passes them on as
    /// a `FlatAbi`.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Drops the writable end of the stream identified by `writer`.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Forwards an `error-context.debug-message` request for
    /// `err_ctx_handle`, with `debug_msg_address` as the guest address
    /// argument.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// Creates a new guest thread whose start function is found at
    /// `start_func_idx` in the table `start_func_table_idx`, returning the
    /// new thread's handle.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
3755
3756impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3758 unsafe fn prepare_call(
3759 &mut self,
3760 instance: Instance,
3761 memory: *mut VMMemoryDefinition,
3762 start: *mut VMFuncRef,
3763 return_: *mut VMFuncRef,
3764 caller_instance: RuntimeComponentInstanceIndex,
3765 callee_instance: RuntimeComponentInstanceIndex,
3766 task_return_type: TypeTupleIndex,
3767 callee_async: bool,
3768 string_encoding: u8,
3769 result_count_or_max_if_async: u32,
3770 storage: *mut ValRaw,
3771 storage_len: usize,
3772 ) -> Result<()> {
3773 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3777
3778 unsafe {
3779 instance.prepare_call(
3780 StoreContextMut(self),
3781 start,
3782 return_,
3783 caller_instance,
3784 callee_instance,
3785 task_return_type,
3786 callee_async,
3787 memory,
3788 string_encoding,
3789 match result_count_or_max_if_async {
3790 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3791 params,
3792 has_result: false,
3793 },
3794 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3795 params,
3796 has_result: true,
3797 },
3798 result_count => CallerInfo::Sync {
3799 params,
3800 result_count,
3801 },
3802 },
3803 )
3804 }
3805 }
3806
3807 unsafe fn sync_start(
3808 &mut self,
3809 instance: Instance,
3810 callback: *mut VMFuncRef,
3811 callee: *mut VMFuncRef,
3812 param_count: u32,
3813 storage: *mut MaybeUninit<ValRaw>,
3814 storage_len: usize,
3815 ) -> Result<()> {
3816 unsafe {
3817 instance
3818 .start_call(
3819 StoreContextMut(self),
3820 callback,
3821 ptr::null_mut(),
3822 callee,
3823 param_count,
3824 1,
3825 START_FLAG_ASYNC_CALLEE,
3826 Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3830 )
3831 .map(drop)
3832 }
3833 }
3834
3835 unsafe fn async_start(
3836 &mut self,
3837 instance: Instance,
3838 callback: *mut VMFuncRef,
3839 post_return: *mut VMFuncRef,
3840 callee: *mut VMFuncRef,
3841 param_count: u32,
3842 result_count: u32,
3843 flags: u32,
3844 ) -> Result<u32> {
3845 unsafe {
3846 instance.start_call(
3847 StoreContextMut(self),
3848 callback,
3849 post_return,
3850 callee,
3851 param_count,
3852 result_count,
3853 flags,
3854 None,
3855 )
3856 }
3857 }
3858
3859 fn future_write(
3860 &mut self,
3861 instance: Instance,
3862 caller: RuntimeComponentInstanceIndex,
3863 ty: TypeFutureTableIndex,
3864 options: OptionsIndex,
3865 future: u32,
3866 address: u32,
3867 ) -> Result<u32> {
3868 instance
3869 .guest_write(
3870 StoreContextMut(self),
3871 caller,
3872 TransmitIndex::Future(ty),
3873 options,
3874 None,
3875 future,
3876 address,
3877 1,
3878 )
3879 .map(|result| result.encode())
3880 }
3881
3882 fn future_read(
3883 &mut self,
3884 instance: Instance,
3885 caller: RuntimeComponentInstanceIndex,
3886 ty: TypeFutureTableIndex,
3887 options: OptionsIndex,
3888 future: u32,
3889 address: u32,
3890 ) -> Result<u32> {
3891 instance
3892 .guest_read(
3893 StoreContextMut(self),
3894 caller,
3895 TransmitIndex::Future(ty),
3896 options,
3897 None,
3898 future,
3899 address,
3900 1,
3901 )
3902 .map(|result| result.encode())
3903 }
3904
3905 fn stream_write(
3906 &mut self,
3907 instance: Instance,
3908 caller: RuntimeComponentInstanceIndex,
3909 ty: TypeStreamTableIndex,
3910 options: OptionsIndex,
3911 stream: u32,
3912 address: u32,
3913 count: u32,
3914 ) -> Result<u32> {
3915 instance
3916 .guest_write(
3917 StoreContextMut(self),
3918 caller,
3919 TransmitIndex::Stream(ty),
3920 options,
3921 None,
3922 stream,
3923 address,
3924 count,
3925 )
3926 .map(|result| result.encode())
3927 }
3928
3929 fn stream_read(
3930 &mut self,
3931 instance: Instance,
3932 caller: RuntimeComponentInstanceIndex,
3933 ty: TypeStreamTableIndex,
3934 options: OptionsIndex,
3935 stream: u32,
3936 address: u32,
3937 count: u32,
3938 ) -> Result<u32> {
3939 instance
3940 .guest_read(
3941 StoreContextMut(self),
3942 caller,
3943 TransmitIndex::Stream(ty),
3944 options,
3945 None,
3946 stream,
3947 address,
3948 count,
3949 )
3950 .map(|result| result.encode())
3951 }
3952
3953 fn future_drop_writable(
3954 &mut self,
3955 instance: Instance,
3956 ty: TypeFutureTableIndex,
3957 writer: u32,
3958 ) -> Result<()> {
3959 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
3960 }
3961
3962 fn flat_stream_write(
3963 &mut self,
3964 instance: Instance,
3965 caller: RuntimeComponentInstanceIndex,
3966 ty: TypeStreamTableIndex,
3967 options: OptionsIndex,
3968 payload_size: u32,
3969 payload_align: u32,
3970 stream: u32,
3971 address: u32,
3972 count: u32,
3973 ) -> Result<u32> {
3974 instance
3975 .guest_write(
3976 StoreContextMut(self),
3977 caller,
3978 TransmitIndex::Stream(ty),
3979 options,
3980 Some(FlatAbi {
3981 size: payload_size,
3982 align: payload_align,
3983 }),
3984 stream,
3985 address,
3986 count,
3987 )
3988 .map(|result| result.encode())
3989 }
3990
3991 fn flat_stream_read(
3992 &mut self,
3993 instance: Instance,
3994 caller: RuntimeComponentInstanceIndex,
3995 ty: TypeStreamTableIndex,
3996 options: OptionsIndex,
3997 payload_size: u32,
3998 payload_align: u32,
3999 stream: u32,
4000 address: u32,
4001 count: u32,
4002 ) -> Result<u32> {
4003 instance
4004 .guest_read(
4005 StoreContextMut(self),
4006 caller,
4007 TransmitIndex::Stream(ty),
4008 options,
4009 Some(FlatAbi {
4010 size: payload_size,
4011 align: payload_align,
4012 }),
4013 stream,
4014 address,
4015 count,
4016 )
4017 .map(|result| result.encode())
4018 }
4019
4020 fn stream_drop_writable(
4021 &mut self,
4022 instance: Instance,
4023 ty: TypeStreamTableIndex,
4024 writer: u32,
4025 ) -> Result<()> {
4026 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4027 }
4028
4029 fn error_context_debug_message(
4030 &mut self,
4031 instance: Instance,
4032 ty: TypeComponentLocalErrorContextTableIndex,
4033 options: OptionsIndex,
4034 err_ctx_handle: u32,
4035 debug_msg_address: u32,
4036 ) -> Result<()> {
4037 instance.error_context_debug_message(
4038 StoreContextMut(self),
4039 ty,
4040 options,
4041 err_ctx_handle,
4042 debug_msg_address,
4043 )
4044 }
4045
4046 fn thread_new_indirect(
4047 &mut self,
4048 instance: Instance,
4049 caller: RuntimeComponentInstanceIndex,
4050 func_ty_idx: TypeFuncIndex,
4051 start_func_table_idx: RuntimeTableIndex,
4052 start_func_idx: u32,
4053 context: i32,
4054 ) -> Result<u32> {
4055 instance.thread_new_indirect(
4056 StoreContextMut(self),
4057 caller,
4058 func_ty_idx,
4059 start_func_table_idx,
4060 start_func_idx,
4061 context,
4062 )
4063 }
4064}
4065
/// A pinned, boxed, `Send + 'static` future that resolves to `Result<()>`.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4067
/// Table entry for a subtask that is serviced by the host (see
/// `Waitable::Host`).
struct HostTask {
    /// Event/set/handle bookkeeping shared by every kind of waitable.
    common: WaitableCommon,

    /// The guest thread recorded as this task's caller; `subtask_cancel`
    /// asserts that it matches the thread attempting the cancel.
    caller: QualifiedThreadId,

    /// Call context for this task; starts out as `CallContext::default()`.
    call_context: CallContext,

    /// Handle used to abort the task; `subtask_cancel` takes it and calls
    /// `abort()` on it. `None` until one is installed.
    join_handle: Option<JoinHandle>,

    /// Lifted result of the task, if any; starts out as `None`.
    result: Option<LiftedResult>,
}
4090
4091impl HostTask {
4092 fn new(caller: QualifiedThreadId) -> Self {
4093 Self {
4094 common: WaitableCommon::default(),
4095 call_context: CallContext::default(),
4096 caller,
4097 join_handle: None,
4098 result: None,
4099 }
4100 }
4101}
4102
4103impl TableDebug for HostTask {
4104 fn type_name() -> &'static str {
4105 "HostTask"
4106 }
4107}
4108
/// Boxed callback taking the store, an `Event`, and a `u32`, and returning a
/// `u32` on success.
///
/// NOTE(review): the meaning of the `u32` argument and return value is not
/// visible in this part of the file — confirm against the call sites.
type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4110
/// Identifies who called a guest task.
enum Caller {
    /// The task was called by the host.
    Host {
        /// Channel for sending the lifted result, if still available.
        /// Subtasks re-parented onto the host by `GuestTask::dispose` get
        /// `None` here.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// Shared exit-notification sender; cloned into re-parented subtasks
        /// by `GuestTask::dispose`.
        exit_tx: Arc<oneshot::Sender<()>>,
        /// Whether a host-side future exists for this call. `GuestTask::new`
        /// maps `true` to `HostFutureState::Live` and `false` to
        /// `HostFutureState::NotApplicable`.
        host_future_present: bool,
        /// The thread recorded as current for this call; copied into
        /// re-parented subtasks by `GuestTask::dispose`.
        caller: CurrentThread,
    },
    /// The task was called by another guest task.
    Guest {
        /// The calling guest thread.
        thread: QualifiedThreadId,
    },
}
4138
/// Data held by a `GuestTask` in its `lift_result` field until the task has
/// returned or been cancelled.
struct LiftResult {
    /// The lifting routine.
    lift: RawLift,
    /// Tuple type index stored alongside the lifting routine.
    ty: TypeTupleIndex,
    /// Linear memory to use, if any.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding to use.
    string_encoding: StringEncoding,
}
4147
/// A guest thread ID paired with the ID of the task that owns it.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
struct QualifiedThreadId {
    /// The task the thread belongs to.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
4157
4158impl QualifiedThreadId {
4159 fn qualify(
4160 state: &mut ConcurrentState,
4161 thread: TableId<GuestThread>,
4162 ) -> Result<QualifiedThreadId> {
4163 Ok(QualifiedThreadId {
4164 task: state.get_mut(thread)?.parent_task,
4165 thread,
4166 })
4167 }
4168}
4169
4170impl fmt::Debug for QualifiedThreadId {
4171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4172 f.debug_tuple("QualifiedThreadId")
4173 .field(&self.task.rep())
4174 .field(&self.thread.rep())
4175 .finish()
4176 }
4177}
4178
/// Lifecycle state of a `GuestThread`.
enum GuestThreadState {
    /// Initial state of a thread created with `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state of a thread created with `GuestThread::new_explicit`;
    /// holds the start function to invoke once the thread is first resumed.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// The thread is running (or has been scheduled to run).
    Running,
    /// The thread is suspended; holds the fiber to resume.
    Suspended(StoreFiber<'static>),
    /// NOTE(review): presumably queued and waiting to run — confirm against
    /// the code that sets this state.
    Pending,
    /// NOTE(review): presumably the thread has finished — confirm against
    /// the code that sets this state.
    Completed,
}
/// Table entry describing a single guest thread belonging to a `GuestTask`.
pub struct GuestThread {
    /// Two `u32` context slots, initialized to zero.
    context: [u32; 2],
    /// The task this thread belongs to.
    parent_task: TableId<GuestTask>,
    /// Set on which this thread is performing a cancellable wait, if any;
    /// `subtask_cancel` takes it to wake the thread.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// Current lifecycle state.
    state: GuestThreadState,
    /// Handle of this thread in its instance's thread handle table, once it
    /// has been registered there.
    instance_rep: Option<u32>,
}
4204
4205impl GuestThread {
4206 fn from_instance(
4209 state: Pin<&mut ComponentInstance>,
4210 caller_instance: RuntimeComponentInstanceIndex,
4211 guest_thread: u32,
4212 ) -> Result<TableId<Self>> {
4213 let rep = state.instance_states().0[caller_instance]
4214 .thread_handle_table()
4215 .guest_thread_rep(guest_thread)?;
4216 Ok(TableId::new(rep))
4217 }
4218
4219 fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4220 Self {
4221 context: [0; 2],
4222 parent_task,
4223 wake_on_cancel: None,
4224 state: GuestThreadState::NotStartedImplicit,
4225 instance_rep: None,
4226 }
4227 }
4228
4229 fn new_explicit(
4230 parent_task: TableId<GuestTask>,
4231 start_func: Box<
4232 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4233 >,
4234 ) -> Self {
4235 Self {
4236 context: [0; 2],
4237 parent_task,
4238 wake_on_cancel: None,
4239 state: GuestThreadState::NotStartedExplicit(start_func),
4240 instance_rep: None,
4241 }
4242 }
4243}
4244
4245impl TableDebug for GuestThread {
4246 fn type_name() -> &'static str {
4247 "GuestThread"
4248 }
4249}
4250
/// Tracks the result slot stored in a `GuestTask`'s `sync_result` field.
enum SyncResult {
    /// No result has been produced yet.
    NotProduced,
    /// A result has been produced and not yet taken; the payload is the
    /// optional raw value.
    Produced(Option<ValRaw>),
    /// The result has already been taken.
    Taken,
}
4256
4257impl SyncResult {
4258 fn take(&mut self) -> Option<Option<ValRaw>> {
4259 match mem::replace(self, SyncResult::Taken) {
4260 SyncResult::NotProduced => None,
4261 SyncResult::Produced(val) => Some(val),
4262 SyncResult::Taken => {
4263 panic!("attempted to take a synchronous result that was already taken")
4264 }
4265 }
4266 }
4267}
4268
/// Tracks the host-side future associated with a `GuestTask`, if any.
#[derive(Debug)]
enum HostFutureState {
    /// Used for guest-called tasks and for host-called tasks created with
    /// `host_future_present: false`.
    NotApplicable,
    /// A host future is present; `GuestTask::ready_to_delete` reports
    /// `false` while in this state.
    Live,
    /// NOTE(review): presumably the host future has been dropped — confirm
    /// against the code that sets this state.
    Dropped,
}
4275
/// Table entry describing a guest task, i.e. a call into a guest function.
pub(crate) struct GuestTask {
    /// Event/set/handle bookkeeping shared by every kind of waitable.
    common: WaitableCommon,
    /// Parameter-lowering routine; `None` once the parameters have been
    /// lowered (see `already_lowered_parameters`) or the task was cancelled
    /// before starting.
    lower_params: Option<RawLower>,
    /// Result-lifting data; `None` once the task has returned or been
    /// cancelled (see `returned_or_cancelled`).
    lift_result: Option<LiftResult>,
    /// Lifted result of the task, if any; starts out as `None`.
    result: Option<LiftedResult>,
    /// Optional callback function for this task.
    callback: Option<CallbackFn>,
    /// Who called this task.
    caller: Caller,
    /// Call context for this task; starts out as `CallContext::default()`.
    call_context: CallContext,
    /// Result slot; `ready_to_delete` reports `false` while it is
    /// `Produced`.
    sync_result: SyncResult,
    /// Set by `subtask_cancel` when cancellation of an already-started task
    /// is requested.
    cancel_sent: bool,
    /// Starts out `false`. NOTE(review): presumably records that a
    /// "starting" status has been reported — confirm.
    starting_sent: bool,
    /// Guest tasks called by this task; re-parented onto this task's own
    /// caller by `dispose`.
    subtasks: HashSet<TableId<GuestTask>>,
    /// Waitable set created for this task in `new` and deleted in `dispose`.
    sync_call_set: TableId<WaitableSet>,
    /// The runtime instance this task runs in.
    instance: RuntimeInstance,
    /// Pending event for the task itself (e.g. `Event::Cancelled`, set by
    /// `subtask_cancel`).
    event: Option<Event>,
    /// Whether the task has exited; `dispose` only deletes re-parented
    /// subtasks for which this is set.
    exited: bool,
    /// Threads currently belonging to this task.
    threads: HashSet<TableId<GuestThread>>,
    /// State of the host-side future for this task, if applicable.
    host_future_state: HostFutureState,
    /// Flag supplied at construction. NOTE(review): presumably whether the
    /// callee is an async function — confirm.
    async_function: bool,
}
4334
4335impl GuestTask {
4336 fn already_lowered_parameters(&self) -> bool {
4337 self.lower_params.is_none()
4339 }
4340
4341 fn returned_or_cancelled(&self) -> bool {
4342 self.lift_result.is_none()
4344 }
4345
4346 fn ready_to_delete(&self) -> bool {
4347 let threads_completed = self.threads.is_empty();
4348 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4349 let pending_completion_event = matches!(
4350 self.common.event,
4351 Some(Event::Subtask {
4352 status: Status::Returned | Status::ReturnCancelled
4353 })
4354 );
4355 let ready = threads_completed
4356 && !has_sync_result
4357 && !pending_completion_event
4358 && !matches!(self.host_future_state, HostFutureState::Live);
4359 log::trace!(
4360 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4361 threads_completed,
4362 has_sync_result,
4363 pending_completion_event,
4364 self.host_future_state
4365 );
4366 ready
4367 }
4368
4369 fn new(
4370 state: &mut ConcurrentState,
4371 lower_params: RawLower,
4372 lift_result: LiftResult,
4373 caller: Caller,
4374 callback: Option<CallbackFn>,
4375 instance: RuntimeInstance,
4376 async_function: bool,
4377 ) -> Result<Self> {
4378 let sync_call_set = state.push(WaitableSet::default())?;
4379 let host_future_state = match &caller {
4380 Caller::Guest { .. } => HostFutureState::NotApplicable,
4381 Caller::Host {
4382 host_future_present,
4383 ..
4384 } => {
4385 if *host_future_present {
4386 HostFutureState::Live
4387 } else {
4388 HostFutureState::NotApplicable
4389 }
4390 }
4391 };
4392 Ok(Self {
4393 common: WaitableCommon::default(),
4394 lower_params: Some(lower_params),
4395 lift_result: Some(lift_result),
4396 result: None,
4397 callback,
4398 caller,
4399 call_context: CallContext::default(),
4400 sync_result: SyncResult::NotProduced,
4401 cancel_sent: false,
4402 starting_sent: false,
4403 subtasks: HashSet::new(),
4404 sync_call_set,
4405 instance,
4406 event: None,
4407 exited: false,
4408 threads: HashSet::new(),
4409 host_future_state,
4410 async_function,
4411 })
4412 }
4413
4414 fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4417 for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4420 if let Some(Event::Subtask {
4421 status: Status::Returned | Status::ReturnCancelled,
4422 }) = waitable.common(state)?.event
4423 {
4424 waitable.delete_from(state)?;
4425 }
4426 }
4427
4428 assert!(self.threads.is_empty());
4429
4430 state.delete(self.sync_call_set)?;
4431
4432 match &self.caller {
4434 Caller::Guest { thread } => {
4435 let task_mut = state.get_mut(thread.task)?;
4436 let present = task_mut.subtasks.remove(&me);
4437 assert!(present);
4438
4439 for subtask in &self.subtasks {
4440 task_mut.subtasks.insert(*subtask);
4441 }
4442
4443 for subtask in &self.subtasks {
4444 state.get_mut(*subtask)?.caller = Caller::Guest { thread: *thread };
4445 }
4446 }
4447 Caller::Host {
4448 exit_tx, caller, ..
4449 } => {
4450 for subtask in &self.subtasks {
4451 state.get_mut(*subtask)?.caller = Caller::Host {
4452 tx: None,
4453 exit_tx: exit_tx.clone(),
4457 host_future_present: false,
4458 caller: *caller,
4459 };
4460 }
4461 }
4462 }
4463
4464 for subtask in self.subtasks {
4465 let task = state.get_mut(subtask)?;
4466 if task.exited && task.ready_to_delete() {
4467 Waitable::Guest(subtask).delete_from(state)?;
4468 }
4469 }
4470
4471 Ok(())
4472 }
4473}
4474
4475impl TableDebug for GuestTask {
4476 fn type_name() -> &'static str {
4477 "GuestTask"
4478 }
4479}
4480
/// State shared by every kind of waitable (`HostTask`, `GuestTask`,
/// `TransmitHandle`).
#[derive(Default)]
struct WaitableCommon {
    /// Event waiting to be delivered for this waitable, if any.
    event: Option<Event>,
    /// The waitable set this waitable currently belongs to, if any.
    set: Option<TableId<WaitableSet>>,
    /// Guest-visible handle for this waitable, if any.
    handle: Option<u32>,
}
4491
/// Typed reference to a table entry that can be waited on.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A subtask serviced by the host.
    Host(TableId<HostTask>),
    /// A subtask serviced by a guest.
    Guest(TableId<GuestTask>),
    /// One end of a stream or future.
    Transmit(TableId<TransmitHandle>),
}
4502
4503impl Waitable {
4504 fn from_instance(
4507 state: Pin<&mut ComponentInstance>,
4508 caller_instance: RuntimeComponentInstanceIndex,
4509 waitable: u32,
4510 ) -> Result<Self> {
4511 use crate::runtime::vm::component::Waitable;
4512
4513 let (waitable, kind) = state.instance_states().0[caller_instance]
4514 .handle_table()
4515 .waitable_rep(waitable)?;
4516
4517 Ok(match kind {
4518 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4519 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4520 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4521 })
4522 }
4523
4524 fn rep(&self) -> u32 {
4526 match self {
4527 Self::Host(id) => id.rep(),
4528 Self::Guest(id) => id.rep(),
4529 Self::Transmit(id) => id.rep(),
4530 }
4531 }
4532
4533 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4537 log::trace!("waitable {self:?} join set {set:?}",);
4538
4539 let old = mem::replace(&mut self.common(state)?.set, set);
4540
4541 if let Some(old) = old {
4542 match *self {
4543 Waitable::Host(id) => state.remove_child(id, old),
4544 Waitable::Guest(id) => state.remove_child(id, old),
4545 Waitable::Transmit(id) => state.remove_child(id, old),
4546 }?;
4547
4548 state.get_mut(old)?.ready.remove(self);
4549 }
4550
4551 if let Some(set) = set {
4552 match *self {
4553 Waitable::Host(id) => state.add_child(id, set),
4554 Waitable::Guest(id) => state.add_child(id, set),
4555 Waitable::Transmit(id) => state.add_child(id, set),
4556 }?;
4557
4558 if self.common(state)?.event.is_some() {
4559 self.mark_ready(state)?;
4560 }
4561 }
4562
4563 Ok(())
4564 }
4565
4566 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4568 Ok(match self {
4569 Self::Host(id) => &mut state.get_mut(*id)?.common,
4570 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4571 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4572 })
4573 }
4574
4575 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4579 log::trace!("set event for {self:?}: {event:?}");
4580 self.common(state)?.event = event;
4581 self.mark_ready(state)
4582 }
4583
4584 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4586 let common = self.common(state)?;
4587 let event = common.event.take();
4588 if let Some(set) = self.common(state)?.set {
4589 state.get_mut(set)?.ready.remove(self);
4590 }
4591
4592 Ok(event)
4593 }
4594
4595 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4599 if let Some(set) = self.common(state)?.set {
4600 state.get_mut(set)?.ready.insert(*self);
4601 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4602 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4603 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4604
4605 let item = match mode {
4606 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4607 WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4608 thread,
4609 kind: GuestCallKind::DeliverEvent {
4610 instance,
4611 set: Some(set),
4612 },
4613 }),
4614 };
4615 state.push_high_priority(item);
4616 }
4617 }
4618 Ok(())
4619 }
4620
4621 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4623 match self {
4624 Self::Host(task) => {
4625 log::trace!("delete host task {task:?}");
4626 state.delete(*task)?;
4627 }
4628 Self::Guest(task) => {
4629 log::trace!("delete guest task {task:?}");
4630 state.delete(*task)?.dispose(state, *task)?;
4631 }
4632 Self::Transmit(task) => {
4633 state.delete(*task)?;
4634 }
4635 }
4636
4637 Ok(())
4638 }
4639}
4640
4641impl fmt::Debug for Waitable {
4642 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4643 match self {
4644 Self::Host(id) => write!(f, "{id:?}"),
4645 Self::Guest(id) => write!(f, "{id:?}"),
4646 Self::Transmit(id) => write!(f, "{id:?}"),
4647 }
4648 }
4649}
4650
4651#[derive(Default)]
4653struct WaitableSet {
4654 ready: BTreeSet<Waitable>,
4656 waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4658}
4659
4660impl TableDebug for WaitableSet {
4661 fn type_name() -> &'static str {
4662 "WaitableSet"
4663 }
4664}
4665
4666type RawLower =
4668 Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4669
4670type RawLift = Box<
4672 dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4673>;
4674
4675type LiftedResult = Box<dyn Any + Send + Sync>;
4679
/// Unit marker type.
///
/// NOTE(review): presumably a placeholder result value for calls whose real result is not
/// needed; its uses are outside this part of the file -- confirm.
struct DummyResult;
4683
4684#[derive(Default)]
4686pub struct ConcurrentInstanceState {
4687 backpressure: u16,
4689 do_not_enter: bool,
4691 pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4694}
4695
4696impl ConcurrentInstanceState {
4697 pub fn pending_is_empty(&self) -> bool {
4698 self.pending.is_empty()
4699 }
4700}
4701
4702#[derive(Debug, Copy, Clone)]
4703enum CurrentThread {
4704 Guest(QualifiedThreadId),
4705 Host(TableId<HostTask>),
4706 None,
4707}
4708
4709impl CurrentThread {
4710 fn guest(&self) -> Option<&QualifiedThreadId> {
4711 match self {
4712 Self::Guest(id) => Some(id),
4713 _ => None,
4714 }
4715 }
4716
4717 fn host(&self) -> Option<TableId<HostTask>> {
4718 match self {
4719 Self::Host(id) => Some(*id),
4720 _ => None,
4721 }
4722 }
4723
4724 fn is_none(&self) -> bool {
4725 matches!(self, Self::None)
4726 }
4727}
4728
4729impl From<QualifiedThreadId> for CurrentThread {
4730 fn from(id: QualifiedThreadId) -> Self {
4731 Self::Guest(id)
4732 }
4733}
4734
4735impl From<TableId<HostTask>> for CurrentThread {
4736 fn from(id: TableId<HostTask>) -> Self {
4737 Self::Host(id)
4738 }
4739}
4740
4741pub struct ConcurrentState {
4743 current_thread: CurrentThread,
4745
4746 futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4751 table: AlwaysMut<ResourceTable>,
4753 high_priority: Vec<WorkItem>,
4755 low_priority: VecDeque<WorkItem>,
4757 suspend_reason: Option<SuspendReason>,
4761 worker: Option<StoreFiber<'static>>,
4765 worker_item: Option<WorkerItem>,
4767
4768 global_error_context_ref_counts:
4781 BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4782}
4783
4784impl Default for ConcurrentState {
4785 fn default() -> Self {
4786 Self {
4787 current_thread: CurrentThread::None,
4788 table: AlwaysMut::new(ResourceTable::new()),
4789 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4790 high_priority: Vec::new(),
4791 low_priority: VecDeque::new(),
4792 suspend_reason: None,
4793 worker: None,
4794 worker_item: None,
4795 global_error_context_ref_counts: BTreeMap::new(),
4796 }
4797 }
4798}
4799
4800impl ConcurrentState {
4801 pub(crate) fn take_fibers_and_futures(
4818 &mut self,
4819 fibers: &mut Vec<StoreFiber<'static>>,
4820 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4821 ) {
4822 for entry in self.table.get_mut().iter_mut() {
4823 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4824 for mode in mem::take(&mut set.waiting).into_values() {
4825 if let WaitMode::Fiber(fiber) = mode {
4826 fibers.push(fiber);
4827 }
4828 }
4829 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4830 if let GuestThreadState::Suspended(fiber) =
4831 mem::replace(&mut thread.state, GuestThreadState::Completed)
4832 {
4833 fibers.push(fiber);
4834 }
4835 }
4836 }
4837
4838 if let Some(fiber) = self.worker.take() {
4839 fibers.push(fiber);
4840 }
4841
4842 let mut handle_item = |item| match item {
4843 WorkItem::ResumeFiber(fiber) => {
4844 fibers.push(fiber);
4845 }
4846 WorkItem::PushFuture(future) => {
4847 self.futures
4848 .get_mut()
4849 .as_mut()
4850 .unwrap()
4851 .push(future.into_inner());
4852 }
4853 _ => {}
4854 };
4855
4856 for item in mem::take(&mut self.high_priority) {
4857 handle_item(item);
4858 }
4859 for item in mem::take(&mut self.low_priority) {
4860 handle_item(item);
4861 }
4862
4863 if let Some(them) = self.futures.get_mut().take() {
4864 futures.push(them);
4865 }
4866 }
4867
4868 fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> {
4872 let mut ready = mem::take(&mut self.high_priority);
4873 if ready.is_empty() {
4874 if let Some(item) = self.low_priority.pop_back() {
4875 ready.push(item);
4876 }
4877 }
4878 ready
4879 }
4880
4881 fn push<V: Send + Sync + 'static>(
4882 &mut self,
4883 value: V,
4884 ) -> Result<TableId<V>, ResourceTableError> {
4885 self.table.get_mut().push(value).map(TableId::from)
4886 }
4887
4888 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4889 self.table.get_mut().get_mut(&Resource::from(id))
4890 }
4891
4892 pub fn add_child<T: 'static, U: 'static>(
4893 &mut self,
4894 child: TableId<T>,
4895 parent: TableId<U>,
4896 ) -> Result<(), ResourceTableError> {
4897 self.table
4898 .get_mut()
4899 .add_child(Resource::from(child), Resource::from(parent))
4900 }
4901
4902 pub fn remove_child<T: 'static, U: 'static>(
4903 &mut self,
4904 child: TableId<T>,
4905 parent: TableId<U>,
4906 ) -> Result<(), ResourceTableError> {
4907 self.table
4908 .get_mut()
4909 .remove_child(Resource::from(child), Resource::from(parent))
4910 }
4911
4912 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4913 self.table.get_mut().delete(Resource::from(id))
4914 }
4915
4916 fn push_future(&mut self, future: HostTaskFuture) {
4917 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4924 }
4925
4926 fn push_high_priority(&mut self, item: WorkItem) {
4927 log::trace!("push high priority: {item:?}");
4928 self.high_priority.push(item);
4929 }
4930
4931 fn push_low_priority(&mut self, item: WorkItem) {
4932 log::trace!("push low priority: {item:?}");
4933 self.low_priority.push_front(item);
4934 }
4935
4936 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4937 if high_priority {
4938 self.push_high_priority(item);
4939 } else {
4940 self.push_low_priority(item);
4941 }
4942 }
4943
4944 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4946 let thread = self.unwrap_current_guest_thread();
4947 let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4948 log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4949 Ok(val)
4950 }
4951
4952 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4954 let thread = self.unwrap_current_guest_thread();
4955 log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4956 self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4957 Ok(())
4958 }
4959
4960 fn take_pending_cancellation(&mut self) -> bool {
4963 let thread = self.unwrap_current_guest_thread();
4964 if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4965 assert!(matches!(event, Event::Cancelled));
4966 true
4967 } else {
4968 false
4969 }
4970 }
4971
4972 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
4973 if self.may_block(task) {
4974 Ok(())
4975 } else {
4976 Err(Trap::CannotBlockSyncTask.into())
4977 }
4978 }
4979
4980 fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
4981 let task = self.get_mut(task).unwrap();
4982 task.async_function || task.returned_or_cancelled()
4983 }
4984
4985 pub fn call_context(&mut self, task: u32) -> &mut CallContext {
4991 let (task, is_host) = (task >> 1, task & 1 == 1);
4992 if is_host {
4993 let task: TableId<HostTask> = TableId::new(task);
4994 &mut self.get_mut(task).unwrap().call_context
4995 } else {
4996 let task: TableId<GuestTask> = TableId::new(task);
4997 &mut self.get_mut(task).unwrap().call_context
4998 }
4999 }
5000
5001 pub fn current_call_context_scope_id(&self) -> u32 {
5004 let (bits, is_host) = match self.current_thread {
5005 CurrentThread::Guest(id) => (id.task.rep(), false),
5006 CurrentThread::Host(id) => (id.rep(), true),
5007 CurrentThread::None => unreachable!(),
5008 };
5009 assert_eq!((bits << 1) >> 1, bits);
5010 (bits << 1) | u32::from(is_host)
5011 }
5012
5013 fn unwrap_current_guest_thread(&self) -> QualifiedThreadId {
5014 *self.current_thread.guest().unwrap()
5015 }
5016
5017 fn unwrap_current_host_thread(&self) -> TableId<HostTask> {
5018 self.current_thread.host().unwrap()
5019 }
5020}
5021
5022fn for_any_lower<
5025 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5026>(
5027 fun: F,
5028) -> F {
5029 fun
5030}
5031
5032fn for_any_lift<
5034 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5035>(
5036 fun: F,
5037) -> F {
5038 fun
5039}
5040
5041fn checked<F: Future + Send + 'static>(
5046 id: StoreId,
5047 fut: F,
5048) -> impl Future<Output = F::Output> + Send + 'static {
5049 async move {
5050 let mut fut = pin!(fut);
5051 future::poll_fn(move |cx| {
5052 let message = "\
5053 `Future`s which depend on asynchronous component tasks, streams, or \
5054 futures to complete may only be polled from the event loop of the \
5055 store to which they belong. Please use \
5056 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5057 ";
5058 tls::try_get(|store| {
5059 let matched = match store {
5060 tls::TryGet::Some(store) => store.id() == id,
5061 tls::TryGet::Taken | tls::TryGet::None => false,
5062 };
5063
5064 if !matched {
5065 panic!("{message}")
5066 }
5067 });
5068 fut.as_mut().poll(cx)
5069 })
5070 .await
5071 }
5072}
5073
5074fn check_recursive_run() {
5077 tls::try_get(|store| {
5078 if !matches!(store, tls::TryGet::None) {
5079 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5080 }
5081 });
5082}
5083
/// Splits a packed callback return value into `(low four bits, remaining upper bits)`.
///
/// NOTE(review): presumably the low nibble is the callback code and the upper bits are its
/// payload (e.g. a waitable-set index) -- confirm against the callers.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    const CODE_BITS: u32 = 4;
    const CODE_MASK: u32 = (1 << CODE_BITS) - 1;

    let low = code & CODE_MASK;
    let high = code >> CODE_BITS;
    (low, high)
}
5087
5088struct WaitableCheckParams {
5092 set: TableId<WaitableSet>,
5093 options: OptionsIndex,
5094 payload: u32,
5095}
5096
/// The flavour of waitable-set check being performed.
///
/// NOTE(review): not used in this part of the file; presumably `Wait` blocks until an event is
/// available while `Poll` does not -- confirm.
enum WaitableCheck {
    /// Wait on the set.
    Wait,
    /// Poll the set.
    Poll,
}
5103
5104pub(crate) struct PreparedCall<R> {
5106 handle: Func,
5108 thread: QualifiedThreadId,
5110 param_count: usize,
5112 rx: oneshot::Receiver<LiftedResult>,
5115 exit_rx: oneshot::Receiver<()>,
5118 _phantom: PhantomData<R>,
5119}
5120
5121impl<R> PreparedCall<R> {
5122 pub(crate) fn task_id(&self) -> TaskId {
5124 TaskId {
5125 task: self.thread.task,
5126 }
5127 }
5128}
5129
5130pub(crate) struct TaskId {
5132 task: TableId<GuestTask>,
5133}
5134
5135impl TaskId {
5136 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5142 let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5143 if !task.already_lowered_parameters() {
5144 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5145 } else {
5146 task.host_future_state = HostFutureState::Dropped;
5147 if task.ready_to_delete() {
5148 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5149 }
5150 }
5151 Ok(())
5152 }
5153}
5154
5155pub(crate) fn prepare_call<T, R>(
5161 mut store: StoreContextMut<T>,
5162 handle: Func,
5163 param_count: usize,
5164 host_future_present: bool,
5165 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5166 + Send
5167 + Sync
5168 + 'static,
5169 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5170 + Send
5171 + Sync
5172 + 'static,
5173) -> Result<PreparedCall<R>> {
5174 let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5175
5176 let instance = handle.instance().id().get(store.0);
5177 let options = &instance.component().env_component().options[options];
5178 let ty = &instance.component().types()[ty];
5179 let async_function = ty.async_;
5180 let task_return_type = ty.results;
5181 let component_instance = raw_options.instance;
5182 let callback = options.callback.map(|i| instance.runtime_callback(i));
5183 let memory = options
5184 .memory()
5185 .map(|i| instance.runtime_memory(i))
5186 .map(SendSyncPtr::new);
5187 let string_encoding = options.string_encoding;
5188 let token = StoreToken::new(store.as_context_mut());
5189 let state = store.0.concurrent_state_mut();
5190
5191 let (tx, rx) = oneshot::channel();
5192 let (exit_tx, exit_rx) = oneshot::channel();
5193
5194 let instance = RuntimeInstance {
5195 instance: handle.instance().id().instance(),
5196 index: component_instance,
5197 };
5198 let caller = state.current_thread;
5199 let task = GuestTask::new(
5200 state,
5201 Box::new(for_any_lower(move |store, params| {
5202 lower_params(handle, token.as_context_mut(store), params)
5203 })),
5204 LiftResult {
5205 lift: Box::new(for_any_lift(move |store, result| {
5206 lift_result(handle, store, result)
5207 })),
5208 ty: task_return_type,
5209 memory,
5210 string_encoding,
5211 },
5212 Caller::Host {
5213 tx: Some(tx),
5214 exit_tx: Arc::new(exit_tx),
5215 host_future_present,
5216 caller,
5217 },
5218 callback.map(|callback| {
5219 let callback = SendSyncPtr::new(callback);
5220 let instance = handle.instance();
5221 Box::new(move |store: &mut dyn VMStore, event, handle| {
5222 let store = token.as_context_mut(store);
5223 unsafe { instance.call_callback(store, callback, event, handle) }
5226 }) as CallbackFn
5227 }),
5228 instance,
5229 async_function,
5230 )?;
5231
5232 let task = state.push(task)?;
5233 let thread = state.push(GuestThread::new_implicit(task))?;
5234 state.get_mut(task)?.threads.insert(thread);
5235
5236 if !store.0.may_enter(instance) {
5237 bail!(crate::Trap::CannotEnterComponent);
5238 }
5239
5240 Ok(PreparedCall {
5241 handle,
5242 thread: QualifiedThreadId { task, thread },
5243 param_count,
5244 rx,
5245 exit_rx,
5246 _phantom: PhantomData,
5247 })
5248}
5249
5250pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5257 mut store: StoreContextMut<T>,
5258 prepared: PreparedCall<R>,
5259) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5260 let PreparedCall {
5261 handle,
5262 thread,
5263 param_count,
5264 rx,
5265 exit_rx,
5266 ..
5267 } = prepared;
5268
5269 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5270
5271 Ok(checked(
5272 store.0.id(),
5273 rx.map(move |result| {
5274 result
5275 .map(|v| (*v.downcast().unwrap(), exit_rx))
5276 .map_err(crate::Error::from)
5277 }),
5278 ))
5279}
5280
5281fn queue_call0<T: 'static>(
5284 store: StoreContextMut<T>,
5285 handle: Func,
5286 guest_thread: QualifiedThreadId,
5287 param_count: usize,
5288) -> Result<()> {
5289 let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5290 let is_concurrent = raw_options.async_;
5291 let callback = raw_options.callback;
5292 let instance = handle.instance();
5293 let callee = handle.lifted_core_func(store.0);
5294 let post_return = handle.post_return_core_func(store.0);
5295 let callback = callback.map(|i| {
5296 let instance = instance.id().get(store.0);
5297 SendSyncPtr::new(instance.runtime_callback(i))
5298 });
5299
5300 log::trace!("queueing call {guest_thread:?}");
5301
5302 unsafe {
5306 instance.queue_call(
5307 store,
5308 guest_thread,
5309 SendSyncPtr::new(callee),
5310 param_count,
5311 1,
5312 is_concurrent,
5313 callback,
5314 post_return.map(SendSyncPtr::new),
5315 )
5316 }
5317}