1use crate::component::func::{self, Func};
54use crate::component::{HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError};
55use crate::fiber::{self, StoreFiber, StoreFiberYield};
56use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
57use crate::vm::component::{CallContext, ComponentInstance, InstanceFlags, ResourceTables};
58use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
59use crate::{AsContext, AsContextMut, FuncType, StoreContext, StoreContextMut, ValRaw, ValType};
60use anyhow::{Context as _, Result, anyhow, bail};
61use error_contexts::GlobalErrorContextRefCount;
62use futures::channel::oneshot;
63use futures::future::{self, Either, FutureExt};
64use futures::stream::{FuturesUnordered, StreamExt};
65use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
66use std::any::Any;
67use std::borrow::ToOwned;
68use std::boxed::Box;
69use std::cell::UnsafeCell;
70use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
71use std::fmt;
72use std::future::Future;
73use std::marker::PhantomData;
74use std::mem::{self, ManuallyDrop, MaybeUninit};
75use std::ops::DerefMut;
76use std::pin::{Pin, pin};
77use std::ptr::{self, NonNull};
78use std::slice;
79use std::sync::Arc;
80use std::task::{Context, Poll, Waker};
81use std::vec::Vec;
82use table::{TableDebug, TableId};
83use wasmtime_environ::Trap;
84use wasmtime_environ::component::{
85 CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS,
86 OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
87 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
88 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
89 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
90};
91
92pub use abort::JoinHandle;
93pub use futures_and_streams::{
94 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
95 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
96 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
97};
98pub(crate) use futures_and_streams::{
99 ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index,
100};
101
102mod abort;
103mod error_contexts;
104mod futures_and_streams;
105pub(crate) mod table;
106pub(crate) mod tls;
107
108const BLOCKED: u32 = 0xffff_ffff;
111
112#[derive(Clone, Copy, Eq, PartialEq, Debug)]
114pub enum Status {
115 Starting = 0,
116 Started = 1,
117 Returned = 2,
118 StartCancelled = 3,
119 ReturnCancelled = 4,
120}
121
122impl Status {
123 pub fn pack(self, waitable: Option<u32>) -> u32 {
129 assert!(matches!(self, Status::Returned) == waitable.is_none());
130 let waitable = waitable.unwrap_or(0);
131 assert!(waitable < (1 << 28));
132 (waitable << 4) | (self as u32)
133 }
134}
135
136#[derive(Clone, Copy, Debug)]
139enum Event {
140 None,
141 Cancelled,
142 Subtask {
143 status: Status,
144 },
145 StreamRead {
146 code: ReturnCode,
147 pending: Option<(TypeStreamTableIndex, u32)>,
148 },
149 StreamWrite {
150 code: ReturnCode,
151 pending: Option<(TypeStreamTableIndex, u32)>,
152 },
153 FutureRead {
154 code: ReturnCode,
155 pending: Option<(TypeFutureTableIndex, u32)>,
156 },
157 FutureWrite {
158 code: ReturnCode,
159 pending: Option<(TypeFutureTableIndex, u32)>,
160 },
161}
162
163impl Event {
164 fn parts(self) -> (u32, u32) {
169 const EVENT_NONE: u32 = 0;
170 const EVENT_SUBTASK: u32 = 1;
171 const EVENT_STREAM_READ: u32 = 2;
172 const EVENT_STREAM_WRITE: u32 = 3;
173 const EVENT_FUTURE_READ: u32 = 4;
174 const EVENT_FUTURE_WRITE: u32 = 5;
175 const EVENT_CANCELLED: u32 = 6;
176 match self {
177 Event::None => (EVENT_NONE, 0),
178 Event::Cancelled => (EVENT_CANCELLED, 0),
179 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
180 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
181 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
182 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
183 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
184 }
185 }
186}
187
188mod callback_code {
190 pub const EXIT: u32 = 0;
191 pub const YIELD: u32 = 1;
192 pub const WAIT: u32 = 2;
193 pub const POLL: u32 = 3;
194}
195
196const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
200
201pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
207 store: StoreContextMut<'a, T>,
208 get_data: fn(&mut T) -> D::Data<'_>,
209}
210
211impl<'a, T, D> Access<'a, T, D>
212where
213 D: HasData + ?Sized,
214 T: 'static,
215{
216 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
218 Self { store, get_data }
219 }
220
221 pub fn data_mut(&mut self) -> &mut T {
223 self.store.data_mut()
224 }
225
226 pub fn get(&mut self) -> D::Data<'_> {
228 (self.get_data)(self.data_mut())
229 }
230
231 pub fn spawn(&mut self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
235 where
236 T: 'static,
237 {
238 let accessor = Accessor {
239 get_data: self.get_data,
240 token: StoreToken::new(self.store.as_context_mut()),
241 };
242 self.store
243 .as_context_mut()
244 .spawn_with_accessor(accessor, task)
245 }
246
247 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
250 self.get_data
251 }
252}
253
254impl<'a, T, D> AsContext for Access<'a, T, D>
255where
256 D: HasData + ?Sized,
257 T: 'static,
258{
259 type Data = T;
260
261 fn as_context(&self) -> StoreContext<'_, T> {
262 self.store.as_context()
263 }
264}
265
266impl<'a, T, D> AsContextMut for Access<'a, T, D>
267where
268 D: HasData + ?Sized,
269 T: 'static,
270{
271 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
272 self.store.as_context_mut()
273 }
274}
275
276pub struct Accessor<T: 'static, D = HasSelf<T>>
336where
337 D: HasData + ?Sized,
338{
339 token: StoreToken<T>,
340 get_data: fn(&mut T) -> D::Data<'_>,
341}
342
343pub trait AsAccessor {
360 type Data: 'static;
362
363 type AccessorData: HasData + ?Sized;
366
367 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
369}
370
371impl<T: AsAccessor + ?Sized> AsAccessor for &T {
372 type Data = T::Data;
373 type AccessorData = T::AccessorData;
374
375 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
376 T::as_accessor(self)
377 }
378}
379
380impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
381 type Data = T;
382 type AccessorData = D;
383
384 fn as_accessor(&self) -> &Accessor<T, D> {
385 self
386 }
387}
388
389const _: () = {
412 const fn assert<T: Send + Sync>() {}
413 assert::<Accessor<UnsafeCell<u32>>>();
414};
415
416impl<T> Accessor<T> {
417 pub(crate) fn new(token: StoreToken<T>) -> Self {
426 Self {
427 token,
428 get_data: |x| x,
429 }
430 }
431}
432
433impl<T, D> Accessor<T, D>
434where
435 D: HasData + ?Sized,
436{
437 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
455 tls::get(|vmstore| {
456 fun(Access {
457 store: self.token.as_context_mut(vmstore),
458 get_data: self.get_data,
459 })
460 })
461 }
462
463 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
466 self.get_data
467 }
468
469 pub fn with_getter<D2: HasData>(
486 &self,
487 get_data: fn(&mut T) -> D2::Data<'_>,
488 ) -> Accessor<T, D2> {
489 Accessor {
490 token: self.token,
491 get_data,
492 }
493 }
494
495 pub fn spawn(&self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
511 where
512 T: 'static,
513 {
514 let accessor = self.clone_for_spawn();
515 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
516 }
517
518 fn clone_for_spawn(&self) -> Self {
519 Self {
520 token: self.token,
521 get_data: self.get_data,
522 }
523 }
524}
525
526pub trait AccessorTask<T, D, R>: Send + 'static
538where
539 D: HasData + ?Sized,
540{
541 fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = R> + Send;
543}
544
545enum CallerInfo {
548 Async {
550 params: Vec<ValRaw>,
551 has_result: bool,
552 },
553 Sync {
555 params: Vec<ValRaw>,
556 result_count: u32,
557 },
558}
559
560enum WaitMode {
562 Fiber(StoreFiber<'static>),
564 Callback(Instance),
567}
568
569#[derive(Debug)]
571enum SuspendReason {
572 Waiting {
575 set: TableId<WaitableSet>,
576 thread: QualifiedThreadId,
577 },
578 NeedWork,
581 Yielding { thread: QualifiedThreadId },
584 ExplicitlySuspending { thread: QualifiedThreadId },
586}
587
588enum GuestCallKind {
590 DeliverEvent {
593 instance: Instance,
595 set: Option<TableId<WaitableSet>>,
600 },
601 StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
604 StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
605}
606
607impl fmt::Debug for GuestCallKind {
608 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
609 match self {
610 Self::DeliverEvent { instance, set } => f
611 .debug_struct("DeliverEvent")
612 .field("instance", instance)
613 .field("set", set)
614 .finish(),
615 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
616 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
617 }
618 }
619}
620
621#[derive(Debug)]
623struct GuestCall {
624 thread: QualifiedThreadId,
625 kind: GuestCallKind,
626}
627
628impl GuestCall {
629 fn is_ready(&self, state: &mut ConcurrentState) -> Result<bool> {
639 let task_instance = state.get_mut(self.thread.task)?.instance;
640 let state = state.instance_state(task_instance);
641
642 let ready = match &self.kind {
643 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
644 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
645 GuestCallKind::StartExplicit(_) => true,
646 };
647 log::trace!(
648 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
649 state.do_not_enter,
650 state.backpressure
651 );
652 Ok(ready)
653 }
654}
655
656enum WorkerItem {
658 GuestCall(GuestCall),
659 Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
660}
661
662#[derive(Debug)]
665struct PollParams {
666 instance: Instance,
668 thread: QualifiedThreadId,
670 set: TableId<WaitableSet>,
672}
673
674enum WorkItem {
677 PushFuture(AlwaysMut<HostTaskFuture>),
679 ResumeFiber(StoreFiber<'static>),
681 GuestCall(GuestCall),
683 Poll(PollParams),
685 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
687}
688
689impl fmt::Debug for WorkItem {
690 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
691 match self {
692 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
693 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
694 Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
695 Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
696 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
697 }
698 }
699}
700
701#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
703pub(crate) enum WaitResult {
704 Cancelled,
705 Completed,
706}
707
708pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
716 store: &mut dyn VMStore,
717 future: impl Future<Output = Result<R>> + Send + 'static,
718 caller_instance: RuntimeComponentInstanceIndex,
719) -> Result<R> {
720 let state = store.concurrent_state_mut();
721
722 let Some(caller) = state.guest_thread else {
726 return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
727 Poll::Ready(result) => result,
728 Poll::Pending => {
729 unreachable!()
730 }
731 };
732 };
733
734 let old_result = state
737 .get_mut(caller.task)
738 .with_context(|| format!("bad handle: {caller:?}"))?
739 .result
740 .take();
741
742 let task = state.push(HostTask::new(caller_instance, None))?;
746
747 log::trace!("new host task child of {caller:?}: {task:?}");
748
749 let mut future = Box::pin(async move {
753 let result = future.await?;
754 tls::get(move |store| {
755 let state = store.concurrent_state_mut();
756 state.get_mut(caller.task)?.result = Some(Box::new(result) as _);
757
758 Waitable::Host(task).set_event(
759 state,
760 Some(Event::Subtask {
761 status: Status::Returned,
762 }),
763 )?;
764
765 Ok(())
766 })
767 }) as HostTaskFuture;
768
769 let poll = tls::set(store, || {
773 future
774 .as_mut()
775 .poll(&mut Context::from_waker(&Waker::noop()))
776 });
777
778 match poll {
779 Poll::Ready(result) => {
780 result?;
782 log::trace!("delete host task {task:?} (already ready)");
783 store.concurrent_state_mut().delete(task)?;
784 }
785 Poll::Pending => {
786 let state = store.concurrent_state_mut();
791 state.push_future(future);
792
793 let set = state.get_mut(caller.task)?.sync_call_set;
794 Waitable::Host(task).join(state, Some(set))?;
795
796 store.suspend(SuspendReason::Waiting {
797 set,
798 thread: caller,
799 })?;
800 }
801 }
802
803 Ok(*mem::replace(
805 &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
806 old_result,
807 )
808 .unwrap()
809 .downcast()
810 .unwrap())
811}
812
813fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
815 match call.kind {
816 GuestCallKind::DeliverEvent { instance, set } => {
817 let (event, waitable) = instance
818 .get_event(store, call.thread.task, set, true)?
819 .unwrap();
820 let state = store.concurrent_state_mut();
821 let task = state.get_mut(call.thread.task)?;
822 let runtime_instance = task.instance;
823 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
824
825 log::trace!(
826 "use callback to deliver event {event:?} to {:?} for {waitable:?}",
827 call.thread,
828 );
829
830 let old_thread = state.guest_thread.replace(call.thread);
831 log::trace!(
832 "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
833 call.thread
834 );
835
836 store.maybe_push_call_context(call.thread.task)?;
837
838 let state = store.concurrent_state_mut();
839 state.enter_instance(runtime_instance);
840
841 let callback = state.get_mut(call.thread.task)?.callback.take().unwrap();
842
843 let code = callback(store, runtime_instance, event, handle)?;
844
845 let state = store.concurrent_state_mut();
846
847 state.get_mut(call.thread.task)?.callback = Some(callback);
848 state.exit_instance(runtime_instance)?;
849
850 store.maybe_pop_call_context(call.thread.task)?;
851
852 instance.handle_callback_code(store, call.thread, runtime_instance, code)?;
853
854 store.concurrent_state_mut().guest_thread = old_thread;
855 log::trace!("GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread");
856 }
857 GuestCallKind::StartImplicit(fun) => {
858 fun(store)?;
859 }
860 GuestCallKind::StartExplicit(fun) => {
861 fun(store)?;
862 }
863 }
864
865 Ok(())
866}
867
868impl<T> Store<T> {
869 pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
871 where
872 T: Send + 'static,
873 {
874 self.as_context_mut().run_concurrent(fun).await
875 }
876
877 #[doc(hidden)]
878 pub fn assert_concurrent_state_empty(&mut self) {
879 self.as_context_mut().assert_concurrent_state_empty();
880 }
881
882 pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
884 where
885 T: 'static,
886 {
887 self.as_context_mut().spawn(task)
888 }
889}
890
891impl<T> StoreContextMut<'_, T> {
892 #[doc(hidden)]
901 pub fn assert_concurrent_state_empty(self) {
902 let store = self.0;
903 store
904 .store_data_mut()
905 .components
906 .assert_guest_tables_empty();
907 let state = store.concurrent_state_mut();
908 assert!(
909 state.table.get_mut().is_empty(),
910 "non-empty table: {:?}",
911 state.table.get_mut()
912 );
913 assert!(state.high_priority.is_empty());
914 assert!(state.low_priority.is_empty());
915 assert!(state.guest_thread.is_none());
916 assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
917 assert!(
918 state
919 .instance_states
920 .iter()
921 .all(|(_, state)| state.pending.is_empty())
922 );
923 assert!(state.global_error_context_ref_counts.is_empty());
924 }
925
926 pub fn spawn(mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
936 where
937 T: 'static,
938 {
939 let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
940 self.spawn_with_accessor(accessor, task)
941 }
942
943 fn spawn_with_accessor<D>(
946 self,
947 accessor: Accessor<T, D>,
948 task: impl AccessorTask<T, D, Result<()>>,
949 ) -> JoinHandle
950 where
951 T: 'static,
952 D: HasData + ?Sized,
953 {
954 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
958 self.0
959 .concurrent_state_mut()
960 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
961 handle
962 }
963
964 pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1045 where
1046 T: Send + 'static,
1047 {
1048 self.do_run_concurrent(fun, false).await
1049 }
1050
1051 pub(super) async fn run_concurrent_trap_on_idle<R>(
1052 self,
1053 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1054 ) -> Result<R>
1055 where
1056 T: Send + 'static,
1057 {
1058 self.do_run_concurrent(fun, true).await
1059 }
1060
1061 async fn do_run_concurrent<R>(
1062 mut self,
1063 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1064 trap_on_idle: bool,
1065 ) -> Result<R>
1066 where
1067 T: Send + 'static,
1068 {
1069 check_recursive_run();
1070 let token = StoreToken::new(self.as_context_mut());
1071
1072 struct Dropper<'a, T: 'static, V> {
1073 store: StoreContextMut<'a, T>,
1074 value: ManuallyDrop<V>,
1075 }
1076
1077 impl<'a, T, V> Drop for Dropper<'a, T, V> {
1078 fn drop(&mut self) {
1079 tls::set(self.store.0, || {
1080 unsafe { ManuallyDrop::drop(&mut self.value) }
1085 });
1086 }
1087 }
1088
1089 let accessor = &Accessor::new(token);
1090 let dropper = &mut Dropper {
1091 store: self,
1092 value: ManuallyDrop::new(fun(accessor)),
1093 };
1094 let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1096
1097 dropper
1098 .store
1099 .as_context_mut()
1100 .poll_until(future, trap_on_idle)
1101 .await
1102 }
1103
1104 async fn poll_until<R>(
1110 mut self,
1111 mut future: Pin<&mut impl Future<Output = R>>,
1112 trap_on_idle: bool,
1113 ) -> Result<R>
1114 where
1115 T: Send + 'static,
1116 {
1117 struct Reset<'a, T: 'static> {
1118 store: StoreContextMut<'a, T>,
1119 futures: Option<FuturesUnordered<HostTaskFuture>>,
1120 }
1121
1122 impl<'a, T> Drop for Reset<'a, T> {
1123 fn drop(&mut self) {
1124 if let Some(futures) = self.futures.take() {
1125 *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1126 }
1127 }
1128 }
1129
1130 loop {
1131 let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1135 let mut reset = Reset {
1136 store: self.as_context_mut(),
1137 futures,
1138 };
1139 let mut next = pin!(reset.futures.as_mut().unwrap().next());
1140
1141 let result = future::poll_fn(|cx| {
1142 if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1145 return Poll::Ready(Ok(Either::Left(value)));
1146 }
1147
1148 let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1152 Poll::Ready(Some(output)) => {
1153 match output {
1154 Err(e) => return Poll::Ready(Err(e)),
1155 Ok(()) => {}
1156 }
1157 Poll::Ready(true)
1158 }
1159 Poll::Ready(None) => Poll::Ready(false),
1160 Poll::Pending => Poll::Pending,
1161 };
1162
1163 let state = reset.store.0.concurrent_state_mut();
1166 let ready = mem::take(&mut state.high_priority);
1167 let ready = if ready.is_empty() {
1168 let ready = mem::take(&mut state.low_priority);
1171 if ready.is_empty() {
1172 return match next {
1173 Poll::Ready(true) => {
1174 Poll::Ready(Ok(Either::Right(Vec::new())))
1180 }
1181 Poll::Ready(false) => {
1182 if let Poll::Ready(value) =
1186 tls::set(reset.store.0, || future.as_mut().poll(cx))
1187 {
1188 Poll::Ready(Ok(Either::Left(value)))
1189 } else {
1190 if trap_on_idle {
1196 Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
1199 } else {
1200 Poll::Pending
1204 }
1205 }
1206 }
1207 Poll::Pending => Poll::Pending,
1212 };
1213 } else {
1214 ready
1215 }
1216 } else {
1217 ready
1218 };
1219
1220 Poll::Ready(Ok(Either::Right(ready)))
1221 })
1222 .await;
1223
1224 drop(reset);
1228
1229 match result? {
1230 Either::Left(value) => break Ok(value),
1233 Either::Right(ready) => {
1236 struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1237 store: StoreContextMut<'a, T>,
1238 ready: I,
1239 }
1240
1241 impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1242 fn drop(&mut self) {
1243 while let Some(item) = self.ready.next() {
1244 match item {
1245 WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1246 WorkItem::PushFuture(future) => {
1247 tls::set(self.store.0, move || drop(future))
1248 }
1249 _ => {}
1250 }
1251 }
1252 }
1253 }
1254
1255 let mut dispose = Dispose {
1256 store: self.as_context_mut(),
1257 ready: ready.into_iter(),
1258 };
1259
1260 while let Some(item) = dispose.ready.next() {
1261 dispose
1262 .store
1263 .as_context_mut()
1264 .handle_work_item(item)
1265 .await?;
1266 }
1267 }
1268 }
1269 }
1270 }
1271
1272 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1274 where
1275 T: Send,
1276 {
1277 log::trace!("handle work item {item:?}");
1278 match item {
1279 WorkItem::PushFuture(future) => {
1280 self.0
1281 .concurrent_state_mut()
1282 .futures
1283 .get_mut()
1284 .as_mut()
1285 .unwrap()
1286 .push(future.into_inner());
1287 }
1288 WorkItem::ResumeFiber(fiber) => {
1289 self.0.resume_fiber(fiber).await?;
1290 }
1291 WorkItem::GuestCall(call) => {
1292 let state = self.0.concurrent_state_mut();
1293 if call.is_ready(state)? {
1294 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1295 } else {
1296 let task = state.get_mut(call.thread.task)?;
1297 if !task.starting_sent {
1298 task.starting_sent = true;
1299 if let GuestCallKind::StartImplicit(_) = &call.kind {
1300 Waitable::Guest(call.thread.task).set_event(
1301 state,
1302 Some(Event::Subtask {
1303 status: Status::Starting,
1304 }),
1305 )?;
1306 }
1307 }
1308
1309 let runtime_instance = state.get_mut(call.thread.task)?.instance;
1310 state
1311 .instance_state(runtime_instance)
1312 .pending
1313 .insert(call.thread, call.kind);
1314 }
1315 }
1316 WorkItem::Poll(params) => {
1317 let state = self.0.concurrent_state_mut();
1318 if state.get_mut(params.thread.task)?.event.is_some()
1319 || !state.get_mut(params.set)?.ready.is_empty()
1320 {
1321 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1324 thread: params.thread,
1325 kind: GuestCallKind::DeliverEvent {
1326 instance: params.instance,
1327 set: Some(params.set),
1328 },
1329 }));
1330 } else {
1331 state.get_mut(params.thread.task)?.event = Some(Event::None);
1334 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1335 thread: params.thread,
1336 kind: GuestCallKind::DeliverEvent {
1337 instance: params.instance,
1338 set: Some(params.set),
1339 },
1340 }));
1341 }
1342 }
1343 WorkItem::WorkerFunction(fun) => {
1344 self.run_on_worker(WorkerItem::Function(fun)).await?;
1345 }
1346 }
1347
1348 Ok(())
1349 }
1350
1351 async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1353 where
1354 T: Send,
1355 {
1356 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1357 fiber
1358 } else {
1359 fiber::make_fiber(self.0, move |store| {
1360 loop {
1361 match store.concurrent_state_mut().worker_item.take().unwrap() {
1362 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1363 WorkerItem::Function(fun) => fun.into_inner()(store)?,
1364 }
1365
1366 store.suspend(SuspendReason::NeedWork)?;
1367 }
1368 })?
1369 };
1370
1371 let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1372 assert!(worker_item.is_none());
1373 *worker_item = Some(item);
1374
1375 self.0.resume_fiber(worker).await
1376 }
1377
1378 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1383 where
1384 T: 'static,
1385 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1386 + Send
1387 + Sync
1388 + 'static,
1389 R: Send + Sync + 'static,
1390 {
1391 let token = StoreToken::new(self);
1392 async move {
1393 let mut accessor = Accessor::new(token);
1394 closure(&mut accessor).await
1395 }
1396 }
1397}
1398
1399impl StoreOpaque {
1400 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1403 let old_thread = self.concurrent_state_mut().guest_thread;
1404 log::trace!("resume_fiber: save current thread {old_thread:?}");
1405
1406 let fiber = fiber::resolve_or_release(self, fiber).await?;
1407
1408 let state = self.concurrent_state_mut();
1409
1410 state.guest_thread = old_thread;
1411 if let Some(ref ot) = old_thread {
1412 state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1413 }
1414 log::trace!("resume_fiber: restore current thread {old_thread:?}");
1415
1416 if let Some(mut fiber) = fiber {
1417 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1418 match state.suspend_reason.take().unwrap() {
1420 SuspendReason::NeedWork => {
1421 if state.worker.is_none() {
1422 state.worker = Some(fiber);
1423 } else {
1424 fiber.dispose(self);
1425 }
1426 }
1427 SuspendReason::Yielding { thread, .. } => {
1428 state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
1429 state.push_low_priority(WorkItem::ResumeFiber(fiber));
1430 }
1431 SuspendReason::ExplicitlySuspending { thread, .. } => {
1432 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1433 }
1434 SuspendReason::Waiting { set, thread } => {
1435 let old = state
1436 .get_mut(set)?
1437 .waiting
1438 .insert(thread, WaitMode::Fiber(fiber));
1439 assert!(old.is_none());
1440 }
1441 };
1442 } else {
1443 log::trace!("resume_fiber: fiber has exited");
1444 }
1445
1446 Ok(())
1447 }
1448
1449 fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1455 log::trace!("suspend fiber: {reason:?}");
1456
1457 let task = match &reason {
1461 SuspendReason::Yielding { thread, .. }
1462 | SuspendReason::Waiting { thread, .. }
1463 | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1464 SuspendReason::NeedWork => None,
1465 };
1466
1467 let old_guest_thread = if let Some(task) = task {
1468 self.maybe_pop_call_context(task)?;
1469 self.concurrent_state_mut().guest_thread
1470 } else {
1471 None
1472 };
1473
1474 let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1475 assert!(suspend_reason.is_none());
1476 *suspend_reason = Some(reason);
1477
1478 self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1479
1480 if let Some(task) = task {
1481 self.concurrent_state_mut().guest_thread = old_guest_thread;
1482 self.maybe_push_call_context(task)?;
1483 }
1484
1485 Ok(())
1486 }
1487
1488 fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1492 let task = self.concurrent_state_mut().get_mut(guest_task)?;
1493
1494 if !task.returned_or_cancelled() {
1495 log::trace!("push call context for {guest_task:?}");
1496 let call_context = task.call_context.take().unwrap();
1497 self.component_resource_state().0.push(call_context);
1498 }
1499 Ok(())
1500 }
1501
1502 fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1506 if !self
1507 .concurrent_state_mut()
1508 .get_mut(guest_task)?
1509 .returned_or_cancelled()
1510 {
1511 log::trace!("pop call context for {guest_task:?}");
1512 let call_context = Some(self.component_resource_state().0.pop().unwrap());
1513 self.concurrent_state_mut()
1514 .get_mut(guest_task)?
1515 .call_context = call_context;
1516 }
1517 Ok(())
1518 }
1519
1520 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1521 let state = self.concurrent_state_mut();
1522 let caller = state.guest_thread.unwrap();
1523 let old_set = waitable.common(state)?.set;
1524 let set = state.get_mut(caller.task)?.sync_call_set;
1525 waitable.join(state, Some(set))?;
1526 self.suspend(SuspendReason::Waiting {
1527 set,
1528 thread: caller,
1529 })?;
1530 let state = self.concurrent_state_mut();
1531 waitable.join(state, old_set)
1532 }
1533}
1534
1535impl Instance {
1536 fn get_event(
1539 self,
1540 store: &mut StoreOpaque,
1541 guest_task: TableId<GuestTask>,
1542 set: Option<TableId<WaitableSet>>,
1543 cancellable: bool,
1544 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1545 let state = store.concurrent_state_mut();
1546
1547 if let Some(event) = state.get_mut(guest_task)?.event.take() {
1548 log::trace!("deliver event {event:?} to {guest_task:?}");
1549
1550 if cancellable || !matches!(event, Event::Cancelled) {
1551 return Ok(Some((event, None)));
1552 } else {
1553 state.get_mut(guest_task)?.event = Some(event);
1554 }
1555 }
1556
1557 Ok(
1558 if let Some((set, waitable)) = set
1559 .and_then(|set| {
1560 state
1561 .get_mut(set)
1562 .map(|v| v.ready.pop_first().map(|v| (set, v)))
1563 .transpose()
1564 })
1565 .transpose()?
1566 {
1567 let common = waitable.common(state)?;
1568 let handle = common.handle.unwrap();
1569 let event = common.event.take().unwrap();
1570
1571 log::trace!(
1572 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1573 );
1574
1575 waitable.on_delivery(store, self, event);
1576
1577 Some((event, Some((waitable, handle))))
1578 } else {
1579 None
1580 },
1581 )
1582 }
1583
1584 fn handle_callback_code(
1587 self,
1588 store: &mut StoreOpaque,
1589 guest_thread: QualifiedThreadId,
1590 runtime_instance: RuntimeComponentInstanceIndex,
1591 code: u32,
1592 ) -> Result<()> {
1593 let (code, set) = unpack_callback_code(code);
1594
1595 log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1596
1597 let state = store.concurrent_state_mut();
1598
1599 let get_set = |store, handle| {
1600 if handle == 0 {
1601 bail!("invalid waitable-set handle");
1602 }
1603
1604 let set = self.id().get_mut(store).guest_tables().0[runtime_instance]
1605 .waitable_set_rep(handle)?;
1606
1607 Ok(TableId::<WaitableSet>::new(set))
1608 };
1609
1610 match code {
1611 callback_code::EXIT => {
1612 log::trace!("implicit thread {guest_thread:?} completed");
1613 self.cleanup_thread(store, guest_thread, runtime_instance)?;
1614 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1615 if task.threads.is_empty() && !task.returned_or_cancelled() {
1616 bail!(Trap::NoAsyncResult);
1617 }
1618 match &task.caller {
1619 Caller::Host { .. } => {
1620 if task.ready_to_delete() {
1621 Waitable::Guest(guest_thread.task)
1622 .delete_from(store.concurrent_state_mut())?;
1623 }
1624 }
1625 Caller::Guest { .. } => {
1626 task.exited = true;
1627 task.callback = None;
1628 }
1629 }
1630 }
1631 callback_code::YIELD => {
1632 let task = state.get_mut(guest_thread.task)?;
1635 assert!(task.event.is_none());
1636 task.event = Some(Event::None);
1637 state.push_low_priority(WorkItem::GuestCall(GuestCall {
1638 thread: guest_thread,
1639 kind: GuestCallKind::DeliverEvent {
1640 instance: self,
1641 set: None,
1642 },
1643 }));
1644 }
1645 callback_code::WAIT | callback_code::POLL => {
1646 let set = get_set(store, set)?;
1647 let state = store.concurrent_state_mut();
1648
1649 if state.get_mut(guest_thread.task)?.event.is_some()
1650 || !state.get_mut(set)?.ready.is_empty()
1651 {
1652 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1654 thread: guest_thread,
1655 kind: GuestCallKind::DeliverEvent {
1656 instance: self,
1657 set: Some(set),
1658 },
1659 }));
1660 } else {
1661 match code {
1663 callback_code::POLL => {
1664 state.push_low_priority(WorkItem::Poll(PollParams {
1667 instance: self,
1668 thread: guest_thread,
1669 set,
1670 }));
1671 }
1672 callback_code::WAIT => {
1673 let old = state
1680 .get_mut(guest_thread.thread)?
1681 .wake_on_cancel
1682 .replace(set);
1683 assert!(old.is_none());
1684 let old = state
1685 .get_mut(set)?
1686 .waiting
1687 .insert(guest_thread, WaitMode::Callback(self));
1688 assert!(old.is_none());
1689 }
1690 _ => unreachable!(),
1691 }
1692 }
1693 }
1694 _ => bail!("unsupported callback code: {code}"),
1695 }
1696
1697 Ok(())
1698 }
1699
1700 fn cleanup_thread(
1701 self,
1702 store: &mut StoreOpaque,
1703 guest_thread: QualifiedThreadId,
1704 runtime_instance: RuntimeComponentInstanceIndex,
1705 ) -> Result<()> {
1706 let guest_id = store
1707 .concurrent_state_mut()
1708 .get_mut(guest_thread.thread)?
1709 .instance_rep;
1710 self.id().get_mut(store).guest_tables().0[runtime_instance]
1711 .guest_thread_remove(guest_id.unwrap())?;
1712
1713 store.concurrent_state_mut().delete(guest_thread.thread)?;
1714 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1715 task.threads.remove(&guest_thread.thread);
1716 Ok(())
1717 }
1718
1719 unsafe fn queue_call<T: 'static>(
1726 self,
1727 mut store: StoreContextMut<T>,
1728 guest_thread: QualifiedThreadId,
1729 callee: SendSyncPtr<VMFuncRef>,
1730 param_count: usize,
1731 result_count: usize,
1732 flags: Option<InstanceFlags>,
1733 async_: bool,
1734 callback: Option<SendSyncPtr<VMFuncRef>>,
1735 post_return: Option<SendSyncPtr<VMFuncRef>>,
1736 ) -> Result<()> {
1737 unsafe fn make_call<T: 'static>(
1752 store: StoreContextMut<T>,
1753 guest_thread: QualifiedThreadId,
1754 callee: SendSyncPtr<VMFuncRef>,
1755 param_count: usize,
1756 result_count: usize,
1757 flags: Option<InstanceFlags>,
1758 ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
1759 + Send
1760 + Sync
1761 + 'static
1762 + use<T> {
1763 let token = StoreToken::new(store);
1764 move |store: &mut dyn VMStore| {
1765 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1766
1767 store
1768 .concurrent_state_mut()
1769 .get_mut(guest_thread.thread)?
1770 .state = GuestThreadState::Running;
1771 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1772 let may_enter_after_call = task.call_post_return_automatically();
1773 let lower = task.lower_params.take().unwrap();
1774
1775 lower(store, &mut storage[..param_count])?;
1776
1777 let mut store = token.as_context_mut(store);
1778
1779 unsafe {
1782 if let Some(mut flags) = flags {
1783 flags.set_may_enter(false);
1784 }
1785 crate::Func::call_unchecked_raw(
1786 &mut store,
1787 callee.as_non_null(),
1788 NonNull::new(
1789 &mut storage[..param_count.max(result_count)]
1790 as *mut [MaybeUninit<ValRaw>] as _,
1791 )
1792 .unwrap(),
1793 )?;
1794 if let Some(mut flags) = flags {
1795 flags.set_may_enter(may_enter_after_call);
1796 }
1797 }
1798
1799 Ok(storage)
1800 }
1801 }
1802
1803 let call = unsafe {
1807 make_call(
1808 store.as_context_mut(),
1809 guest_thread,
1810 callee,
1811 param_count,
1812 result_count,
1813 flags,
1814 )
1815 };
1816
1817 let callee_instance = store
1818 .0
1819 .concurrent_state_mut()
1820 .get_mut(guest_thread.task)?
1821 .instance;
1822 let fun = if callback.is_some() {
1823 assert!(async_);
1824
1825 Box::new(move |store: &mut dyn VMStore| {
1826 self.add_guest_thread_to_instance_table(
1827 guest_thread.thread,
1828 store,
1829 callee_instance,
1830 )?;
1831 let old_thread = store
1832 .concurrent_state_mut()
1833 .guest_thread
1834 .replace(guest_thread);
1835 log::trace!(
1836 "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
1837 );
1838
1839 store.maybe_push_call_context(guest_thread.task)?;
1840
1841 store.concurrent_state_mut().enter_instance(callee_instance);
1842
1843 let storage = call(store)?;
1850
1851 store
1852 .concurrent_state_mut()
1853 .exit_instance(callee_instance)?;
1854
1855 store.maybe_pop_call_context(guest_thread.task)?;
1856
1857 let state = store.concurrent_state_mut();
1858 state.guest_thread = old_thread;
1859 old_thread
1860 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
1861 log::trace!("stackless call: restored {old_thread:?} as current thread");
1862
1863 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
1866
1867 self.handle_callback_code(store, guest_thread, callee_instance, code)?;
1868
1869 Ok(())
1870 }) as Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>
1871 } else {
1872 let token = StoreToken::new(store.as_context_mut());
1873 Box::new(move |store: &mut dyn VMStore| {
1874 self.add_guest_thread_to_instance_table(
1875 guest_thread.thread,
1876 store,
1877 callee_instance,
1878 )?;
1879 let old_thread = store
1880 .concurrent_state_mut()
1881 .guest_thread
1882 .replace(guest_thread);
1883 log::trace!(
1884 "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
1885 );
1886 let mut flags = self.id().get(store).instance_flags(callee_instance);
1887
1888 store.maybe_push_call_context(guest_thread.task)?;
1889
1890 if !async_ {
1894 store.concurrent_state_mut().enter_instance(callee_instance);
1895 }
1896
1897 let storage = call(store)?;
1904
1905 self.cleanup_thread(store, guest_thread, callee_instance)?;
1907
1908 if async_ {
1909 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1910 if task.threads.is_empty() && !task.returned_or_cancelled() {
1911 bail!(Trap::NoAsyncResult);
1912 }
1913 } else {
1914 let lift = {
1920 let state = store.concurrent_state_mut();
1921 state.exit_instance(callee_instance)?;
1922
1923 assert!(state.get_mut(guest_thread.task)?.result.is_none());
1924
1925 state
1926 .get_mut(guest_thread.task)?
1927 .lift_result
1928 .take()
1929 .unwrap()
1930 };
1931
1932 let result = (lift.lift)(store, unsafe {
1935 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
1936 &storage[..result_count],
1937 )
1938 })?;
1939
1940 let post_return_arg = match result_count {
1941 0 => ValRaw::i32(0),
1942 1 => unsafe { storage[0].assume_init() },
1945 _ => unreachable!(),
1946 };
1947
1948 if store
1949 .concurrent_state_mut()
1950 .get_mut(guest_thread.task)?
1951 .call_post_return_automatically()
1952 {
1953 unsafe {
1954 flags.set_may_leave(false);
1955 flags.set_needs_post_return(false);
1956 }
1957
1958 if let Some(func) = post_return {
1959 let mut store = token.as_context_mut(store);
1960
1961 unsafe {
1967 crate::Func::call_unchecked_raw(
1968 &mut store,
1969 func.as_non_null(),
1970 slice::from_ref(&post_return_arg).into(),
1971 )?;
1972 }
1973 }
1974
1975 unsafe {
1976 flags.set_may_leave(true);
1977 flags.set_may_enter(true);
1978 }
1979 }
1980
1981 self.task_complete(
1982 store,
1983 guest_thread.task,
1984 result,
1985 Status::Returned,
1986 post_return_arg,
1987 )?;
1988 }
1989
1990 store.maybe_pop_call_context(guest_thread.task)?;
1991
1992 let state = store.concurrent_state_mut();
1993 let task = state.get_mut(guest_thread.task)?;
1994
1995 match &task.caller {
1996 Caller::Host { .. } => {
1997 if task.ready_to_delete() {
1998 Waitable::Guest(guest_thread.task).delete_from(state)?;
1999 }
2000 }
2001 Caller::Guest { .. } => {
2002 task.exited = true;
2003 }
2004 }
2005
2006 Ok(())
2007 })
2008 };
2009
2010 store
2011 .0
2012 .concurrent_state_mut()
2013 .push_high_priority(WorkItem::GuestCall(GuestCall {
2014 thread: guest_thread,
2015 kind: GuestCallKind::StartImplicit(fun),
2016 }));
2017
2018 Ok(())
2019 }
2020
2021 unsafe fn prepare_call<T: 'static>(
2034 self,
2035 mut store: StoreContextMut<T>,
2036 start: *mut VMFuncRef,
2037 return_: *mut VMFuncRef,
2038 caller_instance: RuntimeComponentInstanceIndex,
2039 callee_instance: RuntimeComponentInstanceIndex,
2040 task_return_type: TypeTupleIndex,
2041 memory: *mut VMMemoryDefinition,
2042 string_encoding: u8,
2043 caller_info: CallerInfo,
2044 ) -> Result<()> {
2045 self.id().get(store.0).check_may_leave(caller_instance)?;
2046
2047 enum ResultInfo {
2048 Heap { results: u32 },
2049 Stack { result_count: u32 },
2050 }
2051
2052 let result_info = match &caller_info {
2053 CallerInfo::Async {
2054 has_result: true,
2055 params,
2056 } => ResultInfo::Heap {
2057 results: params.last().unwrap().get_u32(),
2058 },
2059 CallerInfo::Async {
2060 has_result: false, ..
2061 } => ResultInfo::Stack { result_count: 0 },
2062 CallerInfo::Sync {
2063 result_count,
2064 params,
2065 } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2066 results: params.last().unwrap().get_u32(),
2067 },
2068 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2069 result_count: *result_count,
2070 },
2071 };
2072
2073 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2074
2075 let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2079 let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2080 let token = StoreToken::new(store.as_context_mut());
2081 let state = store.0.concurrent_state_mut();
2082 let old_thread = state.guest_thread.take();
2083 let new_task = GuestTask::new(
2084 state,
2085 Box::new(move |store, dst| {
2086 let mut store = token.as_context_mut(store);
2087 assert!(dst.len() <= MAX_FLAT_PARAMS);
2088 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2090 let count = match caller_info {
2091 CallerInfo::Async { params, has_result } => {
2095 let params = ¶ms[..params.len() - usize::from(has_result)];
2096 for (param, src) in params.iter().zip(&mut src) {
2097 src.write(*param);
2098 }
2099 params.len()
2100 }
2101
2102 CallerInfo::Sync { params, .. } => {
2104 for (param, src) in params.iter().zip(&mut src) {
2105 src.write(*param);
2106 }
2107 params.len()
2108 }
2109 };
2110 unsafe {
2117 crate::Func::call_unchecked_raw(
2118 &mut store,
2119 start.as_non_null(),
2120 NonNull::new(
2121 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2122 )
2123 .unwrap(),
2124 )?;
2125 }
2126 dst.copy_from_slice(&src[..dst.len()]);
2127 let state = store.0.concurrent_state_mut();
2128 Waitable::Guest(state.guest_thread.unwrap().task).set_event(
2129 state,
2130 Some(Event::Subtask {
2131 status: Status::Started,
2132 }),
2133 )?;
2134 Ok(())
2135 }),
2136 LiftResult {
2137 lift: Box::new(move |store, src| {
2138 let mut store = token.as_context_mut(store);
2141 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2143 my_src.push(ValRaw::u32(*results));
2144 }
2145 unsafe {
2152 crate::Func::call_unchecked_raw(
2153 &mut store,
2154 return_.as_non_null(),
2155 my_src.as_mut_slice().into(),
2156 )?;
2157 }
2158 let state = store.0.concurrent_state_mut();
2159 let thread = state.guest_thread.unwrap();
2160 if sync_caller {
2161 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2162 if let ResultInfo::Stack { result_count } = &result_info {
2163 match result_count {
2164 0 => None,
2165 1 => Some(my_src[0]),
2166 _ => unreachable!(),
2167 }
2168 } else {
2169 None
2170 },
2171 );
2172 }
2173 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2174 }),
2175 ty: task_return_type,
2176 memory: NonNull::new(memory).map(SendSyncPtr::new),
2177 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2178 },
2179 Caller::Guest {
2180 thread: old_thread.unwrap(),
2181 instance: caller_instance,
2182 },
2183 None,
2184 callee_instance,
2185 )?;
2186
2187 let guest_task = state.push(new_task)?;
2188 let new_thread = GuestThread::new_implicit(guest_task);
2189 let guest_thread = state.push(new_thread)?;
2190 state.get_mut(guest_task)?.threads.insert(guest_thread);
2191
2192 let state = store.0.concurrent_state_mut();
2193 if let Some(old_thread) = old_thread {
2194 if !state.may_enter(guest_task) {
2195 bail!(crate::Trap::CannotEnterComponent);
2196 }
2197
2198 state.get_mut(old_thread.task)?.subtasks.insert(guest_task);
2199 };
2200
2201 state.guest_thread = Some(QualifiedThreadId {
2204 task: guest_task,
2205 thread: guest_thread,
2206 });
2207 log::trace!(
2208 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2209 );
2210
2211 Ok(())
2212 }
2213
2214 unsafe fn call_callback<T>(
2219 self,
2220 mut store: StoreContextMut<T>,
2221 callee_instance: RuntimeComponentInstanceIndex,
2222 function: SendSyncPtr<VMFuncRef>,
2223 event: Event,
2224 handle: u32,
2225 may_enter_after_call: bool,
2226 ) -> Result<u32> {
2227 let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2228
2229 let (ordinal, result) = event.parts();
2230 let params = &mut [
2231 ValRaw::u32(ordinal),
2232 ValRaw::u32(handle),
2233 ValRaw::u32(result),
2234 ];
2235 unsafe {
2240 flags.set_may_enter(false);
2241 crate::Func::call_unchecked_raw(
2242 &mut store,
2243 function.as_non_null(),
2244 params.as_mut_slice().into(),
2245 )?;
2246 flags.set_may_enter(may_enter_after_call);
2247 }
2248 Ok(params[0].get_u32())
2249 }
2250
2251 unsafe fn start_call<T: 'static>(
2264 self,
2265 mut store: StoreContextMut<T>,
2266 callback: *mut VMFuncRef,
2267 post_return: *mut VMFuncRef,
2268 callee: *mut VMFuncRef,
2269 param_count: u32,
2270 result_count: u32,
2271 flags: u32,
2272 storage: Option<&mut [MaybeUninit<ValRaw>]>,
2273 ) -> Result<u32> {
2274 let token = StoreToken::new(store.as_context_mut());
2275 let async_caller = storage.is_none();
2276 let state = store.0.concurrent_state_mut();
2277 let guest_thread = state.guest_thread.unwrap();
2278 let may_enter_after_call = state
2279 .get_mut(guest_thread.task)?
2280 .call_post_return_automatically();
2281 let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2282 let param_count = usize::try_from(param_count).unwrap();
2283 assert!(param_count <= MAX_FLAT_PARAMS);
2284 let result_count = usize::try_from(result_count).unwrap();
2285 assert!(result_count <= MAX_FLAT_RESULTS);
2286
2287 let task = state.get_mut(guest_thread.task)?;
2288 if !callback.is_null() {
2289 let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2293 task.callback = Some(Box::new(move |store, runtime_instance, event, handle| {
2294 let store = token.as_context_mut(store);
2295 unsafe {
2296 self.call_callback::<T>(
2297 store,
2298 runtime_instance,
2299 callback,
2300 event,
2301 handle,
2302 may_enter_after_call,
2303 )
2304 }
2305 }));
2306 }
2307
2308 let Caller::Guest {
2309 thread: caller,
2310 instance: runtime_instance,
2311 } = &task.caller
2312 else {
2313 unreachable!()
2316 };
2317 let caller = *caller;
2318 let caller_instance = *runtime_instance;
2319
2320 let callee_instance = task.instance;
2321
2322 let instance_flags = if callback.is_null() {
2323 None
2324 } else {
2325 Some(self.id().get(store.0).instance_flags(callee_instance))
2326 };
2327
2328 unsafe {
2330 self.queue_call(
2331 store.as_context_mut(),
2332 guest_thread,
2333 callee,
2334 param_count,
2335 result_count,
2336 instance_flags,
2337 (flags & START_FLAG_ASYNC_CALLEE) != 0,
2338 NonNull::new(callback).map(SendSyncPtr::new),
2339 NonNull::new(post_return).map(SendSyncPtr::new),
2340 )?;
2341 }
2342
2343 let state = store.0.concurrent_state_mut();
2344
2345 let guest_waitable = Waitable::Guest(guest_thread.task);
2348 let old_set = guest_waitable.common(state)?.set;
2349 let set = state.get_mut(caller.task)?.sync_call_set;
2350 guest_waitable.join(state, Some(set))?;
2351
2352 let (status, waitable) = loop {
2368 store.0.suspend(SuspendReason::Waiting {
2369 set,
2370 thread: caller,
2371 })?;
2372
2373 let state = store.0.concurrent_state_mut();
2374
2375 log::trace!("taking event for {:?}", guest_thread.task);
2376 let event = guest_waitable.take_event(state)?;
2377 let Some(Event::Subtask { status }) = event else {
2378 unreachable!();
2379 };
2380
2381 log::trace!("status {status:?} for {:?}", guest_thread.task);
2382
2383 if status == Status::Returned {
2384 break (status, None);
2386 } else if async_caller {
2387 let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2391 .subtask_insert_guest(guest_thread.task.rep())?;
2392 store
2393 .0
2394 .concurrent_state_mut()
2395 .get_mut(guest_thread.task)?
2396 .common
2397 .handle = Some(handle);
2398 break (status, Some(handle));
2399 } else {
2400 }
2404 };
2405
2406 let state = store.0.concurrent_state_mut();
2407
2408 guest_waitable.join(state, old_set)?;
2409
2410 if let Some(storage) = storage {
2411 let task = state.get_mut(guest_thread.task)?;
2415 if let Some(result) = task.sync_result.take() {
2416 if let Some(result) = result {
2417 storage[0] = MaybeUninit::new(result);
2418 }
2419
2420 if task.exited {
2421 if task.ready_to_delete() {
2422 Waitable::Guest(guest_thread.task).delete_from(state)?;
2423 }
2424 }
2425 }
2426 }
2427
2428 state.guest_thread = Some(caller);
2430 state.get_mut(caller.thread)?.state = GuestThreadState::Running;
2431 log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2432
2433 Ok(status.pack(waitable))
2434 }
2435
2436 pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2448 self,
2449 mut store: StoreContextMut<T>,
2450 future: impl Future<Output = Result<R>> + Send + 'static,
2451 caller_instance: RuntimeComponentInstanceIndex,
2452 lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
2453 ) -> Result<Option<u32>> {
2454 let token = StoreToken::new(store.as_context_mut());
2455 let state = store.0.concurrent_state_mut();
2456 let caller = state.guest_thread.unwrap();
2457
2458 let (join_handle, future) = JoinHandle::run(async move {
2461 let mut future = pin!(future);
2462 let mut call_context = None;
2463 future::poll_fn(move |cx| {
2464 tls::get(|store| {
2467 if let Some(call_context) = call_context.take() {
2468 token
2469 .as_context_mut(store)
2470 .0
2471 .component_resource_state()
2472 .0
2473 .push(call_context);
2474 }
2475 });
2476
2477 let result = future.as_mut().poll(cx);
2478
2479 if result.is_pending() {
2480 tls::get(|store| {
2483 call_context = Some(
2484 token
2485 .as_context_mut(store)
2486 .0
2487 .component_resource_state()
2488 .0
2489 .pop()
2490 .unwrap(),
2491 );
2492 });
2493 }
2494 result
2495 })
2496 .await
2497 });
2498
2499 let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;
2503
2504 log::trace!("new host task child of {caller:?}: {task:?}");
2505
2506 let mut future = Box::pin(future);
2507
2508 let poll = tls::set(store.0, || {
2513 future
2514 .as_mut()
2515 .poll(&mut Context::from_waker(&Waker::noop()))
2516 });
2517
2518 Ok(match poll {
2519 Poll::Ready(None) => unreachable!(),
2520 Poll::Ready(Some(result)) => {
2521 lower(store.as_context_mut(), result?)?;
2524 log::trace!("delete host task {task:?} (already ready)");
2525 store.0.concurrent_state_mut().delete(task)?;
2526 None
2527 }
2528 Poll::Pending => {
2529 let future =
2537 Box::pin(async move {
2538 let result = match future.await {
2539 Some(result) => result?,
2540 None => return Ok(()),
2542 };
2543 tls::get(move |store| {
2544 store.concurrent_state_mut().push_high_priority(
2550 WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
2551 lower(token.as_context_mut(store), result)?;
2552 let state = store.concurrent_state_mut();
2553 state.get_mut(task)?.join_handle.take();
2554 Waitable::Host(task).set_event(
2555 state,
2556 Some(Event::Subtask {
2557 status: Status::Returned,
2558 }),
2559 )
2560 }))),
2561 );
2562 Ok(())
2563 })
2564 });
2565
2566 store.0.concurrent_state_mut().push_future(future);
2567 let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2568 .subtask_insert_host(task.rep())?;
2569 store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2570 log::trace!(
2571 "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
2572 );
2573 Some(handle)
2574 }
2575 })
2576 }
2577
2578 pub(crate) fn task_return(
2581 self,
2582 store: &mut dyn VMStore,
2583 caller: RuntimeComponentInstanceIndex,
2584 ty: TypeTupleIndex,
2585 options: OptionsIndex,
2586 storage: &[ValRaw],
2587 ) -> Result<()> {
2588 self.id().get(store).check_may_leave(caller)?;
2589 let state = store.concurrent_state_mut();
2590 let guest_thread = state.guest_thread.unwrap();
2591 let lift = state
2592 .get_mut(guest_thread.task)?
2593 .lift_result
2594 .take()
2595 .ok_or_else(|| {
2596 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2597 })?;
2598 assert!(state.get_mut(guest_thread.task)?.result.is_none());
2599
2600 let CanonicalOptions {
2601 string_encoding,
2602 data_model,
2603 ..
2604 } = &self.id().get(store).component().env_component().options[options];
2605
2606 let invalid = ty != lift.ty
2607 || string_encoding != &lift.string_encoding
2608 || match data_model {
2609 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2610 Some(memory) => {
2611 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2612 let actual = self.id().get(store).runtime_memory(memory);
2613 expected != actual.as_ptr()
2614 }
2615 None => false,
2618 },
2619 CanonicalOptionsDataModel::Gc { .. } => true,
2621 };
2622
2623 if invalid {
2624 bail!("invalid `task.return` signature and/or options for current task");
2625 }
2626
2627 log::trace!("task.return for {guest_thread:?}");
2628
2629 let result = (lift.lift)(store, storage)?;
2630 self.task_complete(
2631 store,
2632 guest_thread.task,
2633 result,
2634 Status::Returned,
2635 ValRaw::i32(0),
2636 )
2637 }
2638
2639 pub(crate) fn task_cancel(
2641 self,
2642 store: &mut StoreOpaque,
2643 caller: RuntimeComponentInstanceIndex,
2644 ) -> Result<()> {
2645 self.id().get(store).check_may_leave(caller)?;
2646 let state = store.concurrent_state_mut();
2647 let guest_thread = state.guest_thread.unwrap();
2648 let task = state.get_mut(guest_thread.task)?;
2649 if !task.cancel_sent {
2650 bail!("`task.cancel` called by task which has not been cancelled")
2651 }
2652 _ = task.lift_result.take().ok_or_else(|| {
2653 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2654 })?;
2655
2656 assert!(task.result.is_none());
2657
2658 log::trace!("task.cancel for {guest_thread:?}");
2659
2660 self.task_complete(
2661 store,
2662 guest_thread.task,
2663 Box::new(DummyResult),
2664 Status::ReturnCancelled,
2665 ValRaw::i32(0),
2666 )
2667 }
2668
2669 fn task_complete(
2675 self,
2676 store: &mut StoreOpaque,
2677 guest_task: TableId<GuestTask>,
2678 result: Box<dyn Any + Send + Sync>,
2679 status: Status,
2680 post_return_arg: ValRaw,
2681 ) -> Result<()> {
2682 if store
2683 .concurrent_state_mut()
2684 .get_mut(guest_task)?
2685 .call_post_return_automatically()
2686 {
2687 let (calls, host_table, _, instance) =
2688 store.component_resource_state_with_instance(self);
2689 ResourceTables {
2690 calls,
2691 host_table: Some(host_table),
2692 guest: Some(instance.guest_tables()),
2693 }
2694 .exit_call()?;
2695 } else {
2696 let function_index = store
2701 .concurrent_state_mut()
2702 .get_mut(guest_task)?
2703 .function_index
2704 .unwrap();
2705 self.id()
2706 .get_mut(store)
2707 .post_return_arg_set(function_index, post_return_arg);
2708 }
2709
2710 let state = store.concurrent_state_mut();
2711 let task = state.get_mut(guest_task)?;
2712
2713 if let Caller::Host { tx, .. } = &mut task.caller {
2714 if let Some(tx) = tx.take() {
2715 _ = tx.send(result);
2716 }
2717 } else {
2718 task.result = Some(result);
2719 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2720 }
2721
2722 Ok(())
2723 }
2724
2725 pub(crate) fn waitable_set_new(
2727 self,
2728 store: &mut StoreOpaque,
2729 caller_instance: RuntimeComponentInstanceIndex,
2730 ) -> Result<u32> {
2731 self.id().get_mut(store).check_may_leave(caller_instance)?;
2732 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2733 let handle = self.id().get_mut(store).guest_tables().0[caller_instance]
2734 .waitable_set_insert(set.rep())?;
2735 log::trace!("new waitable set {set:?} (handle {handle})");
2736 Ok(handle)
2737 }
2738
2739 pub(crate) fn waitable_set_drop(
2741 self,
2742 store: &mut StoreOpaque,
2743 caller_instance: RuntimeComponentInstanceIndex,
2744 set: u32,
2745 ) -> Result<()> {
2746 self.id().get_mut(store).check_may_leave(caller_instance)?;
2747 let rep =
2748 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_remove(set)?;
2749
2750 log::trace!("drop waitable set {rep} (handle {set})");
2751
2752 let set = store
2753 .concurrent_state_mut()
2754 .delete(TableId::<WaitableSet>::new(rep))?;
2755
2756 if !set.waiting.is_empty() {
2757 bail!("cannot drop waitable set with waiters");
2758 }
2759
2760 Ok(())
2761 }
2762
2763 pub(crate) fn waitable_join(
2765 self,
2766 store: &mut StoreOpaque,
2767 caller_instance: RuntimeComponentInstanceIndex,
2768 waitable_handle: u32,
2769 set_handle: u32,
2770 ) -> Result<()> {
2771 let mut instance = self.id().get_mut(store);
2772 instance.check_may_leave(caller_instance)?;
2773 let waitable =
2774 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2775
2776 let set = if set_handle == 0 {
2777 None
2778 } else {
2779 let set = instance.guest_tables().0[caller_instance].waitable_set_rep(set_handle)?;
2780
2781 Some(TableId::<WaitableSet>::new(set))
2782 };
2783
2784 log::trace!(
2785 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2786 );
2787
2788 waitable.join(store.concurrent_state_mut(), set)
2789 }
2790
2791 pub(crate) fn subtask_drop(
2793 self,
2794 store: &mut StoreOpaque,
2795 caller_instance: RuntimeComponentInstanceIndex,
2796 task_id: u32,
2797 ) -> Result<()> {
2798 self.id().get_mut(store).check_may_leave(caller_instance)?;
2799 self.waitable_join(store, caller_instance, task_id, 0)?;
2800
2801 let (rep, is_host) =
2802 self.id().get_mut(store).guest_tables().0[caller_instance].subtask_remove(task_id)?;
2803
2804 let concurrent_state = store.concurrent_state_mut();
2805 let (waitable, expected_caller_instance, delete) = if is_host {
2806 let id = TableId::<HostTask>::new(rep);
2807 let task = concurrent_state.get_mut(id)?;
2808 if task.join_handle.is_some() {
2809 bail!("cannot drop a subtask which has not yet resolved");
2810 }
2811 (Waitable::Host(id), task.caller_instance, true)
2812 } else {
2813 let id = TableId::<GuestTask>::new(rep);
2814 let task = concurrent_state.get_mut(id)?;
2815 if task.lift_result.is_some() {
2816 bail!("cannot drop a subtask which has not yet resolved");
2817 }
2818 if let Caller::Guest { instance, .. } = &task.caller {
2819 (Waitable::Guest(id), *instance, task.exited)
2820 } else {
2821 unreachable!()
2822 }
2823 };
2824
2825 waitable.common(concurrent_state)?.handle = None;
2826
2827 if waitable.take_event(concurrent_state)?.is_some() {
2828 bail!("cannot drop a subtask with an undelivered event");
2829 }
2830
2831 if delete {
2832 waitable.delete_from(concurrent_state)?;
2833 }
2834
2835 assert_eq!(expected_caller_instance, caller_instance);
2839 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
2840 Ok(())
2841 }
2842
2843 pub(crate) fn waitable_set_wait(
2845 self,
2846 store: &mut StoreOpaque,
2847 caller: RuntimeComponentInstanceIndex,
2848 options: OptionsIndex,
2849 set: u32,
2850 payload: u32,
2851 ) -> Result<u32> {
2852 self.id().get(store).check_may_leave(caller)?;
2853 let &CanonicalOptions {
2854 cancellable,
2855 instance: caller_instance,
2856 ..
2857 } = &self.id().get(store).component().env_component().options[options];
2858 let rep =
2859 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2860
2861 self.waitable_check(
2862 store,
2863 cancellable,
2864 WaitableCheck::Wait(WaitableCheckParams {
2865 set: TableId::new(rep),
2866 options,
2867 payload,
2868 }),
2869 )
2870 }
2871
2872 pub(crate) fn waitable_set_poll(
2874 self,
2875 store: &mut StoreOpaque,
2876 caller: RuntimeComponentInstanceIndex,
2877 options: OptionsIndex,
2878 set: u32,
2879 payload: u32,
2880 ) -> Result<u32> {
2881 self.id().get(store).check_may_leave(caller)?;
2882 let &CanonicalOptions {
2883 cancellable,
2884 instance: caller_instance,
2885 ..
2886 } = &self.id().get(store).component().env_component().options[options];
2887 let rep =
2888 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2889
2890 self.waitable_check(
2891 store,
2892 cancellable,
2893 WaitableCheck::Poll(WaitableCheckParams {
2894 set: TableId::new(rep),
2895 options,
2896 payload,
2897 }),
2898 )
2899 }
2900
2901 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
2903 let thread_id = store
2904 .concurrent_state_mut()
2905 .guest_thread
2906 .ok_or_else(|| anyhow!("no current thread"))?
2907 .thread;
2908 Ok(store
2910 .concurrent_state_mut()
2911 .get_mut(thread_id)?
2912 .instance_rep
2913 .unwrap())
2914 }
2915
2916 pub(crate) fn thread_new_indirect<T: 'static>(
2918 self,
2919 mut store: StoreContextMut<T>,
2920 runtime_instance: RuntimeComponentInstanceIndex,
2921 _func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex,
2923 start_func_idx: u32,
2924 context: i32,
2925 ) -> Result<u32> {
2926 self.id().get(store.0).check_may_leave(runtime_instance)?;
2927
2928 log::trace!("creating new thread");
2929
2930 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
2931 let instance = self.id().get_mut(store.0);
2932 let callee = instance
2933 .index_runtime_func_table(start_func_table_idx, start_func_idx as u64)?
2934 .ok_or_else(|| {
2935 anyhow!("the start function index points to an uninitialized function")
2936 })?;
2937 if callee.type_index(store.0) != start_func_ty.type_index() {
2938 bail!(
2939 "start function does not match expected type (currently only `(i32) -> ()` is supported)"
2940 );
2941 }
2942
2943 let token = StoreToken::new(store.as_context_mut());
2944 let start_func = Box::new(
2945 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
2946 let old_thread = store
2947 .concurrent_state_mut()
2948 .guest_thread
2949 .replace(guest_thread);
2950 log::trace!(
2951 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
2952 );
2953
2954 store.maybe_push_call_context(guest_thread.task)?;
2955
2956 let mut store = token.as_context_mut(store);
2957 let mut params = [ValRaw::i32(context)];
2958 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
2961
2962 store.0.maybe_pop_call_context(guest_thread.task)?;
2963
2964 self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
2965 log::trace!("explicit thread {guest_thread:?} completed");
2966 let state = store.0.concurrent_state_mut();
2967 let task = state.get_mut(guest_thread.task)?;
2968 if task.threads.is_empty() && !task.returned_or_cancelled() {
2969 bail!(Trap::NoAsyncResult);
2970 }
2971 state.guest_thread = old_thread;
2972 old_thread
2973 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2974 if state.get_mut(guest_thread.task)?.ready_to_delete() {
2975 Waitable::Guest(guest_thread.task).delete_from(state)?;
2976 }
2977 log::trace!("thread start: restored {old_thread:?} as current thread");
2978
2979 Ok(())
2980 },
2981 );
2982
2983 let state = store.0.concurrent_state_mut();
2984 let current_thread = state.guest_thread.unwrap();
2985 let parent_task = current_thread.task;
2986
2987 let new_thread = GuestThread::new_explicit(parent_task, start_func);
2988 let thread_id = state.push(new_thread)?;
2989 state.get_mut(parent_task)?.threads.insert(thread_id);
2990
2991 log::trace!("new thread with id {thread_id:?} created");
2992
2993 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
2994 }
2995
2996 pub(crate) fn resume_suspended_thread(
2997 self,
2998 store: &mut StoreOpaque,
2999 runtime_instance: RuntimeComponentInstanceIndex,
3000 thread_idx: u32,
3001 high_priority: bool,
3002 ) -> Result<()> {
3003 let thread_id =
3004 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3005 let state = store.concurrent_state_mut();
3006 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3007 let thread = state.get_mut(guest_thread.thread)?;
3008
3009 match mem::replace(&mut thread.state, GuestThreadState::Running) {
3010 GuestThreadState::NotStartedExplicit(start_func) => {
3011 log::trace!("starting thread {guest_thread:?}");
3012 let guest_call = WorkItem::GuestCall(GuestCall {
3013 thread: guest_thread,
3014 kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3015 start_func(store, guest_thread)
3016 })),
3017 });
3018 store
3019 .concurrent_state_mut()
3020 .push_work_item(guest_call, high_priority);
3021 }
3022 GuestThreadState::Suspended(fiber) => {
3023 log::trace!("resuming thread {thread_id:?} that was suspended");
3024 store
3025 .concurrent_state_mut()
3026 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3027 }
3028 _ => {
3029 bail!("cannot resume thread which is not suspended");
3030 }
3031 }
3032 Ok(())
3033 }
3034
3035 fn add_guest_thread_to_instance_table(
3036 self,
3037 thread_id: TableId<GuestThread>,
3038 store: &mut StoreOpaque,
3039 runtime_instance: RuntimeComponentInstanceIndex,
3040 ) -> Result<u32> {
3041 let guest_id = self.id().get_mut(store).guest_tables().0[runtime_instance]
3042 .guest_thread_insert(thread_id.rep())?;
3043 store
3044 .concurrent_state_mut()
3045 .get_mut(thread_id)?
3046 .instance_rep = Some(guest_id);
3047 Ok(guest_id)
3048 }
3049
3050 pub(crate) fn suspension_intrinsic(
3053 self,
3054 store: &mut StoreOpaque,
3055 caller: RuntimeComponentInstanceIndex,
3056 cancellable: bool,
3057 yielding: bool,
3058 to_thread: Option<u32>,
3059 ) -> Result<WaitResult> {
3060 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3062 return Ok(WaitResult::Cancelled);
3063 }
3064
3065 self.id().get(store).check_may_leave(caller)?;
3066
3067 if let Some(thread) = to_thread {
3068 self.resume_suspended_thread(store, caller, thread, true)?;
3069 }
3070
3071 let state = store.concurrent_state_mut();
3072 let guest_thread = state.guest_thread.unwrap();
3073 let reason = if yielding {
3074 SuspendReason::Yielding {
3075 thread: guest_thread,
3076 }
3077 } else {
3078 SuspendReason::ExplicitlySuspending {
3079 thread: guest_thread,
3080 }
3081 };
3082
3083 store.suspend(reason)?;
3084
3085 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3086 Ok(WaitResult::Cancelled)
3087 } else {
3088 Ok(WaitResult::Completed)
3089 }
3090 }
3091
3092 fn waitable_check(
3094 self,
3095 store: &mut StoreOpaque,
3096 cancellable: bool,
3097 check: WaitableCheck,
3098 ) -> Result<u32> {
3099 let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();
3100
3101 let (wait, set) = match &check {
3102 WaitableCheck::Wait(params) => (true, Some(params.set)),
3103 WaitableCheck::Poll(params) => (false, Some(params.set)),
3104 };
3105
3106 log::trace!("waitable check for {guest_thread:?}; set {set:?}");
3107 store.suspend(SuspendReason::Yielding {
3109 thread: guest_thread,
3110 })?;
3111
3112 log::trace!("waitable check for {guest_thread:?}; set {set:?}");
3113
3114 let state = store.concurrent_state_mut();
3115 let task = state.get_mut(guest_thread.task)?;
3116
3117 if wait {
3120 let set = set.unwrap();
3121
3122 if (task.event.is_none()
3123 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3124 && state.get_mut(set)?.ready.is_empty()
3125 {
3126 if cancellable {
3127 let old = state
3128 .get_mut(guest_thread.thread)?
3129 .wake_on_cancel
3130 .replace(set);
3131 assert!(old.is_none());
3132 }
3133
3134 store.suspend(SuspendReason::Waiting {
3135 set,
3136 thread: guest_thread,
3137 })?;
3138 }
3139 }
3140
3141 log::trace!("waitable check for {guest_thread:?}; set {set:?}, part two");
3142
3143 let result = match check {
3144 WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => {
3146 let event =
3147 self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3148
3149 let (ordinal, handle, result) = if wait {
3150 let (event, waitable) = event.unwrap();
3151 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3152 let (ordinal, result) = event.parts();
3153 (ordinal, handle, result)
3154 } else {
3155 if let Some((event, waitable)) = event {
3156 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3157 let (ordinal, result) = event.parts();
3158 (ordinal, handle, result)
3159 } else {
3160 log::trace!(
3161 "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3162 guest_thread.task,
3163 params.set
3164 );
3165 let (ordinal, result) = Event::None.parts();
3166 (ordinal, 0, result)
3167 }
3168 };
3169 let memory = self.options_memory_mut(store, params.options);
3170 let ptr =
3171 func::validate_inbounds::<(u32, u32)>(memory, &ValRaw::u32(params.payload))?;
3172 memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3173 memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3174 Ok(ordinal)
3175 }
3176 };
3177
3178 result
3179 }
3180
3181 pub(crate) fn subtask_cancel(
3183 self,
3184 store: &mut StoreOpaque,
3185 caller_instance: RuntimeComponentInstanceIndex,
3186 async_: bool,
3187 task_id: u32,
3188 ) -> Result<u32> {
3189 self.id().get(store).check_may_leave(caller_instance)?;
3190 let (rep, is_host) =
3191 self.id().get_mut(store).guest_tables().0[caller_instance].subtask_rep(task_id)?;
3192 let (waitable, expected_caller_instance) = if is_host {
3193 let id = TableId::<HostTask>::new(rep);
3194 (
3195 Waitable::Host(id),
3196 store.concurrent_state_mut().get_mut(id)?.caller_instance,
3197 )
3198 } else {
3199 let id = TableId::<GuestTask>::new(rep);
3200 if let Caller::Guest { instance, .. } =
3201 &store.concurrent_state_mut().get_mut(id)?.caller
3202 {
3203 (Waitable::Guest(id), *instance)
3204 } else {
3205 unreachable!()
3206 }
3207 };
3208 assert_eq!(expected_caller_instance, caller_instance);
3212
3213 log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3214
3215 let concurrent_state = store.concurrent_state_mut();
3216 if let Waitable::Host(host_task) = waitable {
3217 if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
3218 handle.abort();
3219 return Ok(Status::ReturnCancelled as u32);
3220 }
3221 } else {
3222 let caller = concurrent_state.guest_thread.unwrap();
3223 let guest_task = TableId::<GuestTask>::new(rep);
3224 let task = concurrent_state.get_mut(guest_task)?;
3225 if !task.already_lowered_parameters() {
3226 task.lower_params = None;
3227 task.lift_result = None;
3228
3229 let callee_instance = task.instance;
3231
3232 let pending = &mut concurrent_state.instance_state(callee_instance).pending;
3233 let pending_count = pending.len();
3234 pending.retain(|thread, _| thread.task != guest_task);
3235 if pending.len() == pending_count {
3237 bail!("`subtask.cancel` called after terminal status delivered");
3238 }
3239 return Ok(Status::StartCancelled as u32);
3240 } else if !task.returned_or_cancelled() {
3241 task.cancel_sent = true;
3244 task.event = Some(Event::Cancelled);
3249 for thread in task.threads.clone() {
3250 let thread = QualifiedThreadId {
3251 task: guest_task,
3252 thread,
3253 };
3254 if let Some(set) = concurrent_state
3255 .get_mut(thread.thread)
3256 .unwrap()
3257 .wake_on_cancel
3258 .take()
3259 {
3260 let item = match concurrent_state
3261 .get_mut(set)?
3262 .waiting
3263 .remove(&thread)
3264 .unwrap()
3265 {
3266 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3267 WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
3268 thread,
3269 kind: GuestCallKind::DeliverEvent {
3270 instance,
3271 set: None,
3272 },
3273 }),
3274 };
3275 concurrent_state.push_high_priority(item);
3276
3277 store.suspend(SuspendReason::Yielding { thread: caller })?;
3278 break;
3279 }
3280 }
3281
3282 let concurrent_state = store.concurrent_state_mut();
3283 let task = concurrent_state.get_mut(guest_task)?;
3284 if !task.returned_or_cancelled() {
3285 if async_ {
3286 return Ok(BLOCKED);
3287 } else {
3288 store.wait_for_event(Waitable::Guest(guest_task))?;
3289 }
3290 }
3291 }
3292 }
3293
3294 let event = waitable.take_event(store.concurrent_state_mut())?;
3295 if let Some(Event::Subtask {
3296 status: status @ (Status::Returned | Status::ReturnCancelled),
3297 }) = event
3298 {
3299 Ok(status as u32)
3300 } else {
3301 bail!("`subtask.cancel` called after terminal status delivered");
3302 }
3303 }
3304
3305 pub(crate) fn context_get(
3306 self,
3307 store: &mut StoreOpaque,
3308 caller: RuntimeComponentInstanceIndex,
3309 slot: u32,
3310 ) -> Result<u32> {
3311 self.id().get(store).check_may_leave(caller)?;
3312 store.concurrent_state_mut().context_get(slot)
3313 }
3314
3315 pub(crate) fn context_set(
3316 self,
3317 store: &mut StoreOpaque,
3318 caller: RuntimeComponentInstanceIndex,
3319 slot: u32,
3320 value: u32,
3321 ) -> Result<()> {
3322 self.id().get(store).check_may_leave(caller)?;
3323 store.concurrent_state_mut().context_set(slot, value)
3324 }
3325}
3326
3327pub trait VMComponentAsyncStore {
3335 unsafe fn prepare_call(
3341 &mut self,
3342 instance: Instance,
3343 memory: *mut VMMemoryDefinition,
3344 start: *mut VMFuncRef,
3345 return_: *mut VMFuncRef,
3346 caller_instance: RuntimeComponentInstanceIndex,
3347 callee_instance: RuntimeComponentInstanceIndex,
3348 task_return_type: TypeTupleIndex,
3349 string_encoding: u8,
3350 result_count: u32,
3351 storage: *mut ValRaw,
3352 storage_len: usize,
3353 ) -> Result<()>;
3354
3355 unsafe fn sync_start(
3358 &mut self,
3359 instance: Instance,
3360 callback: *mut VMFuncRef,
3361 callee: *mut VMFuncRef,
3362 param_count: u32,
3363 storage: *mut MaybeUninit<ValRaw>,
3364 storage_len: usize,
3365 ) -> Result<()>;
3366
3367 unsafe fn async_start(
3370 &mut self,
3371 instance: Instance,
3372 callback: *mut VMFuncRef,
3373 post_return: *mut VMFuncRef,
3374 callee: *mut VMFuncRef,
3375 param_count: u32,
3376 result_count: u32,
3377 flags: u32,
3378 ) -> Result<u32>;
3379
3380 fn future_write(
3382 &mut self,
3383 instance: Instance,
3384 caller: RuntimeComponentInstanceIndex,
3385 ty: TypeFutureTableIndex,
3386 options: OptionsIndex,
3387 future: u32,
3388 address: u32,
3389 ) -> Result<u32>;
3390
3391 fn future_read(
3393 &mut self,
3394 instance: Instance,
3395 caller: RuntimeComponentInstanceIndex,
3396 ty: TypeFutureTableIndex,
3397 options: OptionsIndex,
3398 future: u32,
3399 address: u32,
3400 ) -> Result<u32>;
3401
3402 fn future_drop_writable(
3404 &mut self,
3405 instance: Instance,
3406 caller: RuntimeComponentInstanceIndex,
3407 ty: TypeFutureTableIndex,
3408 writer: u32,
3409 ) -> Result<()>;
3410
3411 fn stream_write(
3413 &mut self,
3414 instance: Instance,
3415 caller: RuntimeComponentInstanceIndex,
3416 ty: TypeStreamTableIndex,
3417 options: OptionsIndex,
3418 stream: u32,
3419 address: u32,
3420 count: u32,
3421 ) -> Result<u32>;
3422
3423 fn stream_read(
3425 &mut self,
3426 instance: Instance,
3427 caller: RuntimeComponentInstanceIndex,
3428 ty: TypeStreamTableIndex,
3429 options: OptionsIndex,
3430 stream: u32,
3431 address: u32,
3432 count: u32,
3433 ) -> Result<u32>;
3434
3435 fn flat_stream_write(
3438 &mut self,
3439 instance: Instance,
3440 caller: RuntimeComponentInstanceIndex,
3441 ty: TypeStreamTableIndex,
3442 options: OptionsIndex,
3443 payload_size: u32,
3444 payload_align: u32,
3445 stream: u32,
3446 address: u32,
3447 count: u32,
3448 ) -> Result<u32>;
3449
3450 fn flat_stream_read(
3453 &mut self,
3454 instance: Instance,
3455 caller: RuntimeComponentInstanceIndex,
3456 ty: TypeStreamTableIndex,
3457 options: OptionsIndex,
3458 payload_size: u32,
3459 payload_align: u32,
3460 stream: u32,
3461 address: u32,
3462 count: u32,
3463 ) -> Result<u32>;
3464
3465 fn stream_drop_writable(
3467 &mut self,
3468 instance: Instance,
3469 caller: RuntimeComponentInstanceIndex,
3470 ty: TypeStreamTableIndex,
3471 writer: u32,
3472 ) -> Result<()>;
3473
3474 fn error_context_debug_message(
3476 &mut self,
3477 instance: Instance,
3478 caller: RuntimeComponentInstanceIndex,
3479 ty: TypeComponentLocalErrorContextTableIndex,
3480 options: OptionsIndex,
3481 err_ctx_handle: u32,
3482 debug_msg_address: u32,
3483 ) -> Result<()>;
3484
3485 fn thread_new_indirect(
3487 &mut self,
3488 instance: Instance,
3489 caller: RuntimeComponentInstanceIndex,
3490 func_ty_idx: TypeFuncIndex,
3491 start_func_table_idx: RuntimeTableIndex,
3492 start_func_idx: u32,
3493 context: i32,
3494 ) -> Result<u32>;
3495}
3496
3497impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3499 unsafe fn prepare_call(
3500 &mut self,
3501 instance: Instance,
3502 memory: *mut VMMemoryDefinition,
3503 start: *mut VMFuncRef,
3504 return_: *mut VMFuncRef,
3505 caller_instance: RuntimeComponentInstanceIndex,
3506 callee_instance: RuntimeComponentInstanceIndex,
3507 task_return_type: TypeTupleIndex,
3508 string_encoding: u8,
3509 result_count_or_max_if_async: u32,
3510 storage: *mut ValRaw,
3511 storage_len: usize,
3512 ) -> Result<()> {
3513 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3517
3518 unsafe {
3519 instance.prepare_call(
3520 StoreContextMut(self),
3521 start,
3522 return_,
3523 caller_instance,
3524 callee_instance,
3525 task_return_type,
3526 memory,
3527 string_encoding,
3528 match result_count_or_max_if_async {
3529 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3530 params,
3531 has_result: false,
3532 },
3533 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3534 params,
3535 has_result: true,
3536 },
3537 result_count => CallerInfo::Sync {
3538 params,
3539 result_count,
3540 },
3541 },
3542 )
3543 }
3544 }
3545
3546 unsafe fn sync_start(
3547 &mut self,
3548 instance: Instance,
3549 callback: *mut VMFuncRef,
3550 callee: *mut VMFuncRef,
3551 param_count: u32,
3552 storage: *mut MaybeUninit<ValRaw>,
3553 storage_len: usize,
3554 ) -> Result<()> {
3555 unsafe {
3556 instance
3557 .start_call(
3558 StoreContextMut(self),
3559 callback,
3560 ptr::null_mut(),
3561 callee,
3562 param_count,
3563 1,
3564 START_FLAG_ASYNC_CALLEE,
3565 Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3569 )
3570 .map(drop)
3571 }
3572 }
3573
3574 unsafe fn async_start(
3575 &mut self,
3576 instance: Instance,
3577 callback: *mut VMFuncRef,
3578 post_return: *mut VMFuncRef,
3579 callee: *mut VMFuncRef,
3580 param_count: u32,
3581 result_count: u32,
3582 flags: u32,
3583 ) -> Result<u32> {
3584 unsafe {
3585 instance.start_call(
3586 StoreContextMut(self),
3587 callback,
3588 post_return,
3589 callee,
3590 param_count,
3591 result_count,
3592 flags,
3593 None,
3594 )
3595 }
3596 }
3597
3598 fn future_write(
3599 &mut self,
3600 instance: Instance,
3601 caller: RuntimeComponentInstanceIndex,
3602 ty: TypeFutureTableIndex,
3603 options: OptionsIndex,
3604 future: u32,
3605 address: u32,
3606 ) -> Result<u32> {
3607 instance.id().get(self).check_may_leave(caller)?;
3608 instance
3609 .guest_write(
3610 StoreContextMut(self),
3611 TransmitIndex::Future(ty),
3612 options,
3613 None,
3614 future,
3615 address,
3616 1,
3617 )
3618 .map(|result| result.encode())
3619 }
3620
3621 fn future_read(
3622 &mut self,
3623 instance: Instance,
3624 caller: RuntimeComponentInstanceIndex,
3625 ty: TypeFutureTableIndex,
3626 options: OptionsIndex,
3627 future: u32,
3628 address: u32,
3629 ) -> Result<u32> {
3630 instance.id().get(self).check_may_leave(caller)?;
3631 instance
3632 .guest_read(
3633 StoreContextMut(self),
3634 TransmitIndex::Future(ty),
3635 options,
3636 None,
3637 future,
3638 address,
3639 1,
3640 )
3641 .map(|result| result.encode())
3642 }
3643
3644 fn stream_write(
3645 &mut self,
3646 instance: Instance,
3647 caller: RuntimeComponentInstanceIndex,
3648 ty: TypeStreamTableIndex,
3649 options: OptionsIndex,
3650 stream: u32,
3651 address: u32,
3652 count: u32,
3653 ) -> Result<u32> {
3654 instance.id().get(self).check_may_leave(caller)?;
3655 instance
3656 .guest_write(
3657 StoreContextMut(self),
3658 TransmitIndex::Stream(ty),
3659 options,
3660 None,
3661 stream,
3662 address,
3663 count,
3664 )
3665 .map(|result| result.encode())
3666 }
3667
3668 fn stream_read(
3669 &mut self,
3670 instance: Instance,
3671 caller: RuntimeComponentInstanceIndex,
3672 ty: TypeStreamTableIndex,
3673 options: OptionsIndex,
3674 stream: u32,
3675 address: u32,
3676 count: u32,
3677 ) -> Result<u32> {
3678 instance.id().get(self).check_may_leave(caller)?;
3679 instance
3680 .guest_read(
3681 StoreContextMut(self),
3682 TransmitIndex::Stream(ty),
3683 options,
3684 None,
3685 stream,
3686 address,
3687 count,
3688 )
3689 .map(|result| result.encode())
3690 }
3691
3692 fn future_drop_writable(
3693 &mut self,
3694 instance: Instance,
3695 caller: RuntimeComponentInstanceIndex,
3696 ty: TypeFutureTableIndex,
3697 writer: u32,
3698 ) -> Result<()> {
3699 instance.id().get(self).check_may_leave(caller)?;
3700 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
3701 }
3702
3703 fn flat_stream_write(
3704 &mut self,
3705 instance: Instance,
3706 caller: RuntimeComponentInstanceIndex,
3707 ty: TypeStreamTableIndex,
3708 options: OptionsIndex,
3709 payload_size: u32,
3710 payload_align: u32,
3711 stream: u32,
3712 address: u32,
3713 count: u32,
3714 ) -> Result<u32> {
3715 instance.id().get(self).check_may_leave(caller)?;
3716 instance
3717 .guest_write(
3718 StoreContextMut(self),
3719 TransmitIndex::Stream(ty),
3720 options,
3721 Some(FlatAbi {
3722 size: payload_size,
3723 align: payload_align,
3724 }),
3725 stream,
3726 address,
3727 count,
3728 )
3729 .map(|result| result.encode())
3730 }
3731
3732 fn flat_stream_read(
3733 &mut self,
3734 instance: Instance,
3735 caller: RuntimeComponentInstanceIndex,
3736 ty: TypeStreamTableIndex,
3737 options: OptionsIndex,
3738 payload_size: u32,
3739 payload_align: u32,
3740 stream: u32,
3741 address: u32,
3742 count: u32,
3743 ) -> Result<u32> {
3744 instance.id().get(self).check_may_leave(caller)?;
3745 instance
3746 .guest_read(
3747 StoreContextMut(self),
3748 TransmitIndex::Stream(ty),
3749 options,
3750 Some(FlatAbi {
3751 size: payload_size,
3752 align: payload_align,
3753 }),
3754 stream,
3755 address,
3756 count,
3757 )
3758 .map(|result| result.encode())
3759 }
3760
3761 fn stream_drop_writable(
3762 &mut self,
3763 instance: Instance,
3764 caller: RuntimeComponentInstanceIndex,
3765 ty: TypeStreamTableIndex,
3766 writer: u32,
3767 ) -> Result<()> {
3768 instance.id().get(self).check_may_leave(caller)?;
3769 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
3770 }
3771
3772 fn error_context_debug_message(
3773 &mut self,
3774 instance: Instance,
3775 caller: RuntimeComponentInstanceIndex,
3776 ty: TypeComponentLocalErrorContextTableIndex,
3777 options: OptionsIndex,
3778 err_ctx_handle: u32,
3779 debug_msg_address: u32,
3780 ) -> Result<()> {
3781 instance.id().get(self).check_may_leave(caller)?;
3782 instance.error_context_debug_message(
3783 StoreContextMut(self),
3784 ty,
3785 options,
3786 err_ctx_handle,
3787 debug_msg_address,
3788 )
3789 }
3790
3791 fn thread_new_indirect(
3792 &mut self,
3793 instance: Instance,
3794 caller: RuntimeComponentInstanceIndex,
3795 func_ty_idx: TypeFuncIndex,
3796 start_func_table_idx: RuntimeTableIndex,
3797 start_func_idx: u32,
3798 context: i32,
3799 ) -> Result<u32> {
3800 instance.thread_new_indirect(
3801 StoreContextMut(self),
3802 caller,
3803 func_ty_idx,
3804 start_func_table_idx,
3805 start_func_idx,
3806 context,
3807 )
3808 }
3809}
3810
3811type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
3812
3813struct HostTask {
3815 common: WaitableCommon,
3816 caller_instance: RuntimeComponentInstanceIndex,
3817 join_handle: Option<JoinHandle>,
3818}
3819
3820impl HostTask {
3821 fn new(
3822 caller_instance: RuntimeComponentInstanceIndex,
3823 join_handle: Option<JoinHandle>,
3824 ) -> Self {
3825 Self {
3826 common: WaitableCommon::default(),
3827 caller_instance,
3828 join_handle,
3829 }
3830 }
3831}
3832
3833impl TableDebug for HostTask {
3834 fn type_name() -> &'static str {
3835 "HostTask"
3836 }
3837}
3838
3839type CallbackFn = Box<
3840 dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
3841 + Send
3842 + Sync
3843 + 'static,
3844>;
3845
3846enum Caller {
3848 Host {
3850 tx: Option<oneshot::Sender<LiftedResult>>,
3852 exit_tx: Arc<oneshot::Sender<()>>,
3859 host_future_present: bool,
3862 call_post_return_automatically: bool,
3864 },
3865 Guest {
3867 thread: QualifiedThreadId,
3869 instance: RuntimeComponentInstanceIndex,
3875 },
3876}
3877
3878struct LiftResult {
3881 lift: RawLift,
3882 ty: TypeTupleIndex,
3883 memory: Option<SendSyncPtr<VMMemoryDefinition>>,
3884 string_encoding: StringEncoding,
3885}
3886
3887#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
3892struct QualifiedThreadId {
3893 task: TableId<GuestTask>,
3894 thread: TableId<GuestThread>,
3895}
3896
3897impl QualifiedThreadId {
3898 fn qualify(
3899 state: &mut ConcurrentState,
3900 thread: TableId<GuestThread>,
3901 ) -> Result<QualifiedThreadId> {
3902 Ok(QualifiedThreadId {
3903 task: state.get_mut(thread)?.parent_task,
3904 thread,
3905 })
3906 }
3907}
3908
3909impl fmt::Debug for QualifiedThreadId {
3910 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3911 f.debug_tuple("QualifiedThreadId")
3912 .field(&self.task.rep())
3913 .field(&self.thread.rep())
3914 .finish()
3915 }
3916}
3917
3918enum GuestThreadState {
3919 NotStartedImplicit,
3920 NotStartedExplicit(
3921 Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
3922 ),
3923 Running,
3924 Suspended(StoreFiber<'static>),
3925 Pending,
3926 Completed,
3927}
3928pub struct GuestThread {
3929 context: [u32; 2],
3932 parent_task: TableId<GuestTask>,
3934 wake_on_cancel: Option<TableId<WaitableSet>>,
3937 state: GuestThreadState,
3939 instance_rep: Option<u32>,
3942}
3943
3944impl GuestThread {
3945 fn from_instance(
3948 state: Pin<&mut ComponentInstance>,
3949 caller_instance: RuntimeComponentInstanceIndex,
3950 guest_thread: u32,
3951 ) -> Result<TableId<Self>> {
3952 let rep = state.guest_tables().0[caller_instance].guest_thread_rep(guest_thread)?;
3953 Ok(TableId::new(rep))
3954 }
3955
3956 fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
3957 Self {
3958 context: [0; 2],
3959 parent_task,
3960 wake_on_cancel: None,
3961 state: GuestThreadState::NotStartedImplicit,
3962 instance_rep: None,
3963 }
3964 }
3965
3966 fn new_explicit(
3967 parent_task: TableId<GuestTask>,
3968 start_func: Box<
3969 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
3970 >,
3971 ) -> Self {
3972 Self {
3973 context: [0; 2],
3974 parent_task,
3975 wake_on_cancel: None,
3976 state: GuestThreadState::NotStartedExplicit(start_func),
3977 instance_rep: None,
3978 }
3979 }
3980}
3981
3982impl TableDebug for GuestThread {
3983 fn type_name() -> &'static str {
3984 "GuestThread"
3985 }
3986}
3987
3988enum SyncResult {
3989 NotProduced,
3990 Produced(Option<ValRaw>),
3991 Taken,
3992}
3993
3994impl SyncResult {
3995 fn take(&mut self) -> Option<Option<ValRaw>> {
3996 match mem::replace(self, SyncResult::Taken) {
3997 SyncResult::NotProduced => None,
3998 SyncResult::Produced(val) => Some(val),
3999 SyncResult::Taken => {
4000 panic!("attempted to take a synchronous result that was already taken")
4001 }
4002 }
4003 }
4004}
4005
4006#[derive(Debug)]
4007enum HostFutureState {
4008 NotApplicable,
4009 Live,
4010 Dropped,
4011}
4012
4013pub(crate) struct GuestTask {
4015 common: WaitableCommon,
4017 lower_params: Option<RawLower>,
4019 lift_result: Option<LiftResult>,
4021 result: Option<LiftedResult>,
4024 callback: Option<CallbackFn>,
4027 caller: Caller,
4029 call_context: Option<CallContext>,
4032 sync_result: SyncResult,
4035 cancel_sent: bool,
4038 starting_sent: bool,
4041 subtasks: HashSet<TableId<GuestTask>>,
4046 sync_call_set: TableId<WaitableSet>,
4048 instance: RuntimeComponentInstanceIndex,
4054 event: Option<Event>,
4057 function_index: Option<ExportIndex>,
4059 exited: bool,
4061 threads: HashSet<TableId<GuestThread>>,
4063 host_future_state: HostFutureState,
4066}
4067
4068impl GuestTask {
4069 fn already_lowered_parameters(&self) -> bool {
4070 self.lower_params.is_none()
4072 }
4073 fn returned_or_cancelled(&self) -> bool {
4074 self.lift_result.is_none()
4076 }
4077 fn ready_to_delete(&self) -> bool {
4078 let threads_completed = self.threads.is_empty();
4079 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4080 let pending_completion_event = matches!(
4081 self.common.event,
4082 Some(Event::Subtask {
4083 status: Status::Returned | Status::ReturnCancelled
4084 })
4085 );
4086 let ready = threads_completed
4087 && !has_sync_result
4088 && !pending_completion_event
4089 && !matches!(self.host_future_state, HostFutureState::Live);
4090 log::trace!(
4091 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4092 threads_completed,
4093 has_sync_result,
4094 pending_completion_event,
4095 self.host_future_state
4096 );
4097 ready
4098 }
4099 fn new(
4100 state: &mut ConcurrentState,
4101 lower_params: RawLower,
4102 lift_result: LiftResult,
4103 caller: Caller,
4104 callback: Option<CallbackFn>,
4105 component_instance: RuntimeComponentInstanceIndex,
4106 ) -> Result<Self> {
4107 let sync_call_set = state.push(WaitableSet::default())?;
4108 let host_future_state = match &caller {
4109 Caller::Guest { .. } => HostFutureState::NotApplicable,
4110 Caller::Host {
4111 host_future_present,
4112 ..
4113 } => {
4114 if *host_future_present {
4115 HostFutureState::Live
4116 } else {
4117 HostFutureState::NotApplicable
4118 }
4119 }
4120 };
4121 Ok(Self {
4122 common: WaitableCommon::default(),
4123 lower_params: Some(lower_params),
4124 lift_result: Some(lift_result),
4125 result: None,
4126 callback,
4127 caller,
4128 call_context: Some(CallContext::default()),
4129 sync_result: SyncResult::NotProduced,
4130 cancel_sent: false,
4131 starting_sent: false,
4132 subtasks: HashSet::new(),
4133 sync_call_set,
4134 instance: component_instance,
4135 event: None,
4136 function_index: None,
4137 exited: false,
4138 threads: HashSet::new(),
4139 host_future_state,
4140 })
4141 }
4142
4143 fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4146 for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4149 if let Some(Event::Subtask {
4150 status: Status::Returned | Status::ReturnCancelled,
4151 }) = waitable.common(state)?.event
4152 {
4153 waitable.delete_from(state)?;
4154 }
4155 }
4156
4157 assert!(self.threads.is_empty());
4158
4159 state.delete(self.sync_call_set)?;
4160
4161 match &self.caller {
4163 Caller::Guest {
4164 thread,
4165 instance: runtime_instance,
4166 } => {
4167 let task_mut = state.get_mut(thread.task)?;
4168 let present = task_mut.subtasks.remove(&me);
4169 assert!(present);
4170
4171 for subtask in &self.subtasks {
4172 task_mut.subtasks.insert(*subtask);
4173 }
4174
4175 for subtask in &self.subtasks {
4176 state.get_mut(*subtask)?.caller = Caller::Guest {
4177 thread: *thread,
4178 instance: *runtime_instance,
4179 };
4180 }
4181 }
4182 Caller::Host { exit_tx, .. } => {
4183 for subtask in &self.subtasks {
4184 state.get_mut(*subtask)?.caller = Caller::Host {
4185 tx: None,
4186 exit_tx: exit_tx.clone(),
4190 host_future_present: false,
4191 call_post_return_automatically: true,
4192 };
4193 }
4194 }
4195 }
4196
4197 for subtask in self.subtasks {
4198 if state.get_mut(subtask)?.exited {
4199 Waitable::Guest(subtask).delete_from(state)?;
4200 }
4201 }
4202
4203 Ok(())
4204 }
4205
4206 fn call_post_return_automatically(&self) -> bool {
4207 matches!(
4208 self.caller,
4209 Caller::Guest { .. }
4210 | Caller::Host {
4211 call_post_return_automatically: true,
4212 ..
4213 }
4214 )
4215 }
4216}
4217
4218impl TableDebug for GuestTask {
4219 fn type_name() -> &'static str {
4220 "GuestTask"
4221 }
4222}
4223
4224#[derive(Default)]
4226struct WaitableCommon {
4227 event: Option<Event>,
4229 set: Option<TableId<WaitableSet>>,
4231 handle: Option<u32>,
4233}
4234
4235#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4237enum Waitable {
4238 Host(TableId<HostTask>),
4240 Guest(TableId<GuestTask>),
4242 Transmit(TableId<TransmitHandle>),
4244}
4245
4246impl Waitable {
4247 fn from_instance(
4250 state: Pin<&mut ComponentInstance>,
4251 caller_instance: RuntimeComponentInstanceIndex,
4252 waitable: u32,
4253 ) -> Result<Self> {
4254 use crate::runtime::vm::component::Waitable;
4255
4256 let (waitable, kind) = state.guest_tables().0[caller_instance].waitable_rep(waitable)?;
4257
4258 Ok(match kind {
4259 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4260 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4261 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4262 })
4263 }
4264
4265 fn rep(&self) -> u32 {
4267 match self {
4268 Self::Host(id) => id.rep(),
4269 Self::Guest(id) => id.rep(),
4270 Self::Transmit(id) => id.rep(),
4271 }
4272 }
4273
4274 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4278 log::trace!("waitable {self:?} join set {set:?}",);
4279
4280 let old = mem::replace(&mut self.common(state)?.set, set);
4281
4282 if let Some(old) = old {
4283 match *self {
4284 Waitable::Host(id) => state.remove_child(id, old),
4285 Waitable::Guest(id) => state.remove_child(id, old),
4286 Waitable::Transmit(id) => state.remove_child(id, old),
4287 }?;
4288
4289 state.get_mut(old)?.ready.remove(self);
4290 }
4291
4292 if let Some(set) = set {
4293 match *self {
4294 Waitable::Host(id) => state.add_child(id, set),
4295 Waitable::Guest(id) => state.add_child(id, set),
4296 Waitable::Transmit(id) => state.add_child(id, set),
4297 }?;
4298
4299 if self.common(state)?.event.is_some() {
4300 self.mark_ready(state)?;
4301 }
4302 }
4303
4304 Ok(())
4305 }
4306
4307 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4309 Ok(match self {
4310 Self::Host(id) => &mut state.get_mut(*id)?.common,
4311 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4312 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4313 })
4314 }
4315
4316 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4320 log::trace!("set event for {self:?}: {event:?}");
4321 self.common(state)?.event = event;
4322 self.mark_ready(state)
4323 }
4324
4325 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4327 let common = self.common(state)?;
4328 let event = common.event.take();
4329 if let Some(set) = self.common(state)?.set {
4330 state.get_mut(set)?.ready.remove(self);
4331 }
4332
4333 Ok(event)
4334 }
4335
4336 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4340 if let Some(set) = self.common(state)?.set {
4341 state.get_mut(set)?.ready.insert(*self);
4342 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4343 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4344 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4345
4346 let item = match mode {
4347 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4348 WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4349 thread,
4350 kind: GuestCallKind::DeliverEvent {
4351 instance,
4352 set: Some(set),
4353 },
4354 }),
4355 };
4356 state.push_high_priority(item);
4357 }
4358 }
4359 Ok(())
4360 }
4361
4362 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4364 match self {
4365 Self::Host(task) => {
4366 log::trace!("delete host task {task:?}");
4367 state.delete(*task)?;
4368 }
4369 Self::Guest(task) => {
4370 log::trace!("delete guest task {task:?}");
4371 state.delete(*task)?.dispose(state, *task)?;
4372 }
4373 Self::Transmit(task) => {
4374 state.delete(*task)?;
4375 }
4376 }
4377
4378 Ok(())
4379 }
4380}
4381
4382impl fmt::Debug for Waitable {
4383 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4384 match self {
4385 Self::Host(id) => write!(f, "{id:?}"),
4386 Self::Guest(id) => write!(f, "{id:?}"),
4387 Self::Transmit(id) => write!(f, "{id:?}"),
4388 }
4389 }
4390}
4391
4392#[derive(Default)]
4394struct WaitableSet {
4395 ready: BTreeSet<Waitable>,
4397 waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4399}
4400
4401impl TableDebug for WaitableSet {
4402 fn type_name() -> &'static str {
4403 "WaitableSet"
4404 }
4405}
4406
4407type RawLower =
4409 Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4410
4411type RawLift = Box<
4413 dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4414>;
4415
4416type LiftedResult = Box<dyn Any + Send + Sync>;
4420
4421struct DummyResult;
4424
4425#[derive(Default)]
4427struct InstanceState {
4428 backpressure: u16,
4430 do_not_enter: bool,
4432 pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4435}
4436
4437pub struct ConcurrentState {
4439 guest_thread: Option<QualifiedThreadId>,
4441
4442 futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4447 table: AlwaysMut<ResourceTable>,
4449 instance_states: HashMap<RuntimeComponentInstanceIndex, InstanceState>,
4455 high_priority: Vec<WorkItem>,
4457 low_priority: Vec<WorkItem>,
4459 suspend_reason: Option<SuspendReason>,
4463 worker: Option<StoreFiber<'static>>,
4467 worker_item: Option<WorkerItem>,
4469
4470 global_error_context_ref_counts:
4483 BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4484}
4485
4486impl Default for ConcurrentState {
4487 fn default() -> Self {
4488 Self {
4489 guest_thread: None,
4490 table: AlwaysMut::new(ResourceTable::new()),
4491 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4492 instance_states: HashMap::new(),
4493 high_priority: Vec::new(),
4494 low_priority: Vec::new(),
4495 suspend_reason: None,
4496 worker: None,
4497 worker_item: None,
4498 global_error_context_ref_counts: BTreeMap::new(),
4499 }
4500 }
4501}
4502
4503impl ConcurrentState {
4504 pub(crate) fn take_fibers_and_futures(
4521 &mut self,
4522 fibers: &mut Vec<StoreFiber<'static>>,
4523 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4524 ) {
4525 for entry in self.table.get_mut().iter_mut() {
4526 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4527 for mode in mem::take(&mut set.waiting).into_values() {
4528 if let WaitMode::Fiber(fiber) = mode {
4529 fibers.push(fiber);
4530 }
4531 }
4532 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4533 if let GuestThreadState::Suspended(fiber) =
4534 mem::replace(&mut thread.state, GuestThreadState::Completed)
4535 {
4536 fibers.push(fiber);
4537 }
4538 }
4539 }
4540
4541 if let Some(fiber) = self.worker.take() {
4542 fibers.push(fiber);
4543 }
4544
4545 let mut take_items = |list| {
4546 for item in mem::take(list) {
4547 match item {
4548 WorkItem::ResumeFiber(fiber) => {
4549 fibers.push(fiber);
4550 }
4551 WorkItem::PushFuture(future) => {
4552 self.futures
4553 .get_mut()
4554 .as_mut()
4555 .unwrap()
4556 .push(future.into_inner());
4557 }
4558 _ => {}
4559 }
4560 }
4561 };
4562
4563 take_items(&mut self.high_priority);
4564 take_items(&mut self.low_priority);
4565
4566 if let Some(them) = self.futures.get_mut().take() {
4567 futures.push(them);
4568 }
4569 }
4570
4571 fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState {
4572 self.instance_states.entry(instance).or_default()
4573 }
4574
4575 fn push<V: Send + Sync + 'static>(
4576 &mut self,
4577 value: V,
4578 ) -> Result<TableId<V>, ResourceTableError> {
4579 self.table.get_mut().push(value).map(TableId::from)
4580 }
4581
4582 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4583 self.table.get_mut().get_mut(&Resource::from(id))
4584 }
4585
4586 pub fn add_child<T: 'static, U: 'static>(
4587 &mut self,
4588 child: TableId<T>,
4589 parent: TableId<U>,
4590 ) -> Result<(), ResourceTableError> {
4591 self.table
4592 .get_mut()
4593 .add_child(Resource::from(child), Resource::from(parent))
4594 }
4595
4596 pub fn remove_child<T: 'static, U: 'static>(
4597 &mut self,
4598 child: TableId<T>,
4599 parent: TableId<U>,
4600 ) -> Result<(), ResourceTableError> {
4601 self.table
4602 .get_mut()
4603 .remove_child(Resource::from(child), Resource::from(parent))
4604 }
4605
4606 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4607 self.table.get_mut().delete(Resource::from(id))
4608 }
4609
4610 fn push_future(&mut self, future: HostTaskFuture) {
4611 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4618 }
4619
4620 fn push_high_priority(&mut self, item: WorkItem) {
4621 log::trace!("push high priority: {item:?}");
4622 self.high_priority.push(item);
4623 }
4624
4625 fn push_low_priority(&mut self, item: WorkItem) {
4626 log::trace!("push low priority: {item:?}");
4627 self.low_priority.push(item);
4628 }
4629
4630 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4631 if high_priority {
4632 self.push_high_priority(item);
4633 } else {
4634 self.push_low_priority(item);
4635 }
4636 }
4637
4638 fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4648 let guest_instance = self.get_mut(guest_task).unwrap().instance;
4649
4650 loop {
4658 let next_thread = match &self.get_mut(guest_task).unwrap().caller {
4659 Caller::Host { .. } => break true,
4660 Caller::Guest { thread, instance } => {
4661 if *instance == guest_instance {
4662 break false;
4663 } else {
4664 *thread
4665 }
4666 }
4667 };
4668 guest_task = next_thread.task;
4669 }
4670 }
4671
4672 fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) {
4676 self.instance_state(instance).do_not_enter = true;
4677 }
4678
4679 fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4683 self.instance_state(instance).do_not_enter = false;
4684 self.partition_pending(instance)
4685 }
4686
4687 fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4692 for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
4693 let call = GuestCall { thread, kind };
4694 if call.is_ready(self)? {
4695 self.push_high_priority(WorkItem::GuestCall(call));
4696 } else {
4697 self.instance_state(instance)
4698 .pending
4699 .insert(call.thread, call.kind);
4700 }
4701 }
4702
4703 Ok(())
4704 }
4705
4706 pub(crate) fn backpressure_modify(
4708 &mut self,
4709 caller_instance: RuntimeComponentInstanceIndex,
4710 modify: impl FnOnce(u16) -> Option<u16>,
4711 ) -> Result<()> {
4712 let state = self.instance_state(caller_instance);
4713 let old = state.backpressure;
4714 let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?;
4715 state.backpressure = new;
4716
4717 if old > 0 && new == 0 {
4718 self.partition_pending(caller_instance)?;
4721 }
4722
4723 Ok(())
4724 }
4725
4726 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4728 let thread = self.guest_thread.unwrap();
4729 let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4730 log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4731 Ok(val)
4732 }
4733
4734 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4736 let thread = self.guest_thread.unwrap();
4737 log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4738 self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4739 Ok(())
4740 }
4741
4742 fn take_pending_cancellation(&mut self) -> bool {
4745 let thread = self.guest_thread.unwrap();
4746 if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4747 assert!(matches!(event, Event::Cancelled));
4748 true
4749 } else {
4750 false
4751 }
4752 }
4753}
4754
4755fn for_any_lower<
4758 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4759>(
4760 fun: F,
4761) -> F {
4762 fun
4763}
4764
4765fn for_any_lift<
4767 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4768>(
4769 fun: F,
4770) -> F {
4771 fun
4772}
4773
4774fn checked<F: Future + Send + 'static>(
4779 id: StoreId,
4780 fut: F,
4781) -> impl Future<Output = F::Output> + Send + 'static {
4782 async move {
4783 let mut fut = pin!(fut);
4784 future::poll_fn(move |cx| {
4785 let message = "\
4786 `Future`s which depend on asynchronous component tasks, streams, or \
4787 futures to complete may only be polled from the event loop of the \
4788 store to which they belong. Please use \
4789 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
4790 ";
4791 tls::try_get(|store| {
4792 let matched = match store {
4793 tls::TryGet::Some(store) => store.id() == id,
4794 tls::TryGet::Taken | tls::TryGet::None => false,
4795 };
4796
4797 if !matched {
4798 panic!("{message}")
4799 }
4800 });
4801 fut.as_mut().poll(cx)
4802 })
4803 .await
4804 }
4805}
4806
4807fn check_recursive_run() {
4810 tls::try_get(|store| {
4811 if !matches!(store, tls::TryGet::None) {
4812 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
4813 }
4814 });
4815}
4816
4817fn unpack_callback_code(code: u32) -> (u32, u32) {
4818 (code & 0xF, code >> 4)
4819}
4820
4821struct WaitableCheckParams {
4825 set: TableId<WaitableSet>,
4826 options: OptionsIndex,
4827 payload: u32,
4828}
4829
4830enum WaitableCheck {
4832 Wait(WaitableCheckParams),
4833 Poll(WaitableCheckParams),
4834}
4835
4836pub(crate) struct PreparedCall<R> {
4838 handle: Func,
4840 thread: QualifiedThreadId,
4842 param_count: usize,
4844 rx: oneshot::Receiver<LiftedResult>,
4847 exit_rx: oneshot::Receiver<()>,
4850 _phantom: PhantomData<R>,
4851}
4852
4853impl<R> PreparedCall<R> {
4854 pub(crate) fn task_id(&self) -> TaskId {
4856 TaskId {
4857 task: self.thread.task,
4858 }
4859 }
4860}
4861
4862pub(crate) struct TaskId {
4864 task: TableId<GuestTask>,
4865}
4866
4867impl TaskId {
4868 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
4874 let task = store.0.concurrent_state_mut().get_mut(self.task)?;
4875 if !task.already_lowered_parameters() {
4876 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
4877 } else {
4878 task.host_future_state = HostFutureState::Dropped;
4879 if task.ready_to_delete() {
4880 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
4881 }
4882 }
4883 Ok(())
4884 }
4885}
4886
4887pub(crate) fn prepare_call<T, R>(
4893 mut store: StoreContextMut<T>,
4894 handle: Func,
4895 param_count: usize,
4896 host_future_present: bool,
4897 call_post_return_automatically: bool,
4898 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
4899 + Send
4900 + Sync
4901 + 'static,
4902 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4903 + Send
4904 + Sync
4905 + 'static,
4906) -> Result<PreparedCall<R>> {
4907 let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
4908
4909 let instance = handle.instance().id().get(store.0);
4910 let options = &instance.component().env_component().options[options];
4911 let task_return_type = instance.component().types()[ty].results;
4912 let component_instance = raw_options.instance;
4913 let callback = options.callback.map(|i| instance.runtime_callback(i));
4914 let memory = options
4915 .memory()
4916 .map(|i| instance.runtime_memory(i))
4917 .map(SendSyncPtr::new);
4918 let string_encoding = options.string_encoding;
4919 let token = StoreToken::new(store.as_context_mut());
4920 let state = store.0.concurrent_state_mut();
4921
4922 assert!(state.guest_thread.is_none());
4923
4924 let (tx, rx) = oneshot::channel();
4925 let (exit_tx, exit_rx) = oneshot::channel();
4926
4927 let mut task = GuestTask::new(
4928 state,
4929 Box::new(for_any_lower(move |store, params| {
4930 lower_params(handle, token.as_context_mut(store), params)
4931 })),
4932 LiftResult {
4933 lift: Box::new(for_any_lift(move |store, result| {
4934 lift_result(handle, store, result)
4935 })),
4936 ty: task_return_type,
4937 memory,
4938 string_encoding,
4939 },
4940 Caller::Host {
4941 tx: Some(tx),
4942 exit_tx: Arc::new(exit_tx),
4943 host_future_present,
4944 call_post_return_automatically,
4945 },
4946 callback.map(|callback| {
4947 let callback = SendSyncPtr::new(callback);
4948 let instance = handle.instance();
4949 Box::new(
4950 move |store: &mut dyn VMStore, runtime_instance, event, handle| {
4951 let store = token.as_context_mut(store);
4952 unsafe {
4955 instance.call_callback(
4956 store,
4957 runtime_instance,
4958 callback,
4959 event,
4960 handle,
4961 call_post_return_automatically,
4962 )
4963 }
4964 },
4965 ) as CallbackFn
4966 }),
4967 component_instance,
4968 )?;
4969 task.function_index = Some(handle.index());
4970
4971 let task = state.push(task)?;
4972 let thread = state.push(GuestThread::new_implicit(task))?;
4973 state.get_mut(task)?.threads.insert(thread);
4974
4975 Ok(PreparedCall {
4976 handle,
4977 thread: QualifiedThreadId { task, thread },
4978 param_count,
4979 rx,
4980 exit_rx,
4981 _phantom: PhantomData,
4982 })
4983}
4984
4985pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
4992 mut store: StoreContextMut<T>,
4993 prepared: PreparedCall<R>,
4994) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
4995 let PreparedCall {
4996 handle,
4997 thread,
4998 param_count,
4999 rx,
5000 exit_rx,
5001 ..
5002 } = prepared;
5003
5004 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5005
5006 Ok(checked(
5007 store.0.id(),
5008 rx.map(move |result| {
5009 result
5010 .map(|v| (*v.downcast().unwrap(), exit_rx))
5011 .map_err(anyhow::Error::from)
5012 }),
5013 ))
5014}
5015
5016fn queue_call0<T: 'static>(
5019 store: StoreContextMut<T>,
5020 handle: Func,
5021 guest_thread: QualifiedThreadId,
5022 param_count: usize,
5023) -> Result<()> {
5024 let (_options, flags, _ty, raw_options) = handle.abi_info(store.0);
5025 let is_concurrent = raw_options.async_;
5026 let callback = raw_options.callback;
5027 let instance = handle.instance();
5028 let callee = handle.lifted_core_func(store.0);
5029 let post_return = handle.post_return_core_func(store.0);
5030 let callback = callback.map(|i| {
5031 let instance = instance.id().get(store.0);
5032 SendSyncPtr::new(instance.runtime_callback(i))
5033 });
5034
5035 log::trace!("queueing call {guest_thread:?}");
5036
5037 let instance_flags = if callback.is_none() {
5038 None
5039 } else {
5040 Some(flags)
5041 };
5042
5043 unsafe {
5047 instance.queue_call(
5048 store,
5049 guest_thread,
5050 SendSyncPtr::new(callee),
5051 param_count,
5052 1,
5053 instance_flags,
5054 is_concurrent,
5055 callback,
5056 post_return.map(SendSyncPtr::new),
5057 )
5058 }
5059}