1use crate::component::func::{self, Func};
54use crate::component::{HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError};
55use crate::fiber::{self, StoreFiber, StoreFiberYield};
56use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
57use crate::vm::component::{CallContext, ComponentInstance, InstanceFlags, ResourceTables};
58use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
59use crate::{AsContext, AsContextMut, FuncType, StoreContext, StoreContextMut, ValRaw, ValType};
60use anyhow::{Context as _, Result, anyhow, bail};
61use error_contexts::GlobalErrorContextRefCount;
62use futures::channel::oneshot;
63use futures::future::{self, Either, FutureExt};
64use futures::stream::{FuturesUnordered, StreamExt};
65use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
66use std::any::Any;
67use std::borrow::ToOwned;
68use std::boxed::Box;
69use std::cell::UnsafeCell;
70use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
71use std::fmt;
72use std::future::Future;
73use std::marker::PhantomData;
74use std::mem::{self, ManuallyDrop, MaybeUninit};
75use std::ops::DerefMut;
76use std::pin::{Pin, pin};
77use std::ptr::{self, NonNull};
78use std::slice;
79use std::sync::Arc;
80use std::task::{Context, Poll, Waker};
81use std::vec::Vec;
82use table::{TableDebug, TableId};
83use wasmtime_environ::Trap;
84use wasmtime_environ::component::{
85 CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS,
86 OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
87 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
88 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
89 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
90};
91
92pub use abort::JoinHandle;
93pub use futures_and_streams::{
94 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
95 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
96 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
97};
98pub(crate) use futures_and_streams::{
99 ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index,
100};
101
102mod abort;
103mod error_contexts;
104mod futures_and_streams;
105pub(crate) mod table;
106pub(crate) mod tls;
107
/// Sentinel `u32` with every bit set (`0xffff_ffff`).
///
/// NOTE(review): presumably the code reported to a guest when an operation
/// cannot complete immediately -- no use is visible in this part of the
/// file, so confirm against the call sites.
const BLOCKED: u32 = 0xffff_ffff;
111
/// State of a call, stored as a small integer discriminant.
///
/// The discriminant values are part of the packed encoding produced by
/// [`Status::pack`] and are also used directly as the payload of
/// `Event::Subtask`, so they must not be renumbered.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Packs this status together with an optional waitable handle into a
    /// single `u32`.
    ///
    /// The status occupies the low four bits; the handle (or zero when
    /// absent) is stored in the bits above it.
    ///
    /// # Panics
    ///
    /// Panics unless exactly one of the following holds: the status is
    /// [`Status::Returned`] and `waitable` is `None`, or the status is
    /// anything else and `waitable` is `Some`. Also panics if the handle does
    /// not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        // A handle is required for every status except `Returned`.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = match waitable {
            Some(handle) => handle,
            None => 0,
        };
        // The handle must leave room for the four status bits.
        assert!(waitable < (1 << 28));
        (self as u32) | (waitable << 4)
    }
}
135
/// An event that can be delivered to a guest task.
///
/// [`Event::parts`] maps each variant to a numeric `(code, payload)` pair;
/// the `pending` fields below are not part of that pair.
#[derive(Clone, Copy, Debug)]
enum Event {
    /// No event; `parts` yields event code 0 with a zero payload.
    None,
    /// Cancellation; `parts` yields event code 6 with a zero payload.
    Cancelled,
    /// A subtask changed state; the payload is `status` cast to `u32`.
    Subtask {
        status: Status,
    },
    /// Result of a stream read; the payload is `code.encode()`.
    StreamRead {
        code: ReturnCode,
        // NOTE(review): presumably identifies the stream table type and handle
        // of a transmit still awaiting delivery -- confirm in
        // `futures_and_streams`.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// Result of a stream write; the payload is `code.encode()`.
    StreamWrite {
        code: ReturnCode,
        // NOTE(review): see `StreamRead::pending`.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// Result of a future read; the payload is `code.encode()`.
    FutureRead {
        code: ReturnCode,
        // NOTE(review): as `StreamRead::pending`, but for a future table.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// Result of a future write; the payload is `code.encode()`.
    FutureWrite {
        code: ReturnCode,
        // NOTE(review): as `StreamRead::pending`, but for a future table.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
}
162
163impl Event {
164 fn parts(self) -> (u32, u32) {
169 const EVENT_NONE: u32 = 0;
170 const EVENT_SUBTASK: u32 = 1;
171 const EVENT_STREAM_READ: u32 = 2;
172 const EVENT_STREAM_WRITE: u32 = 3;
173 const EVENT_FUTURE_READ: u32 = 4;
174 const EVENT_FUTURE_WRITE: u32 = 5;
175 const EVENT_CANCELLED: u32 = 6;
176 match self {
177 Event::None => (EVENT_NONE, 0),
178 Event::Cancelled => (EVENT_CANCELLED, 0),
179 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
180 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
181 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
182 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
183 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
184 }
185 }
186}
187
/// Codes that `Instance::handle_callback_code` matches against the value
/// unpacked from a guest callback's return.
mod callback_code {
    /// The implicit thread has completed; it is cleaned up.
    pub const EXIT: u32 = 0;
    /// Re-queue the callback on the low-priority queue with `Event::None`.
    pub const YIELD: u32 = 1;
    /// Wait on a waitable set until an event is available for delivery.
    pub const WAIT: u32 = 2;
    /// Poll a waitable set; if nothing is ready a `WorkItem::Poll` is queued,
    /// which later delivers `Event::None` when still nothing is ready.
    pub const POLL: u32 = 3;
}
195
/// `wasmtime_environ::component::START_FLAG_ASYNC_CALLEE` converted to `u32`.
///
/// NOTE(review): presumably a bit flag marking the callee as async-lifted --
/// no use is visible in this part of the file, so confirm at the call sites.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
200
/// Bundles mutable access to a store with a projection function over the
/// store's data.
///
/// `D` selects, through [`HasData`], the type that `get_data` produces from
/// `&mut T`; it defaults to [`HasSelf<T>`].
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    /// Mutable handle to the underlying store.
    store: StoreContextMut<'a, T>,
    /// Projects the store's `T` into `D::Data`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
210
211impl<'a, T, D> Access<'a, T, D>
212where
213 D: HasData + ?Sized,
214 T: 'static,
215{
216 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
218 Self { store, get_data }
219 }
220
221 pub fn data_mut(&mut self) -> &mut T {
223 self.store.data_mut()
224 }
225
226 pub fn get(&mut self) -> D::Data<'_> {
228 (self.get_data)(self.data_mut())
229 }
230
231 pub fn spawn(&mut self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
235 where
236 T: 'static,
237 {
238 let accessor = Accessor {
239 get_data: self.get_data,
240 token: StoreToken::new(self.store.as_context_mut()),
241 };
242 self.store
243 .as_context_mut()
244 .spawn_with_accessor(accessor, task)
245 }
246
247 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
250 self.get_data
251 }
252}
253
/// Lets an [`Access`] be used wherever a shared store context is expected.
impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    type Data = T;

    /// Delegates to the wrapped store handle.
    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}
265
/// Lets an [`Access`] be used wherever a mutable store context is expected.
impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    /// Delegates to the wrapped store handle.
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}
275
/// Handle that reaches a store indirectly instead of holding a reference to
/// it.
///
/// [`Accessor::with`] obtains the store through `tls::get` and combines it
/// with `token` to build a temporary [`Access`].
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    /// Converts the store obtained from `tls::get` into a `StoreContextMut`.
    token: StoreToken<T>,
    /// Projects the store's `T` into `D::Data`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
342
/// Types from which a borrowed [`Accessor`] can be obtained.
pub trait AsAccessor {
    /// The store data type `T` of the underlying accessor.
    type Data: 'static;

    /// The `D` parameter of the underlying accessor.
    type AccessorData: HasData + ?Sized;

    /// Returns the underlying accessor.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
370
/// A reference to an [`AsAccessor`] is itself an [`AsAccessor`].
impl<T: AsAccessor + ?Sized> AsAccessor for &T {
    type Data = T::Data;
    type AccessorData = T::AccessorData;

    /// Forwards to the referenced value's implementation.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
        T::as_accessor(self)
    }
}
379
/// An [`Accessor`] trivially provides itself.
impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;

    /// Returns `self`.
    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}
388
// Compile-time check that `Accessor<T>` is `Send + Sync` even when `T` is
// not: `UnsafeCell<u32>` is not `Sync`, so this fails to compile if
// `Accessor` ever inherits that restriction from its type parameter.
const _: () = {
    const fn assert<T: Send + Sync>() {}
    assert::<Accessor<UnsafeCell<u32>>>();
};
415
impl<T> Accessor<T> {
    /// Creates an accessor whose data projection is the identity, i.e. one
    /// that yields the store's `T` itself.
    pub(crate) fn new(token: StoreToken<T>) -> Self {
        Self {
            token,
            get_data: |x| x,
        }
    }
}
432
433impl<T, D> Accessor<T, D>
434where
435 D: HasData + ?Sized,
436{
437 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
455 tls::get(|vmstore| {
456 fun(Access {
457 store: self.token.as_context_mut(vmstore),
458 get_data: self.get_data,
459 })
460 })
461 }
462
463 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
466 self.get_data
467 }
468
469 pub fn with_getter<D2: HasData>(
486 &self,
487 get_data: fn(&mut T) -> D2::Data<'_>,
488 ) -> Accessor<T, D2> {
489 Accessor {
490 token: self.token,
491 get_data,
492 }
493 }
494
495 pub fn spawn(&self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
511 where
512 T: 'static,
513 {
514 let accessor = self.clone_for_spawn();
515 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
516 }
517
518 fn clone_for_spawn(&self) -> Self {
519 Self {
520 token: self.token,
521 get_data: self.get_data,
522 }
523 }
524}
525
/// A unit of work that can be spawned on a store and run with an
/// [`Accessor`].
///
/// Implementors must be `Send + 'static`, and the future returned by `run`
/// must be `Send`.
pub trait AccessorTask<T, D, R>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Consumes the task and runs it to completion, producing `R`.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = R> + Send;
}
544
/// Raw parameters of a call plus the information that differs between the
/// two caller kinds.
///
/// NOTE(review): no use is visible in this part of the file; the field
/// descriptions below are read off the names and types only.
enum CallerInfo {
    /// The caller is async.
    Async {
        /// Raw parameter values for the call.
        params: Vec<ValRaw>,
        /// Whether the call produces a result.
        has_result: bool,
    },
    /// The caller is sync.
    Sync {
        /// Raw parameter values for the call.
        params: Vec<ValRaw>,
        /// Number of results of the call.
        result_count: u32,
    },
}
559
/// How a thread registered in a waitable set's `waiting` map is waiting.
enum WaitMode {
    /// A suspended fiber; inserted by `StoreOpaque::resume_fiber` when a
    /// fiber suspends with `SuspendReason::Waiting`.
    Fiber(StoreFiber<'static>),
    /// A callback-based task; inserted by `Instance::handle_callback_code`
    /// when the callback returns `callback_code::WAIT` and no event is ready.
    Callback(Instance),
}
568
/// Why a fiber suspended itself; recorded by `StoreOpaque::suspend` and
/// consumed by `StoreOpaque::resume_fiber`.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber waits on `set`; `resume_fiber` parks it in the set's
    /// `waiting` map under `thread`.
    Waiting {
        set: TableId<WaitableSet>,
        thread: QualifiedThreadId,
    },
    /// A worker fiber finished its item; `resume_fiber` keeps it as the
    /// store's `worker` if that slot is empty and disposes of it otherwise.
    NeedWork,
    /// The thread yields; `resume_fiber` marks it `Pending` and queues the
    /// fiber on the low-priority queue.
    Yielding { thread: QualifiedThreadId },
    /// The thread suspends explicitly; `resume_fiber` stores the fiber in the
    /// thread's `Suspended` state.
    ExplicitlySuspending { thread: QualifiedThreadId },
}
587
/// What a queued [`GuestCall`] should do when `handle_guest_call` runs it.
enum GuestCallKind {
    /// Deliver an event to the task's callback.
    DeliverEvent {
        /// Instance used to fetch the event and process the callback's code.
        instance: Instance,
        /// Waitable set to take a ready waitable from, if any.
        set: Option<TableId<WaitableSet>>,
    },
    /// Start closure that `GuestCall::is_ready` holds back while the target
    /// instance has `do_not_enter` set or non-zero `backpressure`.
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
    /// Start closure that `GuestCall::is_ready` always reports as ready.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
606
607impl fmt::Debug for GuestCallKind {
608 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
609 match self {
610 Self::DeliverEvent { instance, set } => f
611 .debug_struct("DeliverEvent")
612 .field("instance", instance)
613 .field("set", set)
614 .finish(),
615 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
616 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
617 }
618 }
619}
620
/// A pending call into guest code on behalf of a specific thread.
#[derive(Debug)]
struct GuestCall {
    /// The thread (and its task) the call runs for.
    thread: QualifiedThreadId,
    /// What the call does when executed.
    kind: GuestCallKind,
}
627
628impl GuestCall {
629 fn is_ready(&self, state: &mut ConcurrentState) -> Result<bool> {
639 let task_instance = state.get_mut(self.thread.task)?.instance;
640 let state = state.instance_state(task_instance);
641
642 let ready = match &self.kind {
643 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
644 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
645 GuestCallKind::StartExplicit(_) => true,
646 };
647 log::trace!(
648 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
649 state.do_not_enter,
650 state.backpressure
651 );
652 Ok(ready)
653 }
654}
655
/// An item handed to the worker fiber by `run_on_worker`.
enum WorkerItem {
    /// Run a guest call via `handle_guest_call`.
    GuestCall(GuestCall),
    /// Run an arbitrary closure against the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
661
/// Parameters of a deferred poll of a waitable set (`WorkItem::Poll`).
#[derive(Debug)]
struct PollParams {
    /// Instance used when the resulting event is delivered.
    instance: Instance,
    /// Thread that receives the event.
    thread: QualifiedThreadId,
    /// Waitable set being polled.
    set: TableId<WaitableSet>,
}
673
/// A unit of work queued on the high- or low-priority queue and processed by
/// `StoreContextMut::handle_work_item`.
enum WorkItem {
    /// Add a future to the store's set of host futures.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// Resume a previously suspended fiber.
    ResumeFiber(StoreFiber<'static>),
    /// Run a guest call if it is ready, otherwise park it as pending on its
    /// instance.
    GuestCall(GuestCall),
    /// Poll a waitable set and deliver either a ready event or `Event::None`.
    Poll(PollParams),
    /// Run a closure on the worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
688
689impl fmt::Debug for WorkItem {
690 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
691 match self {
692 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
693 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
694 Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
695 Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
696 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
697 }
698 }
699}
700
/// Outcome of a wait.
///
/// The derived ordering follows declaration order, so `Cancelled` compares
/// less than `Completed`; do not reorder the variants.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    /// The wait ended because of cancellation.
    Cancelled,
    /// The wait ran to completion.
    Completed,
}
707
/// Polls `future` once and, if it is not ready yet, suspends the current
/// fiber until it completes; then returns the future's result.
///
/// * `store` - the store whose concurrent state tracks the call.
/// * `future` - the host future to drive.
/// * `caller_instance` - instance recorded on the temporary `HostTask`.
///
/// # Errors
///
/// Propagates the future's own error as well as any table lookup, event, or
/// suspension failure.
///
/// # Panics
///
/// Panics if there is no current guest thread and the future is not
/// immediately ready, or if the stashed result is missing or is not an `R`.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
    caller_instance: RuntimeComponentInstanceIndex,
) -> Result<R> {
    let state = store.concurrent_state_mut();

    // Without a current guest thread there is nothing to suspend, so the
    // future is polled exactly once with a no-op waker and must be ready.
    let Some(caller) = state.guest_thread else {
        return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
            Poll::Ready(result) => result,
            Poll::Pending => {
                unreachable!()
            }
        };
    };

    // Take whatever result is currently stashed on the caller's task so it
    // can be put back once this call's result has been retrieved.
    let old_result = state
        .get_mut(caller.task)
        .with_context(|| format!("bad handle: {caller:?}"))?
        .result
        .take();

    // Temporary host task used to track this call.
    let task = state.push(HostTask::new(caller_instance, None))?;

    log::trace!("new host task child of {caller:?}: {task:?}");

    // Wrap the future so that, on success, its output is stashed in the
    // caller task's `result` slot and a `Returned` event is set on the host
    // task.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            state.get_mut(caller.task)?.result = Some(Box::new(result) as _);

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // First poll, with the store installed via `tls::set` so the wrapper's
    // `tls::get` can reach it. The waker is a no-op; if the future is pending
    // it is handed to `push_future` below.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        Poll::Ready(result) => {
            // Completed immediately: surface any error, then drop the
            // temporary host task.
            result?;
            log::trace!("delete host task {task:?} (already ready)");
            store.concurrent_state_mut().delete(task)?;
        }
        Poll::Pending => {
            // Not done yet: queue the future, join the host task to the
            // caller's `sync_call_set`, and suspend waiting on that set.
            let state = store.concurrent_state_mut();
            state.push_future(future);

            let set = state.get_mut(caller.task)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
            })?;
        }
    }

    // Swap the previously stashed result back in and return the new one,
    // downcast to `R`.
    Ok(*mem::replace(
        &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
        old_result,
    )
    .unwrap()
    .downcast()
    .unwrap())
}
812
/// Executes a queued [`GuestCall`].
///
/// `DeliverEvent` fetches the next event for the call's task, invokes the
/// task's callback with it, and processes the code the callback returns. The
/// two start variants simply invoke their closure.
///
/// # Errors
///
/// Propagates failures from table lookups, the callback or closure, and the
/// call-context and instance bookkeeping.
///
/// # Panics
///
/// Panics if a `DeliverEvent` call finds no event to deliver or if the task
/// has no callback.
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
    match call.kind {
        GuestCallKind::DeliverEvent { instance, set } => {
            // An event is expected to be available here (hence the `unwrap`);
            // `true` allows a cancellation event to be delivered.
            let (event, waitable) = instance
                .get_event(store, call.thread.task, set, true)?
                .unwrap();
            let state = store.concurrent_state_mut();
            let task = state.get_mut(call.thread.task)?;
            let runtime_instance = task.instance;
            // Events not tied to a waitable are reported with handle 0.
            let handle = waitable.map(|(_, v)| v).unwrap_or(0);

            log::trace!(
                "use callback to deliver event {event:?} to {:?} for {waitable:?}",
                call.thread,
            );

            // Make the call's thread current for the duration of the
            // callback, remembering the previous one for restoration below.
            let old_thread = state.guest_thread.replace(call.thread);
            log::trace!(
                "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
                call.thread
            );

            store.maybe_push_call_context(call.thread.task)?;

            let state = store.concurrent_state_mut();
            state.enter_instance(runtime_instance);

            // The callback is taken out of the task while it runs and put
            // back afterwards.
            let callback = state.get_mut(call.thread.task)?.callback.take().unwrap();

            let code = callback(store, runtime_instance, event, handle)?;

            let state = store.concurrent_state_mut();

            state.get_mut(call.thread.task)?.callback = Some(callback);
            state.exit_instance(runtime_instance)?;

            store.maybe_pop_call_context(call.thread.task)?;

            instance.handle_callback_code(store, call.thread, runtime_instance, code)?;

            store.concurrent_state_mut().guest_thread = old_thread;
            log::trace!("GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread");
        }
        GuestCallKind::StartImplicit(fun) => {
            fun(store)?;
        }
        GuestCallKind::StartExplicit(fun) => {
            fun(store)?;
        }
    }

    Ok(())
}
867
/// Convenience entry points on `Store<T>`; each one forwards to the method
/// of the same name on the store's `StoreContextMut`.
impl<T> Store<T> {
    /// Runs `fun` with an [`Accessor`] for this store; see
    /// `StoreContextMut::run_concurrent`.
    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        self.as_context_mut().run_concurrent(fun).await
    }

    /// Panics unless the store's concurrent state is empty; see
    /// `StoreContextMut::assert_concurrent_state_empty`.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(&mut self) {
        self.as_context_mut().assert_concurrent_state_empty();
    }

    /// Spawns `task` on this store; see `StoreContextMut::spawn`.
    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
    where
        T: 'static,
    {
        self.as_context_mut().spawn(task)
    }
}
890
891impl<T> StoreContextMut<'_, T> {
/// Panics unless the store's concurrency bookkeeping is completely empty.
///
/// Checks, in order: the guest tables, the concurrent-state table, both work
/// queues, the current guest thread, the set of host futures, every
/// instance's pending calls, and the global error-context reference counts.
///
/// # Panics
///
/// Panics on the first check that fails, and also if the host-future set is
/// currently taken out of the state (the `unwrap`).
#[doc(hidden)]
pub fn assert_concurrent_state_empty(self) {
    let store = self.0;
    store
        .store_data_mut()
        .components
        .assert_guest_tables_empty();
    let state = store.concurrent_state_mut();
    assert!(
        state.table.get_mut().is_empty(),
        "non-empty table: {:?}",
        state.table.get_mut()
    );
    assert!(state.high_priority.is_empty());
    assert!(state.low_priority.is_empty());
    assert!(state.guest_thread.is_none());
    assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
    assert!(
        state
            .instance_states
            .iter()
            .all(|(_, state)| state.pending.is_empty())
    );
    assert!(state.global_error_context_ref_counts.is_empty());
}
925
/// Spawns `task` on this store with a freshly created [`Accessor`] that uses
/// the identity data projection.
///
/// Returns the [`JoinHandle`] produced for the spawned task.
pub fn spawn(mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
where
    T: 'static,
{
    let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
    self.spawn_with_accessor(accessor, task)
}
942
/// Spawns `task`, running it with the supplied `accessor`.
///
/// The task's future is wrapped by `JoinHandle::run` and pushed onto the
/// store's set of host futures; the matching handle is returned.
fn spawn_with_accessor<D>(
    self,
    accessor: Accessor<T, D>,
    task: impl AccessorTask<T, D, Result<()>>,
) -> JoinHandle
where
    T: 'static,
    D: HasData + ?Sized,
{
    let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
    // When the wrapped future yields no task result, `Ok(())` is reported
    // instead.
    // NOTE(review): presumably that is the aborted-via-`JoinHandle` case --
    // confirm in the `abort` module.
    self.0
        .concurrent_state_mut()
        .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
    handle
}
963
/// Runs `fun` with an [`Accessor`] for this store, driving the store's event
/// loop until the future returned by `fun` completes.
///
/// When the loop runs out of work while that future is still pending, this
/// variant keeps waiting rather than trapping (`trap_on_idle` is `false`).
///
/// # Errors
///
/// Propagates any error raised while processing work items or host futures.
pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
where
    T: Send + 'static,
{
    self.do_run_concurrent(fun, false).await
}
1050
/// Like `run_concurrent`, but fails with `Trap::AsyncDeadlock` when the
/// event loop has no remaining work and the future returned by `fun` is
/// still pending (`trap_on_idle` is `true`).
pub(super) async fn run_concurrent_trap_on_idle<R>(
    self,
    fun: impl AsyncFnOnce(&Accessor<T>) -> R,
) -> Result<R>
where
    T: Send + 'static,
{
    self.do_run_concurrent(fun, true).await
}
1060
/// Shared implementation of `run_concurrent` and
/// `run_concurrent_trap_on_idle`.
///
/// Calls `fun` with a fresh [`Accessor`] and drives the resulting future with
/// `poll_until`, forwarding `trap_on_idle`.
async fn do_run_concurrent<R>(
    mut self,
    fun: impl AsyncFnOnce(&Accessor<T>) -> R,
    trap_on_idle: bool,
) -> Result<R>
where
    T: Send + 'static,
{
    // NOTE(review): presumably rejects re-entrant invocation -- the helper is
    // defined outside this part of the file.
    check_recursive_run();
    let token = StoreToken::new(self.as_context_mut());

    // Owns the store handle together with the future so that the future is
    // dropped while the store is installed via `tls::set`.
    struct Dropper<'a, T: 'static, V> {
        store: StoreContextMut<'a, T>,
        value: ManuallyDrop<V>,
    }

    impl<'a, T, V> Drop for Dropper<'a, T, V> {
        fn drop(&mut self) {
            tls::set(self.store.0, || {
                // `value` is wrapped in `ManuallyDrop`, and this is the only
                // place in this function where it is dropped.
                unsafe { ManuallyDrop::drop(&mut self.value) }
            });
        }
    }

    let accessor = &Accessor::new(token);
    let dropper = &mut Dropper {
        store: self,
        value: ManuallyDrop::new(fun(accessor)),
    };
    // The future stays inside `dropper` for the rest of this function and is
    // dropped in place by `Dropper::drop`; it is not moved after being
    // pinned here.
    let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

    dropper
        .store
        .as_context_mut()
        .poll_until(future, trap_on_idle)
        .await
}
1103
/// Runs the store's event loop until `future` completes, returning its
/// output.
///
/// Each iteration polls `future`, then the store's set of host futures, then
/// drains the high-priority queue (or, if that is empty, the low-priority
/// queue) and handles the drained work items.
///
/// * `trap_on_idle` - when there are no host futures and no work items left
///   and `future` is still pending, return `Trap::AsyncDeadlock` instead of
///   staying pending.
///
/// # Errors
///
/// Propagates errors from host futures and from handling work items, and the
/// deadlock trap described above.
async fn poll_until<R>(
    mut self,
    mut future: Pin<&mut impl Future<Output = R>>,
    trap_on_idle: bool,
) -> Result<R>
where
    T: Send + 'static,
{
    // Puts the host-future set back into the concurrent state when dropped,
    // including on early return or cancellation.
    struct Reset<'a, T: 'static> {
        store: StoreContextMut<'a, T>,
        futures: Option<FuturesUnordered<HostTaskFuture>>,
    }

    impl<'a, T> Drop for Reset<'a, T> {
        fn drop(&mut self) {
            if let Some(futures) = self.futures.take() {
                *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
            }
        }
    }

    loop {
        // Take the host-future set out of the state for the duration of the
        // poll; `Reset` restores it.
        let futures = self.0.concurrent_state_mut().futures.get_mut().take();
        let mut reset = Reset {
            store: self.as_context_mut(),
            futures,
        };
        let mut next = pin!(reset.futures.as_mut().unwrap().next());

        let result = future::poll_fn(|cx| {
            // 1. The caller's future takes precedence.
            if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                return Poll::Ready(Ok(Either::Left(value)));
            }

            // 2. Poll the host futures. `Ready(true)` means one completed
            //    successfully, `Ready(false)` means the set is empty.
            let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                Poll::Ready(Some(output)) => {
                    match output {
                        Err(e) => return Poll::Ready(Err(e)),
                        Ok(()) => {}
                    }
                    Poll::Ready(true)
                }
                Poll::Ready(None) => Poll::Ready(false),
                Poll::Pending => Poll::Pending,
            };

            // 3. Drain the high-priority queue, falling back to the
            //    low-priority queue when it is empty.
            let state = reset.store.0.concurrent_state_mut();
            let ready = mem::take(&mut state.high_priority);
            let ready = if ready.is_empty() {
                let ready = mem::take(&mut state.low_priority);
                if ready.is_empty() {
                    // No work items at all; decide based on the host futures.
                    return match next {
                        Poll::Ready(true) => {
                            // A host future just completed; return an empty
                            // batch so the outer loop runs again.
                            Poll::Ready(Ok(Either::Right(Vec::new())))
                        }
                        Poll::Ready(false) => {
                            // Nothing left to run. Poll the caller's future
                            // once more before deciding to idle or trap.
                            if let Poll::Ready(value) =
                                tls::set(reset.store.0, || future.as_mut().poll(cx))
                            {
                                Poll::Ready(Ok(Either::Left(value)))
                            } else {
                                if trap_on_idle {
                                    Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
                                } else {
                                    Poll::Pending
                                }
                            }
                        }
                        // Host futures are still pending; wait for a wakeup.
                        Poll::Pending => Poll::Pending,
                    };
                } else {
                    ready
                }
            } else {
                ready
            };

            Poll::Ready(Ok(Either::Right(ready)))
        })
        .await;

        // Restore the host-future set before handling work items, which may
        // push onto it.
        drop(reset);

        match result? {
            Either::Left(value) => break Ok(value),
            Either::Right(ready) => {
                // Disposes of any items left unhandled if this loop exits
                // early (error or cancellation): fibers are disposed and
                // futures are dropped with the store installed via `tls::set`.
                struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                    store: StoreContextMut<'a, T>,
                    ready: I,
                }

                impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                    fn drop(&mut self) {
                        while let Some(item) = self.ready.next() {
                            match item {
                                WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                WorkItem::PushFuture(future) => {
                                    tls::set(self.store.0, move || drop(future))
                                }
                                _ => {}
                            }
                        }
                    }
                }

                let mut dispose = Dispose {
                    store: self.as_context_mut(),
                    ready: ready.into_iter(),
                };

                while let Some(item) = dispose.ready.next() {
                    dispose
                        .store
                        .as_context_mut()
                        .handle_work_item(item)
                        .await?;
                }
            }
        }
    }
}
1271
1272 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1274 where
1275 T: Send,
1276 {
1277 log::trace!("handle work item {item:?}");
1278 match item {
1279 WorkItem::PushFuture(future) => {
1280 self.0
1281 .concurrent_state_mut()
1282 .futures
1283 .get_mut()
1284 .as_mut()
1285 .unwrap()
1286 .push(future.into_inner());
1287 }
1288 WorkItem::ResumeFiber(fiber) => {
1289 self.0.resume_fiber(fiber).await?;
1290 }
1291 WorkItem::GuestCall(call) => {
1292 let state = self.0.concurrent_state_mut();
1293 if call.is_ready(state)? {
1294 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1295 } else {
1296 let task = state.get_mut(call.thread.task)?;
1297 if !task.starting_sent {
1298 task.starting_sent = true;
1299 if let GuestCallKind::StartImplicit(_) = &call.kind {
1300 Waitable::Guest(call.thread.task).set_event(
1301 state,
1302 Some(Event::Subtask {
1303 status: Status::Starting,
1304 }),
1305 )?;
1306 }
1307 }
1308
1309 let runtime_instance = state.get_mut(call.thread.task)?.instance;
1310 state
1311 .instance_state(runtime_instance)
1312 .pending
1313 .insert(call.thread, call.kind);
1314 }
1315 }
1316 WorkItem::Poll(params) => {
1317 let state = self.0.concurrent_state_mut();
1318 if state.get_mut(params.thread.task)?.event.is_some()
1319 || !state.get_mut(params.set)?.ready.is_empty()
1320 {
1321 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1324 thread: params.thread,
1325 kind: GuestCallKind::DeliverEvent {
1326 instance: params.instance,
1327 set: Some(params.set),
1328 },
1329 }));
1330 } else {
1331 state.get_mut(params.thread.task)?.event = Some(Event::None);
1334 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1335 thread: params.thread,
1336 kind: GuestCallKind::DeliverEvent {
1337 instance: params.instance,
1338 set: Some(params.set),
1339 },
1340 }));
1341 }
1342 }
1343 WorkItem::WorkerFunction(fun) => {
1344 self.run_on_worker(WorkerItem::Function(fun)).await?;
1345 }
1346 }
1347
1348 Ok(())
1349 }
1350
/// Runs `item` on the worker fiber, creating that fiber if none is cached.
///
/// # Errors
///
/// Propagates failures from creating or resuming the fiber.
///
/// # Panics
///
/// Panics if a worker item is already waiting to be picked up.
async fn run_on_worker(self, item: WorkerItem) -> Result<()>
where
    T: Send,
{
    // Reuse the cached worker fiber when there is one.
    let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
        fiber
    } else {
        fiber::make_fiber(self.0, move |store| {
            // The worker repeatedly takes the item left in `worker_item`,
            // runs it, and suspends with `NeedWork` until resumed with the
            // next item.
            loop {
                match store.concurrent_state_mut().worker_item.take().unwrap() {
                    WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
                    WorkerItem::Function(fun) => fun.into_inner()(store)?,
                }

                store.suspend(SuspendReason::NeedWork)?;
            }
        })?
    };

    // Hand the item to the worker through the shared slot, then resume it.
    let worker_item = &mut self.0.concurrent_state_mut().worker_item;
    assert!(worker_item.is_none());
    *worker_item = Some(item);

    self.0.resume_fiber(worker).await
}
1377
1378 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1383 where
1384 T: 'static,
1385 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1386 + Send
1387 + Sync
1388 + 'static,
1389 R: Send + Sync + 'static,
1390 {
1391 let token = StoreToken::new(self);
1392 async move {
1393 let mut accessor = Accessor::new(token);
1394 closure(&mut accessor).await
1395 }
1396 }
1397}
1398
1399impl StoreOpaque {
/// Resumes `fiber` and, if it suspends again, files it according to the
/// recorded `SuspendReason`.
///
/// The current guest thread is saved before resuming and restored afterwards.
///
/// # Errors
///
/// Propagates failures from running the fiber and from table lookups.
///
/// # Panics
///
/// Panics if the fiber suspended without recording a reason, or if a thread
/// is already registered as waiting on the same set.
async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
    let old_thread = self.concurrent_state_mut().guest_thread;
    log::trace!("resume_fiber: save current thread {old_thread:?}");

    // `Some` means the fiber suspended again; `None` means it exited.
    let fiber = fiber::resolve_or_release(self, fiber).await?;

    let state = self.concurrent_state_mut();

    // Restore the thread that was current before the fiber ran and mark it
    // as running again.
    state.guest_thread = old_thread;
    if let Some(ref ot) = old_thread {
        state.get_mut(ot.thread)?.state = GuestThreadState::Running;
    }
    log::trace!("resume_fiber: restore current thread {old_thread:?}");

    if let Some(mut fiber) = fiber {
        log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
        match state.suspend_reason.take().unwrap() {
            SuspendReason::NeedWork => {
                // Keep at most one idle worker fiber; dispose of extras.
                if state.worker.is_none() {
                    state.worker = Some(fiber);
                } else {
                    fiber.dispose(self);
                }
            }
            SuspendReason::Yielding { thread, .. } => {
                // Resume later, after other queued work.
                state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
                state.push_low_priority(WorkItem::ResumeFiber(fiber));
            }
            SuspendReason::ExplicitlySuspending { thread, .. } => {
                // The fiber is parked on the thread itself.
                state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
            }
            SuspendReason::Waiting { set, thread } => {
                // Park the fiber on the waitable set it is waiting for.
                let old = state
                    .get_mut(set)?
                    .waiting
                    .insert(thread, WaitMode::Fiber(fiber));
                assert!(old.is_none());
            }
        };
    } else {
        log::trace!("resume_fiber: fiber has exited");
    }

    Ok(())
}
1448
/// Suspends the current fiber, recording `reason` for whoever resumed it.
///
/// For reasons that name a thread, the task's call context is popped before
/// suspending and pushed back after resumption, and the current guest thread
/// is restored to its pre-suspension value.
///
/// # Errors
///
/// Propagates failures from the call-context bookkeeping and from the
/// suspension itself.
///
/// # Panics
///
/// Panics if a suspend reason is already recorded.
fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
    log::trace!("suspend fiber: {reason:?}");

    // Only thread-bound reasons carry a task; `NeedWork` does not.
    let task = match &reason {
        SuspendReason::Yielding { thread, .. }
        | SuspendReason::Waiting { thread, .. }
        | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
        SuspendReason::NeedWork => None,
    };

    let old_guest_thread = if let Some(task) = task {
        self.maybe_pop_call_context(task)?;
        self.concurrent_state_mut().guest_thread
    } else {
        None
    };

    let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
    assert!(suspend_reason.is_none());
    *suspend_reason = Some(reason);

    // Yield to the resumer, asking for the store to be released meanwhile.
    self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;

    // Execution continues here once the fiber is resumed.
    if let Some(task) = task {
        self.concurrent_state_mut().guest_thread = old_guest_thread;
        self.maybe_push_call_context(task)?;
    }

    Ok(())
}
1487
1488 fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1492 let task = self.concurrent_state_mut().get_mut(guest_task)?;
1493
1494 if !task.returned_or_cancelled() {
1495 log::trace!("push call context for {guest_task:?}");
1496 let call_context = task.call_context.take().unwrap();
1497 self.component_resource_state().0.push(call_context);
1498 }
1499 Ok(())
1500 }
1501
1502 fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1506 if !self
1507 .concurrent_state_mut()
1508 .get_mut(guest_task)?
1509 .returned_or_cancelled()
1510 {
1511 log::trace!("pop call context for {guest_task:?}");
1512 let call_context = Some(self.component_resource_state().0.pop().unwrap());
1513 self.concurrent_state_mut()
1514 .get_mut(guest_task)?
1515 .call_context = call_context;
1516 }
1517 Ok(())
1518 }
1519
/// Suspends the current guest thread until `waitable` has an event.
///
/// The waitable is temporarily joined to the caller task's `sync_call_set`
/// and moved back to its previous set (if any) after resumption.
///
/// # Errors
///
/// Propagates failures from table lookups, joining, and suspension.
///
/// # Panics
///
/// Panics if there is no current guest thread.
fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
    let state = self.concurrent_state_mut();
    let caller = state.guest_thread.unwrap();
    // Remember the set the waitable belonged to so it can be restored.
    let old_set = waitable.common(state)?.set;
    let set = state.get_mut(caller.task)?.sync_call_set;
    waitable.join(state, Some(set))?;
    self.suspend(SuspendReason::Waiting {
        set,
        thread: caller,
    })?;
    let state = self.concurrent_state_mut();
    waitable.join(state, old_set)
}
1533}
1534
1535impl Instance {
1536 fn get_event(
1539 self,
1540 store: &mut StoreOpaque,
1541 guest_task: TableId<GuestTask>,
1542 set: Option<TableId<WaitableSet>>,
1543 cancellable: bool,
1544 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1545 let state = store.concurrent_state_mut();
1546
1547 if let Some(event) = state.get_mut(guest_task)?.event.take() {
1548 log::trace!("deliver event {event:?} to {guest_task:?}");
1549
1550 if cancellable || !matches!(event, Event::Cancelled) {
1551 return Ok(Some((event, None)));
1552 } else {
1553 state.get_mut(guest_task)?.event = Some(event);
1554 }
1555 }
1556
1557 Ok(
1558 if let Some((set, waitable)) = set
1559 .and_then(|set| {
1560 state
1561 .get_mut(set)
1562 .map(|v| v.ready.pop_first().map(|v| (set, v)))
1563 .transpose()
1564 })
1565 .transpose()?
1566 {
1567 let common = waitable.common(state)?;
1568 let handle = common.handle.unwrap();
1569 let event = common.event.take().unwrap();
1570
1571 log::trace!(
1572 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1573 );
1574
1575 waitable.on_delivery(store, self, event);
1576
1577 Some((event, Some((waitable, handle))))
1578 } else {
1579 None
1580 },
1581 )
1582 }
1583
/// Acts on the code returned by a guest callback.
///
/// `code` is split by `unpack_callback_code` into a callback code (see
/// `callback_code`) and a waitable-set handle.
///
/// # Errors
///
/// Fails on an unsupported code, on a zero or unknown waitable-set handle,
/// with `Trap::NoAsyncResult` when the last thread of a task exits before
/// the task has returned or been cancelled, and on table lookup failures.
///
/// # Panics
///
/// Panics if the task already has an event when yielding, or if the thread is
/// already registered for cancellation wake-up or as a waiter on the set.
fn handle_callback_code(
    self,
    store: &mut StoreOpaque,
    guest_thread: QualifiedThreadId,
    runtime_instance: RuntimeComponentInstanceIndex,
    code: u32,
) -> Result<()> {
    let (code, set) = unpack_callback_code(code);

    log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");

    let state = store.concurrent_state_mut();

    // Resolves a guest waitable-set handle to its table id; zero is rejected
    // as invalid.
    let get_set = |store, handle| {
        if handle == 0 {
            bail!("invalid waitable-set handle");
        }

        let set = self.id().get_mut(store).guest_tables().0[runtime_instance]
            .waitable_set_rep(handle)?;

        Ok(TableId::<WaitableSet>::new(set))
    };

    match code {
        callback_code::EXIT => {
            log::trace!("implicit thread {guest_thread:?} completed");
            self.cleanup_thread(store, guest_thread, runtime_instance)?;
            let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
            // The last thread of a task must not exit before the task has
            // returned or been cancelled.
            if task.threads.is_empty() && !task.returned_or_cancelled() {
                bail!(Trap::NoAsyncResult);
            }
            match &task.caller {
                Caller::Host { .. } => {
                    // Host-called tasks are deleted here once they report
                    // being ready for deletion.
                    if task.ready_to_delete() {
                        Waitable::Guest(guest_thread.task)
                            .delete_from(store.concurrent_state_mut())?;
                    }
                }
                Caller::Guest { .. } => {
                    // Guest-called tasks are only marked as exited and lose
                    // their callback.
                    task.exited = true;
                    task.callback = None;
                }
            }
        }
        callback_code::YIELD => {
            // Re-queue the callback behind other work with `Event::None`.
            let task = state.get_mut(guest_thread.task)?;
            assert!(task.event.is_none());
            task.event = Some(Event::None);
            state.push_low_priority(WorkItem::GuestCall(GuestCall {
                thread: guest_thread,
                kind: GuestCallKind::DeliverEvent {
                    instance: self,
                    set: None,
                },
            }));
        }
        callback_code::WAIT | callback_code::POLL => {
            let set = get_set(store, set)?;
            let state = store.concurrent_state_mut();

            if state.get_mut(guest_thread.task)?.event.is_some()
                || !state.get_mut(set)?.ready.is_empty()
            {
                // An event is already available; deliver it promptly.
                state.push_high_priority(WorkItem::GuestCall(GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::DeliverEvent {
                        instance: self,
                        set: Some(set),
                    },
                }));
            } else {
                match code {
                    callback_code::POLL => {
                        // Check again after other queued work has run.
                        state.push_low_priority(WorkItem::Poll(PollParams {
                            instance: self,
                            thread: guest_thread,
                            set,
                        }));
                    }
                    callback_code::WAIT => {
                        // Record the set on the thread's `wake_on_cancel`
                        // slot, then register the thread as a callback-mode
                        // waiter on the set.
                        let old = state
                            .get_mut(guest_thread.thread)?
                            .wake_on_cancel
                            .replace(set);
                        assert!(old.is_none());
                        let old = state
                            .get_mut(set)?
                            .waiting
                            .insert(guest_thread, WaitMode::Callback(self));
                        assert!(old.is_none());
                    }
                    _ => unreachable!(),
                }
            }
        }
        _ => bail!("unsupported callback code: {code}"),
    }

    Ok(())
}
1699
1700 fn cleanup_thread(
1701 self,
1702 store: &mut StoreOpaque,
1703 guest_thread: QualifiedThreadId,
1704 runtime_instance: RuntimeComponentInstanceIndex,
1705 ) -> Result<()> {
1706 let guest_id = store
1707 .concurrent_state_mut()
1708 .get_mut(guest_thread.thread)?
1709 .instance_rep;
1710 self.id().get_mut(store).guest_tables().0[runtime_instance]
1711 .guest_thread_remove(guest_id.unwrap())?;
1712
1713 store.concurrent_state_mut().delete(guest_thread.thread)?;
1714 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1715 task.threads.remove(&guest_thread.thread);
1716 Ok(())
1717 }
1718
/// Builds the closure which will run a guest function on behalf of
/// `guest_thread` and queues it as a high-priority
/// `GuestCallKind::StartImplicit` work item. The guest function is not
/// invoked by this method itself; it runs when the queued item is executed.
///
/// Two flavours of closure are built:
///
/// * when `callback` is `Some` (which requires `async_`), the first flat
///   result of the call is treated as a callback code and forwarded to
///   `handle_callback_code`;
/// * otherwise the implicit thread is cleaned up as soon as the call
///   returns and, for a non-async callee, the result is lifted,
///   `post_return` is optionally invoked, and the task is completed.
///
/// # Safety
///
/// `callee` and `post_return` are invoked via `Func::call_unchecked_raw`.
/// NOTE(review): the caller presumably must guarantee that these pointers
/// are valid and that `param_count`/`result_count` match the callee's flat
/// signature -- confirm against the call sites.
unsafe fn queue_call<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    guest_thread: QualifiedThreadId,
    callee: SendSyncPtr<VMFuncRef>,
    param_count: usize,
    result_count: usize,
    flags: Option<InstanceFlags>,
    async_: bool,
    callback: Option<SendSyncPtr<VMFuncRef>>,
    post_return: Option<SendSyncPtr<VMFuncRef>>,
) -> Result<()> {
    /// Returns a closure which marks the thread as running, lowers the
    /// task's parameters into a fixed-size buffer, calls `callee` with that
    /// buffer, and returns the buffer (which then holds the flat results).
    ///
    /// # Safety
    ///
    /// NOTE(review): `callee` is assumed to be a valid function pointer
    /// when the returned closure runs -- confirm.
    unsafe fn make_call<T: 'static>(
        store: StoreContextMut<T>,
        guest_thread: QualifiedThreadId,
        callee: SendSyncPtr<VMFuncRef>,
        param_count: usize,
        result_count: usize,
        flags: Option<InstanceFlags>,
    ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
    + Send
    + Sync
    + 'static
    + use<T> {
        let token = StoreToken::new(store);
        move |store: &mut dyn VMStore| {
            let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];

            store
                .concurrent_state_mut()
                .get_mut(guest_thread.thread)?
                .state = GuestThreadState::Running;
            let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
            let may_enter_after_call = task.call_post_return_automatically();
            // The lowering closure is consumed here; this panics if it was
            // already taken.
            let lower = task.lower_params.take().unwrap();

            lower(store, &mut storage[..param_count])?;

            let mut store = token.as_context_mut(store);

            unsafe {
                // When flags were supplied, clear `may_enter` for the
                // duration of the call and set it to
                // `may_enter_after_call` afterwards.
                if let Some(mut flags) = flags {
                    flags.set_may_enter(false);
                }
                // The same buffer carries parameters in and results out, so
                // it must span the larger of the two counts.
                crate::Func::call_unchecked_raw(
                    &mut store,
                    callee.as_non_null(),
                    NonNull::new(
                        &mut storage[..param_count.max(result_count)]
                            as *mut [MaybeUninit<ValRaw>] as _,
                    )
                    .unwrap(),
                )?;
                if let Some(mut flags) = flags {
                    flags.set_may_enter(may_enter_after_call);
                }
            }

            Ok(storage)
        }
    }

    let call = unsafe {
        make_call(
            store.as_context_mut(),
            guest_thread,
            callee,
            param_count,
            result_count,
            flags,
        )
    };

    let callee_instance = store
        .0
        .concurrent_state_mut()
        .get_mut(guest_thread.task)?
        .instance;
    let fun = if callback.is_some() {
        // A callback is only accepted together with an async callee.
        assert!(async_);

        Box::new(move |store: &mut dyn VMStore| {
            self.add_guest_thread_to_instance_table(
                guest_thread.thread,
                store,
                callee_instance,
            )?;
            // Make `guest_thread` the current thread for the duration of the
            // call, remembering the previous one so it can be restored.
            let old_thread = store
                .concurrent_state_mut()
                .guest_thread
                .replace(guest_thread);
            log::trace!(
                "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
            );

            store.maybe_push_call_context(guest_thread.task)?;

            store.concurrent_state_mut().enter_instance(callee_instance);

            let storage = call(store)?;

            store
                .concurrent_state_mut()
                .exit_instance(callee_instance)?;

            store.maybe_pop_call_context(guest_thread.task)?;

            // Restore the previous current thread (if any) and mark it as
            // running again.
            let state = store.concurrent_state_mut();
            state.guest_thread = old_thread;
            old_thread
                .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
            log::trace!("stackless call: restored {old_thread:?} as current thread");

            // The first flat result is interpreted as the callback code.
            // NOTE(review): assumes the callee always writes slot 0 --
            // confirm.
            let code = unsafe { storage[0].assume_init() }.get_i32() as u32;

            self.handle_callback_code(store, guest_thread, callee_instance, code)?;

            Ok(())
        }) as Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>
    } else {
        let token = StoreToken::new(store.as_context_mut());
        Box::new(move |store: &mut dyn VMStore| {
            self.add_guest_thread_to_instance_table(
                guest_thread.thread,
                store,
                callee_instance,
            )?;
            let old_thread = store
                .concurrent_state_mut()
                .guest_thread
                .replace(guest_thread);
            log::trace!(
                "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
            );
            let mut flags = self.id().get(store).instance_flags(callee_instance);

            store.maybe_push_call_context(guest_thread.task)?;

            // Only a non-async callee enters the instance here; the
            // matching `exit_instance` is in the `!async_` branch below.
            if !async_ {
                store.concurrent_state_mut().enter_instance(callee_instance);
            }

            let storage = call(store)?;

            // No callback is involved, so the implicit thread is done as
            // soon as the call returns.
            self.cleanup_thread(store, guest_thread, callee_instance)?;

            if async_ {
                // An async callee must have returned or been cancelled by
                // the time its last thread exits; otherwise trap.
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                if task.threads.is_empty() && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
            } else {
                let lift = {
                    let state = store.concurrent_state_mut();
                    state.exit_instance(callee_instance)?;

                    assert!(state.get_mut(guest_thread.task)?.result.is_none());

                    state
                        .get_mut(guest_thread.task)?
                        .lift_result
                        .take()
                        .unwrap()
                };

                // Reinterpret the first `result_count` slots as initialized
                // values and lift them.
                // NOTE(review): relies on the callee having written those
                // slots -- confirm.
                let result = (lift.lift)(store, unsafe {
                    mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
                        &storage[..result_count],
                    )
                })?;

                // `post_return` receives the single flat result, or zero
                // when there is none; more than one result is not expected
                // on this path.
                let post_return_arg = match result_count {
                    0 => ValRaw::i32(0),
                    1 => unsafe { storage[0].assume_init() },
                    _ => unreachable!(),
                };

                if store
                    .concurrent_state_mut()
                    .get_mut(guest_thread.task)?
                    .call_post_return_automatically()
                {
                    unsafe {
                        flags.set_may_leave(false);
                        flags.set_needs_post_return(false);
                    }

                    if let Some(func) = post_return {
                        let mut store = token.as_context_mut(store);

                        unsafe {
                            crate::Func::call_unchecked_raw(
                                &mut store,
                                func.as_non_null(),
                                slice::from_ref(&post_return_arg).into(),
                            )?;
                        }
                    }

                    unsafe {
                        flags.set_may_leave(true);
                        flags.set_may_enter(true);
                    }
                }

                self.task_complete(
                    store,
                    guest_thread.task,
                    result,
                    Status::Returned,
                    post_return_arg,
                )?;
            }

            store.maybe_pop_call_context(guest_thread.task)?;

            let state = store.concurrent_state_mut();
            let task = state.get_mut(guest_thread.task)?;

            // A host-called task is deleted here once it is ready; a
            // guest-called task is only flagged as exited.
            match &task.caller {
                Caller::Host { .. } => {
                    if task.ready_to_delete() {
                        Waitable::Guest(guest_thread.task).delete_from(state)?;
                    }
                }
                Caller::Guest { .. } => {
                    task.exited = true;
                }
            }

            Ok(())
        })
    };

    // Queue the call; it will be run later when the work item is processed.
    store
        .0
        .concurrent_state_mut()
        .push_high_priority(WorkItem::GuestCall(GuestCall {
            thread: guest_thread,
            kind: GuestCallKind::StartImplicit(fun),
        }));

    Ok(())
}
2020
2021 unsafe fn prepare_call<T: 'static>(
2034 self,
2035 mut store: StoreContextMut<T>,
2036 start: *mut VMFuncRef,
2037 return_: *mut VMFuncRef,
2038 caller_instance: RuntimeComponentInstanceIndex,
2039 callee_instance: RuntimeComponentInstanceIndex,
2040 task_return_type: TypeTupleIndex,
2041 memory: *mut VMMemoryDefinition,
2042 string_encoding: u8,
2043 caller_info: CallerInfo,
2044 ) -> Result<()> {
2045 self.id().get(store.0).check_may_leave(caller_instance)?;
2046
2047 enum ResultInfo {
2048 Heap { results: u32 },
2049 Stack { result_count: u32 },
2050 }
2051
2052 let result_info = match &caller_info {
2053 CallerInfo::Async {
2054 has_result: true,
2055 params,
2056 } => ResultInfo::Heap {
2057 results: params.last().unwrap().get_u32(),
2058 },
2059 CallerInfo::Async {
2060 has_result: false, ..
2061 } => ResultInfo::Stack { result_count: 0 },
2062 CallerInfo::Sync {
2063 result_count,
2064 params,
2065 } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2066 results: params.last().unwrap().get_u32(),
2067 },
2068 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2069 result_count: *result_count,
2070 },
2071 };
2072
2073 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2074
2075 let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2079 let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2080 let token = StoreToken::new(store.as_context_mut());
2081 let state = store.0.concurrent_state_mut();
2082 let old_thread = state.guest_thread.take();
2083 let new_task = GuestTask::new(
2084 state,
2085 Box::new(move |store, dst| {
2086 let mut store = token.as_context_mut(store);
2087 assert!(dst.len() <= MAX_FLAT_PARAMS);
2088 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2090 let count = match caller_info {
2091 CallerInfo::Async { params, has_result } => {
2095 let params = ¶ms[..params.len() - usize::from(has_result)];
2096 for (param, src) in params.iter().zip(&mut src) {
2097 src.write(*param);
2098 }
2099 params.len()
2100 }
2101
2102 CallerInfo::Sync { params, .. } => {
2104 for (param, src) in params.iter().zip(&mut src) {
2105 src.write(*param);
2106 }
2107 params.len()
2108 }
2109 };
2110 unsafe {
2117 crate::Func::call_unchecked_raw(
2118 &mut store,
2119 start.as_non_null(),
2120 NonNull::new(
2121 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2122 )
2123 .unwrap(),
2124 )?;
2125 }
2126 dst.copy_from_slice(&src[..dst.len()]);
2127 let state = store.0.concurrent_state_mut();
2128 Waitable::Guest(state.guest_thread.unwrap().task).set_event(
2129 state,
2130 Some(Event::Subtask {
2131 status: Status::Started,
2132 }),
2133 )?;
2134 Ok(())
2135 }),
2136 LiftResult {
2137 lift: Box::new(move |store, src| {
2138 let mut store = token.as_context_mut(store);
2141 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2143 my_src.push(ValRaw::u32(*results));
2144 }
2145 unsafe {
2152 crate::Func::call_unchecked_raw(
2153 &mut store,
2154 return_.as_non_null(),
2155 my_src.as_mut_slice().into(),
2156 )?;
2157 }
2158 let state = store.0.concurrent_state_mut();
2159 let thread = state.guest_thread.unwrap();
2160 if sync_caller {
2161 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2162 if let ResultInfo::Stack { result_count } = &result_info {
2163 match result_count {
2164 0 => None,
2165 1 => Some(my_src[0]),
2166 _ => unreachable!(),
2167 }
2168 } else {
2169 None
2170 },
2171 );
2172 }
2173 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2174 }),
2175 ty: task_return_type,
2176 memory: NonNull::new(memory).map(SendSyncPtr::new),
2177 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2178 },
2179 Caller::Guest {
2180 thread: old_thread.unwrap(),
2181 instance: caller_instance,
2182 },
2183 None,
2184 callee_instance,
2185 )?;
2186
2187 let guest_task = state.push(new_task)?;
2188 let new_thread = GuestThread::new_implicit(guest_task);
2189 let guest_thread = state.push(new_thread)?;
2190 state.get_mut(guest_task)?.threads.insert(guest_thread);
2191
2192 let state = store.0.concurrent_state_mut();
2193 if let Some(old_thread) = old_thread {
2194 if !state.may_enter(guest_task) {
2195 bail!(crate::Trap::CannotEnterComponent);
2196 }
2197
2198 state.get_mut(old_thread.task)?.subtasks.insert(guest_task);
2199 };
2200
2201 state.guest_thread = Some(QualifiedThreadId {
2204 task: guest_task,
2205 thread: guest_thread,
2206 });
2207 log::trace!(
2208 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2209 );
2210
2211 Ok(())
2212 }
2213
2214 unsafe fn call_callback<T>(
2219 self,
2220 mut store: StoreContextMut<T>,
2221 callee_instance: RuntimeComponentInstanceIndex,
2222 function: SendSyncPtr<VMFuncRef>,
2223 event: Event,
2224 handle: u32,
2225 may_enter_after_call: bool,
2226 ) -> Result<u32> {
2227 let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2228
2229 let (ordinal, result) = event.parts();
2230 let params = &mut [
2231 ValRaw::u32(ordinal),
2232 ValRaw::u32(handle),
2233 ValRaw::u32(result),
2234 ];
2235 unsafe {
2240 flags.set_may_enter(false);
2241 crate::Func::call_unchecked_raw(
2242 &mut store,
2243 function.as_non_null(),
2244 params.as_mut_slice().into(),
2245 )?;
2246 flags.set_may_enter(may_enter_after_call);
2247 }
2248 Ok(params[0].get_u32())
2249 }
2250
/// Starts the guest-to-guest call previously set up for the current guest
/// thread, suspends the caller until the callee reports progress, and
/// returns the packed status (plus a subtask handle when one was created).
///
/// * `storage` being `None` marks an async caller: as soon as the callee
///   reports any status other than `Returned`, a subtask handle is created
///   in the caller's table and returned alongside that status.
/// * `storage` being `Some` marks a sync caller: the loop keeps suspending
///   until the callee has returned, then any stashed result is written to
///   `storage[0]`.
///
/// On exit the caller is restored as the current thread.
///
/// # Panics
///
/// Panics if there is no current guest thread, if `callee` is null, if the
/// counts exceed `MAX_FLAT_PARAMS`/`MAX_FLAT_RESULTS`, or if the current
/// task was not called by a guest.
///
/// # Safety
///
/// `callback`, `post_return` and `callee` are raw function pointers which
/// are eventually invoked.
/// NOTE(review): the caller presumably must guarantee each non-null pointer
/// is valid -- confirm against the call sites.
unsafe fn start_call<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    callback: *mut VMFuncRef,
    post_return: *mut VMFuncRef,
    callee: *mut VMFuncRef,
    param_count: u32,
    result_count: u32,
    flags: u32,
    storage: Option<&mut [MaybeUninit<ValRaw>]>,
) -> Result<u32> {
    let token = StoreToken::new(store.as_context_mut());
    let async_caller = storage.is_none();
    let state = store.0.concurrent_state_mut();
    let guest_thread = state.guest_thread.unwrap();
    let may_enter_after_call = state
        .get_mut(guest_thread.task)?
        .call_post_return_automatically();
    let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
    let param_count = usize::try_from(param_count).unwrap();
    assert!(param_count <= MAX_FLAT_PARAMS);
    let result_count = usize::try_from(result_count).unwrap();
    assert!(result_count <= MAX_FLAT_RESULTS);

    let task = state.get_mut(guest_thread.task)?;
    if !callback.is_null() {
        // Install a closure on the task which forwards events to the guest
        // callback via `call_callback`.
        let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
        task.callback = Some(Box::new(move |store, runtime_instance, event, handle| {
            let store = token.as_context_mut(store);
            unsafe {
                self.call_callback::<T>(
                    store,
                    runtime_instance,
                    callback,
                    event,
                    handle,
                    may_enter_after_call,
                )
            }
        }));
    }

    // Only a guest caller is expected on this path; a host caller panics.
    let Caller::Guest {
        thread: caller,
        instance: runtime_instance,
    } = &task.caller
    else {
        unreachable!()
    };
    let caller = *caller;
    let caller_instance = *runtime_instance;

    let callee_instance = task.instance;

    // Instance flags are only passed along when a callback is present.
    let instance_flags = if callback.is_null() {
        None
    } else {
        Some(self.id().get(store.0).instance_flags(callee_instance))
    };

    unsafe {
        self.queue_call(
            store.as_context_mut(),
            guest_thread,
            callee,
            param_count,
            result_count,
            instance_flags,
            (flags & START_FLAG_ASYNC_CALLEE) != 0,
            NonNull::new(callback).map(SendSyncPtr::new),
            NonNull::new(post_return).map(SendSyncPtr::new),
        )?;
    }

    let state = store.0.concurrent_state_mut();

    // Temporarily move the callee's waitable into the caller's
    // `sync_call_set`, remembering the set it belonged to so it can be put
    // back once waiting is over.
    let guest_waitable = Waitable::Guest(guest_thread.task);
    let old_set = guest_waitable.common(state)?.set;
    let set = state.get_mut(caller.task)?.sync_call_set;
    guest_waitable.join(state, Some(set))?;

    let (status, waitable) = loop {
        // Suspend the caller until something happens on `set`.
        store.0.suspend(SuspendReason::Waiting {
            set,
            thread: caller,
        })?;

        let state = store.0.concurrent_state_mut();

        log::trace!("taking event for {:?}", guest_thread.task);
        let event = guest_waitable.take_event(state)?;
        let Some(Event::Subtask { status }) = event else {
            unreachable!();
        };

        log::trace!("status {status:?} for {:?}", guest_thread.task);

        if status == Status::Returned {
            // The callee is done; no handle is needed.
            break (status, None);
        } else if async_caller {
            // Not yet returned, but an async caller does not block: create a
            // subtask handle in the caller's table and report the status.
            let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
                .subtask_insert_guest(guest_thread.task.rep())?;
            store
                .0
                .concurrent_state_mut()
                .get_mut(guest_thread.task)?
                .common
                .handle = Some(handle);
            break (status, Some(handle));
        } else {
            // Sync caller and the callee has not returned yet: fall through
            // and suspend again.
        }
    };

    let state = store.0.concurrent_state_mut();

    // Put the callee's waitable back into whichever set it was in before.
    guest_waitable.join(state, old_set)?;

    if let Some(storage) = storage {
        let task = state.get_mut(guest_thread.task)?;
        // Hand any stashed result to the sync caller.
        if let Some(result) = task.sync_result.take() {
            if let Some(result) = result {
                storage[0] = MaybeUninit::new(result);
            }

            if task.exited {
                if task.ready_to_delete() {
                    Waitable::Guest(guest_thread.task).delete_from(state)?;
                }
            }
        }
    }

    // The caller becomes the current thread again.
    state.guest_thread = Some(caller);
    state.get_mut(caller.thread)?.state = GuestThreadState::Running;
    log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");

    Ok(status.pack(waitable))
}
2435
/// Polls a host `future` once on behalf of the current guest thread.
///
/// * If the future is already ready, `lower` is applied to its result
///   immediately, the freshly created host task is deleted, and `None` is
///   returned.
/// * If it is pending, the future is handed to the concurrent state to be
///   driven later, a subtask handle is created in `caller_instance`'s table,
///   and `Some(handle)` is returned. When the future eventually completes,
///   a high-priority work item lowers the result and posts a
///   `Status::Returned` subtask event.
///
/// # Panics
///
/// Panics if there is no current guest thread.
pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
    self,
    mut store: StoreContextMut<T>,
    future: impl Future<Output = Result<R>> + Send + 'static,
    caller_instance: RuntimeComponentInstanceIndex,
    lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
) -> Result<Option<u32>> {
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();
    let caller = state.guest_thread.unwrap();

    // Wrap the future so that a call context is popped from the store's
    // resource state whenever the future returns `Pending`, and pushed back
    // right before the next poll.
    let (join_handle, future) = JoinHandle::run(async move {
        let mut future = pin!(future);
        let mut call_context = None;
        future::poll_fn(move |cx| {
            tls::get(|store| {
                // Restore the context saved by a previous pending poll, if
                // there was one.
                if let Some(call_context) = call_context.take() {
                    token
                        .as_context_mut(store)
                        .0
                        .component_resource_state()
                        .0
                        .push(call_context);
                }
            });

            let result = future.as_mut().poll(cx);

            if result.is_pending() {
                tls::get(|store| {
                    // Save the top call context until the next poll.
                    call_context = Some(
                        token
                            .as_context_mut(store)
                            .0
                            .component_resource_state()
                            .0
                            .pop()
                            .unwrap(),
                    );
                });
            }
            result
        })
        .await
    });

    let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;

    log::trace!("new host task child of {caller:?}: {task:?}");

    let mut future = Box::pin(future);

    // Poll exactly once with a no-op waker, with the store installed in TLS
    // so the wrapper above can reach it.
    let poll = tls::set(store.0, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    Ok(match poll {
        // A `None` result is not expected from the very first poll.
        // NOTE(review): `None` presumably signals that the join handle was
        // aborted -- confirm against `JoinHandle::run`.
        Poll::Ready(None) => unreachable!(),
        Poll::Ready(Some(result)) => {
            // Completed synchronously: lower right away; no subtask handle
            // is needed.
            lower(store.as_context_mut(), result?)?;
            log::trace!("delete host task {task:?} (already ready)");
            store.0.concurrent_state_mut().delete(task)?;
            None
        }
        Poll::Pending => {
            let future =
                Box::pin(async move {
                    let result = match future.await {
                        Some(result) => result?,
                        // No result was produced, so there is nothing to
                        // lower.
                        None => return Ok(()),
                    };
                    tls::get(move |store| {
                        // Lowering is deferred to a work item rather than
                        // performed inside this future.
                        store.concurrent_state_mut().push_high_priority(
                            WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
                                lower(token.as_context_mut(store), result)?;
                                let state = store.concurrent_state_mut();
                                // Clearing the join handle marks the task
                                // as resolved (see `subtask_drop`).
                                state.get_mut(task)?.join_handle.take();
                                Waitable::Host(task).set_event(
                                    state,
                                    Some(Event::Subtask {
                                        status: Status::Returned,
                                    }),
                                )
                            }))),
                        );
                        Ok(())
                    })
                });

            store.0.concurrent_state_mut().push_future(future);
            let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
                .subtask_insert_host(task.rep())?;
            store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
            log::trace!(
                "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
            );
            Some(handle)
        }
    })
}
2577
2578 pub(crate) fn task_return(
2581 self,
2582 store: &mut dyn VMStore,
2583 caller: RuntimeComponentInstanceIndex,
2584 ty: TypeTupleIndex,
2585 options: OptionsIndex,
2586 storage: &[ValRaw],
2587 ) -> Result<()> {
2588 self.id().get(store).check_may_leave(caller)?;
2589 let state = store.concurrent_state_mut();
2590 let guest_thread = state.guest_thread.unwrap();
2591 let lift = state
2592 .get_mut(guest_thread.task)?
2593 .lift_result
2594 .take()
2595 .ok_or_else(|| {
2596 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2597 })?;
2598 assert!(state.get_mut(guest_thread.task)?.result.is_none());
2599
2600 let CanonicalOptions {
2601 string_encoding,
2602 data_model,
2603 ..
2604 } = &self.id().get(store).component().env_component().options[options];
2605
2606 let invalid = ty != lift.ty
2607 || string_encoding != &lift.string_encoding
2608 || match data_model {
2609 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2610 Some(memory) => {
2611 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2612 let actual = self.id().get(store).runtime_memory(memory);
2613 expected != actual.as_ptr()
2614 }
2615 None => false,
2618 },
2619 CanonicalOptionsDataModel::Gc { .. } => true,
2621 };
2622
2623 if invalid {
2624 bail!("invalid `task.return` signature and/or options for current task");
2625 }
2626
2627 log::trace!("task.return for {guest_thread:?}");
2628
2629 let result = (lift.lift)(store, storage)?;
2630 self.task_complete(
2631 store,
2632 guest_thread.task,
2633 result,
2634 Status::Returned,
2635 ValRaw::i32(0),
2636 )
2637 }
2638
2639 pub(crate) fn task_cancel(
2641 self,
2642 store: &mut StoreOpaque,
2643 caller: RuntimeComponentInstanceIndex,
2644 ) -> Result<()> {
2645 self.id().get(store).check_may_leave(caller)?;
2646 let state = store.concurrent_state_mut();
2647 let guest_thread = state.guest_thread.unwrap();
2648 let task = state.get_mut(guest_thread.task)?;
2649 if !task.cancel_sent {
2650 bail!("`task.cancel` called by task which has not been cancelled")
2651 }
2652 _ = task.lift_result.take().ok_or_else(|| {
2653 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2654 })?;
2655
2656 assert!(task.result.is_none());
2657
2658 log::trace!("task.cancel for {guest_thread:?}");
2659
2660 self.task_complete(
2661 store,
2662 guest_thread.task,
2663 Box::new(DummyResult),
2664 Status::ReturnCancelled,
2665 ValRaw::i32(0),
2666 )
2667 }
2668
2669 fn task_complete(
2675 self,
2676 store: &mut StoreOpaque,
2677 guest_task: TableId<GuestTask>,
2678 result: Box<dyn Any + Send + Sync>,
2679 status: Status,
2680 post_return_arg: ValRaw,
2681 ) -> Result<()> {
2682 if store
2683 .concurrent_state_mut()
2684 .get_mut(guest_task)?
2685 .call_post_return_automatically()
2686 {
2687 let (calls, host_table, _, instance) =
2688 store.component_resource_state_with_instance(self);
2689 ResourceTables {
2690 calls,
2691 host_table: Some(host_table),
2692 guest: Some(instance.guest_tables()),
2693 }
2694 .exit_call()?;
2695 } else {
2696 let function_index = store
2701 .concurrent_state_mut()
2702 .get_mut(guest_task)?
2703 .function_index
2704 .unwrap();
2705 self.id()
2706 .get_mut(store)
2707 .post_return_arg_set(function_index, post_return_arg);
2708 }
2709
2710 let state = store.concurrent_state_mut();
2711 let task = state.get_mut(guest_task)?;
2712
2713 if let Caller::Host { tx, .. } = &mut task.caller {
2714 if let Some(tx) = tx.take() {
2715 _ = tx.send(result);
2716 }
2717 } else {
2718 task.result = Some(result);
2719 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2720 }
2721
2722 Ok(())
2723 }
2724
2725 pub(crate) fn waitable_set_new(
2727 self,
2728 store: &mut StoreOpaque,
2729 caller_instance: RuntimeComponentInstanceIndex,
2730 ) -> Result<u32> {
2731 self.id().get_mut(store).check_may_leave(caller_instance)?;
2732 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2733 let handle = self.id().get_mut(store).guest_tables().0[caller_instance]
2734 .waitable_set_insert(set.rep())?;
2735 log::trace!("new waitable set {set:?} (handle {handle})");
2736 Ok(handle)
2737 }
2738
2739 pub(crate) fn waitable_set_drop(
2741 self,
2742 store: &mut StoreOpaque,
2743 caller_instance: RuntimeComponentInstanceIndex,
2744 set: u32,
2745 ) -> Result<()> {
2746 self.id().get_mut(store).check_may_leave(caller_instance)?;
2747 let rep =
2748 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_remove(set)?;
2749
2750 log::trace!("drop waitable set {rep} (handle {set})");
2751
2752 let set = store
2753 .concurrent_state_mut()
2754 .delete(TableId::<WaitableSet>::new(rep))?;
2755
2756 if !set.waiting.is_empty() {
2757 bail!("cannot drop waitable set with waiters");
2758 }
2759
2760 Ok(())
2761 }
2762
2763 pub(crate) fn waitable_join(
2765 self,
2766 store: &mut StoreOpaque,
2767 caller_instance: RuntimeComponentInstanceIndex,
2768 waitable_handle: u32,
2769 set_handle: u32,
2770 ) -> Result<()> {
2771 let mut instance = self.id().get_mut(store);
2772 instance.check_may_leave(caller_instance)?;
2773 let waitable =
2774 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2775
2776 let set = if set_handle == 0 {
2777 None
2778 } else {
2779 let set = instance.guest_tables().0[caller_instance].waitable_set_rep(set_handle)?;
2780
2781 Some(TableId::<WaitableSet>::new(set))
2782 };
2783
2784 log::trace!(
2785 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2786 );
2787
2788 waitable.join(store.concurrent_state_mut(), set)
2789 }
2790
2791 pub(crate) fn subtask_drop(
2793 self,
2794 store: &mut StoreOpaque,
2795 caller_instance: RuntimeComponentInstanceIndex,
2796 task_id: u32,
2797 ) -> Result<()> {
2798 self.id().get_mut(store).check_may_leave(caller_instance)?;
2799 self.waitable_join(store, caller_instance, task_id, 0)?;
2800
2801 let (rep, is_host) =
2802 self.id().get_mut(store).guest_tables().0[caller_instance].subtask_remove(task_id)?;
2803
2804 let concurrent_state = store.concurrent_state_mut();
2805 let (waitable, expected_caller_instance, delete) = if is_host {
2806 let id = TableId::<HostTask>::new(rep);
2807 let task = concurrent_state.get_mut(id)?;
2808 if task.join_handle.is_some() {
2809 bail!("cannot drop a subtask which has not yet resolved");
2810 }
2811 (Waitable::Host(id), task.caller_instance, true)
2812 } else {
2813 let id = TableId::<GuestTask>::new(rep);
2814 let task = concurrent_state.get_mut(id)?;
2815 if task.lift_result.is_some() {
2816 bail!("cannot drop a subtask which has not yet resolved");
2817 }
2818 if let Caller::Guest { instance, .. } = &task.caller {
2819 (Waitable::Guest(id), *instance, task.exited)
2820 } else {
2821 unreachable!()
2822 }
2823 };
2824
2825 waitable.common(concurrent_state)?.handle = None;
2826
2827 if waitable.take_event(concurrent_state)?.is_some() {
2828 bail!("cannot drop a subtask with an undelivered event");
2829 }
2830
2831 if delete {
2832 waitable.delete_from(concurrent_state)?;
2833 }
2834
2835 assert_eq!(expected_caller_instance, caller_instance);
2839 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
2840 Ok(())
2841 }
2842
2843 pub(crate) fn waitable_set_wait(
2845 self,
2846 store: &mut StoreOpaque,
2847 caller: RuntimeComponentInstanceIndex,
2848 options: OptionsIndex,
2849 set: u32,
2850 payload: u32,
2851 ) -> Result<u32> {
2852 self.id().get(store).check_may_leave(caller)?;
2853 let &CanonicalOptions {
2854 cancellable,
2855 instance: caller_instance,
2856 ..
2857 } = &self.id().get(store).component().env_component().options[options];
2858 let rep =
2859 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2860
2861 self.waitable_check(
2862 store,
2863 cancellable,
2864 WaitableCheck::Wait(WaitableCheckParams {
2865 set: TableId::new(rep),
2866 options,
2867 payload,
2868 }),
2869 )
2870 }
2871
2872 pub(crate) fn waitable_set_poll(
2874 self,
2875 store: &mut StoreOpaque,
2876 caller: RuntimeComponentInstanceIndex,
2877 options: OptionsIndex,
2878 set: u32,
2879 payload: u32,
2880 ) -> Result<u32> {
2881 self.id().get(store).check_may_leave(caller)?;
2882 let &CanonicalOptions {
2883 cancellable,
2884 instance: caller_instance,
2885 ..
2886 } = &self.id().get(store).component().env_component().options[options];
2887 let rep =
2888 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2889
2890 self.waitable_check(
2891 store,
2892 cancellable,
2893 WaitableCheck::Poll(WaitableCheckParams {
2894 set: TableId::new(rep),
2895 options,
2896 payload,
2897 }),
2898 )
2899 }
2900
2901 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
2903 let thread_id = store
2904 .concurrent_state_mut()
2905 .guest_thread
2906 .ok_or_else(|| anyhow!("no current thread"))?
2907 .thread;
2908 Ok(store
2910 .concurrent_state_mut()
2911 .get_mut(thread_id)?
2912 .instance_rep
2913 .unwrap())
2914 }
2915
/// Creates a new, not-yet-started explicit guest thread within the current
/// task and returns the handle under which it was added to
/// `runtime_instance`'s table.
///
/// The thread's entry point is looked up at `start_func_idx` in the function
/// table `start_func_table_idx` and must have the type `(i32) -> ()`; it
/// will be called with `context` as its argument when the thread is
/// started.
///
/// # Errors
///
/// Fails if the caller may not leave, if the table slot is uninitialized,
/// or if the start function has the wrong type.
///
/// # Panics
///
/// Panics if there is no current guest thread.
pub(crate) fn thread_new_indirect<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    runtime_instance: RuntimeComponentInstanceIndex,
    // Currently unused; the expected type is hard-coded below.
    _func_ty_idx: TypeFuncIndex,
    start_func_table_idx: RuntimeTableIndex,
    start_func_idx: u32,
    context: i32,
) -> Result<u32> {
    self.id().get(store.0).check_may_leave(runtime_instance)?;

    log::trace!("creating new thread");

    // Resolve the start function and check its type before creating any
    // state.
    let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
    let (instance, registry) = self.id().get_mut_and_registry(store.0);
    let callee = instance
        .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
        .ok_or_else(|| {
            anyhow!("the start function index points to an uninitialized function")
        })?;
    if callee.type_index(store.0) != start_func_ty.type_index() {
        bail!(
            "start function does not match expected type (currently only `(i32) -> ()` is supported)"
        );
    }

    let token = StoreToken::new(store.as_context_mut());
    // The body of the new thread; it runs later, once the thread is started.
    let start_func = Box::new(
        move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
            // Make the new thread current, remembering the previous one.
            let old_thread = store
                .concurrent_state_mut()
                .guest_thread
                .replace(guest_thread);
            log::trace!(
                "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
            );

            store.maybe_push_call_context(guest_thread.task)?;

            let mut store = token.as_context_mut(store);
            let mut params = [ValRaw::i32(context)];
            // The type of `callee` was checked against `(i32) -> ()` above.
            unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };

            store.0.maybe_pop_call_context(guest_thread.task)?;

            self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
            log::trace!("explicit thread {guest_thread:?} completed");
            let state = store.0.concurrent_state_mut();
            let task = state.get_mut(guest_thread.task)?;
            // If this was the task's last thread, the task must already have
            // returned or been cancelled; otherwise trap.
            if task.threads.is_empty() && !task.returned_or_cancelled() {
                bail!(Trap::NoAsyncResult);
            }
            // Restore the previous current thread (if any) and mark it as
            // running again.
            state.guest_thread = old_thread;
            old_thread
                .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
            if state.get_mut(guest_thread.task)?.ready_to_delete() {
                Waitable::Guest(guest_thread.task).delete_from(state)?;
            }
            log::trace!("thread start: restored {old_thread:?} as current thread");

            Ok(())
        },
    );

    // The new thread belongs to the task of the thread creating it.
    let state = store.0.concurrent_state_mut();
    let current_thread = state.guest_thread.unwrap();
    let parent_task = current_thread.task;

    let new_thread = GuestThread::new_explicit(parent_task, start_func);
    let thread_id = state.push(new_thread)?;
    state.get_mut(parent_task)?.threads.insert(thread_id);

    log::trace!("new thread with id {thread_id:?} created");

    self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
}
2995
2996 pub(crate) fn resume_suspended_thread(
2997 self,
2998 store: &mut StoreOpaque,
2999 runtime_instance: RuntimeComponentInstanceIndex,
3000 thread_idx: u32,
3001 high_priority: bool,
3002 ) -> Result<()> {
3003 let thread_id =
3004 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3005 let state = store.concurrent_state_mut();
3006 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3007 let thread = state.get_mut(guest_thread.thread)?;
3008
3009 match mem::replace(&mut thread.state, GuestThreadState::Running) {
3010 GuestThreadState::NotStartedExplicit(start_func) => {
3011 log::trace!("starting thread {guest_thread:?}");
3012 let guest_call = WorkItem::GuestCall(GuestCall {
3013 thread: guest_thread,
3014 kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3015 start_func(store, guest_thread)
3016 })),
3017 });
3018 store
3019 .concurrent_state_mut()
3020 .push_work_item(guest_call, high_priority);
3021 }
3022 GuestThreadState::Suspended(fiber) => {
3023 log::trace!("resuming thread {thread_id:?} that was suspended");
3024 store
3025 .concurrent_state_mut()
3026 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3027 }
3028 _ => {
3029 bail!("cannot resume thread which is not suspended");
3030 }
3031 }
3032 Ok(())
3033 }
3034
3035 fn add_guest_thread_to_instance_table(
3036 self,
3037 thread_id: TableId<GuestThread>,
3038 store: &mut StoreOpaque,
3039 runtime_instance: RuntimeComponentInstanceIndex,
3040 ) -> Result<u32> {
3041 let guest_id = self.id().get_mut(store).guest_tables().0[runtime_instance]
3042 .guest_thread_insert(thread_id.rep())?;
3043 store
3044 .concurrent_state_mut()
3045 .get_mut(thread_id)?
3046 .instance_rep = Some(guest_id);
3047 Ok(guest_id)
3048 }
3049
3050 pub(crate) fn suspension_intrinsic(
3053 self,
3054 store: &mut StoreOpaque,
3055 caller: RuntimeComponentInstanceIndex,
3056 cancellable: bool,
3057 yielding: bool,
3058 to_thread: Option<u32>,
3059 ) -> Result<WaitResult> {
3060 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3062 return Ok(WaitResult::Cancelled);
3063 }
3064
3065 self.id().get(store).check_may_leave(caller)?;
3066
3067 if let Some(thread) = to_thread {
3068 self.resume_suspended_thread(store, caller, thread, true)?;
3069 }
3070
3071 let state = store.concurrent_state_mut();
3072 let guest_thread = state.guest_thread.unwrap();
3073 let reason = if yielding {
3074 SuspendReason::Yielding {
3075 thread: guest_thread,
3076 }
3077 } else {
3078 SuspendReason::ExplicitlySuspending {
3079 thread: guest_thread,
3080 }
3081 };
3082
3083 store.suspend(reason)?;
3084
3085 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3086 Ok(WaitResult::Cancelled)
3087 } else {
3088 Ok(WaitResult::Completed)
3089 }
3090 }
3091
    /// Shared implementation of waiting on, or polling, a waitable set on
    /// behalf of the current guest thread.
    ///
    /// The current thread first yields.  For `WaitableCheck::Wait`, if the
    /// task has no deliverable event pending and the set has no ready
    /// waitables, the thread then suspends with `SuspendReason::Waiting`.
    /// Afterwards the event (if any) is fetched with `get_event`, the handle
    /// and the event's result value are written as two little-endian 32-bit
    /// values at `params.payload` in the memory selected by `params.options`,
    /// and the event's ordinal is returned.
    ///
    /// When `cancellable` is false, a pending `Event::Cancelled` does not
    /// count as deliverable when deciding whether to block.
    fn waitable_check(
        self,
        store: &mut StoreOpaque,
        cancellable: bool,
        check: WaitableCheck,
    ) -> Result<u32> {
        let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();

        // Both variants carry a set; `wait` records whether blocking is allowed.
        let (wait, set) = match &check {
            WaitableCheck::Wait(params) => (true, Some(params.set)),
            WaitableCheck::Poll(params) => (false, Some(params.set)),
        };

        log::trace!("waitable check for {guest_thread:?}; set {set:?}");
        // Yield unconditionally before looking at any events.
        store.suspend(SuspendReason::Yielding {
            thread: guest_thread,
        })?;

        log::trace!("waitable check for {guest_thread:?}; set {set:?}");

        let state = store.concurrent_state_mut();
        let task = state.get_mut(guest_thread.task)?;

        if wait {
            let set = set.unwrap();

            // Block only if there is nothing to deliver: no task-level event
            // (or only a cancellation this caller cannot observe) and no
            // ready waitable in the set.
            if (task.event.is_none()
                || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
                && state.get_mut(set)?.ready.is_empty()
            {
                if cancellable {
                    // Remember which set this thread waits on so that
                    // `subtask_cancel` can wake it.
                    let old = state
                        .get_mut(guest_thread.thread)?
                        .wake_on_cancel
                        .replace(set);
                    assert!(old.is_none());
                }

                store.suspend(SuspendReason::Waiting {
                    set,
                    thread: guest_thread,
                })?;
            }
        }

        log::trace!("waitable check for {guest_thread:?}; set {set:?}, part two");

        let result = match check {
            WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => {
                let event =
                    self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;

                let (ordinal, handle, result) = if wait {
                    // A wait unwraps here, so it expects `get_event` to have
                    // produced an event.
                    let (event, waitable) = event.unwrap();
                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                    let (ordinal, result) = event.parts();
                    (ordinal, handle, result)
                } else {
                    if let Some((event, waitable)) = event {
                        let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                        let (ordinal, result) = event.parts();
                        (ordinal, handle, result)
                    } else {
                        log::trace!(
                            "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
                            guest_thread.task,
                            params.set
                        );
                        // A poll with nothing ready reports `Event::None`
                        // with a zero handle.
                        let (ordinal, result) = Event::None.parts();
                        (ordinal, 0, result)
                    }
                };
                // Store (handle, result) as two consecutive little-endian
                // 32-bit values at the validated payload address.
                let memory = self.options_memory_mut(store, params.options);
                let ptr =
                    func::validate_inbounds::<(u32, u32)>(memory, &ValRaw::u32(params.payload))?;
                memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
                memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
                Ok(ordinal)
            }
        };

        result
    }
3180
    /// Cancels the subtask identified by handle `task_id` in the subtask table
    /// of `caller_instance`, returning a status code for the guest.
    ///
    /// * Host task whose `join_handle` is still present: the handle is aborted
    ///   and `Status::ReturnCancelled` is returned.
    /// * Guest task whose parameters have not been lowered yet: its lower and
    ///   lift closures are dropped, it is removed from the callee instance's
    ///   pending map and `Status::StartCancelled` is returned.
    /// * Guest task that has started but not yet returned or been cancelled:
    ///   it is marked as cancelled, one thread parked with `wake_on_cancel` is
    ///   woken (if any) while the caller yields, and if the task still has not
    ///   finished the call returns `BLOCKED` when `async_` is set or waits for
    ///   the task's event otherwise.
    ///
    /// In every remaining case the waitable's pending event is taken and must
    /// be a `Returned` or `ReturnCancelled` subtask event.
    ///
    /// # Errors
    ///
    /// Fails with "`subtask.cancel` called after terminal status delivered"
    /// when no such event is available, or when a not-yet-started task had no
    /// pending entry to remove.
    pub(crate) fn subtask_cancel(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        async_: bool,
        task_id: u32,
    ) -> Result<u32> {
        self.id().get(store).check_may_leave(caller_instance)?;
        let (rep, is_host) =
            self.id().get_mut(store).guest_tables().0[caller_instance].subtask_rep(task_id)?;
        // Resolve the handle to a waitable plus the instance recorded as its caller.
        let (waitable, expected_caller_instance) = if is_host {
            let id = TableId::<HostTask>::new(rep);
            (
                Waitable::Host(id),
                store.concurrent_state_mut().get_mut(id)?.caller_instance,
            )
        } else {
            let id = TableId::<GuestTask>::new(rep);
            if let Caller::Guest { instance, .. } =
                &store.concurrent_state_mut().get_mut(id)?.caller
            {
                (Waitable::Guest(id), *instance)
            } else {
                // A guest task found in a guest's subtask table is expected
                // to have a guest caller.
                unreachable!()
            }
        };
        // Sanity check: the handle must belong to the instance cancelling it.
        assert_eq!(expected_caller_instance, caller_instance);

        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

        let concurrent_state = store.concurrent_state_mut();
        if let Waitable::Host(host_task) = waitable {
            if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
                handle.abort();
                return Ok(Status::ReturnCancelled as u32);
            }
        } else {
            let caller = concurrent_state.guest_thread.unwrap();
            let guest_task = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.already_lowered_parameters() {
                // Not started yet: drop the lower/lift closures and remove
                // the task's entries from the callee's pending map.
                task.lower_params = None;
                task.lift_result = None;

                let callee_instance = task.instance;

                let pending = &mut concurrent_state.instance_state(callee_instance).pending;
                let pending_count = pending.len();
                pending.retain(|thread, _| thread.task != guest_task);
                // Nothing removed means the task was not pending after all.
                if pending.len() == pending_count {
                    bail!("`subtask.cancel` called after terminal status delivered");
                }
                return Ok(Status::StartCancelled as u32);
            } else if !task.returned_or_cancelled() {
                // Started but not finished: record the cancellation request,
                // replacing whatever event was pending on the task.
                task.cancel_sent = true;
                task.event = Some(Event::Cancelled);
                // Wake the first thread found parked with `wake_on_cancel`,
                // then yield so that it can run.
                for thread in task.threads.clone() {
                    let thread = QualifiedThreadId {
                        task: guest_task,
                        thread,
                    };
                    if let Some(set) = concurrent_state
                        .get_mut(thread.thread)
                        .unwrap()
                        .wake_on_cancel
                        .take()
                    {
                        let item = match concurrent_state
                            .get_mut(set)?
                            .waiting
                            .remove(&thread)
                            .unwrap()
                        {
                            WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
                            WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
                                thread,
                                kind: GuestCallKind::DeliverEvent {
                                    instance,
                                    set: None,
                                },
                            }),
                        };
                        concurrent_state.push_high_priority(item);

                        store.suspend(SuspendReason::Yielding { thread: caller })?;
                        break;
                    }
                }

                // Re-fetch the state: the suspend above may have let the task progress.
                let concurrent_state = store.concurrent_state_mut();
                let task = concurrent_state.get_mut(guest_task)?;
                if !task.returned_or_cancelled() {
                    if async_ {
                        return Ok(BLOCKED);
                    } else {
                        store.wait_for_event(Waitable::Guest(guest_task))?;
                    }
                }
            }
        }

        // Only a terminal subtask event is acceptable at this point.
        let event = waitable.take_event(store.concurrent_state_mut())?;
        if let Some(Event::Subtask {
            status: status @ (Status::Returned | Status::ReturnCancelled),
        }) = event
        {
            Ok(status as u32)
        } else {
            bail!("`subtask.cancel` called after terminal status delivered");
        }
    }
3304
3305 pub(crate) fn context_get(
3306 self,
3307 store: &mut StoreOpaque,
3308 caller: RuntimeComponentInstanceIndex,
3309 slot: u32,
3310 ) -> Result<u32> {
3311 self.id().get(store).check_may_leave(caller)?;
3312 store.concurrent_state_mut().context_get(slot)
3313 }
3314
3315 pub(crate) fn context_set(
3316 self,
3317 store: &mut StoreOpaque,
3318 caller: RuntimeComponentInstanceIndex,
3319 slot: u32,
3320 value: u32,
3321 ) -> Result<()> {
3322 self.id().get(store).check_may_leave(caller)?;
3323 store.concurrent_state_mut().context_set(slot, value)
3324 }
3325}
3326
/// Store-level entry points for the component-model async operations.
///
/// Every method receives the `Instance` it operates on explicitly.  The
/// implementation for `StoreInner<T>` in this file forwards each call to a
/// method on that `Instance`.
pub trait VMComponentAsyncStore {
    /// Prepares a call from `caller_instance` to `callee_instance`.
    ///
    /// `result_count` also doubles as a marker: the `StoreInner<T>`
    /// implementation treats `PREPARE_ASYNC_NO_RESULT` and
    /// `PREPARE_ASYNC_WITH_RESULT` as "async caller" and any other value as
    /// the result count of a sync caller.
    ///
    /// # Safety
    ///
    /// `storage` must point to `storage_len` readable `ValRaw` values (the
    /// `StoreInner<T>` implementation copies them into a vector).  The
    /// requirements on the other raw pointers are not visible in this chunk.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        string_encoding: u8,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of a sync caller.
    ///
    /// # Safety
    ///
    /// `storage` must point to `storage_len` writable `MaybeUninit<ValRaw>`
    /// slots (the `StoreInner<T>` implementation turns them into a mutable
    /// slice).  The requirements on `callback` and `callee` are not visible
    /// in this chunk.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a previously prepared call on behalf of an async caller,
    /// returning the `u32` produced by `Instance::start_call`.
    ///
    /// # Safety
    ///
    /// The function pointers are handed to `Instance::start_call` as-is;
    /// their validity requirements are not visible in this chunk.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// Writes to the future `future` of type `ty` from guest address
    /// `address`, returning an encoded result code.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Reads from the future `future` of type `ty` into guest address
    /// `address`, returning an encoded result code.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a future of type `ty`.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Writes up to `count` items to the stream `stream` of type `ty` from
    /// guest address `address`, returning an encoded result code.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Reads up to `count` items from the stream `stream` of type `ty` into
    /// guest address `address`, returning an encoded result code.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Variant of `stream_write` that additionally carries the payload's
    /// size and alignment (`payload_size`, `payload_align`).
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Variant of `stream_read` that additionally carries the payload's size
    /// and alignment (`payload_size`, `payload_align`).
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a stream of type `ty`.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Retrieves the debug message of the error context `err_ctx_handle`,
    /// with `debug_msg_address` as the guest-side destination.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// Creates a new thread whose start function is looked up at index
    /// `start_func_idx` in table `start_func_table_idx`, returning a `u32`
    /// for the new thread (presumably its index in the caller's thread
    /// table -- confirm against `Instance::thread_new_indirect`).
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
3496
/// Implements the async store entry points by forwarding to `Instance`.
///
/// Most methods wrap `self` in a `StoreContextMut` for the call.  The future,
/// stream and error-context operations first check that `caller` may leave.
impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
    /// Copies the `storage_len` parameters out of `storage` and forwards to
    /// `Instance::prepare_call`.
    ///
    /// `result_count_or_max_if_async` selects the caller kind: the two
    /// `PREPARE_ASYNC_*` sentinels mean an async caller without or with a
    /// result; any other value is the result count of a sync caller.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        string_encoding: u8,
        result_count_or_max_if_async: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()> {
        // SAFETY: relies on the caller of this `unsafe fn` guaranteeing that
        // `storage` points to `storage_len` initialized `ValRaw` values.
        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();

        unsafe {
            instance.prepare_call(
                StoreContextMut(self),
                start,
                return_,
                caller_instance,
                callee_instance,
                task_return_type,
                memory,
                string_encoding,
                match result_count_or_max_if_async {
                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
                        params,
                        has_result: false,
                    },
                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
                        params,
                        has_result: true,
                    },
                    result_count => CallerInfo::Sync {
                        params,
                        result_count,
                    },
                },
            )
        }
    }

    /// Forwards to `Instance::start_call` with no post-return function, a
    /// result count of 1, the `START_FLAG_ASYNC_CALLEE` flag and `storage`
    /// as the buffer argument; the value returned by `start_call` is
    /// discarded.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()> {
        unsafe {
            instance
                .start_call(
                    StoreContextMut(self),
                    callback,
                    ptr::null_mut(),
                    callee,
                    param_count,
                    1,
                    START_FLAG_ASYNC_CALLEE,
                    // SAFETY: relies on the caller of this `unsafe fn`
                    // guaranteeing that `storage` points to `storage_len`
                    // writable slots.
                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
                )
                .map(drop)
        }
    }

    /// Forwards to `Instance::start_call` without a storage buffer and
    /// returns its `u32` result unchanged.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32> {
        unsafe {
            instance.start_call(
                StoreContextMut(self),
                callback,
                post_return,
                callee,
                param_count,
                result_count,
                flags,
                None,
            )
        }
    }

    /// Writes to a future: `guest_write` with `TransmitIndex::Future`, no
    /// flat ABI and a count of 1; the result is returned encoded.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance.id().get(self).check_may_leave(caller)?;
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                1,
            )
            .map(|result| result.encode())
    }

    /// Reads from a future: `guest_read` with `TransmitIndex::Future`, no
    /// flat ABI and a count of 1; the result is returned encoded.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance.id().get(self).check_may_leave(caller)?;
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                1,
            )
            .map(|result| result.encode())
    }

    /// Writes `count` items to a stream: `guest_write` with
    /// `TransmitIndex::Stream` and no flat ABI; the result is returned
    /// encoded.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance.id().get(self).check_may_leave(caller)?;
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Reads `count` items from a stream: `guest_read` with
    /// `TransmitIndex::Stream` and no flat ABI; the result is returned
    /// encoded.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance.id().get(self).check_may_leave(caller)?;
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Drops the writable end of a future via `guest_drop_writable`.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.id().get(self).check_may_leave(caller)?;
        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
    }

    /// Same as `stream_write`, but passes a `FlatAbi` built from
    /// `payload_size` and `payload_align`.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance.id().get(self).check_may_leave(caller)?;
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Same as `stream_read`, but passes a `FlatAbi` built from
    /// `payload_size` and `payload_align`.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance.id().get(self).check_may_leave(caller)?;
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Drops the writable end of a stream via `guest_drop_writable`.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.id().get(self).check_may_leave(caller)?;
        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
    }

    /// Forwards to `Instance::error_context_debug_message` after the
    /// may-leave check; `caller` itself is not passed on.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()> {
        instance.id().get(self).check_may_leave(caller)?;
        instance.error_context_debug_message(
            StoreContextMut(self),
            ty,
            options,
            err_ctx_handle,
            debug_msg_address,
        )
    }

    /// Forwards to `Instance::thread_new_indirect`.  Unlike the methods
    /// above, no may-leave check is made here before forwarding.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32> {
        instance.thread_new_indirect(
            StoreContextMut(self),
            caller,
            func_ty_idx,
            start_func_table_idx,
            start_func_idx,
            context,
        )
    }
}
3816
/// Boxed, pinned, `Send` future yielding `Result<()>`; the element type of
/// the `futures` collection in `ConcurrentState`.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
3818
/// Table entry for a host subtask (the `Waitable::Host` case).
struct HostTask {
    /// Waitable bookkeeping: pending event, joined set and handle.
    common: WaitableCommon,
    /// Instance recorded as the caller; `subtask_cancel` asserts that it
    /// matches the instance requesting the cancellation.
    caller_instance: RuntimeComponentInstanceIndex,
    /// Handle that `subtask_cancel` takes and aborts; `None` when there is
    /// nothing (left) to abort.
    join_handle: Option<JoinHandle>,
}
3825
3826impl HostTask {
3827 fn new(
3828 caller_instance: RuntimeComponentInstanceIndex,
3829 join_handle: Option<JoinHandle>,
3830 ) -> Self {
3831 Self {
3832 common: WaitableCommon::default(),
3833 caller_instance,
3834 join_handle,
3835 }
3836 }
3837}
3838
3839impl TableDebug for HostTask {
3840 fn type_name() -> &'static str {
3841 "HostTask"
3842 }
3843}
3844
/// Boxed callback stored in `GuestTask::callback`.
///
/// It receives the store, a component instance index, an `Event` and a `u32`
/// and returns a `u32`; the meaning of the two `u32` values is not visible
/// in this chunk.
type CallbackFn = Box<
    dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
        + Send
        + Sync
        + 'static,
>;
3851
/// Who called a `GuestTask`: the host or another guest thread.
enum Caller {
    /// The task was called by the host.
    Host {
        /// Sender for the lifted result (presumably consumed once when the
        /// result is delivered -- confirm at the use site).  Set to `None`
        /// for subtasks reparented to the host by `GuestTask::dispose`.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// Shared sender; `GuestTask::dispose` clones it onto every subtask
        /// it reparents to the host.
        exit_tx: Arc<oneshot::Sender<()>>,
        /// Decides the task's initial `HostFutureState` in `GuestTask::new`:
        /// `Live` when true, `NotApplicable` otherwise.
        host_future_present: bool,
        /// Value reported by `GuestTask::call_post_return_automatically`
        /// for host callers.
        call_post_return_automatically: bool,
    },
    /// The task was called by a guest.
    Guest {
        /// The calling thread; its task is the parent whose `subtasks` set
        /// is updated by `GuestTask::dispose`.
        thread: QualifiedThreadId,
        /// The calling instance; `subtask_cancel` asserts that it matches
        /// the instance requesting the cancellation.
        instance: RuntimeComponentInstanceIndex,
    },
}
3883
/// Everything stored in `GuestTask::lift_result` for lifting a task's result.
struct LiftResult {
    /// Closure that performs the lift (see `RawLift`).
    lift: RawLift,
    /// Tuple type index associated with the result.
    ty: TypeTupleIndex,
    /// Optional pointer to a memory definition (presumably the memory used
    /// when lifting -- confirm at the use site).
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding associated with the result.
    string_encoding: StringEncoding,
}
3892
/// A guest thread id paired with the id of the task that owns it.
///
/// The derived ordering compares `task` first, then `thread`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
struct QualifiedThreadId {
    /// The task the thread belongs to.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
3902
3903impl QualifiedThreadId {
3904 fn qualify(
3905 state: &mut ConcurrentState,
3906 thread: TableId<GuestThread>,
3907 ) -> Result<QualifiedThreadId> {
3908 Ok(QualifiedThreadId {
3909 task: state.get_mut(thread)?.parent_task,
3910 thread,
3911 })
3912 }
3913}
3914
3915impl fmt::Debug for QualifiedThreadId {
3916 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3917 f.debug_tuple("QualifiedThreadId")
3918 .field(&self.task.rep())
3919 .field(&self.thread.rep())
3920 .finish()
3921 }
3922}
3923
/// Lifecycle state of a `GuestThread`.
enum GuestThreadState {
    /// Initial state of a thread created with `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state of a thread created with `GuestThread::new_explicit`;
    /// holds the start function that `resume_suspended_thread` wraps in a
    /// guest call.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// The thread is running, or has been scheduled to run by
    /// `resume_suspended_thread`.
    Running,
    /// The thread is suspended; holds the fiber to resume.
    Suspended(StoreFiber<'static>),
    /// NOTE(review): not constructed in this chunk; confirm its meaning at
    /// the use sites.
    Pending,
    /// NOTE(review): not constructed in this chunk; presumably the thread
    /// has finished -- confirm at the use sites.
    Completed,
}
/// Table entry for a single thread belonging to a `GuestTask`.
pub struct GuestThread {
    /// Two context slots, zero-initialized by both constructors.
    context: [u32; 2],
    /// The task that owns this thread.
    parent_task: TableId<GuestTask>,
    /// The waitable set this thread is parked on while in a cancellable
    /// wait; taken by `subtask_cancel` and `Waitable::mark_ready` when they
    /// wake the thread.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// Current lifecycle state.
    state: GuestThreadState,
    /// Index of this thread in its instance's guest thread table, once
    /// `add_guest_thread_to_instance_table` has registered it.
    instance_rep: Option<u32>,
}
3949
3950impl GuestThread {
3951 fn from_instance(
3954 state: Pin<&mut ComponentInstance>,
3955 caller_instance: RuntimeComponentInstanceIndex,
3956 guest_thread: u32,
3957 ) -> Result<TableId<Self>> {
3958 let rep = state.guest_tables().0[caller_instance].guest_thread_rep(guest_thread)?;
3959 Ok(TableId::new(rep))
3960 }
3961
3962 fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
3963 Self {
3964 context: [0; 2],
3965 parent_task,
3966 wake_on_cancel: None,
3967 state: GuestThreadState::NotStartedImplicit,
3968 instance_rep: None,
3969 }
3970 }
3971
3972 fn new_explicit(
3973 parent_task: TableId<GuestTask>,
3974 start_func: Box<
3975 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
3976 >,
3977 ) -> Self {
3978 Self {
3979 context: [0; 2],
3980 parent_task,
3981 wake_on_cancel: None,
3982 state: GuestThreadState::NotStartedExplicit(start_func),
3983 instance_rep: None,
3984 }
3985 }
3986}
3987
3988impl TableDebug for GuestThread {
3989 fn type_name() -> &'static str {
3990 "GuestThread"
3991 }
3992}
3993
/// Tracks the synchronous result of a task (`GuestTask::sync_result`).
enum SyncResult {
    /// No result has been produced yet.
    NotProduced,
    /// A result has been produced and not yet taken; the inner value is
    /// optional.
    Produced(Option<ValRaw>),
    /// `take` has already been called.
    Taken,
}
3999
4000impl SyncResult {
4001 fn take(&mut self) -> Option<Option<ValRaw>> {
4002 match mem::replace(self, SyncResult::Taken) {
4003 SyncResult::NotProduced => None,
4004 SyncResult::Produced(val) => Some(val),
4005 SyncResult::Taken => {
4006 panic!("attempted to take a synchronous result that was already taken")
4007 }
4008 }
4009 }
4010}
4011
/// State of the host-side future associated with a `GuestTask`, if any.
#[derive(Debug)]
enum HostFutureState {
    /// Used for tasks called by a guest, and for tasks called by the host
    /// without a host future present.
    NotApplicable,
    /// A host future is present; while in this state
    /// `GuestTask::ready_to_delete` reports false.
    Live,
    /// NOTE(review): not constructed in this chunk; presumably set once the
    /// host future has been dropped -- confirm at the use site.
    Dropped,
}
4018
/// Table entry for a task running in a guest component instance.
pub(crate) struct GuestTask {
    /// Waitable bookkeeping: pending event, joined set and handle.
    common: WaitableCommon,
    /// Closure that lowers the parameters; `None` counts as "parameters
    /// already lowered" (see `already_lowered_parameters`).
    lower_params: Option<RawLower>,
    /// Data needed to lift the result; `None` counts as "returned or
    /// cancelled" (see `returned_or_cancelled`).
    lift_result: Option<LiftResult>,
    /// Lifted result, if any; starts out as `None`.
    result: Option<LiftedResult>,
    /// Optional callback supplied at construction.
    callback: Option<CallbackFn>,
    /// Who called this task.
    caller: Caller,
    /// Call context; starts out as `Some(CallContext::default())`.
    call_context: Option<CallContext>,
    /// Synchronous result tracking; a `Produced` value keeps the task from
    /// being ready to delete.
    sync_result: SyncResult,
    /// Set by `subtask_cancel` when it sends `Event::Cancelled` to this task.
    cancel_sent: bool,
    /// Starts out false.  NOTE(review): its writers are outside this chunk.
    starting_sent: bool,
    /// Subtasks of this task; `dispose` reparents them to this task's caller.
    subtasks: HashSet<TableId<GuestTask>>,
    /// Waitable set created together with the task in `new` and deleted in
    /// `dispose`.
    sync_call_set: TableId<WaitableSet>,
    /// The component instance passed to `new`; `subtask_cancel` treats it as
    /// the callee instance.
    instance: RuntimeComponentInstanceIndex,
    /// Task-level pending event, separate from `common.event`;
    /// `subtask_cancel` sets it to `Event::Cancelled`.
    event: Option<Event>,
    /// Starts out as `None`.  NOTE(review): its writers are outside this chunk.
    function_index: Option<ExportIndex>,
    /// Starts out false; `dispose` deletes reparented subtasks for which it
    /// is true.
    exited: bool,
    /// Threads belonging to this task; must be empty by the time `dispose`
    /// runs.
    threads: HashSet<TableId<GuestThread>>,
    /// State of the associated host future, if any.
    host_future_state: HostFutureState,
}
4073
impl GuestTask {
    /// True once `lower_params` has been taken or cleared.
    fn already_lowered_parameters(&self) -> bool {
        self.lower_params.is_none()
    }
    /// True once `lift_result` has been taken or cleared.
    fn returned_or_cancelled(&self) -> bool {
        self.lift_result.is_none()
    }
    /// Reports whether this task can be deleted.
    ///
    /// That is the case when it has no threads left, no produced-but-untaken
    /// sync result, no undelivered `Returned`/`ReturnCancelled` event in
    /// `common.event`, and no live host future.
    fn ready_to_delete(&self) -> bool {
        let threads_completed = self.threads.is_empty();
        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
        let pending_completion_event = matches!(
            self.common.event,
            Some(Event::Subtask {
                status: Status::Returned | Status::ReturnCancelled
            })
        );
        let ready = threads_completed
            && !has_sync_result
            && !pending_completion_event
            && !matches!(self.host_future_state, HostFutureState::Live);
        log::trace!(
            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
            threads_completed,
            has_sync_result,
            pending_completion_event,
            self.host_future_state
        );
        ready
    }
    /// Creates a task for `component_instance`, allocating its
    /// `sync_call_set` in `state`.
    ///
    /// The host future state starts as `Live` only for a host caller with
    /// `host_future_present` set; otherwise it is `NotApplicable`.
    fn new(
        state: &mut ConcurrentState,
        lower_params: RawLower,
        lift_result: LiftResult,
        caller: Caller,
        callback: Option<CallbackFn>,
        component_instance: RuntimeComponentInstanceIndex,
    ) -> Result<Self> {
        let sync_call_set = state.push(WaitableSet::default())?;
        let host_future_state = match &caller {
            Caller::Guest { .. } => HostFutureState::NotApplicable,
            Caller::Host {
                host_future_present,
                ..
            } => {
                if *host_future_present {
                    HostFutureState::Live
                } else {
                    HostFutureState::NotApplicable
                }
            }
        };
        Ok(Self {
            common: WaitableCommon::default(),
            lower_params: Some(lower_params),
            lift_result: Some(lift_result),
            result: None,
            callback,
            caller,
            call_context: Some(CallContext::default()),
            sync_result: SyncResult::NotProduced,
            cancel_sent: false,
            starting_sent: false,
            subtasks: HashSet::new(),
            sync_call_set,
            instance: component_instance,
            event: None,
            function_index: None,
            exited: false,
            threads: HashSet::new(),
            host_future_state,
        })
    }

    /// Releases everything this task (already removed from the table as
    /// `me`) still owns in `state`.
    ///
    /// Ready waitables in `sync_call_set` that carry a terminal subtask event
    /// are deleted, the set itself is deleted, and remaining subtasks are
    /// reparented to this task's caller; reparented subtasks that have
    /// already exited are then deleted.
    ///
    /// # Panics
    ///
    /// Panics if the task still has threads, or if a guest caller's task does
    /// not list `me` among its subtasks.
    fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
        // Clean up completed-but-undelivered subtasks in the sync-call set.
        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
            if let Some(Event::Subtask {
                status: Status::Returned | Status::ReturnCancelled,
            }) = waitable.common(state)?.event
            {
                waitable.delete_from(state)?;
            }
        }

        assert!(self.threads.is_empty());

        state.delete(self.sync_call_set)?;

        // Reparent any remaining subtasks to whoever called this task.
        match &self.caller {
            Caller::Guest {
                thread,
                instance: runtime_instance,
            } => {
                let task_mut = state.get_mut(thread.task)?;
                let present = task_mut.subtasks.remove(&me);
                assert!(present);

                for subtask in &self.subtasks {
                    task_mut.subtasks.insert(*subtask);
                }

                for subtask in &self.subtasks {
                    state.get_mut(*subtask)?.caller = Caller::Guest {
                        thread: *thread,
                        instance: *runtime_instance,
                    };
                }
            }
            Caller::Host { exit_tx, .. } => {
                for subtask in &self.subtasks {
                    state.get_mut(*subtask)?.caller = Caller::Host {
                        tx: None,
                        // Share the same `exit_tx` with every reparented subtask.
                        exit_tx: exit_tx.clone(),
                        host_future_present: false,
                        call_post_return_automatically: true,
                    };
                }
            }
        }

        // Reparented subtasks that have already exited are deleted right away.
        for subtask in self.subtasks {
            if state.get_mut(subtask)?.exited {
                Waitable::Guest(subtask).delete_from(state)?;
            }
        }

        Ok(())
    }

    /// True for guest callers, and for host callers whose
    /// `call_post_return_automatically` flag is set.
    fn call_post_return_automatically(&self) -> bool {
        matches!(
            self.caller,
            Caller::Guest { .. }
                | Caller::Host {
                    call_post_return_automatically: true,
                    ..
                }
        )
    }
}
4223
4224impl TableDebug for GuestTask {
4225 fn type_name() -> &'static str {
4226 "GuestTask"
4227 }
4228}
4229
/// State shared by every kind of waitable (host task, guest task, transmit
/// handle).
#[derive(Default)]
struct WaitableCommon {
    /// Event waiting to be delivered for this waitable, if any.
    event: Option<Event>,
    /// The waitable set this waitable currently belongs to, if any (updated
    /// by `Waitable::join`).
    set: Option<TableId<WaitableSet>>,
    /// NOTE(review): presumably the guest-visible handle of this waitable --
    /// it is not read in this chunk; confirm at the use site.
    handle: Option<u32>,
}
4240
/// Typed id of something that can be waited on via a waitable set.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host subtask.
    Host(TableId<HostTask>),
    /// A guest subtask.
    Guest(TableId<GuestTask>),
    /// A stream or future handle.
    Transmit(TableId<TransmitHandle>),
}
4251
impl Waitable {
    /// Resolves the guest-visible handle `waitable` in `caller_instance`'s
    /// table to a typed `Waitable`.
    fn from_instance(
        state: Pin<&mut ComponentInstance>,
        caller_instance: RuntimeComponentInstanceIndex,
        waitable: u32,
    ) -> Result<Self> {
        // Within this function `Waitable` names the VM-level enum; the enum
        // this impl belongs to is referred to as `Self`.
        use crate::runtime::vm::component::Waitable;

        let (waitable, kind) = state.guest_tables().0[caller_instance].waitable_rep(waitable)?;

        Ok(match kind {
            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
        })
    }

    /// Returns the table representation of the wrapped id.
    fn rep(&self) -> u32 {
        match self {
            Self::Host(id) => id.rep(),
            Self::Guest(id) => id.rep(),
            Self::Transmit(id) => id.rep(),
        }
    }

    /// Moves this waitable into `set`, or out of any set when `set` is `None`.
    ///
    /// The waitable is first detached from its previous set, including that
    /// set's ready list.  If it joins a new set while an event is pending it
    /// is immediately marked ready there.
    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
        log::trace!("waitable {self:?} join set {set:?}",);

        let old = mem::replace(&mut self.common(state)?.set, set);

        if let Some(old) = old {
            match *self {
                Waitable::Host(id) => state.remove_child(id, old),
                Waitable::Guest(id) => state.remove_child(id, old),
                Waitable::Transmit(id) => state.remove_child(id, old),
            }?;

            state.get_mut(old)?.ready.remove(self);
        }

        if let Some(set) = set {
            match *self {
                Waitable::Host(id) => state.add_child(id, set),
                Waitable::Guest(id) => state.add_child(id, set),
                Waitable::Transmit(id) => state.add_child(id, set),
            }?;

            if self.common(state)?.event.is_some() {
                self.mark_ready(state)?;
            }
        }

        Ok(())
    }

    /// Returns the `WaitableCommon` of the underlying table entry.
    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
        Ok(match self {
            Self::Host(id) => &mut state.get_mut(*id)?.common,
            Self::Guest(id) => &mut state.get_mut(*id)?.common,
            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
        })
    }

    /// Replaces the pending event and then marks the waitable ready.
    ///
    /// `mark_ready` is called even when `event` is `None`.
    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
        log::trace!("set event for {self:?}: {event:?}");
        self.common(state)?.event = event;
        self.mark_ready(state)
    }

    /// Takes the pending event, if any, and removes the waitable from the
    /// ready list of the set it belongs to.
    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
        let common = self.common(state)?;
        let event = common.event.take();
        if let Some(set) = self.common(state)?.set {
            state.get_mut(set)?.ready.remove(self);
        }

        Ok(event)
    }

    /// Adds the waitable to its set's ready list and wakes one waiter.
    ///
    /// This is a no-op for a waitable that belongs to no set.  The waiter
    /// that is first in the set's `waiting` map order is removed and queued
    /// as high-priority work: either a fiber resumption or an event delivery
    /// to its callback.
    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
        if let Some(set) = self.common(state)?.set {
            state.get_mut(set)?.ready.insert(*self);
            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
                // The thread is being woken for a ready event, so it no
                // longer needs to be woken on cancellation.
                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));

                let item = match mode {
                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
                    WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
                        thread,
                        kind: GuestCallKind::DeliverEvent {
                            instance,
                            set: Some(set),
                        },
                    }),
                };
                state.push_high_priority(item);
            }
        }
        Ok(())
    }

    /// Deletes the underlying table entry; guest tasks are additionally
    /// disposed via `GuestTask::dispose`.
    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
        match self {
            Self::Host(task) => {
                log::trace!("delete host task {task:?}");
                state.delete(*task)?;
            }
            Self::Guest(task) => {
                log::trace!("delete guest task {task:?}");
                state.delete(*task)?.dispose(state, *task)?;
            }
            Self::Transmit(task) => {
                state.delete(*task)?;
            }
        }

        Ok(())
    }
}
4387
4388impl fmt::Debug for Waitable {
4389 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4390 match self {
4391 Self::Host(id) => write!(f, "{id:?}"),
4392 Self::Guest(id) => write!(f, "{id:?}"),
4393 Self::Transmit(id) => write!(f, "{id:?}"),
4394 }
4395 }
4396}
4397
/// A set of waitables together with the threads currently waiting on it.
#[derive(Default)]
struct WaitableSet {
    /// Members which have been marked ready (see `Waitable::mark_ready`) and
    /// whose event has not yet been taken (see `Waitable::take_event`).
    ready: BTreeSet<Waitable>,
    /// Threads waiting on this set, keyed by thread id, along with how each
    /// one should be woken.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
}
4406
/// Gives `WaitableSet` table entries a human-readable type name.
impl TableDebug for WaitableSet {
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
4412
/// Type-erased, one-shot closure which, given the store, fills a slice of
/// (initially uninitialized) raw values.
///
/// `prepare_call` builds one of these around its `lower_params` argument.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4416
/// Type-erased, one-shot closure which, given the store and a slice of raw
/// values, produces a boxed, dynamically typed result.
///
/// `prepare_call` builds one of these around its `lift_result` argument.
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;
4421
/// Dynamically typed result of a lifted call; `queue_call` downcasts it back
/// to the concrete result type expected by the caller.
type LiftedResult = Box<dyn Any + Send + Sync>;
4426
/// Unit marker type.
///
/// NOTE(review): its use is not visible in this part of the file; presumably
/// it stands in as a `LiftedResult` placeholder when there is no real value
/// to deliver — confirm against its use sites.
struct DummyResult;
4430
/// Per-component-instance bookkeeping kept by `ConcurrentState`.
#[derive(Default)]
struct InstanceState {
    /// Backpressure counter, updated via
    /// `ConcurrentState::backpressure_modify`.
    backpressure: u16,
    /// Set by `ConcurrentState::enter_instance` and cleared by
    /// `ConcurrentState::exit_instance`.
    do_not_enter: bool,
    /// Guest calls which have been deferred, keyed by thread;
    /// `ConcurrentState::partition_pending` re-queues those which have become
    /// ready.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
4442
/// Store-wide state used to run concurrent component-model tasks.
pub struct ConcurrentState {
    /// The guest thread which `context_get`, `context_set` and
    /// `take_pending_cancellation` operate on. `prepare_call` asserts that
    /// this is `None`.
    ///
    /// NOTE(review): presumably the currently executing guest thread —
    /// confirm against where it is assigned.
    guest_thread: Option<QualifiedThreadId>,

    /// Collection of host task futures. This is `Some` from construction
    /// until `take_fibers_and_futures` takes it.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// Table holding the entries (tasks, threads, waitable sets, ...)
    /// addressed by `TableId`.
    table: AlwaysMut<ResourceTable>,
    /// Per-instance state, created lazily by `instance_state`.
    instance_states: HashMap<RuntimeComponentInstanceIndex, InstanceState>,
    /// Work items queued via `push_high_priority`.
    high_priority: Vec<WorkItem>,
    /// Work items queued via `push_low_priority`.
    low_priority: Vec<WorkItem>,
    /// NOTE(review): not used in the visible part of the file; presumably
    /// records why the running fiber suspended — confirm.
    suspend_reason: Option<SuspendReason>,
    /// A fiber which `take_fibers_and_futures` hands over along with all the
    /// others.
    ///
    /// NOTE(review): presumably a reusable worker fiber — confirm.
    worker: Option<StoreFiber<'static>>,
    /// NOTE(review): not used in the visible part of the file; presumably the
    /// item handed to `worker` — confirm.
    worker_item: Option<WorkerItem>,

    /// Reference counts keyed by global error-context table index.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
}
4491
4492impl Default for ConcurrentState {
4493 fn default() -> Self {
4494 Self {
4495 guest_thread: None,
4496 table: AlwaysMut::new(ResourceTable::new()),
4497 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4498 instance_states: HashMap::new(),
4499 high_priority: Vec::new(),
4500 low_priority: Vec::new(),
4501 suspend_reason: None,
4502 worker: None,
4503 worker_item: None,
4504 global_error_context_ref_counts: BTreeMap::new(),
4505 }
4506 }
4507}
4508
4509impl ConcurrentState {
4510 pub(crate) fn take_fibers_and_futures(
4527 &mut self,
4528 fibers: &mut Vec<StoreFiber<'static>>,
4529 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4530 ) {
4531 for entry in self.table.get_mut().iter_mut() {
4532 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4533 for mode in mem::take(&mut set.waiting).into_values() {
4534 if let WaitMode::Fiber(fiber) = mode {
4535 fibers.push(fiber);
4536 }
4537 }
4538 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4539 if let GuestThreadState::Suspended(fiber) =
4540 mem::replace(&mut thread.state, GuestThreadState::Completed)
4541 {
4542 fibers.push(fiber);
4543 }
4544 }
4545 }
4546
4547 if let Some(fiber) = self.worker.take() {
4548 fibers.push(fiber);
4549 }
4550
4551 let mut take_items = |list| {
4552 for item in mem::take(list) {
4553 match item {
4554 WorkItem::ResumeFiber(fiber) => {
4555 fibers.push(fiber);
4556 }
4557 WorkItem::PushFuture(future) => {
4558 self.futures
4559 .get_mut()
4560 .as_mut()
4561 .unwrap()
4562 .push(future.into_inner());
4563 }
4564 _ => {}
4565 }
4566 }
4567 };
4568
4569 take_items(&mut self.high_priority);
4570 take_items(&mut self.low_priority);
4571
4572 if let Some(them) = self.futures.get_mut().take() {
4573 futures.push(them);
4574 }
4575 }
4576
4577 fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState {
4578 self.instance_states.entry(instance).or_default()
4579 }
4580
4581 fn push<V: Send + Sync + 'static>(
4582 &mut self,
4583 value: V,
4584 ) -> Result<TableId<V>, ResourceTableError> {
4585 self.table.get_mut().push(value).map(TableId::from)
4586 }
4587
4588 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4589 self.table.get_mut().get_mut(&Resource::from(id))
4590 }
4591
4592 pub fn add_child<T: 'static, U: 'static>(
4593 &mut self,
4594 child: TableId<T>,
4595 parent: TableId<U>,
4596 ) -> Result<(), ResourceTableError> {
4597 self.table
4598 .get_mut()
4599 .add_child(Resource::from(child), Resource::from(parent))
4600 }
4601
4602 pub fn remove_child<T: 'static, U: 'static>(
4603 &mut self,
4604 child: TableId<T>,
4605 parent: TableId<U>,
4606 ) -> Result<(), ResourceTableError> {
4607 self.table
4608 .get_mut()
4609 .remove_child(Resource::from(child), Resource::from(parent))
4610 }
4611
4612 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4613 self.table.get_mut().delete(Resource::from(id))
4614 }
4615
4616 fn push_future(&mut self, future: HostTaskFuture) {
4617 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4624 }
4625
4626 fn push_high_priority(&mut self, item: WorkItem) {
4627 log::trace!("push high priority: {item:?}");
4628 self.high_priority.push(item);
4629 }
4630
4631 fn push_low_priority(&mut self, item: WorkItem) {
4632 log::trace!("push low priority: {item:?}");
4633 self.low_priority.push(item);
4634 }
4635
4636 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4637 if high_priority {
4638 self.push_high_priority(item);
4639 } else {
4640 self.push_low_priority(item);
4641 }
4642 }
4643
4644 fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4654 let guest_instance = self.get_mut(guest_task).unwrap().instance;
4655
4656 loop {
4664 let next_thread = match &self.get_mut(guest_task).unwrap().caller {
4665 Caller::Host { .. } => break true,
4666 Caller::Guest { thread, instance } => {
4667 if *instance == guest_instance {
4668 break false;
4669 } else {
4670 *thread
4671 }
4672 }
4673 };
4674 guest_task = next_thread.task;
4675 }
4676 }
4677
4678 fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) {
4682 self.instance_state(instance).do_not_enter = true;
4683 }
4684
4685 fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4689 self.instance_state(instance).do_not_enter = false;
4690 self.partition_pending(instance)
4691 }
4692
4693 fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4698 for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
4699 let call = GuestCall { thread, kind };
4700 if call.is_ready(self)? {
4701 self.push_high_priority(WorkItem::GuestCall(call));
4702 } else {
4703 self.instance_state(instance)
4704 .pending
4705 .insert(call.thread, call.kind);
4706 }
4707 }
4708
4709 Ok(())
4710 }
4711
4712 pub(crate) fn backpressure_modify(
4714 &mut self,
4715 caller_instance: RuntimeComponentInstanceIndex,
4716 modify: impl FnOnce(u16) -> Option<u16>,
4717 ) -> Result<()> {
4718 let state = self.instance_state(caller_instance);
4719 let old = state.backpressure;
4720 let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?;
4721 state.backpressure = new;
4722
4723 if old > 0 && new == 0 {
4724 self.partition_pending(caller_instance)?;
4727 }
4728
4729 Ok(())
4730 }
4731
4732 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4734 let thread = self.guest_thread.unwrap();
4735 let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4736 log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4737 Ok(val)
4738 }
4739
4740 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4742 let thread = self.guest_thread.unwrap();
4743 log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4744 self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4745 Ok(())
4746 }
4747
4748 fn take_pending_cancellation(&mut self) -> bool {
4751 let thread = self.guest_thread.unwrap();
4752 if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4753 assert!(matches!(event, Event::Cancelled));
4754 true
4755 } else {
4756 false
4757 }
4758 }
4759}
4760
4761fn for_any_lower<
4764 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4765>(
4766 fun: F,
4767) -> F {
4768 fun
4769}
4770
4771fn for_any_lift<
4773 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4774>(
4775 fun: F,
4776) -> F {
4777 fun
4778}
4779
4780fn checked<F: Future + Send + 'static>(
4785 id: StoreId,
4786 fut: F,
4787) -> impl Future<Output = F::Output> + Send + 'static {
4788 async move {
4789 let mut fut = pin!(fut);
4790 future::poll_fn(move |cx| {
4791 let message = "\
4792 `Future`s which depend on asynchronous component tasks, streams, or \
4793 futures to complete may only be polled from the event loop of the \
4794 store to which they belong. Please use \
4795 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
4796 ";
4797 tls::try_get(|store| {
4798 let matched = match store {
4799 tls::TryGet::Some(store) => store.id() == id,
4800 tls::TryGet::Taken | tls::TryGet::None => false,
4801 };
4802
4803 if !matched {
4804 panic!("{message}")
4805 }
4806 });
4807 fut.as_mut().poll(cx)
4808 })
4809 .await
4810 }
4811}
4812
4813fn check_recursive_run() {
4816 tls::try_get(|store| {
4817 if !matches!(store, tls::TryGet::None) {
4818 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
4819 }
4820 });
4821}
4822
/// Split a packed callback return value into `(low four bits, remaining
/// upper bits shifted down by four)`.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    const LOW_BITS: u32 = 4;
    const LOW_MASK: u32 = (1 << LOW_BITS) - 1;

    let low = code & LOW_MASK;
    let high = code >> LOW_BITS;
    (low, high)
}
4826
/// Arguments shared by both kinds of `WaitableCheck`.
///
/// NOTE(review): the consumers of this type are not visible in this part of
/// the file; the field descriptions below are inferred from their types —
/// confirm.
struct WaitableCheckParams {
    /// The waitable set being checked.
    set: TableId<WaitableSet>,
    /// Canonical options associated with the check.
    options: OptionsIndex,
    /// Presumably the guest address at which the event payload is to be
    /// stored — confirm.
    payload: u32,
}
4835
/// The two ways of checking a waitable set; both carry the same parameters.
///
/// NOTE(review): presumably `Wait` blocks until an event is available while
/// `Poll` returns immediately — confirm against the code which matches on
/// this enum.
enum WaitableCheck {
    Wait(WaitableCheckParams),
    Poll(WaitableCheckParams),
}
4841
/// A call which has been set up by `prepare_call` but not yet queued; pass
/// it to `queue_call` to start it.
///
/// `R` is the concrete result type which `queue_call` downcasts the lifted
/// result to.
pub(crate) struct PreparedCall<R> {
    /// The function to call.
    handle: Func,
    /// The task and implicit thread created for this call.
    thread: QualifiedThreadId,
    /// Parameter count forwarded to `queue_call0`.
    param_count: usize,
    /// Receives the lifted result; its sender was stored in the task's
    /// `Caller::Host` by `prepare_call`.
    rx: oneshot::Receiver<LiftedResult>,
    /// Receiver paired with the `exit_tx` stored in the task's
    /// `Caller::Host`; `queue_call` hands it back alongside the result.
    exit_rx: oneshot::Receiver<()>,
    /// Ties the otherwise unused type parameter `R` to this struct.
    _phantom: PhantomData<R>,
}
4858
4859impl<R> PreparedCall<R> {
4860 pub(crate) fn task_id(&self) -> TaskId {
4862 TaskId {
4863 task: self.thread.task,
4864 }
4865 }
4866}
4867
/// Handle to the guest task behind a `PreparedCall`, obtained from
/// `PreparedCall::task_id`.
pub(crate) struct TaskId {
    /// Table id of the guest task.
    task: TableId<GuestTask>,
}
4872
4873impl TaskId {
4874 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
4880 let task = store.0.concurrent_state_mut().get_mut(self.task)?;
4881 if !task.already_lowered_parameters() {
4882 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
4883 } else {
4884 task.host_future_state = HostFutureState::Dropped;
4885 if task.ready_to_delete() {
4886 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
4887 }
4888 }
4889 Ok(())
4890 }
4891}
4892
/// Set up a host-to-guest call of `handle` without starting it.
///
/// This creates a new `GuestTask` (with `Caller::Host`) plus an implicit
/// `GuestThread` for it in the store's concurrent state, and returns a
/// `PreparedCall` which can later be passed to `queue_call`.
///
/// * `param_count` is stored in the returned `PreparedCall` unchanged.
/// * `host_future_present` and `call_post_return_automatically` are recorded
///   in the task's `Caller::Host`; the latter is also forwarded to the
///   callback invocation when the function has a callback.
/// * `lower_params` and `lift_result` are wrapped into type-erased closures
///   and stored in the task.
///
/// # Errors
///
/// Fails if creating the task or inserting the task/thread into the table
/// fails.
///
/// # Panics
///
/// Panics if a guest thread is currently set in the concurrent state.
pub(crate) fn prepare_call<T, R>(
    mut store: StoreContextMut<T>,
    handle: Func,
    param_count: usize,
    host_future_present: bool,
    call_post_return_automatically: bool,
    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
    + Send
    + Sync
    + 'static,
) -> Result<PreparedCall<R>> {
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);

    // Everything needed from the instance and its options is read here,
    // before `store` is used mutably below.
    let instance = handle.instance().id().get(store.0);
    let options = &instance.component().env_component().options[options];
    let task_return_type = instance.component().types()[ty].results;
    let component_instance = raw_options.instance;
    let callback = options.callback.map(|i| instance.runtime_callback(i));
    let memory = options
        .memory()
        .map(|i| instance.runtime_memory(i))
        .map(SendSyncPtr::new);
    let string_encoding = options.string_encoding;
    // The token lets the closures below turn the `&mut dyn VMStore` they are
    // handed back into a typed store context.
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();

    assert!(state.guest_thread.is_none());

    // `tx`/`rx` carry the lifted result; `exit_tx`/`exit_rx` are a second
    // channel whose receiver is handed back by `queue_call`.
    let (tx, rx) = oneshot::channel();
    let (exit_tx, exit_rx) = oneshot::channel();

    let mut task = GuestTask::new(
        state,
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            exit_tx: Arc::new(exit_tx),
            host_future_present,
            call_post_return_automatically,
        },
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(
                move |store: &mut dyn VMStore, runtime_instance, event, handle| {
                    let store = token.as_context_mut(store);
                    // SAFETY: NOTE(review): `callback` was obtained from
                    // `instance.runtime_callback` for this same function
                    // above; confirm that this satisfies the contract of
                    // `call_callback`.
                    unsafe {
                        instance.call_callback(
                            store,
                            runtime_instance,
                            callback,
                            event,
                            handle,
                            call_post_return_automatically,
                        )
                    }
                },
            ) as CallbackFn
        }),
        component_instance,
    )?;
    task.function_index = Some(handle.index());

    // Register the task, then its implicit thread, and link the two.
    let task = state.push(task)?;
    let thread = state.push(GuestThread::new_implicit(task))?;
    state.get_mut(task)?.threads.insert(thread);

    Ok(PreparedCall {
        handle,
        thread: QualifiedThreadId { task, thread },
        param_count,
        rx,
        exit_rx,
        _phantom: PhantomData,
    })
}
4990
4991pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
4998 mut store: StoreContextMut<T>,
4999 prepared: PreparedCall<R>,
5000) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5001 let PreparedCall {
5002 handle,
5003 thread,
5004 param_count,
5005 rx,
5006 exit_rx,
5007 ..
5008 } = prepared;
5009
5010 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5011
5012 Ok(checked(
5013 store.0.id(),
5014 rx.map(move |result| {
5015 result
5016 .map(|v| (*v.downcast().unwrap(), exit_rx))
5017 .map_err(anyhow::Error::from)
5018 }),
5019 ))
5020}
5021
5022fn queue_call0<T: 'static>(
5025 store: StoreContextMut<T>,
5026 handle: Func,
5027 guest_thread: QualifiedThreadId,
5028 param_count: usize,
5029) -> Result<()> {
5030 let (_options, flags, _ty, raw_options) = handle.abi_info(store.0);
5031 let is_concurrent = raw_options.async_;
5032 let callback = raw_options.callback;
5033 let instance = handle.instance();
5034 let callee = handle.lifted_core_func(store.0);
5035 let post_return = handle.post_return_core_func(store.0);
5036 let callback = callback.map(|i| {
5037 let instance = instance.id().get(store.0);
5038 SendSyncPtr::new(instance.runtime_callback(i))
5039 });
5040
5041 log::trace!("queueing call {guest_thread:?}");
5042
5043 let instance_flags = if callback.is_none() {
5044 None
5045 } else {
5046 Some(flags)
5047 };
5048
5049 unsafe {
5053 instance.queue_call(
5054 store,
5055 guest_thread,
5056 SendSyncPtr::new(callee),
5057 param_count,
5058 1,
5059 instance_flags,
5060 is_concurrent,
5061 callback,
5062 post_return.map(SendSyncPtr::new),
5063 )
5064 }
5065}