1use crate::component::func::{self, Func, Options};
51use crate::component::{
52 Component, ComponentInstanceId, HasData, HasSelf, Instance, Resource, ResourceTable,
53 ResourceTableError,
54};
55use crate::fiber::{self, StoreFiber, StoreFiberYield};
56use crate::store::{StoreInner, StoreOpaque, StoreToken};
57use crate::vm::component::{
58 CallContext, ComponentInstance, InstanceFlags, ResourceTables, TransmitLocalState,
59};
60use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
61use crate::{AsContext, AsContextMut, StoreContext, StoreContextMut, ValRaw};
62use anyhow::{Context as _, Result, anyhow, bail};
63use error_contexts::GlobalErrorContextRefCount;
64use futures::channel::oneshot;
65use futures::future::{self, Either, FutureExt};
66use futures::stream::{FuturesUnordered, StreamExt};
67use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
68use std::any::Any;
69use std::borrow::ToOwned;
70use std::boxed::Box;
71use std::cell::UnsafeCell;
72use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
73use std::fmt;
74use std::future::Future;
75use std::marker::PhantomData;
76use std::mem::{self, ManuallyDrop, MaybeUninit};
77use std::ops::DerefMut;
78use std::pin::{Pin, pin};
79use std::ptr::{self, NonNull};
80use std::slice;
81use std::sync::Arc;
82use std::task::{Context, Poll, Waker};
83use std::vec::Vec;
84use table::{TableDebug, TableId};
85use wasmtime_environ::component::{
86 CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS,
87 OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
88 RuntimeComponentInstanceIndex, StringEncoding, TypeComponentGlobalErrorContextTableIndex,
89 TypeComponentLocalErrorContextTableIndex, TypeFutureTableIndex, TypeStreamTableIndex,
90 TypeTupleIndex,
91};
92
93pub use abort::JoinHandle;
94pub use futures_and_streams::{
95 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
96 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
97 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
98};
99pub(crate) use futures_and_streams::{
100 ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index,
101};
102
103mod abort;
104mod error_contexts;
105mod futures_and_streams;
106mod table;
107pub(crate) mod tls;
108
109const BLOCKED: u32 = 0xffff_ffff;
112
113#[derive(Clone, Copy, Eq, PartialEq, Debug)]
115pub enum Status {
116 Starting = 0,
117 Started = 1,
118 Returned = 2,
119 StartCancelled = 3,
120 ReturnCancelled = 4,
121}
122
123impl Status {
124 pub fn pack(self, waitable: Option<u32>) -> u32 {
130 assert!(matches!(self, Status::Returned) == waitable.is_none());
131 let waitable = waitable.unwrap_or(0);
132 assert!(waitable < (1 << 28));
133 (waitable << 4) | (self as u32)
134 }
135}
136
137#[derive(Clone, Copy, Debug)]
140enum Event {
141 None,
142 Cancelled,
143 Subtask {
144 status: Status,
145 },
146 StreamRead {
147 code: ReturnCode,
148 pending: Option<(TypeStreamTableIndex, u32)>,
149 },
150 StreamWrite {
151 code: ReturnCode,
152 pending: Option<(TypeStreamTableIndex, u32)>,
153 },
154 FutureRead {
155 code: ReturnCode,
156 pending: Option<(TypeFutureTableIndex, u32)>,
157 },
158 FutureWrite {
159 code: ReturnCode,
160 pending: Option<(TypeFutureTableIndex, u32)>,
161 },
162}
163
164impl Event {
165 fn parts(self) -> (u32, u32) {
170 const EVENT_NONE: u32 = 0;
171 const EVENT_SUBTASK: u32 = 1;
172 const EVENT_STREAM_READ: u32 = 2;
173 const EVENT_STREAM_WRITE: u32 = 3;
174 const EVENT_FUTURE_READ: u32 = 4;
175 const EVENT_FUTURE_WRITE: u32 = 5;
176 const EVENT_CANCELLED: u32 = 6;
177 match self {
178 Event::None => (EVENT_NONE, 0),
179 Event::Cancelled => (EVENT_CANCELLED, 0),
180 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
181 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
182 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
183 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
184 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
185 }
186 }
187}
188
189mod callback_code {
191 pub const EXIT: u32 = 0;
192 pub const YIELD: u32 = 1;
193 pub const WAIT: u32 = 2;
194 pub const POLL: u32 = 3;
195}
196
197const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
201
202pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
208 store: StoreContextMut<'a, T>,
209 get_data: fn(&mut T) -> D::Data<'_>,
210 instance: Option<Instance>,
211}
212
213impl<'a, T, D> Access<'a, T, D>
214where
215 D: HasData + ?Sized,
216 T: 'static,
217{
218 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
220 Self {
221 store,
222 get_data,
223 instance: None,
224 }
225 }
226
227 pub fn data_mut(&mut self) -> &mut T {
229 self.store.data_mut()
230 }
231
232 pub fn get(&mut self) -> D::Data<'_> {
234 (self.get_data)(self.data_mut())
235 }
236
237 pub fn spawn(&mut self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
241 where
242 T: 'static,
243 {
244 let accessor = Accessor {
245 get_data: self.get_data,
246 instance: self.instance,
247 token: StoreToken::new(self.store.as_context_mut()),
248 };
249 self.instance
250 .unwrap()
251 .spawn_with_accessor(self.store.as_context_mut(), accessor, task)
252 }
253
254 pub fn instance(&self) -> Instance {
256 self.instance.unwrap()
257 }
258}
259
260impl<'a, T, D> AsContext for Access<'a, T, D>
261where
262 D: HasData + ?Sized,
263 T: 'static,
264{
265 type Data = T;
266
267 fn as_context(&self) -> StoreContext<'_, T> {
268 self.store.as_context()
269 }
270}
271
272impl<'a, T, D> AsContextMut for Access<'a, T, D>
273where
274 D: HasData + ?Sized,
275 T: 'static,
276{
277 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
278 self.store.as_context_mut()
279 }
280}
281
282pub struct Accessor<T: 'static, D = HasSelf<T>>
342where
343 D: HasData + ?Sized,
344{
345 token: StoreToken<T>,
346 get_data: fn(&mut T) -> D::Data<'_>,
347 instance: Option<Instance>,
348}
349
350pub trait AsAccessor {
366 type Data: 'static;
368
369 type AccessorData: HasData + ?Sized;
372
373 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
375}
376
377impl<T: AsAccessor + ?Sized> AsAccessor for &T {
378 type Data = T::Data;
379 type AccessorData = T::AccessorData;
380
381 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
382 T::as_accessor(self)
383 }
384}
385
386impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
387 type Data = T;
388 type AccessorData = D;
389
390 fn as_accessor(&self) -> &Accessor<T, D> {
391 self
392 }
393}
394
395const _: () = {
418 const fn assert<T: Send + Sync>() {}
419 assert::<Accessor<UnsafeCell<u32>>>();
420};
421
422impl<T> Accessor<T> {
423 pub(crate) fn new(token: StoreToken<T>, instance: Option<Instance>) -> Self {
435 Self {
436 token,
437 get_data: |x| x,
438 instance,
439 }
440 }
441}
442
443impl<T, D> Accessor<T, D>
444where
445 D: HasData + ?Sized,
446{
447 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
465 tls::get(|vmstore| {
466 fun(Access {
467 store: self.token.as_context_mut(vmstore),
468 get_data: self.get_data,
469 instance: self.instance,
470 })
471 })
472 }
473
474 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
477 self.get_data
478 }
479
480 pub fn with_getter<D2: HasData>(
497 &self,
498 get_data: fn(&mut T) -> D2::Data<'_>,
499 ) -> Accessor<T, D2> {
500 Accessor {
501 token: self.token,
502 get_data,
503 instance: self.instance,
504 }
505 }
506
507 pub fn spawn(&self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
523 where
524 T: 'static,
525 {
526 let instance = self.instance.unwrap();
527 let accessor = self.clone_for_spawn();
528 self.with(|mut access| {
529 instance.spawn_with_accessor(access.as_context_mut(), accessor, task)
530 })
531 }
532
533 pub fn instance(&self) -> Instance {
535 self.instance.unwrap()
536 }
537
538 fn clone_for_spawn(&self) -> Self {
539 Self {
540 token: self.token,
541 get_data: self.get_data,
542 instance: self.instance,
543 }
544 }
545}
546
547pub trait AccessorTask<T, D, R>: Send + 'static
560where
561 D: HasData + ?Sized,
562{
563 fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = R> + Send;
565}
566
567enum CallerInfo {
570 Async {
572 params: Vec<ValRaw>,
573 has_result: bool,
574 },
575 Sync {
577 params: Vec<ValRaw>,
578 result_count: u32,
579 },
580}
581
582enum WaitMode {
584 Fiber(StoreFiber<'static>),
586 Callback,
589}
590
591#[derive(Debug)]
593enum SuspendReason {
594 Waiting {
597 set: TableId<WaitableSet>,
598 task: TableId<GuestTask>,
599 },
600 NeedWork,
603 Yielding { task: TableId<GuestTask> },
606}
607
608enum GuestCallKind {
610 DeliverEvent {
613 set: Option<TableId<WaitableSet>>,
618 },
619 Start(Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send + Sync>),
622}
623
624impl fmt::Debug for GuestCallKind {
625 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
626 match self {
627 Self::DeliverEvent { set } => f.debug_struct("DeliverEvent").field("set", set).finish(),
628 Self::Start(_) => f.debug_tuple("Start").finish(),
629 }
630 }
631}
632
633#[derive(Debug)]
635struct GuestCall {
636 task: TableId<GuestTask>,
637 kind: GuestCallKind,
638}
639
640impl GuestCall {
641 fn is_ready(&self, state: &mut ConcurrentState) -> Result<bool> {
651 let task_instance = state.get_mut(self.task)?.instance;
652 let state = state.instance_state(task_instance);
653 let ready = match &self.kind {
654 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
655 GuestCallKind::Start(_) => !(state.do_not_enter || state.backpressure > 0),
656 };
657 log::trace!(
658 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
659 state.do_not_enter,
660 state.backpressure
661 );
662 Ok(ready)
663 }
664}
665
666enum WorkerItem {
668 GuestCall(GuestCall),
669 Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send>>),
670}
671
672#[derive(Debug)]
675struct PollParams {
676 task: TableId<GuestTask>,
678 set: TableId<WaitableSet>,
680}
681
682enum WorkItem {
685 PushFuture(AlwaysMut<HostTaskFuture>),
687 ResumeFiber(StoreFiber<'static>),
689 GuestCall(GuestCall),
691 Poll(PollParams),
693 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send>>),
695}
696
697impl fmt::Debug for WorkItem {
698 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
699 match self {
700 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
701 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
702 Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
703 Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
704 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
705 }
706 }
707}
708
709impl ComponentInstance {
710 fn handle_callback_code(
716 mut self: Pin<&mut Self>,
717 guest_task: TableId<GuestTask>,
718 runtime_instance: RuntimeComponentInstanceIndex,
719 code: u32,
720 initial_call: bool,
721 ) -> Result<()> {
722 let (code, set) = unpack_callback_code(code);
723
724 log::trace!("received callback code from {guest_task:?}: {code} (set: {set})");
725
726 let state = self.as_mut().concurrent_state_mut();
727 let task = state.get_mut(guest_task)?;
728
729 if task.lift_result.is_some() {
730 if code == callback_code::EXIT {
731 return Err(anyhow!(crate::Trap::NoAsyncResult));
732 }
733 if initial_call {
734 Waitable::Guest(guest_task).set_event(
737 state,
738 Some(Event::Subtask {
739 status: Status::Started,
740 }),
741 )?;
742 }
743 }
744
745 let get_set = |instance: Pin<&mut Self>, handle| {
746 if handle == 0 {
747 bail!("invalid waitable-set handle");
748 }
749
750 let set = instance.guest_tables().0[runtime_instance].waitable_set_rep(handle)?;
751
752 Ok(TableId::<WaitableSet>::new(set))
753 };
754
755 match code {
756 callback_code::EXIT => {
757 let task = state.get_mut(guest_task)?;
758 match &task.caller {
759 Caller::Host {
760 remove_task_automatically,
761 ..
762 } => {
763 if *remove_task_automatically {
764 log::trace!("handle_callback_code will delete task {guest_task:?}");
765 Waitable::Guest(guest_task).delete_from(state)?;
766 }
767 }
768 Caller::Guest { .. } => {
769 task.exited = true;
770 task.callback = None;
771 }
772 }
773 }
774 callback_code::YIELD => {
775 let task = state.get_mut(guest_task)?;
778 assert!(task.event.is_none());
779 task.event = Some(Event::None);
780 state.push_low_priority(WorkItem::GuestCall(GuestCall {
781 task: guest_task,
782 kind: GuestCallKind::DeliverEvent { set: None },
783 }));
784 }
785 callback_code::WAIT | callback_code::POLL => {
786 let set = get_set(self.as_mut(), set)?;
787 let state = self.concurrent_state_mut();
788
789 if state.get_mut(guest_task)?.event.is_some()
790 || !state.get_mut(set)?.ready.is_empty()
791 {
792 state.push_high_priority(WorkItem::GuestCall(GuestCall {
794 task: guest_task,
795 kind: GuestCallKind::DeliverEvent { set: Some(set) },
796 }));
797 } else {
798 match code {
800 callback_code::POLL => {
801 state.push_low_priority(WorkItem::Poll(PollParams {
804 task: guest_task,
805 set,
806 }));
807 }
808 callback_code::WAIT => {
809 let old = state.get_mut(guest_task)?.wake_on_cancel.replace(set);
816 assert!(old.is_none());
817 let old = state
818 .get_mut(set)?
819 .waiting
820 .insert(guest_task, WaitMode::Callback);
821 assert!(old.is_none());
822 }
823 _ => unreachable!(),
824 }
825 }
826 }
827 _ => bail!("unsupported callback code: {code}"),
828 }
829
830 Ok(())
831 }
832
833 fn get_event(
836 mut self: Pin<&mut Self>,
837 guest_task: TableId<GuestTask>,
838 set: Option<TableId<WaitableSet>>,
839 cancellable: bool,
840 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
841 let state = self.as_mut().concurrent_state_mut();
842
843 if let Some(event) = state.get_mut(guest_task)?.event.take() {
844 log::trace!("deliver event {event:?} to {guest_task:?}");
845
846 if cancellable || !matches!(event, Event::Cancelled) {
847 return Ok(Some((event, None)));
848 } else {
849 state.get_mut(guest_task)?.event = Some(event);
850 }
851 }
852
853 Ok(
854 if let Some((set, waitable)) = set
855 .and_then(|set| {
856 state
857 .get_mut(set)
858 .map(|v| v.ready.pop_first().map(|v| (set, v)))
859 .transpose()
860 })
861 .transpose()?
862 {
863 let common = waitable.common(state)?;
864 let handle = common.handle.unwrap();
865 let event = common.event.take().unwrap();
866
867 log::trace!(
868 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
869 );
870
871 waitable.on_delivery(self, event);
872
873 Some((event, Some((waitable, handle))))
874 } else {
875 None
876 },
877 )
878 }
879
880 pub(crate) fn waitable_set_new(
882 mut self: Pin<&mut Self>,
883 caller_instance: RuntimeComponentInstanceIndex,
884 ) -> Result<u32> {
885 self.check_may_leave(caller_instance)?;
886 let set = self
887 .as_mut()
888 .concurrent_state_mut()
889 .push(WaitableSet::default())?;
890 let handle = self.guest_tables().0[caller_instance].waitable_set_insert(set.rep())?;
891 log::trace!("new waitable set {set:?} (handle {handle})");
892 Ok(handle)
893 }
894
895 pub(crate) fn waitable_set_drop(
897 mut self: Pin<&mut Self>,
898 caller_instance: RuntimeComponentInstanceIndex,
899 set: u32,
900 ) -> Result<()> {
901 self.check_may_leave(caller_instance)?;
902 let rep = self.as_mut().guest_tables().0[caller_instance].waitable_set_remove(set)?;
903
904 log::trace!("drop waitable set {rep} (handle {set})");
905
906 let set = self
907 .concurrent_state_mut()
908 .delete(TableId::<WaitableSet>::new(rep))?;
909
910 if !set.waiting.is_empty() {
911 bail!("cannot drop waitable set with waiters");
912 }
913
914 Ok(())
915 }
916
917 pub(crate) fn waitable_join(
919 mut self: Pin<&mut Self>,
920 caller_instance: RuntimeComponentInstanceIndex,
921 waitable_handle: u32,
922 set_handle: u32,
923 ) -> Result<()> {
924 self.check_may_leave(caller_instance)?;
925 let waitable = Waitable::from_instance(self.as_mut(), caller_instance, waitable_handle)?;
926
927 let set = if set_handle == 0 {
928 None
929 } else {
930 let set =
931 self.as_mut().guest_tables().0[caller_instance].waitable_set_rep(set_handle)?;
932
933 Some(TableId::<WaitableSet>::new(set))
934 };
935
936 log::trace!(
937 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
938 );
939
940 waitable.join(self.concurrent_state_mut(), set)
941 }
942
943 pub(crate) fn subtask_drop(
945 mut self: Pin<&mut Self>,
946 caller_instance: RuntimeComponentInstanceIndex,
947 task_id: u32,
948 ) -> Result<()> {
949 self.check_may_leave(caller_instance)?;
950 self.as_mut().waitable_join(caller_instance, task_id, 0)?;
951
952 let (rep, is_host) =
953 self.as_mut().guest_tables().0[caller_instance].subtask_remove(task_id)?;
954
955 let concurrent_state = self.concurrent_state_mut();
956 let (waitable, expected_caller_instance, delete) = if is_host {
957 let id = TableId::<HostTask>::new(rep);
958 let task = concurrent_state.get_mut(id)?;
959 if task.join_handle.is_some() {
960 bail!("cannot drop a subtask which has not yet resolved");
961 }
962 (Waitable::Host(id), task.caller_instance, true)
963 } else {
964 let id = TableId::<GuestTask>::new(rep);
965 let task = concurrent_state.get_mut(id)?;
966 if task.lift_result.is_some() {
967 bail!("cannot drop a subtask which has not yet resolved");
968 }
969 if let Caller::Guest { instance, .. } = &task.caller {
970 (Waitable::Guest(id), *instance, task.exited)
971 } else {
972 unreachable!()
973 }
974 };
975
976 waitable.common(concurrent_state)?.handle = None;
977
978 if waitable.take_event(concurrent_state)?.is_some() {
979 bail!("cannot drop a subtask with an undelivered event");
980 }
981
982 if delete {
983 waitable.delete_from(concurrent_state)?;
984 }
985
986 assert_eq!(expected_caller_instance, caller_instance);
990 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
991 Ok(())
992 }
993}
994
995impl Instance {
996 #[doc(hidden)]
1005 pub fn assert_concurrent_state_empty(&self, mut store: impl AsContextMut) {
1006 let mut instance = self.id().get_mut(store.as_context_mut().0);
1007 assert!(
1008 instance
1009 .as_mut()
1010 .guest_tables()
1011 .0
1012 .iter()
1013 .all(|(_, table)| table.is_empty())
1014 );
1015 let state = instance.concurrent_state_mut();
1016 assert!(
1017 state.table.get_mut().is_empty(),
1018 "non-empty table: {:?}",
1019 state.table.get_mut()
1020 );
1021 assert!(state.high_priority.is_empty());
1022 assert!(state.low_priority.is_empty());
1023 assert!(state.guest_task.is_none());
1024 assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
1025 assert!(
1026 state
1027 .instance_states
1028 .iter()
1029 .all(|(_, state)| state.pending.is_empty())
1030 );
1031 assert!(state.global_error_context_ref_counts.is_empty());
1032 }
1033
1034 pub async fn run_concurrent<T, R>(
1079 self,
1080 mut store: impl AsContextMut<Data = T>,
1081 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1082 ) -> Result<R>
1083 where
1084 T: Send + 'static,
1085 {
1086 check_recursive_run();
1087 let mut store = store.as_context_mut();
1088 let token = StoreToken::new(store.as_context_mut());
1089
1090 struct Dropper<'a, T: 'static, V> {
1091 store: StoreContextMut<'a, T>,
1092 value: ManuallyDrop<V>,
1093 }
1094
1095 impl<'a, T, V> Drop for Dropper<'a, T, V> {
1096 fn drop(&mut self) {
1097 tls::set(self.store.0, || {
1098 unsafe { ManuallyDrop::drop(&mut self.value) }
1103 });
1104 }
1105 }
1106
1107 let accessor = &Accessor::new(token, Some(self));
1108 let dropper = &mut Dropper {
1109 store,
1110 value: ManuallyDrop::new(fun(accessor)),
1111 };
1112 let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1114
1115 self.poll_until(dropper.store.as_context_mut(), future)
1116 .await
1117 }
1118
1119 pub fn spawn<U: 'static>(
1129 self,
1130 mut store: impl AsContextMut<Data = U>,
1131 task: impl AccessorTask<U, HasSelf<U>, Result<()>>,
1132 ) -> JoinHandle {
1133 let mut store = store.as_context_mut();
1134 let accessor = Accessor::new(StoreToken::new(store.as_context_mut()), Some(self));
1135 self.spawn_with_accessor(store, accessor, task)
1136 }
1137
1138 fn spawn_with_accessor<T, D>(
1141 self,
1142 mut store: StoreContextMut<T>,
1143 accessor: Accessor<T, D>,
1144 task: impl AccessorTask<T, D, Result<()>>,
1145 ) -> JoinHandle
1146 where
1147 T: 'static,
1148 D: HasData + ?Sized,
1149 {
1150 let store = store.as_context_mut();
1151
1152 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
1156 self.concurrent_state_mut(store.0)
1157 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1158
1159 handle
1160 }
1161
1162 async fn poll_until<T, R>(
1168 self,
1169 mut store: StoreContextMut<'_, T>,
1170 mut future: Pin<&mut impl Future<Output = R>>,
1171 ) -> Result<R>
1172 where
1173 T: Send,
1174 {
1175 loop {
1176 let mut futures = self
1180 .concurrent_state_mut(store.0)
1181 .futures
1182 .get_mut()
1183 .take()
1184 .unwrap();
1185 let mut next = pin!(futures.next());
1186
1187 let result = future::poll_fn(|cx| {
1188 if let Poll::Ready(value) = self.set_tls(store.0, || future.as_mut().poll(cx)) {
1191 return Poll::Ready(Ok(Either::Left(value)));
1192 }
1193
1194 let next = match self.set_tls(store.0, || next.as_mut().poll(cx)) {
1198 Poll::Ready(Some(output)) => {
1199 match output {
1200 Err(e) => return Poll::Ready(Err(e)),
1201 Ok(()) => {}
1202 }
1203 Poll::Ready(true)
1204 }
1205 Poll::Ready(None) => Poll::Ready(false),
1206 Poll::Pending => Poll::Pending,
1207 };
1208
1209 let mut instance = self.id().get_mut(store.0);
1210
1211 let state = instance.as_mut().concurrent_state_mut();
1214 let ready = mem::take(&mut state.high_priority);
1215 let ready = if ready.is_empty() {
1216 let ready = mem::take(&mut state.low_priority);
1219 if ready.is_empty() {
1220 return match next {
1221 Poll::Ready(true) => {
1222 Poll::Ready(Ok(Either::Right(Vec::new())))
1228 }
1229 Poll::Ready(false) => {
1230 if let Poll::Ready(value) =
1234 self.set_tls(store.0, || future.as_mut().poll(cx))
1235 {
1236 Poll::Ready(Ok(Either::Left(value)))
1237 } else {
1238 Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
1271 }
1272 }
1273 Poll::Pending => Poll::Pending,
1278 };
1279 } else {
1280 ready
1281 }
1282 } else {
1283 ready
1284 };
1285
1286 Poll::Ready(Ok(Either::Right(ready)))
1287 })
1288 .await;
1289
1290 *self.concurrent_state_mut(store.0).futures.get_mut() = Some(futures);
1294
1295 match result? {
1296 Either::Left(value) => break Ok(value),
1299 Either::Right(ready) => {
1302 struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1303 store: StoreContextMut<'a, T>,
1304 ready: I,
1305 }
1306
1307 impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1308 fn drop(&mut self) {
1309 while let Some(item) = self.ready.next() {
1310 match item {
1311 WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1312 WorkItem::PushFuture(future) => {
1313 tls::set(self.store.0, move || drop(future))
1314 }
1315 _ => {}
1316 }
1317 }
1318 }
1319 }
1320
1321 let mut dispose = Dispose {
1322 store: store.as_context_mut(),
1323 ready: ready.into_iter(),
1324 };
1325
1326 while let Some(item) = dispose.ready.next() {
1327 self.handle_work_item(dispose.store.as_context_mut(), item)
1328 .await?;
1329 }
1330 }
1331 }
1332 }
1333 }
1334
1335 async fn handle_work_item<T: Send>(
1337 self,
1338 store: StoreContextMut<'_, T>,
1339 item: WorkItem,
1340 ) -> Result<()> {
1341 log::trace!("handle work item {item:?}");
1342 match item {
1343 WorkItem::PushFuture(future) => {
1344 self.concurrent_state_mut(store.0)
1345 .futures
1346 .get_mut()
1347 .as_mut()
1348 .unwrap()
1349 .push(future.into_inner());
1350 }
1351 WorkItem::ResumeFiber(fiber) => {
1352 self.resume_fiber(store.0, fiber).await?;
1353 }
1354 WorkItem::GuestCall(call) => {
1355 let state = self.concurrent_state_mut(store.0);
1356 if call.is_ready(state)? {
1357 self.run_on_worker(store, WorkerItem::GuestCall(call))
1358 .await?;
1359 } else {
1360 let task = state.get_mut(call.task)?;
1361 if !task.starting_sent {
1362 task.starting_sent = true;
1363 if let GuestCallKind::Start(_) = &call.kind {
1364 Waitable::Guest(call.task).set_event(
1365 state,
1366 Some(Event::Subtask {
1367 status: Status::Starting,
1368 }),
1369 )?;
1370 }
1371 }
1372
1373 let runtime_instance = state.get_mut(call.task)?.instance;
1374 state
1375 .instance_state(runtime_instance)
1376 .pending
1377 .insert(call.task, call.kind);
1378 }
1379 }
1380 WorkItem::Poll(params) => {
1381 let state = self.concurrent_state_mut(store.0);
1382 if state.get_mut(params.task)?.event.is_some()
1383 || !state.get_mut(params.set)?.ready.is_empty()
1384 {
1385 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1388 task: params.task,
1389 kind: GuestCallKind::DeliverEvent {
1390 set: Some(params.set),
1391 },
1392 }));
1393 } else {
1394 state.get_mut(params.task)?.event = Some(Event::None);
1397 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1398 task: params.task,
1399 kind: GuestCallKind::DeliverEvent {
1400 set: Some(params.set),
1401 },
1402 }));
1403 }
1404 }
1405 WorkItem::WorkerFunction(fun) => {
1406 self.run_on_worker(store, WorkerItem::Function(fun)).await?;
1407 }
1408 }
1409
1410 Ok(())
1411 }
1412
1413 async fn resume_fiber(self, store: &mut StoreOpaque, fiber: StoreFiber<'static>) -> Result<()> {
1416 let old_task = self.concurrent_state_mut(store).guest_task;
1417 log::trace!("resume_fiber: save current task {old_task:?}");
1418
1419 let fiber = fiber::resolve_or_release(store, fiber).await?;
1420
1421 let state = self.concurrent_state_mut(store);
1422
1423 state.guest_task = old_task;
1424 log::trace!("resume_fiber: restore current task {old_task:?}");
1425
1426 if let Some(mut fiber) = fiber {
1427 match state.suspend_reason.take().unwrap() {
1429 SuspendReason::NeedWork => {
1430 if state.worker.is_none() {
1431 state.worker = Some(fiber);
1432 } else {
1433 fiber.dispose(store);
1434 }
1435 }
1436 SuspendReason::Yielding { .. } => {
1437 state.push_low_priority(WorkItem::ResumeFiber(fiber));
1438 }
1439 SuspendReason::Waiting { set, task } => {
1440 let old = state
1441 .get_mut(set)?
1442 .waiting
1443 .insert(task, WaitMode::Fiber(fiber));
1444 assert!(old.is_none());
1445 }
1446 }
1447 }
1448
1449 Ok(())
1450 }
1451
1452 async fn run_on_worker<T: Send>(
1454 self,
1455 store: StoreContextMut<'_, T>,
1456 item: WorkerItem,
1457 ) -> Result<()> {
1458 let worker = if let Some(fiber) = self.concurrent_state_mut(store.0).worker.take() {
1459 fiber
1460 } else {
1461 fiber::make_fiber(store.0, move |store| {
1462 loop {
1463 match self.concurrent_state_mut(store).worker_item.take().unwrap() {
1464 WorkerItem::GuestCall(call) => self.handle_guest_call(store, call)?,
1465 WorkerItem::Function(fun) => fun.into_inner()(store, self)?,
1466 }
1467
1468 self.suspend(store, SuspendReason::NeedWork)?;
1469 }
1470 })?
1471 };
1472
1473 let worker_item = &mut self.concurrent_state_mut(store.0).worker_item;
1474 assert!(worker_item.is_none());
1475 *worker_item = Some(item);
1476
1477 self.resume_fiber(store.0, worker).await
1478 }
1479
1480 fn handle_guest_call(self, store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
1482 match call.kind {
1483 GuestCallKind::DeliverEvent { set } => {
1484 let (event, waitable) = self
1485 .id()
1486 .get_mut(store)
1487 .get_event(call.task, set, true)?
1488 .unwrap();
1489 let state = self.concurrent_state_mut(store);
1490 let task = state.get_mut(call.task)?;
1491 let runtime_instance = task.instance;
1492 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
1493
1494 log::trace!(
1495 "use callback to deliver event {event:?} to {:?} for {waitable:?}",
1496 call.task,
1497 );
1498
1499 let old_task = state.guest_task.replace(call.task);
1500 log::trace!(
1501 "GuestCallKind::DeliverEvent: replaced {old_task:?} with {:?} as current task",
1502 call.task
1503 );
1504
1505 self.maybe_push_call_context(store.store_opaque_mut(), call.task)?;
1506
1507 let state = self.concurrent_state_mut(store);
1508 state.enter_instance(runtime_instance);
1509
1510 let callback = state.get_mut(call.task)?.callback.take().unwrap();
1511
1512 let code = callback(store, self, runtime_instance, event, handle)?;
1513
1514 let state = self.concurrent_state_mut(store);
1515
1516 state.get_mut(call.task)?.callback = Some(callback);
1517
1518 state.exit_instance(runtime_instance)?;
1519
1520 self.maybe_pop_call_context(store.store_opaque_mut(), call.task)?;
1521
1522 self.id().get_mut(store).handle_callback_code(
1523 call.task,
1524 runtime_instance,
1525 code,
1526 false,
1527 )?;
1528
1529 self.concurrent_state_mut(store).guest_task = old_task;
1530 log::trace!("GuestCallKind::DeliverEvent: restored {old_task:?} as current task");
1531 }
1532 GuestCallKind::Start(fun) => {
1533 fun(store, self)?;
1534 }
1535 }
1536
1537 Ok(())
1538 }
1539
1540 fn suspend(self, store: &mut dyn VMStore, reason: SuspendReason) -> Result<()> {
1546 log::trace!("suspend fiber: {reason:?}");
1547
1548 let task = match &reason {
1552 SuspendReason::Yielding { task } | SuspendReason::Waiting { task, .. } => Some(*task),
1553 SuspendReason::NeedWork => None,
1554 };
1555
1556 let old_guest_task = if let Some(task) = task {
1557 self.maybe_pop_call_context(store, task)?;
1558 self.concurrent_state_mut(store).guest_task
1559 } else {
1560 None
1561 };
1562
1563 let suspend_reason = &mut self.concurrent_state_mut(store).suspend_reason;
1564 assert!(suspend_reason.is_none());
1565 *suspend_reason = Some(reason);
1566
1567 store.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1568
1569 if let Some(task) = task {
1570 self.concurrent_state_mut(store).guest_task = old_guest_task;
1571 self.maybe_push_call_context(store, task)?;
1572 }
1573
1574 Ok(())
1575 }
1576
1577 fn maybe_push_call_context(
1581 self,
1582 store: &mut StoreOpaque,
1583 guest_task: TableId<GuestTask>,
1584 ) -> Result<()> {
1585 let task = self.concurrent_state_mut(store).get_mut(guest_task)?;
1586 if task.lift_result.is_some() {
1587 log::trace!("push call context for {guest_task:?}");
1588 let call_context = task.call_context.take().unwrap();
1589 store.component_resource_state().0.push(call_context);
1590 }
1591 Ok(())
1592 }
1593
1594 fn maybe_pop_call_context(
1598 self,
1599 store: &mut StoreOpaque,
1600 guest_task: TableId<GuestTask>,
1601 ) -> Result<()> {
1602 if self
1603 .concurrent_state_mut(store)
1604 .get_mut(guest_task)?
1605 .lift_result
1606 .is_some()
1607 {
1608 log::trace!("pop call context for {guest_task:?}");
1609 let call_context = Some(store.component_resource_state().0.pop().unwrap());
1610 self.concurrent_state_mut(store)
1611 .get_mut(guest_task)?
1612 .call_context = call_context;
1613 }
1614 Ok(())
1615 }
1616
1617 unsafe fn queue_call<T: 'static>(
1624 self,
1625 mut store: StoreContextMut<T>,
1626 guest_task: TableId<GuestTask>,
1627 callee: SendSyncPtr<VMFuncRef>,
1628 param_count: usize,
1629 result_count: usize,
1630 flags: Option<InstanceFlags>,
1631 async_: bool,
1632 callback: Option<SendSyncPtr<VMFuncRef>>,
1633 post_return: Option<SendSyncPtr<VMFuncRef>>,
1634 ) -> Result<()> {
1635 unsafe fn make_call<T: 'static>(
1650 store: StoreContextMut<T>,
1651 guest_task: TableId<GuestTask>,
1652 callee: SendSyncPtr<VMFuncRef>,
1653 param_count: usize,
1654 result_count: usize,
1655 flags: Option<InstanceFlags>,
1656 ) -> impl FnOnce(
1657 &mut dyn VMStore,
1658 Instance,
1659 ) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
1660 + Send
1661 + Sync
1662 + 'static
1663 + use<T> {
1664 let token = StoreToken::new(store);
1665 move |store: &mut dyn VMStore, instance: Instance| {
1666 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1667 let task = instance.concurrent_state_mut(store).get_mut(guest_task)?;
1668 let may_enter_after_call = task.call_post_return_automatically();
1669 let lower = task.lower_params.take().unwrap();
1670
1671 lower(store, instance, &mut storage[..param_count])?;
1672
1673 let mut store = token.as_context_mut(store);
1674
1675 unsafe {
1678 if let Some(mut flags) = flags {
1679 flags.set_may_enter(false);
1680 }
1681 crate::Func::call_unchecked_raw(
1682 &mut store,
1683 callee.as_non_null(),
1684 NonNull::new(
1685 &mut storage[..param_count.max(result_count)]
1686 as *mut [MaybeUninit<ValRaw>] as _,
1687 )
1688 .unwrap(),
1689 )?;
1690 if let Some(mut flags) = flags {
1691 flags.set_may_enter(may_enter_after_call);
1692 }
1693 }
1694
1695 Ok(storage)
1696 }
1697 }
1698
1699 let call = unsafe {
1703 make_call(
1704 store.as_context_mut(),
1705 guest_task,
1706 callee,
1707 param_count,
1708 result_count,
1709 flags,
1710 )
1711 };
1712
1713 let callee_instance = self
1714 .concurrent_state_mut(store.0)
1715 .get_mut(guest_task)?
1716 .instance;
1717 let fun = if callback.is_some() {
1718 assert!(async_);
1719
1720 Box::new(move |store: &mut dyn VMStore, instance: Instance| {
1721 let old_task = instance
1722 .concurrent_state_mut(store)
1723 .guest_task
1724 .replace(guest_task);
1725 log::trace!(
1726 "stackless call: replaced {old_task:?} with {guest_task:?} as current task"
1727 );
1728
1729 instance.maybe_push_call_context(store.store_opaque_mut(), guest_task)?;
1730
1731 instance
1732 .concurrent_state_mut(store)
1733 .enter_instance(callee_instance);
1734
1735 let storage = call(store, instance)?;
1742
1743 instance
1744 .concurrent_state_mut(store)
1745 .exit_instance(callee_instance)?;
1746
1747 instance.maybe_pop_call_context(store.store_opaque_mut(), guest_task)?;
1748
1749 let state = instance.concurrent_state_mut(store);
1750 state.guest_task = old_task;
1751 log::trace!("stackless call: restored {old_task:?} as current task");
1752
1753 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
1756
1757 instance.id().get_mut(store).handle_callback_code(
1758 guest_task,
1759 callee_instance,
1760 code,
1761 true,
1762 )?;
1763
1764 Ok(())
1765 })
1766 as Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send + Sync>
1767 } else {
1768 let token = StoreToken::new(store.as_context_mut());
1769 Box::new(move |store: &mut dyn VMStore, instance: Instance| {
1770 let old_task = instance
1771 .concurrent_state_mut(store)
1772 .guest_task
1773 .replace(guest_task);
1774 log::trace!(
1775 "stackful call: replaced {old_task:?} with {guest_task:?} as current task",
1776 );
1777
1778 let mut flags = instance.id().get(store).instance_flags(callee_instance);
1779
1780 instance.maybe_push_call_context(store.store_opaque_mut(), guest_task)?;
1781
1782 if !async_ {
1786 instance
1787 .concurrent_state_mut(store)
1788 .enter_instance(callee_instance);
1789 }
1790
1791 let storage = call(store, instance)?;
1798
1799 if async_ {
1800 if instance
1805 .concurrent_state_mut(store)
1806 .get_mut(guest_task)?
1807 .lift_result
1808 .is_some()
1809 {
1810 return Err(anyhow!(crate::Trap::NoAsyncResult));
1811 }
1812 } else {
1813 let lift = {
1819 let state = instance.concurrent_state_mut(store);
1820 state.exit_instance(callee_instance)?;
1821
1822 assert!(state.get_mut(guest_task)?.result.is_none());
1823
1824 state.get_mut(guest_task)?.lift_result.take().unwrap()
1825 };
1826
1827 let result = (lift.lift)(store, instance, unsafe {
1830 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
1831 &storage[..result_count],
1832 )
1833 })?;
1834
1835 let post_return_arg = match result_count {
1836 0 => ValRaw::i32(0),
1837 1 => unsafe { storage[0].assume_init() },
1840 _ => unreachable!(),
1841 };
1842
1843 if instance
1844 .concurrent_state_mut(store)
1845 .get_mut(guest_task)?
1846 .call_post_return_automatically()
1847 {
1848 unsafe {
1849 flags.set_may_leave(false);
1850 flags.set_needs_post_return(false);
1851 }
1852
1853 if let Some(func) = post_return {
1854 let mut store = token.as_context_mut(store);
1855
1856 unsafe {
1862 crate::Func::call_unchecked_raw(
1863 &mut store,
1864 func.as_non_null(),
1865 slice::from_ref(&post_return_arg).into(),
1866 )?;
1867 }
1868 }
1869
1870 unsafe {
1871 flags.set_may_leave(true);
1872 flags.set_may_enter(true);
1873 }
1874 }
1875
1876 instance.task_complete(
1877 store,
1878 guest_task,
1879 result,
1880 Status::Returned,
1881 post_return_arg,
1882 )?;
1883 }
1884
1885 instance.maybe_pop_call_context(store.store_opaque_mut(), guest_task)?;
1886
1887 let task = instance.concurrent_state_mut(store).get_mut(guest_task)?;
1888
1889 match &task.caller {
1890 Caller::Host {
1891 remove_task_automatically,
1892 ..
1893 } => {
1894 if *remove_task_automatically {
1895 Waitable::Guest(guest_task)
1896 .delete_from(instance.concurrent_state_mut(store))?;
1897 }
1898 }
1899 Caller::Guest { .. } => {
1900 task.exited = true;
1901 }
1902 }
1903
1904 Ok(())
1905 })
1906 };
1907
1908 self.concurrent_state_mut(store.0)
1909 .push_high_priority(WorkItem::GuestCall(GuestCall {
1910 task: guest_task,
1911 kind: GuestCallKind::Start(fun),
1912 }));
1913
1914 Ok(())
1915 }
1916
1917 unsafe fn prepare_call<T: 'static>(
1930 self,
1931 mut store: StoreContextMut<T>,
1932 start: *mut VMFuncRef,
1933 return_: *mut VMFuncRef,
1934 caller_instance: RuntimeComponentInstanceIndex,
1935 callee_instance: RuntimeComponentInstanceIndex,
1936 task_return_type: TypeTupleIndex,
1937 memory: *mut VMMemoryDefinition,
1938 string_encoding: u8,
1939 caller_info: CallerInfo,
1940 ) -> Result<()> {
1941 self.id().get(store.0).check_may_leave(caller_instance)?;
1942
1943 enum ResultInfo {
1944 Heap { results: u32 },
1945 Stack { result_count: u32 },
1946 }
1947
1948 let result_info = match &caller_info {
1949 CallerInfo::Async {
1950 has_result: true,
1951 params,
1952 } => ResultInfo::Heap {
1953 results: params.last().unwrap().get_u32(),
1954 },
1955 CallerInfo::Async {
1956 has_result: false, ..
1957 } => ResultInfo::Stack { result_count: 0 },
1958 CallerInfo::Sync {
1959 result_count,
1960 params,
1961 } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
1962 results: params.last().unwrap().get_u32(),
1963 },
1964 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
1965 result_count: *result_count,
1966 },
1967 };
1968
1969 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
1970
1971 let start = SendSyncPtr::new(NonNull::new(start).unwrap());
1975 let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
1976 let token = StoreToken::new(store.as_context_mut());
1977 let state = self.concurrent_state_mut(store.0);
1978 let old_task = state.guest_task.take();
1979 let new_task = GuestTask::new(
1980 state,
1981 Box::new(move |store, instance, dst| {
1982 let mut store = token.as_context_mut(store);
1983 assert!(dst.len() <= MAX_FLAT_PARAMS);
1984 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1985 let count = match caller_info {
1986 CallerInfo::Async { params, has_result } => {
1990 let params = ¶ms[..params.len() - usize::from(has_result)];
1991 for (param, src) in params.iter().zip(&mut src) {
1992 src.write(*param);
1993 }
1994 params.len()
1995 }
1996
1997 CallerInfo::Sync { params, .. } => {
1999 for (param, src) in params.iter().zip(&mut src) {
2000 src.write(*param);
2001 }
2002 params.len()
2003 }
2004 };
2005 unsafe {
2012 crate::Func::call_unchecked_raw(
2013 &mut store,
2014 start.as_non_null(),
2015 NonNull::new(
2016 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2017 )
2018 .unwrap(),
2019 )?;
2020 }
2021 dst.copy_from_slice(&src[..dst.len()]);
2022 let state = instance.concurrent_state_mut(store.0);
2023 let task = state.guest_task.unwrap();
2024 Waitable::Guest(task).set_event(
2025 state,
2026 Some(Event::Subtask {
2027 status: Status::Started,
2028 }),
2029 )?;
2030 Ok(())
2031 }),
2032 LiftResult {
2033 lift: Box::new(move |store, instance, src| {
2034 let mut store = token.as_context_mut(store);
2037 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2039 my_src.push(ValRaw::u32(*results));
2040 }
2041 unsafe {
2048 crate::Func::call_unchecked_raw(
2049 &mut store,
2050 return_.as_non_null(),
2051 my_src.as_mut_slice().into(),
2052 )?;
2053 }
2054 let state = instance.concurrent_state_mut(store.0);
2055 let task = state.guest_task.unwrap();
2056 if sync_caller {
2057 state.get_mut(task)?.sync_result =
2058 Some(if let ResultInfo::Stack { result_count } = &result_info {
2059 match result_count {
2060 0 => None,
2061 1 => Some(my_src[0]),
2062 _ => unreachable!(),
2063 }
2064 } else {
2065 None
2066 });
2067 }
2068 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2069 }),
2070 ty: task_return_type,
2071 memory: NonNull::new(memory).map(SendSyncPtr::new),
2072 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2073 },
2074 Caller::Guest {
2075 task: old_task.unwrap(),
2076 instance: caller_instance,
2077 },
2078 None,
2079 callee_instance,
2080 )?;
2081
2082 let guest_task = state.push(new_task)?;
2083
2084 if let Some(old_task) = old_task {
2085 if !state.may_enter(guest_task) {
2086 bail!(crate::Trap::CannotEnterComponent);
2087 }
2088
2089 state.get_mut(old_task)?.subtasks.insert(guest_task);
2090 };
2091
2092 state.guest_task = Some(guest_task);
2095 log::trace!("pushed {guest_task:?} as current task; old task was {old_task:?}");
2096
2097 Ok(())
2098 }
2099
2100 unsafe fn call_callback<T>(
2105 self,
2106 mut store: StoreContextMut<T>,
2107 callee_instance: RuntimeComponentInstanceIndex,
2108 function: SendSyncPtr<VMFuncRef>,
2109 event: Event,
2110 handle: u32,
2111 may_enter_after_call: bool,
2112 ) -> Result<u32> {
2113 let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2114
2115 let (ordinal, result) = event.parts();
2116 let params = &mut [
2117 ValRaw::u32(ordinal),
2118 ValRaw::u32(handle),
2119 ValRaw::u32(result),
2120 ];
2121 unsafe {
2126 flags.set_may_enter(false);
2127 crate::Func::call_unchecked_raw(
2128 &mut store,
2129 function.as_non_null(),
2130 params.as_mut_slice().into(),
2131 )?;
2132 flags.set_may_enter(may_enter_after_call);
2133 }
2134 Ok(params[0].get_u32())
2135 }
2136
2137 unsafe fn start_call<T: 'static>(
2150 self,
2151 mut store: StoreContextMut<T>,
2152 callback: *mut VMFuncRef,
2153 post_return: *mut VMFuncRef,
2154 callee: *mut VMFuncRef,
2155 param_count: u32,
2156 result_count: u32,
2157 flags: u32,
2158 storage: Option<&mut [MaybeUninit<ValRaw>]>,
2159 ) -> Result<u32> {
2160 let token = StoreToken::new(store.as_context_mut());
2161 let async_caller = storage.is_none();
2162 let state = self.concurrent_state_mut(store.0);
2163 let guest_task = state.guest_task.unwrap();
2164 let may_enter_after_call = state.get_mut(guest_task)?.call_post_return_automatically();
2165 let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2166 let param_count = usize::try_from(param_count).unwrap();
2167 assert!(param_count <= MAX_FLAT_PARAMS);
2168 let result_count = usize::try_from(result_count).unwrap();
2169 assert!(result_count <= MAX_FLAT_RESULTS);
2170
2171 let task = state.get_mut(guest_task)?;
2172 if !callback.is_null() {
2173 let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2177 task.callback = Some(Box::new(
2178 move |store, instance, runtime_instance, event, handle| {
2179 let store = token.as_context_mut(store);
2180 unsafe {
2181 instance.call_callback::<T>(
2182 store,
2183 runtime_instance,
2184 callback,
2185 event,
2186 handle,
2187 may_enter_after_call,
2188 )
2189 }
2190 },
2191 ));
2192 }
2193
2194 let Caller::Guest {
2195 task: caller,
2196 instance: runtime_instance,
2197 } = &task.caller
2198 else {
2199 unreachable!()
2202 };
2203 let caller = *caller;
2204 let caller_instance = *runtime_instance;
2205
2206 let callee_instance = task.instance;
2207
2208 let instance_flags = if callback.is_null() {
2209 None
2210 } else {
2211 Some(self.id().get(store.0).instance_flags(callee_instance))
2212 };
2213
2214 unsafe {
2216 self.queue_call(
2217 store.as_context_mut(),
2218 guest_task,
2219 callee,
2220 param_count,
2221 result_count,
2222 instance_flags,
2223 (flags & START_FLAG_ASYNC_CALLEE) != 0,
2224 NonNull::new(callback).map(SendSyncPtr::new),
2225 NonNull::new(post_return).map(SendSyncPtr::new),
2226 )?;
2227 }
2228
2229 let state = self.concurrent_state_mut(store.0);
2230
2231 let guest_waitable = Waitable::Guest(guest_task);
2234 let old_set = guest_waitable.common(state)?.set;
2235 let set = state.get_mut(caller)?.sync_call_set;
2236 guest_waitable.join(state, Some(set))?;
2237
2238 let (status, waitable) = loop {
2254 self.suspend(store.0, SuspendReason::Waiting { set, task: caller })?;
2255
2256 let state = self.concurrent_state_mut(store.0);
2257
2258 let event = guest_waitable.take_event(state)?;
2259 let Some(Event::Subtask { status }) = event else {
2260 unreachable!();
2261 };
2262
2263 log::trace!("status {status:?} for {guest_task:?}");
2264
2265 if status == Status::Returned {
2266 break (status, None);
2268 } else if async_caller {
2269 let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2273 .subtask_insert_guest(guest_task.rep())?;
2274 self.concurrent_state_mut(store.0)
2275 .get_mut(guest_task)?
2276 .common
2277 .handle = Some(handle);
2278 break (status, Some(handle));
2279 } else {
2280 }
2284 };
2285
2286 let state = self.concurrent_state_mut(store.0);
2287
2288 guest_waitable.join(state, old_set)?;
2289
2290 if let Some(storage) = storage {
2291 let task = state.get_mut(guest_task)?;
2295 if let Some(result) = task.sync_result.take() {
2296 if let Some(result) = result {
2297 storage[0] = MaybeUninit::new(result);
2298 }
2299
2300 if task.exited {
2301 Waitable::Guest(guest_task).delete_from(state)?;
2302 }
2303 } else {
2304 return Err(anyhow!(crate::Trap::NoAsyncResult));
2307 }
2308 }
2309
2310 state.guest_task = Some(caller);
2312 log::trace!("popped current task {guest_task:?}; new task is {caller:?}");
2313
2314 Ok(status.pack(waitable))
2315 }
2316
2317 pub(crate) fn wrap_call<T, F, R>(
2322 self,
2323 store: StoreContextMut<T>,
2324 closure: F,
2325 ) -> impl Future<Output = Result<R>> + 'static
2326 where
2327 T: 'static,
2328 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
2329 + Send
2330 + Sync
2331 + 'static,
2332 R: Send + Sync + 'static,
2333 {
2334 let token = StoreToken::new(store);
2335 async move {
2336 let mut accessor = Accessor::new(token, Some(self));
2337 closure(&mut accessor).await
2338 }
2339 }
2340
2341 pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2353 self,
2354 mut store: StoreContextMut<T>,
2355 future: impl Future<Output = Result<R>> + Send + 'static,
2356 caller_instance: RuntimeComponentInstanceIndex,
2357 lower: impl FnOnce(StoreContextMut<T>, Instance, R) -> Result<()> + Send + 'static,
2358 ) -> Result<Option<u32>> {
2359 let token = StoreToken::new(store.as_context_mut());
2360 let state = self.concurrent_state_mut(store.0);
2361 let caller = state.guest_task.unwrap();
2362
2363 let (join_handle, future) = JoinHandle::run(async move {
2366 let mut future = pin!(future);
2367 let mut call_context = None;
2368 future::poll_fn(move |cx| {
2369 tls::get(|store| {
2372 if let Some(call_context) = call_context.take() {
2373 token
2374 .as_context_mut(store)
2375 .0
2376 .component_resource_state()
2377 .0
2378 .push(call_context);
2379 }
2380 });
2381
2382 let result = future.as_mut().poll(cx);
2383
2384 if result.is_pending() {
2385 tls::get(|store| {
2388 call_context = Some(
2389 token
2390 .as_context_mut(store)
2391 .0
2392 .component_resource_state()
2393 .0
2394 .pop()
2395 .unwrap(),
2396 );
2397 });
2398 }
2399 result
2400 })
2401 .await
2402 });
2403
2404 let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;
2408
2409 log::trace!("new host task child of {caller:?}: {task:?}");
2410
2411 let mut future = Box::pin(future);
2412
2413 let poll = self.set_tls(store.0, || {
2418 future
2419 .as_mut()
2420 .poll(&mut Context::from_waker(&Waker::noop()))
2421 });
2422
2423 Ok(match poll {
2424 Poll::Ready(None) => unreachable!(),
2425 Poll::Ready(Some(result)) => {
2426 lower(store.as_context_mut(), self, result?)?;
2429 log::trace!("delete host task {task:?} (already ready)");
2430 self.concurrent_state_mut(store.0).delete(task)?;
2431 None
2432 }
2433 Poll::Pending => {
2434 let future = Box::pin(async move {
2442 let result = match future.await {
2443 Some(result) => result?,
2444 None => return Ok(()),
2446 };
2447 tls::get(move |store| {
2448 self.concurrent_state_mut(store).push_high_priority(
2454 WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store, _| {
2455 lower(token.as_context_mut(store), self, result)?;
2456 let state = self.concurrent_state_mut(store);
2457 state.get_mut(task)?.join_handle.take();
2458 Waitable::Host(task).set_event(
2459 state,
2460 Some(Event::Subtask {
2461 status: Status::Returned,
2462 }),
2463 )
2464 }))),
2465 );
2466 Ok(())
2467 })
2468 });
2469
2470 self.concurrent_state_mut(store.0).push_future(future);
2471 let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2472 .subtask_insert_host(task.rep())?;
2473 self.concurrent_state_mut(store.0)
2474 .get_mut(task)?
2475 .common
2476 .handle = Some(handle);
2477 log::trace!(
2478 "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
2479 );
2480 Some(handle)
2481 }
2482 })
2483 }
2484
2485 pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
2493 self,
2494 store: &mut dyn VMStore,
2495 future: impl Future<Output = Result<R>> + Send + 'static,
2496 caller_instance: RuntimeComponentInstanceIndex,
2497 ) -> Result<R> {
2498 let state = self.concurrent_state_mut(store);
2499
2500 let Some(caller) = state.guest_task else {
2504 return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
2505 Poll::Ready(result) => result,
2506 Poll::Pending => {
2507 unreachable!()
2508 }
2509 };
2510 };
2511
2512 let old_result = state
2515 .get_mut(caller)
2516 .with_context(|| format!("bad handle: {caller:?}"))?
2517 .result
2518 .take();
2519
2520 let task = state.push(HostTask::new(caller_instance, None))?;
2524
2525 log::trace!("new host task child of {caller:?}: {task:?}");
2526
2527 let mut future = Box::pin(async move {
2531 let result = future.await?;
2532 tls::get(move |store| {
2533 let state = self.concurrent_state_mut(store);
2534 state.get_mut(caller)?.result = Some(Box::new(result) as _);
2535
2536 Waitable::Host(task).set_event(
2537 state,
2538 Some(Event::Subtask {
2539 status: Status::Returned,
2540 }),
2541 )?;
2542
2543 Ok(())
2544 })
2545 }) as HostTaskFuture;
2546
2547 let poll = self.set_tls(store, || {
2552 future
2553 .as_mut()
2554 .poll(&mut Context::from_waker(&Waker::noop()))
2555 });
2556
2557 match poll {
2558 Poll::Ready(result) => {
2559 result?;
2561 log::trace!("delete host task {task:?} (already ready)");
2562 self.concurrent_state_mut(store).delete(task)?;
2563 }
2564 Poll::Pending => {
2565 let state = self.concurrent_state_mut(store);
2571 state.push_future(future);
2572
2573 let set = state.get_mut(caller)?.sync_call_set;
2574 Waitable::Host(task).join(state, Some(set))?;
2575
2576 self.suspend(store, SuspendReason::Waiting { set, task: caller })?;
2577 }
2578 }
2579
2580 Ok(*mem::replace(
2582 &mut self.concurrent_state_mut(store).get_mut(caller)?.result,
2583 old_result,
2584 )
2585 .unwrap()
2586 .downcast()
2587 .unwrap())
2588 }
2589
2590 pub(crate) fn task_return(
2593 self,
2594 store: &mut dyn VMStore,
2595 caller: RuntimeComponentInstanceIndex,
2596 ty: TypeTupleIndex,
2597 options: OptionsIndex,
2598 storage: &[ValRaw],
2599 ) -> Result<()> {
2600 self.id().get(store).check_may_leave(caller)?;
2601 let state = self.concurrent_state_mut(store);
2602 let CanonicalOptions {
2603 string_encoding,
2604 data_model,
2605 ..
2606 } = *state.options(options);
2607 let guest_task = state.guest_task.unwrap();
2608 let lift = state
2609 .get_mut(guest_task)?
2610 .lift_result
2611 .take()
2612 .ok_or_else(|| {
2613 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2614 })?;
2615 assert!(state.get_mut(guest_task)?.result.is_none());
2616
2617 let invalid = ty != lift.ty
2618 || string_encoding != lift.string_encoding
2619 || match data_model {
2620 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2621 Some(memory) => {
2622 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2623 let actual = self.id().get(store).runtime_memory(memory);
2624 expected != actual
2625 }
2626 None => false,
2629 },
2630 CanonicalOptionsDataModel::Gc { .. } => true,
2632 };
2633
2634 if invalid {
2635 bail!("invalid `task.return` signature and/or options for current task");
2636 }
2637
2638 log::trace!("task.return for {guest_task:?}");
2639
2640 let result = (lift.lift)(store, self, storage)?;
2641
2642 self.task_complete(store, guest_task, result, Status::Returned, ValRaw::i32(0))
2643 }
2644
2645 pub(crate) fn task_cancel(
2647 self,
2648 store: &mut dyn VMStore,
2649 caller: RuntimeComponentInstanceIndex,
2650 ) -> Result<()> {
2651 self.id().get(store).check_may_leave(caller)?;
2652 let state = self.concurrent_state_mut(store);
2653 let guest_task = state.guest_task.unwrap();
2654 let task = state.get_mut(guest_task)?;
2655 if !task.cancel_sent {
2656 bail!("`task.cancel` called by task which has not been cancelled")
2657 }
2658 _ = task.lift_result.take().ok_or_else(|| {
2659 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2660 })?;
2661
2662 assert!(task.result.is_none());
2663
2664 log::trace!("task.cancel for {guest_task:?}");
2665
2666 self.task_complete(
2667 store,
2668 guest_task,
2669 Box::new(DummyResult),
2670 Status::ReturnCancelled,
2671 ValRaw::i32(0),
2672 )
2673 }
2674
2675 fn task_complete(
2681 self,
2682 store: &mut dyn VMStore,
2683 guest_task: TableId<GuestTask>,
2684 result: Box<dyn Any + Send + Sync>,
2685 status: Status,
2686 post_return_arg: ValRaw,
2687 ) -> Result<()> {
2688 if self
2689 .concurrent_state_mut(store)
2690 .get_mut(guest_task)?
2691 .call_post_return_automatically()
2692 {
2693 let (calls, host_table, _, instance) = store
2694 .store_opaque_mut()
2695 .component_resource_state_with_instance(self);
2696 ResourceTables {
2697 calls,
2698 host_table: Some(host_table),
2699 guest: Some(instance.guest_tables()),
2700 }
2701 .exit_call()?;
2702 } else {
2703 let function_index = self
2708 .concurrent_state_mut(store)
2709 .get_mut(guest_task)?
2710 .function_index
2711 .unwrap();
2712
2713 self.id()
2714 .get_mut(store)
2715 .post_return_arg_set(function_index, post_return_arg);
2716 }
2717
2718 let state = self.concurrent_state_mut(store);
2719 let task = state.get_mut(guest_task)?;
2720
2721 if let Caller::Host { tx, .. } = &mut task.caller {
2722 if let Some(tx) = tx.take() {
2723 _ = tx.send(result);
2724 }
2725 } else {
2726 task.result = Some(result);
2727 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2728 }
2729
2730 Ok(())
2731 }
2732
2733 pub(crate) fn waitable_set_wait(
2735 self,
2736 store: &mut dyn VMStore,
2737 caller: RuntimeComponentInstanceIndex,
2738 options: OptionsIndex,
2739 set: u32,
2740 payload: u32,
2741 ) -> Result<u32> {
2742 self.id().get(store).check_may_leave(caller)?;
2743 let opts = self.concurrent_state_mut(store).options(options);
2744 let cancellable = opts.cancellable;
2745 let caller_instance = opts.instance;
2746 let rep =
2747 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2748
2749 self.waitable_check(
2750 store,
2751 cancellable,
2752 WaitableCheck::Wait(WaitableCheckParams {
2753 set: TableId::new(rep),
2754 options,
2755 payload,
2756 }),
2757 )
2758 }
2759
2760 pub(crate) fn waitable_set_poll(
2762 self,
2763 store: &mut dyn VMStore,
2764 caller: RuntimeComponentInstanceIndex,
2765 options: OptionsIndex,
2766 set: u32,
2767 payload: u32,
2768 ) -> Result<u32> {
2769 self.id().get(store).check_may_leave(caller)?;
2770 let opts = self.concurrent_state_mut(store).options(options);
2771 let cancellable = opts.cancellable;
2772 let caller_instance = opts.instance;
2773 let rep =
2774 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2775
2776 self.waitable_check(
2777 store,
2778 cancellable,
2779 WaitableCheck::Poll(WaitableCheckParams {
2780 set: TableId::new(rep),
2781 options,
2782 payload,
2783 }),
2784 )
2785 }
2786
2787 pub(crate) fn thread_yield(
2789 self,
2790 store: &mut dyn VMStore,
2791 caller: RuntimeComponentInstanceIndex,
2792 cancellable: bool,
2793 ) -> Result<bool> {
2794 self.id().get(store).check_may_leave(caller)?;
2795 self.waitable_check(store, cancellable, WaitableCheck::Yield)
2796 .map(|_| {
2797 if cancellable {
2798 let state = self.concurrent_state_mut(store);
2799 let task = state.guest_task.unwrap();
2800 if let Some(event) = state.get_mut(task).unwrap().event.take() {
2801 assert!(matches!(event, Event::Cancelled));
2802 true
2803 } else {
2804 false
2805 }
2806 } else {
2807 false
2808 }
2809 })
2810 }
2811
2812 fn waitable_check(
2815 self,
2816 store: &mut dyn VMStore,
2817 cancellable: bool,
2818 check: WaitableCheck,
2819 ) -> Result<u32> {
2820 let guest_task = self.concurrent_state_mut(store).guest_task.unwrap();
2821
2822 let (wait, set) = match &check {
2823 WaitableCheck::Wait(params) => (true, Some(params.set)),
2824 WaitableCheck::Poll(params) => (false, Some(params.set)),
2825 WaitableCheck::Yield => (false, None),
2826 };
2827
2828 self.suspend(store, SuspendReason::Yielding { task: guest_task })?;
2830
2831 log::trace!("waitable check for {guest_task:?}; set {set:?}");
2832
2833 let state = self.concurrent_state_mut(store);
2834 let task = state.get_mut(guest_task)?;
2835
2836 if wait && task.callback.is_some() {
2837 bail!("cannot call `task.wait` from async-lifted export with callback");
2838 }
2839
2840 if wait {
2843 let set = set.unwrap();
2844
2845 if (task.event.is_none()
2846 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
2847 && state.get_mut(set)?.ready.is_empty()
2848 {
2849 if cancellable {
2850 let old = state.get_mut(guest_task)?.wake_on_cancel.replace(set);
2851 assert!(old.is_none());
2852 }
2853
2854 self.suspend(
2855 store,
2856 SuspendReason::Waiting {
2857 set,
2858 task: guest_task,
2859 },
2860 )?;
2861 }
2862 }
2863
2864 log::trace!("waitable check for {guest_task:?}; set {set:?}, part two");
2865
2866 let result = match check {
2867 WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => {
2869 let event = self.id().get_mut(store).get_event(
2870 guest_task,
2871 Some(params.set),
2872 cancellable,
2873 )?;
2874
2875 let (ordinal, handle, result) = if wait {
2876 let (event, waitable) = event.unwrap();
2877 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
2878 let (ordinal, result) = event.parts();
2879 (ordinal, handle, result)
2880 } else {
2881 if let Some((event, waitable)) = event {
2882 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
2883 let (ordinal, result) = event.parts();
2884 (ordinal, handle, result)
2885 } else {
2886 log::trace!(
2887 "no events ready to deliver via waitable-set.poll to {guest_task:?}; set {:?}",
2888 params.set
2889 );
2890 let (ordinal, result) = Event::None.parts();
2891 (ordinal, 0, result)
2892 }
2893 };
2894 let store = store.store_opaque_mut();
2895 let options = Options::new_index(store, self, params.options);
2896 let ptr = func::validate_inbounds::<(u32, u32)>(
2897 options.memory_mut(store),
2898 &ValRaw::u32(params.payload),
2899 )?;
2900 options.memory_mut(store)[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
2901 options.memory_mut(store)[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
2902 Ok(ordinal)
2903 }
2904 WaitableCheck::Yield => Ok(0),
2905 };
2906
2907 result
2908 }
2909
2910 pub(crate) fn subtask_cancel(
2912 self,
2913 store: &mut dyn VMStore,
2914 caller_instance: RuntimeComponentInstanceIndex,
2915 async_: bool,
2916 task_id: u32,
2917 ) -> Result<u32> {
2918 self.id().get(store).check_may_leave(caller_instance)?;
2919 let (rep, is_host) =
2920 self.id().get_mut(store).guest_tables().0[caller_instance].subtask_rep(task_id)?;
2921 let (waitable, expected_caller_instance) = if is_host {
2922 let id = TableId::<HostTask>::new(rep);
2923 (
2924 Waitable::Host(id),
2925 self.concurrent_state_mut(store)
2926 .get_mut(id)?
2927 .caller_instance,
2928 )
2929 } else {
2930 let id = TableId::<GuestTask>::new(rep);
2931 if let Caller::Guest { instance, .. } =
2932 &self.concurrent_state_mut(store).get_mut(id)?.caller
2933 {
2934 (Waitable::Guest(id), *instance)
2935 } else {
2936 unreachable!()
2937 }
2938 };
2939 assert_eq!(expected_caller_instance, caller_instance);
2943
2944 log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
2945
2946 let concurrent_state = self.concurrent_state_mut(store);
2947 if let Waitable::Host(host_task) = waitable {
2948 if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
2949 handle.abort();
2950 return Ok(Status::ReturnCancelled as u32);
2951 }
2952 } else {
2953 let caller = concurrent_state.guest_task.unwrap();
2954 let guest_task = TableId::<GuestTask>::new(rep);
2955 let task = concurrent_state.get_mut(guest_task)?;
2956 if task.lower_params.is_some() {
2957 task.lower_params = None;
2958 task.lift_result = None;
2959
2960 let callee_instance = task.instance;
2962
2963 let kind = concurrent_state
2964 .instance_state(callee_instance)
2965 .pending
2966 .remove(&guest_task);
2967
2968 if kind.is_none() {
2969 bail!("`subtask.cancel` called after terminal status delivered");
2970 }
2971
2972 return Ok(Status::StartCancelled as u32);
2973 } else if task.lift_result.is_some() {
2974 task.cancel_sent = true;
2977 task.event = Some(Event::Cancelled);
2982 if let Some(set) = task.wake_on_cancel.take() {
2983 let item = match concurrent_state
2984 .get_mut(set)?
2985 .waiting
2986 .remove(&guest_task)
2987 .unwrap()
2988 {
2989 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
2990 WaitMode::Callback => WorkItem::GuestCall(GuestCall {
2991 task: guest_task,
2992 kind: GuestCallKind::DeliverEvent { set: None },
2993 }),
2994 };
2995 concurrent_state.push_high_priority(item);
2996
2997 self.suspend(store, SuspendReason::Yielding { task: caller })?;
2998 }
2999
3000 let concurrent_state = self.concurrent_state_mut(store);
3001 let task = concurrent_state.get_mut(guest_task)?;
3002 if task.lift_result.is_some() {
3003 if async_ {
3006 return Ok(BLOCKED);
3007 } else {
3008 self.wait_for_event(store, Waitable::Guest(guest_task))?;
3009 }
3010 }
3011 }
3012 }
3013
3014 let event = waitable.take_event(self.concurrent_state_mut(store))?;
3015 if let Some(Event::Subtask {
3016 status: status @ (Status::Returned | Status::ReturnCancelled),
3017 }) = event
3018 {
3019 Ok(status as u32)
3020 } else {
3021 bail!("`subtask.cancel` called after terminal status delivered");
3022 }
3023 }
3024
3025 fn wait_for_event(self, store: &mut dyn VMStore, waitable: Waitable) -> Result<()> {
3026 let state = self.concurrent_state_mut(store);
3027 let caller = state.guest_task.unwrap();
3028 let old_set = waitable.common(state)?.set;
3029 let set = state.get_mut(caller)?.sync_call_set;
3030 waitable.join(state, Some(set))?;
3031 self.suspend(store, SuspendReason::Waiting { set, task: caller })?;
3032 let state = self.concurrent_state_mut(store);
3033 waitable.join(state, old_set)
3034 }
3035
3036 fn set_tls<R>(self, store: &mut dyn VMStore, f: impl FnOnce() -> R) -> R {
3043 struct Reset<'a>(&'a mut dyn VMStore, Option<ComponentInstanceId>);
3044
3045 impl Drop for Reset<'_> {
3046 fn drop(&mut self) {
3047 self.0.concurrent_async_state_mut().current_instance = self.1;
3048 }
3049 }
3050 let prev = mem::replace(
3051 &mut store.concurrent_async_state_mut().current_instance,
3052 Some(self.id().instance()),
3053 );
3054 let reset = Reset(store, prev);
3055
3056 tls::set(reset.0, f)
3057 }
3058
3059 pub(crate) fn concurrent_state_mut<'a>(
3061 &self,
3062 store: &'a mut StoreOpaque,
3063 ) -> &'a mut ConcurrentState {
3064 self.id().get_mut(store).concurrent_state_mut()
3065 }
3066
3067 pub(crate) fn context_get(
3068 self,
3069 store: &mut dyn VMStore,
3070 caller: RuntimeComponentInstanceIndex,
3071 slot: u32,
3072 ) -> Result<u32> {
3073 self.id().get(store).check_may_leave(caller)?;
3074 self.concurrent_state_mut(store).context_get(slot)
3075 }
3076
3077 pub(crate) fn context_set(
3078 self,
3079 store: &mut dyn VMStore,
3080 caller: RuntimeComponentInstanceIndex,
3081 slot: u32,
3082 value: u32,
3083 ) -> Result<()> {
3084 self.id().get(store).check_may_leave(caller)?;
3085 self.concurrent_state_mut(store).context_set(slot, value)
3086 }
3087}
3088
3089pub trait VMComponentAsyncStore {
3097 unsafe fn prepare_call(
3103 &mut self,
3104 instance: Instance,
3105 memory: *mut VMMemoryDefinition,
3106 start: *mut VMFuncRef,
3107 return_: *mut VMFuncRef,
3108 caller_instance: RuntimeComponentInstanceIndex,
3109 callee_instance: RuntimeComponentInstanceIndex,
3110 task_return_type: TypeTupleIndex,
3111 string_encoding: u8,
3112 result_count: u32,
3113 storage: *mut ValRaw,
3114 storage_len: usize,
3115 ) -> Result<()>;
3116
3117 unsafe fn sync_start(
3120 &mut self,
3121 instance: Instance,
3122 callback: *mut VMFuncRef,
3123 callee: *mut VMFuncRef,
3124 param_count: u32,
3125 storage: *mut MaybeUninit<ValRaw>,
3126 storage_len: usize,
3127 ) -> Result<()>;
3128
3129 unsafe fn async_start(
3132 &mut self,
3133 instance: Instance,
3134 callback: *mut VMFuncRef,
3135 post_return: *mut VMFuncRef,
3136 callee: *mut VMFuncRef,
3137 param_count: u32,
3138 result_count: u32,
3139 flags: u32,
3140 ) -> Result<u32>;
3141
3142 fn future_write(
3144 &mut self,
3145 instance: Instance,
3146 caller: RuntimeComponentInstanceIndex,
3147 ty: TypeFutureTableIndex,
3148 options: OptionsIndex,
3149 future: u32,
3150 address: u32,
3151 ) -> Result<u32>;
3152
3153 fn future_read(
3155 &mut self,
3156 instance: Instance,
3157 caller: RuntimeComponentInstanceIndex,
3158 ty: TypeFutureTableIndex,
3159 options: OptionsIndex,
3160 future: u32,
3161 address: u32,
3162 ) -> Result<u32>;
3163
3164 fn future_drop_writable(
3166 &mut self,
3167 instance: Instance,
3168 caller: RuntimeComponentInstanceIndex,
3169 ty: TypeFutureTableIndex,
3170 writer: u32,
3171 ) -> Result<()>;
3172
3173 fn stream_write(
3175 &mut self,
3176 instance: Instance,
3177 caller: RuntimeComponentInstanceIndex,
3178 ty: TypeStreamTableIndex,
3179 options: OptionsIndex,
3180 stream: u32,
3181 address: u32,
3182 count: u32,
3183 ) -> Result<u32>;
3184
3185 fn stream_read(
3187 &mut self,
3188 instance: Instance,
3189 caller: RuntimeComponentInstanceIndex,
3190 ty: TypeStreamTableIndex,
3191 options: OptionsIndex,
3192 stream: u32,
3193 address: u32,
3194 count: u32,
3195 ) -> Result<u32>;
3196
3197 fn flat_stream_write(
3200 &mut self,
3201 instance: Instance,
3202 caller: RuntimeComponentInstanceIndex,
3203 ty: TypeStreamTableIndex,
3204 options: OptionsIndex,
3205 payload_size: u32,
3206 payload_align: u32,
3207 stream: u32,
3208 address: u32,
3209 count: u32,
3210 ) -> Result<u32>;
3211
3212 fn flat_stream_read(
3215 &mut self,
3216 instance: Instance,
3217 caller: RuntimeComponentInstanceIndex,
3218 ty: TypeStreamTableIndex,
3219 options: OptionsIndex,
3220 payload_size: u32,
3221 payload_align: u32,
3222 stream: u32,
3223 address: u32,
3224 count: u32,
3225 ) -> Result<u32>;
3226
3227 fn stream_drop_writable(
3229 &mut self,
3230 instance: Instance,
3231 caller: RuntimeComponentInstanceIndex,
3232 ty: TypeStreamTableIndex,
3233 writer: u32,
3234 ) -> Result<()>;
3235
3236 fn error_context_debug_message(
3238 &mut self,
3239 instance: Instance,
3240 caller: RuntimeComponentInstanceIndex,
3241 ty: TypeComponentLocalErrorContextTableIndex,
3242 options: OptionsIndex,
3243 err_ctx_handle: u32,
3244 debug_msg_address: u32,
3245 ) -> Result<()>;
3246}
3247
3248impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3250 unsafe fn prepare_call(
3251 &mut self,
3252 instance: Instance,
3253 memory: *mut VMMemoryDefinition,
3254 start: *mut VMFuncRef,
3255 return_: *mut VMFuncRef,
3256 caller_instance: RuntimeComponentInstanceIndex,
3257 callee_instance: RuntimeComponentInstanceIndex,
3258 task_return_type: TypeTupleIndex,
3259 string_encoding: u8,
3260 result_count_or_max_if_async: u32,
3261 storage: *mut ValRaw,
3262 storage_len: usize,
3263 ) -> Result<()> {
3264 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3268
3269 unsafe {
3270 instance.prepare_call(
3271 StoreContextMut(self),
3272 start,
3273 return_,
3274 caller_instance,
3275 callee_instance,
3276 task_return_type,
3277 memory,
3278 string_encoding,
3279 match result_count_or_max_if_async {
3280 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3281 params,
3282 has_result: false,
3283 },
3284 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3285 params,
3286 has_result: true,
3287 },
3288 result_count => CallerInfo::Sync {
3289 params,
3290 result_count,
3291 },
3292 },
3293 )
3294 }
3295 }
3296
3297 unsafe fn sync_start(
3298 &mut self,
3299 instance: Instance,
3300 callback: *mut VMFuncRef,
3301 callee: *mut VMFuncRef,
3302 param_count: u32,
3303 storage: *mut MaybeUninit<ValRaw>,
3304 storage_len: usize,
3305 ) -> Result<()> {
3306 unsafe {
3307 instance
3308 .start_call(
3309 StoreContextMut(self),
3310 callback,
3311 ptr::null_mut(),
3312 callee,
3313 param_count,
3314 1,
3315 START_FLAG_ASYNC_CALLEE,
3316 Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3320 )
3321 .map(drop)
3322 }
3323 }
3324
3325 unsafe fn async_start(
3326 &mut self,
3327 instance: Instance,
3328 callback: *mut VMFuncRef,
3329 post_return: *mut VMFuncRef,
3330 callee: *mut VMFuncRef,
3331 param_count: u32,
3332 result_count: u32,
3333 flags: u32,
3334 ) -> Result<u32> {
3335 unsafe {
3336 instance.start_call(
3337 StoreContextMut(self),
3338 callback,
3339 post_return,
3340 callee,
3341 param_count,
3342 result_count,
3343 flags,
3344 None,
3345 )
3346 }
3347 }
3348
3349 fn future_write(
3350 &mut self,
3351 instance: Instance,
3352 caller: RuntimeComponentInstanceIndex,
3353 ty: TypeFutureTableIndex,
3354 options: OptionsIndex,
3355 future: u32,
3356 address: u32,
3357 ) -> Result<u32> {
3358 instance.id().get(self).check_may_leave(caller)?;
3359 instance
3360 .guest_write(
3361 StoreContextMut(self),
3362 TransmitIndex::Future(ty),
3363 options,
3364 None,
3365 future,
3366 address,
3367 1,
3368 )
3369 .map(|result| result.encode())
3370 }
3371
3372 fn future_read(
3373 &mut self,
3374 instance: Instance,
3375 caller: RuntimeComponentInstanceIndex,
3376 ty: TypeFutureTableIndex,
3377 options: OptionsIndex,
3378 future: u32,
3379 address: u32,
3380 ) -> Result<u32> {
3381 instance.id().get(self).check_may_leave(caller)?;
3382 instance
3383 .guest_read(
3384 StoreContextMut(self),
3385 TransmitIndex::Future(ty),
3386 options,
3387 None,
3388 future,
3389 address,
3390 1,
3391 )
3392 .map(|result| result.encode())
3393 }
3394
3395 fn stream_write(
3396 &mut self,
3397 instance: Instance,
3398 caller: RuntimeComponentInstanceIndex,
3399 ty: TypeStreamTableIndex,
3400 options: OptionsIndex,
3401 stream: u32,
3402 address: u32,
3403 count: u32,
3404 ) -> Result<u32> {
3405 instance.id().get(self).check_may_leave(caller)?;
3406 instance
3407 .guest_write(
3408 StoreContextMut(self),
3409 TransmitIndex::Stream(ty),
3410 options,
3411 None,
3412 stream,
3413 address,
3414 count,
3415 )
3416 .map(|result| result.encode())
3417 }
3418
3419 fn stream_read(
3420 &mut self,
3421 instance: Instance,
3422 caller: RuntimeComponentInstanceIndex,
3423 ty: TypeStreamTableIndex,
3424 options: OptionsIndex,
3425 stream: u32,
3426 address: u32,
3427 count: u32,
3428 ) -> Result<u32> {
3429 instance.id().get(self).check_may_leave(caller)?;
3430 instance
3431 .guest_read(
3432 StoreContextMut(self),
3433 TransmitIndex::Stream(ty),
3434 options,
3435 None,
3436 stream,
3437 address,
3438 count,
3439 )
3440 .map(|result| result.encode())
3441 }
3442
3443 fn future_drop_writable(
3444 &mut self,
3445 instance: Instance,
3446 caller: RuntimeComponentInstanceIndex,
3447 ty: TypeFutureTableIndex,
3448 writer: u32,
3449 ) -> Result<()> {
3450 instance.id().get(self).check_may_leave(caller)?;
3451 instance.guest_drop_writable(StoreContextMut(self), TransmitIndex::Future(ty), writer)
3452 }
3453
3454 fn flat_stream_write(
3455 &mut self,
3456 instance: Instance,
3457 caller: RuntimeComponentInstanceIndex,
3458 ty: TypeStreamTableIndex,
3459 options: OptionsIndex,
3460 payload_size: u32,
3461 payload_align: u32,
3462 stream: u32,
3463 address: u32,
3464 count: u32,
3465 ) -> Result<u32> {
3466 instance.id().get(self).check_may_leave(caller)?;
3467 instance
3468 .guest_write(
3469 StoreContextMut(self),
3470 TransmitIndex::Stream(ty),
3471 options,
3472 Some(FlatAbi {
3473 size: payload_size,
3474 align: payload_align,
3475 }),
3476 stream,
3477 address,
3478 count,
3479 )
3480 .map(|result| result.encode())
3481 }
3482
3483 fn flat_stream_read(
3484 &mut self,
3485 instance: Instance,
3486 caller: RuntimeComponentInstanceIndex,
3487 ty: TypeStreamTableIndex,
3488 options: OptionsIndex,
3489 payload_size: u32,
3490 payload_align: u32,
3491 stream: u32,
3492 address: u32,
3493 count: u32,
3494 ) -> Result<u32> {
3495 instance.id().get(self).check_may_leave(caller)?;
3496 instance
3497 .guest_read(
3498 StoreContextMut(self),
3499 TransmitIndex::Stream(ty),
3500 options,
3501 Some(FlatAbi {
3502 size: payload_size,
3503 align: payload_align,
3504 }),
3505 stream,
3506 address,
3507 count,
3508 )
3509 .map(|result| result.encode())
3510 }
3511
3512 fn stream_drop_writable(
3513 &mut self,
3514 instance: Instance,
3515 caller: RuntimeComponentInstanceIndex,
3516 ty: TypeStreamTableIndex,
3517 writer: u32,
3518 ) -> Result<()> {
3519 instance.id().get(self).check_may_leave(caller)?;
3520 instance.guest_drop_writable(StoreContextMut(self), TransmitIndex::Stream(ty), writer)
3521 }
3522
3523 fn error_context_debug_message(
3524 &mut self,
3525 instance: Instance,
3526 caller: RuntimeComponentInstanceIndex,
3527 ty: TypeComponentLocalErrorContextTableIndex,
3528 options: OptionsIndex,
3529 err_ctx_handle: u32,
3530 debug_msg_address: u32,
3531 ) -> Result<()> {
3532 instance.id().get(self).check_may_leave(caller)?;
3533 instance.error_context_debug_message(
3534 StoreContextMut(self),
3535 ty,
3536 options,
3537 err_ctx_handle,
3538 debug_msg_address,
3539 )
3540 }
3541}
3542
3543type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
3544
3545struct HostTask {
3547 common: WaitableCommon,
3548 caller_instance: RuntimeComponentInstanceIndex,
3549 join_handle: Option<JoinHandle>,
3550}
3551
3552impl HostTask {
3553 fn new(
3554 caller_instance: RuntimeComponentInstanceIndex,
3555 join_handle: Option<JoinHandle>,
3556 ) -> Self {
3557 Self {
3558 common: WaitableCommon::default(),
3559 caller_instance,
3560 join_handle,
3561 }
3562 }
3563}
3564
3565impl TableDebug for HostTask {
3566 fn type_name() -> &'static str {
3567 "HostTask"
3568 }
3569}
3570
3571type CallbackFn = Box<
3572 dyn Fn(&mut dyn VMStore, Instance, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
3573 + Send
3574 + Sync
3575 + 'static,
3576>;
3577
3578enum Caller {
3580 Host {
3582 tx: Option<oneshot::Sender<LiftedResult>>,
3584 exit_tx: Arc<oneshot::Sender<()>>,
3591 remove_task_automatically: bool,
3594 call_post_return_automatically: bool,
3596 },
3597 Guest {
3599 task: TableId<GuestTask>,
3601 instance: RuntimeComponentInstanceIndex,
3607 },
3608}
3609
3610struct LiftResult {
3613 lift: RawLift,
3614 ty: TypeTupleIndex,
3615 memory: Option<SendSyncPtr<VMMemoryDefinition>>,
3616 string_encoding: StringEncoding,
3617}
3618
3619struct GuestTask {
3621 common: WaitableCommon,
3623 lower_params: Option<RawLower>,
3625 lift_result: Option<LiftResult>,
3627 result: Option<LiftedResult>,
3630 callback: Option<CallbackFn>,
3633 caller: Caller,
3635 call_context: Option<CallContext>,
3638 sync_result: Option<Option<ValRaw>>,
3641 cancel_sent: bool,
3644 starting_sent: bool,
3647 context: [u32; 2],
3650 subtasks: HashSet<TableId<GuestTask>>,
3655 sync_call_set: TableId<WaitableSet>,
3657 instance: RuntimeComponentInstanceIndex,
3663 event: Option<Event>,
3666 wake_on_cancel: Option<TableId<WaitableSet>>,
3669 function_index: Option<ExportIndex>,
3671 exited: bool,
3673}
3674
3675impl GuestTask {
3676 fn new(
3677 state: &mut ConcurrentState,
3678 lower_params: RawLower,
3679 lift_result: LiftResult,
3680 caller: Caller,
3681 callback: Option<CallbackFn>,
3682 component_instance: RuntimeComponentInstanceIndex,
3683 ) -> Result<Self> {
3684 let sync_call_set = state.push(WaitableSet::default())?;
3685
3686 Ok(Self {
3687 common: WaitableCommon::default(),
3688 lower_params: Some(lower_params),
3689 lift_result: Some(lift_result),
3690 result: None,
3691 callback,
3692 caller,
3693 call_context: Some(CallContext::default()),
3694 sync_result: None,
3695 cancel_sent: false,
3696 starting_sent: false,
3697 context: [0u32; 2],
3698 subtasks: HashSet::new(),
3699 sync_call_set,
3700 instance: component_instance,
3701 event: None,
3702 wake_on_cancel: None,
3703 function_index: None,
3704 exited: false,
3705 })
3706 }
3707
3708 fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
3711 for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
3714 if let Some(Event::Subtask {
3715 status: Status::Returned | Status::ReturnCancelled,
3716 }) = waitable.common(state)?.event
3717 {
3718 waitable.delete_from(state)?;
3719 }
3720 }
3721
3722 state.delete(self.sync_call_set)?;
3723
3724 match &self.caller {
3726 Caller::Guest {
3727 task,
3728 instance: runtime_instance,
3729 } => {
3730 let task_mut = state.get_mut(*task)?;
3731 let present = task_mut.subtasks.remove(&me);
3732 assert!(present);
3733
3734 for subtask in &self.subtasks {
3735 task_mut.subtasks.insert(*subtask);
3736 }
3737
3738 for subtask in &self.subtasks {
3739 state.get_mut(*subtask)?.caller = Caller::Guest {
3740 task: *task,
3741 instance: *runtime_instance,
3742 };
3743 }
3744 }
3745 Caller::Host { exit_tx, .. } => {
3746 for subtask in &self.subtasks {
3747 state.get_mut(*subtask)?.caller = Caller::Host {
3748 tx: None,
3749 exit_tx: exit_tx.clone(),
3753 remove_task_automatically: true,
3754 call_post_return_automatically: true,
3755 };
3756 }
3757 }
3758 }
3759
3760 Ok(())
3761 }
3762
3763 fn call_post_return_automatically(&self) -> bool {
3764 matches!(
3765 self.caller,
3766 Caller::Guest { .. }
3767 | Caller::Host {
3768 call_post_return_automatically: true,
3769 ..
3770 }
3771 )
3772 }
3773}
3774
3775impl TableDebug for GuestTask {
3776 fn type_name() -> &'static str {
3777 "GuestTask"
3778 }
3779}
3780
3781#[derive(Default)]
3783struct WaitableCommon {
3784 event: Option<Event>,
3786 set: Option<TableId<WaitableSet>>,
3788 handle: Option<u32>,
3790}
3791
3792#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
3794enum Waitable {
3795 Host(TableId<HostTask>),
3797 Guest(TableId<GuestTask>),
3799 Transmit(TableId<TransmitHandle>),
3801}
3802
3803impl Waitable {
3804 fn from_instance(
3807 state: Pin<&mut ComponentInstance>,
3808 caller_instance: RuntimeComponentInstanceIndex,
3809 waitable: u32,
3810 ) -> Result<Self> {
3811 use crate::runtime::vm::component::Waitable;
3812
3813 let (waitable, kind) = state.guest_tables().0[caller_instance].waitable_rep(waitable)?;
3814
3815 Ok(match kind {
3816 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
3817 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
3818 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
3819 })
3820 }
3821
3822 fn rep(&self) -> u32 {
3824 match self {
3825 Self::Host(id) => id.rep(),
3826 Self::Guest(id) => id.rep(),
3827 Self::Transmit(id) => id.rep(),
3828 }
3829 }
3830
3831 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
3835 log::trace!("waitable {self:?} join set {set:?}",);
3836
3837 let old = mem::replace(&mut self.common(state)?.set, set);
3838
3839 if let Some(old) = old {
3840 match *self {
3841 Waitable::Host(id) => state.remove_child(id, old),
3842 Waitable::Guest(id) => state.remove_child(id, old),
3843 Waitable::Transmit(id) => state.remove_child(id, old),
3844 }?;
3845
3846 state.get_mut(old)?.ready.remove(self);
3847 }
3848
3849 if let Some(set) = set {
3850 match *self {
3851 Waitable::Host(id) => state.add_child(id, set),
3852 Waitable::Guest(id) => state.add_child(id, set),
3853 Waitable::Transmit(id) => state.add_child(id, set),
3854 }?;
3855
3856 if self.common(state)?.event.is_some() {
3857 self.mark_ready(state)?;
3858 }
3859 }
3860
3861 Ok(())
3862 }
3863
3864 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
3866 Ok(match self {
3867 Self::Host(id) => &mut state.get_mut(*id)?.common,
3868 Self::Guest(id) => &mut state.get_mut(*id)?.common,
3869 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
3870 })
3871 }
3872
3873 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
3877 log::trace!("set event for {self:?}: {event:?}");
3878 self.common(state)?.event = event;
3879 self.mark_ready(state)
3880 }
3881
3882 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
3884 let common = self.common(state)?;
3885 let event = common.event.take();
3886 if let Some(set) = self.common(state)?.set {
3887 state.get_mut(set)?.ready.remove(self);
3888 }
3889 Ok(event)
3890 }
3891
3892 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
3896 if let Some(set) = self.common(state)?.set {
3897 state.get_mut(set)?.ready.insert(*self);
3898 if let Some((task, mode)) = state.get_mut(set)?.waiting.pop_first() {
3899 let wake_on_cancel = state.get_mut(task)?.wake_on_cancel.take();
3900 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
3901
3902 let item = match mode {
3903 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3904 WaitMode::Callback => WorkItem::GuestCall(GuestCall {
3905 task,
3906 kind: GuestCallKind::DeliverEvent { set: Some(set) },
3907 }),
3908 };
3909 state.push_high_priority(item);
3910 }
3911 }
3912 Ok(())
3913 }
3914
3915 fn on_delivery(&self, instance: Pin<&mut ComponentInstance>, event: Event) {
3918 match event {
3919 Event::FutureRead {
3920 pending: Some((ty, handle)),
3921 ..
3922 }
3923 | Event::FutureWrite {
3924 pending: Some((ty, handle)),
3925 ..
3926 } => {
3927 let runtime_instance = instance.component().types()[ty].instance;
3928 let (rep, state) = instance.guest_tables().0[runtime_instance]
3929 .future_rep(ty, handle)
3930 .unwrap();
3931 assert_eq!(rep, self.rep());
3932 assert_eq!(*state, TransmitLocalState::Busy);
3933 *state = match event {
3934 Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
3935 Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
3936 _ => unreachable!(),
3937 };
3938 }
3939 Event::StreamRead {
3940 pending: Some((ty, handle)),
3941 code,
3942 }
3943 | Event::StreamWrite {
3944 pending: Some((ty, handle)),
3945 code,
3946 } => {
3947 let runtime_instance = instance.component().types()[ty].instance;
3948 let (rep, state) = instance.guest_tables().0[runtime_instance]
3949 .stream_rep(ty, handle)
3950 .unwrap();
3951 assert_eq!(rep, self.rep());
3952 assert_eq!(*state, TransmitLocalState::Busy);
3953 let done = matches!(code, ReturnCode::Dropped(_));
3954 *state = match event {
3955 Event::StreamRead { .. } => TransmitLocalState::Read { done },
3956 Event::StreamWrite { .. } => TransmitLocalState::Write { done },
3957 _ => unreachable!(),
3958 };
3959 }
3960 _ => {}
3961 }
3962 }
3963
3964 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
3966 match self {
3967 Self::Host(task) => {
3968 log::trace!("delete host task {task:?}");
3969 state.delete(*task)?;
3970 }
3971 Self::Guest(task) => {
3972 log::trace!("delete guest task {task:?}");
3973 state.delete(*task)?.dispose(state, *task)?;
3974 }
3975 Self::Transmit(task) => {
3976 state.delete(*task)?;
3977 }
3978 }
3979
3980 Ok(())
3981 }
3982}
3983
3984impl fmt::Debug for Waitable {
3985 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3986 match self {
3987 Self::Host(id) => write!(f, "{id:?}"),
3988 Self::Guest(id) => write!(f, "{id:?}"),
3989 Self::Transmit(id) => write!(f, "{id:?}"),
3990 }
3991 }
3992}
3993
3994#[derive(Default)]
3996struct WaitableSet {
3997 ready: BTreeSet<Waitable>,
3999 waiting: BTreeMap<TableId<GuestTask>, WaitMode>,
4001}
4002
4003impl TableDebug for WaitableSet {
4004 fn type_name() -> &'static str {
4005 "WaitableSet"
4006 }
4007}
4008
4009type RawLower = Box<
4011 dyn FnOnce(&mut dyn VMStore, Instance, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4012>;
4013
4014type RawLift = Box<
4016 dyn FnOnce(&mut dyn VMStore, Instance, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4017 + Send
4018 + Sync,
4019>;
4020
4021type LiftedResult = Box<dyn Any + Send + Sync>;
4025
4026struct DummyResult;
4029
4030pub(crate) struct AsyncState {
4033 current_instance: Option<ComponentInstanceId>,
4037}
4038
4039impl Default for AsyncState {
4040 fn default() -> Self {
4041 Self {
4042 current_instance: None,
4043 }
4044 }
4045}
4046
4047#[derive(Default)]
4049struct InstanceState {
4050 backpressure: u16,
4052 do_not_enter: bool,
4054 pending: BTreeMap<TableId<GuestTask>, GuestCallKind>,
4057}
4058
4059pub struct ConcurrentState {
4062 guest_task: Option<TableId<GuestTask>>,
4064 futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4069 table: AlwaysMut<ResourceTable>,
4071 instance_states: HashMap<RuntimeComponentInstanceIndex, InstanceState>,
4077 high_priority: Vec<WorkItem>,
4079 low_priority: Vec<WorkItem>,
4081 suspend_reason: Option<SuspendReason>,
4085 worker: Option<StoreFiber<'static>>,
4089 worker_item: Option<WorkerItem>,
4091
4092 global_error_context_ref_counts:
4105 BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4106
4107 component: Component,
4110}
4111
4112impl ConcurrentState {
4113 pub(crate) fn new(component: &Component) -> Self {
4114 Self {
4115 guest_task: None,
4116 table: AlwaysMut::new(ResourceTable::new()),
4117 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4118 instance_states: HashMap::new(),
4119 high_priority: Vec::new(),
4120 low_priority: Vec::new(),
4121 suspend_reason: None,
4122 worker: None,
4123 worker_item: None,
4124 global_error_context_ref_counts: BTreeMap::new(),
4125 component: component.clone(),
4126 }
4127 }
4128
4129 pub(crate) fn take_fibers_and_futures(
4146 &mut self,
4147 fibers: &mut Vec<StoreFiber<'static>>,
4148 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4149 ) {
4150 for entry in self.table.get_mut().iter_mut() {
4151 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4152 for mode in mem::take(&mut set.waiting).into_values() {
4153 if let WaitMode::Fiber(fiber) = mode {
4154 fibers.push(fiber);
4155 }
4156 }
4157 }
4158 }
4159
4160 if let Some(fiber) = self.worker.take() {
4161 fibers.push(fiber);
4162 }
4163
4164 let mut take_items = |list| {
4165 for item in mem::take(list) {
4166 match item {
4167 WorkItem::ResumeFiber(fiber) => {
4168 fibers.push(fiber);
4169 }
4170 WorkItem::PushFuture(future) => {
4171 self.futures
4172 .get_mut()
4173 .as_mut()
4174 .unwrap()
4175 .push(future.into_inner());
4176 }
4177 _ => {}
4178 }
4179 }
4180 };
4181
4182 take_items(&mut self.high_priority);
4183 take_items(&mut self.low_priority);
4184
4185 if let Some(them) = self.futures.get_mut().take() {
4186 futures.push(them);
4187 }
4188 }
4189
4190 fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState {
4191 self.instance_states.entry(instance).or_default()
4192 }
4193
4194 fn push<V: Send + Sync + 'static>(
4195 &mut self,
4196 value: V,
4197 ) -> Result<TableId<V>, ResourceTableError> {
4198 self.table.get_mut().push(value).map(TableId::from)
4199 }
4200
4201 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4202 self.table.get_mut().get_mut(&Resource::from(id))
4203 }
4204
4205 pub fn add_child<T: 'static, U: 'static>(
4206 &mut self,
4207 child: TableId<T>,
4208 parent: TableId<U>,
4209 ) -> Result<(), ResourceTableError> {
4210 self.table
4211 .get_mut()
4212 .add_child(Resource::from(child), Resource::from(parent))
4213 }
4214
4215 pub fn remove_child<T: 'static, U: 'static>(
4216 &mut self,
4217 child: TableId<T>,
4218 parent: TableId<U>,
4219 ) -> Result<(), ResourceTableError> {
4220 self.table
4221 .get_mut()
4222 .remove_child(Resource::from(child), Resource::from(parent))
4223 }
4224
4225 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4226 self.table.get_mut().delete(Resource::from(id))
4227 }
4228
4229 fn push_future(&mut self, future: HostTaskFuture) {
4230 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4237 }
4238
4239 fn push_high_priority(&mut self, item: WorkItem) {
4240 log::trace!("push high priority: {item:?}");
4241 self.high_priority.push(item);
4242 }
4243
4244 fn push_low_priority(&mut self, item: WorkItem) {
4245 log::trace!("push low priority: {item:?}");
4246 self.low_priority.push(item);
4247 }
4248
4249 fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4259 let guest_instance = self.get_mut(guest_task).unwrap().instance;
4260
4261 loop {
4269 match &self.get_mut(guest_task).unwrap().caller {
4270 Caller::Host { .. } => break true,
4271 Caller::Guest { task, instance } => {
4272 if *instance == guest_instance {
4273 break false;
4274 } else {
4275 guest_task = *task;
4276 }
4277 }
4278 }
4279 }
4280 }
4281
4282 fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) {
4286 self.instance_state(instance).do_not_enter = true;
4287 }
4288
4289 fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4293 self.instance_state(instance).do_not_enter = false;
4294 self.partition_pending(instance)
4295 }
4296
4297 fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4302 for (task, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
4303 let call = GuestCall { task, kind };
4304 if call.is_ready(self)? {
4305 self.push_high_priority(WorkItem::GuestCall(call));
4306 } else {
4307 self.instance_state(instance)
4308 .pending
4309 .insert(call.task, call.kind);
4310 }
4311 }
4312
4313 Ok(())
4314 }
4315
4316 pub(crate) fn backpressure_modify(
4318 &mut self,
4319 caller_instance: RuntimeComponentInstanceIndex,
4320 modify: impl FnOnce(u16) -> Option<u16>,
4321 ) -> Result<()> {
4322 let state = self.instance_state(caller_instance);
4323 let old = state.backpressure;
4324 let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?;
4325 state.backpressure = new;
4326
4327 if old > 0 && new == 0 {
4328 self.partition_pending(caller_instance)?;
4331 }
4332
4333 Ok(())
4334 }
4335
4336 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4338 let task = self.guest_task.unwrap();
4339 let val = self.get_mut(task)?.context[usize::try_from(slot).unwrap()];
4340 log::trace!("context_get {task:?} slot {slot} val {val:#x}");
4341 Ok(val)
4342 }
4343
4344 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4346 let task = self.guest_task.unwrap();
4347 log::trace!("context_set {task:?} slot {slot} val {val:#x}");
4348 self.get_mut(task)?.context[usize::try_from(slot).unwrap()] = val;
4349 Ok(())
4350 }
4351
4352 fn options(&self, options: OptionsIndex) -> &CanonicalOptions {
4353 &self.component.env_component().options[options]
4354 }
4355}
4356
4357fn for_any_lower<
4360 F: FnOnce(&mut dyn VMStore, Instance, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4361>(
4362 fun: F,
4363) -> F {
4364 fun
4365}
4366
4367fn for_any_lift<
4369 F: FnOnce(&mut dyn VMStore, Instance, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4370 + Send
4371 + Sync,
4372>(
4373 fun: F,
4374) -> F {
4375 fun
4376}
4377
4378fn checked<F: Future + Send + 'static>(
4383 instance: Instance,
4384 fut: F,
4385) -> impl Future<Output = F::Output> + Send + 'static {
4386 async move {
4387 let mut fut = pin!(fut);
4388 future::poll_fn(move |cx| {
4389 let message = "\
4390 `Future`s which depend on asynchronous component tasks, streams, or \
4391 futures to complete may only be polled from the event loop of the \
4392 instance from which they originated. Please use \
4393 `Instance::{run_concurrent,spawn}` to poll or await them.\
4394 ";
4395 tls::try_get(|store| {
4396 let matched = match store {
4397 tls::TryGet::Some(store) => {
4398 let a = store.concurrent_async_state_mut().current_instance;
4399 a == Some(instance.id().instance())
4400 }
4401 tls::TryGet::Taken | tls::TryGet::None => false,
4402 };
4403
4404 if !matched {
4405 panic!("{message}")
4406 }
4407 });
4408 fut.as_mut().poll(cx)
4409 })
4410 .await
4411 }
4412}
4413
4414fn check_recursive_run() {
4417 tls::try_get(|store| {
4418 if !matches!(store, tls::TryGet::None) {
4419 panic!("Recursive `Instance::run_concurrent` calls not supported")
4420 }
4421 });
4422}
4423
4424fn unpack_callback_code(code: u32) -> (u32, u32) {
4425 (code & 0xF, code >> 4)
4426}
4427
4428struct WaitableCheckParams {
4432 set: TableId<WaitableSet>,
4433 options: OptionsIndex,
4434 payload: u32,
4435}
4436
4437enum WaitableCheck {
4439 Wait(WaitableCheckParams),
4440 Poll(WaitableCheckParams),
4441 Yield,
4442}
4443
4444pub(crate) struct PreparedCall<R> {
4446 handle: Func,
4448 task: TableId<GuestTask>,
4450 param_count: usize,
4452 rx: oneshot::Receiver<LiftedResult>,
4455 exit_rx: oneshot::Receiver<()>,
4458 _phantom: PhantomData<R>,
4459}
4460
4461impl<R> PreparedCall<R> {
4462 pub(crate) fn task_id(&self) -> TaskId {
4464 TaskId {
4465 handle: self.handle,
4466 task: self.task,
4467 }
4468 }
4469}
4470
4471pub(crate) struct TaskId {
4473 handle: Func,
4474 task: TableId<GuestTask>,
4475}
4476
4477impl TaskId {
4478 pub(crate) fn remove<T>(&self, store: StoreContextMut<T>) -> Result<()> {
4488 Waitable::Guest(self.task).delete_from(self.handle.instance().concurrent_state_mut(store.0))
4489 }
4490}
4491
4492pub(crate) fn prepare_call<T, R>(
4498 mut store: StoreContextMut<T>,
4499 handle: Func,
4500 param_count: usize,
4501 remove_task_automatically: bool,
4502 call_post_return_automatically: bool,
4503 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
4504 + Send
4505 + Sync
4506 + 'static,
4507 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4508 + Send
4509 + Sync
4510 + 'static,
4511) -> Result<PreparedCall<R>> {
4512 let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
4513
4514 let instance = handle.instance().id().get(store.0);
4515 let task_return_type = instance.component().types()[ty].results;
4516 let component_instance = raw_options.instance;
4517 let callback = options.callback();
4518 let memory = options.memory_raw().map(SendSyncPtr::new);
4519 let string_encoding = options.string_encoding();
4520 let token = StoreToken::new(store.as_context_mut());
4521 let state = handle.instance().concurrent_state_mut(store.0);
4522
4523 assert!(state.guest_task.is_none());
4524
4525 let (tx, rx) = oneshot::channel();
4526 let (exit_tx, exit_rx) = oneshot::channel();
4527
4528 let mut task = GuestTask::new(
4529 state,
4530 Box::new(for_any_lower(move |store, instance, params| {
4531 debug_assert!(instance.id() == handle.instance().id());
4532 lower_params(handle, token.as_context_mut(store), params)
4533 })),
4534 LiftResult {
4535 lift: Box::new(for_any_lift(move |store, instance, result| {
4536 debug_assert!(instance.id() == handle.instance().id());
4537 lift_result(handle, store, result)
4538 })),
4539 ty: task_return_type,
4540 memory,
4541 string_encoding,
4542 },
4543 Caller::Host {
4544 tx: Some(tx),
4545 exit_tx: Arc::new(exit_tx),
4546 remove_task_automatically,
4547 call_post_return_automatically,
4548 },
4549 callback.map(|callback| {
4550 let callback = SendSyncPtr::new(callback);
4551 Box::new(
4552 move |store: &mut dyn VMStore,
4553 instance: Instance,
4554 runtime_instance,
4555 event,
4556 handle| {
4557 let store = token.as_context_mut(store);
4558 unsafe {
4561 instance.call_callback(
4562 store,
4563 runtime_instance,
4564 callback,
4565 event,
4566 handle,
4567 call_post_return_automatically,
4568 )
4569 }
4570 },
4571 ) as CallbackFn
4572 }),
4573 component_instance,
4574 )?;
4575 task.function_index = Some(handle.index());
4576
4577 let task = state.push(task)?;
4578
4579 Ok(PreparedCall {
4580 handle,
4581 task,
4582 param_count,
4583 rx,
4584 exit_rx,
4585 _phantom: PhantomData,
4586 })
4587}
4588
4589pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
4596 mut store: StoreContextMut<T>,
4597 prepared: PreparedCall<R>,
4598) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
4599 let PreparedCall {
4600 handle,
4601 task,
4602 param_count,
4603 rx,
4604 exit_rx,
4605 ..
4606 } = prepared;
4607
4608 queue_call0(store.as_context_mut(), handle, task, param_count)?;
4609
4610 Ok(checked(
4611 handle.instance(),
4612 rx.map(move |result| {
4613 result
4614 .map(|v| (*v.downcast().unwrap(), exit_rx))
4615 .map_err(anyhow::Error::from)
4616 }),
4617 ))
4618}
4619
4620fn queue_call0<T: 'static>(
4623 store: StoreContextMut<T>,
4624 handle: Func,
4625 guest_task: TableId<GuestTask>,
4626 param_count: usize,
4627) -> Result<()> {
4628 let (options, flags, _ty, raw_options) = handle.abi_info(store.0);
4629 let is_concurrent = raw_options.async_;
4630 let instance = handle.instance();
4631 let callee = handle.lifted_core_func(store.0);
4632 let callback = options.callback();
4633 let post_return = handle.post_return_core_func(store.0);
4634
4635 log::trace!("queueing call {guest_task:?}");
4636
4637 let instance_flags = if callback.is_none() {
4638 None
4639 } else {
4640 Some(flags)
4641 };
4642
4643 unsafe {
4647 instance.queue_call(
4648 store,
4649 guest_task,
4650 SendSyncPtr::new(callee),
4651 param_count,
4652 1,
4653 instance_flags,
4654 is_concurrent,
4655 callback.map(SendSyncPtr::new),
4656 post_return.map(SendSyncPtr::new),
4657 )
4658 }
4659}