1use crate::component::func::{self, Func, Options};
54use crate::component::{HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError};
55use crate::fiber::{self, StoreFiber, StoreFiberYield};
56use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
57use crate::vm::component::{
58 CallContext, ComponentInstance, InstanceFlags, ResourceTables, TransmitLocalState,
59};
60use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
61use crate::{AsContext, AsContextMut, StoreContext, StoreContextMut, ValRaw};
62use anyhow::{Context as _, Result, anyhow, bail};
63use error_contexts::GlobalErrorContextRefCount;
64use futures::channel::oneshot;
65use futures::future::{self, Either, FutureExt};
66use futures::stream::{FuturesUnordered, StreamExt};
67use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
68use std::any::Any;
69use std::borrow::ToOwned;
70use std::boxed::Box;
71use std::cell::UnsafeCell;
72use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
73use std::fmt;
74use std::future::Future;
75use std::marker::PhantomData;
76use std::mem::{self, ManuallyDrop, MaybeUninit};
77use std::ops::DerefMut;
78use std::pin::{Pin, pin};
79use std::ptr::{self, NonNull};
80use std::slice;
81use std::sync::Arc;
82use std::task::{Context, Poll, Waker};
83use std::vec::Vec;
84use table::{TableDebug, TableId};
85use wasmtime_environ::component::{
86 CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS,
87 OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
88 RuntimeComponentInstanceIndex, StringEncoding, TypeComponentGlobalErrorContextTableIndex,
89 TypeComponentLocalErrorContextTableIndex, TypeFutureTableIndex, TypeStreamTableIndex,
90 TypeTupleIndex,
91};
92
93pub use abort::JoinHandle;
94pub use futures_and_streams::{
95 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
96 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
97 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
98};
99pub(crate) use futures_and_streams::{
100 ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index,
101};
102
103mod abort;
104mod error_contexts;
105mod futures_and_streams;
106mod table;
107pub(crate) mod tls;
108
/// Sentinel word with every bit set (`0xffff_ffff`).
///
/// NOTE(review): presumably the code handed back to a guest when an operation
/// cannot complete immediately -- its uses are outside this excerpt, confirm
/// before relying on that reading.
const BLOCKED: u32 = 0xffff_ffff;
112
/// Lifecycle state of a subtask.
///
/// The explicit discriminants are the raw values that [`Status::pack`] stores
/// in the low four bits of the packed word.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Combines this status with an optional waitable handle into one `u32`.
    ///
    /// The status occupies bits `0..4` and the handle bits `4..32`; a missing
    /// handle is encoded as zero.
    ///
    /// # Panics
    ///
    /// Panics unless `waitable` is `None` exactly when `self` is
    /// [`Status::Returned`], and panics if the handle does not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        let has_returned = matches!(self, Status::Returned);
        assert!(has_returned == waitable.is_none());

        let handle = match waitable {
            Some(handle) => handle,
            None => 0,
        };
        // The handle must leave the low four bits free for the status.
        assert!(handle < (1 << 28));

        let discriminant = self as u32;
        (handle << 4) | discriminant
    }
}
136
/// An event that can be delivered to a guest task.
///
/// `Event::parts` lowers each variant to an `(ordinal, payload)` pair. The
/// `pending` fields are not part of that encoding.
#[derive(Clone, Copy, Debug)]
enum Event {
    /// No event; lowered with a zero payload.
    None,
    /// The task was cancelled; lowered with a zero payload.
    Cancelled,
    /// A subtask changed state.
    Subtask {
        /// New status of the subtask; lowered as its numeric discriminant.
        status: Status,
    },
    /// A stream read produced a result.
    StreamRead {
        /// Result code; lowered via `ReturnCode::encode`.
        code: ReturnCode,
        // NOTE(review): presumably identifies the stream (type index, handle)
        // whose operation was pending -- confirm against `on_delivery`.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A stream write produced a result.
    StreamWrite {
        /// Result code; lowered via `ReturnCode::encode`.
        code: ReturnCode,
        // NOTE(review): see `StreamRead::pending`.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A future read produced a result.
    FutureRead {
        /// Result code; lowered via `ReturnCode::encode`.
        code: ReturnCode,
        // NOTE(review): presumably identifies the future (type index, handle)
        // whose operation was pending -- confirm against `on_delivery`.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// A future write produced a result.
    FutureWrite {
        /// Result code; lowered via `ReturnCode::encode`.
        code: ReturnCode,
        // NOTE(review): see `FutureRead::pending`.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
}
163
164impl Event {
165 fn parts(self) -> (u32, u32) {
170 const EVENT_NONE: u32 = 0;
171 const EVENT_SUBTASK: u32 = 1;
172 const EVENT_STREAM_READ: u32 = 2;
173 const EVENT_STREAM_WRITE: u32 = 3;
174 const EVENT_FUTURE_READ: u32 = 4;
175 const EVENT_FUTURE_WRITE: u32 = 5;
176 const EVENT_CANCELLED: u32 = 6;
177 match self {
178 Event::None => (EVENT_NONE, 0),
179 Event::Cancelled => (EVENT_CANCELLED, 0),
180 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
181 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
182 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
183 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
184 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
185 }
186 }
187}
188
/// Codes a guest callback may return, as interpreted by
/// `Instance::handle_callback_code` after the raw value has been unpacked.
mod callback_code {
    /// The task is finished with its callback.
    pub const EXIT: u32 = 0;
    /// The task yields; it is re-queued at low priority with an `Event::None`.
    pub const YIELD: u32 = 1;
    /// The task waits on a waitable set until an event is available.
    pub const WAIT: u32 = 2;
    /// The task polls a waitable set without waiting indefinitely.
    pub const POLL: u32 = 3;
}
196
/// Local `u32` copy of `wasmtime_environ::component::START_FLAG_ASYNC_CALLEE`,
/// converted once here so uses do not need to repeat the cast.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
201
/// Scoped, exclusive access to a store together with a projection from the
/// store's data `T` to `D::Data`.
///
/// Values of this type are handed to the closure passed to `Accessor::with`.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    /// The store this access is bound to.
    store: StoreContextMut<'a, T>,
    /// Projects the store's `T` to the `D::Data` view returned by `get`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
211
212impl<'a, T, D> Access<'a, T, D>
213where
214 D: HasData + ?Sized,
215 T: 'static,
216{
217 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
219 Self { store, get_data }
220 }
221
222 pub fn data_mut(&mut self) -> &mut T {
224 self.store.data_mut()
225 }
226
227 pub fn get(&mut self) -> D::Data<'_> {
229 (self.get_data)(self.data_mut())
230 }
231
232 pub fn spawn(&mut self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
236 where
237 T: 'static,
238 {
239 let accessor = Accessor {
240 get_data: self.get_data,
241 token: StoreToken::new(self.store.as_context_mut()),
242 };
243 self.store
244 .as_context_mut()
245 .spawn_with_accessor(accessor, task)
246 }
247}
248
/// Lets an `Access` be used wherever a shared store context is expected.
impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    type Data = T;

    /// Borrows the underlying store immutably.
    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}
260
/// Lets an `Access` be used wherever a mutable store context is expected.
impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    /// Reborrows the underlying store mutably.
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}
270
/// Handle through which a task reaches the store via `Accessor::with`.
///
/// It holds no store reference itself: `with` obtains the store from
/// thread-local state (`tls::get`) and pairs it with `token`.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    /// Token used to turn the thread-local store into a `StoreContextMut<T>`.
    token: StoreToken<T>,
    /// Projects the store's `T` to the `D::Data` view exposed through `Access`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
337
/// Types that can be viewed as an [`Accessor`].
pub trait AsAccessor {
    /// The store data type `T` of the underlying accessor.
    type Data: 'static;

    /// The `HasData` projection type `D` of the underlying accessor.
    type AccessorData: HasData + ?Sized;

    /// Returns the underlying accessor.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
365
/// References to an `AsAccessor` are themselves `AsAccessor`, forwarding to
/// the referent.
impl<T: AsAccessor + ?Sized> AsAccessor for &T {
    type Data = T::Data;
    type AccessorData = T::AccessorData;

    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
        T::as_accessor(self)
    }
}
374
/// An `Accessor` is trivially its own accessor.
impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;

    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}
383
// Compile-time check that `Accessor<T>` is `Send + Sync` even when `T` is not
// `Sync`: `UnsafeCell<u32>` is `!Sync`, so this only compiles if the
// accessor's auto traits do not depend on `T`'s.
const _: () = {
    const fn assert<T: Send + Sync>() {}
    assert::<Accessor<UnsafeCell<u32>>>();
};
410
impl<T> Accessor<T> {
    /// Creates an accessor whose projection is the identity, i.e. `get`
    /// yields the store's `T` itself.
    pub(crate) fn new(token: StoreToken<T>) -> Self {
        Self {
            token,
            // Identity projection: the data view is the store data.
            get_data: |x| x,
        }
    }
}
427
428impl<T, D> Accessor<T, D>
429where
430 D: HasData + ?Sized,
431{
432 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
450 tls::get(|vmstore| {
451 fun(Access {
452 store: self.token.as_context_mut(vmstore),
453 get_data: self.get_data,
454 })
455 })
456 }
457
458 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
461 self.get_data
462 }
463
464 pub fn with_getter<D2: HasData>(
481 &self,
482 get_data: fn(&mut T) -> D2::Data<'_>,
483 ) -> Accessor<T, D2> {
484 Accessor {
485 token: self.token,
486 get_data,
487 }
488 }
489
490 pub fn spawn(&self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
506 where
507 T: 'static,
508 {
509 let accessor = self.clone_for_spawn();
510 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
511 }
512
513 fn clone_for_spawn(&self) -> Self {
514 Self {
515 token: self.token,
516 get_data: self.get_data,
517 }
518 }
519}
520
/// A unit of work that can be spawned on a store and driven with an
/// [`Accessor`].
///
/// Implementors must be `Send + 'static`, and the future returned by `run`
/// must be `Send`.
pub trait AccessorTask<T, D, R>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Consumes the task and runs it, using `accessor` to reach the store.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = R> + Send;
}
539
/// Parameters and result shape recorded for a call, distinguished by whether
/// the caller is async or sync.
///
/// NOTE(review): no use of this type is visible in this excerpt; the field
/// descriptions below are inferred from names and types only -- confirm.
enum CallerInfo {
    /// Call made by an async caller.
    Async {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Presumably whether the call produces a result -- TODO confirm.
        has_result: bool,
    },
    /// Call made by a sync caller.
    Sync {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Presumably the number of flat results expected -- TODO confirm.
        result_count: u32,
    },
}
554
/// How a task registered in a waitable set's `waiting` map is waiting.
enum WaitMode {
    /// A suspended fiber; stored by `resume_fiber` when the fiber suspended
    /// with `SuspendReason::Waiting`.
    Fiber(StoreFiber<'static>),
    /// A callback-based task; stored by `handle_callback_code` when the
    /// callback returned `WAIT` and no event was immediately available.
    Callback(Instance),
}
563
/// Why a fiber suspended itself; recorded by `StoreOpaque::suspend` and
/// consumed by `StoreOpaque::resume_fiber` to decide where the fiber goes.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber waits for an event on `set` on behalf of `task`; it is parked
    /// in that set's `waiting` map.
    Waiting {
        set: TableId<WaitableSet>,
        task: TableId<GuestTask>,
    },
    /// A worker fiber finished its item and wants more work; it is kept as the
    /// store's reusable worker, or disposed if one is already cached.
    NeedWork,
    /// The fiber yields; it is re-queued as a low-priority `ResumeFiber` item.
    Yielding { task: TableId<GuestTask> },
}
580
/// The kind of call to make into a guest, as executed by `handle_guest_call`.
enum GuestCallKind {
    /// Fetch the next event for the task and deliver it through the task's
    /// callback.
    DeliverEvent {
        /// Instance used to fetch the event and interpret the callback code.
        instance: Instance,
        /// Waitable set to pull a ready waitable from, if any.
        set: Option<TableId<WaitableSet>>,
    },
    /// Run an arbitrary closure that starts the call.
    Start(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
598
599impl fmt::Debug for GuestCallKind {
600 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
601 match self {
602 Self::DeliverEvent { instance, set } => f
603 .debug_struct("DeliverEvent")
604 .field("instance", instance)
605 .field("set", set)
606 .finish(),
607 Self::Start(_) => f.debug_tuple("Start").finish(),
608 }
609 }
610}
611
/// A pending call into a guest on behalf of a specific task.
#[derive(Debug)]
struct GuestCall {
    /// The guest task the call belongs to.
    task: TableId<GuestTask>,
    /// What the call does when executed.
    kind: GuestCallKind,
}
618
619impl GuestCall {
620 fn is_ready(&self, state: &mut ConcurrentState) -> Result<bool> {
630 let task_instance = state.get_mut(self.task)?.instance;
631 let state = state.instance_state(task_instance);
632 let ready = match &self.kind {
633 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
634 GuestCallKind::Start(_) => !(state.do_not_enter || state.backpressure > 0),
635 };
636 log::trace!(
637 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
638 state.do_not_enter,
639 state.backpressure
640 );
641 Ok(ready)
642 }
643}
644
/// An item handed to the worker fiber created by `run_on_worker`.
enum WorkerItem {
    /// Execute a guest call via `handle_guest_call`.
    GuestCall(GuestCall),
    /// Execute an arbitrary closure against the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
650
/// Parameters of a deferred poll of a waitable set (see `WorkItem::Poll`).
#[derive(Debug)]
struct PollParams {
    /// Instance used when the resulting event is delivered.
    instance: Instance,
    /// The task that is polling.
    task: TableId<GuestTask>,
    /// The waitable set being polled.
    set: TableId<WaitableSet>,
}
662
/// A unit of work on the store's high- or low-priority queue, processed by
/// `handle_work_item`.
enum WorkItem {
    /// Add a future to the store's set of pending futures.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// Resume a previously suspended fiber.
    ResumeFiber(StoreFiber<'static>),
    /// Run a guest call on the worker fiber if it is ready; otherwise park it
    /// in the owning instance's `pending` map.
    GuestCall(GuestCall),
    /// Check a waitable set and queue delivery of either a real event or
    /// `Event::None`.
    Poll(PollParams),
    /// Run an arbitrary closure on the worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
677
678impl fmt::Debug for WorkItem {
679 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
680 match self {
681 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
682 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
683 Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
684 Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
685 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
686 }
687 }
688}
689
/// Polls `future` once and, if it is not yet ready, suspends the current
/// fiber until it completes, then returns its output.
///
/// * `store` - the store whose concurrent state tracks tasks and futures.
/// * `future` - the host future to drive to completion.
/// * `caller_instance` - instance recorded on the host task created for it.
///
/// When no guest task is current, the future is polled exactly once and must
/// be ready.
///
/// # Panics
///
/// Panics if there is no current guest task and `future` is still pending
/// after one poll, or if the stored result is missing or has an unexpected
/// type when it is read back.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
    caller_instance: RuntimeComponentInstanceIndex,
) -> Result<R> {
    let state = store.concurrent_state_mut();

    // Without a current guest task there is nothing to suspend, so the future
    // is required to complete on its first poll.
    let Some(caller) = state.guest_task else {
        return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
            Poll::Ready(result) => result,
            Poll::Pending => {
                unreachable!()
            }
        };
    };

    // Stash whatever result the caller currently holds; it is put back when
    // this call's own result is extracted at the end.
    let old_result = state
        .get_mut(caller)
        .with_context(|| format!("bad handle: {caller:?}"))?
        .result
        .take();

    let task = state.push(HostTask::new(caller_instance, None))?;

    log::trace!("new host task child of {caller:?}: {task:?}");

    // Wrap the future so that, on completion, it stores its output in the
    // caller's `result` slot and marks the host task as returned.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            state.get_mut(caller)?.result = Some(Box::new(result) as _);

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // Poll with the store registered in thread-local state so the wrapper's
    // `tls::get` call above can reach it.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        Poll::Ready(result) => {
            result?;
            log::trace!("delete host task {task:?} (already ready)");
            store.concurrent_state_mut().delete(task)?;
        }
        Poll::Pending => {
            // Hand the future to the store to be driven later, join the host
            // task to the caller's sync-call set, and suspend until an event
            // arrives on that set.
            let state = store.concurrent_state_mut();
            state.push_future(future);

            let set = state.get_mut(caller)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting { set, task: caller })?;
        }
    }

    // Take this call's result out of the caller's slot, restoring the value
    // saved at the start, and downcast it back to `R`.
    Ok(*mem::replace(
        &mut store.concurrent_state_mut().get_mut(caller)?.result,
        old_result,
    )
    .unwrap()
    .downcast()
    .unwrap())
}
791
/// Executes a single `GuestCall` against `store`.
///
/// For `DeliverEvent`, fetches the next event for the task, invokes the
/// task's callback with it, and interprets the returned callback code. For
/// `Start`, simply runs the stored closure.
///
/// # Panics
///
/// Panics if `DeliverEvent` finds no event to deliver or if the task has no
/// callback installed.
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
    match call.kind {
        GuestCallKind::DeliverEvent { instance, set } => {
            let (event, waitable) = instance.get_event(store, call.task, set, true)?.unwrap();
            let state = store.concurrent_state_mut();
            let task = state.get_mut(call.task)?;
            let runtime_instance = task.instance;
            // Events not tied to a waitable are reported with handle 0.
            let handle = waitable.map(|(_, v)| v).unwrap_or(0);

            log::trace!(
                "use callback to deliver event {event:?} to {:?} for {waitable:?}",
                call.task,
            );

            // Make this task the current one for the duration of the callback;
            // the previous value is restored at the end of this arm.
            let old_task = state.guest_task.replace(call.task);
            log::trace!(
                "GuestCallKind::DeliverEvent: replaced {old_task:?} with {:?} as current task",
                call.task
            );

            store.maybe_push_call_context(call.task)?;

            let state = store.concurrent_state_mut();
            state.enter_instance(runtime_instance);

            // Temporarily take the callback out of the task so it can be
            // called with `store` borrowed mutably; it is put back below.
            let callback = state.get_mut(call.task)?.callback.take().unwrap();

            let code = callback(store, runtime_instance, event, handle)?;

            let state = store.concurrent_state_mut();

            state.get_mut(call.task)?.callback = Some(callback);

            state.exit_instance(runtime_instance)?;

            store.maybe_pop_call_context(call.task)?;

            instance.handle_callback_code(store, call.task, runtime_instance, code, false)?;

            store.concurrent_state_mut().guest_task = old_task;
            log::trace!("GuestCallKind::DeliverEvent: restored {old_task:?} as current task");
        }
        GuestCallKind::Start(fun) => {
            fun(store)?;
        }
    }

    Ok(())
}
842
/// Convenience wrappers that forward to the `StoreContextMut` methods of the
/// same name.
impl<T> Store<T> {
    /// Runs `fun` with an `Accessor` for this store; see
    /// `StoreContextMut::run_concurrent`.
    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        self.as_context_mut().run_concurrent(fun).await
    }

    /// Asserts that no concurrent state is left in this store; see
    /// `StoreContextMut::assert_concurrent_state_empty`.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(&mut self) {
        self.as_context_mut().assert_concurrent_state_empty();
    }

    /// Spawns `task` on this store; see `StoreContextMut::spawn`.
    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
    where
        T: 'static,
    {
        self.as_context_mut().spawn(task)
    }
}
865
866impl<T> StoreContextMut<'_, T> {
    /// Asserts that the store holds no leftover concurrent state.
    ///
    /// # Panics
    ///
    /// Panics if the guest tables, the task table, either work queue, the
    /// current guest task, the set of pending futures, any instance's
    /// `pending` map, or the global error-context ref counts is non-empty.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(self) {
        let store = self.0;
        store
            .store_data_mut()
            .components
            .assert_guest_tables_empty();
        let state = store.concurrent_state_mut();
        assert!(
            state.table.get_mut().is_empty(),
            "non-empty table: {:?}",
            state.table.get_mut()
        );
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        assert!(state.guest_task.is_none());
        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
        assert!(
            state
                .instance_states
                .iter()
                .all(|(_, state)| state.pending.is_empty())
        );
        assert!(state.global_error_context_ref_counts.is_empty());
    }
900
    /// Spawns `task` on this store with a fresh identity-projection
    /// `Accessor` and returns its `JoinHandle`.
    pub fn spawn(mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
    where
        T: 'static,
    {
        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
        self.spawn_with_accessor(accessor, task)
    }
917
918 fn spawn_with_accessor<D>(
921 self,
922 accessor: Accessor<T, D>,
923 task: impl AccessorTask<T, D, Result<()>>,
924 ) -> JoinHandle
925 where
926 T: 'static,
927 D: HasData + ?Sized,
928 {
929 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
933 self.0
934 .concurrent_state_mut()
935 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
936 handle
937 }
938
    /// Runs `fun` with an `Accessor` for this store, driving the store's
    /// pending futures and work queues until the future returned by `fun`
    /// completes.
    ///
    /// If the store runs out of work before that future completes, this stays
    /// pending rather than failing (`trap_on_idle` is `false`).
    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, false).await
    }
988
    /// Like `run_concurrent`, but fails with `Trap::AsyncDeadlock` if the store
    /// runs out of work before the future returned by `fun` completes
    /// (`trap_on_idle` is `true`).
    pub(super) async fn run_concurrent_trap_on_idle<R>(
        self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, true).await
    }
998
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Builds an `Accessor` for this store, calls `fun` with it, and drives the
    /// resulting future with `poll_until`. `trap_on_idle` is forwarded to
    /// `poll_until`.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Owns the future returned by `fun` and guarantees it is dropped while
        // the store is registered in thread-local state, so destructors inside
        // the future can still reach the store through `tls`.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: `value` is dropped exactly once, here, and is
                    // never accessed again afterwards.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // NOTE(review): soundness relies on `dropper.value` never being moved
        // after this point; in this function it is only polled through this
        // pin and then dropped in place by `Dropper::drop`.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1041
    /// Drives the store's event loop until `future` completes.
    ///
    /// Each loop iteration polls, in order: `future` itself, the store's set
    /// of pending futures, then the high-priority and low-priority work
    /// queues. Any work items found are handled before the next iteration.
    ///
    /// Returns the output of `future`, or the first error produced by a
    /// pending future or a work item. When nothing is left to do and `future`
    /// is still pending, the result is `Trap::AsyncDeadlock` if `trap_on_idle`
    /// is set and `Poll::Pending` otherwise.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Holds the pending-futures set while it is taken out of the store and
        // puts it back on drop, including on early exit.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take the futures out of the store so they can be polled while
            // the store itself is handed to `tls::set`.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = pin!(reset.futures.as_mut().unwrap().next());

            let result = future::poll_fn(|cx| {
                // 1. The caller's future takes precedence.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(Either::Left(value)));
                }

                // 2. Poll the pending futures. `Ready(true)` means one
                //    completed successfully, `Ready(false)` means the set is
                //    empty; an error from any of them aborts the loop.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // 3. Drain the high-priority queue, falling back to the
                //    low-priority queue only if the former is empty.
                let state = reset.store.0.concurrent_state_mut();
                let ready = mem::take(&mut state.high_priority);
                let ready = if ready.is_empty() {
                    let ready = mem::take(&mut state.low_priority);
                    if ready.is_empty() {
                        return match next {
                            Poll::Ready(true) => {
                                // A pending future completed; return an empty
                                // batch so the outer loop runs again.
                                Poll::Ready(Ok(Either::Right(Vec::new())))
                            }
                            Poll::Ready(false) => {
                                // No futures and no work items remain: give the
                                // caller's future one more poll before
                                // declaring the store idle.
                                if let Poll::Ready(value) =
                                    tls::set(reset.store.0, || future.as_mut().poll(cx))
                                {
                                    Poll::Ready(Ok(Either::Left(value)))
                                } else {
                                    if trap_on_idle {
                                        Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
                                    } else {
                                        Poll::Pending
                                    }
                                }
                            }
                            // Futures are still pending and there is no other
                            // work; wait to be woken.
                            Poll::Pending => Poll::Pending,
                        };
                    } else {
                        ready
                    }
                } else {
                    ready
                };

                Poll::Ready(Ok(Either::Right(ready)))
            })
            .await;

            // Put the futures back into the store before handling work items,
            // which may push new futures.
            drop(reset);

            match result? {
                Either::Left(value) => break Ok(value),
                Either::Right(ready) => {
                    // If handling stops early (error or cancellation), clean
                    // up the unhandled items: fibers are disposed and futures
                    // are dropped with the store registered in `tls`.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1209
1210 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1212 where
1213 T: Send,
1214 {
1215 log::trace!("handle work item {item:?}");
1216 match item {
1217 WorkItem::PushFuture(future) => {
1218 self.0
1219 .concurrent_state_mut()
1220 .futures
1221 .get_mut()
1222 .as_mut()
1223 .unwrap()
1224 .push(future.into_inner());
1225 }
1226 WorkItem::ResumeFiber(fiber) => {
1227 self.0.resume_fiber(fiber).await?;
1228 }
1229 WorkItem::GuestCall(call) => {
1230 let state = self.0.concurrent_state_mut();
1231 if call.is_ready(state)? {
1232 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1233 } else {
1234 let task = state.get_mut(call.task)?;
1235 if !task.starting_sent {
1236 task.starting_sent = true;
1237 if let GuestCallKind::Start(_) = &call.kind {
1238 Waitable::Guest(call.task).set_event(
1239 state,
1240 Some(Event::Subtask {
1241 status: Status::Starting,
1242 }),
1243 )?;
1244 }
1245 }
1246
1247 let runtime_instance = state.get_mut(call.task)?.instance;
1248 state
1249 .instance_state(runtime_instance)
1250 .pending
1251 .insert(call.task, call.kind);
1252 }
1253 }
1254 WorkItem::Poll(params) => {
1255 let state = self.0.concurrent_state_mut();
1256 if state.get_mut(params.task)?.event.is_some()
1257 || !state.get_mut(params.set)?.ready.is_empty()
1258 {
1259 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1262 task: params.task,
1263 kind: GuestCallKind::DeliverEvent {
1264 instance: params.instance,
1265 set: Some(params.set),
1266 },
1267 }));
1268 } else {
1269 state.get_mut(params.task)?.event = Some(Event::None);
1272 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1273 task: params.task,
1274 kind: GuestCallKind::DeliverEvent {
1275 instance: params.instance,
1276 set: Some(params.set),
1277 },
1278 }));
1279 }
1280 }
1281 WorkItem::WorkerFunction(fun) => {
1282 self.run_on_worker(WorkerItem::Function(fun)).await?;
1283 }
1284 }
1285
1286 Ok(())
1287 }
1288
1289 async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1291 where
1292 T: Send,
1293 {
1294 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1295 fiber
1296 } else {
1297 fiber::make_fiber(self.0, move |store| {
1298 loop {
1299 match store.concurrent_state_mut().worker_item.take().unwrap() {
1300 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1301 WorkerItem::Function(fun) => fun.into_inner()(store)?,
1302 }
1303
1304 store.suspend(SuspendReason::NeedWork)?;
1305 }
1306 })?
1307 };
1308
1309 let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1310 assert!(worker_item.is_none());
1311 *worker_item = Some(item);
1312
1313 self.0.resume_fiber(worker).await
1314 }
1315
    /// Wraps `closure` in a `'static` future that, when polled, builds a fresh
    /// identity-projection `Accessor` for this store and awaits the future the
    /// closure returns for it.
    ///
    /// Only a `StoreToken` is captured, so the returned future does not borrow
    /// the store.
    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
    where
        T: 'static,
        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
            + Send
            + Sync
            + 'static,
        R: Send + Sync + 'static,
    {
        let token = StoreToken::new(self);
        async move {
            let mut accessor = Accessor::new(token);
            closure(&mut accessor).await
        }
    }
1335}
1336
1337impl StoreOpaque {
1338 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1341 let old_task = self.concurrent_state_mut().guest_task;
1342 log::trace!("resume_fiber: save current task {old_task:?}");
1343
1344 let fiber = fiber::resolve_or_release(self, fiber).await?;
1345
1346 let state = self.concurrent_state_mut();
1347
1348 state.guest_task = old_task;
1349 log::trace!("resume_fiber: restore current task {old_task:?}");
1350
1351 if let Some(mut fiber) = fiber {
1352 match state.suspend_reason.take().unwrap() {
1354 SuspendReason::NeedWork => {
1355 if state.worker.is_none() {
1356 state.worker = Some(fiber);
1357 } else {
1358 fiber.dispose(self);
1359 }
1360 }
1361 SuspendReason::Yielding { .. } => {
1362 state.push_low_priority(WorkItem::ResumeFiber(fiber));
1363 }
1364 SuspendReason::Waiting { set, task } => {
1365 let old = state
1366 .get_mut(set)?
1367 .waiting
1368 .insert(task, WaitMode::Fiber(fiber));
1369 assert!(old.is_none());
1370 }
1371 }
1372 }
1373
1374 Ok(())
1375 }
1376
    /// Suspends the current fiber, recording `reason` so that `resume_fiber`
    /// can decide what to do with it.
    ///
    /// For reasons tied to a task (`Yielding`, `Waiting`), the task's call
    /// context is popped before suspending and pushed back afterwards, and the
    /// current guest task is saved and restored around the suspension.
    ///
    /// # Panics
    ///
    /// Panics if a suspend reason is already recorded.
    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
        log::trace!("suspend fiber: {reason:?}");

        // Only task-bound reasons need call-context and current-task handling.
        let task = match &reason {
            SuspendReason::Yielding { task } | SuspendReason::Waiting { task, .. } => Some(*task),
            SuspendReason::NeedWork => None,
        };

        let old_guest_task = if let Some(task) = task {
            self.maybe_pop_call_context(task)?;
            self.concurrent_state_mut().guest_task
        } else {
            None
        };

        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
        assert!(suspend_reason.is_none());
        *suspend_reason = Some(reason);

        // Yield control; execution continues below once the fiber is resumed.
        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;

        if let Some(task) = task {
            self.concurrent_state_mut().guest_task = old_guest_task;
            self.maybe_push_call_context(task)?;
        }

        Ok(())
    }
1413
1414 fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1418 let task = self.concurrent_state_mut().get_mut(guest_task)?;
1419 if task.lift_result.is_some() {
1420 log::trace!("push call context for {guest_task:?}");
1421 let call_context = task.call_context.take().unwrap();
1422 self.component_resource_state().0.push(call_context);
1423 }
1424 Ok(())
1425 }
1426
1427 fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1431 if self
1432 .concurrent_state_mut()
1433 .get_mut(guest_task)?
1434 .lift_result
1435 .is_some()
1436 {
1437 log::trace!("pop call context for {guest_task:?}");
1438 let call_context = Some(self.component_resource_state().0.pop().unwrap());
1439 self.concurrent_state_mut()
1440 .get_mut(guest_task)?
1441 .call_context = call_context;
1442 }
1443 Ok(())
1444 }
1445
    /// Suspends the current guest task until an event arrives for `waitable`.
    ///
    /// The waitable is temporarily joined to the caller's `sync_call_set` and
    /// re-joined to whatever set it belonged to before (possibly none) once
    /// the task resumes.
    ///
    /// # Panics
    ///
    /// Panics if there is no current guest task.
    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
        let state = self.concurrent_state_mut();
        let caller = state.guest_task.unwrap();
        // Remember the waitable's current set so it can be restored below.
        let old_set = waitable.common(state)?.set;
        let set = state.get_mut(caller)?.sync_call_set;
        waitable.join(state, Some(set))?;
        self.suspend(SuspendReason::Waiting { set, task: caller })?;
        let state = self.concurrent_state_mut();
        waitable.join(state, old_set)
    }
1456}
1457
1458impl Instance {
    /// Retrieves the next event to deliver to `guest_task`, if any.
    ///
    /// An event stored directly on the task takes precedence and is returned
    /// without a waitable. A stored `Event::Cancelled` is only returned when
    /// `cancellable` is true; otherwise it is put back and the lookup falls
    /// through to `set`. If `set` is given and has a ready waitable, that
    /// waitable's event is returned together with the waitable and its handle.
    ///
    /// Returns `Ok(None)` when no event is available.
    ///
    /// # Panics
    ///
    /// Panics if a ready waitable has no handle or no event.
    fn get_event(
        self,
        store: &mut StoreOpaque,
        guest_task: TableId<GuestTask>,
        set: Option<TableId<WaitableSet>>,
        cancellable: bool,
    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
        let state = store.concurrent_state_mut();

        if let Some(event) = state.get_mut(guest_task)?.event.take() {
            log::trace!("deliver event {event:?} to {guest_task:?}");

            if cancellable || !matches!(event, Event::Cancelled) {
                return Ok(Some((event, None)));
            } else {
                // Not allowed to deliver a cancellation here: keep it on the
                // task for later delivery.
                state.get_mut(guest_task)?.event = Some(event);
            }
        }

        Ok(
            // Pop the first ready waitable from `set`, propagating lookup
            // errors and yielding `None` when there is no set or it is empty.
            if let Some((set, waitable)) = set
                .and_then(|set| {
                    state
                        .get_mut(set)
                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
                        .transpose()
                })
                .transpose()?
            {
                let common = waitable.common(state)?;
                let handle = common.handle.unwrap();
                let event = common.event.take().unwrap();

                log::trace!(
                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
                );

                waitable.on_delivery(self.id().get_mut(store), event);

                Some((event, Some((waitable, handle))))
            } else {
                None
            },
        )
    }
1506
    /// Interprets the code returned by a guest callback (or by the initial
    /// call of a callback-based task) and schedules the follow-up work.
    ///
    /// * `guest_task` - the task whose callback returned `code`.
    /// * `runtime_instance` - instance whose tables resolve waitable-set
    ///   handles.
    /// * `code` - packed callback code; split into a code and a set handle by
    ///   `unpack_callback_code`.
    /// * `initial_call` - whether this is the result of the task's first call;
    ///   if so, and the task has not yet produced its result,
    ///   `Status::Started` is reported.
    ///
    /// # Errors
    ///
    /// Returns `Trap::NoAsyncResult` if the task exits while it still has a
    /// `lift_result`, and an error for a zero waitable-set handle or an
    /// unknown callback code.
    fn handle_callback_code(
        self,
        store: &mut StoreOpaque,
        guest_task: TableId<GuestTask>,
        runtime_instance: RuntimeComponentInstanceIndex,
        code: u32,
        initial_call: bool,
    ) -> Result<()> {
        let (code, set) = unpack_callback_code(code);

        log::trace!("received callback code from {guest_task:?}: {code} (set: {set})");

        let state = store.concurrent_state_mut();
        let task = state.get_mut(guest_task)?;

        // A remaining `lift_result` is rejected on `EXIT` below.
        // NOTE(review): presumably it means the result has not been returned
        // yet -- confirm.
        if task.lift_result.is_some() {
            if code == callback_code::EXIT {
                return Err(anyhow!(crate::Trap::NoAsyncResult));
            }
            if initial_call {
                Waitable::Guest(guest_task).set_event(
                    state,
                    Some(Event::Subtask {
                        status: Status::Started,
                    }),
                )?;
            }
        }

        // Resolves a guest-provided waitable-set handle to its table id;
        // handle 0 is never valid.
        let get_set = |store, handle| {
            if handle == 0 {
                bail!("invalid waitable-set handle");
            }

            let set = self.id().get_mut(store).guest_tables().0[runtime_instance]
                .waitable_set_rep(handle)?;

            Ok(TableId::<WaitableSet>::new(set))
        };

        match code {
            callback_code::EXIT => {
                let task = state.get_mut(guest_task)?;
                match &task.caller {
                    Caller::Host {
                        remove_task_automatically,
                        ..
                    } => {
                        if *remove_task_automatically {
                            log::trace!("handle_callback_code will delete task {guest_task:?}");
                            Waitable::Guest(guest_task).delete_from(state)?;
                        }
                    }
                    Caller::Guest { .. } => {
                        // Guest-called tasks are only marked as exited here;
                        // the task entry itself is left in place.
                        task.exited = true;
                        task.callback = None;
                    }
                }
            }
            callback_code::YIELD => {
                // Re-queue the task at low priority with an `Event::None` to
                // deliver when it runs again.
                let task = state.get_mut(guest_task)?;
                assert!(task.event.is_none());
                task.event = Some(Event::None);
                state.push_low_priority(WorkItem::GuestCall(GuestCall {
                    task: guest_task,
                    kind: GuestCallKind::DeliverEvent {
                        instance: self,
                        set: None,
                    },
                }));
            }
            callback_code::WAIT | callback_code::POLL => {
                let set = get_set(store, set)?;
                let state = store.concurrent_state_mut();

                if state.get_mut(guest_task)?.event.is_some()
                    || !state.get_mut(set)?.ready.is_empty()
                {
                    // An event is already available: deliver it right away.
                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
                        task: guest_task,
                        kind: GuestCallKind::DeliverEvent {
                            instance: self,
                            set: Some(set),
                        },
                    }));
                } else {
                    match code {
                        callback_code::POLL => {
                            // Defer the poll to the low-priority queue; the
                            // set is re-checked when the item is handled.
                            state.push_low_priority(WorkItem::Poll(PollParams {
                                instance: self,
                                task: guest_task,
                                set,
                            }));
                        }
                        callback_code::WAIT => {
                            // Park the task on the set, and record the set on
                            // the task in `wake_on_cancel`.
                            let old = state.get_mut(guest_task)?.wake_on_cancel.replace(set);
                            assert!(old.is_none());
                            let old = state
                                .get_mut(set)?
                                .waiting
                                .insert(guest_task, WaitMode::Callback(self));
                            assert!(old.is_none());
                        }
                        _ => unreachable!(),
                    }
                }
            }
            _ => bail!("unsupported callback code: {code}"),
        }

        Ok(())
    }
1638
/// Builds the closure which will run `callee` on behalf of `guest_task` and
/// queues it as a high-priority `WorkItem::GuestCall` of kind
/// `GuestCallKind::Start`.
///
/// Nothing is called here; the closure runs whenever the work item is
/// processed. Two closure shapes are produced:
///
/// * `callback.is_some()` ("stackless"): the callee's first flat result is
///   interpreted as a callback code and handed to `handle_callback_code`.
/// * `callback.is_none()` ("stackful"): for a sync callee (`!async_`) the
///   result is lifted, `post_return` is optionally run, and the task is
///   completed; for an async callee the task must already have produced its
///   result by the time the call returns, otherwise the call traps with
///   `Trap::NoAsyncResult`.
///
/// `flags`, when present, are the callee instance flags whose `may_enter`
/// bit is cleared for the duration of the raw call.
///
/// # Safety
///
/// `callee` and `post_return` are passed to `Func::call_unchecked_raw`.
/// NOTE(review): presumably they must be valid function references for this
/// store whose signatures agree with `param_count` / `result_count` --
/// confirm against the callers.
unsafe fn queue_call<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    guest_task: TableId<GuestTask>,
    callee: SendSyncPtr<VMFuncRef>,
    param_count: usize,
    result_count: usize,
    flags: Option<InstanceFlags>,
    async_: bool,
    callback: Option<SendSyncPtr<VMFuncRef>>,
    post_return: Option<SendSyncPtr<VMFuncRef>>,
) -> Result<()> {
    /// Returns a closure which lowers the task's parameters into a flat
    /// buffer, performs the raw call to `callee`, and hands back the buffer
    /// (which then holds the flat results).
    ///
    /// # Safety
    ///
    /// NOTE(review): same assumption as the enclosing function -- `callee`
    /// must be valid to call with `param_count` parameters and
    /// `result_count` results.
    unsafe fn make_call<T: 'static>(
        store: StoreContextMut<T>,
        guest_task: TableId<GuestTask>,
        callee: SendSyncPtr<VMFuncRef>,
        param_count: usize,
        result_count: usize,
        flags: Option<InstanceFlags>,
    ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
    + Send
    + Sync
    + 'static
    + use<T> {
        let token = StoreToken::new(store);
        move |store: &mut dyn VMStore| {
            let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
            let task = store.concurrent_state_mut().get_mut(guest_task)?;
            let may_enter_after_call = task.call_post_return_automatically();
            // The lowering closure is single-use; it panics here if it was
            // already taken.
            let lower = task.lower_params.take().unwrap();

            lower(store, &mut storage[..param_count])?;

            let mut store = token.as_context_mut(store);

            unsafe {
                // Block re-entrance into the callee instance while the call
                // is running.
                if let Some(mut flags) = flags {
                    flags.set_may_enter(false);
                }
                // The same buffer carries parameters in and results out, so
                // it must cover whichever count is larger.
                crate::Func::call_unchecked_raw(
                    &mut store,
                    callee.as_non_null(),
                    NonNull::new(
                        &mut storage[..param_count.max(result_count)]
                            as *mut [MaybeUninit<ValRaw>] as _,
                    )
                    .unwrap(),
                )?;
                // Only reached on success; an error above leaves `may_enter`
                // cleared.
                if let Some(mut flags) = flags {
                    flags.set_may_enter(may_enter_after_call);
                }
            }

            Ok(storage)
        }
    }

    let call = unsafe {
        make_call(
            store.as_context_mut(),
            guest_task,
            callee,
            param_count,
            result_count,
            flags,
        )
    };

    let callee_instance = store.0.concurrent_state_mut().get_mut(guest_task)?.instance;
    let fun = if callback.is_some() {
        // A callback only makes sense for an async-lifted callee.
        assert!(async_);

        Box::new(move |store: &mut dyn VMStore| {
            // Make `guest_task` the current task for the duration of the call.
            let old_task = store.concurrent_state_mut().guest_task.replace(guest_task);
            log::trace!(
                "stackless call: replaced {old_task:?} with {guest_task:?} as current task"
            );

            store.maybe_push_call_context(guest_task)?;

            store.concurrent_state_mut().enter_instance(callee_instance);

            let storage = call(store)?;

            store
                .concurrent_state_mut()
                .exit_instance(callee_instance)?;

            store.maybe_pop_call_context(guest_task)?;

            let state = store.concurrent_state_mut();
            state.guest_task = old_task;
            log::trace!("stackless call: restored {old_task:?} as current task");

            // The first flat result of the callee is the callback code.
            let code = unsafe { storage[0].assume_init() }.get_i32() as u32;

            self.handle_callback_code(store, guest_task, callee_instance, code, true)?;

            Ok(())
        }) as Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>
    } else {
        let token = StoreToken::new(store.as_context_mut());
        Box::new(move |store: &mut dyn VMStore| {
            let old_task = store.concurrent_state_mut().guest_task.replace(guest_task);
            log::trace!(
                "stackful call: replaced {old_task:?} with {guest_task:?} as current task",
            );

            let mut flags = self.id().get(store).instance_flags(callee_instance);

            store.maybe_push_call_context(guest_task)?;

            // Only sync callees are bracketed by `enter_instance` /
            // `exit_instance` on this path.
            if !async_ {
                store.concurrent_state_mut().enter_instance(callee_instance);
            }

            let storage = call(store)?;

            if async_ {
                // `lift_result` is consumed by `task.return` / `task.cancel`;
                // if it is still present the callee exited without producing
                // a result.
                if store
                    .concurrent_state_mut()
                    .get_mut(guest_task)?
                    .lift_result
                    .is_some()
                {
                    return Err(anyhow!(crate::Trap::NoAsyncResult));
                }
            } else {
                let lift = {
                    let state = store.concurrent_state_mut();
                    state.exit_instance(callee_instance)?;

                    assert!(state.get_mut(guest_task)?.result.is_none());

                    state.get_mut(guest_task)?.lift_result.take().unwrap()
                };

                // NOTE(review): the transmute treats the first `result_count`
                // slots as initialized; presumably guaranteed by the callee
                // having returned that many results -- confirm.
                let result = (lift.lift)(store, unsafe {
                    mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
                        &storage[..result_count],
                    )
                })?;

                // `post_return` receives the single flat result, or zero
                // when there is none.
                let post_return_arg = match result_count {
                    0 => ValRaw::i32(0),
                    1 => unsafe { storage[0].assume_init() },
                    _ => unreachable!(),
                };

                if store
                    .concurrent_state_mut()
                    .get_mut(guest_task)?
                    .call_post_return_automatically()
                {
                    // The instance may not leave while `post_return` runs.
                    unsafe {
                        flags.set_may_leave(false);
                        flags.set_needs_post_return(false);
                    }

                    if let Some(func) = post_return {
                        let mut store = token.as_context_mut(store);

                        unsafe {
                            crate::Func::call_unchecked_raw(
                                &mut store,
                                func.as_non_null(),
                                slice::from_ref(&post_return_arg).into(),
                            )?;
                        }
                    }

                    unsafe {
                        flags.set_may_leave(true);
                        flags.set_may_enter(true);
                    }
                }

                self.task_complete(
                    store,
                    guest_task,
                    result,
                    Status::Returned,
                    post_return_arg,
                )?;
            }

            store.maybe_pop_call_context(guest_task)?;

            let task = store.concurrent_state_mut().get_mut(guest_task)?;

            match &task.caller {
                // Host-called tasks are deleted here only when requested.
                Caller::Host {
                    remove_task_automatically,
                    ..
                } => {
                    if *remove_task_automatically {
                        Waitable::Guest(guest_task)
                            .delete_from(store.concurrent_state_mut())?;
                    }
                }
                // Guest-called tasks are only marked as exited; deletion is
                // left to code which checks `exited` (e.g. `subtask_drop`).
                Caller::Guest { .. } => {
                    task.exited = true;
                }
            }

            Ok(())
        })
    };

    store
        .0
        .concurrent_state_mut()
        .push_high_priority(WorkItem::GuestCall(GuestCall {
            task: guest_task,
            kind: GuestCallKind::Start(fun),
        }));

    Ok(())
}
1918
1919 unsafe fn prepare_call<T: 'static>(
1932 self,
1933 mut store: StoreContextMut<T>,
1934 start: *mut VMFuncRef,
1935 return_: *mut VMFuncRef,
1936 caller_instance: RuntimeComponentInstanceIndex,
1937 callee_instance: RuntimeComponentInstanceIndex,
1938 task_return_type: TypeTupleIndex,
1939 memory: *mut VMMemoryDefinition,
1940 string_encoding: u8,
1941 caller_info: CallerInfo,
1942 ) -> Result<()> {
1943 self.id().get(store.0).check_may_leave(caller_instance)?;
1944
1945 enum ResultInfo {
1946 Heap { results: u32 },
1947 Stack { result_count: u32 },
1948 }
1949
1950 let result_info = match &caller_info {
1951 CallerInfo::Async {
1952 has_result: true,
1953 params,
1954 } => ResultInfo::Heap {
1955 results: params.last().unwrap().get_u32(),
1956 },
1957 CallerInfo::Async {
1958 has_result: false, ..
1959 } => ResultInfo::Stack { result_count: 0 },
1960 CallerInfo::Sync {
1961 result_count,
1962 params,
1963 } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
1964 results: params.last().unwrap().get_u32(),
1965 },
1966 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
1967 result_count: *result_count,
1968 },
1969 };
1970
1971 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
1972
1973 let start = SendSyncPtr::new(NonNull::new(start).unwrap());
1977 let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
1978 let token = StoreToken::new(store.as_context_mut());
1979 let state = store.0.concurrent_state_mut();
1980 let old_task = state.guest_task.take();
1981 let new_task = GuestTask::new(
1982 state,
1983 Box::new(move |store, dst| {
1984 let mut store = token.as_context_mut(store);
1985 assert!(dst.len() <= MAX_FLAT_PARAMS);
1986 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1987 let count = match caller_info {
1988 CallerInfo::Async { params, has_result } => {
1992 let params = ¶ms[..params.len() - usize::from(has_result)];
1993 for (param, src) in params.iter().zip(&mut src) {
1994 src.write(*param);
1995 }
1996 params.len()
1997 }
1998
1999 CallerInfo::Sync { params, .. } => {
2001 for (param, src) in params.iter().zip(&mut src) {
2002 src.write(*param);
2003 }
2004 params.len()
2005 }
2006 };
2007 unsafe {
2014 crate::Func::call_unchecked_raw(
2015 &mut store,
2016 start.as_non_null(),
2017 NonNull::new(
2018 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2019 )
2020 .unwrap(),
2021 )?;
2022 }
2023 dst.copy_from_slice(&src[..dst.len()]);
2024 let state = store.0.concurrent_state_mut();
2025 let task = state.guest_task.unwrap();
2026 Waitable::Guest(task).set_event(
2027 state,
2028 Some(Event::Subtask {
2029 status: Status::Started,
2030 }),
2031 )?;
2032 Ok(())
2033 }),
2034 LiftResult {
2035 lift: Box::new(move |store, src| {
2036 let mut store = token.as_context_mut(store);
2039 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2041 my_src.push(ValRaw::u32(*results));
2042 }
2043 unsafe {
2050 crate::Func::call_unchecked_raw(
2051 &mut store,
2052 return_.as_non_null(),
2053 my_src.as_mut_slice().into(),
2054 )?;
2055 }
2056 let state = store.0.concurrent_state_mut();
2057 let task = state.guest_task.unwrap();
2058 if sync_caller {
2059 state.get_mut(task)?.sync_result =
2060 Some(if let ResultInfo::Stack { result_count } = &result_info {
2061 match result_count {
2062 0 => None,
2063 1 => Some(my_src[0]),
2064 _ => unreachable!(),
2065 }
2066 } else {
2067 None
2068 });
2069 }
2070 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2071 }),
2072 ty: task_return_type,
2073 memory: NonNull::new(memory).map(SendSyncPtr::new),
2074 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2075 },
2076 Caller::Guest {
2077 task: old_task.unwrap(),
2078 instance: caller_instance,
2079 },
2080 None,
2081 callee_instance,
2082 )?;
2083
2084 let guest_task = state.push(new_task)?;
2085
2086 if let Some(old_task) = old_task {
2087 if !state.may_enter(guest_task) {
2088 bail!(crate::Trap::CannotEnterComponent);
2089 }
2090
2091 state.get_mut(old_task)?.subtasks.insert(guest_task);
2092 };
2093
2094 state.guest_task = Some(guest_task);
2097 log::trace!("pushed {guest_task:?} as current task; old task was {old_task:?}");
2098
2099 Ok(())
2100 }
2101
2102 unsafe fn call_callback<T>(
2107 self,
2108 mut store: StoreContextMut<T>,
2109 callee_instance: RuntimeComponentInstanceIndex,
2110 function: SendSyncPtr<VMFuncRef>,
2111 event: Event,
2112 handle: u32,
2113 may_enter_after_call: bool,
2114 ) -> Result<u32> {
2115 let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2116
2117 let (ordinal, result) = event.parts();
2118 let params = &mut [
2119 ValRaw::u32(ordinal),
2120 ValRaw::u32(handle),
2121 ValRaw::u32(result),
2122 ];
2123 unsafe {
2128 flags.set_may_enter(false);
2129 crate::Func::call_unchecked_raw(
2130 &mut store,
2131 function.as_non_null(),
2132 params.as_mut_slice().into(),
2133 )?;
2134 flags.set_may_enter(may_enter_after_call);
2135 }
2136 Ok(params[0].get_u32())
2137 }
2138
/// Starts the call for the current task (the one most recently made current,
/// e.g. by `prepare_call`) and waits for it to make progress.
///
/// The call itself is queued via `queue_call`; this function then suspends
/// the caller until the callee reports a subtask event. `storage` being
/// `Some` marks a sync-lowered caller, which keeps waiting until the callee
/// has returned and receives the single flat result (if any) in
/// `storage[0]`. `storage` being `None` marks an async-lowered caller, which
/// gets a subtask handle if the callee has not returned yet.
///
/// Returns `status.pack(waitable)`, where `waitable` is the subtask handle
/// for a not-yet-returned callee of an async caller and `None` otherwise.
///
/// # Panics
///
/// Panics if there is no current task, if `callee` is null, or if the
/// parameter/result counts exceed `MAX_FLAT_PARAMS` / `MAX_FLAT_RESULTS`.
///
/// # Safety
///
/// `callback`, `post_return` and `callee` end up being called through
/// `Func::call_unchecked_raw`.
/// NOTE(review): presumably each non-null pointer must be a valid function
/// reference for this store -- confirm against the callers.
unsafe fn start_call<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    callback: *mut VMFuncRef,
    post_return: *mut VMFuncRef,
    callee: *mut VMFuncRef,
    param_count: u32,
    result_count: u32,
    flags: u32,
    storage: Option<&mut [MaybeUninit<ValRaw>]>,
) -> Result<u32> {
    let token = StoreToken::new(store.as_context_mut());
    // No result storage means the caller is async-lowered.
    let async_caller = storage.is_none();
    let state = store.0.concurrent_state_mut();
    let guest_task = state.guest_task.unwrap();
    let may_enter_after_call = state.get_mut(guest_task)?.call_post_return_automatically();
    let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
    let param_count = usize::try_from(param_count).unwrap();
    assert!(param_count <= MAX_FLAT_PARAMS);
    let result_count = usize::try_from(result_count).unwrap();
    assert!(result_count <= MAX_FLAT_RESULTS);

    let task = state.get_mut(guest_task)?;
    if !callback.is_null() {
        // Remember how to invoke the callee's callback for later event
        // delivery.
        let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
        task.callback = Some(Box::new(move |store, runtime_instance, event, handle| {
            let store = token.as_context_mut(store);
            unsafe {
                self.call_callback::<T>(
                    store,
                    runtime_instance,
                    callback,
                    event,
                    handle,
                    may_enter_after_call,
                )
            }
        }));
    }

    // Only guest-called tasks reach this function.
    let Caller::Guest {
        task: caller,
        instance: runtime_instance,
    } = &task.caller
    else {
        unreachable!()
    };
    let caller = *caller;
    let caller_instance = *runtime_instance;

    let callee_instance = task.instance;

    // Instance flags are only handed to `queue_call` for callees which have
    // a callback.
    let instance_flags = if callback.is_null() {
        None
    } else {
        Some(self.id().get(store.0).instance_flags(callee_instance))
    };

    unsafe {
        self.queue_call(
            store.as_context_mut(),
            guest_task,
            callee,
            param_count,
            result_count,
            instance_flags,
            (flags & START_FLAG_ASYNC_CALLEE) != 0,
            NonNull::new(callback).map(SendSyncPtr::new),
            NonNull::new(post_return).map(SendSyncPtr::new),
        )?;
    }

    let state = store.0.concurrent_state_mut();

    // Temporarily move the callee into the caller's `sync_call_set` so the
    // caller can wait on it; the previous set is restored after the loop.
    let guest_waitable = Waitable::Guest(guest_task);
    let old_set = guest_waitable.common(state)?.set;
    let set = state.get_mut(caller)?.sync_call_set;
    guest_waitable.join(state, Some(set))?;

    let (status, waitable) = loop {
        store
            .0
            .suspend(SuspendReason::Waiting { set, task: caller })?;

        let state = store.0.concurrent_state_mut();

        let event = guest_waitable.take_event(state)?;
        let Some(Event::Subtask { status }) = event else {
            unreachable!();
        };

        log::trace!("status {status:?} for {guest_task:?}");

        if status == Status::Returned {
            // The callee is done; no handle is needed.
            break (status, None);
        } else if async_caller {
            // Not done yet, but an async caller does not block: give it a
            // subtask handle to track the callee with.
            let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
                .subtask_insert_guest(guest_task.rep())?;
            store
                .0
                .concurrent_state_mut()
                .get_mut(guest_task)?
                .common
                .handle = Some(handle);
            break (status, Some(handle));
        } else {
            // Sync caller and the callee has not returned: keep waiting.
        }
    };

    let state = store.0.concurrent_state_mut();

    guest_waitable.join(state, old_set)?;

    if let Some(storage) = storage {
        let task = state.get_mut(guest_task)?;
        if let Some(result) = task.sync_result.take() {
            // The outer `Some` means a result was produced; the inner option
            // is the single flat value, if any.
            if let Some(result) = result {
                storage[0] = MaybeUninit::new(result);
            }

            if task.exited {
                Waitable::Guest(guest_task).delete_from(state)?;
            }
        } else {
            // The callee reported `Returned` without the lift closure having
            // recorded a sync result.
            return Err(anyhow!(crate::Trap::NoAsyncResult));
        }
    }

    // Restore the caller as the current task.
    state.guest_task = Some(caller);
    log::trace!("popped current task {guest_task:?}; new task is {caller:?}");

    Ok(status.pack(waitable))
}
2320
/// Polls a host `future` once on behalf of the current guest task.
///
/// If the future completes immediately its value is passed to `lower` and
/// `Ok(None)` is returned. Otherwise the future is handed to the concurrent
/// state to be driven later, a host subtask handle is allocated in
/// `caller_instance`'s table, and `Ok(Some(handle))` is returned; when the
/// future eventually completes, a high-priority work item runs `lower` and
/// posts a `Status::Returned` subtask event.
///
/// # Panics
///
/// Panics if there is no current guest task.
pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
    self,
    mut store: StoreContextMut<T>,
    future: impl Future<Output = Result<R>> + Send + 'static,
    caller_instance: RuntimeComponentInstanceIndex,
    lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
) -> Result<Option<u32>> {
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();
    let caller = state.guest_task.unwrap();

    // Wrap the future so that a call context popped when it last returned
    // `Pending` is pushed back before each subsequent poll.
    let (join_handle, future) = JoinHandle::run(async move {
        let mut future = pin!(future);
        let mut call_context = None;
        future::poll_fn(move |cx| {
            tls::get(|store| {
                // Restore the context saved by a previous pending poll, if
                // any.
                if let Some(call_context) = call_context.take() {
                    token
                        .as_context_mut(store)
                        .0
                        .component_resource_state()
                        .0
                        .push(call_context);
                }
            });

            let result = future.as_mut().poll(cx);

            if result.is_pending() {
                tls::get(|store| {
                    // Save the top call context until the next poll.
                    call_context = Some(
                        token
                            .as_context_mut(store)
                            .0
                            .component_resource_state()
                            .0
                            .pop()
                            .unwrap(),
                    );
                });
            }
            result
        })
        .await
    });

    let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;

    log::trace!("new host task child of {caller:?}: {task:?}");

    let mut future = Box::pin(future);

    // Poll once with a no-op waker, making the store reachable through
    // `tls::get` for the duration of the poll.
    let poll = tls::set(store.0, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    Ok(match poll {
        // NOTE(review): `None` presumably means the task was aborted via its
        // `JoinHandle`, which cannot have happened yet -- confirm.
        Poll::Ready(None) => unreachable!(),
        Poll::Ready(Some(result)) => {
            lower(store.as_context_mut(), result?)?;
            log::trace!("delete host task {task:?} (already ready)");
            store.0.concurrent_state_mut().delete(task)?;
            None
        }
        Poll::Pending => {
            let future =
                Box::pin(async move {
                    let result = match future.await {
                        Some(result) => result?,
                        // No value was produced, so there is nothing to
                        // lower.
                        None => return Ok(()),
                    };
                    tls::get(move |store| {
                        // Lowering is deferred to a work item rather than
                        // done inline here.
                        store.concurrent_state_mut().push_high_priority(
                            WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
                                lower(token.as_context_mut(store), result)?;
                                let state = store.concurrent_state_mut();
                                // Clearing the join handle marks the host
                                // task as resolved (see `subtask_drop`).
                                state.get_mut(task)?.join_handle.take();
                                Waitable::Host(task).set_event(
                                    state,
                                    Some(Event::Subtask {
                                        status: Status::Returned,
                                    }),
                                )
                            }))),
                        );
                        Ok(())
                    })
                });

            store.0.concurrent_state_mut().push_future(future);
            let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
                .subtask_insert_host(task.rep())?;
            store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
            log::trace!(
                "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
            );
            Some(handle)
        }
    })
}
2462
2463 pub(crate) fn task_return(
2466 self,
2467 store: &mut dyn VMStore,
2468 caller: RuntimeComponentInstanceIndex,
2469 ty: TypeTupleIndex,
2470 options: OptionsIndex,
2471 storage: &[ValRaw],
2472 ) -> Result<()> {
2473 self.id().get(store).check_may_leave(caller)?;
2474 let state = store.concurrent_state_mut();
2475 let guest_task = state.guest_task.unwrap();
2476 let lift = state
2477 .get_mut(guest_task)?
2478 .lift_result
2479 .take()
2480 .ok_or_else(|| {
2481 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2482 })?;
2483 assert!(state.get_mut(guest_task)?.result.is_none());
2484
2485 let CanonicalOptions {
2486 string_encoding,
2487 data_model,
2488 ..
2489 } = &self.id().get(store).component().env_component().options[options];
2490
2491 let invalid = ty != lift.ty
2492 || string_encoding != &lift.string_encoding
2493 || match data_model {
2494 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2495 Some(memory) => {
2496 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2497 let actual = self.id().get(store).runtime_memory(memory);
2498 expected != actual
2499 }
2500 None => false,
2503 },
2504 CanonicalOptionsDataModel::Gc { .. } => true,
2506 };
2507
2508 if invalid {
2509 bail!("invalid `task.return` signature and/or options for current task");
2510 }
2511
2512 log::trace!("task.return for {guest_task:?}");
2513
2514 let result = (lift.lift)(store, storage)?;
2515
2516 self.task_complete(store, guest_task, result, Status::Returned, ValRaw::i32(0))
2517 }
2518
2519 pub(crate) fn task_cancel(
2521 self,
2522 store: &mut StoreOpaque,
2523 caller: RuntimeComponentInstanceIndex,
2524 ) -> Result<()> {
2525 self.id().get(store).check_may_leave(caller)?;
2526 let state = store.concurrent_state_mut();
2527 let guest_task = state.guest_task.unwrap();
2528 let task = state.get_mut(guest_task)?;
2529 if !task.cancel_sent {
2530 bail!("`task.cancel` called by task which has not been cancelled")
2531 }
2532 _ = task.lift_result.take().ok_or_else(|| {
2533 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2534 })?;
2535
2536 assert!(task.result.is_none());
2537
2538 log::trace!("task.cancel for {guest_task:?}");
2539
2540 self.task_complete(
2541 store,
2542 guest_task,
2543 Box::new(DummyResult),
2544 Status::ReturnCancelled,
2545 ValRaw::i32(0),
2546 )
2547 }
2548
2549 fn task_complete(
2555 self,
2556 store: &mut StoreOpaque,
2557 guest_task: TableId<GuestTask>,
2558 result: Box<dyn Any + Send + Sync>,
2559 status: Status,
2560 post_return_arg: ValRaw,
2561 ) -> Result<()> {
2562 if store
2563 .concurrent_state_mut()
2564 .get_mut(guest_task)?
2565 .call_post_return_automatically()
2566 {
2567 let (calls, host_table, _, instance) =
2568 store.component_resource_state_with_instance(self);
2569 ResourceTables {
2570 calls,
2571 host_table: Some(host_table),
2572 guest: Some(instance.guest_tables()),
2573 }
2574 .exit_call()?;
2575 } else {
2576 let function_index = store
2581 .concurrent_state_mut()
2582 .get_mut(guest_task)?
2583 .function_index
2584 .unwrap();
2585
2586 self.id()
2587 .get_mut(store)
2588 .post_return_arg_set(function_index, post_return_arg);
2589 }
2590
2591 let state = store.concurrent_state_mut();
2592 let task = state.get_mut(guest_task)?;
2593
2594 if let Caller::Host { tx, .. } = &mut task.caller {
2595 if let Some(tx) = tx.take() {
2596 _ = tx.send(result);
2597 }
2598 } else {
2599 task.result = Some(result);
2600 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2601 }
2602
2603 Ok(())
2604 }
2605
2606 pub(crate) fn waitable_set_new(
2608 self,
2609 store: &mut StoreOpaque,
2610 caller_instance: RuntimeComponentInstanceIndex,
2611 ) -> Result<u32> {
2612 self.id().get_mut(store).check_may_leave(caller_instance)?;
2613 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2614 let handle = self.id().get_mut(store).guest_tables().0[caller_instance]
2615 .waitable_set_insert(set.rep())?;
2616 log::trace!("new waitable set {set:?} (handle {handle})");
2617 Ok(handle)
2618 }
2619
2620 pub(crate) fn waitable_set_drop(
2622 self,
2623 store: &mut StoreOpaque,
2624 caller_instance: RuntimeComponentInstanceIndex,
2625 set: u32,
2626 ) -> Result<()> {
2627 self.id().get_mut(store).check_may_leave(caller_instance)?;
2628 let rep =
2629 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_remove(set)?;
2630
2631 log::trace!("drop waitable set {rep} (handle {set})");
2632
2633 let set = store
2634 .concurrent_state_mut()
2635 .delete(TableId::<WaitableSet>::new(rep))?;
2636
2637 if !set.waiting.is_empty() {
2638 bail!("cannot drop waitable set with waiters");
2639 }
2640
2641 Ok(())
2642 }
2643
2644 pub(crate) fn waitable_join(
2646 self,
2647 store: &mut StoreOpaque,
2648 caller_instance: RuntimeComponentInstanceIndex,
2649 waitable_handle: u32,
2650 set_handle: u32,
2651 ) -> Result<()> {
2652 let mut instance = self.id().get_mut(store);
2653 instance.check_may_leave(caller_instance)?;
2654 let waitable =
2655 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2656
2657 let set = if set_handle == 0 {
2658 None
2659 } else {
2660 let set = instance.guest_tables().0[caller_instance].waitable_set_rep(set_handle)?;
2661
2662 Some(TableId::<WaitableSet>::new(set))
2663 };
2664
2665 log::trace!(
2666 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2667 );
2668
2669 waitable.join(store.concurrent_state_mut(), set)
2670 }
2671
2672 pub(crate) fn subtask_drop(
2674 self,
2675 store: &mut StoreOpaque,
2676 caller_instance: RuntimeComponentInstanceIndex,
2677 task_id: u32,
2678 ) -> Result<()> {
2679 self.id().get_mut(store).check_may_leave(caller_instance)?;
2680 self.waitable_join(store, caller_instance, task_id, 0)?;
2681
2682 let (rep, is_host) =
2683 self.id().get_mut(store).guest_tables().0[caller_instance].subtask_remove(task_id)?;
2684
2685 let concurrent_state = store.concurrent_state_mut();
2686 let (waitable, expected_caller_instance, delete) = if is_host {
2687 let id = TableId::<HostTask>::new(rep);
2688 let task = concurrent_state.get_mut(id)?;
2689 if task.join_handle.is_some() {
2690 bail!("cannot drop a subtask which has not yet resolved");
2691 }
2692 (Waitable::Host(id), task.caller_instance, true)
2693 } else {
2694 let id = TableId::<GuestTask>::new(rep);
2695 let task = concurrent_state.get_mut(id)?;
2696 if task.lift_result.is_some() {
2697 bail!("cannot drop a subtask which has not yet resolved");
2698 }
2699 if let Caller::Guest { instance, .. } = &task.caller {
2700 (Waitable::Guest(id), *instance, task.exited)
2701 } else {
2702 unreachable!()
2703 }
2704 };
2705
2706 waitable.common(concurrent_state)?.handle = None;
2707
2708 if waitable.take_event(concurrent_state)?.is_some() {
2709 bail!("cannot drop a subtask with an undelivered event");
2710 }
2711
2712 if delete {
2713 waitable.delete_from(concurrent_state)?;
2714 }
2715
2716 assert_eq!(expected_caller_instance, caller_instance);
2720 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
2721 Ok(())
2722 }
2723
2724 pub(crate) fn waitable_set_wait(
2726 self,
2727 store: &mut StoreOpaque,
2728 caller: RuntimeComponentInstanceIndex,
2729 options: OptionsIndex,
2730 set: u32,
2731 payload: u32,
2732 ) -> Result<u32> {
2733 self.id().get(store).check_may_leave(caller)?;
2734 let &CanonicalOptions {
2735 cancellable,
2736 instance: caller_instance,
2737 ..
2738 } = &self.id().get(store).component().env_component().options[options];
2739 let rep =
2740 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2741
2742 self.waitable_check(
2743 store,
2744 cancellable,
2745 WaitableCheck::Wait(WaitableCheckParams {
2746 set: TableId::new(rep),
2747 options,
2748 payload,
2749 }),
2750 )
2751 }
2752
2753 pub(crate) fn waitable_set_poll(
2755 self,
2756 store: &mut StoreOpaque,
2757 caller: RuntimeComponentInstanceIndex,
2758 options: OptionsIndex,
2759 set: u32,
2760 payload: u32,
2761 ) -> Result<u32> {
2762 self.id().get(store).check_may_leave(caller)?;
2763 let &CanonicalOptions {
2764 cancellable,
2765 instance: caller_instance,
2766 ..
2767 } = &self.id().get(store).component().env_component().options[options];
2768 let rep =
2769 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2770
2771 self.waitable_check(
2772 store,
2773 cancellable,
2774 WaitableCheck::Poll(WaitableCheckParams {
2775 set: TableId::new(rep),
2776 options,
2777 payload,
2778 }),
2779 )
2780 }
2781
2782 pub(crate) fn thread_yield(
2784 self,
2785 store: &mut StoreOpaque,
2786 caller: RuntimeComponentInstanceIndex,
2787 cancellable: bool,
2788 ) -> Result<bool> {
2789 self.id().get(store).check_may_leave(caller)?;
2790 self.waitable_check(store, cancellable, WaitableCheck::Yield)
2791 .map(|_| {
2792 if cancellable {
2793 let state = store.concurrent_state_mut();
2794 let task = state.guest_task.unwrap();
2795 if let Some(event) = state.get_mut(task).unwrap().event.take() {
2796 assert!(matches!(event, Event::Cancelled));
2797 true
2798 } else {
2799 false
2800 }
2801 } else {
2802 false
2803 }
2804 })
2805 }
2806
2807 fn waitable_check(
2810 self,
2811 store: &mut StoreOpaque,
2812 cancellable: bool,
2813 check: WaitableCheck,
2814 ) -> Result<u32> {
2815 let guest_task = store.concurrent_state_mut().guest_task.unwrap();
2816
2817 let (wait, set) = match &check {
2818 WaitableCheck::Wait(params) => (true, Some(params.set)),
2819 WaitableCheck::Poll(params) => (false, Some(params.set)),
2820 WaitableCheck::Yield => (false, None),
2821 };
2822
2823 store.suspend(SuspendReason::Yielding { task: guest_task })?;
2825
2826 log::trace!("waitable check for {guest_task:?}; set {set:?}");
2827
2828 let state = store.concurrent_state_mut();
2829 let task = state.get_mut(guest_task)?;
2830
2831 if wait {
2834 let set = set.unwrap();
2835
2836 if (task.event.is_none()
2837 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
2838 && state.get_mut(set)?.ready.is_empty()
2839 {
2840 if cancellable {
2841 let old = state.get_mut(guest_task)?.wake_on_cancel.replace(set);
2842 assert!(old.is_none());
2843 }
2844
2845 store.suspend(SuspendReason::Waiting {
2846 set,
2847 task: guest_task,
2848 })?;
2849 }
2850 }
2851
2852 log::trace!("waitable check for {guest_task:?}; set {set:?}, part two");
2853
2854 let result = match check {
2855 WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => {
2857 let event = self.get_event(store, guest_task, Some(params.set), cancellable)?;
2858
2859 let (ordinal, handle, result) = if wait {
2860 let (event, waitable) = event.unwrap();
2861 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
2862 let (ordinal, result) = event.parts();
2863 (ordinal, handle, result)
2864 } else {
2865 if let Some((event, waitable)) = event {
2866 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
2867 let (ordinal, result) = event.parts();
2868 (ordinal, handle, result)
2869 } else {
2870 log::trace!(
2871 "no events ready to deliver via waitable-set.poll to {guest_task:?}; set {:?}",
2872 params.set
2873 );
2874 let (ordinal, result) = Event::None.parts();
2875 (ordinal, 0, result)
2876 }
2877 };
2878 let options = Options::new_index(store, self, params.options);
2879 let ptr = func::validate_inbounds::<(u32, u32)>(
2880 options.memory_mut(store),
2881 &ValRaw::u32(params.payload),
2882 )?;
2883 options.memory_mut(store)[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
2884 options.memory_mut(store)[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
2885 Ok(ordinal)
2886 }
2887 WaitableCheck::Yield => Ok(0),
2888 };
2889
2890 result
2891 }
2892
/// Implements `subtask.cancel` for the subtask behind handle `task_id`.
///
/// Outcomes, in the order they are checked:
///
/// * host subtask still running: its join handle is aborted and
///   `Status::ReturnCancelled` is returned;
/// * guest subtask not yet started (`lower_params` still present): it is
///   removed from the callee instance's pending set and
///   `Status::StartCancelled` is returned;
/// * guest subtask started but unresolved (`lift_result` still present): a
///   `Cancelled` event is posted to it and, if it was waiting cancellably,
///   it is woken and the caller yields. If it is still unresolved
///   afterwards, an async caller gets `BLOCKED` while a sync caller waits
///   for the subtask's event;
/// * otherwise the subtask's pending terminal event (`Returned` or
///   `ReturnCancelled`) is consumed and its status returned.
///
/// # Errors
///
/// Fails if `caller_instance` may not leave, if the handle is invalid, or
/// if the terminal status was already delivered.
///
/// # Panics
///
/// Panics if the subtask was not created by `caller_instance`.
pub(crate) fn subtask_cancel(
    self,
    store: &mut StoreOpaque,
    caller_instance: RuntimeComponentInstanceIndex,
    async_: bool,
    task_id: u32,
) -> Result<u32> {
    self.id().get(store).check_may_leave(caller_instance)?;
    let (rep, is_host) =
        self.id().get_mut(store).guest_tables().0[caller_instance].subtask_rep(task_id)?;
    let (waitable, expected_caller_instance) = if is_host {
        let id = TableId::<HostTask>::new(rep);
        (
            Waitable::Host(id),
            store.concurrent_state_mut().get_mut(id)?.caller_instance,
        )
    } else {
        let id = TableId::<GuestTask>::new(rep);
        if let Caller::Guest { instance, .. } =
            &store.concurrent_state_mut().get_mut(id)?.caller
        {
            (Waitable::Guest(id), *instance)
        } else {
            unreachable!()
        }
    };
    // The handle was looked up in `caller_instance`'s table, so the subtask
    // is expected to belong to that instance.
    assert_eq!(expected_caller_instance, caller_instance);

    log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

    let concurrent_state = store.concurrent_state_mut();
    if let Waitable::Host(host_task) = waitable {
        // A remaining join handle means the host future has not resolved.
        if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
            handle.abort();
            return Ok(Status::ReturnCancelled as u32);
        }
    } else {
        let caller = concurrent_state.guest_task.unwrap();
        let guest_task = TableId::<GuestTask>::new(rep);
        let task = concurrent_state.get_mut(guest_task)?;
        if task.lower_params.is_some() {
            // Parameters were never lowered, i.e. the call has not started.
            task.lower_params = None;
            task.lift_result = None;

            let callee_instance = task.instance;

            let kind = concurrent_state
                .instance_state(callee_instance)
                .pending
                .remove(&guest_task);

            if kind.is_none() {
                bail!("`subtask.cancel` called after terminal status delivered");
            }

            return Ok(Status::StartCancelled as u32);
        } else if task.lift_result.is_some() {
            // Started but not resolved: request cancellation.
            task.cancel_sent = true;
            // This replaces any event already pending on the task.
            task.event = Some(Event::Cancelled);
            if let Some(set) = task.wake_on_cancel.take() {
                // The task is waiting cancellably on `set`; wake it
                // according to how it was waiting.
                let item = match concurrent_state
                    .get_mut(set)?
                    .waiting
                    .remove(&guest_task)
                    .unwrap()
                {
                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
                    WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
                        task: guest_task,
                        kind: GuestCallKind::DeliverEvent {
                            instance,
                            set: None,
                        },
                    }),
                };
                concurrent_state.push_high_priority(item);

                // Give the woken task a chance to react before re-checking.
                store.suspend(SuspendReason::Yielding { task: caller })?;
            }

            let concurrent_state = store.concurrent_state_mut();
            let task = concurrent_state.get_mut(guest_task)?;
            if task.lift_result.is_some() {
                // Still unresolved.
                if async_ {
                    return Ok(BLOCKED);
                } else {
                    store.wait_for_event(Waitable::Guest(guest_task))?;
                }
            }
        }
    }

    let event = waitable.take_event(store.concurrent_state_mut())?;
    if let Some(Event::Subtask {
        status: status @ (Status::Returned | Status::ReturnCancelled),
    }) = event
    {
        Ok(status as u32)
    } else {
        bail!("`subtask.cancel` called after terminal status delivered");
    }
}
3008
3009 pub(crate) fn context_get(
3010 self,
3011 store: &mut StoreOpaque,
3012 caller: RuntimeComponentInstanceIndex,
3013 slot: u32,
3014 ) -> Result<u32> {
3015 self.id().get(store).check_may_leave(caller)?;
3016 store.concurrent_state_mut().context_get(slot)
3017 }
3018
3019 pub(crate) fn context_set(
3020 self,
3021 store: &mut StoreOpaque,
3022 caller: RuntimeComponentInstanceIndex,
3023 slot: u32,
3024 value: u32,
3025 ) -> Result<()> {
3026 self.id().get(store).check_may_leave(caller)?;
3027 store.concurrent_state_mut().context_set(slot, value)
3028 }
3029}
3030
/// Store-side entry points for component-model async operations.
///
/// Every method takes the `Instance` being operated on in addition to
/// `&mut self`. The `StoreInner<T>` implementation in this file forwards each
/// method to a same-purpose method on `Instance`, wrapping `self` in a
/// `StoreContextMut`.
///
/// NOTE(review): presumably this trait exists so callers can reach these
/// operations without naming the store's data type `T` — confirm against the
/// call sites.
pub trait VMComponentAsyncStore {
    /// Records the parameters and description of an upcoming guest-to-guest
    /// call.
    ///
    /// In the `StoreInner` implementation `result_count` doubles as a mode
    /// selector: `PREPARE_ASYNC_NO_RESULT` and `PREPARE_ASYNC_WITH_RESULT`
    /// mark an async caller, any other value is a sync caller's result count.
    ///
    /// # Safety
    ///
    /// The `StoreInner` implementation reads `storage_len` values through
    /// `storage`, so it must be valid for that many reads. The remaining raw
    /// pointers are forwarded to `Instance::prepare_call`; their requirements
    /// are defined there.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        string_encoding: u8,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a call on behalf of a sync caller, using `storage` as the
    /// buffer handed to `Instance::start_call`.
    ///
    /// # Safety
    ///
    /// The `StoreInner` implementation builds a mutable slice of
    /// `storage_len` elements from `storage`, so the pointer must be valid
    /// and unaliased for that length. `callback` and `callee` are forwarded
    /// as raw pointers.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// Starts a call on behalf of an async caller and returns the `u32`
    /// produced by `Instance::start_call`.
    ///
    /// # Safety
    ///
    /// `callback`, `post_return` and `callee` are forwarded as raw pointers
    /// to `Instance::start_call`; their validity requirements are defined
    /// there.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// Writes to the future identified by `future`, returning an encoded
    /// result code.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Reads from the future identified by `future`, returning an encoded
    /// result code.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a future.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Writes up to `count` items to the stream identified by `stream`,
    /// returning an encoded result code.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Reads up to `count` items from the stream identified by `stream`,
    /// returning an encoded result code.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_write`, but additionally supplies the payload's size and
    /// alignment; the `StoreInner` implementation passes them on as a
    /// `FlatAbi`.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Like `stream_read`, but additionally supplies the payload's size and
    /// alignment; the `StoreInner` implementation passes them on as a
    /// `FlatAbi`.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Drops the writable end `writer` of a stream.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Handles a request for the debug message of the error context
    /// `err_ctx_handle`, with `debug_msg_address` as the guest address
    /// argument.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;
}
3189
3190impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3192 unsafe fn prepare_call(
3193 &mut self,
3194 instance: Instance,
3195 memory: *mut VMMemoryDefinition,
3196 start: *mut VMFuncRef,
3197 return_: *mut VMFuncRef,
3198 caller_instance: RuntimeComponentInstanceIndex,
3199 callee_instance: RuntimeComponentInstanceIndex,
3200 task_return_type: TypeTupleIndex,
3201 string_encoding: u8,
3202 result_count_or_max_if_async: u32,
3203 storage: *mut ValRaw,
3204 storage_len: usize,
3205 ) -> Result<()> {
3206 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3210
3211 unsafe {
3212 instance.prepare_call(
3213 StoreContextMut(self),
3214 start,
3215 return_,
3216 caller_instance,
3217 callee_instance,
3218 task_return_type,
3219 memory,
3220 string_encoding,
3221 match result_count_or_max_if_async {
3222 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3223 params,
3224 has_result: false,
3225 },
3226 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3227 params,
3228 has_result: true,
3229 },
3230 result_count => CallerInfo::Sync {
3231 params,
3232 result_count,
3233 },
3234 },
3235 )
3236 }
3237 }
3238
3239 unsafe fn sync_start(
3240 &mut self,
3241 instance: Instance,
3242 callback: *mut VMFuncRef,
3243 callee: *mut VMFuncRef,
3244 param_count: u32,
3245 storage: *mut MaybeUninit<ValRaw>,
3246 storage_len: usize,
3247 ) -> Result<()> {
3248 unsafe {
3249 instance
3250 .start_call(
3251 StoreContextMut(self),
3252 callback,
3253 ptr::null_mut(),
3254 callee,
3255 param_count,
3256 1,
3257 START_FLAG_ASYNC_CALLEE,
3258 Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3262 )
3263 .map(drop)
3264 }
3265 }
3266
3267 unsafe fn async_start(
3268 &mut self,
3269 instance: Instance,
3270 callback: *mut VMFuncRef,
3271 post_return: *mut VMFuncRef,
3272 callee: *mut VMFuncRef,
3273 param_count: u32,
3274 result_count: u32,
3275 flags: u32,
3276 ) -> Result<u32> {
3277 unsafe {
3278 instance.start_call(
3279 StoreContextMut(self),
3280 callback,
3281 post_return,
3282 callee,
3283 param_count,
3284 result_count,
3285 flags,
3286 None,
3287 )
3288 }
3289 }
3290
3291 fn future_write(
3292 &mut self,
3293 instance: Instance,
3294 caller: RuntimeComponentInstanceIndex,
3295 ty: TypeFutureTableIndex,
3296 options: OptionsIndex,
3297 future: u32,
3298 address: u32,
3299 ) -> Result<u32> {
3300 instance.id().get(self).check_may_leave(caller)?;
3301 instance
3302 .guest_write(
3303 StoreContextMut(self),
3304 TransmitIndex::Future(ty),
3305 options,
3306 None,
3307 future,
3308 address,
3309 1,
3310 )
3311 .map(|result| result.encode())
3312 }
3313
3314 fn future_read(
3315 &mut self,
3316 instance: Instance,
3317 caller: RuntimeComponentInstanceIndex,
3318 ty: TypeFutureTableIndex,
3319 options: OptionsIndex,
3320 future: u32,
3321 address: u32,
3322 ) -> Result<u32> {
3323 instance.id().get(self).check_may_leave(caller)?;
3324 instance
3325 .guest_read(
3326 StoreContextMut(self),
3327 TransmitIndex::Future(ty),
3328 options,
3329 None,
3330 future,
3331 address,
3332 1,
3333 )
3334 .map(|result| result.encode())
3335 }
3336
3337 fn stream_write(
3338 &mut self,
3339 instance: Instance,
3340 caller: RuntimeComponentInstanceIndex,
3341 ty: TypeStreamTableIndex,
3342 options: OptionsIndex,
3343 stream: u32,
3344 address: u32,
3345 count: u32,
3346 ) -> Result<u32> {
3347 instance.id().get(self).check_may_leave(caller)?;
3348 instance
3349 .guest_write(
3350 StoreContextMut(self),
3351 TransmitIndex::Stream(ty),
3352 options,
3353 None,
3354 stream,
3355 address,
3356 count,
3357 )
3358 .map(|result| result.encode())
3359 }
3360
3361 fn stream_read(
3362 &mut self,
3363 instance: Instance,
3364 caller: RuntimeComponentInstanceIndex,
3365 ty: TypeStreamTableIndex,
3366 options: OptionsIndex,
3367 stream: u32,
3368 address: u32,
3369 count: u32,
3370 ) -> Result<u32> {
3371 instance.id().get(self).check_may_leave(caller)?;
3372 instance
3373 .guest_read(
3374 StoreContextMut(self),
3375 TransmitIndex::Stream(ty),
3376 options,
3377 None,
3378 stream,
3379 address,
3380 count,
3381 )
3382 .map(|result| result.encode())
3383 }
3384
3385 fn future_drop_writable(
3386 &mut self,
3387 instance: Instance,
3388 caller: RuntimeComponentInstanceIndex,
3389 ty: TypeFutureTableIndex,
3390 writer: u32,
3391 ) -> Result<()> {
3392 instance.id().get(self).check_may_leave(caller)?;
3393 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
3394 }
3395
3396 fn flat_stream_write(
3397 &mut self,
3398 instance: Instance,
3399 caller: RuntimeComponentInstanceIndex,
3400 ty: TypeStreamTableIndex,
3401 options: OptionsIndex,
3402 payload_size: u32,
3403 payload_align: u32,
3404 stream: u32,
3405 address: u32,
3406 count: u32,
3407 ) -> Result<u32> {
3408 instance.id().get(self).check_may_leave(caller)?;
3409 instance
3410 .guest_write(
3411 StoreContextMut(self),
3412 TransmitIndex::Stream(ty),
3413 options,
3414 Some(FlatAbi {
3415 size: payload_size,
3416 align: payload_align,
3417 }),
3418 stream,
3419 address,
3420 count,
3421 )
3422 .map(|result| result.encode())
3423 }
3424
3425 fn flat_stream_read(
3426 &mut self,
3427 instance: Instance,
3428 caller: RuntimeComponentInstanceIndex,
3429 ty: TypeStreamTableIndex,
3430 options: OptionsIndex,
3431 payload_size: u32,
3432 payload_align: u32,
3433 stream: u32,
3434 address: u32,
3435 count: u32,
3436 ) -> Result<u32> {
3437 instance.id().get(self).check_may_leave(caller)?;
3438 instance
3439 .guest_read(
3440 StoreContextMut(self),
3441 TransmitIndex::Stream(ty),
3442 options,
3443 Some(FlatAbi {
3444 size: payload_size,
3445 align: payload_align,
3446 }),
3447 stream,
3448 address,
3449 count,
3450 )
3451 .map(|result| result.encode())
3452 }
3453
3454 fn stream_drop_writable(
3455 &mut self,
3456 instance: Instance,
3457 caller: RuntimeComponentInstanceIndex,
3458 ty: TypeStreamTableIndex,
3459 writer: u32,
3460 ) -> Result<()> {
3461 instance.id().get(self).check_may_leave(caller)?;
3462 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
3463 }
3464
3465 fn error_context_debug_message(
3466 &mut self,
3467 instance: Instance,
3468 caller: RuntimeComponentInstanceIndex,
3469 ty: TypeComponentLocalErrorContextTableIndex,
3470 options: OptionsIndex,
3471 err_ctx_handle: u32,
3472 debug_msg_address: u32,
3473 ) -> Result<()> {
3474 instance.id().get(self).check_may_leave(caller)?;
3475 instance.error_context_debug_message(
3476 StoreContextMut(self),
3477 ty,
3478 options,
3479 err_ctx_handle,
3480 debug_msg_address,
3481 )
3482 }
3483}
3484
/// Boxed, pinned, `Send + 'static` future resolving to `Result<()>`; this is
/// the item type of the `FuturesUnordered` stored in `ConcurrentState`.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
3486
/// Table entry for a host-implemented task (the `Waitable::Host` case).
struct HostTask {
    /// Event / set / handle state shared by every waitable kind.
    common: WaitableCommon,
    /// Component instance recorded as this task's caller.
    caller_instance: RuntimeComponentInstanceIndex,
    /// Optional handle for the task's future.
    // NOTE(review): presumably used to abort the host future on cancel —
    // confirm at the use sites.
    join_handle: Option<JoinHandle>,
}
3493
3494impl HostTask {
3495 fn new(
3496 caller_instance: RuntimeComponentInstanceIndex,
3497 join_handle: Option<JoinHandle>,
3498 ) -> Self {
3499 Self {
3500 common: WaitableCommon::default(),
3501 caller_instance,
3502 join_handle,
3503 }
3504 }
3505}
3506
impl TableDebug for HostTask {
    /// Fixed label identifying `HostTask` table entries.
    fn type_name() -> &'static str {
        "HostTask"
    }
}
3512
/// Boxed callback attached to a `GuestTask`.
///
/// It receives the store, a component instance index, the `Event` being
/// delivered and a `u32` handle, and returns a `u32` code. The closure built
/// in `prepare_call` forwards these arguments to `Instance::call_callback`.
type CallbackFn = Box<
    dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
        + Send
        + Sync
        + 'static,
>;
3519
/// Who initiated a `GuestTask`.
enum Caller {
    /// The call originated from the host (see `prepare_call`).
    Host {
        /// Channel for delivering the lifted result to the host; `None` for
        /// subtasks reparented to the host by `GuestTask::dispose`.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// Shared sender paired with `PreparedCall::exit_rx`.
        // NOTE(review): presumably signals that the task has exited —
        // confirm where it is sent or dropped.
        exit_tx: Arc<oneshot::Sender<()>>,
        /// Flag supplied to `prepare_call`; `dispose` sets it to `true` for
        /// reparented subtasks.
        remove_task_automatically: bool,
        /// Flag reported by `GuestTask::call_post_return_automatically`.
        call_post_return_automatically: bool,
    },
    /// The call originated from another guest task.
    Guest {
        /// The calling task.
        task: TableId<GuestTask>,
        /// Instance index recorded for the caller; `ConcurrentState::may_enter`
        /// compares it with the callee's instance.
        instance: RuntimeComponentInstanceIndex,
    },
}
3551
/// Everything recorded for lifting a guest task's result.
struct LiftResult {
    /// Closure that turns raw values into a boxed result.
    lift: RawLift,
    /// Tuple type of the task's results (`prepare_call` reads it from the
    /// function type's `results`).
    ty: TypeTupleIndex,
    /// Linear memory taken from the call's options, if any.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding taken from the call's options.
    string_encoding: StringEncoding,
}
3560
/// Table entry for a guest task (the `Waitable::Guest` case).
struct GuestTask {
    /// Event / set / handle state shared by every waitable kind.
    common: WaitableCommon,
    /// Closure that lowers the call's parameters; set to `Some` by `new`.
    lower_params: Option<RawLower>,
    /// Result-lifting information; set to `Some` by `new`.
    lift_result: Option<LiftResult>,
    /// Lifted result, initially `None`.
    result: Option<LiftedResult>,
    /// Callback supplied at construction, if any.
    callback: Option<CallbackFn>,
    /// Who called this task; rewritten by `dispose` when subtasks are
    /// reparented.
    caller: Caller,
    /// Call context, initialised to `CallContext::default()`.
    call_context: Option<CallContext>,
    /// Initially `None`.
    // NOTE(review): presumably the raw result of a synchronous call —
    // confirm at the use sites.
    sync_result: Option<Option<ValRaw>>,
    /// Initially `false`.
    // NOTE(review): presumably set once a cancel event has been sent.
    cancel_sent: bool,
    /// Initially `false`.
    // NOTE(review): presumably set once a "starting" status has been sent.
    starting_sent: bool,
    /// Two context slots read and written by
    /// `ConcurrentState::context_get` / `context_set`; zero-initialised.
    context: [u32; 2],
    /// Tasks whose `Caller::Guest::task` is this task; `dispose` moves them
    /// to this task's own caller.
    subtasks: HashSet<TableId<GuestTask>>,
    /// Waitable set allocated by `new` and deleted by `dispose`.
    sync_call_set: TableId<WaitableSet>,
    /// Component instance this task runs in.
    instance: RuntimeComponentInstanceIndex,
    /// Pending event for the task itself, initially `None`.
    event: Option<Event>,
    /// Set recorded for cancel wake-ups; `Waitable::mark_ready` clears it
    /// when it wakes this task.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// Export index of the called function; `prepare_call` sets it from
    /// `Func::index`.
    function_index: Option<ExportIndex>,
    /// Initially `false`; `dispose` deletes reparented subtasks that have
    /// this set.
    exited: bool,
}
3616
3617impl GuestTask {
3618 fn new(
3619 state: &mut ConcurrentState,
3620 lower_params: RawLower,
3621 lift_result: LiftResult,
3622 caller: Caller,
3623 callback: Option<CallbackFn>,
3624 component_instance: RuntimeComponentInstanceIndex,
3625 ) -> Result<Self> {
3626 let sync_call_set = state.push(WaitableSet::default())?;
3627
3628 Ok(Self {
3629 common: WaitableCommon::default(),
3630 lower_params: Some(lower_params),
3631 lift_result: Some(lift_result),
3632 result: None,
3633 callback,
3634 caller,
3635 call_context: Some(CallContext::default()),
3636 sync_result: None,
3637 cancel_sent: false,
3638 starting_sent: false,
3639 context: [0u32; 2],
3640 subtasks: HashSet::new(),
3641 sync_call_set,
3642 instance: component_instance,
3643 event: None,
3644 wake_on_cancel: None,
3645 function_index: None,
3646 exited: false,
3647 })
3648 }
3649
3650 fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
3653 for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
3656 if let Some(Event::Subtask {
3657 status: Status::Returned | Status::ReturnCancelled,
3658 }) = waitable.common(state)?.event
3659 {
3660 waitable.delete_from(state)?;
3661 }
3662 }
3663
3664 state.delete(self.sync_call_set)?;
3665
3666 match &self.caller {
3668 Caller::Guest {
3669 task,
3670 instance: runtime_instance,
3671 } => {
3672 let task_mut = state.get_mut(*task)?;
3673 let present = task_mut.subtasks.remove(&me);
3674 assert!(present);
3675
3676 for subtask in &self.subtasks {
3677 task_mut.subtasks.insert(*subtask);
3678 }
3679
3680 for subtask in &self.subtasks {
3681 state.get_mut(*subtask)?.caller = Caller::Guest {
3682 task: *task,
3683 instance: *runtime_instance,
3684 };
3685 }
3686 }
3687 Caller::Host { exit_tx, .. } => {
3688 for subtask in &self.subtasks {
3689 state.get_mut(*subtask)?.caller = Caller::Host {
3690 tx: None,
3691 exit_tx: exit_tx.clone(),
3695 remove_task_automatically: true,
3696 call_post_return_automatically: true,
3697 };
3698 }
3699 }
3700 }
3701
3702 for subtask in self.subtasks {
3703 if state.get_mut(subtask)?.exited {
3704 Waitable::Guest(subtask).delete_from(state)?;
3705 }
3706 }
3707
3708 Ok(())
3709 }
3710
3711 fn call_post_return_automatically(&self) -> bool {
3712 matches!(
3713 self.caller,
3714 Caller::Guest { .. }
3715 | Caller::Host {
3716 call_post_return_automatically: true,
3717 ..
3718 }
3719 )
3720 }
3721}
3722
impl TableDebug for GuestTask {
    /// Fixed label identifying `GuestTask` table entries.
    fn type_name() -> &'static str {
        "GuestTask"
    }
}
3728
/// State carried by every kind of waitable (`HostTask`, `GuestTask`,
/// `TransmitHandle`); reached through `Waitable::common`.
#[derive(Default)]
struct WaitableCommon {
    /// Event waiting to be delivered, if any.
    event: Option<Event>,
    /// Set this waitable currently belongs to, if any (see `Waitable::join`).
    set: Option<TableId<WaitableSet>>,
    /// Optional `u32` handle.
    // NOTE(review): presumably the guest-visible handle for this waitable —
    // confirm at the use sites.
    handle: Option<u32>,
}
3739
/// Typed id of something that can be waited on.
///
/// The derived ordering is what sorts entries in `WaitableSet::ready`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task.
    Host(TableId<HostTask>),
    /// A guest task.
    Guest(TableId<GuestTask>),
    /// A stream or future handle.
    Transmit(TableId<TransmitHandle>),
}
3750
3751impl Waitable {
3752 fn from_instance(
3755 state: Pin<&mut ComponentInstance>,
3756 caller_instance: RuntimeComponentInstanceIndex,
3757 waitable: u32,
3758 ) -> Result<Self> {
3759 use crate::runtime::vm::component::Waitable;
3760
3761 let (waitable, kind) = state.guest_tables().0[caller_instance].waitable_rep(waitable)?;
3762
3763 Ok(match kind {
3764 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
3765 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
3766 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
3767 })
3768 }
3769
3770 fn rep(&self) -> u32 {
3772 match self {
3773 Self::Host(id) => id.rep(),
3774 Self::Guest(id) => id.rep(),
3775 Self::Transmit(id) => id.rep(),
3776 }
3777 }
3778
3779 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
3783 log::trace!("waitable {self:?} join set {set:?}",);
3784
3785 let old = mem::replace(&mut self.common(state)?.set, set);
3786
3787 if let Some(old) = old {
3788 match *self {
3789 Waitable::Host(id) => state.remove_child(id, old),
3790 Waitable::Guest(id) => state.remove_child(id, old),
3791 Waitable::Transmit(id) => state.remove_child(id, old),
3792 }?;
3793
3794 state.get_mut(old)?.ready.remove(self);
3795 }
3796
3797 if let Some(set) = set {
3798 match *self {
3799 Waitable::Host(id) => state.add_child(id, set),
3800 Waitable::Guest(id) => state.add_child(id, set),
3801 Waitable::Transmit(id) => state.add_child(id, set),
3802 }?;
3803
3804 if self.common(state)?.event.is_some() {
3805 self.mark_ready(state)?;
3806 }
3807 }
3808
3809 Ok(())
3810 }
3811
3812 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
3814 Ok(match self {
3815 Self::Host(id) => &mut state.get_mut(*id)?.common,
3816 Self::Guest(id) => &mut state.get_mut(*id)?.common,
3817 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
3818 })
3819 }
3820
3821 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
3825 log::trace!("set event for {self:?}: {event:?}");
3826 self.common(state)?.event = event;
3827 self.mark_ready(state)
3828 }
3829
3830 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
3832 let common = self.common(state)?;
3833 let event = common.event.take();
3834 if let Some(set) = self.common(state)?.set {
3835 state.get_mut(set)?.ready.remove(self);
3836 }
3837 Ok(event)
3838 }
3839
3840 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
3844 if let Some(set) = self.common(state)?.set {
3845 state.get_mut(set)?.ready.insert(*self);
3846 if let Some((task, mode)) = state.get_mut(set)?.waiting.pop_first() {
3847 let wake_on_cancel = state.get_mut(task)?.wake_on_cancel.take();
3848 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
3849
3850 let item = match mode {
3851 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3852 WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
3853 task,
3854 kind: GuestCallKind::DeliverEvent {
3855 instance,
3856 set: Some(set),
3857 },
3858 }),
3859 };
3860 state.push_high_priority(item);
3861 }
3862 }
3863 Ok(())
3864 }
3865
3866 fn on_delivery(&self, instance: Pin<&mut ComponentInstance>, event: Event) {
3869 match event {
3870 Event::FutureRead {
3871 pending: Some((ty, handle)),
3872 ..
3873 }
3874 | Event::FutureWrite {
3875 pending: Some((ty, handle)),
3876 ..
3877 } => {
3878 let runtime_instance = instance.component().types()[ty].instance;
3879 let (rep, state) = instance.guest_tables().0[runtime_instance]
3880 .future_rep(ty, handle)
3881 .unwrap();
3882 assert_eq!(rep, self.rep());
3883 assert_eq!(*state, TransmitLocalState::Busy);
3884 *state = match event {
3885 Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
3886 Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
3887 _ => unreachable!(),
3888 };
3889 }
3890 Event::StreamRead {
3891 pending: Some((ty, handle)),
3892 code,
3893 }
3894 | Event::StreamWrite {
3895 pending: Some((ty, handle)),
3896 code,
3897 } => {
3898 let runtime_instance = instance.component().types()[ty].instance;
3899 let (rep, state) = instance.guest_tables().0[runtime_instance]
3900 .stream_rep(ty, handle)
3901 .unwrap();
3902 assert_eq!(rep, self.rep());
3903 assert_eq!(*state, TransmitLocalState::Busy);
3904 let done = matches!(code, ReturnCode::Dropped(_));
3905 *state = match event {
3906 Event::StreamRead { .. } => TransmitLocalState::Read { done },
3907 Event::StreamWrite { .. } => TransmitLocalState::Write { done },
3908 _ => unreachable!(),
3909 };
3910 }
3911 _ => {}
3912 }
3913 }
3914
3915 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
3917 match self {
3918 Self::Host(task) => {
3919 log::trace!("delete host task {task:?}");
3920 state.delete(*task)?;
3921 }
3922 Self::Guest(task) => {
3923 log::trace!("delete guest task {task:?}");
3924 state.delete(*task)?.dispose(state, *task)?;
3925 }
3926 Self::Transmit(task) => {
3927 state.delete(*task)?;
3928 }
3929 }
3930
3931 Ok(())
3932 }
3933}
3934
3935impl fmt::Debug for Waitable {
3936 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3937 match self {
3938 Self::Host(id) => write!(f, "{id:?}"),
3939 Self::Guest(id) => write!(f, "{id:?}"),
3940 Self::Transmit(id) => write!(f, "{id:?}"),
3941 }
3942 }
3943}
3944
/// A set of waitables plus the tasks currently waiting on it.
#[derive(Default)]
struct WaitableSet {
    /// Members that have been marked ready (see `Waitable::mark_ready`).
    ready: BTreeSet<Waitable>,
    /// Waiting tasks and how to wake each one; `mark_ready` wakes the entry
    /// with the smallest key first.
    waiting: BTreeMap<TableId<GuestTask>, WaitMode>,
}
3953
impl TableDebug for WaitableSet {
    /// Fixed label identifying `WaitableSet` table entries.
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
3959
/// Boxed one-shot closure that lowers a call's parameters into a buffer of
/// possibly-uninitialised raw values.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
3963
/// Boxed one-shot closure that lifts raw result values into a type-erased
/// boxed value.
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;
3968
/// Type-erased lifted result; `queue_call` downcasts it back to the
/// concrete result type.
type LiftedResult = Box<dyn Any + Send + Sync>;
3973
/// Unit marker type.
// NOTE(review): presumably a placeholder stored as a `LiftedResult` when no
// real value exists — its uses are outside this section; confirm there.
struct DummyResult;
3977
/// Per-component-instance bookkeeping kept in
/// `ConcurrentState::instance_states`.
#[derive(Default)]
struct InstanceState {
    /// Backpressure counter updated by `backpressure_modify`; when it drops
    /// from non-zero to zero the pending calls are re-examined.
    backpressure: u16,
    /// Set by `enter_instance` and cleared by `exit_instance`.
    do_not_enter: bool,
    /// Guest calls that are not ready to run yet, keyed by task (see
    /// `partition_pending`).
    pending: BTreeMap<TableId<GuestTask>, GuestCallKind>,
}
3989
/// Per-store state for component-model concurrency: the task table, the
/// work queues, and per-instance bookkeeping.
pub struct ConcurrentState {
    /// The guest task whose context slots `context_get` / `context_set`
    /// operate on; `prepare_call` asserts it is `None`.
    guest_task: Option<TableId<GuestTask>>,
    /// Host futures being driven; `take_fibers_and_futures` takes the whole
    /// collection, leaving `None`.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// Table holding tasks, waitable sets and transmit handles.
    table: AlwaysMut<ResourceTable>,
    /// Lazily created per-instance state (see `instance_state`).
    instance_states: HashMap<RuntimeComponentInstanceIndex, InstanceState>,
    /// Work items queued via `push_high_priority`.
    high_priority: Vec<WorkItem>,
    /// Work items queued via `push_low_priority`.
    low_priority: Vec<WorkItem>,
    /// Initially `None`.
    // NOTE(review): presumably the reason the current fiber suspended —
    // confirm at the use sites.
    suspend_reason: Option<SuspendReason>,
    /// Optional worker fiber; handed out by `take_fibers_and_futures`.
    worker: Option<StoreFiber<'static>>,
    /// Initially `None`.
    // NOTE(review): presumably the item the worker fiber should run next.
    worker_item: Option<WorkerItem>,

    /// Reference counts for global error contexts, keyed by table index.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
}
4037
4038impl Default for ConcurrentState {
4039 fn default() -> Self {
4040 Self {
4041 guest_task: None,
4042 table: AlwaysMut::new(ResourceTable::new()),
4043 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4044 instance_states: HashMap::new(),
4045 high_priority: Vec::new(),
4046 low_priority: Vec::new(),
4047 suspend_reason: None,
4048 worker: None,
4049 worker_item: None,
4050 global_error_context_ref_counts: BTreeMap::new(),
4051 }
4052 }
4053}
4054
4055impl ConcurrentState {
4056 pub(crate) fn take_fibers_and_futures(
4073 &mut self,
4074 fibers: &mut Vec<StoreFiber<'static>>,
4075 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4076 ) {
4077 for entry in self.table.get_mut().iter_mut() {
4078 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4079 for mode in mem::take(&mut set.waiting).into_values() {
4080 if let WaitMode::Fiber(fiber) = mode {
4081 fibers.push(fiber);
4082 }
4083 }
4084 }
4085 }
4086
4087 if let Some(fiber) = self.worker.take() {
4088 fibers.push(fiber);
4089 }
4090
4091 let mut take_items = |list| {
4092 for item in mem::take(list) {
4093 match item {
4094 WorkItem::ResumeFiber(fiber) => {
4095 fibers.push(fiber);
4096 }
4097 WorkItem::PushFuture(future) => {
4098 self.futures
4099 .get_mut()
4100 .as_mut()
4101 .unwrap()
4102 .push(future.into_inner());
4103 }
4104 _ => {}
4105 }
4106 }
4107 };
4108
4109 take_items(&mut self.high_priority);
4110 take_items(&mut self.low_priority);
4111
4112 if let Some(them) = self.futures.get_mut().take() {
4113 futures.push(them);
4114 }
4115 }
4116
4117 fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState {
4118 self.instance_states.entry(instance).or_default()
4119 }
4120
4121 fn push<V: Send + Sync + 'static>(
4122 &mut self,
4123 value: V,
4124 ) -> Result<TableId<V>, ResourceTableError> {
4125 self.table.get_mut().push(value).map(TableId::from)
4126 }
4127
4128 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4129 self.table.get_mut().get_mut(&Resource::from(id))
4130 }
4131
4132 pub fn add_child<T: 'static, U: 'static>(
4133 &mut self,
4134 child: TableId<T>,
4135 parent: TableId<U>,
4136 ) -> Result<(), ResourceTableError> {
4137 self.table
4138 .get_mut()
4139 .add_child(Resource::from(child), Resource::from(parent))
4140 }
4141
4142 pub fn remove_child<T: 'static, U: 'static>(
4143 &mut self,
4144 child: TableId<T>,
4145 parent: TableId<U>,
4146 ) -> Result<(), ResourceTableError> {
4147 self.table
4148 .get_mut()
4149 .remove_child(Resource::from(child), Resource::from(parent))
4150 }
4151
4152 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4153 self.table.get_mut().delete(Resource::from(id))
4154 }
4155
4156 fn push_future(&mut self, future: HostTaskFuture) {
4157 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4164 }
4165
4166 fn push_high_priority(&mut self, item: WorkItem) {
4167 log::trace!("push high priority: {item:?}");
4168 self.high_priority.push(item);
4169 }
4170
4171 fn push_low_priority(&mut self, item: WorkItem) {
4172 log::trace!("push low priority: {item:?}");
4173 self.low_priority.push(item);
4174 }
4175
4176 fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4186 let guest_instance = self.get_mut(guest_task).unwrap().instance;
4187
4188 loop {
4196 match &self.get_mut(guest_task).unwrap().caller {
4197 Caller::Host { .. } => break true,
4198 Caller::Guest { task, instance } => {
4199 if *instance == guest_instance {
4200 break false;
4201 } else {
4202 guest_task = *task;
4203 }
4204 }
4205 }
4206 }
4207 }
4208
4209 fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) {
4213 self.instance_state(instance).do_not_enter = true;
4214 }
4215
4216 fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4220 self.instance_state(instance).do_not_enter = false;
4221 self.partition_pending(instance)
4222 }
4223
4224 fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4229 for (task, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
4230 let call = GuestCall { task, kind };
4231 if call.is_ready(self)? {
4232 self.push_high_priority(WorkItem::GuestCall(call));
4233 } else {
4234 self.instance_state(instance)
4235 .pending
4236 .insert(call.task, call.kind);
4237 }
4238 }
4239
4240 Ok(())
4241 }
4242
4243 pub(crate) fn backpressure_modify(
4245 &mut self,
4246 caller_instance: RuntimeComponentInstanceIndex,
4247 modify: impl FnOnce(u16) -> Option<u16>,
4248 ) -> Result<()> {
4249 let state = self.instance_state(caller_instance);
4250 let old = state.backpressure;
4251 let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?;
4252 state.backpressure = new;
4253
4254 if old > 0 && new == 0 {
4255 self.partition_pending(caller_instance)?;
4258 }
4259
4260 Ok(())
4261 }
4262
4263 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4265 let task = self.guest_task.unwrap();
4266 let val = self.get_mut(task)?.context[usize::try_from(slot).unwrap()];
4267 log::trace!("context_get {task:?} slot {slot} val {val:#x}");
4268 Ok(val)
4269 }
4270
4271 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4273 let task = self.guest_task.unwrap();
4274 log::trace!("context_set {task:?} slot {slot} val {val:#x}");
4275 self.get_mut(task)?.context[usize::try_from(slot).unwrap()] = val;
4276 Ok(())
4277 }
4278}
4279
4280fn for_any_lower<
4283 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4284>(
4285 fun: F,
4286) -> F {
4287 fun
4288}
4289
4290fn for_any_lift<
4292 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4293>(
4294 fun: F,
4295) -> F {
4296 fun
4297}
4298
4299fn checked<F: Future + Send + 'static>(
4304 id: StoreId,
4305 fut: F,
4306) -> impl Future<Output = F::Output> + Send + 'static {
4307 async move {
4308 let mut fut = pin!(fut);
4309 future::poll_fn(move |cx| {
4310 let message = "\
4311 `Future`s which depend on asynchronous component tasks, streams, or \
4312 futures to complete may only be polled from the event loop of the \
4313 store to which they belong. Please use \
4314 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
4315 ";
4316 tls::try_get(|store| {
4317 let matched = match store {
4318 tls::TryGet::Some(store) => store.id() == id,
4319 tls::TryGet::Taken | tls::TryGet::None => false,
4320 };
4321
4322 if !matched {
4323 panic!("{message}")
4324 }
4325 });
4326 fut.as_mut().poll(cx)
4327 })
4328 .await
4329 }
4330}
4331
4332fn check_recursive_run() {
4335 tls::try_get(|store| {
4336 if !matches!(store, tls::TryGet::None) {
4337 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
4338 }
4339 });
4340}
4341
/// Splits a packed callback return value into `(low 4 bits, remaining upper
/// 28 bits)`.
// NOTE(review): presumably the low nibble is the callback code and the rest
// a waitable-set handle — confirm against the caller.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    const LOW_NIBBLE: u32 = 0b1111;
    let low = code & LOW_NIBBLE;
    let high = code >> 4;
    (low, high)
}
4345
/// Arguments carried by the `Wait` and `Poll` variants of `WaitableCheck`.
struct WaitableCheckParams {
    /// The waitable set to check.
    set: TableId<WaitableSet>,
    /// Options index supplied with the check.
    options: OptionsIndex,
    /// `u32` payload value.
    // NOTE(review): presumably the guest address for the event payload —
    // confirm at the use sites.
    payload: u32,
}
4354
/// The kind of check being performed: wait on a set, poll a set, or yield.
// NOTE(review): the exact blocking semantics of each variant are defined by
// code outside this section.
enum WaitableCheck {
    /// Wait on the given set.
    Wait(WaitableCheckParams),
    /// Poll the given set.
    Poll(WaitableCheckParams),
    /// Yield; carries no parameters.
    Yield,
}
4361
/// A host-to-guest call that `prepare_call` has registered but that has not
/// been queued yet; consumed by `queue_call`.
pub(crate) struct PreparedCall<R> {
    /// The function being called.
    handle: Func,
    /// The guest task created for this call.
    task: TableId<GuestTask>,
    /// Parameter count forwarded to `queue_call0`.
    param_count: usize,
    /// Receives the type-erased lifted result.
    rx: oneshot::Receiver<LiftedResult>,
    /// Receiver paired with the task's `Caller::Host::exit_tx`; returned to
    /// the caller alongside the result.
    exit_rx: oneshot::Receiver<()>,
    /// Ties the struct to the result type `R` without storing one.
    _phantom: PhantomData<R>,
}
4378
4379impl<R> PreparedCall<R> {
4380 pub(crate) fn task_id(&self) -> TaskId {
4382 TaskId { task: self.task }
4383 }
4384}
4385
/// Handle to a guest task, obtained from `PreparedCall::task_id`.
pub(crate) struct TaskId {
    /// Table id of the task.
    task: TableId<GuestTask>,
}
4390
4391impl TaskId {
4392 pub(crate) fn remove<T>(&self, store: StoreContextMut<T>) -> Result<()> {
4402 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())
4403 }
4404}
4405
4406pub(crate) fn prepare_call<T, R>(
4412 mut store: StoreContextMut<T>,
4413 handle: Func,
4414 param_count: usize,
4415 remove_task_automatically: bool,
4416 call_post_return_automatically: bool,
4417 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
4418 + Send
4419 + Sync
4420 + 'static,
4421 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4422 + Send
4423 + Sync
4424 + 'static,
4425) -> Result<PreparedCall<R>> {
4426 let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
4427
4428 let instance = handle.instance().id().get(store.0);
4429 let task_return_type = instance.component().types()[ty].results;
4430 let component_instance = raw_options.instance;
4431 let callback = options.callback();
4432 let memory = options.memory_raw().map(SendSyncPtr::new);
4433 let string_encoding = options.string_encoding();
4434 let token = StoreToken::new(store.as_context_mut());
4435 let state = store.0.concurrent_state_mut();
4436
4437 assert!(state.guest_task.is_none());
4438
4439 let (tx, rx) = oneshot::channel();
4440 let (exit_tx, exit_rx) = oneshot::channel();
4441
4442 let mut task = GuestTask::new(
4443 state,
4444 Box::new(for_any_lower(move |store, params| {
4445 lower_params(handle, token.as_context_mut(store), params)
4446 })),
4447 LiftResult {
4448 lift: Box::new(for_any_lift(move |store, result| {
4449 lift_result(handle, store, result)
4450 })),
4451 ty: task_return_type,
4452 memory,
4453 string_encoding,
4454 },
4455 Caller::Host {
4456 tx: Some(tx),
4457 exit_tx: Arc::new(exit_tx),
4458 remove_task_automatically,
4459 call_post_return_automatically,
4460 },
4461 callback.map(|callback| {
4462 let callback = SendSyncPtr::new(callback);
4463 let instance = handle.instance();
4464 Box::new(
4465 move |store: &mut dyn VMStore, runtime_instance, event, handle| {
4466 let store = token.as_context_mut(store);
4467 unsafe {
4470 instance.call_callback(
4471 store,
4472 runtime_instance,
4473 callback,
4474 event,
4475 handle,
4476 call_post_return_automatically,
4477 )
4478 }
4479 },
4480 ) as CallbackFn
4481 }),
4482 component_instance,
4483 )?;
4484 task.function_index = Some(handle.index());
4485
4486 let task = state.push(task)?;
4487
4488 Ok(PreparedCall {
4489 handle,
4490 task,
4491 param_count,
4492 rx,
4493 exit_rx,
4494 _phantom: PhantomData,
4495 })
4496}
4497
4498pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
4505 mut store: StoreContextMut<T>,
4506 prepared: PreparedCall<R>,
4507) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
4508 let PreparedCall {
4509 handle,
4510 task,
4511 param_count,
4512 rx,
4513 exit_rx,
4514 ..
4515 } = prepared;
4516
4517 queue_call0(store.as_context_mut(), handle, task, param_count)?;
4518
4519 Ok(checked(
4520 store.0.id(),
4521 rx.map(move |result| {
4522 result
4523 .map(|v| (*v.downcast().unwrap(), exit_rx))
4524 .map_err(anyhow::Error::from)
4525 }),
4526 ))
4527}
4528
4529fn queue_call0<T: 'static>(
4532 store: StoreContextMut<T>,
4533 handle: Func,
4534 guest_task: TableId<GuestTask>,
4535 param_count: usize,
4536) -> Result<()> {
4537 let (options, flags, _ty, raw_options) = handle.abi_info(store.0);
4538 let is_concurrent = raw_options.async_;
4539 let instance = handle.instance();
4540 let callee = handle.lifted_core_func(store.0);
4541 let callback = options.callback();
4542 let post_return = handle.post_return_core_func(store.0);
4543
4544 log::trace!("queueing call {guest_task:?}");
4545
4546 let instance_flags = if callback.is_none() {
4547 None
4548 } else {
4549 Some(flags)
4550 };
4551
4552 unsafe {
4556 instance.queue_call(
4557 store,
4558 guest_task,
4559 SendSyncPtr::new(callee),
4560 param_count,
4561 1,
4562 instance_flags,
4563 is_concurrent,
4564 callback.map(SendSyncPtr::new),
4565 post_return.map(SendSyncPtr::new),
4566 )
4567 }
4568}