1use crate::component::func::{self, Func, call_post_return};
54use crate::component::{
55 HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
56};
57use crate::fiber::{self, StoreFiber, StoreFiberYield};
58use crate::prelude::*;
59use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
60use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
61use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
62use crate::{
63 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType,
64 bail, error::format_err,
65};
66use error_contexts::GlobalErrorContextRefCount;
67use futures::channel::oneshot;
68use futures::future::{self, FutureExt};
69use futures::stream::{FuturesUnordered, StreamExt};
70use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71use std::any::Any;
72use std::borrow::ToOwned;
73use std::boxed::Box;
74use std::cell::UnsafeCell;
75use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
76use std::fmt;
77use std::future::Future;
78use std::marker::PhantomData;
79use std::mem::{self, ManuallyDrop, MaybeUninit};
80use std::ops::DerefMut;
81use std::pin::{Pin, pin};
82use std::ptr::{self, NonNull};
83use std::task::{Context, Poll, Waker};
84use std::vec::Vec;
85use table::{TableDebug, TableId};
86use wasmtime_environ::Trap;
87use wasmtime_environ::component::{
88 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
89 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
90 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
91 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
92 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
93};
94use wasmtime_environ::packed_option::ReservedValue;
95
96pub use abort::JoinHandle;
97pub use future_stream_any::{FutureAny, StreamAny};
98pub use futures_and_streams::{
99 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
100 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
101 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
102};
103pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
104
105mod abort;
106mod error_contexts;
107mod future_stream_any;
108mod futures_and_streams;
109pub(crate) mod table;
110pub(crate) mod tls;
111
112const BLOCKED: u32 = 0xffff_ffff;
115
116#[derive(Clone, Copy, Eq, PartialEq, Debug)]
118pub enum Status {
119 Starting = 0,
120 Started = 1,
121 Returned = 2,
122 StartCancelled = 3,
123 ReturnCancelled = 4,
124}
125
126impl Status {
127 pub fn pack(self, waitable: Option<u32>) -> u32 {
133 assert!(matches!(self, Status::Returned) == waitable.is_none());
134 let waitable = waitable.unwrap_or(0);
135 assert!(waitable < (1 << 28));
136 (waitable << 4) | (self as u32)
137 }
138}
139
140#[derive(Clone, Copy, Debug)]
143enum Event {
144 None,
145 Cancelled,
146 Subtask {
147 status: Status,
148 },
149 StreamRead {
150 code: ReturnCode,
151 pending: Option<(TypeStreamTableIndex, u32)>,
152 },
153 StreamWrite {
154 code: ReturnCode,
155 pending: Option<(TypeStreamTableIndex, u32)>,
156 },
157 FutureRead {
158 code: ReturnCode,
159 pending: Option<(TypeFutureTableIndex, u32)>,
160 },
161 FutureWrite {
162 code: ReturnCode,
163 pending: Option<(TypeFutureTableIndex, u32)>,
164 },
165}
166
167impl Event {
168 fn parts(self) -> (u32, u32) {
173 const EVENT_NONE: u32 = 0;
174 const EVENT_SUBTASK: u32 = 1;
175 const EVENT_STREAM_READ: u32 = 2;
176 const EVENT_STREAM_WRITE: u32 = 3;
177 const EVENT_FUTURE_READ: u32 = 4;
178 const EVENT_FUTURE_WRITE: u32 = 5;
179 const EVENT_CANCELLED: u32 = 6;
180 match self {
181 Event::None => (EVENT_NONE, 0),
182 Event::Cancelled => (EVENT_CANCELLED, 0),
183 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
184 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
185 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
186 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
187 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
188 }
189 }
190}
191
192mod callback_code {
194 pub const EXIT: u32 = 0;
195 pub const YIELD: u32 = 1;
196 pub const WAIT: u32 = 2;
197}
198
199const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
203
204pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
210 store: StoreContextMut<'a, T>,
211 get_data: fn(&mut T) -> D::Data<'_>,
212}
213
214impl<'a, T, D> Access<'a, T, D>
215where
216 D: HasData + ?Sized,
217 T: 'static,
218{
219 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
221 Self { store, get_data }
222 }
223
224 pub fn data_mut(&mut self) -> &mut T {
226 self.store.data_mut()
227 }
228
229 pub fn get(&mut self) -> D::Data<'_> {
231 (self.get_data)(self.data_mut())
232 }
233
234 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
238 where
239 T: 'static,
240 {
241 let accessor = Accessor {
242 get_data: self.get_data,
243 token: StoreToken::new(self.store.as_context_mut()),
244 };
245 self.store
246 .as_context_mut()
247 .spawn_with_accessor(accessor, task)
248 }
249
250 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
253 self.get_data
254 }
255}
256
257impl<'a, T, D> AsContext for Access<'a, T, D>
258where
259 D: HasData + ?Sized,
260 T: 'static,
261{
262 type Data = T;
263
264 fn as_context(&self) -> StoreContext<'_, T> {
265 self.store.as_context()
266 }
267}
268
269impl<'a, T, D> AsContextMut for Access<'a, T, D>
270where
271 D: HasData + ?Sized,
272 T: 'static,
273{
274 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
275 self.store.as_context_mut()
276 }
277}
278
279pub struct Accessor<T: 'static, D = HasSelf<T>>
339where
340 D: HasData + ?Sized,
341{
342 token: StoreToken<T>,
343 get_data: fn(&mut T) -> D::Data<'_>,
344}
345
346pub trait AsAccessor {
363 type Data: 'static;
365
366 type AccessorData: HasData + ?Sized;
369
370 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
372}
373
374impl<T: AsAccessor + ?Sized> AsAccessor for &T {
375 type Data = T::Data;
376 type AccessorData = T::AccessorData;
377
378 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
379 T::as_accessor(self)
380 }
381}
382
383impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
384 type Data = T;
385 type AccessorData = D;
386
387 fn as_accessor(&self) -> &Accessor<T, D> {
388 self
389 }
390}
391
392const _: () = {
415 const fn assert<T: Send + Sync>() {}
416 assert::<Accessor<UnsafeCell<u32>>>();
417};
418
419impl<T> Accessor<T> {
420 pub(crate) fn new(token: StoreToken<T>) -> Self {
429 Self {
430 token,
431 get_data: |x| x,
432 }
433 }
434}
435
436impl<T, D> Accessor<T, D>
437where
438 D: HasData + ?Sized,
439{
440 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
458 tls::get(|vmstore| {
459 fun(Access {
460 store: self.token.as_context_mut(vmstore),
461 get_data: self.get_data,
462 })
463 })
464 }
465
466 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
469 self.get_data
470 }
471
472 pub fn with_getter<D2: HasData>(
489 &self,
490 get_data: fn(&mut T) -> D2::Data<'_>,
491 ) -> Accessor<T, D2> {
492 Accessor {
493 token: self.token,
494 get_data,
495 }
496 }
497
498 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
514 where
515 T: 'static,
516 {
517 let accessor = self.clone_for_spawn();
518 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
519 }
520
521 fn clone_for_spawn(&self) -> Self {
522 Self {
523 token: self.token,
524 get_data: self.get_data,
525 }
526 }
527}
528
529pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
541where
542 D: HasData + ?Sized,
543{
544 fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
546}
547
548enum CallerInfo {
551 Async {
553 params: Vec<ValRaw>,
554 has_result: bool,
555 },
556 Sync {
558 params: Vec<ValRaw>,
559 result_count: u32,
560 },
561}
562
563enum WaitMode {
565 Fiber(StoreFiber<'static>),
567 Callback(Instance),
570}
571
572#[derive(Debug)]
574enum SuspendReason {
575 Waiting {
578 set: TableId<WaitableSet>,
579 thread: QualifiedThreadId,
580 skip_may_block_check: bool,
581 },
582 NeedWork,
585 Yielding {
588 thread: QualifiedThreadId,
589 skip_may_block_check: bool,
590 },
591 ExplicitlySuspending {
593 thread: QualifiedThreadId,
594 skip_may_block_check: bool,
595 },
596}
597
598enum GuestCallKind {
600 DeliverEvent {
603 instance: Instance,
605 set: Option<TableId<WaitableSet>>,
610 },
611 StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
617 StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
618}
619
620impl fmt::Debug for GuestCallKind {
621 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
622 match self {
623 Self::DeliverEvent { instance, set } => f
624 .debug_struct("DeliverEvent")
625 .field("instance", instance)
626 .field("set", set)
627 .finish(),
628 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
629 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
630 }
631 }
632}
633
634#[derive(Copy, Clone, Debug)]
636pub enum SuspensionTarget {
637 SomeSuspended(u32),
638 Some(u32),
639 None,
640}
641
642impl SuspensionTarget {
643 fn is_none(&self) -> bool {
644 matches!(self, SuspensionTarget::None)
645 }
646 fn is_some(&self) -> bool {
647 !self.is_none()
648 }
649}
650
651#[derive(Debug)]
653struct GuestCall {
654 thread: QualifiedThreadId,
655 kind: GuestCallKind,
656}
657
658impl GuestCall {
659 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
669 let instance = store
670 .concurrent_state_mut()
671 .get_mut(self.thread.task)?
672 .instance;
673 let state = store.instance_state(instance).concurrent_state();
674
675 let ready = match &self.kind {
676 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
677 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
678 GuestCallKind::StartExplicit(_) => true,
679 };
680 log::trace!(
681 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
682 state.do_not_enter,
683 state.backpressure
684 );
685 Ok(ready)
686 }
687}
688
689enum WorkerItem {
691 GuestCall(GuestCall),
692 Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
693}
694
695enum WorkItem {
698 PushFuture(AlwaysMut<HostTaskFuture>),
700 ResumeFiber(StoreFiber<'static>),
702 ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
704 GuestCall(RuntimeComponentInstanceIndex, GuestCall),
706 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
708}
709
710impl fmt::Debug for WorkItem {
711 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
712 match self {
713 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
714 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
715 Self::ResumeThread(instance, thread) => f
716 .debug_tuple("ResumeThread")
717 .field(instance)
718 .field(thread)
719 .finish(),
720 Self::GuestCall(instance, call) => f
721 .debug_tuple("GuestCall")
722 .field(instance)
723 .field(call)
724 .finish(),
725 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
726 }
727 }
728}
729
730#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
732pub(crate) enum WaitResult {
733 Cancelled,
734 Completed,
735}
736
737pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
745 store: &mut dyn VMStore,
746 future: impl Future<Output = Result<R>> + Send + 'static,
747) -> Result<R> {
748 let state = store.concurrent_state_mut();
749 let task = state.unwrap_current_host_thread();
750
751 let mut future = Box::pin(async move {
755 let result = future.await?;
756 tls::get(move |store| {
757 let state = store.concurrent_state_mut();
758 let host_state = &mut state.get_mut(task)?.state;
759 assert!(matches!(host_state, HostTaskState::CalleeStarted));
760 *host_state = HostTaskState::CalleeFinished(Box::new(result));
761
762 Waitable::Host(task).set_event(
763 state,
764 Some(Event::Subtask {
765 status: Status::Returned,
766 }),
767 )?;
768
769 Ok(())
770 })
771 }) as HostTaskFuture;
772
773 let poll = tls::set(store, || {
777 future
778 .as_mut()
779 .poll(&mut Context::from_waker(&Waker::noop()))
780 });
781
782 match poll {
783 Poll::Ready(result) => result?,
785
786 Poll::Pending => {
791 let state = store.concurrent_state_mut();
792 state.push_future(future);
793
794 let caller = state.get_mut(task)?.caller;
795 let set = state.get_mut(caller.task)?.sync_call_set;
796 Waitable::Host(task).join(state, Some(set))?;
797
798 store.suspend(SuspendReason::Waiting {
799 set,
800 thread: caller,
801 skip_may_block_check: false,
802 })?;
803
804 Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
808 }
809 }
810
811 let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
813 match mem::replace(host_state, HostTaskState::CalleeDone) {
814 HostTaskState::CalleeFinished(result) => Ok(*result.downcast().unwrap()),
815 _ => panic!("unexpected host task state after completion"),
816 }
817}
818
819fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
821 let mut next = Some(call);
822 while let Some(call) = next.take() {
823 match call.kind {
824 GuestCallKind::DeliverEvent { instance, set } => {
825 let (event, waitable) = instance
826 .get_event(store, call.thread.task, set, true)?
827 .unwrap();
828 let state = store.concurrent_state_mut();
829 let task = state.get_mut(call.thread.task)?;
830 let runtime_instance = task.instance;
831 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
832
833 log::trace!(
834 "use callback to deliver event {event:?} to {:?} for {waitable:?}",
835 call.thread,
836 );
837
838 let old_thread = store.set_thread(call.thread);
839 log::trace!(
840 "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
841 call.thread
842 );
843
844 store.enter_instance(runtime_instance);
845
846 let callback = store
847 .concurrent_state_mut()
848 .get_mut(call.thread.task)?
849 .callback
850 .take()
851 .unwrap();
852
853 let code = callback(store, event, handle)?;
854
855 store
856 .concurrent_state_mut()
857 .get_mut(call.thread.task)?
858 .callback = Some(callback);
859
860 store.exit_instance(runtime_instance)?;
861
862 store.set_thread(old_thread);
863
864 next = instance.handle_callback_code(
865 store,
866 call.thread,
867 runtime_instance.index,
868 code,
869 )?;
870
871 log::trace!(
872 "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
873 );
874 }
875 GuestCallKind::StartImplicit(fun) => {
876 next = fun(store)?;
877 }
878 GuestCallKind::StartExplicit(fun) => {
879 fun(store)?;
880 }
881 }
882 }
883
884 Ok(())
885}
886
887impl<T> Store<T> {
888 pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
890 where
891 T: Send + 'static,
892 {
893 ensure!(
894 self.as_context().0.concurrency_support(),
895 "cannot use `run_concurrent` when Config::concurrency_support disabled",
896 );
897 self.as_context_mut().run_concurrent(fun).await
898 }
899
900 #[doc(hidden)]
901 pub fn assert_concurrent_state_empty(&mut self) {
902 self.as_context_mut().assert_concurrent_state_empty();
903 }
904
905 #[doc(hidden)]
906 pub fn concurrent_state_table_size(&mut self) -> usize {
907 self.as_context_mut().concurrent_state_table_size()
908 }
909
910 pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
912 where
913 T: 'static,
914 {
915 self.as_context_mut().spawn(task)
916 }
917}
918
919impl<T> StoreContextMut<'_, T> {
920 #[doc(hidden)]
931 pub fn assert_concurrent_state_empty(self) {
932 let store = self.0;
933 store
934 .store_data_mut()
935 .components
936 .assert_instance_states_empty();
937 let state = store.concurrent_state_mut();
938 assert!(
939 state.table.get_mut().is_empty(),
940 "non-empty table: {:?}",
941 state.table.get_mut()
942 );
943 assert!(state.high_priority.is_empty());
944 assert!(state.low_priority.is_empty());
945 assert!(state.current_thread.is_none());
946 assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
947 assert!(state.global_error_context_ref_counts.is_empty());
948 }
949
950 #[doc(hidden)]
955 pub fn concurrent_state_table_size(&mut self) -> usize {
956 self.0
957 .concurrent_state_mut()
958 .table
959 .get_mut()
960 .iter_mut()
961 .count()
962 }
963
964 pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
974 where
975 T: 'static,
976 {
977 let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
978 self.spawn_with_accessor(accessor, task)
979 }
980
981 fn spawn_with_accessor<D>(
984 self,
985 accessor: Accessor<T, D>,
986 task: impl AccessorTask<T, D>,
987 ) -> JoinHandle
988 where
989 T: 'static,
990 D: HasData + ?Sized,
991 {
992 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
996 self.0
997 .concurrent_state_mut()
998 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
999 handle
1000 }
1001
1002 pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1086 where
1087 T: Send + 'static,
1088 {
1089 ensure!(
1090 self.0.concurrency_support(),
1091 "cannot use `run_concurrent` when Config::concurrency_support disabled",
1092 );
1093 self.do_run_concurrent(fun, false).await
1094 }
1095
1096 pub(super) async fn run_concurrent_trap_on_idle<R>(
1097 self,
1098 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1099 ) -> Result<R>
1100 where
1101 T: Send + 'static,
1102 {
1103 self.do_run_concurrent(fun, true).await
1104 }
1105
1106 async fn do_run_concurrent<R>(
1107 mut self,
1108 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1109 trap_on_idle: bool,
1110 ) -> Result<R>
1111 where
1112 T: Send + 'static,
1113 {
1114 debug_assert!(self.0.concurrency_support());
1115 check_recursive_run();
1116 let token = StoreToken::new(self.as_context_mut());
1117
1118 struct Dropper<'a, T: 'static, V> {
1119 store: StoreContextMut<'a, T>,
1120 value: ManuallyDrop<V>,
1121 }
1122
1123 impl<'a, T, V> Drop for Dropper<'a, T, V> {
1124 fn drop(&mut self) {
1125 tls::set(self.store.0, || {
1126 unsafe { ManuallyDrop::drop(&mut self.value) }
1131 });
1132 }
1133 }
1134
1135 let accessor = &Accessor::new(token);
1136 let dropper = &mut Dropper {
1137 store: self,
1138 value: ManuallyDrop::new(fun(accessor)),
1139 };
1140 let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1142
1143 dropper
1144 .store
1145 .as_context_mut()
1146 .poll_until(future, trap_on_idle)
1147 .await
1148 }
1149
1150 async fn poll_until<R>(
1156 mut self,
1157 mut future: Pin<&mut impl Future<Output = R>>,
1158 trap_on_idle: bool,
1159 ) -> Result<R>
1160 where
1161 T: Send + 'static,
1162 {
1163 struct Reset<'a, T: 'static> {
1164 store: StoreContextMut<'a, T>,
1165 futures: Option<FuturesUnordered<HostTaskFuture>>,
1166 }
1167
1168 impl<'a, T> Drop for Reset<'a, T> {
1169 fn drop(&mut self) {
1170 if let Some(futures) = self.futures.take() {
1171 *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1172 }
1173 }
1174 }
1175
1176 loop {
1177 let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1181 let mut reset = Reset {
1182 store: self.as_context_mut(),
1183 futures,
1184 };
1185 let mut next = pin!(reset.futures.as_mut().unwrap().next());
1186
1187 enum PollResult<R> {
1188 Complete(R),
1189 ProcessWork(Vec<WorkItem>),
1190 }
1191 let result = future::poll_fn(|cx| {
1192 if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1195 return Poll::Ready(Ok(PollResult::Complete(value)));
1196 }
1197
1198 let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1202 Poll::Ready(Some(output)) => {
1203 match output {
1204 Err(e) => return Poll::Ready(Err(e)),
1205 Ok(()) => {}
1206 }
1207 Poll::Ready(true)
1208 }
1209 Poll::Ready(None) => Poll::Ready(false),
1210 Poll::Pending => Poll::Pending,
1211 };
1212
1213 let state = reset.store.0.concurrent_state_mut();
1217 let ready = state.collect_work_items_to_run();
1218 if !ready.is_empty() {
1219 return Poll::Ready(Ok(PollResult::ProcessWork(ready)));
1220 }
1221
1222 return match next {
1226 Poll::Ready(true) => {
1227 Poll::Ready(Ok(PollResult::ProcessWork(Vec::new())))
1233 }
1234 Poll::Ready(false) => {
1235 if let Poll::Ready(value) =
1239 tls::set(reset.store.0, || future.as_mut().poll(cx))
1240 {
1241 Poll::Ready(Ok(PollResult::Complete(value)))
1242 } else {
1243 if trap_on_idle {
1249 Poll::Ready(Err(format_err!(crate::Trap::AsyncDeadlock)))
1252 } else {
1253 Poll::Pending
1257 }
1258 }
1259 }
1260 Poll::Pending => Poll::Pending,
1265 };
1266 })
1267 .await;
1268
1269 drop(reset);
1273
1274 match result? {
1275 PollResult::Complete(value) => break Ok(value),
1278 PollResult::ProcessWork(ready) => {
1281 struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1282 store: StoreContextMut<'a, T>,
1283 ready: I,
1284 }
1285
1286 impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1287 fn drop(&mut self) {
1288 while let Some(item) = self.ready.next() {
1289 match item {
1290 WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1291 WorkItem::PushFuture(future) => {
1292 tls::set(self.store.0, move || drop(future))
1293 }
1294 _ => {}
1295 }
1296 }
1297 }
1298 }
1299
1300 let mut dispose = Dispose {
1301 store: self.as_context_mut(),
1302 ready: ready.into_iter(),
1303 };
1304
1305 while let Some(item) = dispose.ready.next() {
1306 dispose
1307 .store
1308 .as_context_mut()
1309 .handle_work_item(item)
1310 .await?;
1311 }
1312 }
1313 }
1314 }
1315 }
1316
1317 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1319 where
1320 T: Send,
1321 {
1322 log::trace!("handle work item {item:?}");
1323 match item {
1324 WorkItem::PushFuture(future) => {
1325 self.0
1326 .concurrent_state_mut()
1327 .futures
1328 .get_mut()
1329 .as_mut()
1330 .unwrap()
1331 .push(future.into_inner());
1332 }
1333 WorkItem::ResumeFiber(fiber) => {
1334 self.0.resume_fiber(fiber).await?;
1335 }
1336 WorkItem::ResumeThread(_, thread) => {
1337 if let GuestThreadState::Ready(fiber) = mem::replace(
1338 &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
1339 GuestThreadState::Running,
1340 ) {
1341 self.0.resume_fiber(fiber).await?;
1342 } else {
1343 bail!("cannot resume non-pending thread {thread:?}");
1344 }
1345 }
1346 WorkItem::GuestCall(_, call) => {
1347 if call.is_ready(self.0)? {
1348 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1349 } else {
1350 let state = self.0.concurrent_state_mut();
1351 let task = state.get_mut(call.thread.task)?;
1352 if !task.starting_sent {
1353 task.starting_sent = true;
1354 if let GuestCallKind::StartImplicit(_) = &call.kind {
1355 Waitable::Guest(call.thread.task).set_event(
1356 state,
1357 Some(Event::Subtask {
1358 status: Status::Starting,
1359 }),
1360 )?;
1361 }
1362 }
1363
1364 let instance = state.get_mut(call.thread.task)?.instance;
1365 self.0
1366 .instance_state(instance)
1367 .concurrent_state()
1368 .pending
1369 .insert(call.thread, call.kind);
1370 }
1371 }
1372 WorkItem::WorkerFunction(fun) => {
1373 self.run_on_worker(WorkerItem::Function(fun)).await?;
1374 }
1375 }
1376
1377 Ok(())
1378 }
1379
1380 async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1382 where
1383 T: Send,
1384 {
1385 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1386 fiber
1387 } else {
1388 fiber::make_fiber(self.0, move |store| {
1389 loop {
1390 match store.concurrent_state_mut().worker_item.take().unwrap() {
1391 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1392 WorkerItem::Function(fun) => fun.into_inner()(store)?,
1393 }
1394
1395 store.suspend(SuspendReason::NeedWork)?;
1396 }
1397 })?
1398 };
1399
1400 let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1401 assert!(worker_item.is_none());
1402 *worker_item = Some(item);
1403
1404 self.0.resume_fiber(worker).await
1405 }
1406
1407 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1412 where
1413 T: 'static,
1414 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1415 + Send
1416 + Sync
1417 + 'static,
1418 R: Send + Sync + 'static,
1419 {
1420 let token = StoreToken::new(self);
1421 async move {
1422 let mut accessor = Accessor::new(token);
1423 closure(&mut accessor).await
1424 }
1425 }
1426}
1427
1428impl StoreOpaque {
1429 pub(crate) fn enter_guest_sync_call(
1436 &mut self,
1437 guest_caller: Option<RuntimeInstance>,
1438 callee_async: bool,
1439 callee: RuntimeInstance,
1440 ) -> Result<()> {
1441 log::trace!("enter sync call {callee:?}");
1442 if !self.concurrency_support() {
1443 return Ok(self.enter_call_not_concurrent());
1444 }
1445
1446 let state = self.concurrent_state_mut();
1447 let thread = state.current_thread;
1448 let instance = if let Some(thread) = thread.guest() {
1449 Some(state.get_mut(thread.task)?.instance)
1450 } else {
1451 None
1452 };
1453 let task = GuestTask::new(
1454 state,
1455 Box::new(move |_, _| unreachable!()),
1456 LiftResult {
1457 lift: Box::new(move |_, _| unreachable!()),
1458 ty: TypeTupleIndex::reserved_value(),
1459 memory: None,
1460 string_encoding: StringEncoding::Utf8,
1461 },
1462 if let Some(caller) = guest_caller {
1463 assert_eq!(caller, instance.unwrap());
1464 Caller::Guest {
1465 thread: *thread.guest().unwrap(),
1466 }
1467 } else {
1468 Caller::Host {
1469 tx: None,
1470 host_future_present: false,
1471 caller: thread,
1472 }
1473 },
1474 None,
1475 callee,
1476 callee_async,
1477 )?;
1478
1479 let guest_task = state.push(task)?;
1480 let new_thread = GuestThread::new_implicit(guest_task);
1481 let guest_thread = state.push(new_thread)?;
1482 Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1483 guest_thread,
1484 self,
1485 callee.index,
1486 )?;
1487
1488 let state = self.concurrent_state_mut();
1489 state.get_mut(guest_task)?.threads.insert(guest_thread);
1490
1491 self.set_thread(QualifiedThreadId {
1492 task: guest_task,
1493 thread: guest_thread,
1494 });
1495
1496 Ok(())
1497 }
1498
1499 pub(crate) fn exit_guest_sync_call(&mut self, guest_caller: bool) -> Result<()> {
1501 if !self.concurrency_support() {
1502 return Ok(self.exit_call_not_concurrent());
1503 }
1504 let thread = *self.set_thread(CurrentThread::None).guest().unwrap();
1505 let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance;
1506 log::trace!("exit sync call {instance:?}");
1507 Instance::from_wasmtime(self, instance.instance).cleanup_thread(
1508 self,
1509 thread,
1510 instance.index,
1511 )?;
1512
1513 let state = self.concurrent_state_mut();
1514 let task = state.get_mut(thread.task)?;
1515 let caller = match &task.caller {
1516 &Caller::Guest { thread } => {
1517 assert!(guest_caller);
1518 thread.into()
1519 }
1520 &Caller::Host { caller, .. } => {
1521 assert!(!guest_caller);
1522 caller
1523 }
1524 };
1525 self.set_thread(caller);
1526
1527 let state = self.concurrent_state_mut();
1528 let task = state.get_mut(thread.task)?;
1529 if task.ready_to_delete() {
1530 state.delete(thread.task)?.dispose(state)?;
1531 }
1532
1533 Ok(())
1534 }
1535
1536 pub fn enter_host_call(&mut self) -> Result<()> {
1544 if !self.concurrency_support() {
1545 self.enter_call_not_concurrent();
1546 return Ok(());
1547 }
1548 let state = self.concurrent_state_mut();
1549 let caller = state.unwrap_current_guest_thread();
1550 let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
1551 log::trace!("new host task {task:?}");
1552 self.set_thread(task);
1553 Ok(())
1554 }
1555
1556 pub fn exit_host_call(&mut self) -> Result<()> {
1563 if !self.concurrency_support() {
1564 self.exit_call_not_concurrent();
1565 return Ok(());
1566 }
1567 let task = self.concurrent_state_mut().unwrap_current_host_thread();
1568 log::trace!("delete host task {task:?}");
1569 let task = self.concurrent_state_mut().delete(task)?;
1570 self.set_thread(task.caller);
1571 Ok(())
1572 }
1573
1574 pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool {
1582 if self.trapped() {
1583 return false;
1584 }
1585 if !self.concurrency_support() {
1586 return true;
1587 }
1588 let state = self.concurrent_state_mut();
1589 let mut cur = state.current_thread;
1590 loop {
1591 match cur {
1592 CurrentThread::None => break true,
1593 CurrentThread::Guest(thread) => {
1594 let task = state.get_mut(thread.task).unwrap();
1595
1596 if task.instance.instance == instance.instance {
1603 break false;
1604 }
1605 cur = match task.caller {
1606 Caller::Host { caller, .. } => caller,
1607 Caller::Guest { thread } => thread.into(),
1608 };
1609 }
1610 CurrentThread::Host(id) => {
1611 cur = state.get_mut(id).unwrap().caller.into();
1612 }
1613 }
1614 }
1615 }
1616
1617 fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1620 self.component_instance_mut(instance.instance)
1621 .instance_state(instance.index)
1622 }
1623
1624 fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> CurrentThread {
1625 let state = self.concurrent_state_mut();
1630 let old_thread = mem::replace(&mut state.current_thread, thread.into());
1631 if let Some(old_thread) = old_thread.guest() {
1632 let instance = state.get_mut(old_thread.task).unwrap().instance.instance;
1633 self.component_instance_mut(instance)
1634 .set_task_may_block(false)
1635 }
1636
1637 if self.concurrent_state_mut().current_thread.guest().is_some() {
1640 self.set_task_may_block();
1641 }
1642
1643 old_thread
1644 }
1645
1646 fn set_task_may_block(&mut self) {
1649 let state = self.concurrent_state_mut();
1650 let guest_thread = state.unwrap_current_guest_thread();
1651 let instance = state.get_mut(guest_thread.task).unwrap().instance.instance;
1652 let may_block = self.concurrent_state_mut().may_block(guest_thread.task);
1653 self.component_instance_mut(instance)
1654 .set_task_may_block(may_block)
1655 }
1656
1657 pub(crate) fn check_blocking(&mut self) -> Result<()> {
1658 if !self.concurrency_support() {
1659 return Ok(());
1660 }
1661 let state = self.concurrent_state_mut();
1662 let task = state.unwrap_current_guest_thread().task;
1663 let instance = state.get_mut(task).unwrap().instance.instance;
1664 let task_may_block = self.component_instance(instance).get_task_may_block();
1665
1666 if task_may_block {
1667 Ok(())
1668 } else {
1669 Err(Trap::CannotBlockSyncTask.into())
1670 }
1671 }
1672
1673 fn enter_instance(&mut self, instance: RuntimeInstance) {
1677 log::trace!("enter {instance:?}");
1678 self.instance_state(instance)
1679 .concurrent_state()
1680 .do_not_enter = true;
1681 }
1682
1683 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1687 log::trace!("exit {instance:?}");
1688 self.instance_state(instance)
1689 .concurrent_state()
1690 .do_not_enter = false;
1691 self.partition_pending(instance)
1692 }
1693
1694 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1699 for (thread, kind) in
1700 mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1701 {
1702 let call = GuestCall { thread, kind };
1703 if call.is_ready(self)? {
1704 self.concurrent_state_mut()
1705 .push_high_priority(WorkItem::GuestCall(instance.index, call));
1706 } else {
1707 self.instance_state(instance)
1708 .concurrent_state()
1709 .pending
1710 .insert(call.thread, call.kind);
1711 }
1712 }
1713
1714 Ok(())
1715 }
1716
1717 pub(crate) fn backpressure_modify(
1719 &mut self,
1720 caller_instance: RuntimeInstance,
1721 modify: impl FnOnce(u16) -> Option<u16>,
1722 ) -> Result<()> {
1723 let state = self.instance_state(caller_instance).concurrent_state();
1724 let old = state.backpressure;
1725 let new = modify(old).ok_or_else(|| format_err!("backpressure counter overflow"))?;
1726 state.backpressure = new;
1727
1728 if old > 0 && new == 0 {
1729 self.partition_pending(caller_instance)?;
1732 }
1733
1734 Ok(())
1735 }
1736
1737 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1740 let old_thread = self.concurrent_state_mut().current_thread;
1741 log::trace!("resume_fiber: save current thread {old_thread:?}");
1742
1743 let fiber = fiber::resolve_or_release(self, fiber).await?;
1744
1745 self.set_thread(old_thread);
1746
1747 let state = self.concurrent_state_mut();
1748
1749 if let Some(ot) = old_thread.guest() {
1750 state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1751 }
1752 log::trace!("resume_fiber: restore current thread {old_thread:?}");
1753
1754 if let Some(mut fiber) = fiber {
1755 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1756 match state.suspend_reason.take().unwrap() {
1758 SuspendReason::NeedWork => {
1759 if state.worker.is_none() {
1760 state.worker = Some(fiber);
1761 } else {
1762 fiber.dispose(self);
1763 }
1764 }
1765 SuspendReason::Yielding { thread, .. } => {
1766 state.get_mut(thread.thread)?.state = GuestThreadState::Ready(fiber);
1767 let instance = state.get_mut(thread.task)?.instance.index;
1768 state.push_low_priority(WorkItem::ResumeThread(instance, thread));
1769 }
1770 SuspendReason::ExplicitlySuspending { thread, .. } => {
1771 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1772 }
1773 SuspendReason::Waiting { set, thread, .. } => {
1774 let old = state
1775 .get_mut(set)?
1776 .waiting
1777 .insert(thread, WaitMode::Fiber(fiber));
1778 assert!(old.is_none());
1779 }
1780 };
1781 } else {
1782 log::trace!("resume_fiber: fiber has exited");
1783 }
1784
1785 Ok(())
1786 }
1787
1788 fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1794 log::trace!("suspend fiber: {reason:?}");
1795
1796 let task = match &reason {
1800 SuspendReason::Yielding { thread, .. }
1801 | SuspendReason::Waiting { thread, .. }
1802 | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1803 SuspendReason::NeedWork => None,
1804 };
1805
1806 let old_guest_thread = if task.is_some() {
1807 self.concurrent_state_mut().current_thread
1808 } else {
1809 CurrentThread::None
1810 };
1811
1812 assert!(
1818 matches!(
1819 reason,
1820 SuspendReason::ExplicitlySuspending {
1821 skip_may_block_check: true,
1822 ..
1823 } | SuspendReason::Waiting {
1824 skip_may_block_check: true,
1825 ..
1826 } | SuspendReason::Yielding {
1827 skip_may_block_check: true,
1828 ..
1829 }
1830 ) || old_guest_thread
1831 .guest()
1832 .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1833 .unwrap_or(true)
1834 );
1835
1836 let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1837 assert!(suspend_reason.is_none());
1838 *suspend_reason = Some(reason);
1839
1840 self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1841
1842 if task.is_some() {
1843 self.set_thread(old_guest_thread);
1844 }
1845
1846 Ok(())
1847 }
1848
1849 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1850 let state = self.concurrent_state_mut();
1851 let caller = state.unwrap_current_guest_thread();
1852 let old_set = waitable.common(state)?.set;
1853 let set = state.get_mut(caller.task)?.sync_call_set;
1854 waitable.join(state, Some(set))?;
1855 self.suspend(SuspendReason::Waiting {
1856 set,
1857 thread: caller,
1858 skip_may_block_check: false,
1859 })?;
1860 let state = self.concurrent_state_mut();
1861 waitable.join(state, old_set)
1862 }
1863}
1864
1865impl Instance {
1866 fn get_event(
1869 self,
1870 store: &mut StoreOpaque,
1871 guest_task: TableId<GuestTask>,
1872 set: Option<TableId<WaitableSet>>,
1873 cancellable: bool,
1874 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1875 let state = store.concurrent_state_mut();
1876
1877 if let Some(event) = state.get_mut(guest_task)?.event.take() {
1878 log::trace!("deliver event {event:?} to {guest_task:?}");
1879
1880 if cancellable || !matches!(event, Event::Cancelled) {
1881 return Ok(Some((event, None)));
1882 } else {
1883 state.get_mut(guest_task)?.event = Some(event);
1884 }
1885 }
1886
1887 Ok(
1888 if let Some((set, waitable)) = set
1889 .and_then(|set| {
1890 state
1891 .get_mut(set)
1892 .map(|v| v.ready.pop_first().map(|v| (set, v)))
1893 .transpose()
1894 })
1895 .transpose()?
1896 {
1897 let common = waitable.common(state)?;
1898 let handle = common.handle.unwrap();
1899 let event = common.event.take().unwrap();
1900
1901 log::trace!(
1902 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1903 );
1904
1905 waitable.on_delivery(store, self, event);
1906
1907 Some((event, Some((waitable, handle))))
1908 } else {
1909 None
1910 },
1911 )
1912 }
1913
1914 fn handle_callback_code(
1920 self,
1921 store: &mut StoreOpaque,
1922 guest_thread: QualifiedThreadId,
1923 runtime_instance: RuntimeComponentInstanceIndex,
1924 code: u32,
1925 ) -> Result<Option<GuestCall>> {
1926 let (code, set) = unpack_callback_code(code);
1927
1928 log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1929
1930 let state = store.concurrent_state_mut();
1931
1932 let get_set = |store: &mut StoreOpaque, handle| {
1933 if handle == 0 {
1934 bail!("invalid waitable-set handle");
1935 }
1936
1937 let set = store
1938 .instance_state(RuntimeInstance {
1939 instance: self.id().instance(),
1940 index: runtime_instance,
1941 })
1942 .handle_table()
1943 .waitable_set_rep(handle)?;
1944
1945 Ok(TableId::<WaitableSet>::new(set))
1946 };
1947
1948 Ok(match code {
1949 callback_code::EXIT => {
1950 log::trace!("implicit thread {guest_thread:?} completed");
1951 self.cleanup_thread(store, guest_thread, runtime_instance)?;
1952 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1953 if task.threads.is_empty() && !task.returned_or_cancelled() {
1954 bail!(Trap::NoAsyncResult);
1955 }
1956 if let Caller::Guest { .. } = task.caller {
1957 task.exited = true;
1958 task.callback = None;
1959 }
1960 if task.ready_to_delete() {
1961 Waitable::Guest(guest_thread.task).delete_from(store.concurrent_state_mut())?;
1962 }
1963 None
1964 }
1965 callback_code::YIELD => {
1966 let task = state.get_mut(guest_thread.task)?;
1967 if let Some(event) = task.event {
1972 assert!(matches!(event, Event::None | Event::Cancelled));
1973 } else {
1974 task.event = Some(Event::None);
1975 }
1976 let call = GuestCall {
1977 thread: guest_thread,
1978 kind: GuestCallKind::DeliverEvent {
1979 instance: self,
1980 set: None,
1981 },
1982 };
1983 if state.may_block(guest_thread.task) {
1984 state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
1987 None
1988 } else {
1989 Some(call)
1993 }
1994 }
1995 callback_code::WAIT => {
1996 state.check_blocking_for(guest_thread.task)?;
1999
2000 let set = get_set(store, set)?;
2001 let state = store.concurrent_state_mut();
2002
2003 if state.get_mut(guest_thread.task)?.event.is_some()
2004 || !state.get_mut(set)?.ready.is_empty()
2005 {
2006 state.push_high_priority(WorkItem::GuestCall(
2008 runtime_instance,
2009 GuestCall {
2010 thread: guest_thread,
2011 kind: GuestCallKind::DeliverEvent {
2012 instance: self,
2013 set: Some(set),
2014 },
2015 },
2016 ));
2017 } else {
2018 let old = state
2026 .get_mut(guest_thread.thread)?
2027 .wake_on_cancel
2028 .replace(set);
2029 assert!(old.is_none());
2030 let old = state
2031 .get_mut(set)?
2032 .waiting
2033 .insert(guest_thread, WaitMode::Callback(self));
2034 assert!(old.is_none());
2035 }
2036 None
2037 }
2038 _ => bail!("unsupported callback code: {code}"),
2039 })
2040 }
2041
2042 fn cleanup_thread(
2043 self,
2044 store: &mut StoreOpaque,
2045 guest_thread: QualifiedThreadId,
2046 runtime_instance: RuntimeComponentInstanceIndex,
2047 ) -> Result<()> {
2048 let guest_id = store
2049 .concurrent_state_mut()
2050 .get_mut(guest_thread.thread)?
2051 .instance_rep;
2052 store
2053 .instance_state(RuntimeInstance {
2054 instance: self.id().instance(),
2055 index: runtime_instance,
2056 })
2057 .thread_handle_table()
2058 .guest_thread_remove(guest_id.unwrap())?;
2059
2060 store.concurrent_state_mut().delete(guest_thread.thread)?;
2061 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2062 task.threads.remove(&guest_thread.thread);
2063 Ok(())
2064 }
2065
2066 unsafe fn queue_call<T: 'static>(
2073 self,
2074 mut store: StoreContextMut<T>,
2075 guest_thread: QualifiedThreadId,
2076 callee: SendSyncPtr<VMFuncRef>,
2077 param_count: usize,
2078 result_count: usize,
2079 async_: bool,
2080 callback: Option<SendSyncPtr<VMFuncRef>>,
2081 post_return: Option<SendSyncPtr<VMFuncRef>>,
2082 ) -> Result<()> {
2083 unsafe fn make_call<T: 'static>(
2098 store: StoreContextMut<T>,
2099 guest_thread: QualifiedThreadId,
2100 callee: SendSyncPtr<VMFuncRef>,
2101 param_count: usize,
2102 result_count: usize,
2103 ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2104 + Send
2105 + Sync
2106 + 'static
2107 + use<T> {
2108 let token = StoreToken::new(store);
2109 move |store: &mut dyn VMStore| {
2110 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2111
2112 store
2113 .concurrent_state_mut()
2114 .get_mut(guest_thread.thread)?
2115 .state = GuestThreadState::Running;
2116 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2117 let lower = task.lower_params.take().unwrap();
2118
2119 lower(store, &mut storage[..param_count])?;
2120
2121 let mut store = token.as_context_mut(store);
2122
2123 unsafe {
2126 crate::Func::call_unchecked_raw(
2127 &mut store,
2128 callee.as_non_null(),
2129 NonNull::new(
2130 &mut storage[..param_count.max(result_count)]
2131 as *mut [MaybeUninit<ValRaw>] as _,
2132 )
2133 .unwrap(),
2134 )?;
2135 }
2136
2137 Ok(storage)
2138 }
2139 }
2140
2141 let call = unsafe {
2145 make_call(
2146 store.as_context_mut(),
2147 guest_thread,
2148 callee,
2149 param_count,
2150 result_count,
2151 )
2152 };
2153
2154 let callee_instance = store
2155 .0
2156 .concurrent_state_mut()
2157 .get_mut(guest_thread.task)?
2158 .instance;
2159
2160 let fun = if callback.is_some() {
2161 assert!(async_);
2162
2163 Box::new(move |store: &mut dyn VMStore| {
2164 self.add_guest_thread_to_instance_table(
2165 guest_thread.thread,
2166 store,
2167 callee_instance.index,
2168 )?;
2169 let old_thread = store.set_thread(guest_thread);
2170 log::trace!(
2171 "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2172 );
2173
2174 store.enter_instance(callee_instance);
2175
2176 let storage = call(store)?;
2183
2184 store.exit_instance(callee_instance)?;
2185
2186 store.set_thread(old_thread);
2187 let state = store.concurrent_state_mut();
2188 old_thread
2189 .guest()
2190 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2191 log::trace!("stackless call: restored {old_thread:?} as current thread");
2192
2193 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2196
2197 self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2198 })
2199 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2200 } else {
2201 let token = StoreToken::new(store.as_context_mut());
2202 Box::new(move |store: &mut dyn VMStore| {
2203 self.add_guest_thread_to_instance_table(
2204 guest_thread.thread,
2205 store,
2206 callee_instance.index,
2207 )?;
2208 let old_thread = store.set_thread(guest_thread);
2209 log::trace!(
2210 "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2211 );
2212 let flags = self.id().get(store).instance_flags(callee_instance.index);
2213
2214 if !async_ {
2218 store.enter_instance(callee_instance);
2219 }
2220
2221 let storage = call(store)?;
2228
2229 if async_ {
2230 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2231 if task.threads.len() == 1 && !task.returned_or_cancelled() {
2232 bail!(Trap::NoAsyncResult);
2233 }
2234 } else {
2235 let lift = {
2241 store.exit_instance(callee_instance)?;
2242
2243 let state = store.concurrent_state_mut();
2244 assert!(state.get_mut(guest_thread.task)?.result.is_none());
2245
2246 state
2247 .get_mut(guest_thread.task)?
2248 .lift_result
2249 .take()
2250 .unwrap()
2251 };
2252
2253 let result = (lift.lift)(store, unsafe {
2256 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2257 &storage[..result_count],
2258 )
2259 })?;
2260
2261 let post_return_arg = match result_count {
2262 0 => ValRaw::i32(0),
2263 1 => unsafe { storage[0].assume_init() },
2266 _ => unreachable!(),
2267 };
2268
2269 unsafe {
2270 call_post_return(
2271 token.as_context_mut(store),
2272 post_return.map(|v| v.as_non_null()),
2273 post_return_arg,
2274 flags,
2275 )?;
2276 }
2277
2278 self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2279 }
2280
2281 self.cleanup_thread(store, guest_thread, callee_instance.index)?;
2283
2284 store.set_thread(old_thread);
2285
2286 let state = store.concurrent_state_mut();
2287 let task = state.get_mut(guest_thread.task)?;
2288
2289 match &task.caller {
2290 Caller::Host { .. } => {
2291 if task.ready_to_delete() {
2292 Waitable::Guest(guest_thread.task).delete_from(state)?;
2293 }
2294 }
2295 Caller::Guest { .. } => {
2296 task.exited = true;
2297 }
2298 }
2299
2300 Ok(None)
2301 })
2302 };
2303
2304 store
2305 .0
2306 .concurrent_state_mut()
2307 .push_high_priority(WorkItem::GuestCall(
2308 callee_instance.index,
2309 GuestCall {
2310 thread: guest_thread,
2311 kind: GuestCallKind::StartImplicit(fun),
2312 },
2313 ));
2314
2315 Ok(())
2316 }
2317
2318 unsafe fn prepare_call<T: 'static>(
2331 self,
2332 mut store: StoreContextMut<T>,
2333 start: *mut VMFuncRef,
2334 return_: *mut VMFuncRef,
2335 caller_instance: RuntimeComponentInstanceIndex,
2336 callee_instance: RuntimeComponentInstanceIndex,
2337 task_return_type: TypeTupleIndex,
2338 callee_async: bool,
2339 memory: *mut VMMemoryDefinition,
2340 string_encoding: u8,
2341 caller_info: CallerInfo,
2342 ) -> Result<()> {
2343 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2344 store.0.check_blocking()?;
2348 }
2349
2350 enum ResultInfo {
2351 Heap { results: u32 },
2352 Stack { result_count: u32 },
2353 }
2354
2355 let result_info = match &caller_info {
2356 CallerInfo::Async {
2357 has_result: true,
2358 params,
2359 } => ResultInfo::Heap {
2360 results: params.last().unwrap().get_u32(),
2361 },
2362 CallerInfo::Async {
2363 has_result: false, ..
2364 } => ResultInfo::Stack { result_count: 0 },
2365 CallerInfo::Sync {
2366 result_count,
2367 params,
2368 } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2369 results: params.last().unwrap().get_u32(),
2370 },
2371 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2372 result_count: *result_count,
2373 },
2374 };
2375
2376 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2377
2378 let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2382 let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2383 let token = StoreToken::new(store.as_context_mut());
2384 let state = store.0.concurrent_state_mut();
2385 let old_thread = state.unwrap_current_guest_thread();
2386
2387 assert_eq!(
2388 state.get_mut(old_thread.task)?.instance,
2389 RuntimeInstance {
2390 instance: self.id().instance(),
2391 index: caller_instance,
2392 }
2393 );
2394
2395 let new_task = GuestTask::new(
2396 state,
2397 Box::new(move |store, dst| {
2398 let mut store = token.as_context_mut(store);
2399 assert!(dst.len() <= MAX_FLAT_PARAMS);
2400 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2402 let count = match caller_info {
2403 CallerInfo::Async { params, has_result } => {
2407 let params = ¶ms[..params.len() - usize::from(has_result)];
2408 for (param, src) in params.iter().zip(&mut src) {
2409 src.write(*param);
2410 }
2411 params.len()
2412 }
2413
2414 CallerInfo::Sync { params, .. } => {
2416 for (param, src) in params.iter().zip(&mut src) {
2417 src.write(*param);
2418 }
2419 params.len()
2420 }
2421 };
2422 unsafe {
2429 crate::Func::call_unchecked_raw(
2430 &mut store,
2431 start.as_non_null(),
2432 NonNull::new(
2433 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2434 )
2435 .unwrap(),
2436 )?;
2437 }
2438 dst.copy_from_slice(&src[..dst.len()]);
2439 let state = store.0.concurrent_state_mut();
2440 Waitable::Guest(state.unwrap_current_guest_thread().task).set_event(
2441 state,
2442 Some(Event::Subtask {
2443 status: Status::Started,
2444 }),
2445 )?;
2446 Ok(())
2447 }),
2448 LiftResult {
2449 lift: Box::new(move |store, src| {
2450 let mut store = token.as_context_mut(store);
2453 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2455 my_src.push(ValRaw::u32(*results));
2456 }
2457 unsafe {
2464 crate::Func::call_unchecked_raw(
2465 &mut store,
2466 return_.as_non_null(),
2467 my_src.as_mut_slice().into(),
2468 )?;
2469 }
2470 let state = store.0.concurrent_state_mut();
2471 let thread = state.unwrap_current_guest_thread();
2472 if sync_caller {
2473 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2474 if let ResultInfo::Stack { result_count } = &result_info {
2475 match result_count {
2476 0 => None,
2477 1 => Some(my_src[0]),
2478 _ => unreachable!(),
2479 }
2480 } else {
2481 None
2482 },
2483 );
2484 }
2485 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2486 }),
2487 ty: task_return_type,
2488 memory: NonNull::new(memory).map(SendSyncPtr::new),
2489 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2490 },
2491 Caller::Guest { thread: old_thread },
2492 None,
2493 RuntimeInstance {
2494 instance: self.id().instance(),
2495 index: callee_instance,
2496 },
2497 callee_async,
2498 )?;
2499
2500 let guest_task = state.push(new_task)?;
2501 let new_thread = GuestThread::new_implicit(guest_task);
2502 let guest_thread = state.push(new_thread)?;
2503 state.get_mut(guest_task)?.threads.insert(guest_thread);
2504
2505 store.0.set_thread(QualifiedThreadId {
2508 task: guest_task,
2509 thread: guest_thread,
2510 });
2511 log::trace!(
2512 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2513 );
2514
2515 Ok(())
2516 }
2517
2518 unsafe fn call_callback<T>(
2523 self,
2524 mut store: StoreContextMut<T>,
2525 function: SendSyncPtr<VMFuncRef>,
2526 event: Event,
2527 handle: u32,
2528 ) -> Result<u32> {
2529 let (ordinal, result) = event.parts();
2530 let params = &mut [
2531 ValRaw::u32(ordinal),
2532 ValRaw::u32(handle),
2533 ValRaw::u32(result),
2534 ];
2535 unsafe {
2540 crate::Func::call_unchecked_raw(
2541 &mut store,
2542 function.as_non_null(),
2543 params.as_mut_slice().into(),
2544 )?;
2545 }
2546 Ok(params[0].get_u32())
2547 }
2548
2549 unsafe fn start_call<T: 'static>(
2562 self,
2563 mut store: StoreContextMut<T>,
2564 callback: *mut VMFuncRef,
2565 post_return: *mut VMFuncRef,
2566 callee: *mut VMFuncRef,
2567 param_count: u32,
2568 result_count: u32,
2569 flags: u32,
2570 storage: Option<&mut [MaybeUninit<ValRaw>]>,
2571 ) -> Result<u32> {
2572 let token = StoreToken::new(store.as_context_mut());
2573 let async_caller = storage.is_none();
2574 let state = store.0.concurrent_state_mut();
2575 let guest_thread = state.unwrap_current_guest_thread();
2576 let callee_async = state.get_mut(guest_thread.task)?.async_function;
2577 let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2578 let param_count = usize::try_from(param_count).unwrap();
2579 assert!(param_count <= MAX_FLAT_PARAMS);
2580 let result_count = usize::try_from(result_count).unwrap();
2581 assert!(result_count <= MAX_FLAT_RESULTS);
2582
2583 let task = state.get_mut(guest_thread.task)?;
2584 if !callback.is_null() {
2585 let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2589 task.callback = Some(Box::new(move |store, event, handle| {
2590 let store = token.as_context_mut(store);
2591 unsafe { self.call_callback::<T>(store, callback, event, handle) }
2592 }));
2593 }
2594
2595 let Caller::Guest { thread: caller } = &task.caller else {
2596 unreachable!()
2599 };
2600 let caller = *caller;
2601 let caller_instance = state.get_mut(caller.task)?.instance;
2602
2603 unsafe {
2605 self.queue_call(
2606 store.as_context_mut(),
2607 guest_thread,
2608 callee,
2609 param_count,
2610 result_count,
2611 (flags & START_FLAG_ASYNC_CALLEE) != 0,
2612 NonNull::new(callback).map(SendSyncPtr::new),
2613 NonNull::new(post_return).map(SendSyncPtr::new),
2614 )?;
2615 }
2616
2617 let state = store.0.concurrent_state_mut();
2618
2619 let guest_waitable = Waitable::Guest(guest_thread.task);
2622 let old_set = guest_waitable.common(state)?.set;
2623 let set = state.get_mut(caller.task)?.sync_call_set;
2624 guest_waitable.join(state, Some(set))?;
2625
2626 let (status, waitable) = loop {
2642 store.0.suspend(SuspendReason::Waiting {
2643 set,
2644 thread: caller,
2645 skip_may_block_check: async_caller || !callee_async,
2653 })?;
2654
2655 let state = store.0.concurrent_state_mut();
2656
2657 log::trace!("taking event for {:?}", guest_thread.task);
2658 let event = guest_waitable.take_event(state)?;
2659 let Some(Event::Subtask { status }) = event else {
2660 unreachable!();
2661 };
2662
2663 log::trace!("status {status:?} for {:?}", guest_thread.task);
2664
2665 if status == Status::Returned {
2666 break (status, None);
2668 } else if async_caller {
2669 let handle = store
2673 .0
2674 .instance_state(caller_instance)
2675 .handle_table()
2676 .subtask_insert_guest(guest_thread.task.rep())?;
2677 store
2678 .0
2679 .concurrent_state_mut()
2680 .get_mut(guest_thread.task)?
2681 .common
2682 .handle = Some(handle);
2683 break (status, Some(handle));
2684 } else {
2685 }
2689 };
2690
2691 guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2692
2693 store.0.set_thread(caller);
2695 store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2696 log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2697
2698 if let Some(storage) = storage {
2699 let state = store.0.concurrent_state_mut();
2703 let task = state.get_mut(guest_thread.task)?;
2704 if let Some(result) = task.sync_result.take() {
2705 if let Some(result) = result {
2706 storage[0] = MaybeUninit::new(result);
2707 }
2708
2709 if task.exited && task.ready_to_delete() {
2710 Waitable::Guest(guest_thread.task).delete_from(state)?;
2711 }
2712 }
2713 }
2714
2715 Ok(status.pack(waitable))
2716 }
2717
2718 pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2731 self,
2732 mut store: StoreContextMut<'_, T>,
2733 future: impl Future<Output = Result<R>> + Send + 'static,
2734 lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
2735 ) -> Result<Option<u32>> {
2736 let token = StoreToken::new(store.as_context_mut());
2737 let state = store.0.concurrent_state_mut();
2738 let task = state.unwrap_current_host_thread();
2739
2740 let (join_handle, future) = JoinHandle::run(future);
2743 {
2744 let state = &mut state.get_mut(task)?.state;
2745 assert!(matches!(state, HostTaskState::CalleeStarted));
2746 *state = HostTaskState::CalleeRunning(join_handle);
2747 }
2748
2749 let mut future = Box::pin(future);
2750
2751 let poll = tls::set(store.0, || {
2756 future
2757 .as_mut()
2758 .poll(&mut Context::from_waker(&Waker::noop()))
2759 });
2760
2761 match poll {
2762 Poll::Ready(Some(result)) => {
2764 lower(store.as_context_mut(), Some(result?))?;
2765 return Ok(None);
2766 }
2767
2768 Poll::Ready(None) => unreachable!(),
2771
2772 Poll::Pending => {}
2774 }
2775
2776 let future = Box::pin(async move {
2784 let result = match future.await {
2785 Some(result) => Some(result?),
2786 None => None,
2787 };
2788 let on_complete = move |store: &mut dyn VMStore| {
2789 let mut store = token.as_context_mut(store);
2793 let state = store.0.concurrent_state_mut();
2794 assert!(state.current_thread.is_none());
2795 store.0.set_thread(task);
2796
2797 let status = if result.is_some() {
2798 Status::Returned
2799 } else {
2800 Status::ReturnCancelled
2801 };
2802
2803 lower(store.as_context_mut(), result)?;
2804 let state = store.0.concurrent_state_mut();
2805 state.get_mut(task)?.state = HostTaskState::CalleeDone;
2806 Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;
2807
2808 store.0.set_thread(CurrentThread::None);
2810 Ok(())
2811 };
2812
2813 tls::get(move |store| {
2818 store
2819 .concurrent_state_mut()
2820 .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2821 on_complete,
2822 ))));
2823 Ok(())
2824 })
2825 });
2826
2827 let state = store.0.concurrent_state_mut();
2830 state.push_future(future);
2831 let caller = state.get_mut(task)?.caller;
2832 let instance = state.get_mut(caller.task)?.instance;
2833 let handle = store
2834 .0
2835 .instance_state(instance)
2836 .handle_table()
2837 .subtask_insert_host(task.rep())?;
2838 store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2839 log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");
2840
2841 store.0.set_thread(caller);
2845 Ok(Some(handle))
2846 }
2847
2848 pub(crate) fn task_return(
2851 self,
2852 store: &mut dyn VMStore,
2853 ty: TypeTupleIndex,
2854 options: OptionsIndex,
2855 storage: &[ValRaw],
2856 ) -> Result<()> {
2857 let state = store.concurrent_state_mut();
2858 let guest_thread = state.unwrap_current_guest_thread();
2859 let lift = state
2860 .get_mut(guest_thread.task)?
2861 .lift_result
2862 .take()
2863 .ok_or_else(|| {
2864 format_err!("`task.return` or `task.cancel` called more than once for current task")
2865 })?;
2866 assert!(state.get_mut(guest_thread.task)?.result.is_none());
2867
2868 let CanonicalOptions {
2869 string_encoding,
2870 data_model,
2871 ..
2872 } = &self.id().get(store).component().env_component().options[options];
2873
2874 let invalid = ty != lift.ty
2875 || string_encoding != &lift.string_encoding
2876 || match data_model {
2877 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2878 Some(memory) => {
2879 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2880 let actual = self.id().get(store).runtime_memory(memory);
2881 expected != actual.as_ptr()
2882 }
2883 None => false,
2886 },
2887 CanonicalOptionsDataModel::Gc { .. } => true,
2889 };
2890
2891 if invalid {
2892 bail!("invalid `task.return` signature and/or options for current task");
2893 }
2894
2895 log::trace!("task.return for {guest_thread:?}");
2896
2897 let result = (lift.lift)(store, storage)?;
2898 self.task_complete(store, guest_thread.task, result, Status::Returned)
2899 }
2900
2901 pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
2903 let state = store.concurrent_state_mut();
2904 let guest_thread = state.unwrap_current_guest_thread();
2905 let task = state.get_mut(guest_thread.task)?;
2906 if !task.cancel_sent {
2907 bail!("`task.cancel` called by task which has not been cancelled")
2908 }
2909 _ = task.lift_result.take().ok_or_else(|| {
2910 format_err!("`task.return` or `task.cancel` called more than once for current task")
2911 })?;
2912
2913 assert!(task.result.is_none());
2914
2915 log::trace!("task.cancel for {guest_thread:?}");
2916
2917 self.task_complete(
2918 store,
2919 guest_thread.task,
2920 Box::new(DummyResult),
2921 Status::ReturnCancelled,
2922 )
2923 }
2924
2925 fn task_complete(
2931 self,
2932 store: &mut StoreOpaque,
2933 guest_task: TableId<GuestTask>,
2934 result: Box<dyn Any + Send + Sync>,
2935 status: Status,
2936 ) -> Result<()> {
2937 store
2938 .component_resource_tables(Some(self))
2939 .validate_scope_exit()?;
2940
2941 let state = store.concurrent_state_mut();
2942 let task = state.get_mut(guest_task)?;
2943
2944 if let Caller::Host { tx, .. } = &mut task.caller {
2945 if let Some(tx) = tx.take() {
2946 _ = tx.send(result);
2947 }
2948 } else {
2949 task.result = Some(result);
2950 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2951 }
2952
2953 Ok(())
2954 }
2955
2956 pub(crate) fn waitable_set_new(
2958 self,
2959 store: &mut StoreOpaque,
2960 caller_instance: RuntimeComponentInstanceIndex,
2961 ) -> Result<u32> {
2962 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2963 let handle = store
2964 .instance_state(RuntimeInstance {
2965 instance: self.id().instance(),
2966 index: caller_instance,
2967 })
2968 .handle_table()
2969 .waitable_set_insert(set.rep())?;
2970 log::trace!("new waitable set {set:?} (handle {handle})");
2971 Ok(handle)
2972 }
2973
2974 pub(crate) fn waitable_set_drop(
2976 self,
2977 store: &mut StoreOpaque,
2978 caller_instance: RuntimeComponentInstanceIndex,
2979 set: u32,
2980 ) -> Result<()> {
2981 let rep = store
2982 .instance_state(RuntimeInstance {
2983 instance: self.id().instance(),
2984 index: caller_instance,
2985 })
2986 .handle_table()
2987 .waitable_set_remove(set)?;
2988
2989 log::trace!("drop waitable set {rep} (handle {set})");
2990
2991 let set = store
2992 .concurrent_state_mut()
2993 .delete(TableId::<WaitableSet>::new(rep))?;
2994
2995 if !set.waiting.is_empty() {
2996 bail!("cannot drop waitable set with waiters");
2997 }
2998
2999 Ok(())
3000 }
3001
3002 pub(crate) fn waitable_join(
3004 self,
3005 store: &mut StoreOpaque,
3006 caller_instance: RuntimeComponentInstanceIndex,
3007 waitable_handle: u32,
3008 set_handle: u32,
3009 ) -> Result<()> {
3010 let mut instance = self.id().get_mut(store);
3011 let waitable =
3012 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3013
3014 let set = if set_handle == 0 {
3015 None
3016 } else {
3017 let set = instance.instance_states().0[caller_instance]
3018 .handle_table()
3019 .waitable_set_rep(set_handle)?;
3020
3021 Some(TableId::<WaitableSet>::new(set))
3022 };
3023
3024 log::trace!(
3025 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3026 );
3027
3028 waitable.join(store.concurrent_state_mut(), set)
3029 }
3030
3031 pub(crate) fn subtask_drop(
3033 self,
3034 store: &mut StoreOpaque,
3035 caller_instance: RuntimeComponentInstanceIndex,
3036 task_id: u32,
3037 ) -> Result<()> {
3038 self.waitable_join(store, caller_instance, task_id, 0)?;
3039
3040 let (rep, is_host) = store
3041 .instance_state(RuntimeInstance {
3042 instance: self.id().instance(),
3043 index: caller_instance,
3044 })
3045 .handle_table()
3046 .subtask_remove(task_id)?;
3047
3048 let concurrent_state = store.concurrent_state_mut();
3049 let (waitable, expected_caller, delete) = if is_host {
3050 let id = TableId::<HostTask>::new(rep);
3051 let task = concurrent_state.get_mut(id)?;
3052 match &task.state {
3053 HostTaskState::CalleeRunning(_) => {
3054 bail!("cannot drop a subtask which has not yet resolved");
3055 }
3056 HostTaskState::CalleeDone => {}
3057 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => unreachable!(),
3058 }
3059 (Waitable::Host(id), task.caller, true)
3060 } else {
3061 let id = TableId::<GuestTask>::new(rep);
3062 let task = concurrent_state.get_mut(id)?;
3063 if task.lift_result.is_some() {
3064 bail!("cannot drop a subtask which has not yet resolved");
3065 }
3066 if let Caller::Guest { thread } = task.caller {
3067 (
3068 Waitable::Guest(id),
3069 thread,
3070 concurrent_state.get_mut(id)?.exited,
3071 )
3072 } else {
3073 unreachable!()
3074 }
3075 };
3076
3077 waitable.common(concurrent_state)?.handle = None;
3078
3079 if waitable.take_event(concurrent_state)?.is_some() {
3080 bail!("cannot drop a subtask with an undelivered event");
3081 }
3082
3083 if delete {
3084 waitable.delete_from(concurrent_state)?;
3085 }
3086
3087 assert_eq!(
3091 expected_caller,
3092 concurrent_state.unwrap_current_guest_thread(),
3093 );
3094 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3095 Ok(())
3096 }
3097
3098 pub(crate) fn waitable_set_wait(
3100 self,
3101 store: &mut StoreOpaque,
3102 options: OptionsIndex,
3103 set: u32,
3104 payload: u32,
3105 ) -> Result<u32> {
3106 if !self.options(store, options).async_ {
3107 store.check_blocking()?;
3111 }
3112
3113 let &CanonicalOptions {
3114 cancellable,
3115 instance: caller_instance,
3116 ..
3117 } = &self.id().get(store).component().env_component().options[options];
3118 let rep = store
3119 .instance_state(RuntimeInstance {
3120 instance: self.id().instance(),
3121 index: caller_instance,
3122 })
3123 .handle_table()
3124 .waitable_set_rep(set)?;
3125
3126 self.waitable_check(
3127 store,
3128 cancellable,
3129 WaitableCheck::Wait,
3130 WaitableCheckParams {
3131 set: TableId::new(rep),
3132 options,
3133 payload,
3134 },
3135 )
3136 }
3137
3138 pub(crate) fn waitable_set_poll(
3140 self,
3141 store: &mut StoreOpaque,
3142 options: OptionsIndex,
3143 set: u32,
3144 payload: u32,
3145 ) -> Result<u32> {
3146 let &CanonicalOptions {
3147 cancellable,
3148 instance: caller_instance,
3149 ..
3150 } = &self.id().get(store).component().env_component().options[options];
3151 let rep = store
3152 .instance_state(RuntimeInstance {
3153 instance: self.id().instance(),
3154 index: caller_instance,
3155 })
3156 .handle_table()
3157 .waitable_set_rep(set)?;
3158
3159 self.waitable_check(
3160 store,
3161 cancellable,
3162 WaitableCheck::Poll,
3163 WaitableCheckParams {
3164 set: TableId::new(rep),
3165 options,
3166 payload,
3167 },
3168 )
3169 }
3170
3171 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3173 let thread_id = store
3174 .concurrent_state_mut()
3175 .unwrap_current_guest_thread()
3176 .thread;
3177 Ok(store
3179 .concurrent_state_mut()
3180 .get_mut(thread_id)?
3181 .instance_rep
3182 .unwrap())
3183 }
3184
3185 pub(crate) fn thread_new_indirect<T: 'static>(
3187 self,
3188 mut store: StoreContextMut<T>,
3189 runtime_instance: RuntimeComponentInstanceIndex,
3190 _func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex,
3192 start_func_idx: u32,
3193 context: i32,
3194 ) -> Result<u32> {
3195 log::trace!("creating new thread");
3196
3197 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3198 let (instance, registry) = self.id().get_mut_and_registry(store.0);
3199 let callee = instance
3200 .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3201 .ok_or_else(|| {
3202 format_err!("the start function index points to an uninitialized function")
3203 })?;
3204 if callee.type_index(store.0) != start_func_ty.type_index() {
3205 bail!(
3206 "start function does not match expected type (currently only `(i32) -> ()` is supported)"
3207 );
3208 }
3209
3210 let token = StoreToken::new(store.as_context_mut());
3211 let start_func = Box::new(
3212 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3213 let old_thread = store.set_thread(guest_thread);
3214 log::trace!(
3215 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3216 );
3217
3218 let mut store = token.as_context_mut(store);
3219 let mut params = [ValRaw::i32(context)];
3220 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3223
3224 self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3225 log::trace!("explicit thread {guest_thread:?} completed");
3226 let state = store.0.concurrent_state_mut();
3227 let task = state.get_mut(guest_thread.task)?;
3228 if task.threads.is_empty() && !task.returned_or_cancelled() {
3229 bail!(Trap::NoAsyncResult);
3230 }
3231 store.0.set_thread(old_thread);
3232 let state = store.0.concurrent_state_mut();
3233 old_thread
3234 .guest()
3235 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
3236 if state.get_mut(guest_thread.task)?.ready_to_delete() {
3237 Waitable::Guest(guest_thread.task).delete_from(state)?;
3238 }
3239 log::trace!("thread start: restored {old_thread:?} as current thread");
3240
3241 Ok(())
3242 },
3243 );
3244
3245 let state = store.0.concurrent_state_mut();
3246 let current_thread = state.unwrap_current_guest_thread();
3247 let parent_task = current_thread.task;
3248
3249 let new_thread = GuestThread::new_explicit(parent_task, start_func);
3250 let thread_id = state.push(new_thread)?;
3251 state.get_mut(parent_task)?.threads.insert(thread_id);
3252
3253 log::trace!("new thread with id {thread_id:?} created");
3254
3255 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3256 }
3257
3258 pub(crate) fn resume_thread(
3259 self,
3260 store: &mut StoreOpaque,
3261 runtime_instance: RuntimeComponentInstanceIndex,
3262 thread_idx: u32,
3263 high_priority: bool,
3264 allow_ready: bool,
3265 ) -> Result<()> {
3266 let thread_id =
3267 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3268 let state = store.concurrent_state_mut();
3269 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3270 let thread = state.get_mut(guest_thread.thread)?;
3271
3272 match mem::replace(&mut thread.state, GuestThreadState::Running) {
3273 GuestThreadState::NotStartedExplicit(start_func) => {
3274 log::trace!("starting thread {guest_thread:?}");
3275 let guest_call = WorkItem::GuestCall(
3276 runtime_instance,
3277 GuestCall {
3278 thread: guest_thread,
3279 kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3280 start_func(store, guest_thread)
3281 })),
3282 },
3283 );
3284 store
3285 .concurrent_state_mut()
3286 .push_work_item(guest_call, high_priority);
3287 }
3288 GuestThreadState::Suspended(fiber) => {
3289 log::trace!("resuming thread {thread_id:?} that was suspended");
3290 store
3291 .concurrent_state_mut()
3292 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3293 }
3294 GuestThreadState::Ready(fiber) if allow_ready => {
3295 log::trace!("resuming thread {thread_id:?} that was ready");
3296 thread.state = GuestThreadState::Ready(fiber);
3297 store
3298 .concurrent_state_mut()
3299 .promote_thread_work_item(guest_thread);
3300 }
3301 other => {
3302 thread.state = other;
3303 bail!("cannot resume thread which is not suspended");
3304 }
3305 }
3306 Ok(())
3307 }
3308
3309 fn add_guest_thread_to_instance_table(
3310 self,
3311 thread_id: TableId<GuestThread>,
3312 store: &mut StoreOpaque,
3313 runtime_instance: RuntimeComponentInstanceIndex,
3314 ) -> Result<u32> {
3315 let guest_id = store
3316 .instance_state(RuntimeInstance {
3317 instance: self.id().instance(),
3318 index: runtime_instance,
3319 })
3320 .thread_handle_table()
3321 .guest_thread_insert(thread_id.rep())?;
3322 store
3323 .concurrent_state_mut()
3324 .get_mut(thread_id)?
3325 .instance_rep = Some(guest_id);
3326 Ok(guest_id)
3327 }
3328
3329 pub(crate) fn suspension_intrinsic(
3332 self,
3333 store: &mut StoreOpaque,
3334 caller: RuntimeComponentInstanceIndex,
3335 cancellable: bool,
3336 yielding: bool,
3337 to_thread: SuspensionTarget,
3338 ) -> Result<WaitResult> {
3339 let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread();
3340 if to_thread.is_none() {
3341 let state = store.concurrent_state_mut();
3342 if yielding {
3343 if !state.may_block(guest_thread.task) {
3345 if !state.promote_instance_local_thread_work_item(caller) {
3348 return Ok(WaitResult::Completed);
3350 }
3351 }
3352 } else {
3353 store.check_blocking()?;
3357 }
3358 }
3359
3360 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3362 return Ok(WaitResult::Cancelled);
3363 }
3364
3365 match to_thread {
3366 SuspensionTarget::SomeSuspended(thread) => {
3367 self.resume_thread(store, caller, thread, true, false)?
3368 }
3369 SuspensionTarget::Some(thread) => {
3370 self.resume_thread(store, caller, thread, true, true)?
3371 }
3372 SuspensionTarget::None => { }
3373 }
3374
3375 let reason = if yielding {
3376 SuspendReason::Yielding {
3377 thread: guest_thread,
3378 skip_may_block_check: to_thread.is_some(),
3382 }
3383 } else {
3384 SuspendReason::ExplicitlySuspending {
3385 thread: guest_thread,
3386 skip_may_block_check: to_thread.is_some(),
3390 }
3391 };
3392
3393 store.suspend(reason)?;
3394
3395 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3396 Ok(WaitResult::Cancelled)
3397 } else {
3398 Ok(WaitResult::Completed)
3399 }
3400 }
3401
3402 fn waitable_check(
3404 self,
3405 store: &mut StoreOpaque,
3406 cancellable: bool,
3407 check: WaitableCheck,
3408 params: WaitableCheckParams,
3409 ) -> Result<u32> {
3410 let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread();
3411
3412 log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3413
3414 let state = store.concurrent_state_mut();
3415 let task = state.get_mut(guest_thread.task)?;
3416
3417 match &check {
3420 WaitableCheck::Wait => {
3421 let set = params.set;
3422
3423 if (task.event.is_none()
3424 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3425 && state.get_mut(set)?.ready.is_empty()
3426 {
3427 if cancellable {
3428 let old = state
3429 .get_mut(guest_thread.thread)?
3430 .wake_on_cancel
3431 .replace(set);
3432 assert!(old.is_none());
3433 }
3434
3435 store.suspend(SuspendReason::Waiting {
3436 set,
3437 thread: guest_thread,
3438 skip_may_block_check: false,
3439 })?;
3440 }
3441 }
3442 WaitableCheck::Poll => {}
3443 }
3444
3445 log::trace!(
3446 "waitable check for {guest_thread:?}; set {:?}, part two",
3447 params.set
3448 );
3449
3450 let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3452
3453 let (ordinal, handle, result) = match &check {
3454 WaitableCheck::Wait => {
3455 let (event, waitable) = event.unwrap();
3456 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3457 let (ordinal, result) = event.parts();
3458 (ordinal, handle, result)
3459 }
3460 WaitableCheck::Poll => {
3461 if let Some((event, waitable)) = event {
3462 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3463 let (ordinal, result) = event.parts();
3464 (ordinal, handle, result)
3465 } else {
3466 log::trace!(
3467 "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3468 guest_thread.task,
3469 params.set
3470 );
3471 let (ordinal, result) = Event::None.parts();
3472 (ordinal, 0, result)
3473 }
3474 }
3475 };
3476 let memory = self.options_memory_mut(store, params.options);
3477 let ptr = func::validate_inbounds_dynamic(
3478 &CanonicalAbiInfo::POINTER_PAIR,
3479 memory,
3480 &ValRaw::u32(params.payload),
3481 )?;
3482 memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3483 memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3484 Ok(ordinal)
3485 }
3486
3487 pub(crate) fn subtask_cancel(
3489 self,
3490 store: &mut StoreOpaque,
3491 caller_instance: RuntimeComponentInstanceIndex,
3492 async_: bool,
3493 task_id: u32,
3494 ) -> Result<u32> {
3495 if !async_ {
3496 store.check_blocking()?;
3500 }
3501
3502 let (rep, is_host) = store
3503 .instance_state(RuntimeInstance {
3504 instance: self.id().instance(),
3505 index: caller_instance,
3506 })
3507 .handle_table()
3508 .subtask_rep(task_id)?;
3509 let (waitable, expected_caller) = if is_host {
3510 let id = TableId::<HostTask>::new(rep);
3511 (
3512 Waitable::Host(id),
3513 store.concurrent_state_mut().get_mut(id)?.caller,
3514 )
3515 } else {
3516 let id = TableId::<GuestTask>::new(rep);
3517 if let Caller::Guest { thread } = store.concurrent_state_mut().get_mut(id)?.caller {
3518 (Waitable::Guest(id), thread)
3519 } else {
3520 unreachable!()
3521 }
3522 };
3523 let concurrent_state = store.concurrent_state_mut();
3527 assert_eq!(
3528 expected_caller,
3529 concurrent_state.unwrap_current_guest_thread(),
3530 );
3531
3532 log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3533
3534 let needs_block;
3535 if let Waitable::Host(host_task) = waitable {
3536 let state = &mut concurrent_state.get_mut(host_task)?.state;
3537 match mem::replace(state, HostTaskState::CalleeDone) {
3538 HostTaskState::CalleeRunning(handle) => handle.abort(),
3541
3542 HostTaskState::CalleeDone => {
3545 bail!("`subtask.cancel` called after terminal status delivered");
3546 }
3547
3548 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => unreachable!(),
3551 }
3552
3553 needs_block = true;
3558 } else {
3559 let caller = concurrent_state.unwrap_current_guest_thread();
3560 let guest_task = TableId::<GuestTask>::new(rep);
3561 let task = concurrent_state.get_mut(guest_task)?;
3562 if !task.already_lowered_parameters() {
3563 task.lower_params = None;
3567 task.lift_result = None;
3568 task.exited = true;
3569
3570 let instance = task.instance;
3571
3572 assert_eq!(1, task.threads.len());
3573 let thread = mem::take(&mut task.threads).into_iter().next().unwrap();
3574 let concurrent_state = store.concurrent_state_mut();
3575 concurrent_state.delete(thread)?;
3576 assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());
3577
3578 let pending = &mut store.instance_state(instance).concurrent_state().pending;
3580 let pending_count = pending.len();
3581 pending.retain(|thread, _| thread.task != guest_task);
3582 if pending.len() == pending_count {
3584 bail!("`subtask.cancel` called after terminal status delivered");
3585 }
3586 return Ok(Status::StartCancelled as u32);
3587 } else if !task.returned_or_cancelled() {
3588 task.cancel_sent = true;
3591 task.event = Some(Event::Cancelled);
3596 let runtime_instance = task.instance.index;
3597 for thread in task.threads.clone() {
3598 let thread = QualifiedThreadId {
3599 task: guest_task,
3600 thread,
3601 };
3602 if let Some(set) = concurrent_state
3603 .get_mut(thread.thread)
3604 .unwrap()
3605 .wake_on_cancel
3606 .take()
3607 {
3608 let item = match concurrent_state
3609 .get_mut(set)?
3610 .waiting
3611 .remove(&thread)
3612 .unwrap()
3613 {
3614 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3615 WaitMode::Callback(instance) => WorkItem::GuestCall(
3616 runtime_instance,
3617 GuestCall {
3618 thread,
3619 kind: GuestCallKind::DeliverEvent {
3620 instance,
3621 set: None,
3622 },
3623 },
3624 ),
3625 };
3626 concurrent_state.push_high_priority(item);
3627
3628 store.suspend(SuspendReason::Yielding {
3629 thread: caller,
3630 skip_may_block_check: false,
3633 })?;
3634 break;
3635 }
3636 }
3637
3638 needs_block = !store
3641 .concurrent_state_mut()
3642 .get_mut(guest_task)?
3643 .returned_or_cancelled()
3644 } else {
3645 needs_block = false;
3646 }
3647 };
3648
3649 if needs_block {
3653 if async_ {
3654 return Ok(BLOCKED);
3655 }
3656
3657 store.wait_for_event(waitable)?;
3661
3662 }
3664
3665 let event = waitable.take_event(store.concurrent_state_mut())?;
3666 if let Some(Event::Subtask {
3667 status: status @ (Status::Returned | Status::ReturnCancelled),
3668 }) = event
3669 {
3670 Ok(status as u32)
3671 } else {
3672 bail!("`subtask.cancel` called after terminal status delivered");
3673 }
3674 }
3675
3676 pub(crate) fn context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32> {
3677 store.concurrent_state_mut().context_get(slot)
3678 }
3679
3680 pub(crate) fn context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()> {
3681 store.concurrent_state_mut().context_set(slot, value)
3682 }
3683}
3684
3685pub trait VMComponentAsyncStore {
3693 unsafe fn prepare_call(
3699 &mut self,
3700 instance: Instance,
3701 memory: *mut VMMemoryDefinition,
3702 start: *mut VMFuncRef,
3703 return_: *mut VMFuncRef,
3704 caller_instance: RuntimeComponentInstanceIndex,
3705 callee_instance: RuntimeComponentInstanceIndex,
3706 task_return_type: TypeTupleIndex,
3707 callee_async: bool,
3708 string_encoding: u8,
3709 result_count: u32,
3710 storage: *mut ValRaw,
3711 storage_len: usize,
3712 ) -> Result<()>;
3713
3714 unsafe fn sync_start(
3717 &mut self,
3718 instance: Instance,
3719 callback: *mut VMFuncRef,
3720 callee: *mut VMFuncRef,
3721 param_count: u32,
3722 storage: *mut MaybeUninit<ValRaw>,
3723 storage_len: usize,
3724 ) -> Result<()>;
3725
3726 unsafe fn async_start(
3729 &mut self,
3730 instance: Instance,
3731 callback: *mut VMFuncRef,
3732 post_return: *mut VMFuncRef,
3733 callee: *mut VMFuncRef,
3734 param_count: u32,
3735 result_count: u32,
3736 flags: u32,
3737 ) -> Result<u32>;
3738
3739 fn future_write(
3741 &mut self,
3742 instance: Instance,
3743 caller: RuntimeComponentInstanceIndex,
3744 ty: TypeFutureTableIndex,
3745 options: OptionsIndex,
3746 future: u32,
3747 address: u32,
3748 ) -> Result<u32>;
3749
3750 fn future_read(
3752 &mut self,
3753 instance: Instance,
3754 caller: RuntimeComponentInstanceIndex,
3755 ty: TypeFutureTableIndex,
3756 options: OptionsIndex,
3757 future: u32,
3758 address: u32,
3759 ) -> Result<u32>;
3760
3761 fn future_drop_writable(
3763 &mut self,
3764 instance: Instance,
3765 ty: TypeFutureTableIndex,
3766 writer: u32,
3767 ) -> Result<()>;
3768
3769 fn stream_write(
3771 &mut self,
3772 instance: Instance,
3773 caller: RuntimeComponentInstanceIndex,
3774 ty: TypeStreamTableIndex,
3775 options: OptionsIndex,
3776 stream: u32,
3777 address: u32,
3778 count: u32,
3779 ) -> Result<u32>;
3780
3781 fn stream_read(
3783 &mut self,
3784 instance: Instance,
3785 caller: RuntimeComponentInstanceIndex,
3786 ty: TypeStreamTableIndex,
3787 options: OptionsIndex,
3788 stream: u32,
3789 address: u32,
3790 count: u32,
3791 ) -> Result<u32>;
3792
3793 fn flat_stream_write(
3796 &mut self,
3797 instance: Instance,
3798 caller: RuntimeComponentInstanceIndex,
3799 ty: TypeStreamTableIndex,
3800 options: OptionsIndex,
3801 payload_size: u32,
3802 payload_align: u32,
3803 stream: u32,
3804 address: u32,
3805 count: u32,
3806 ) -> Result<u32>;
3807
3808 fn flat_stream_read(
3811 &mut self,
3812 instance: Instance,
3813 caller: RuntimeComponentInstanceIndex,
3814 ty: TypeStreamTableIndex,
3815 options: OptionsIndex,
3816 payload_size: u32,
3817 payload_align: u32,
3818 stream: u32,
3819 address: u32,
3820 count: u32,
3821 ) -> Result<u32>;
3822
3823 fn stream_drop_writable(
3825 &mut self,
3826 instance: Instance,
3827 ty: TypeStreamTableIndex,
3828 writer: u32,
3829 ) -> Result<()>;
3830
3831 fn error_context_debug_message(
3833 &mut self,
3834 instance: Instance,
3835 ty: TypeComponentLocalErrorContextTableIndex,
3836 options: OptionsIndex,
3837 err_ctx_handle: u32,
3838 debug_msg_address: u32,
3839 ) -> Result<()>;
3840
3841 fn thread_new_indirect(
3843 &mut self,
3844 instance: Instance,
3845 caller: RuntimeComponentInstanceIndex,
3846 func_ty_idx: TypeFuncIndex,
3847 start_func_table_idx: RuntimeTableIndex,
3848 start_func_idx: u32,
3849 context: i32,
3850 ) -> Result<u32>;
3851}
3852
3853impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3855 unsafe fn prepare_call(
3856 &mut self,
3857 instance: Instance,
3858 memory: *mut VMMemoryDefinition,
3859 start: *mut VMFuncRef,
3860 return_: *mut VMFuncRef,
3861 caller_instance: RuntimeComponentInstanceIndex,
3862 callee_instance: RuntimeComponentInstanceIndex,
3863 task_return_type: TypeTupleIndex,
3864 callee_async: bool,
3865 string_encoding: u8,
3866 result_count_or_max_if_async: u32,
3867 storage: *mut ValRaw,
3868 storage_len: usize,
3869 ) -> Result<()> {
3870 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3874
3875 unsafe {
3876 instance.prepare_call(
3877 StoreContextMut(self),
3878 start,
3879 return_,
3880 caller_instance,
3881 callee_instance,
3882 task_return_type,
3883 callee_async,
3884 memory,
3885 string_encoding,
3886 match result_count_or_max_if_async {
3887 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3888 params,
3889 has_result: false,
3890 },
3891 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3892 params,
3893 has_result: true,
3894 },
3895 result_count => CallerInfo::Sync {
3896 params,
3897 result_count,
3898 },
3899 },
3900 )
3901 }
3902 }
3903
3904 unsafe fn sync_start(
3905 &mut self,
3906 instance: Instance,
3907 callback: *mut VMFuncRef,
3908 callee: *mut VMFuncRef,
3909 param_count: u32,
3910 storage: *mut MaybeUninit<ValRaw>,
3911 storage_len: usize,
3912 ) -> Result<()> {
3913 unsafe {
3914 instance
3915 .start_call(
3916 StoreContextMut(self),
3917 callback,
3918 ptr::null_mut(),
3919 callee,
3920 param_count,
3921 1,
3922 START_FLAG_ASYNC_CALLEE,
3923 Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3927 )
3928 .map(drop)
3929 }
3930 }
3931
3932 unsafe fn async_start(
3933 &mut self,
3934 instance: Instance,
3935 callback: *mut VMFuncRef,
3936 post_return: *mut VMFuncRef,
3937 callee: *mut VMFuncRef,
3938 param_count: u32,
3939 result_count: u32,
3940 flags: u32,
3941 ) -> Result<u32> {
3942 unsafe {
3943 instance.start_call(
3944 StoreContextMut(self),
3945 callback,
3946 post_return,
3947 callee,
3948 param_count,
3949 result_count,
3950 flags,
3951 None,
3952 )
3953 }
3954 }
3955
3956 fn future_write(
3957 &mut self,
3958 instance: Instance,
3959 caller: RuntimeComponentInstanceIndex,
3960 ty: TypeFutureTableIndex,
3961 options: OptionsIndex,
3962 future: u32,
3963 address: u32,
3964 ) -> Result<u32> {
3965 instance
3966 .guest_write(
3967 StoreContextMut(self),
3968 caller,
3969 TransmitIndex::Future(ty),
3970 options,
3971 None,
3972 future,
3973 address,
3974 1,
3975 )
3976 .map(|result| result.encode())
3977 }
3978
3979 fn future_read(
3980 &mut self,
3981 instance: Instance,
3982 caller: RuntimeComponentInstanceIndex,
3983 ty: TypeFutureTableIndex,
3984 options: OptionsIndex,
3985 future: u32,
3986 address: u32,
3987 ) -> Result<u32> {
3988 instance
3989 .guest_read(
3990 StoreContextMut(self),
3991 caller,
3992 TransmitIndex::Future(ty),
3993 options,
3994 None,
3995 future,
3996 address,
3997 1,
3998 )
3999 .map(|result| result.encode())
4000 }
4001
4002 fn stream_write(
4003 &mut self,
4004 instance: Instance,
4005 caller: RuntimeComponentInstanceIndex,
4006 ty: TypeStreamTableIndex,
4007 options: OptionsIndex,
4008 stream: u32,
4009 address: u32,
4010 count: u32,
4011 ) -> Result<u32> {
4012 instance
4013 .guest_write(
4014 StoreContextMut(self),
4015 caller,
4016 TransmitIndex::Stream(ty),
4017 options,
4018 None,
4019 stream,
4020 address,
4021 count,
4022 )
4023 .map(|result| result.encode())
4024 }
4025
4026 fn stream_read(
4027 &mut self,
4028 instance: Instance,
4029 caller: RuntimeComponentInstanceIndex,
4030 ty: TypeStreamTableIndex,
4031 options: OptionsIndex,
4032 stream: u32,
4033 address: u32,
4034 count: u32,
4035 ) -> Result<u32> {
4036 instance
4037 .guest_read(
4038 StoreContextMut(self),
4039 caller,
4040 TransmitIndex::Stream(ty),
4041 options,
4042 None,
4043 stream,
4044 address,
4045 count,
4046 )
4047 .map(|result| result.encode())
4048 }
4049
4050 fn future_drop_writable(
4051 &mut self,
4052 instance: Instance,
4053 ty: TypeFutureTableIndex,
4054 writer: u32,
4055 ) -> Result<()> {
4056 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4057 }
4058
4059 fn flat_stream_write(
4060 &mut self,
4061 instance: Instance,
4062 caller: RuntimeComponentInstanceIndex,
4063 ty: TypeStreamTableIndex,
4064 options: OptionsIndex,
4065 payload_size: u32,
4066 payload_align: u32,
4067 stream: u32,
4068 address: u32,
4069 count: u32,
4070 ) -> Result<u32> {
4071 instance
4072 .guest_write(
4073 StoreContextMut(self),
4074 caller,
4075 TransmitIndex::Stream(ty),
4076 options,
4077 Some(FlatAbi {
4078 size: payload_size,
4079 align: payload_align,
4080 }),
4081 stream,
4082 address,
4083 count,
4084 )
4085 .map(|result| result.encode())
4086 }
4087
4088 fn flat_stream_read(
4089 &mut self,
4090 instance: Instance,
4091 caller: RuntimeComponentInstanceIndex,
4092 ty: TypeStreamTableIndex,
4093 options: OptionsIndex,
4094 payload_size: u32,
4095 payload_align: u32,
4096 stream: u32,
4097 address: u32,
4098 count: u32,
4099 ) -> Result<u32> {
4100 instance
4101 .guest_read(
4102 StoreContextMut(self),
4103 caller,
4104 TransmitIndex::Stream(ty),
4105 options,
4106 Some(FlatAbi {
4107 size: payload_size,
4108 align: payload_align,
4109 }),
4110 stream,
4111 address,
4112 count,
4113 )
4114 .map(|result| result.encode())
4115 }
4116
4117 fn stream_drop_writable(
4118 &mut self,
4119 instance: Instance,
4120 ty: TypeStreamTableIndex,
4121 writer: u32,
4122 ) -> Result<()> {
4123 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4124 }
4125
4126 fn error_context_debug_message(
4127 &mut self,
4128 instance: Instance,
4129 ty: TypeComponentLocalErrorContextTableIndex,
4130 options: OptionsIndex,
4131 err_ctx_handle: u32,
4132 debug_msg_address: u32,
4133 ) -> Result<()> {
4134 instance.error_context_debug_message(
4135 StoreContextMut(self),
4136 ty,
4137 options,
4138 err_ctx_handle,
4139 debug_msg_address,
4140 )
4141 }
4142
4143 fn thread_new_indirect(
4144 &mut self,
4145 instance: Instance,
4146 caller: RuntimeComponentInstanceIndex,
4147 func_ty_idx: TypeFuncIndex,
4148 start_func_table_idx: RuntimeTableIndex,
4149 start_func_idx: u32,
4150 context: i32,
4151 ) -> Result<u32> {
4152 instance.thread_new_indirect(
4153 StoreContextMut(self),
4154 caller,
4155 func_ty_idx,
4156 start_func_table_idx,
4157 start_func_idx,
4158 context,
4159 )
4160 }
4161}
4162
4163type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4164
4165struct HostTask {
4169 common: WaitableCommon,
4170
4171 caller: QualifiedThreadId,
4173
4174 call_context: CallContext,
4177
4178 state: HostTaskState,
4179}
4180
4181enum HostTaskState {
4182 CalleeStarted,
4187
4188 CalleeRunning(JoinHandle),
4193
4194 CalleeFinished(LiftedResult),
4198
4199 CalleeDone,
4202}
4203
4204impl HostTask {
4205 fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4206 Self {
4207 common: WaitableCommon::default(),
4208 call_context: CallContext::default(),
4209 caller,
4210 state,
4211 }
4212 }
4213}
4214
4215impl TableDebug for HostTask {
4216 fn type_name() -> &'static str {
4217 "HostTask"
4218 }
4219}
4220
4221type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4222
4223enum Caller {
4225 Host {
4227 tx: Option<oneshot::Sender<LiftedResult>>,
4229 host_future_present: bool,
4232 caller: CurrentThread,
4236 },
4237 Guest {
4239 thread: QualifiedThreadId,
4241 },
4242}
4243
4244struct LiftResult {
4247 lift: RawLift,
4248 ty: TypeTupleIndex,
4249 memory: Option<SendSyncPtr<VMMemoryDefinition>>,
4250 string_encoding: StringEncoding,
4251}
4252
4253#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4258struct QualifiedThreadId {
4259 task: TableId<GuestTask>,
4260 thread: TableId<GuestThread>,
4261}
4262
4263impl QualifiedThreadId {
4264 fn qualify(
4265 state: &mut ConcurrentState,
4266 thread: TableId<GuestThread>,
4267 ) -> Result<QualifiedThreadId> {
4268 Ok(QualifiedThreadId {
4269 task: state.get_mut(thread)?.parent_task,
4270 thread,
4271 })
4272 }
4273}
4274
4275impl fmt::Debug for QualifiedThreadId {
4276 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4277 f.debug_tuple("QualifiedThreadId")
4278 .field(&self.task.rep())
4279 .field(&self.thread.rep())
4280 .finish()
4281 }
4282}
4283
4284enum GuestThreadState {
4285 NotStartedImplicit,
4286 NotStartedExplicit(
4287 Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4288 ),
4289 Running,
4290 Suspended(StoreFiber<'static>),
4291 Ready(StoreFiber<'static>),
4292 Completed,
4293}
4294pub struct GuestThread {
4295 context: [u32; 2],
4298 parent_task: TableId<GuestTask>,
4300 wake_on_cancel: Option<TableId<WaitableSet>>,
4303 state: GuestThreadState,
4305 instance_rep: Option<u32>,
4308}
4309
4310impl GuestThread {
4311 fn from_instance(
4314 state: Pin<&mut ComponentInstance>,
4315 caller_instance: RuntimeComponentInstanceIndex,
4316 guest_thread: u32,
4317 ) -> Result<TableId<Self>> {
4318 let rep = state.instance_states().0[caller_instance]
4319 .thread_handle_table()
4320 .guest_thread_rep(guest_thread)?;
4321 Ok(TableId::new(rep))
4322 }
4323
4324 fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4325 Self {
4326 context: [0; 2],
4327 parent_task,
4328 wake_on_cancel: None,
4329 state: GuestThreadState::NotStartedImplicit,
4330 instance_rep: None,
4331 }
4332 }
4333
4334 fn new_explicit(
4335 parent_task: TableId<GuestTask>,
4336 start_func: Box<
4337 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4338 >,
4339 ) -> Self {
4340 Self {
4341 context: [0; 2],
4342 parent_task,
4343 wake_on_cancel: None,
4344 state: GuestThreadState::NotStartedExplicit(start_func),
4345 instance_rep: None,
4346 }
4347 }
4348}
4349
4350impl TableDebug for GuestThread {
4351 fn type_name() -> &'static str {
4352 "GuestThread"
4353 }
4354}
4355
4356enum SyncResult {
4357 NotProduced,
4358 Produced(Option<ValRaw>),
4359 Taken,
4360}
4361
4362impl SyncResult {
4363 fn take(&mut self) -> Option<Option<ValRaw>> {
4364 match mem::replace(self, SyncResult::Taken) {
4365 SyncResult::NotProduced => None,
4366 SyncResult::Produced(val) => Some(val),
4367 SyncResult::Taken => {
4368 panic!("attempted to take a synchronous result that was already taken")
4369 }
4370 }
4371 }
4372}
4373
4374#[derive(Debug)]
4375enum HostFutureState {
4376 NotApplicable,
4377 Live,
4378 Dropped,
4379}
4380
4381pub(crate) struct GuestTask {
4383 common: WaitableCommon,
4385 lower_params: Option<RawLower>,
4387 lift_result: Option<LiftResult>,
4389 result: Option<LiftedResult>,
4392 callback: Option<CallbackFn>,
4395 caller: Caller,
4397 call_context: CallContext,
4402 sync_result: SyncResult,
4405 cancel_sent: bool,
4408 starting_sent: bool,
4411 sync_call_set: TableId<WaitableSet>,
4413 instance: RuntimeInstance,
4420 event: Option<Event>,
4423 exited: bool,
4425 threads: HashSet<TableId<GuestThread>>,
4427 host_future_state: HostFutureState,
4430 async_function: bool,
4433}
4434
4435impl GuestTask {
4436 fn already_lowered_parameters(&self) -> bool {
4437 self.lower_params.is_none()
4439 }
4440
4441 fn returned_or_cancelled(&self) -> bool {
4442 self.lift_result.is_none()
4444 }
4445
4446 fn ready_to_delete(&self) -> bool {
4447 let threads_completed = self.threads.is_empty();
4448 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4449 let pending_completion_event = matches!(
4450 self.common.event,
4451 Some(Event::Subtask {
4452 status: Status::Returned | Status::ReturnCancelled
4453 })
4454 );
4455 let ready = threads_completed
4456 && !has_sync_result
4457 && !pending_completion_event
4458 && !matches!(self.host_future_state, HostFutureState::Live);
4459 log::trace!(
4460 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4461 threads_completed,
4462 has_sync_result,
4463 pending_completion_event,
4464 self.host_future_state
4465 );
4466 ready
4467 }
4468
4469 fn new(
4470 state: &mut ConcurrentState,
4471 lower_params: RawLower,
4472 lift_result: LiftResult,
4473 caller: Caller,
4474 callback: Option<CallbackFn>,
4475 instance: RuntimeInstance,
4476 async_function: bool,
4477 ) -> Result<Self> {
4478 let sync_call_set = state.push(WaitableSet::default())?;
4479 let host_future_state = match &caller {
4480 Caller::Guest { .. } => HostFutureState::NotApplicable,
4481 Caller::Host {
4482 host_future_present,
4483 ..
4484 } => {
4485 if *host_future_present {
4486 HostFutureState::Live
4487 } else {
4488 HostFutureState::NotApplicable
4489 }
4490 }
4491 };
4492 Ok(Self {
4493 common: WaitableCommon::default(),
4494 lower_params: Some(lower_params),
4495 lift_result: Some(lift_result),
4496 result: None,
4497 callback,
4498 caller,
4499 call_context: CallContext::default(),
4500 sync_result: SyncResult::NotProduced,
4501 cancel_sent: false,
4502 starting_sent: false,
4503 sync_call_set,
4504 instance,
4505 event: None,
4506 exited: false,
4507 threads: HashSet::new(),
4508 host_future_state,
4509 async_function,
4510 })
4511 }
4512
4513 fn dispose(self, state: &mut ConcurrentState) -> Result<()> {
4516 for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4519 if let Some(Event::Subtask {
4520 status: Status::Returned | Status::ReturnCancelled,
4521 }) = waitable.common(state)?.event
4522 {
4523 waitable.delete_from(state)?;
4524 }
4525 }
4526
4527 assert!(self.threads.is_empty());
4528
4529 state.delete(self.sync_call_set)?;
4530
4531 Ok(())
4532 }
4533}
4534
4535impl TableDebug for GuestTask {
4536 fn type_name() -> &'static str {
4537 "GuestTask"
4538 }
4539}
4540
4541#[derive(Default)]
4543struct WaitableCommon {
4544 event: Option<Event>,
4546 set: Option<TableId<WaitableSet>>,
4548 handle: Option<u32>,
4550}
4551
4552#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4554enum Waitable {
4555 Host(TableId<HostTask>),
4557 Guest(TableId<GuestTask>),
4559 Transmit(TableId<TransmitHandle>),
4561}
4562
4563impl Waitable {
4564 fn from_instance(
4567 state: Pin<&mut ComponentInstance>,
4568 caller_instance: RuntimeComponentInstanceIndex,
4569 waitable: u32,
4570 ) -> Result<Self> {
4571 use crate::runtime::vm::component::Waitable;
4572
4573 let (waitable, kind) = state.instance_states().0[caller_instance]
4574 .handle_table()
4575 .waitable_rep(waitable)?;
4576
4577 Ok(match kind {
4578 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4579 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4580 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4581 })
4582 }
4583
4584 fn rep(&self) -> u32 {
4586 match self {
4587 Self::Host(id) => id.rep(),
4588 Self::Guest(id) => id.rep(),
4589 Self::Transmit(id) => id.rep(),
4590 }
4591 }
4592
4593 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4597 log::trace!("waitable {self:?} join set {set:?}",);
4598
4599 let old = mem::replace(&mut self.common(state)?.set, set);
4600
4601 if let Some(old) = old {
4602 match *self {
4603 Waitable::Host(id) => state.remove_child(id, old),
4604 Waitable::Guest(id) => state.remove_child(id, old),
4605 Waitable::Transmit(id) => state.remove_child(id, old),
4606 }?;
4607
4608 state.get_mut(old)?.ready.remove(self);
4609 }
4610
4611 if let Some(set) = set {
4612 match *self {
4613 Waitable::Host(id) => state.add_child(id, set),
4614 Waitable::Guest(id) => state.add_child(id, set),
4615 Waitable::Transmit(id) => state.add_child(id, set),
4616 }?;
4617
4618 if self.common(state)?.event.is_some() {
4619 self.mark_ready(state)?;
4620 }
4621 }
4622
4623 Ok(())
4624 }
4625
4626 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4628 Ok(match self {
4629 Self::Host(id) => &mut state.get_mut(*id)?.common,
4630 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4631 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4632 })
4633 }
4634
4635 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4639 log::trace!("set event for {self:?}: {event:?}");
4640 self.common(state)?.event = event;
4641 self.mark_ready(state)
4642 }
4643
4644 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4646 let common = self.common(state)?;
4647 let event = common.event.take();
4648 if let Some(set) = self.common(state)?.set {
4649 state.get_mut(set)?.ready.remove(self);
4650 }
4651
4652 Ok(event)
4653 }
4654
4655 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4659 if let Some(set) = self.common(state)?.set {
4660 state.get_mut(set)?.ready.insert(*self);
4661 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4662 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4663 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4664
4665 let item = match mode {
4666 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4667 WaitMode::Callback(instance) => WorkItem::GuestCall(
4668 state.get_mut(thread.task)?.instance.index,
4669 GuestCall {
4670 thread,
4671 kind: GuestCallKind::DeliverEvent {
4672 instance,
4673 set: Some(set),
4674 },
4675 },
4676 ),
4677 };
4678 state.push_high_priority(item);
4679 }
4680 }
4681 Ok(())
4682 }
4683
4684 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4686 match self {
4687 Self::Host(task) => {
4688 log::trace!("delete host task {task:?}");
4689 state.delete(*task)?;
4690 }
4691 Self::Guest(task) => {
4692 log::trace!("delete guest task {task:?}");
4693 state.delete(*task)?.dispose(state)?;
4694 }
4695 Self::Transmit(task) => {
4696 state.delete(*task)?;
4697 }
4698 }
4699
4700 Ok(())
4701 }
4702}
4703
4704impl fmt::Debug for Waitable {
4705 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4706 match self {
4707 Self::Host(id) => write!(f, "{id:?}"),
4708 Self::Guest(id) => write!(f, "{id:?}"),
4709 Self::Transmit(id) => write!(f, "{id:?}"),
4710 }
4711 }
4712}
4713
4714#[derive(Default)]
4716struct WaitableSet {
4717 ready: BTreeSet<Waitable>,
4719 waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4721}
4722
4723impl TableDebug for WaitableSet {
4724 fn type_name() -> &'static str {
4725 "WaitableSet"
4726 }
4727}
4728
4729type RawLower =
4731 Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4732
4733type RawLift = Box<
4735 dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4736>;
4737
4738type LiftedResult = Box<dyn Any + Send + Sync>;
4742
4743struct DummyResult;
4746
4747#[derive(Default)]
4749pub struct ConcurrentInstanceState {
4750 backpressure: u16,
4752 do_not_enter: bool,
4754 pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4757}
4758
4759impl ConcurrentInstanceState {
4760 pub fn pending_is_empty(&self) -> bool {
4761 self.pending.is_empty()
4762 }
4763}
4764
4765#[derive(Debug, Copy, Clone)]
4766enum CurrentThread {
4767 Guest(QualifiedThreadId),
4768 Host(TableId<HostTask>),
4769 None,
4770}
4771
4772impl CurrentThread {
4773 fn guest(&self) -> Option<&QualifiedThreadId> {
4774 match self {
4775 Self::Guest(id) => Some(id),
4776 _ => None,
4777 }
4778 }
4779
4780 fn host(&self) -> Option<TableId<HostTask>> {
4781 match self {
4782 Self::Host(id) => Some(*id),
4783 _ => None,
4784 }
4785 }
4786
4787 fn is_none(&self) -> bool {
4788 matches!(self, Self::None)
4789 }
4790}
4791
4792impl From<QualifiedThreadId> for CurrentThread {
4793 fn from(id: QualifiedThreadId) -> Self {
4794 Self::Guest(id)
4795 }
4796}
4797
4798impl From<TableId<HostTask>> for CurrentThread {
4799 fn from(id: TableId<HostTask>) -> Self {
4800 Self::Host(id)
4801 }
4802}
4803
4804pub struct ConcurrentState {
4806 current_thread: CurrentThread,
4808
4809 futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4814 table: AlwaysMut<ResourceTable>,
4816 high_priority: Vec<WorkItem>,
4818 low_priority: VecDeque<WorkItem>,
4820 suspend_reason: Option<SuspendReason>,
4824 worker: Option<StoreFiber<'static>>,
4828 worker_item: Option<WorkerItem>,
4830
4831 global_error_context_ref_counts:
4844 BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4845}
4846
4847impl Default for ConcurrentState {
4848 fn default() -> Self {
4849 Self {
4850 current_thread: CurrentThread::None,
4851 table: AlwaysMut::new(ResourceTable::new()),
4852 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4853 high_priority: Vec::new(),
4854 low_priority: VecDeque::new(),
4855 suspend_reason: None,
4856 worker: None,
4857 worker_item: None,
4858 global_error_context_ref_counts: BTreeMap::new(),
4859 }
4860 }
4861}
4862
4863impl ConcurrentState {
4864 pub(crate) fn take_fibers_and_futures(
4881 &mut self,
4882 fibers: &mut Vec<StoreFiber<'static>>,
4883 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4884 ) {
4885 for entry in self.table.get_mut().iter_mut() {
4886 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4887 for mode in mem::take(&mut set.waiting).into_values() {
4888 if let WaitMode::Fiber(fiber) = mode {
4889 fibers.push(fiber);
4890 }
4891 }
4892 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4893 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready(fiber) =
4894 mem::replace(&mut thread.state, GuestThreadState::Completed)
4895 {
4896 fibers.push(fiber);
4897 }
4898 }
4899 }
4900
4901 if let Some(fiber) = self.worker.take() {
4902 fibers.push(fiber);
4903 }
4904
4905 let mut handle_item = |item| match item {
4906 WorkItem::ResumeFiber(fiber) => {
4907 fibers.push(fiber);
4908 }
4909 WorkItem::PushFuture(future) => {
4910 self.futures
4911 .get_mut()
4912 .as_mut()
4913 .unwrap()
4914 .push(future.into_inner());
4915 }
4916 _ => {}
4917 };
4918
4919 for item in mem::take(&mut self.high_priority) {
4920 handle_item(item);
4921 }
4922 for item in mem::take(&mut self.low_priority) {
4923 handle_item(item);
4924 }
4925
4926 if let Some(them) = self.futures.get_mut().take() {
4927 futures.push(them);
4928 }
4929 }
4930
4931 fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> {
4935 let mut ready = mem::take(&mut self.high_priority);
4936 if ready.is_empty() {
4937 if let Some(item) = self.low_priority.pop_back() {
4938 ready.push(item);
4939 }
4940 }
4941 ready
4942 }
4943
4944 fn push<V: Send + Sync + 'static>(
4945 &mut self,
4946 value: V,
4947 ) -> Result<TableId<V>, ResourceTableError> {
4948 self.table.get_mut().push(value).map(TableId::from)
4949 }
4950
4951 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4952 self.table.get_mut().get_mut(&Resource::from(id))
4953 }
4954
4955 pub fn add_child<T: 'static, U: 'static>(
4956 &mut self,
4957 child: TableId<T>,
4958 parent: TableId<U>,
4959 ) -> Result<(), ResourceTableError> {
4960 self.table
4961 .get_mut()
4962 .add_child(Resource::from(child), Resource::from(parent))
4963 }
4964
4965 pub fn remove_child<T: 'static, U: 'static>(
4966 &mut self,
4967 child: TableId<T>,
4968 parent: TableId<U>,
4969 ) -> Result<(), ResourceTableError> {
4970 self.table
4971 .get_mut()
4972 .remove_child(Resource::from(child), Resource::from(parent))
4973 }
4974
4975 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4976 self.table.get_mut().delete(Resource::from(id))
4977 }
4978
4979 fn push_future(&mut self, future: HostTaskFuture) {
4980 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4987 }
4988
4989 fn push_high_priority(&mut self, item: WorkItem) {
4990 log::trace!("push high priority: {item:?}");
4991 self.high_priority.push(item);
4992 }
4993
4994 fn push_low_priority(&mut self, item: WorkItem) {
4995 log::trace!("push low priority: {item:?}");
4996 self.low_priority.push_front(item);
4997 }
4998
4999 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5000 if high_priority {
5001 self.push_high_priority(item);
5002 } else {
5003 self.push_low_priority(item);
5004 }
5005 }
5006
5007 fn promote_instance_local_thread_work_item(
5008 &mut self,
5009 current_instance: RuntimeComponentInstanceIndex,
5010 ) -> bool {
5011 self.promote_work_items_matching(|item: &WorkItem| match item {
5012 WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5013 *instance == current_instance
5014 }
5015 _ => false,
5016 })
5017 }
5018
5019 fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5020 self.promote_work_items_matching(|item: &WorkItem| match item {
5021 WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5022 *t == thread
5023 }
5024 _ => false,
5025 })
5026 }
5027
5028 fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5029 where
5030 F: FnMut(&WorkItem) -> bool,
5031 {
5032 if self.high_priority.iter().any(&mut predicate) {
5036 true
5037 }
5038 else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5041 let item = self.low_priority.remove(idx).unwrap();
5042 self.push_high_priority(item);
5043 true
5044 } else {
5045 false
5046 }
5047 }
5048
5049 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
5051 let thread = self.unwrap_current_guest_thread();
5052 let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
5053 log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
5054 Ok(val)
5055 }
5056
5057 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
5059 let thread = self.unwrap_current_guest_thread();
5060 log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
5061 self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
5062 Ok(())
5063 }
5064
5065 fn take_pending_cancellation(&mut self) -> bool {
5068 let thread = self.unwrap_current_guest_thread();
5069 if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
5070 assert!(matches!(event, Event::Cancelled));
5071 true
5072 } else {
5073 false
5074 }
5075 }
5076
5077 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5078 if self.may_block(task) {
5079 Ok(())
5080 } else {
5081 Err(Trap::CannotBlockSyncTask.into())
5082 }
5083 }
5084
5085 fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
5086 let task = self.get_mut(task).unwrap();
5087 task.async_function || task.returned_or_cancelled()
5088 }
5089
5090 pub fn call_context(&mut self, task: u32) -> &mut CallContext {
5096 let (task, is_host) = (task >> 1, task & 1 == 1);
5097 if is_host {
5098 let task: TableId<HostTask> = TableId::new(task);
5099 &mut self.get_mut(task).unwrap().call_context
5100 } else {
5101 let task: TableId<GuestTask> = TableId::new(task);
5102 &mut self.get_mut(task).unwrap().call_context
5103 }
5104 }
5105
5106 pub fn current_call_context_scope_id(&self) -> u32 {
5109 let (bits, is_host) = match self.current_thread {
5110 CurrentThread::Guest(id) => (id.task.rep(), false),
5111 CurrentThread::Host(id) => (id.rep(), true),
5112 CurrentThread::None => unreachable!(),
5113 };
5114 assert_eq!((bits << 1) >> 1, bits);
5115 (bits << 1) | u32::from(is_host)
5116 }
5117
5118 fn unwrap_current_guest_thread(&self) -> QualifiedThreadId {
5119 *self.current_thread.guest().unwrap()
5120 }
5121
5122 fn unwrap_current_host_thread(&self) -> TableId<HostTask> {
5123 self.current_thread.host().unwrap()
5124 }
5125}
5126
5127fn for_any_lower<
5130 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5131>(
5132 fun: F,
5133) -> F {
5134 fun
5135}
5136
5137fn for_any_lift<
5139 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5140>(
5141 fun: F,
5142) -> F {
5143 fun
5144}
5145
5146fn checked<F: Future + Send + 'static>(
5151 id: StoreId,
5152 fut: F,
5153) -> impl Future<Output = F::Output> + Send + 'static {
5154 async move {
5155 let mut fut = pin!(fut);
5156 future::poll_fn(move |cx| {
5157 let message = "\
5158 `Future`s which depend on asynchronous component tasks, streams, or \
5159 futures to complete may only be polled from the event loop of the \
5160 store to which they belong. Please use \
5161 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5162 ";
5163 tls::try_get(|store| {
5164 let matched = match store {
5165 tls::TryGet::Some(store) => store.id() == id,
5166 tls::TryGet::Taken | tls::TryGet::None => false,
5167 };
5168
5169 if !matched {
5170 panic!("{message}")
5171 }
5172 });
5173 fut.as_mut().poll(cx)
5174 })
5175 .await
5176 }
5177}
5178
5179fn check_recursive_run() {
5182 tls::try_get(|store| {
5183 if !matches!(store, tls::TryGet::None) {
5184 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5185 }
5186 });
5187}
5188
5189fn unpack_callback_code(code: u32) -> (u32, u32) {
5190 (code & 0xF, code >> 4)
5191}
5192
5193struct WaitableCheckParams {
5197 set: TableId<WaitableSet>,
5198 options: OptionsIndex,
5199 payload: u32,
5200}
5201
5202enum WaitableCheck {
5205 Wait,
5206 Poll,
5207}
5208
5209pub(crate) struct PreparedCall<R> {
5211 handle: Func,
5213 thread: QualifiedThreadId,
5215 param_count: usize,
5217 rx: oneshot::Receiver<LiftedResult>,
5220 _phantom: PhantomData<R>,
5221}
5222
5223impl<R> PreparedCall<R> {
5224 pub(crate) fn task_id(&self) -> TaskId {
5226 TaskId {
5227 task: self.thread.task,
5228 }
5229 }
5230}
5231
5232pub(crate) struct TaskId {
5234 task: TableId<GuestTask>,
5235}
5236
5237impl TaskId {
5238 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5244 let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5245 if !task.already_lowered_parameters() {
5246 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5247 } else {
5248 task.host_future_state = HostFutureState::Dropped;
5249 if task.ready_to_delete() {
5250 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5251 }
5252 }
5253 Ok(())
5254 }
5255}
5256
5257pub(crate) fn prepare_call<T, R>(
5263 mut store: StoreContextMut<T>,
5264 handle: Func,
5265 param_count: usize,
5266 host_future_present: bool,
5267 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5268 + Send
5269 + Sync
5270 + 'static,
5271 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5272 + Send
5273 + Sync
5274 + 'static,
5275) -> Result<PreparedCall<R>> {
5276 let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5277
5278 let instance = handle.instance().id().get(store.0);
5279 let options = &instance.component().env_component().options[options];
5280 let ty = &instance.component().types()[ty];
5281 let async_function = ty.async_;
5282 let task_return_type = ty.results;
5283 let component_instance = raw_options.instance;
5284 let callback = options.callback.map(|i| instance.runtime_callback(i));
5285 let memory = options
5286 .memory()
5287 .map(|i| instance.runtime_memory(i))
5288 .map(SendSyncPtr::new);
5289 let string_encoding = options.string_encoding;
5290 let token = StoreToken::new(store.as_context_mut());
5291 let state = store.0.concurrent_state_mut();
5292
5293 let (tx, rx) = oneshot::channel();
5294
5295 let instance = RuntimeInstance {
5296 instance: handle.instance().id().instance(),
5297 index: component_instance,
5298 };
5299 let caller = state.current_thread;
5300 let task = GuestTask::new(
5301 state,
5302 Box::new(for_any_lower(move |store, params| {
5303 lower_params(handle, token.as_context_mut(store), params)
5304 })),
5305 LiftResult {
5306 lift: Box::new(for_any_lift(move |store, result| {
5307 lift_result(handle, store, result)
5308 })),
5309 ty: task_return_type,
5310 memory,
5311 string_encoding,
5312 },
5313 Caller::Host {
5314 tx: Some(tx),
5315 host_future_present,
5316 caller,
5317 },
5318 callback.map(|callback| {
5319 let callback = SendSyncPtr::new(callback);
5320 let instance = handle.instance();
5321 Box::new(move |store: &mut dyn VMStore, event, handle| {
5322 let store = token.as_context_mut(store);
5323 unsafe { instance.call_callback(store, callback, event, handle) }
5326 }) as CallbackFn
5327 }),
5328 instance,
5329 async_function,
5330 )?;
5331
5332 let task = state.push(task)?;
5333 let thread = state.push(GuestThread::new_implicit(task))?;
5334 state.get_mut(task)?.threads.insert(thread);
5335
5336 if !store.0.may_enter(instance) {
5337 bail!(crate::Trap::CannotEnterComponent);
5338 }
5339
5340 Ok(PreparedCall {
5341 handle,
5342 thread: QualifiedThreadId { task, thread },
5343 param_count,
5344 rx,
5345 _phantom: PhantomData,
5346 })
5347}
5348
5349pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5356 mut store: StoreContextMut<T>,
5357 prepared: PreparedCall<R>,
5358) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> {
5359 let PreparedCall {
5360 handle,
5361 thread,
5362 param_count,
5363 rx,
5364 ..
5365 } = prepared;
5366
5367 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5368
5369 Ok(checked(
5370 store.0.id(),
5371 rx.map(move |result| {
5372 result
5373 .map(|v| *v.downcast().unwrap())
5374 .map_err(crate::Error::from)
5375 }),
5376 ))
5377}
5378
5379fn queue_call0<T: 'static>(
5382 store: StoreContextMut<T>,
5383 handle: Func,
5384 guest_thread: QualifiedThreadId,
5385 param_count: usize,
5386) -> Result<()> {
5387 let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5388 let is_concurrent = raw_options.async_;
5389 let callback = raw_options.callback;
5390 let instance = handle.instance();
5391 let callee = handle.lifted_core_func(store.0);
5392 let post_return = handle.post_return_core_func(store.0);
5393 let callback = callback.map(|i| {
5394 let instance = instance.id().get(store.0);
5395 SendSyncPtr::new(instance.runtime_callback(i))
5396 });
5397
5398 log::trace!("queueing call {guest_thread:?}");
5399
5400 unsafe {
5404 instance.queue_call(
5405 store,
5406 guest_thread,
5407 SendSyncPtr::new(callee),
5408 param_count,
5409 1,
5410 is_concurrent,
5411 callback,
5412 post_return.map(SendSyncPtr::new),
5413 )
5414 }
5415}