1use crate::component::func::{self, Func, Options};
54use crate::component::{HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError};
55use crate::fiber::{self, StoreFiber, StoreFiberYield};
56use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
57use crate::vm::component::{
58 CallContext, ComponentInstance, InstanceFlags, ResourceTables, TransmitLocalState,
59};
60use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
61use crate::{AsContext, AsContextMut, FuncType, StoreContext, StoreContextMut, ValRaw, ValType};
62use anyhow::{Context as _, Result, anyhow, bail};
63use error_contexts::GlobalErrorContextRefCount;
64use futures::channel::oneshot;
65use futures::future::{self, Either, FutureExt};
66use futures::stream::{FuturesUnordered, StreamExt};
67use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
68use std::any::Any;
69use std::borrow::ToOwned;
70use std::boxed::Box;
71use std::cell::UnsafeCell;
72use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
73use std::fmt;
74use std::future::Future;
75use std::marker::PhantomData;
76use std::mem::{self, ManuallyDrop, MaybeUninit};
77use std::ops::DerefMut;
78use std::pin::{Pin, pin};
79use std::ptr::{self, NonNull};
80use std::slice;
81use std::sync::Arc;
82use std::task::{Context, Poll, Waker};
83use std::vec::Vec;
84use table::{TableDebug, TableId};
85use wasmtime_environ::Trap;
86use wasmtime_environ::component::{
87 CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS,
88 OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
89 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
90 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
91 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
92};
93
94pub use abort::JoinHandle;
95pub use futures_and_streams::{
96 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
97 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
98 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
99};
100pub(crate) use futures_and_streams::{
101 ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index,
102};
103
104mod abort;
105mod error_contexts;
106mod futures_and_streams;
107pub(crate) mod table;
108pub(crate) mod tls;
109
110const BLOCKED: u32 = 0xffff_ffff;
113
114#[derive(Clone, Copy, Eq, PartialEq, Debug)]
116pub enum Status {
117 Starting = 0,
118 Started = 1,
119 Returned = 2,
120 StartCancelled = 3,
121 ReturnCancelled = 4,
122}
123
124impl Status {
125 pub fn pack(self, waitable: Option<u32>) -> u32 {
131 assert!(matches!(self, Status::Returned) == waitable.is_none());
132 let waitable = waitable.unwrap_or(0);
133 assert!(waitable < (1 << 28));
134 (waitable << 4) | (self as u32)
135 }
136}
137
138#[derive(Clone, Copy, Debug)]
141enum Event {
142 None,
143 Cancelled,
144 Subtask {
145 status: Status,
146 },
147 StreamRead {
148 code: ReturnCode,
149 pending: Option<(TypeStreamTableIndex, u32)>,
150 },
151 StreamWrite {
152 code: ReturnCode,
153 pending: Option<(TypeStreamTableIndex, u32)>,
154 },
155 FutureRead {
156 code: ReturnCode,
157 pending: Option<(TypeFutureTableIndex, u32)>,
158 },
159 FutureWrite {
160 code: ReturnCode,
161 pending: Option<(TypeFutureTableIndex, u32)>,
162 },
163}
164
165impl Event {
166 fn parts(self) -> (u32, u32) {
171 const EVENT_NONE: u32 = 0;
172 const EVENT_SUBTASK: u32 = 1;
173 const EVENT_STREAM_READ: u32 = 2;
174 const EVENT_STREAM_WRITE: u32 = 3;
175 const EVENT_FUTURE_READ: u32 = 4;
176 const EVENT_FUTURE_WRITE: u32 = 5;
177 const EVENT_CANCELLED: u32 = 6;
178 match self {
179 Event::None => (EVENT_NONE, 0),
180 Event::Cancelled => (EVENT_CANCELLED, 0),
181 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
182 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
183 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
184 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
185 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
186 }
187 }
188}
189
190mod callback_code {
192 pub const EXIT: u32 = 0;
193 pub const YIELD: u32 = 1;
194 pub const WAIT: u32 = 2;
195 pub const POLL: u32 = 3;
196}
197
198const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
202
203pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
209 store: StoreContextMut<'a, T>,
210 get_data: fn(&mut T) -> D::Data<'_>,
211}
212
213impl<'a, T, D> Access<'a, T, D>
214where
215 D: HasData + ?Sized,
216 T: 'static,
217{
218 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
220 Self { store, get_data }
221 }
222
223 pub fn data_mut(&mut self) -> &mut T {
225 self.store.data_mut()
226 }
227
228 pub fn get(&mut self) -> D::Data<'_> {
230 (self.get_data)(self.data_mut())
231 }
232
233 pub fn spawn(&mut self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
237 where
238 T: 'static,
239 {
240 let accessor = Accessor {
241 get_data: self.get_data,
242 token: StoreToken::new(self.store.as_context_mut()),
243 };
244 self.store
245 .as_context_mut()
246 .spawn_with_accessor(accessor, task)
247 }
248}
249
250impl<'a, T, D> AsContext for Access<'a, T, D>
251where
252 D: HasData + ?Sized,
253 T: 'static,
254{
255 type Data = T;
256
257 fn as_context(&self) -> StoreContext<'_, T> {
258 self.store.as_context()
259 }
260}
261
262impl<'a, T, D> AsContextMut for Access<'a, T, D>
263where
264 D: HasData + ?Sized,
265 T: 'static,
266{
267 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
268 self.store.as_context_mut()
269 }
270}
271
272pub struct Accessor<T: 'static, D = HasSelf<T>>
332where
333 D: HasData + ?Sized,
334{
335 token: StoreToken<T>,
336 get_data: fn(&mut T) -> D::Data<'_>,
337}
338
339pub trait AsAccessor {
356 type Data: 'static;
358
359 type AccessorData: HasData + ?Sized;
362
363 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
365}
366
367impl<T: AsAccessor + ?Sized> AsAccessor for &T {
368 type Data = T::Data;
369 type AccessorData = T::AccessorData;
370
371 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
372 T::as_accessor(self)
373 }
374}
375
376impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
377 type Data = T;
378 type AccessorData = D;
379
380 fn as_accessor(&self) -> &Accessor<T, D> {
381 self
382 }
383}
384
385const _: () = {
408 const fn assert<T: Send + Sync>() {}
409 assert::<Accessor<UnsafeCell<u32>>>();
410};
411
412impl<T> Accessor<T> {
413 pub(crate) fn new(token: StoreToken<T>) -> Self {
422 Self {
423 token,
424 get_data: |x| x,
425 }
426 }
427}
428
429impl<T, D> Accessor<T, D>
430where
431 D: HasData + ?Sized,
432{
433 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
451 tls::get(|vmstore| {
452 fun(Access {
453 store: self.token.as_context_mut(vmstore),
454 get_data: self.get_data,
455 })
456 })
457 }
458
459 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
462 self.get_data
463 }
464
465 pub fn with_getter<D2: HasData>(
482 &self,
483 get_data: fn(&mut T) -> D2::Data<'_>,
484 ) -> Accessor<T, D2> {
485 Accessor {
486 token: self.token,
487 get_data,
488 }
489 }
490
491 pub fn spawn(&self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
507 where
508 T: 'static,
509 {
510 let accessor = self.clone_for_spawn();
511 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
512 }
513
514 fn clone_for_spawn(&self) -> Self {
515 Self {
516 token: self.token,
517 get_data: self.get_data,
518 }
519 }
520}
521
522pub trait AccessorTask<T, D, R>: Send + 'static
534where
535 D: HasData + ?Sized,
536{
537 fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = R> + Send;
539}
540
541enum CallerInfo {
544 Async {
546 params: Vec<ValRaw>,
547 has_result: bool,
548 },
549 Sync {
551 params: Vec<ValRaw>,
552 result_count: u32,
553 },
554}
555
556enum WaitMode {
558 Fiber(StoreFiber<'static>),
560 Callback(Instance),
563}
564
565#[derive(Debug)]
567enum SuspendReason {
568 Waiting {
571 set: TableId<WaitableSet>,
572 thread: QualifiedThreadId,
573 },
574 NeedWork,
577 Yielding { thread: QualifiedThreadId },
580 ExplicitlySuspending { thread: QualifiedThreadId },
582}
583
584enum GuestCallKind {
586 DeliverEvent {
589 instance: Instance,
591 set: Option<TableId<WaitableSet>>,
596 },
597 StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
600 StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
601}
602
603impl fmt::Debug for GuestCallKind {
604 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
605 match self {
606 Self::DeliverEvent { instance, set } => f
607 .debug_struct("DeliverEvent")
608 .field("instance", instance)
609 .field("set", set)
610 .finish(),
611 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
612 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
613 }
614 }
615}
616
617#[derive(Debug)]
619struct GuestCall {
620 thread: QualifiedThreadId,
621 kind: GuestCallKind,
622}
623
624impl GuestCall {
625 fn is_ready(&self, state: &mut ConcurrentState) -> Result<bool> {
635 let task_instance = state.get_mut(self.thread.task)?.instance;
636 let state = state.instance_state(task_instance);
637
638 let ready = match &self.kind {
639 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
640 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
641 GuestCallKind::StartExplicit(_) => true,
642 };
643 log::trace!(
644 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
645 state.do_not_enter,
646 state.backpressure
647 );
648 Ok(ready)
649 }
650}
651
652enum WorkerItem {
654 GuestCall(GuestCall),
655 Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
656}
657
658#[derive(Debug)]
661struct PollParams {
662 instance: Instance,
664 thread: QualifiedThreadId,
666 set: TableId<WaitableSet>,
668}
669
670enum WorkItem {
673 PushFuture(AlwaysMut<HostTaskFuture>),
675 ResumeFiber(StoreFiber<'static>),
677 GuestCall(GuestCall),
679 Poll(PollParams),
681 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
683}
684
685impl fmt::Debug for WorkItem {
686 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
687 match self {
688 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
689 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
690 Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
691 Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
692 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
693 }
694 }
695}
696
697#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
699pub(crate) enum WaitResult {
700 Cancelled,
701 Completed,
702}
703
704pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
712 store: &mut dyn VMStore,
713 future: impl Future<Output = Result<R>> + Send + 'static,
714 caller_instance: RuntimeComponentInstanceIndex,
715) -> Result<R> {
716 let state = store.concurrent_state_mut();
717
718 let Some(caller) = state.guest_thread else {
722 return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
723 Poll::Ready(result) => result,
724 Poll::Pending => {
725 unreachable!()
726 }
727 };
728 };
729
730 let old_result = state
733 .get_mut(caller.task)
734 .with_context(|| format!("bad handle: {caller:?}"))?
735 .result
736 .take();
737
738 let task = state.push(HostTask::new(caller_instance, None))?;
742
743 log::trace!("new host task child of {caller:?}: {task:?}");
744
745 let mut future = Box::pin(async move {
749 let result = future.await?;
750 tls::get(move |store| {
751 let state = store.concurrent_state_mut();
752 state.get_mut(caller.task)?.result = Some(Box::new(result) as _);
753
754 Waitable::Host(task).set_event(
755 state,
756 Some(Event::Subtask {
757 status: Status::Returned,
758 }),
759 )?;
760
761 Ok(())
762 })
763 }) as HostTaskFuture;
764
765 let poll = tls::set(store, || {
769 future
770 .as_mut()
771 .poll(&mut Context::from_waker(&Waker::noop()))
772 });
773
774 match poll {
775 Poll::Ready(result) => {
776 result?;
778 log::trace!("delete host task {task:?} (already ready)");
779 store.concurrent_state_mut().delete(task)?;
780 }
781 Poll::Pending => {
782 let state = store.concurrent_state_mut();
787 state.push_future(future);
788
789 let set = state.get_mut(caller.task)?.sync_call_set;
790 Waitable::Host(task).join(state, Some(set))?;
791
792 store.suspend(SuspendReason::Waiting {
793 set,
794 thread: caller,
795 })?;
796 }
797 }
798
799 Ok(*mem::replace(
801 &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
802 old_result,
803 )
804 .unwrap()
805 .downcast()
806 .unwrap())
807}
808
809fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
811 match call.kind {
812 GuestCallKind::DeliverEvent { instance, set } => {
813 let (event, waitable) = instance
814 .get_event(store, call.thread.task, set, true)?
815 .unwrap();
816 let state = store.concurrent_state_mut();
817 let task = state.get_mut(call.thread.task)?;
818 let runtime_instance = task.instance;
819 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
820
821 log::trace!(
822 "use callback to deliver event {event:?} to {:?} for {waitable:?}",
823 call.thread,
824 );
825
826 let old_thread = state.guest_thread.replace(call.thread);
827 log::trace!(
828 "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
829 call.thread
830 );
831
832 store.maybe_push_call_context(call.thread.task)?;
833
834 let state = store.concurrent_state_mut();
835 state.enter_instance(runtime_instance);
836
837 let callback = state.get_mut(call.thread.task)?.callback.take().unwrap();
838
839 let code = callback(store, runtime_instance, event, handle)?;
840
841 let state = store.concurrent_state_mut();
842
843 state.get_mut(call.thread.task)?.callback = Some(callback);
844 state.exit_instance(runtime_instance)?;
845
846 store.maybe_pop_call_context(call.thread.task)?;
847
848 instance.handle_callback_code(store, call.thread, runtime_instance, code, false)?;
849
850 store.concurrent_state_mut().guest_thread = old_thread;
851 log::trace!("GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread");
852 }
853 GuestCallKind::StartImplicit(fun) => {
854 fun(store)?;
855 }
856 GuestCallKind::StartExplicit(fun) => {
857 fun(store)?;
858 }
859 }
860
861 Ok(())
862}
863
864impl<T> Store<T> {
865 pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
867 where
868 T: Send + 'static,
869 {
870 self.as_context_mut().run_concurrent(fun).await
871 }
872
873 #[doc(hidden)]
874 pub fn assert_concurrent_state_empty(&mut self) {
875 self.as_context_mut().assert_concurrent_state_empty();
876 }
877
878 pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
880 where
881 T: 'static,
882 {
883 self.as_context_mut().spawn(task)
884 }
885}
886
887impl<T> StoreContextMut<'_, T> {
888 #[doc(hidden)]
897 pub fn assert_concurrent_state_empty(self) {
898 let store = self.0;
899 store
900 .store_data_mut()
901 .components
902 .assert_guest_tables_empty();
903 let state = store.concurrent_state_mut();
904 assert!(
905 state.table.get_mut().is_empty(),
906 "non-empty table: {:?}",
907 state.table.get_mut()
908 );
909 assert!(state.high_priority.is_empty());
910 assert!(state.low_priority.is_empty());
911 assert!(state.guest_thread.is_none());
912 assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
913 assert!(
914 state
915 .instance_states
916 .iter()
917 .all(|(_, state)| state.pending.is_empty())
918 );
919 assert!(state.global_error_context_ref_counts.is_empty());
920 }
921
922 pub fn spawn(mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
932 where
933 T: 'static,
934 {
935 let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
936 self.spawn_with_accessor(accessor, task)
937 }
938
939 fn spawn_with_accessor<D>(
942 self,
943 accessor: Accessor<T, D>,
944 task: impl AccessorTask<T, D, Result<()>>,
945 ) -> JoinHandle
946 where
947 T: 'static,
948 D: HasData + ?Sized,
949 {
950 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
954 self.0
955 .concurrent_state_mut()
956 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
957 handle
958 }
959
960 pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1041 where
1042 T: Send + 'static,
1043 {
1044 self.do_run_concurrent(fun, false).await
1045 }
1046
1047 pub(super) async fn run_concurrent_trap_on_idle<R>(
1048 self,
1049 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1050 ) -> Result<R>
1051 where
1052 T: Send + 'static,
1053 {
1054 self.do_run_concurrent(fun, true).await
1055 }
1056
1057 async fn do_run_concurrent<R>(
1058 mut self,
1059 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1060 trap_on_idle: bool,
1061 ) -> Result<R>
1062 where
1063 T: Send + 'static,
1064 {
1065 check_recursive_run();
1066 let token = StoreToken::new(self.as_context_mut());
1067
1068 struct Dropper<'a, T: 'static, V> {
1069 store: StoreContextMut<'a, T>,
1070 value: ManuallyDrop<V>,
1071 }
1072
1073 impl<'a, T, V> Drop for Dropper<'a, T, V> {
1074 fn drop(&mut self) {
1075 tls::set(self.store.0, || {
1076 unsafe { ManuallyDrop::drop(&mut self.value) }
1081 });
1082 }
1083 }
1084
1085 let accessor = &Accessor::new(token);
1086 let dropper = &mut Dropper {
1087 store: self,
1088 value: ManuallyDrop::new(fun(accessor)),
1089 };
1090 let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1092
1093 dropper
1094 .store
1095 .as_context_mut()
1096 .poll_until(future, trap_on_idle)
1097 .await
1098 }
1099
1100 async fn poll_until<R>(
1106 mut self,
1107 mut future: Pin<&mut impl Future<Output = R>>,
1108 trap_on_idle: bool,
1109 ) -> Result<R>
1110 where
1111 T: Send + 'static,
1112 {
1113 struct Reset<'a, T: 'static> {
1114 store: StoreContextMut<'a, T>,
1115 futures: Option<FuturesUnordered<HostTaskFuture>>,
1116 }
1117
1118 impl<'a, T> Drop for Reset<'a, T> {
1119 fn drop(&mut self) {
1120 if let Some(futures) = self.futures.take() {
1121 *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1122 }
1123 }
1124 }
1125
1126 loop {
1127 let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1131 let mut reset = Reset {
1132 store: self.as_context_mut(),
1133 futures,
1134 };
1135 let mut next = pin!(reset.futures.as_mut().unwrap().next());
1136
1137 let result = future::poll_fn(|cx| {
1138 if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1141 return Poll::Ready(Ok(Either::Left(value)));
1142 }
1143
1144 let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1148 Poll::Ready(Some(output)) => {
1149 match output {
1150 Err(e) => return Poll::Ready(Err(e)),
1151 Ok(()) => {}
1152 }
1153 Poll::Ready(true)
1154 }
1155 Poll::Ready(None) => Poll::Ready(false),
1156 Poll::Pending => Poll::Pending,
1157 };
1158
1159 let state = reset.store.0.concurrent_state_mut();
1162 let ready = mem::take(&mut state.high_priority);
1163 let ready = if ready.is_empty() {
1164 let ready = mem::take(&mut state.low_priority);
1167 if ready.is_empty() {
1168 return match next {
1169 Poll::Ready(true) => {
1170 Poll::Ready(Ok(Either::Right(Vec::new())))
1176 }
1177 Poll::Ready(false) => {
1178 if let Poll::Ready(value) =
1182 tls::set(reset.store.0, || future.as_mut().poll(cx))
1183 {
1184 Poll::Ready(Ok(Either::Left(value)))
1185 } else {
1186 if trap_on_idle {
1192 Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
1195 } else {
1196 Poll::Pending
1200 }
1201 }
1202 }
1203 Poll::Pending => Poll::Pending,
1208 };
1209 } else {
1210 ready
1211 }
1212 } else {
1213 ready
1214 };
1215
1216 Poll::Ready(Ok(Either::Right(ready)))
1217 })
1218 .await;
1219
1220 drop(reset);
1224
1225 match result? {
1226 Either::Left(value) => break Ok(value),
1229 Either::Right(ready) => {
1232 struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1233 store: StoreContextMut<'a, T>,
1234 ready: I,
1235 }
1236
1237 impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1238 fn drop(&mut self) {
1239 while let Some(item) = self.ready.next() {
1240 match item {
1241 WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1242 WorkItem::PushFuture(future) => {
1243 tls::set(self.store.0, move || drop(future))
1244 }
1245 _ => {}
1246 }
1247 }
1248 }
1249 }
1250
1251 let mut dispose = Dispose {
1252 store: self.as_context_mut(),
1253 ready: ready.into_iter(),
1254 };
1255
1256 while let Some(item) = dispose.ready.next() {
1257 dispose
1258 .store
1259 .as_context_mut()
1260 .handle_work_item(item)
1261 .await?;
1262 }
1263 }
1264 }
1265 }
1266 }
1267
1268 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1270 where
1271 T: Send,
1272 {
1273 log::trace!("handle work item {item:?}");
1274 match item {
1275 WorkItem::PushFuture(future) => {
1276 self.0
1277 .concurrent_state_mut()
1278 .futures
1279 .get_mut()
1280 .as_mut()
1281 .unwrap()
1282 .push(future.into_inner());
1283 }
1284 WorkItem::ResumeFiber(fiber) => {
1285 self.0.resume_fiber(fiber).await?;
1286 }
1287 WorkItem::GuestCall(call) => {
1288 let state = self.0.concurrent_state_mut();
1289 if call.is_ready(state)? {
1290 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1291 } else {
1292 let task = state.get_mut(call.thread.task)?;
1293 if !task.starting_sent {
1294 task.starting_sent = true;
1295 if let GuestCallKind::StartImplicit(_) = &call.kind {
1296 Waitable::Guest(call.thread.task).set_event(
1297 state,
1298 Some(Event::Subtask {
1299 status: Status::Starting,
1300 }),
1301 )?;
1302 }
1303 }
1304
1305 let runtime_instance = state.get_mut(call.thread.task)?.instance;
1306 state
1307 .instance_state(runtime_instance)
1308 .pending
1309 .insert(call.thread, call.kind);
1310 }
1311 }
1312 WorkItem::Poll(params) => {
1313 let state = self.0.concurrent_state_mut();
1314 if state.get_mut(params.thread.task)?.event.is_some()
1315 || !state.get_mut(params.set)?.ready.is_empty()
1316 {
1317 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1320 thread: params.thread,
1321 kind: GuestCallKind::DeliverEvent {
1322 instance: params.instance,
1323 set: Some(params.set),
1324 },
1325 }));
1326 } else {
1327 state.get_mut(params.thread.task)?.event = Some(Event::None);
1330 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1331 thread: params.thread,
1332 kind: GuestCallKind::DeliverEvent {
1333 instance: params.instance,
1334 set: Some(params.set),
1335 },
1336 }));
1337 }
1338 }
1339 WorkItem::WorkerFunction(fun) => {
1340 self.run_on_worker(WorkerItem::Function(fun)).await?;
1341 }
1342 }
1343
1344 Ok(())
1345 }
1346
1347 async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1349 where
1350 T: Send,
1351 {
1352 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1353 fiber
1354 } else {
1355 fiber::make_fiber(self.0, move |store| {
1356 loop {
1357 match store.concurrent_state_mut().worker_item.take().unwrap() {
1358 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1359 WorkerItem::Function(fun) => fun.into_inner()(store)?,
1360 }
1361
1362 store.suspend(SuspendReason::NeedWork)?;
1363 }
1364 })?
1365 };
1366
1367 let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1368 assert!(worker_item.is_none());
1369 *worker_item = Some(item);
1370
1371 self.0.resume_fiber(worker).await
1372 }
1373
1374 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1379 where
1380 T: 'static,
1381 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1382 + Send
1383 + Sync
1384 + 'static,
1385 R: Send + Sync + 'static,
1386 {
1387 let token = StoreToken::new(self);
1388 async move {
1389 let mut accessor = Accessor::new(token);
1390 closure(&mut accessor).await
1391 }
1392 }
1393}
1394
1395impl StoreOpaque {
1396 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1399 let old_thread = self.concurrent_state_mut().guest_thread;
1400 log::trace!("resume_fiber: save current thread {old_thread:?}");
1401
1402 let fiber = fiber::resolve_or_release(self, fiber).await?;
1403
1404 let state = self.concurrent_state_mut();
1405
1406 state.guest_thread = old_thread;
1407 if let Some(ref ot) = old_thread {
1408 state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1409 }
1410 log::trace!("resume_fiber: restore current thread {old_thread:?}");
1411
1412 if let Some(mut fiber) = fiber {
1413 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1414 match state.suspend_reason.take().unwrap() {
1416 SuspendReason::NeedWork => {
1417 if state.worker.is_none() {
1418 state.worker = Some(fiber);
1419 } else {
1420 fiber.dispose(self);
1421 }
1422 }
1423 SuspendReason::Yielding { thread, .. } => {
1424 state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
1425 state.push_low_priority(WorkItem::ResumeFiber(fiber));
1426 }
1427 SuspendReason::ExplicitlySuspending { thread, .. } => {
1428 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1429 }
1430 SuspendReason::Waiting { set, thread } => {
1431 let old = state
1432 .get_mut(set)?
1433 .waiting
1434 .insert(thread, WaitMode::Fiber(fiber));
1435 assert!(old.is_none());
1436 }
1437 };
1438 } else {
1439 log::trace!("resume_fiber: fiber has exited");
1440 }
1441
1442 Ok(())
1443 }
1444
1445 fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1451 log::trace!("suspend fiber: {reason:?}");
1452
1453 let task = match &reason {
1457 SuspendReason::Yielding { thread, .. }
1458 | SuspendReason::Waiting { thread, .. }
1459 | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1460 SuspendReason::NeedWork => None,
1461 };
1462
1463 let old_guest_thread = if let Some(task) = task {
1464 self.maybe_pop_call_context(task)?;
1465 self.concurrent_state_mut().guest_thread
1466 } else {
1467 None
1468 };
1469
1470 let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1471 assert!(suspend_reason.is_none());
1472 *suspend_reason = Some(reason);
1473
1474 self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1475
1476 if let Some(task) = task {
1477 self.concurrent_state_mut().guest_thread = old_guest_thread;
1478 self.maybe_push_call_context(task)?;
1479 }
1480
1481 Ok(())
1482 }
1483
1484 fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1488 let task = self.concurrent_state_mut().get_mut(guest_task)?;
1489
1490 if !task.returned_or_cancelled() {
1491 log::trace!("push call context for {guest_task:?}");
1492 let call_context = task.call_context.take().unwrap();
1493 self.component_resource_state().0.push(call_context);
1494 }
1495 Ok(())
1496 }
1497
1498 fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1502 if !self
1503 .concurrent_state_mut()
1504 .get_mut(guest_task)?
1505 .returned_or_cancelled()
1506 {
1507 log::trace!("pop call context for {guest_task:?}");
1508 let call_context = Some(self.component_resource_state().0.pop().unwrap());
1509 self.concurrent_state_mut()
1510 .get_mut(guest_task)?
1511 .call_context = call_context;
1512 }
1513 Ok(())
1514 }
1515
1516 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1517 let state = self.concurrent_state_mut();
1518 let caller = state.guest_thread.unwrap();
1519 let old_set = waitable.common(state)?.set;
1520 let set = state.get_mut(caller.task)?.sync_call_set;
1521 waitable.join(state, Some(set))?;
1522 self.suspend(SuspendReason::Waiting {
1523 set,
1524 thread: caller,
1525 })?;
1526 let state = self.concurrent_state_mut();
1527 waitable.join(state, old_set)
1528 }
1529}
1530
1531impl Instance {
1532 fn get_event(
1535 self,
1536 store: &mut StoreOpaque,
1537 guest_task: TableId<GuestTask>,
1538 set: Option<TableId<WaitableSet>>,
1539 cancellable: bool,
1540 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1541 let state = store.concurrent_state_mut();
1542
1543 if let Some(event) = state.get_mut(guest_task)?.event.take() {
1544 log::trace!("deliver event {event:?} to {guest_task:?}");
1545
1546 if cancellable || !matches!(event, Event::Cancelled) {
1547 return Ok(Some((event, None)));
1548 } else {
1549 state.get_mut(guest_task)?.event = Some(event);
1550 }
1551 }
1552
1553 Ok(
1554 if let Some((set, waitable)) = set
1555 .and_then(|set| {
1556 state
1557 .get_mut(set)
1558 .map(|v| v.ready.pop_first().map(|v| (set, v)))
1559 .transpose()
1560 })
1561 .transpose()?
1562 {
1563 let common = waitable.common(state)?;
1564 let handle = common.handle.unwrap();
1565 let event = common.event.take().unwrap();
1566
1567 log::trace!(
1568 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1569 );
1570
1571 waitable.on_delivery(self.id().get_mut(store), event);
1572
1573 Some((event, Some((waitable, handle))))
1574 } else {
1575 None
1576 },
1577 )
1578 }
1579
1580 fn handle_callback_code(
1586 self,
1587 store: &mut StoreOpaque,
1588 guest_thread: QualifiedThreadId,
1589 runtime_instance: RuntimeComponentInstanceIndex,
1590 code: u32,
1591 initial_call: bool,
1592 ) -> Result<()> {
1593 let (code, set) = unpack_callback_code(code);
1594
1595 log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1596
1597 let state = store.concurrent_state_mut();
1598 if !state.get_mut(guest_thread.task)?.returned_or_cancelled() {
1599 if initial_call {
1600 Waitable::Guest(guest_thread.task).set_event(
1603 state,
1604 Some(Event::Subtask {
1605 status: Status::Started,
1606 }),
1607 )?;
1608 }
1609 }
1610
1611 let get_set = |store, handle| {
1612 if handle == 0 {
1613 bail!("invalid waitable-set handle");
1614 }
1615
1616 let set = self.id().get_mut(store).guest_tables().0[runtime_instance]
1617 .waitable_set_rep(handle)?;
1618
1619 Ok(TableId::<WaitableSet>::new(set))
1620 };
1621
1622 match code {
1623 callback_code::EXIT => {
1624 log::trace!("implicit thread {guest_thread:?} completed");
1625 self.cleanup_thread(store, guest_thread, runtime_instance)?;
1626 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1627 if task.threads.is_empty() && !task.returned_or_cancelled() {
1628 bail!(Trap::NoAsyncResult);
1629 }
1630 match &task.caller {
1631 Caller::Host { .. } => {
1632 if task.ready_to_delete() {
1633 Waitable::Guest(guest_thread.task)
1634 .delete_from(store.concurrent_state_mut())?;
1635 }
1636 }
1637 Caller::Guest { .. } => {
1638 task.exited = true;
1639 task.callback = None;
1640 }
1641 }
1642 }
1643 callback_code::YIELD => {
1644 let task = state.get_mut(guest_thread.task)?;
1647 assert!(task.event.is_none());
1648 task.event = Some(Event::None);
1649 state.push_low_priority(WorkItem::GuestCall(GuestCall {
1650 thread: guest_thread,
1651 kind: GuestCallKind::DeliverEvent {
1652 instance: self,
1653 set: None,
1654 },
1655 }));
1656 }
1657 callback_code::WAIT | callback_code::POLL => {
1658 let set = get_set(store, set)?;
1659 let state = store.concurrent_state_mut();
1660
1661 if state.get_mut(guest_thread.task)?.event.is_some()
1662 || !state.get_mut(set)?.ready.is_empty()
1663 {
1664 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1666 thread: guest_thread,
1667 kind: GuestCallKind::DeliverEvent {
1668 instance: self,
1669 set: Some(set),
1670 },
1671 }));
1672 } else {
1673 match code {
1675 callback_code::POLL => {
1676 state.push_low_priority(WorkItem::Poll(PollParams {
1679 instance: self,
1680 thread: guest_thread,
1681 set,
1682 }));
1683 }
1684 callback_code::WAIT => {
1685 let old = state
1692 .get_mut(guest_thread.thread)?
1693 .wake_on_cancel
1694 .replace(set);
1695 assert!(old.is_none());
1696 let old = state
1697 .get_mut(set)?
1698 .waiting
1699 .insert(guest_thread, WaitMode::Callback(self));
1700 assert!(old.is_none());
1701 }
1702 _ => unreachable!(),
1703 }
1704 }
1705 }
1706 _ => bail!("unsupported callback code: {code}"),
1707 }
1708
1709 Ok(())
1710 }
1711
1712 fn cleanup_thread(
1713 self,
1714 store: &mut StoreOpaque,
1715 guest_thread: QualifiedThreadId,
1716 runtime_instance: RuntimeComponentInstanceIndex,
1717 ) -> Result<()> {
1718 let guest_id = store
1719 .concurrent_state_mut()
1720 .get_mut(guest_thread.thread)?
1721 .instance_rep;
1722 self.id().get_mut(store).guest_tables().0[runtime_instance]
1723 .guest_thread_remove(guest_id.unwrap())?;
1724
1725 store.concurrent_state_mut().delete(guest_thread.thread)?;
1726 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1727 task.threads.remove(&guest_thread.thread);
1728 Ok(())
1729 }
1730
1731 unsafe fn queue_call<T: 'static>(
1738 self,
1739 mut store: StoreContextMut<T>,
1740 guest_thread: QualifiedThreadId,
1741 callee: SendSyncPtr<VMFuncRef>,
1742 param_count: usize,
1743 result_count: usize,
1744 flags: Option<InstanceFlags>,
1745 async_: bool,
1746 callback: Option<SendSyncPtr<VMFuncRef>>,
1747 post_return: Option<SendSyncPtr<VMFuncRef>>,
1748 ) -> Result<()> {
1749 unsafe fn make_call<T: 'static>(
1764 store: StoreContextMut<T>,
1765 guest_thread: QualifiedThreadId,
1766 callee: SendSyncPtr<VMFuncRef>,
1767 param_count: usize,
1768 result_count: usize,
1769 flags: Option<InstanceFlags>,
1770 ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
1771 + Send
1772 + Sync
1773 + 'static
1774 + use<T> {
1775 let token = StoreToken::new(store);
1776 move |store: &mut dyn VMStore| {
1777 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1778
1779 store
1780 .concurrent_state_mut()
1781 .get_mut(guest_thread.thread)?
1782 .state = GuestThreadState::Running;
1783 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1784 let may_enter_after_call = task.call_post_return_automatically();
1785 let lower = task.lower_params.take().unwrap();
1786
1787 lower(store, &mut storage[..param_count])?;
1788
1789 let mut store = token.as_context_mut(store);
1790
1791 unsafe {
1794 if let Some(mut flags) = flags {
1795 flags.set_may_enter(false);
1796 }
1797 crate::Func::call_unchecked_raw(
1798 &mut store,
1799 callee.as_non_null(),
1800 NonNull::new(
1801 &mut storage[..param_count.max(result_count)]
1802 as *mut [MaybeUninit<ValRaw>] as _,
1803 )
1804 .unwrap(),
1805 )?;
1806 if let Some(mut flags) = flags {
1807 flags.set_may_enter(may_enter_after_call);
1808 }
1809 }
1810
1811 Ok(storage)
1812 }
1813 }
1814
1815 let call = unsafe {
1819 make_call(
1820 store.as_context_mut(),
1821 guest_thread,
1822 callee,
1823 param_count,
1824 result_count,
1825 flags,
1826 )
1827 };
1828
1829 let callee_instance = store
1830 .0
1831 .concurrent_state_mut()
1832 .get_mut(guest_thread.task)?
1833 .instance;
1834 let fun = if callback.is_some() {
1835 assert!(async_);
1836
1837 Box::new(move |store: &mut dyn VMStore| {
1838 self.add_guest_thread_to_instance_table(
1839 guest_thread.thread,
1840 store,
1841 callee_instance,
1842 )?;
1843 let old_thread = store
1844 .concurrent_state_mut()
1845 .guest_thread
1846 .replace(guest_thread);
1847 log::trace!(
1848 "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
1849 );
1850
1851 store.maybe_push_call_context(guest_thread.task)?;
1852
1853 store.concurrent_state_mut().enter_instance(callee_instance);
1854
1855 let storage = call(store)?;
1862
1863 store
1864 .concurrent_state_mut()
1865 .exit_instance(callee_instance)?;
1866
1867 store.maybe_pop_call_context(guest_thread.task)?;
1868
1869 let state = store.concurrent_state_mut();
1870 state.guest_thread = old_thread;
1871 old_thread
1872 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
1873 log::trace!("stackless call: restored {old_thread:?} as current thread");
1874
1875 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
1878
1879 self.handle_callback_code(store, guest_thread, callee_instance, code, true)?;
1880
1881 Ok(())
1882 }) as Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>
1883 } else {
1884 let token = StoreToken::new(store.as_context_mut());
1885 Box::new(move |store: &mut dyn VMStore| {
1886 self.add_guest_thread_to_instance_table(
1887 guest_thread.thread,
1888 store,
1889 callee_instance,
1890 )?;
1891 let old_thread = store
1892 .concurrent_state_mut()
1893 .guest_thread
1894 .replace(guest_thread);
1895 log::trace!(
1896 "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
1897 );
1898 let mut flags = self.id().get(store).instance_flags(callee_instance);
1899
1900 store.maybe_push_call_context(guest_thread.task)?;
1901
1902 if !async_ {
1906 store.concurrent_state_mut().enter_instance(callee_instance);
1907 }
1908
1909 let storage = call(store)?;
1916
1917 self.cleanup_thread(store, guest_thread, callee_instance)?;
1919
1920 if async_ {
1921 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1922 if task.threads.is_empty() && !task.returned_or_cancelled() {
1923 bail!(Trap::NoAsyncResult);
1924 }
1925 } else {
1926 let lift = {
1932 let state = store.concurrent_state_mut();
1933 state.exit_instance(callee_instance)?;
1934
1935 assert!(state.get_mut(guest_thread.task)?.result.is_none());
1936
1937 state
1938 .get_mut(guest_thread.task)?
1939 .lift_result
1940 .take()
1941 .unwrap()
1942 };
1943
1944 let result = (lift.lift)(store, unsafe {
1947 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
1948 &storage[..result_count],
1949 )
1950 })?;
1951
1952 let post_return_arg = match result_count {
1953 0 => ValRaw::i32(0),
1954 1 => unsafe { storage[0].assume_init() },
1957 _ => unreachable!(),
1958 };
1959
1960 if store
1961 .concurrent_state_mut()
1962 .get_mut(guest_thread.task)?
1963 .call_post_return_automatically()
1964 {
1965 unsafe {
1966 flags.set_may_leave(false);
1967 flags.set_needs_post_return(false);
1968 }
1969
1970 if let Some(func) = post_return {
1971 let mut store = token.as_context_mut(store);
1972
1973 unsafe {
1979 crate::Func::call_unchecked_raw(
1980 &mut store,
1981 func.as_non_null(),
1982 slice::from_ref(&post_return_arg).into(),
1983 )?;
1984 }
1985 }
1986
1987 unsafe {
1988 flags.set_may_leave(true);
1989 flags.set_may_enter(true);
1990 }
1991 }
1992
1993 self.task_complete(
1994 store,
1995 guest_thread.task,
1996 result,
1997 Status::Returned,
1998 post_return_arg,
1999 )?;
2000 }
2001
2002 store.maybe_pop_call_context(guest_thread.task)?;
2003
2004 let state = store.concurrent_state_mut();
2005 let task = state.get_mut(guest_thread.task)?;
2006
2007 match &task.caller {
2008 Caller::Host { .. } => {
2009 if task.ready_to_delete() {
2010 Waitable::Guest(guest_thread.task).delete_from(state)?;
2011 }
2012 }
2013 Caller::Guest { .. } => {
2014 task.exited = true;
2015 }
2016 }
2017
2018 Ok(())
2019 })
2020 };
2021
2022 store
2023 .0
2024 .concurrent_state_mut()
2025 .push_high_priority(WorkItem::GuestCall(GuestCall {
2026 thread: guest_thread,
2027 kind: GuestCallKind::StartImplicit(fun),
2028 }));
2029
2030 Ok(())
2031 }
2032
2033 unsafe fn prepare_call<T: 'static>(
2046 self,
2047 mut store: StoreContextMut<T>,
2048 start: *mut VMFuncRef,
2049 return_: *mut VMFuncRef,
2050 caller_instance: RuntimeComponentInstanceIndex,
2051 callee_instance: RuntimeComponentInstanceIndex,
2052 task_return_type: TypeTupleIndex,
2053 memory: *mut VMMemoryDefinition,
2054 string_encoding: u8,
2055 caller_info: CallerInfo,
2056 ) -> Result<()> {
2057 self.id().get(store.0).check_may_leave(caller_instance)?;
2058
2059 enum ResultInfo {
2060 Heap { results: u32 },
2061 Stack { result_count: u32 },
2062 }
2063
2064 let result_info = match &caller_info {
2065 CallerInfo::Async {
2066 has_result: true,
2067 params,
2068 } => ResultInfo::Heap {
2069 results: params.last().unwrap().get_u32(),
2070 },
2071 CallerInfo::Async {
2072 has_result: false, ..
2073 } => ResultInfo::Stack { result_count: 0 },
2074 CallerInfo::Sync {
2075 result_count,
2076 params,
2077 } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2078 results: params.last().unwrap().get_u32(),
2079 },
2080 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2081 result_count: *result_count,
2082 },
2083 };
2084
2085 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2086
2087 let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2091 let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2092 let token = StoreToken::new(store.as_context_mut());
2093 let state = store.0.concurrent_state_mut();
2094 let old_thread = state.guest_thread.take();
2095 let new_task = GuestTask::new(
2096 state,
2097 Box::new(move |store, dst| {
2098 let mut store = token.as_context_mut(store);
2099 assert!(dst.len() <= MAX_FLAT_PARAMS);
2100 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2101 let count = match caller_info {
2102 CallerInfo::Async { params, has_result } => {
2106 let params = ¶ms[..params.len() - usize::from(has_result)];
2107 for (param, src) in params.iter().zip(&mut src) {
2108 src.write(*param);
2109 }
2110 params.len()
2111 }
2112
2113 CallerInfo::Sync { params, .. } => {
2115 for (param, src) in params.iter().zip(&mut src) {
2116 src.write(*param);
2117 }
2118 params.len()
2119 }
2120 };
2121 unsafe {
2128 crate::Func::call_unchecked_raw(
2129 &mut store,
2130 start.as_non_null(),
2131 NonNull::new(
2132 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2133 )
2134 .unwrap(),
2135 )?;
2136 }
2137 dst.copy_from_slice(&src[..dst.len()]);
2138 let state = store.0.concurrent_state_mut();
2139 Waitable::Guest(state.guest_thread.unwrap().task).set_event(
2140 state,
2141 Some(Event::Subtask {
2142 status: Status::Started,
2143 }),
2144 )?;
2145 Ok(())
2146 }),
2147 LiftResult {
2148 lift: Box::new(move |store, src| {
2149 let mut store = token.as_context_mut(store);
2152 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2154 my_src.push(ValRaw::u32(*results));
2155 }
2156 unsafe {
2163 crate::Func::call_unchecked_raw(
2164 &mut store,
2165 return_.as_non_null(),
2166 my_src.as_mut_slice().into(),
2167 )?;
2168 }
2169 let state = store.0.concurrent_state_mut();
2170 let thread = state.guest_thread.unwrap();
2171 if sync_caller {
2172 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2173 if let ResultInfo::Stack { result_count } = &result_info {
2174 match result_count {
2175 0 => None,
2176 1 => Some(my_src[0]),
2177 _ => unreachable!(),
2178 }
2179 } else {
2180 None
2181 },
2182 );
2183 }
2184 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2185 }),
2186 ty: task_return_type,
2187 memory: NonNull::new(memory).map(SendSyncPtr::new),
2188 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2189 },
2190 Caller::Guest {
2191 thread: old_thread.unwrap(),
2192 instance: caller_instance,
2193 },
2194 None,
2195 callee_instance,
2196 )?;
2197
2198 let guest_task = state.push(new_task)?;
2199 let new_thread = GuestThread::new_implicit(guest_task);
2200 let guest_thread = state.push(new_thread)?;
2201 state.get_mut(guest_task)?.threads.insert(guest_thread);
2202
2203 let state = store.0.concurrent_state_mut();
2204 if let Some(old_thread) = old_thread {
2205 if !state.may_enter(guest_task) {
2206 bail!(crate::Trap::CannotEnterComponent);
2207 }
2208
2209 state.get_mut(old_thread.task)?.subtasks.insert(guest_task);
2210 };
2211
2212 state.guest_thread = Some(QualifiedThreadId {
2215 task: guest_task,
2216 thread: guest_thread,
2217 });
2218 log::trace!(
2219 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2220 );
2221
2222 Ok(())
2223 }
2224
2225 unsafe fn call_callback<T>(
2230 self,
2231 mut store: StoreContextMut<T>,
2232 callee_instance: RuntimeComponentInstanceIndex,
2233 function: SendSyncPtr<VMFuncRef>,
2234 event: Event,
2235 handle: u32,
2236 may_enter_after_call: bool,
2237 ) -> Result<u32> {
2238 let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2239
2240 let (ordinal, result) = event.parts();
2241 let params = &mut [
2242 ValRaw::u32(ordinal),
2243 ValRaw::u32(handle),
2244 ValRaw::u32(result),
2245 ];
2246 unsafe {
2251 flags.set_may_enter(false);
2252 crate::Func::call_unchecked_raw(
2253 &mut store,
2254 function.as_non_null(),
2255 params.as_mut_slice().into(),
2256 )?;
2257 flags.set_may_enter(may_enter_after_call);
2258 }
2259 Ok(params[0].get_u32())
2260 }
2261
2262 unsafe fn start_call<T: 'static>(
2275 self,
2276 mut store: StoreContextMut<T>,
2277 callback: *mut VMFuncRef,
2278 post_return: *mut VMFuncRef,
2279 callee: *mut VMFuncRef,
2280 param_count: u32,
2281 result_count: u32,
2282 flags: u32,
2283 storage: Option<&mut [MaybeUninit<ValRaw>]>,
2284 ) -> Result<u32> {
2285 let token = StoreToken::new(store.as_context_mut());
2286 let async_caller = storage.is_none();
2287 let state = store.0.concurrent_state_mut();
2288 let guest_thread = state.guest_thread.unwrap();
2289 let may_enter_after_call = state
2290 .get_mut(guest_thread.task)?
2291 .call_post_return_automatically();
2292 let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2293 let param_count = usize::try_from(param_count).unwrap();
2294 assert!(param_count <= MAX_FLAT_PARAMS);
2295 let result_count = usize::try_from(result_count).unwrap();
2296 assert!(result_count <= MAX_FLAT_RESULTS);
2297
2298 let task = state.get_mut(guest_thread.task)?;
2299 if !callback.is_null() {
2300 let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2304 task.callback = Some(Box::new(move |store, runtime_instance, event, handle| {
2305 let store = token.as_context_mut(store);
2306 unsafe {
2307 self.call_callback::<T>(
2308 store,
2309 runtime_instance,
2310 callback,
2311 event,
2312 handle,
2313 may_enter_after_call,
2314 )
2315 }
2316 }));
2317 }
2318
2319 let Caller::Guest {
2320 thread: caller,
2321 instance: runtime_instance,
2322 } = &task.caller
2323 else {
2324 unreachable!()
2327 };
2328 let caller = *caller;
2329 let caller_instance = *runtime_instance;
2330
2331 let callee_instance = task.instance;
2332
2333 let instance_flags = if callback.is_null() {
2334 None
2335 } else {
2336 Some(self.id().get(store.0).instance_flags(callee_instance))
2337 };
2338
2339 unsafe {
2341 self.queue_call(
2342 store.as_context_mut(),
2343 guest_thread,
2344 callee,
2345 param_count,
2346 result_count,
2347 instance_flags,
2348 (flags & START_FLAG_ASYNC_CALLEE) != 0,
2349 NonNull::new(callback).map(SendSyncPtr::new),
2350 NonNull::new(post_return).map(SendSyncPtr::new),
2351 )?;
2352 }
2353
2354 let state = store.0.concurrent_state_mut();
2355
2356 let guest_waitable = Waitable::Guest(guest_thread.task);
2359 let old_set = guest_waitable.common(state)?.set;
2360 let set = state.get_mut(caller.task)?.sync_call_set;
2361 guest_waitable.join(state, Some(set))?;
2362
2363 let (status, waitable) = loop {
2379 store.0.suspend(SuspendReason::Waiting {
2380 set,
2381 thread: caller,
2382 })?;
2383
2384 let state = store.0.concurrent_state_mut();
2385
2386 log::trace!("taking event for {:?}", guest_thread.task);
2387 let event = guest_waitable.take_event(state)?;
2388 let Some(Event::Subtask { status }) = event else {
2389 unreachable!();
2390 };
2391
2392 log::trace!("status {status:?} for {:?}", guest_thread.task);
2393
2394 if status == Status::Returned {
2395 break (status, None);
2397 } else if async_caller {
2398 let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2402 .subtask_insert_guest(guest_thread.task.rep())?;
2403 store
2404 .0
2405 .concurrent_state_mut()
2406 .get_mut(guest_thread.task)?
2407 .common
2408 .handle = Some(handle);
2409 break (status, Some(handle));
2410 } else {
2411 }
2415 };
2416
2417 let state = store.0.concurrent_state_mut();
2418
2419 guest_waitable.join(state, old_set)?;
2420
2421 if let Some(storage) = storage {
2422 let task = state.get_mut(guest_thread.task)?;
2426 if let Some(result) = task.sync_result.take() {
2427 if let Some(result) = result {
2428 storage[0] = MaybeUninit::new(result);
2429 }
2430
2431 if task.exited {
2432 if task.ready_to_delete() {
2433 Waitable::Guest(guest_thread.task).delete_from(state)?;
2434 }
2435 }
2436 }
2437 }
2438
2439 state.guest_thread = Some(caller);
2441 state.get_mut(caller.thread)?.state = GuestThreadState::Running;
2442 log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2443
2444 Ok(status.pack(waitable))
2445 }
2446
2447 pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2459 self,
2460 mut store: StoreContextMut<T>,
2461 future: impl Future<Output = Result<R>> + Send + 'static,
2462 caller_instance: RuntimeComponentInstanceIndex,
2463 lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
2464 ) -> Result<Option<u32>> {
2465 let token = StoreToken::new(store.as_context_mut());
2466 let state = store.0.concurrent_state_mut();
2467 let caller = state.guest_thread.unwrap();
2468
2469 let (join_handle, future) = JoinHandle::run(async move {
2472 let mut future = pin!(future);
2473 let mut call_context = None;
2474 future::poll_fn(move |cx| {
2475 tls::get(|store| {
2478 if let Some(call_context) = call_context.take() {
2479 token
2480 .as_context_mut(store)
2481 .0
2482 .component_resource_state()
2483 .0
2484 .push(call_context);
2485 }
2486 });
2487
2488 let result = future.as_mut().poll(cx);
2489
2490 if result.is_pending() {
2491 tls::get(|store| {
2494 call_context = Some(
2495 token
2496 .as_context_mut(store)
2497 .0
2498 .component_resource_state()
2499 .0
2500 .pop()
2501 .unwrap(),
2502 );
2503 });
2504 }
2505 result
2506 })
2507 .await
2508 });
2509
2510 let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;
2514
2515 log::trace!("new host task child of {caller:?}: {task:?}");
2516
2517 let mut future = Box::pin(future);
2518
2519 let poll = tls::set(store.0, || {
2524 future
2525 .as_mut()
2526 .poll(&mut Context::from_waker(&Waker::noop()))
2527 });
2528
2529 Ok(match poll {
2530 Poll::Ready(None) => unreachable!(),
2531 Poll::Ready(Some(result)) => {
2532 lower(store.as_context_mut(), result?)?;
2535 log::trace!("delete host task {task:?} (already ready)");
2536 store.0.concurrent_state_mut().delete(task)?;
2537 None
2538 }
2539 Poll::Pending => {
2540 let future =
2548 Box::pin(async move {
2549 let result = match future.await {
2550 Some(result) => result?,
2551 None => return Ok(()),
2553 };
2554 tls::get(move |store| {
2555 store.concurrent_state_mut().push_high_priority(
2561 WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
2562 lower(token.as_context_mut(store), result)?;
2563 let state = store.concurrent_state_mut();
2564 state.get_mut(task)?.join_handle.take();
2565 Waitable::Host(task).set_event(
2566 state,
2567 Some(Event::Subtask {
2568 status: Status::Returned,
2569 }),
2570 )
2571 }))),
2572 );
2573 Ok(())
2574 })
2575 });
2576
2577 store.0.concurrent_state_mut().push_future(future);
2578 let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2579 .subtask_insert_host(task.rep())?;
2580 store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2581 log::trace!(
2582 "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
2583 );
2584 Some(handle)
2585 }
2586 })
2587 }
2588
2589 pub(crate) fn task_return(
2592 self,
2593 store: &mut dyn VMStore,
2594 caller: RuntimeComponentInstanceIndex,
2595 ty: TypeTupleIndex,
2596 options: OptionsIndex,
2597 storage: &[ValRaw],
2598 ) -> Result<()> {
2599 self.id().get(store).check_may_leave(caller)?;
2600 let state = store.concurrent_state_mut();
2601 let guest_thread = state.guest_thread.unwrap();
2602 let lift = state
2603 .get_mut(guest_thread.task)?
2604 .lift_result
2605 .take()
2606 .ok_or_else(|| {
2607 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2608 })?;
2609 assert!(state.get_mut(guest_thread.task)?.result.is_none());
2610
2611 let CanonicalOptions {
2612 string_encoding,
2613 data_model,
2614 ..
2615 } = &self.id().get(store).component().env_component().options[options];
2616
2617 let invalid = ty != lift.ty
2618 || string_encoding != &lift.string_encoding
2619 || match data_model {
2620 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2621 Some(memory) => {
2622 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2623 let actual = self.id().get(store).runtime_memory(memory);
2624 expected != actual
2625 }
2626 None => false,
2629 },
2630 CanonicalOptionsDataModel::Gc { .. } => true,
2632 };
2633
2634 if invalid {
2635 bail!("invalid `task.return` signature and/or options for current task");
2636 }
2637
2638 log::trace!("task.return for {guest_thread:?}");
2639
2640 let result = (lift.lift)(store, storage)?;
2641 self.task_complete(
2642 store,
2643 guest_thread.task,
2644 result,
2645 Status::Returned,
2646 ValRaw::i32(0),
2647 )
2648 }
2649
2650 pub(crate) fn task_cancel(
2652 self,
2653 store: &mut StoreOpaque,
2654 caller: RuntimeComponentInstanceIndex,
2655 ) -> Result<()> {
2656 self.id().get(store).check_may_leave(caller)?;
2657 let state = store.concurrent_state_mut();
2658 let guest_thread = state.guest_thread.unwrap();
2659 let task = state.get_mut(guest_thread.task)?;
2660 if !task.cancel_sent {
2661 bail!("`task.cancel` called by task which has not been cancelled")
2662 }
2663 _ = task.lift_result.take().ok_or_else(|| {
2664 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2665 })?;
2666
2667 assert!(task.result.is_none());
2668
2669 log::trace!("task.cancel for {guest_thread:?}");
2670
2671 self.task_complete(
2672 store,
2673 guest_thread.task,
2674 Box::new(DummyResult),
2675 Status::ReturnCancelled,
2676 ValRaw::i32(0),
2677 )
2678 }
2679
2680 fn task_complete(
2686 self,
2687 store: &mut StoreOpaque,
2688 guest_task: TableId<GuestTask>,
2689 result: Box<dyn Any + Send + Sync>,
2690 status: Status,
2691 post_return_arg: ValRaw,
2692 ) -> Result<()> {
2693 if store
2694 .concurrent_state_mut()
2695 .get_mut(guest_task)?
2696 .call_post_return_automatically()
2697 {
2698 let (calls, host_table, _, instance) =
2699 store.component_resource_state_with_instance(self);
2700 ResourceTables {
2701 calls,
2702 host_table: Some(host_table),
2703 guest: Some(instance.guest_tables()),
2704 }
2705 .exit_call()?;
2706 } else {
2707 let function_index = store
2712 .concurrent_state_mut()
2713 .get_mut(guest_task)?
2714 .function_index
2715 .unwrap();
2716 self.id()
2717 .get_mut(store)
2718 .post_return_arg_set(function_index, post_return_arg);
2719 }
2720
2721 let state = store.concurrent_state_mut();
2722 let task = state.get_mut(guest_task)?;
2723
2724 if let Caller::Host { tx, .. } = &mut task.caller {
2725 if let Some(tx) = tx.take() {
2726 _ = tx.send(result);
2727 }
2728 } else {
2729 task.result = Some(result);
2730 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2731 }
2732
2733 Ok(())
2734 }
2735
2736 pub(crate) fn waitable_set_new(
2738 self,
2739 store: &mut StoreOpaque,
2740 caller_instance: RuntimeComponentInstanceIndex,
2741 ) -> Result<u32> {
2742 self.id().get_mut(store).check_may_leave(caller_instance)?;
2743 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2744 let handle = self.id().get_mut(store).guest_tables().0[caller_instance]
2745 .waitable_set_insert(set.rep())?;
2746 log::trace!("new waitable set {set:?} (handle {handle})");
2747 Ok(handle)
2748 }
2749
2750 pub(crate) fn waitable_set_drop(
2752 self,
2753 store: &mut StoreOpaque,
2754 caller_instance: RuntimeComponentInstanceIndex,
2755 set: u32,
2756 ) -> Result<()> {
2757 self.id().get_mut(store).check_may_leave(caller_instance)?;
2758 let rep =
2759 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_remove(set)?;
2760
2761 log::trace!("drop waitable set {rep} (handle {set})");
2762
2763 let set = store
2764 .concurrent_state_mut()
2765 .delete(TableId::<WaitableSet>::new(rep))?;
2766
2767 if !set.waiting.is_empty() {
2768 bail!("cannot drop waitable set with waiters");
2769 }
2770
2771 Ok(())
2772 }
2773
2774 pub(crate) fn waitable_join(
2776 self,
2777 store: &mut StoreOpaque,
2778 caller_instance: RuntimeComponentInstanceIndex,
2779 waitable_handle: u32,
2780 set_handle: u32,
2781 ) -> Result<()> {
2782 let mut instance = self.id().get_mut(store);
2783 instance.check_may_leave(caller_instance)?;
2784 let waitable =
2785 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2786
2787 let set = if set_handle == 0 {
2788 None
2789 } else {
2790 let set = instance.guest_tables().0[caller_instance].waitable_set_rep(set_handle)?;
2791
2792 Some(TableId::<WaitableSet>::new(set))
2793 };
2794
2795 log::trace!(
2796 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2797 );
2798
2799 waitable.join(store.concurrent_state_mut(), set)
2800 }
2801
2802 pub(crate) fn subtask_drop(
2804 self,
2805 store: &mut StoreOpaque,
2806 caller_instance: RuntimeComponentInstanceIndex,
2807 task_id: u32,
2808 ) -> Result<()> {
2809 self.id().get_mut(store).check_may_leave(caller_instance)?;
2810 self.waitable_join(store, caller_instance, task_id, 0)?;
2811
2812 let (rep, is_host) =
2813 self.id().get_mut(store).guest_tables().0[caller_instance].subtask_remove(task_id)?;
2814
2815 let concurrent_state = store.concurrent_state_mut();
2816 let (waitable, expected_caller_instance, delete) = if is_host {
2817 let id = TableId::<HostTask>::new(rep);
2818 let task = concurrent_state.get_mut(id)?;
2819 if task.join_handle.is_some() {
2820 bail!("cannot drop a subtask which has not yet resolved");
2821 }
2822 (Waitable::Host(id), task.caller_instance, true)
2823 } else {
2824 let id = TableId::<GuestTask>::new(rep);
2825 let task = concurrent_state.get_mut(id)?;
2826 if task.lift_result.is_some() {
2827 bail!("cannot drop a subtask which has not yet resolved");
2828 }
2829 if let Caller::Guest { instance, .. } = &task.caller {
2830 (Waitable::Guest(id), *instance, task.exited)
2831 } else {
2832 unreachable!()
2833 }
2834 };
2835
2836 waitable.common(concurrent_state)?.handle = None;
2837
2838 if waitable.take_event(concurrent_state)?.is_some() {
2839 bail!("cannot drop a subtask with an undelivered event");
2840 }
2841
2842 if delete {
2843 waitable.delete_from(concurrent_state)?;
2844 }
2845
2846 assert_eq!(expected_caller_instance, caller_instance);
2850 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
2851 Ok(())
2852 }
2853
2854 pub(crate) fn waitable_set_wait(
2856 self,
2857 store: &mut StoreOpaque,
2858 caller: RuntimeComponentInstanceIndex,
2859 options: OptionsIndex,
2860 set: u32,
2861 payload: u32,
2862 ) -> Result<u32> {
2863 self.id().get(store).check_may_leave(caller)?;
2864 let &CanonicalOptions {
2865 cancellable,
2866 instance: caller_instance,
2867 ..
2868 } = &self.id().get(store).component().env_component().options[options];
2869 let rep =
2870 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2871
2872 self.waitable_check(
2873 store,
2874 cancellable,
2875 WaitableCheck::Wait(WaitableCheckParams {
2876 set: TableId::new(rep),
2877 options,
2878 payload,
2879 }),
2880 )
2881 }
2882
2883 pub(crate) fn waitable_set_poll(
2885 self,
2886 store: &mut StoreOpaque,
2887 caller: RuntimeComponentInstanceIndex,
2888 options: OptionsIndex,
2889 set: u32,
2890 payload: u32,
2891 ) -> Result<u32> {
2892 self.id().get(store).check_may_leave(caller)?;
2893 let &CanonicalOptions {
2894 cancellable,
2895 instance: caller_instance,
2896 ..
2897 } = &self.id().get(store).component().env_component().options[options];
2898 let rep =
2899 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2900
2901 self.waitable_check(
2902 store,
2903 cancellable,
2904 WaitableCheck::Poll(WaitableCheckParams {
2905 set: TableId::new(rep),
2906 options,
2907 payload,
2908 }),
2909 )
2910 }
2911
2912 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
2914 let thread_id = store
2915 .concurrent_state_mut()
2916 .guest_thread
2917 .ok_or_else(|| anyhow!("no current thread"))?
2918 .thread;
2919 Ok(store
2921 .concurrent_state_mut()
2922 .get_mut(thread_id)?
2923 .instance_rep
2924 .unwrap())
2925 }
2926
2927 pub(crate) fn thread_new_indirect<T: 'static>(
2929 self,
2930 mut store: StoreContextMut<T>,
2931 runtime_instance: RuntimeComponentInstanceIndex,
2932 _func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex,
2934 start_func_idx: u32,
2935 context: i32,
2936 ) -> Result<u32> {
2937 self.id().get(store.0).check_may_leave(runtime_instance)?;
2938
2939 log::trace!("creating new thread");
2940
2941 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
2942 let instance = self.id().get_mut(store.0);
2943 let callee = instance
2944 .index_runtime_func_table(start_func_table_idx, start_func_idx as u64)?
2945 .ok_or_else(|| {
2946 anyhow!("the start function index points to an uninitialized function")
2947 })?;
2948 if callee.type_index(store.0) != start_func_ty.type_index() {
2949 bail!(
2950 "start function does not match expected type (currently only `(i32) -> ()` is supported)"
2951 );
2952 }
2953
2954 let token = StoreToken::new(store.as_context_mut());
2955 let start_func = Box::new(
2956 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
2957 let old_thread = store
2958 .concurrent_state_mut()
2959 .guest_thread
2960 .replace(guest_thread);
2961 log::trace!(
2962 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
2963 );
2964
2965 store.maybe_push_call_context(guest_thread.task)?;
2966
2967 let mut store = token.as_context_mut(store);
2968 let mut params = [ValRaw::i32(context)];
2969 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
2972
2973 store.0.maybe_pop_call_context(guest_thread.task)?;
2974
2975 self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
2976 log::trace!("explicit thread {guest_thread:?} completed");
2977 let state = store.0.concurrent_state_mut();
2978 let task = state.get_mut(guest_thread.task)?;
2979 if task.threads.is_empty() && !task.returned_or_cancelled() {
2980 bail!(Trap::NoAsyncResult);
2981 }
2982 state.guest_thread = old_thread;
2983 old_thread
2984 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2985 if state.get_mut(guest_thread.task)?.ready_to_delete() {
2986 Waitable::Guest(guest_thread.task).delete_from(state)?;
2987 }
2988 log::trace!("thread start: restored {old_thread:?} as current thread");
2989
2990 Ok(())
2991 },
2992 );
2993
2994 let state = store.0.concurrent_state_mut();
2995 let current_thread = state.guest_thread.unwrap();
2996 let parent_task = current_thread.task;
2997
2998 let new_thread = GuestThread::new_explicit(parent_task, start_func);
2999 let thread_id = state.push(new_thread)?;
3000 state.get_mut(parent_task)?.threads.insert(thread_id);
3001
3002 log::trace!("new thread with id {thread_id:?} created");
3003
3004 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3005 }
3006
3007 pub(crate) fn resume_suspended_thread(
3008 self,
3009 store: &mut StoreOpaque,
3010 runtime_instance: RuntimeComponentInstanceIndex,
3011 thread_idx: u32,
3012 high_priority: bool,
3013 ) -> Result<()> {
3014 let thread_id =
3015 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3016 let state = store.concurrent_state_mut();
3017 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3018 let thread = state.get_mut(guest_thread.thread)?;
3019
3020 match mem::replace(&mut thread.state, GuestThreadState::Running) {
3021 GuestThreadState::NotStartedExplicit(start_func) => {
3022 log::trace!("starting thread {guest_thread:?}");
3023 let guest_call = WorkItem::GuestCall(GuestCall {
3024 thread: guest_thread,
3025 kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3026 start_func(store, guest_thread)
3027 })),
3028 });
3029 store
3030 .concurrent_state_mut()
3031 .push_work_item(guest_call, high_priority);
3032 }
3033 GuestThreadState::Suspended(fiber) => {
3034 log::trace!("resuming thread {thread_id:?} that was suspended");
3035 store
3036 .concurrent_state_mut()
3037 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3038 }
3039 _ => {
3040 bail!("cannot resume thread which is not suspended");
3041 }
3042 }
3043 Ok(())
3044 }
3045
3046 fn add_guest_thread_to_instance_table(
3047 self,
3048 thread_id: TableId<GuestThread>,
3049 store: &mut StoreOpaque,
3050 runtime_instance: RuntimeComponentInstanceIndex,
3051 ) -> Result<u32> {
3052 let guest_id = self.id().get_mut(store).guest_tables().0[runtime_instance]
3053 .guest_thread_insert(thread_id.rep())?;
3054 store
3055 .concurrent_state_mut()
3056 .get_mut(thread_id)?
3057 .instance_rep = Some(guest_id);
3058 Ok(guest_id)
3059 }
3060
3061 pub(crate) fn suspension_intrinsic(
3064 self,
3065 store: &mut StoreOpaque,
3066 caller: RuntimeComponentInstanceIndex,
3067 cancellable: bool,
3068 yielding: bool,
3069 to_thread: Option<u32>,
3070 ) -> Result<WaitResult> {
3071 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3073 return Ok(WaitResult::Cancelled);
3074 }
3075
3076 self.id().get(store).check_may_leave(caller)?;
3077
3078 if let Some(thread) = to_thread {
3079 self.resume_suspended_thread(store, caller, thread, true)?;
3080 }
3081
3082 let state = store.concurrent_state_mut();
3083 let guest_thread = state.guest_thread.unwrap();
3084 let reason = if yielding {
3085 SuspendReason::Yielding {
3086 thread: guest_thread,
3087 }
3088 } else {
3089 SuspendReason::ExplicitlySuspending {
3090 thread: guest_thread,
3091 }
3092 };
3093
3094 store.suspend(reason)?;
3095
3096 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3097 Ok(WaitResult::Cancelled)
3098 } else {
3099 Ok(WaitResult::Completed)
3100 }
3101 }
3102
3103 fn waitable_check(
3105 self,
3106 store: &mut StoreOpaque,
3107 cancellable: bool,
3108 check: WaitableCheck,
3109 ) -> Result<u32> {
3110 let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();
3111
3112 let (wait, set) = match &check {
3113 WaitableCheck::Wait(params) => (true, Some(params.set)),
3114 WaitableCheck::Poll(params) => (false, Some(params.set)),
3115 };
3116
3117 log::trace!("waitable check for {guest_thread:?}; set {set:?}");
3118 store.suspend(SuspendReason::Yielding {
3120 thread: guest_thread,
3121 })?;
3122
3123 log::trace!("waitable check for {guest_thread:?}; set {set:?}");
3124
3125 let state = store.concurrent_state_mut();
3126 let task = state.get_mut(guest_thread.task)?;
3127
3128 if wait {
3131 let set = set.unwrap();
3132
3133 if (task.event.is_none()
3134 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3135 && state.get_mut(set)?.ready.is_empty()
3136 {
3137 if cancellable {
3138 let old = state
3139 .get_mut(guest_thread.thread)?
3140 .wake_on_cancel
3141 .replace(set);
3142 assert!(old.is_none());
3143 }
3144
3145 store.suspend(SuspendReason::Waiting {
3146 set,
3147 thread: guest_thread,
3148 })?;
3149 }
3150 }
3151
3152 log::trace!("waitable check for {guest_thread:?}; set {set:?}, part two");
3153
3154 let result = match check {
3155 WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => {
3157 let event =
3158 self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3159
3160 let (ordinal, handle, result) = if wait {
3161 let (event, waitable) = event.unwrap();
3162 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3163 let (ordinal, result) = event.parts();
3164 (ordinal, handle, result)
3165 } else {
3166 if let Some((event, waitable)) = event {
3167 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3168 let (ordinal, result) = event.parts();
3169 (ordinal, handle, result)
3170 } else {
3171 log::trace!(
3172 "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3173 guest_thread.task,
3174 params.set
3175 );
3176 let (ordinal, result) = Event::None.parts();
3177 (ordinal, 0, result)
3178 }
3179 };
3180 let options = Options::new_index(store, self, params.options);
3181 let ptr = func::validate_inbounds::<(u32, u32)>(
3182 options.memory_mut(store),
3183 &ValRaw::u32(params.payload),
3184 )?;
3185 options.memory_mut(store)[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3186 options.memory_mut(store)[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3187 Ok(ordinal)
3188 }
3189 };
3190
3191 result
3192 }
3193
3194 pub(crate) fn subtask_cancel(
3196 self,
3197 store: &mut StoreOpaque,
3198 caller_instance: RuntimeComponentInstanceIndex,
3199 async_: bool,
3200 task_id: u32,
3201 ) -> Result<u32> {
3202 self.id().get(store).check_may_leave(caller_instance)?;
3203 let (rep, is_host) =
3204 self.id().get_mut(store).guest_tables().0[caller_instance].subtask_rep(task_id)?;
3205 let (waitable, expected_caller_instance) = if is_host {
3206 let id = TableId::<HostTask>::new(rep);
3207 (
3208 Waitable::Host(id),
3209 store.concurrent_state_mut().get_mut(id)?.caller_instance,
3210 )
3211 } else {
3212 let id = TableId::<GuestTask>::new(rep);
3213 if let Caller::Guest { instance, .. } =
3214 &store.concurrent_state_mut().get_mut(id)?.caller
3215 {
3216 (Waitable::Guest(id), *instance)
3217 } else {
3218 unreachable!()
3219 }
3220 };
3221 assert_eq!(expected_caller_instance, caller_instance);
3225
3226 log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3227
3228 let concurrent_state = store.concurrent_state_mut();
3229 if let Waitable::Host(host_task) = waitable {
3230 if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
3231 handle.abort();
3232 return Ok(Status::ReturnCancelled as u32);
3233 }
3234 } else {
3235 let caller = concurrent_state.guest_thread.unwrap();
3236 let guest_task = TableId::<GuestTask>::new(rep);
3237 let task = concurrent_state.get_mut(guest_task)?;
3238 if !task.already_lowered_parameters() {
3239 task.lower_params = None;
3240 task.lift_result = None;
3241
3242 let callee_instance = task.instance;
3244
3245 let pending = &mut concurrent_state.instance_state(callee_instance).pending;
3246 let pending_count = pending.len();
3247 pending.retain(|thread, _| thread.task != guest_task);
3248 if pending.len() == pending_count {
3250 bail!("`subtask.cancel` called after terminal status delivered");
3251 }
3252 return Ok(Status::StartCancelled as u32);
3253 } else if !task.returned_or_cancelled() {
3254 task.cancel_sent = true;
3257 task.event = Some(Event::Cancelled);
3262 for thread in task.threads.clone() {
3263 let thread = QualifiedThreadId {
3264 task: guest_task,
3265 thread,
3266 };
3267 if let Some(set) = concurrent_state
3268 .get_mut(thread.thread)
3269 .unwrap()
3270 .wake_on_cancel
3271 .take()
3272 {
3273 let item = match concurrent_state
3274 .get_mut(set)?
3275 .waiting
3276 .remove(&thread)
3277 .unwrap()
3278 {
3279 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3280 WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
3281 thread,
3282 kind: GuestCallKind::DeliverEvent {
3283 instance,
3284 set: None,
3285 },
3286 }),
3287 };
3288 concurrent_state.push_high_priority(item);
3289
3290 store.suspend(SuspendReason::Yielding { thread: caller })?;
3291 break;
3292 }
3293 }
3294
3295 let concurrent_state = store.concurrent_state_mut();
3296 let task = concurrent_state.get_mut(guest_task)?;
3297 if !task.returned_or_cancelled() {
3298 if async_ {
3299 return Ok(BLOCKED);
3300 } else {
3301 store.wait_for_event(Waitable::Guest(guest_task))?;
3302 }
3303 }
3304 }
3305 }
3306
3307 let event = waitable.take_event(store.concurrent_state_mut())?;
3308 if let Some(Event::Subtask {
3309 status: status @ (Status::Returned | Status::ReturnCancelled),
3310 }) = event
3311 {
3312 Ok(status as u32)
3313 } else {
3314 bail!("`subtask.cancel` called after terminal status delivered");
3315 }
3316 }
3317
3318 pub(crate) fn context_get(
3319 self,
3320 store: &mut StoreOpaque,
3321 caller: RuntimeComponentInstanceIndex,
3322 slot: u32,
3323 ) -> Result<u32> {
3324 self.id().get(store).check_may_leave(caller)?;
3325 store.concurrent_state_mut().context_get(slot)
3326 }
3327
3328 pub(crate) fn context_set(
3329 self,
3330 store: &mut StoreOpaque,
3331 caller: RuntimeComponentInstanceIndex,
3332 slot: u32,
3333 value: u32,
3334 ) -> Result<()> {
3335 self.id().get(store).check_may_leave(caller)?;
3336 store.concurrent_state_mut().context_set(slot, value)
3337 }
3338}
3339
3340pub trait VMComponentAsyncStore {
3348 unsafe fn prepare_call(
3354 &mut self,
3355 instance: Instance,
3356 memory: *mut VMMemoryDefinition,
3357 start: *mut VMFuncRef,
3358 return_: *mut VMFuncRef,
3359 caller_instance: RuntimeComponentInstanceIndex,
3360 callee_instance: RuntimeComponentInstanceIndex,
3361 task_return_type: TypeTupleIndex,
3362 string_encoding: u8,
3363 result_count: u32,
3364 storage: *mut ValRaw,
3365 storage_len: usize,
3366 ) -> Result<()>;
3367
3368 unsafe fn sync_start(
3371 &mut self,
3372 instance: Instance,
3373 callback: *mut VMFuncRef,
3374 callee: *mut VMFuncRef,
3375 param_count: u32,
3376 storage: *mut MaybeUninit<ValRaw>,
3377 storage_len: usize,
3378 ) -> Result<()>;
3379
3380 unsafe fn async_start(
3383 &mut self,
3384 instance: Instance,
3385 callback: *mut VMFuncRef,
3386 post_return: *mut VMFuncRef,
3387 callee: *mut VMFuncRef,
3388 param_count: u32,
3389 result_count: u32,
3390 flags: u32,
3391 ) -> Result<u32>;
3392
3393 fn future_write(
3395 &mut self,
3396 instance: Instance,
3397 caller: RuntimeComponentInstanceIndex,
3398 ty: TypeFutureTableIndex,
3399 options: OptionsIndex,
3400 future: u32,
3401 address: u32,
3402 ) -> Result<u32>;
3403
3404 fn future_read(
3406 &mut self,
3407 instance: Instance,
3408 caller: RuntimeComponentInstanceIndex,
3409 ty: TypeFutureTableIndex,
3410 options: OptionsIndex,
3411 future: u32,
3412 address: u32,
3413 ) -> Result<u32>;
3414
3415 fn future_drop_writable(
3417 &mut self,
3418 instance: Instance,
3419 caller: RuntimeComponentInstanceIndex,
3420 ty: TypeFutureTableIndex,
3421 writer: u32,
3422 ) -> Result<()>;
3423
3424 fn stream_write(
3426 &mut self,
3427 instance: Instance,
3428 caller: RuntimeComponentInstanceIndex,
3429 ty: TypeStreamTableIndex,
3430 options: OptionsIndex,
3431 stream: u32,
3432 address: u32,
3433 count: u32,
3434 ) -> Result<u32>;
3435
3436 fn stream_read(
3438 &mut self,
3439 instance: Instance,
3440 caller: RuntimeComponentInstanceIndex,
3441 ty: TypeStreamTableIndex,
3442 options: OptionsIndex,
3443 stream: u32,
3444 address: u32,
3445 count: u32,
3446 ) -> Result<u32>;
3447
3448 fn flat_stream_write(
3451 &mut self,
3452 instance: Instance,
3453 caller: RuntimeComponentInstanceIndex,
3454 ty: TypeStreamTableIndex,
3455 options: OptionsIndex,
3456 payload_size: u32,
3457 payload_align: u32,
3458 stream: u32,
3459 address: u32,
3460 count: u32,
3461 ) -> Result<u32>;
3462
3463 fn flat_stream_read(
3466 &mut self,
3467 instance: Instance,
3468 caller: RuntimeComponentInstanceIndex,
3469 ty: TypeStreamTableIndex,
3470 options: OptionsIndex,
3471 payload_size: u32,
3472 payload_align: u32,
3473 stream: u32,
3474 address: u32,
3475 count: u32,
3476 ) -> Result<u32>;
3477
3478 fn stream_drop_writable(
3480 &mut self,
3481 instance: Instance,
3482 caller: RuntimeComponentInstanceIndex,
3483 ty: TypeStreamTableIndex,
3484 writer: u32,
3485 ) -> Result<()>;
3486
3487 fn error_context_debug_message(
3489 &mut self,
3490 instance: Instance,
3491 caller: RuntimeComponentInstanceIndex,
3492 ty: TypeComponentLocalErrorContextTableIndex,
3493 options: OptionsIndex,
3494 err_ctx_handle: u32,
3495 debug_msg_address: u32,
3496 ) -> Result<()>;
3497
3498 fn thread_new_indirect(
3500 &mut self,
3501 instance: Instance,
3502 caller: RuntimeComponentInstanceIndex,
3503 func_ty_idx: TypeFuncIndex,
3504 start_func_table_idx: RuntimeTableIndex,
3505 start_func_idx: u32,
3506 context: i32,
3507 ) -> Result<u32>;
3508}
3509
3510impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3512 unsafe fn prepare_call(
3513 &mut self,
3514 instance: Instance,
3515 memory: *mut VMMemoryDefinition,
3516 start: *mut VMFuncRef,
3517 return_: *mut VMFuncRef,
3518 caller_instance: RuntimeComponentInstanceIndex,
3519 callee_instance: RuntimeComponentInstanceIndex,
3520 task_return_type: TypeTupleIndex,
3521 string_encoding: u8,
3522 result_count_or_max_if_async: u32,
3523 storage: *mut ValRaw,
3524 storage_len: usize,
3525 ) -> Result<()> {
3526 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3530
3531 unsafe {
3532 instance.prepare_call(
3533 StoreContextMut(self),
3534 start,
3535 return_,
3536 caller_instance,
3537 callee_instance,
3538 task_return_type,
3539 memory,
3540 string_encoding,
3541 match result_count_or_max_if_async {
3542 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3543 params,
3544 has_result: false,
3545 },
3546 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3547 params,
3548 has_result: true,
3549 },
3550 result_count => CallerInfo::Sync {
3551 params,
3552 result_count,
3553 },
3554 },
3555 )
3556 }
3557 }
3558
3559 unsafe fn sync_start(
3560 &mut self,
3561 instance: Instance,
3562 callback: *mut VMFuncRef,
3563 callee: *mut VMFuncRef,
3564 param_count: u32,
3565 storage: *mut MaybeUninit<ValRaw>,
3566 storage_len: usize,
3567 ) -> Result<()> {
3568 unsafe {
3569 instance
3570 .start_call(
3571 StoreContextMut(self),
3572 callback,
3573 ptr::null_mut(),
3574 callee,
3575 param_count,
3576 1,
3577 START_FLAG_ASYNC_CALLEE,
3578 Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3582 )
3583 .map(drop)
3584 }
3585 }
3586
3587 unsafe fn async_start(
3588 &mut self,
3589 instance: Instance,
3590 callback: *mut VMFuncRef,
3591 post_return: *mut VMFuncRef,
3592 callee: *mut VMFuncRef,
3593 param_count: u32,
3594 result_count: u32,
3595 flags: u32,
3596 ) -> Result<u32> {
3597 unsafe {
3598 instance.start_call(
3599 StoreContextMut(self),
3600 callback,
3601 post_return,
3602 callee,
3603 param_count,
3604 result_count,
3605 flags,
3606 None,
3607 )
3608 }
3609 }
3610
3611 fn future_write(
3612 &mut self,
3613 instance: Instance,
3614 caller: RuntimeComponentInstanceIndex,
3615 ty: TypeFutureTableIndex,
3616 options: OptionsIndex,
3617 future: u32,
3618 address: u32,
3619 ) -> Result<u32> {
3620 instance.id().get(self).check_may_leave(caller)?;
3621 instance
3622 .guest_write(
3623 StoreContextMut(self),
3624 TransmitIndex::Future(ty),
3625 options,
3626 None,
3627 future,
3628 address,
3629 1,
3630 )
3631 .map(|result| result.encode())
3632 }
3633
3634 fn future_read(
3635 &mut self,
3636 instance: Instance,
3637 caller: RuntimeComponentInstanceIndex,
3638 ty: TypeFutureTableIndex,
3639 options: OptionsIndex,
3640 future: u32,
3641 address: u32,
3642 ) -> Result<u32> {
3643 instance.id().get(self).check_may_leave(caller)?;
3644 instance
3645 .guest_read(
3646 StoreContextMut(self),
3647 TransmitIndex::Future(ty),
3648 options,
3649 None,
3650 future,
3651 address,
3652 1,
3653 )
3654 .map(|result| result.encode())
3655 }
3656
3657 fn stream_write(
3658 &mut self,
3659 instance: Instance,
3660 caller: RuntimeComponentInstanceIndex,
3661 ty: TypeStreamTableIndex,
3662 options: OptionsIndex,
3663 stream: u32,
3664 address: u32,
3665 count: u32,
3666 ) -> Result<u32> {
3667 instance.id().get(self).check_may_leave(caller)?;
3668 instance
3669 .guest_write(
3670 StoreContextMut(self),
3671 TransmitIndex::Stream(ty),
3672 options,
3673 None,
3674 stream,
3675 address,
3676 count,
3677 )
3678 .map(|result| result.encode())
3679 }
3680
3681 fn stream_read(
3682 &mut self,
3683 instance: Instance,
3684 caller: RuntimeComponentInstanceIndex,
3685 ty: TypeStreamTableIndex,
3686 options: OptionsIndex,
3687 stream: u32,
3688 address: u32,
3689 count: u32,
3690 ) -> Result<u32> {
3691 instance.id().get(self).check_may_leave(caller)?;
3692 instance
3693 .guest_read(
3694 StoreContextMut(self),
3695 TransmitIndex::Stream(ty),
3696 options,
3697 None,
3698 stream,
3699 address,
3700 count,
3701 )
3702 .map(|result| result.encode())
3703 }
3704
3705 fn future_drop_writable(
3706 &mut self,
3707 instance: Instance,
3708 caller: RuntimeComponentInstanceIndex,
3709 ty: TypeFutureTableIndex,
3710 writer: u32,
3711 ) -> Result<()> {
3712 instance.id().get(self).check_may_leave(caller)?;
3713 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
3714 }
3715
3716 fn flat_stream_write(
3717 &mut self,
3718 instance: Instance,
3719 caller: RuntimeComponentInstanceIndex,
3720 ty: TypeStreamTableIndex,
3721 options: OptionsIndex,
3722 payload_size: u32,
3723 payload_align: u32,
3724 stream: u32,
3725 address: u32,
3726 count: u32,
3727 ) -> Result<u32> {
3728 instance.id().get(self).check_may_leave(caller)?;
3729 instance
3730 .guest_write(
3731 StoreContextMut(self),
3732 TransmitIndex::Stream(ty),
3733 options,
3734 Some(FlatAbi {
3735 size: payload_size,
3736 align: payload_align,
3737 }),
3738 stream,
3739 address,
3740 count,
3741 )
3742 .map(|result| result.encode())
3743 }
3744
3745 fn flat_stream_read(
3746 &mut self,
3747 instance: Instance,
3748 caller: RuntimeComponentInstanceIndex,
3749 ty: TypeStreamTableIndex,
3750 options: OptionsIndex,
3751 payload_size: u32,
3752 payload_align: u32,
3753 stream: u32,
3754 address: u32,
3755 count: u32,
3756 ) -> Result<u32> {
3757 instance.id().get(self).check_may_leave(caller)?;
3758 instance
3759 .guest_read(
3760 StoreContextMut(self),
3761 TransmitIndex::Stream(ty),
3762 options,
3763 Some(FlatAbi {
3764 size: payload_size,
3765 align: payload_align,
3766 }),
3767 stream,
3768 address,
3769 count,
3770 )
3771 .map(|result| result.encode())
3772 }
3773
3774 fn stream_drop_writable(
3775 &mut self,
3776 instance: Instance,
3777 caller: RuntimeComponentInstanceIndex,
3778 ty: TypeStreamTableIndex,
3779 writer: u32,
3780 ) -> Result<()> {
3781 instance.id().get(self).check_may_leave(caller)?;
3782 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
3783 }
3784
3785 fn error_context_debug_message(
3786 &mut self,
3787 instance: Instance,
3788 caller: RuntimeComponentInstanceIndex,
3789 ty: TypeComponentLocalErrorContextTableIndex,
3790 options: OptionsIndex,
3791 err_ctx_handle: u32,
3792 debug_msg_address: u32,
3793 ) -> Result<()> {
3794 instance.id().get(self).check_may_leave(caller)?;
3795 instance.error_context_debug_message(
3796 StoreContextMut(self),
3797 ty,
3798 options,
3799 err_ctx_handle,
3800 debug_msg_address,
3801 )
3802 }
3803
3804 fn thread_new_indirect(
3805 &mut self,
3806 instance: Instance,
3807 caller: RuntimeComponentInstanceIndex,
3808 func_ty_idx: TypeFuncIndex,
3809 start_func_table_idx: RuntimeTableIndex,
3810 start_func_idx: u32,
3811 context: i32,
3812 ) -> Result<u32> {
3813 instance.thread_new_indirect(
3814 StoreContextMut(self),
3815 caller,
3816 func_ty_idx,
3817 start_func_table_idx,
3818 start_func_idx,
3819 context,
3820 )
3821 }
3822}
3823
3824type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
3825
3826struct HostTask {
3828 common: WaitableCommon,
3829 caller_instance: RuntimeComponentInstanceIndex,
3830 join_handle: Option<JoinHandle>,
3831}
3832
3833impl HostTask {
3834 fn new(
3835 caller_instance: RuntimeComponentInstanceIndex,
3836 join_handle: Option<JoinHandle>,
3837 ) -> Self {
3838 Self {
3839 common: WaitableCommon::default(),
3840 caller_instance,
3841 join_handle,
3842 }
3843 }
3844}
3845
3846impl TableDebug for HostTask {
3847 fn type_name() -> &'static str {
3848 "HostTask"
3849 }
3850}
3851
3852type CallbackFn = Box<
3853 dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
3854 + Send
3855 + Sync
3856 + 'static,
3857>;
3858
3859enum Caller {
3861 Host {
3863 tx: Option<oneshot::Sender<LiftedResult>>,
3865 exit_tx: Arc<oneshot::Sender<()>>,
3872 host_future_present: bool,
3875 call_post_return_automatically: bool,
3877 },
3878 Guest {
3880 thread: QualifiedThreadId,
3882 instance: RuntimeComponentInstanceIndex,
3888 },
3889}
3890
3891struct LiftResult {
3894 lift: RawLift,
3895 ty: TypeTupleIndex,
3896 memory: Option<SendSyncPtr<VMMemoryDefinition>>,
3897 string_encoding: StringEncoding,
3898}
3899
3900#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
3905struct QualifiedThreadId {
3906 task: TableId<GuestTask>,
3907 thread: TableId<GuestThread>,
3908}
3909
3910impl QualifiedThreadId {
3911 fn qualify(
3912 state: &mut ConcurrentState,
3913 thread: TableId<GuestThread>,
3914 ) -> Result<QualifiedThreadId> {
3915 Ok(QualifiedThreadId {
3916 task: state.get_mut(thread)?.parent_task,
3917 thread,
3918 })
3919 }
3920}
3921
3922impl fmt::Debug for QualifiedThreadId {
3923 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3924 f.debug_tuple("QualifiedThreadId")
3925 .field(&self.task.rep())
3926 .field(&self.thread.rep())
3927 .finish()
3928 }
3929}
3930
3931enum GuestThreadState {
3932 NotStartedImplicit,
3933 NotStartedExplicit(
3934 Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
3935 ),
3936 Running,
3937 Suspended(StoreFiber<'static>),
3938 Pending,
3939 Completed,
3940}
3941pub struct GuestThread {
3942 context: [u32; 2],
3945 parent_task: TableId<GuestTask>,
3947 wake_on_cancel: Option<TableId<WaitableSet>>,
3950 state: GuestThreadState,
3952 instance_rep: Option<u32>,
3955}
3956
3957impl GuestThread {
3958 fn from_instance(
3961 state: Pin<&mut ComponentInstance>,
3962 caller_instance: RuntimeComponentInstanceIndex,
3963 guest_thread: u32,
3964 ) -> Result<TableId<Self>> {
3965 let rep = state.guest_tables().0[caller_instance].guest_thread_rep(guest_thread)?;
3966 Ok(TableId::new(rep))
3967 }
3968
3969 fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
3970 Self {
3971 context: [0; 2],
3972 parent_task,
3973 wake_on_cancel: None,
3974 state: GuestThreadState::NotStartedImplicit,
3975 instance_rep: None,
3976 }
3977 }
3978
3979 fn new_explicit(
3980 parent_task: TableId<GuestTask>,
3981 start_func: Box<
3982 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
3983 >,
3984 ) -> Self {
3985 Self {
3986 context: [0; 2],
3987 parent_task,
3988 wake_on_cancel: None,
3989 state: GuestThreadState::NotStartedExplicit(start_func),
3990 instance_rep: None,
3991 }
3992 }
3993}
3994
3995impl TableDebug for GuestThread {
3996 fn type_name() -> &'static str {
3997 "GuestThread"
3998 }
3999}
4000
4001enum SyncResult {
4002 NotProduced,
4003 Produced(Option<ValRaw>),
4004 Taken,
4005}
4006
4007impl SyncResult {
4008 fn take(&mut self) -> Option<Option<ValRaw>> {
4009 match mem::replace(self, SyncResult::Taken) {
4010 SyncResult::NotProduced => None,
4011 SyncResult::Produced(val) => Some(val),
4012 SyncResult::Taken => {
4013 panic!("attempted to take a synchronous result that was already taken")
4014 }
4015 }
4016 }
4017}
4018
4019#[derive(Debug)]
4020enum HostFutureState {
4021 NotApplicable,
4022 Live,
4023 Dropped,
4024}
4025
4026pub(crate) struct GuestTask {
4028 common: WaitableCommon,
4030 lower_params: Option<RawLower>,
4032 lift_result: Option<LiftResult>,
4034 result: Option<LiftedResult>,
4037 callback: Option<CallbackFn>,
4040 caller: Caller,
4042 call_context: Option<CallContext>,
4045 sync_result: SyncResult,
4048 cancel_sent: bool,
4051 starting_sent: bool,
4054 subtasks: HashSet<TableId<GuestTask>>,
4059 sync_call_set: TableId<WaitableSet>,
4061 instance: RuntimeComponentInstanceIndex,
4067 event: Option<Event>,
4070 function_index: Option<ExportIndex>,
4072 exited: bool,
4074 threads: HashSet<TableId<GuestThread>>,
4076 host_future_state: HostFutureState,
4079}
4080
4081impl GuestTask {
4082 fn already_lowered_parameters(&self) -> bool {
4083 self.lower_params.is_none()
4085 }
4086 fn returned_or_cancelled(&self) -> bool {
4087 self.lift_result.is_none()
4089 }
4090 fn ready_to_delete(&self) -> bool {
4091 let threads_completed = self.threads.is_empty();
4092 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4093 let pending_completion_event = matches!(
4094 self.common.event,
4095 Some(Event::Subtask {
4096 status: Status::Returned | Status::ReturnCancelled
4097 })
4098 );
4099 let ready = threads_completed
4100 && !has_sync_result
4101 && !pending_completion_event
4102 && !matches!(self.host_future_state, HostFutureState::Live);
4103 log::trace!(
4104 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4105 threads_completed,
4106 has_sync_result,
4107 pending_completion_event,
4108 self.host_future_state
4109 );
4110 ready
4111 }
4112 fn new(
4113 state: &mut ConcurrentState,
4114 lower_params: RawLower,
4115 lift_result: LiftResult,
4116 caller: Caller,
4117 callback: Option<CallbackFn>,
4118 component_instance: RuntimeComponentInstanceIndex,
4119 ) -> Result<Self> {
4120 let sync_call_set = state.push(WaitableSet::default())?;
4121 let host_future_state = match &caller {
4122 Caller::Guest { .. } => HostFutureState::NotApplicable,
4123 Caller::Host {
4124 host_future_present,
4125 ..
4126 } => {
4127 if *host_future_present {
4128 HostFutureState::Live
4129 } else {
4130 HostFutureState::NotApplicable
4131 }
4132 }
4133 };
4134 Ok(Self {
4135 common: WaitableCommon::default(),
4136 lower_params: Some(lower_params),
4137 lift_result: Some(lift_result),
4138 result: None,
4139 callback,
4140 caller,
4141 call_context: Some(CallContext::default()),
4142 sync_result: SyncResult::NotProduced,
4143 cancel_sent: false,
4144 starting_sent: false,
4145 subtasks: HashSet::new(),
4146 sync_call_set,
4147 instance: component_instance,
4148 event: None,
4149 function_index: None,
4150 exited: false,
4151 threads: HashSet::new(),
4152 host_future_state,
4153 })
4154 }
4155
4156 fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4159 for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4162 if let Some(Event::Subtask {
4163 status: Status::Returned | Status::ReturnCancelled,
4164 }) = waitable.common(state)?.event
4165 {
4166 waitable.delete_from(state)?;
4167 }
4168 }
4169
4170 assert!(self.threads.is_empty());
4171
4172 state.delete(self.sync_call_set)?;
4173
4174 match &self.caller {
4176 Caller::Guest {
4177 thread,
4178 instance: runtime_instance,
4179 } => {
4180 let task_mut = state.get_mut(thread.task)?;
4181 let present = task_mut.subtasks.remove(&me);
4182 assert!(present);
4183
4184 for subtask in &self.subtasks {
4185 task_mut.subtasks.insert(*subtask);
4186 }
4187
4188 for subtask in &self.subtasks {
4189 state.get_mut(*subtask)?.caller = Caller::Guest {
4190 thread: *thread,
4191 instance: *runtime_instance,
4192 };
4193 }
4194 }
4195 Caller::Host { exit_tx, .. } => {
4196 for subtask in &self.subtasks {
4197 state.get_mut(*subtask)?.caller = Caller::Host {
4198 tx: None,
4199 exit_tx: exit_tx.clone(),
4203 host_future_present: false,
4204 call_post_return_automatically: true,
4205 };
4206 }
4207 }
4208 }
4209
4210 for subtask in self.subtasks {
4211 if state.get_mut(subtask)?.exited {
4212 Waitable::Guest(subtask).delete_from(state)?;
4213 }
4214 }
4215
4216 Ok(())
4217 }
4218
4219 fn call_post_return_automatically(&self) -> bool {
4220 matches!(
4221 self.caller,
4222 Caller::Guest { .. }
4223 | Caller::Host {
4224 call_post_return_automatically: true,
4225 ..
4226 }
4227 )
4228 }
4229}
4230
4231impl TableDebug for GuestTask {
4232 fn type_name() -> &'static str {
4233 "GuestTask"
4234 }
4235}
4236
4237#[derive(Default)]
4239struct WaitableCommon {
4240 event: Option<Event>,
4242 set: Option<TableId<WaitableSet>>,
4244 handle: Option<u32>,
4246}
4247
4248#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4250enum Waitable {
4251 Host(TableId<HostTask>),
4253 Guest(TableId<GuestTask>),
4255 Transmit(TableId<TransmitHandle>),
4257}
4258
4259impl Waitable {
4260 fn from_instance(
4263 state: Pin<&mut ComponentInstance>,
4264 caller_instance: RuntimeComponentInstanceIndex,
4265 waitable: u32,
4266 ) -> Result<Self> {
4267 use crate::runtime::vm::component::Waitable;
4268
4269 let (waitable, kind) = state.guest_tables().0[caller_instance].waitable_rep(waitable)?;
4270
4271 Ok(match kind {
4272 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4273 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4274 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4275 })
4276 }
4277
4278 fn rep(&self) -> u32 {
4280 match self {
4281 Self::Host(id) => id.rep(),
4282 Self::Guest(id) => id.rep(),
4283 Self::Transmit(id) => id.rep(),
4284 }
4285 }
4286
4287 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4291 log::trace!("waitable {self:?} join set {set:?}",);
4292
4293 let old = mem::replace(&mut self.common(state)?.set, set);
4294
4295 if let Some(old) = old {
4296 match *self {
4297 Waitable::Host(id) => state.remove_child(id, old),
4298 Waitable::Guest(id) => state.remove_child(id, old),
4299 Waitable::Transmit(id) => state.remove_child(id, old),
4300 }?;
4301
4302 state.get_mut(old)?.ready.remove(self);
4303 }
4304
4305 if let Some(set) = set {
4306 match *self {
4307 Waitable::Host(id) => state.add_child(id, set),
4308 Waitable::Guest(id) => state.add_child(id, set),
4309 Waitable::Transmit(id) => state.add_child(id, set),
4310 }?;
4311
4312 if self.common(state)?.event.is_some() {
4313 self.mark_ready(state)?;
4314 }
4315 }
4316
4317 Ok(())
4318 }
4319
4320 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4322 Ok(match self {
4323 Self::Host(id) => &mut state.get_mut(*id)?.common,
4324 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4325 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4326 })
4327 }
4328
4329 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4333 log::trace!("set event for {self:?}: {event:?}");
4334 self.common(state)?.event = event;
4335 self.mark_ready(state)
4336 }
4337
4338 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4340 let common = self.common(state)?;
4341 let event = common.event.take();
4342 if let Some(set) = self.common(state)?.set {
4343 state.get_mut(set)?.ready.remove(self);
4344 }
4345
4346 Ok(event)
4347 }
4348
4349 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4353 if let Some(set) = self.common(state)?.set {
4354 state.get_mut(set)?.ready.insert(*self);
4355 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4356 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4357 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4358
4359 let item = match mode {
4360 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4361 WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4362 thread,
4363 kind: GuestCallKind::DeliverEvent {
4364 instance,
4365 set: Some(set),
4366 },
4367 }),
4368 };
4369 state.push_high_priority(item);
4370 }
4371 }
4372 Ok(())
4373 }
4374
4375 fn on_delivery(&self, instance: Pin<&mut ComponentInstance>, event: Event) {
4378 match event {
4379 Event::FutureRead {
4380 pending: Some((ty, handle)),
4381 ..
4382 }
4383 | Event::FutureWrite {
4384 pending: Some((ty, handle)),
4385 ..
4386 } => {
4387 let runtime_instance = instance.component().types()[ty].instance;
4388 let (rep, state) = instance.guest_tables().0[runtime_instance]
4389 .future_rep(ty, handle)
4390 .unwrap();
4391 assert_eq!(rep, self.rep());
4392 assert_eq!(*state, TransmitLocalState::Busy);
4393 *state = match event {
4394 Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
4395 Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
4396 _ => unreachable!(),
4397 };
4398 }
4399 Event::StreamRead {
4400 pending: Some((ty, handle)),
4401 code,
4402 }
4403 | Event::StreamWrite {
4404 pending: Some((ty, handle)),
4405 code,
4406 } => {
4407 let runtime_instance = instance.component().types()[ty].instance;
4408 let (rep, state) = instance.guest_tables().0[runtime_instance]
4409 .stream_rep(ty, handle)
4410 .unwrap();
4411 assert_eq!(rep, self.rep());
4412 assert_eq!(*state, TransmitLocalState::Busy);
4413 let done = matches!(code, ReturnCode::Dropped(_));
4414 *state = match event {
4415 Event::StreamRead { .. } => TransmitLocalState::Read { done },
4416 Event::StreamWrite { .. } => TransmitLocalState::Write { done },
4417 _ => unreachable!(),
4418 };
4419 }
4420 _ => {}
4421 }
4422 }
4423
4424 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4426 match self {
4427 Self::Host(task) => {
4428 log::trace!("delete host task {task:?}");
4429 state.delete(*task)?;
4430 }
4431 Self::Guest(task) => {
4432 log::trace!("delete guest task {task:?}");
4433 state.delete(*task)?.dispose(state, *task)?;
4434 }
4435 Self::Transmit(task) => {
4436 state.delete(*task)?;
4437 }
4438 }
4439
4440 Ok(())
4441 }
4442}
4443
4444impl fmt::Debug for Waitable {
4445 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4446 match self {
4447 Self::Host(id) => write!(f, "{id:?}"),
4448 Self::Guest(id) => write!(f, "{id:?}"),
4449 Self::Transmit(id) => write!(f, "{id:?}"),
4450 }
4451 }
4452}
4453
4454#[derive(Default)]
4456struct WaitableSet {
4457 ready: BTreeSet<Waitable>,
4459 waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4461}
4462
4463impl TableDebug for WaitableSet {
4464 fn type_name() -> &'static str {
4465 "WaitableSet"
4466 }
4467}
4468
4469type RawLower =
4471 Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4472
4473type RawLift = Box<
4475 dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4476>;
4477
4478type LiftedResult = Box<dyn Any + Send + Sync>;
4482
4483struct DummyResult;
4486
4487#[derive(Default)]
4489struct InstanceState {
4490 backpressure: u16,
4492 do_not_enter: bool,
4494 pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4497}
4498
4499pub struct ConcurrentState {
4501 guest_thread: Option<QualifiedThreadId>,
4503
4504 futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4509 table: AlwaysMut<ResourceTable>,
4511 instance_states: HashMap<RuntimeComponentInstanceIndex, InstanceState>,
4517 high_priority: Vec<WorkItem>,
4519 low_priority: Vec<WorkItem>,
4521 suspend_reason: Option<SuspendReason>,
4525 worker: Option<StoreFiber<'static>>,
4529 worker_item: Option<WorkerItem>,
4531
4532 global_error_context_ref_counts:
4545 BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4546}
4547
4548impl Default for ConcurrentState {
4549 fn default() -> Self {
4550 Self {
4551 guest_thread: None,
4552 table: AlwaysMut::new(ResourceTable::new()),
4553 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4554 instance_states: HashMap::new(),
4555 high_priority: Vec::new(),
4556 low_priority: Vec::new(),
4557 suspend_reason: None,
4558 worker: None,
4559 worker_item: None,
4560 global_error_context_ref_counts: BTreeMap::new(),
4561 }
4562 }
4563}
4564
4565impl ConcurrentState {
4566 pub(crate) fn take_fibers_and_futures(
4583 &mut self,
4584 fibers: &mut Vec<StoreFiber<'static>>,
4585 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4586 ) {
4587 for entry in self.table.get_mut().iter_mut() {
4588 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4589 for mode in mem::take(&mut set.waiting).into_values() {
4590 if let WaitMode::Fiber(fiber) = mode {
4591 fibers.push(fiber);
4592 }
4593 }
4594 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4595 if let GuestThreadState::Suspended(fiber) =
4596 mem::replace(&mut thread.state, GuestThreadState::Completed)
4597 {
4598 fibers.push(fiber);
4599 }
4600 }
4601 }
4602
4603 if let Some(fiber) = self.worker.take() {
4604 fibers.push(fiber);
4605 }
4606
4607 let mut take_items = |list| {
4608 for item in mem::take(list) {
4609 match item {
4610 WorkItem::ResumeFiber(fiber) => {
4611 fibers.push(fiber);
4612 }
4613 WorkItem::PushFuture(future) => {
4614 self.futures
4615 .get_mut()
4616 .as_mut()
4617 .unwrap()
4618 .push(future.into_inner());
4619 }
4620 _ => {}
4621 }
4622 }
4623 };
4624
4625 take_items(&mut self.high_priority);
4626 take_items(&mut self.low_priority);
4627
4628 if let Some(them) = self.futures.get_mut().take() {
4629 futures.push(them);
4630 }
4631 }
4632
4633 fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState {
4634 self.instance_states.entry(instance).or_default()
4635 }
4636
4637 fn push<V: Send + Sync + 'static>(
4638 &mut self,
4639 value: V,
4640 ) -> Result<TableId<V>, ResourceTableError> {
4641 self.table.get_mut().push(value).map(TableId::from)
4642 }
4643
4644 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4645 self.table.get_mut().get_mut(&Resource::from(id))
4646 }
4647
4648 pub fn add_child<T: 'static, U: 'static>(
4649 &mut self,
4650 child: TableId<T>,
4651 parent: TableId<U>,
4652 ) -> Result<(), ResourceTableError> {
4653 self.table
4654 .get_mut()
4655 .add_child(Resource::from(child), Resource::from(parent))
4656 }
4657
4658 pub fn remove_child<T: 'static, U: 'static>(
4659 &mut self,
4660 child: TableId<T>,
4661 parent: TableId<U>,
4662 ) -> Result<(), ResourceTableError> {
4663 self.table
4664 .get_mut()
4665 .remove_child(Resource::from(child), Resource::from(parent))
4666 }
4667
4668 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4669 self.table.get_mut().delete(Resource::from(id))
4670 }
4671
4672 fn push_future(&mut self, future: HostTaskFuture) {
4673 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4680 }
4681
4682 fn push_high_priority(&mut self, item: WorkItem) {
4683 log::trace!("push high priority: {item:?}");
4684 self.high_priority.push(item);
4685 }
4686
4687 fn push_low_priority(&mut self, item: WorkItem) {
4688 log::trace!("push low priority: {item:?}");
4689 self.low_priority.push(item);
4690 }
4691
4692 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4693 if high_priority {
4694 self.push_high_priority(item);
4695 } else {
4696 self.push_low_priority(item);
4697 }
4698 }
4699
4700 fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4710 let guest_instance = self.get_mut(guest_task).unwrap().instance;
4711
4712 loop {
4720 let next_thread = match &self.get_mut(guest_task).unwrap().caller {
4721 Caller::Host { .. } => break true,
4722 Caller::Guest { thread, instance } => {
4723 if *instance == guest_instance {
4724 break false;
4725 } else {
4726 *thread
4727 }
4728 }
4729 };
4730 guest_task = next_thread.task;
4731 }
4732 }
4733
4734 fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) {
4738 self.instance_state(instance).do_not_enter = true;
4739 }
4740
4741 fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4745 self.instance_state(instance).do_not_enter = false;
4746 self.partition_pending(instance)
4747 }
4748
4749 fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4754 for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
4755 let call = GuestCall { thread, kind };
4756 if call.is_ready(self)? {
4757 self.push_high_priority(WorkItem::GuestCall(call));
4758 } else {
4759 self.instance_state(instance)
4760 .pending
4761 .insert(call.thread, call.kind);
4762 }
4763 }
4764
4765 Ok(())
4766 }
4767
4768 pub(crate) fn backpressure_modify(
4770 &mut self,
4771 caller_instance: RuntimeComponentInstanceIndex,
4772 modify: impl FnOnce(u16) -> Option<u16>,
4773 ) -> Result<()> {
4774 let state = self.instance_state(caller_instance);
4775 let old = state.backpressure;
4776 let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?;
4777 state.backpressure = new;
4778
4779 if old > 0 && new == 0 {
4780 self.partition_pending(caller_instance)?;
4783 }
4784
4785 Ok(())
4786 }
4787
4788 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4790 let thread = self.guest_thread.unwrap();
4791 let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4792 log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4793 Ok(val)
4794 }
4795
4796 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4798 let thread = self.guest_thread.unwrap();
4799 log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4800 self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4801 Ok(())
4802 }
4803
4804 fn take_pending_cancellation(&mut self) -> bool {
4807 let thread = self.guest_thread.unwrap();
4808 if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4809 assert!(matches!(event, Event::Cancelled));
4810 true
4811 } else {
4812 false
4813 }
4814 }
4815}
4816
4817fn for_any_lower<
4820 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4821>(
4822 fun: F,
4823) -> F {
4824 fun
4825}
4826
4827fn for_any_lift<
4829 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4830>(
4831 fun: F,
4832) -> F {
4833 fun
4834}
4835
4836fn checked<F: Future + Send + 'static>(
4841 id: StoreId,
4842 fut: F,
4843) -> impl Future<Output = F::Output> + Send + 'static {
4844 async move {
4845 let mut fut = pin!(fut);
4846 future::poll_fn(move |cx| {
4847 let message = "\
4848 `Future`s which depend on asynchronous component tasks, streams, or \
4849 futures to complete may only be polled from the event loop of the \
4850 store to which they belong. Please use \
4851 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
4852 ";
4853 tls::try_get(|store| {
4854 let matched = match store {
4855 tls::TryGet::Some(store) => store.id() == id,
4856 tls::TryGet::Taken | tls::TryGet::None => false,
4857 };
4858
4859 if !matched {
4860 panic!("{message}")
4861 }
4862 });
4863 fut.as_mut().poll(cx)
4864 })
4865 .await
4866 }
4867}
4868
4869fn check_recursive_run() {
4872 tls::try_get(|store| {
4873 if !matches!(store, tls::TryGet::None) {
4874 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
4875 }
4876 });
4877}
4878
4879fn unpack_callback_code(code: u32) -> (u32, u32) {
4880 (code & 0xF, code >> 4)
4881}
4882
4883struct WaitableCheckParams {
4887 set: TableId<WaitableSet>,
4888 options: OptionsIndex,
4889 payload: u32,
4890}
4891
4892enum WaitableCheck {
4894 Wait(WaitableCheckParams),
4895 Poll(WaitableCheckParams),
4896}
4897
4898pub(crate) struct PreparedCall<R> {
4900 handle: Func,
4902 thread: QualifiedThreadId,
4904 param_count: usize,
4906 rx: oneshot::Receiver<LiftedResult>,
4909 exit_rx: oneshot::Receiver<()>,
4912 _phantom: PhantomData<R>,
4913}
4914
4915impl<R> PreparedCall<R> {
4916 pub(crate) fn task_id(&self) -> TaskId {
4918 TaskId {
4919 task: self.thread.task,
4920 }
4921 }
4922}
4923
4924pub(crate) struct TaskId {
4926 task: TableId<GuestTask>,
4927}
4928
4929impl TaskId {
4930 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
4936 let task = store.0.concurrent_state_mut().get_mut(self.task)?;
4937 if !task.already_lowered_parameters() {
4938 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
4939 } else {
4940 task.host_future_state = HostFutureState::Dropped;
4941 if task.ready_to_delete() {
4942 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
4943 }
4944 }
4945 Ok(())
4946 }
4947}
4948
4949pub(crate) fn prepare_call<T, R>(
4955 mut store: StoreContextMut<T>,
4956 handle: Func,
4957 param_count: usize,
4958 host_future_present: bool,
4959 call_post_return_automatically: bool,
4960 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
4961 + Send
4962 + Sync
4963 + 'static,
4964 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4965 + Send
4966 + Sync
4967 + 'static,
4968) -> Result<PreparedCall<R>> {
4969 let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
4970
4971 let instance = handle.instance().id().get(store.0);
4972 let task_return_type = instance.component().types()[ty].results;
4973 let component_instance = raw_options.instance;
4974 let callback = options.callback();
4975 let memory = options.memory_raw().map(SendSyncPtr::new);
4976 let string_encoding = options.string_encoding();
4977 let token = StoreToken::new(store.as_context_mut());
4978 let state = store.0.concurrent_state_mut();
4979
4980 assert!(state.guest_thread.is_none());
4981
4982 let (tx, rx) = oneshot::channel();
4983 let (exit_tx, exit_rx) = oneshot::channel();
4984
4985 let mut task = GuestTask::new(
4986 state,
4987 Box::new(for_any_lower(move |store, params| {
4988 lower_params(handle, token.as_context_mut(store), params)
4989 })),
4990 LiftResult {
4991 lift: Box::new(for_any_lift(move |store, result| {
4992 lift_result(handle, store, result)
4993 })),
4994 ty: task_return_type,
4995 memory,
4996 string_encoding,
4997 },
4998 Caller::Host {
4999 tx: Some(tx),
5000 exit_tx: Arc::new(exit_tx),
5001 host_future_present,
5002 call_post_return_automatically,
5003 },
5004 callback.map(|callback| {
5005 let callback = SendSyncPtr::new(callback);
5006 let instance = handle.instance();
5007 Box::new(
5008 move |store: &mut dyn VMStore, runtime_instance, event, handle| {
5009 let store = token.as_context_mut(store);
5010 unsafe {
5013 instance.call_callback(
5014 store,
5015 runtime_instance,
5016 callback,
5017 event,
5018 handle,
5019 call_post_return_automatically,
5020 )
5021 }
5022 },
5023 ) as CallbackFn
5024 }),
5025 component_instance,
5026 )?;
5027 task.function_index = Some(handle.index());
5028
5029 let task = state.push(task)?;
5030 let thread = state.push(GuestThread::new_implicit(task))?;
5031 state.get_mut(task)?.threads.insert(thread);
5032
5033 Ok(PreparedCall {
5034 handle,
5035 thread: QualifiedThreadId { task, thread },
5036 param_count,
5037 rx,
5038 exit_rx,
5039 _phantom: PhantomData,
5040 })
5041}
5042
5043pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5050 mut store: StoreContextMut<T>,
5051 prepared: PreparedCall<R>,
5052) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5053 let PreparedCall {
5054 handle,
5055 thread,
5056 param_count,
5057 rx,
5058 exit_rx,
5059 ..
5060 } = prepared;
5061
5062 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5063
5064 Ok(checked(
5065 store.0.id(),
5066 rx.map(move |result| {
5067 result
5068 .map(|v| (*v.downcast().unwrap(), exit_rx))
5069 .map_err(anyhow::Error::from)
5070 }),
5071 ))
5072}
5073
5074fn queue_call0<T: 'static>(
5077 store: StoreContextMut<T>,
5078 handle: Func,
5079 guest_thread: QualifiedThreadId,
5080 param_count: usize,
5081) -> Result<()> {
5082 let (options, flags, _ty, raw_options) = handle.abi_info(store.0);
5083 let is_concurrent = raw_options.async_;
5084 let instance = handle.instance();
5085 let callee = handle.lifted_core_func(store.0);
5086 let callback = options.callback();
5087 let post_return = handle.post_return_core_func(store.0);
5088
5089 log::trace!("queueing call {guest_thread:?}");
5090
5091 let instance_flags = if callback.is_none() {
5092 None
5093 } else {
5094 Some(flags)
5095 };
5096
5097 unsafe {
5101 instance.queue_call(
5102 store,
5103 guest_thread,
5104 SendSyncPtr::new(callee),
5105 param_count,
5106 1,
5107 instance_flags,
5108 is_concurrent,
5109 callback.map(SendSyncPtr::new),
5110 post_return.map(SendSyncPtr::new),
5111 )
5112 }
5113}