1use crate::component::func::{self, Func};
54use crate::component::{HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError};
55use crate::fiber::{self, StoreFiber, StoreFiberYield};
56use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
57use crate::vm::component::{CallContext, ComponentInstance, InstanceFlags, ResourceTables};
58use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
59use crate::{AsContext, AsContextMut, FuncType, StoreContext, StoreContextMut, ValRaw, ValType};
60use anyhow::{Context as _, Result, anyhow, bail};
61use error_contexts::GlobalErrorContextRefCount;
62use futures::channel::oneshot;
63use futures::future::{self, Either, FutureExt};
64use futures::stream::{FuturesUnordered, StreamExt};
65use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
66use std::any::Any;
67use std::borrow::ToOwned;
68use std::boxed::Box;
69use std::cell::UnsafeCell;
70use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
71use std::fmt;
72use std::future::Future;
73use std::marker::PhantomData;
74use std::mem::{self, ManuallyDrop, MaybeUninit};
75use std::ops::DerefMut;
76use std::pin::{Pin, pin};
77use std::ptr::{self, NonNull};
78use std::slice;
79use std::sync::Arc;
80use std::task::{Context, Poll, Waker};
81use std::vec::Vec;
82use table::{TableDebug, TableId};
83use wasmtime_environ::Trap;
84use wasmtime_environ::component::{
85 CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS,
86 OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
87 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
88 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
89 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
90};
91
92pub use abort::JoinHandle;
93pub use futures_and_streams::{
94 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
95 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
96 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
97};
98pub(crate) use futures_and_streams::{
99 ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index,
100};
101
102mod abort;
103mod error_contexts;
104mod futures_and_streams;
105pub(crate) mod table;
106pub(crate) mod tls;
107
/// Sentinel `u32` with every bit set (`0xffff_ffff`).
///
/// NOTE(review): presumably the code handed back to a guest when an operation
/// could not complete immediately -- no use site is visible in this part of
/// the file, so confirm against the callers before relying on that reading.
const BLOCKED: u32 = 0xffff_ffff;
111
/// Lifecycle state of a subtask.
///
/// The explicit discriminants are the raw values that [`Status::pack`] places
/// in the low four bits of the packed word.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Pack this status together with an optional waitable handle into a
    /// single `u32`: the status occupies bits `0..4`, the handle bits `4..32`.
    ///
    /// # Panics
    ///
    /// * if `waitable` is `None` for any status other than `Returned`, or is
    ///   `Some` for `Returned`;
    /// * if the handle does not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        // A handle accompanies every status except `Returned`.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = match waitable {
            Some(handle) => handle,
            None => 0,
        };
        // The handle must leave room for the four status bits.
        assert!(waitable < (1 << 28));
        let status_bits = self as u32;
        status_bits | (waitable << 4)
    }
}
135
/// An event that can be delivered to a guest task.
///
/// `Event::parts` flattens each variant into an `(ordinal, payload)` pair;
/// the `pending` fields below are not part of that encoding.
#[derive(Clone, Copy, Debug)]
enum Event {
    /// No event; encoded as ordinal 0 with a zero payload.
    None,
    /// Cancellation; encoded as ordinal 6 with a zero payload.
    Cancelled,
    /// A subtask changed state; the payload is `status as u32`.
    Subtask {
        status: Status,
    },
    /// Result of a stream read; the payload is `code.encode()`.
    StreamRead {
        code: ReturnCode,
        // NOTE(review): presumably the (table type, handle) of the stream end
        // the operation was pending on -- confirm against the producers.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// Result of a stream write; the payload is `code.encode()`.
    StreamWrite {
        code: ReturnCode,
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// Result of a future read; the payload is `code.encode()`.
    FutureRead {
        code: ReturnCode,
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// Result of a future write; the payload is `code.encode()`.
    FutureWrite {
        code: ReturnCode,
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
}
162
163impl Event {
164 fn parts(self) -> (u32, u32) {
169 const EVENT_NONE: u32 = 0;
170 const EVENT_SUBTASK: u32 = 1;
171 const EVENT_STREAM_READ: u32 = 2;
172 const EVENT_STREAM_WRITE: u32 = 3;
173 const EVENT_FUTURE_READ: u32 = 4;
174 const EVENT_FUTURE_WRITE: u32 = 5;
175 const EVENT_CANCELLED: u32 = 6;
176 match self {
177 Event::None => (EVENT_NONE, 0),
178 Event::Cancelled => (EVENT_CANCELLED, 0),
179 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
180 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
181 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
182 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
183 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
184 }
185 }
186}
187
/// Numeric codes a guest callback returns to say what it wants next; they are
/// interpreted by `Instance::handle_callback_code`.
mod callback_code {
    /// The thread has finished.
    pub const EXIT: u32 = 0;
    /// Re-queue the thread at low priority so it runs again later.
    pub const YIELD: u32 = 1;
    /// Wait for an event on the waitable set that accompanies the code.
    pub const WAIT: u32 = 2;
    /// Check the accompanying waitable set for an event without waiting.
    pub const POLL: u32 = 3;
}
195
/// `wasmtime_environ::component::START_FLAG_ASYNC_CALLEE` converted to `u32`
/// once, so it can be used directly with `u32` flag words.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
200
/// Temporary, exclusive access to a store together with a projection function
/// from the store's data `T` to the view `D::Data<'_>`.
///
/// Values of this type are handed to the closure passed to `Accessor::with`.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    /// The store being accessed.
    store: StoreContextMut<'a, T>,
    /// Projects the store data to the `D` view returned by `Access::get`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
210
211impl<'a, T, D> Access<'a, T, D>
212where
213 D: HasData + ?Sized,
214 T: 'static,
215{
216 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
218 Self { store, get_data }
219 }
220
221 pub fn data_mut(&mut self) -> &mut T {
223 self.store.data_mut()
224 }
225
226 pub fn get(&mut self) -> D::Data<'_> {
228 (self.get_data)(self.data_mut())
229 }
230
231 pub fn spawn(&mut self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
235 where
236 T: 'static,
237 {
238 let accessor = Accessor {
239 get_data: self.get_data,
240 token: StoreToken::new(self.store.as_context_mut()),
241 };
242 self.store
243 .as_context_mut()
244 .spawn_with_accessor(accessor, task)
245 }
246}
247
248impl<'a, T, D> AsContext for Access<'a, T, D>
249where
250 D: HasData + ?Sized,
251 T: 'static,
252{
253 type Data = T;
254
255 fn as_context(&self) -> StoreContext<'_, T> {
256 self.store.as_context()
257 }
258}
259
260impl<'a, T, D> AsContextMut for Access<'a, T, D>
261where
262 D: HasData + ?Sized,
263 T: 'static,
264{
265 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
266 self.store.as_context_mut()
267 }
268}
269
/// A handle through which asynchronous code reaches the store.
///
/// An `Accessor` does not hold a store reference. It holds a `StoreToken`
/// plus a projection function, and `Accessor::with` resolves the store from
/// thread-local state (`tls::get`) each time access is needed.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    /// Converts the type-erased store obtained from TLS back into a
    /// `StoreContextMut<T>` (see `Accessor::with`).
    token: StoreToken<T>,
    /// Projects the store data `T` to the `D` view.
    get_data: fn(&mut T) -> D::Data<'_>,
}
336
/// Types from which a reference to an [`Accessor`] can be borrowed.
///
/// Implemented in this file for `Accessor` itself and for shared references
/// to any implementor.
pub trait AsAccessor {
    /// The store data type `T` of the underlying accessor.
    type Data: 'static;

    /// The `HasData` projection type `D` of the underlying accessor.
    type AccessorData: HasData + ?Sized;

    /// Borrow the underlying accessor.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
364
365impl<T: AsAccessor + ?Sized> AsAccessor for &T {
366 type Data = T::Data;
367 type AccessorData = T::AccessorData;
368
369 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
370 T::as_accessor(self)
371 }
372}
373
374impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
375 type Data = T;
376 type AccessorData = D;
377
378 fn as_accessor(&self) -> &Accessor<T, D> {
379 self
380 }
381}
382
// Compile-time check that `Accessor<T>` is `Send + Sync` even when `T` is
// not: `UnsafeCell<u32>` is not `Sync`, so this only compiles if the bounds
// hold independently of the data type.
const _: () = {
    const fn assert<T: Send + Sync>() {}
    assert::<Accessor<UnsafeCell<u32>>>();
};
409
410impl<T> Accessor<T> {
411 pub(crate) fn new(token: StoreToken<T>) -> Self {
420 Self {
421 token,
422 get_data: |x| x,
423 }
424 }
425}
426
427impl<T, D> Accessor<T, D>
428where
429 D: HasData + ?Sized,
430{
431 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
449 tls::get(|vmstore| {
450 fun(Access {
451 store: self.token.as_context_mut(vmstore),
452 get_data: self.get_data,
453 })
454 })
455 }
456
457 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
460 self.get_data
461 }
462
463 pub fn with_getter<D2: HasData>(
480 &self,
481 get_data: fn(&mut T) -> D2::Data<'_>,
482 ) -> Accessor<T, D2> {
483 Accessor {
484 token: self.token,
485 get_data,
486 }
487 }
488
489 pub fn spawn(&self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
505 where
506 T: 'static,
507 {
508 let accessor = self.clone_for_spawn();
509 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
510 }
511
512 fn clone_for_spawn(&self) -> Self {
513 Self {
514 token: self.token,
515 get_data: self.get_data,
516 }
517 }
518}
519
/// A task that runs with access to the store through an [`Accessor`].
///
/// Implementors must be `Send + 'static`, and the future returned by `run`
/// must be `Send`.
pub trait AccessorTask<T, D, R>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Consume the task and run it with `accessor`, producing an `R`.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = R> + Send;
}
538
/// Information recorded about a call according to how the caller made it.
///
/// NOTE(review): no use site is visible in this part of the file; the field
/// notes below are read from the names and types only -- confirm at the
/// point of use.
enum CallerInfo {
    /// The caller used the async convention.
    Async {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Presumably whether the callee produces a result -- TODO confirm.
        has_result: bool,
    },
    /// The caller used the sync convention.
    Sync {
        /// Raw parameter values.
        params: Vec<ValRaw>,
        /// Presumably the number of flat result values -- TODO confirm.
        result_count: u32,
    },
}
553
/// How a thread registered in a waitable set's `waiting` map is resumed.
enum WaitMode {
    /// The thread is a suspended fiber (registered by
    /// `StoreOpaque::resume_fiber` for `SuspendReason::Waiting`).
    Fiber(StoreFiber<'static>),
    /// The thread waits via its callback (registered by
    /// `Instance::handle_callback_code` for `callback_code::WAIT`).
    Callback(Instance),
}
562
/// Why a fiber suspended itself.
///
/// `StoreOpaque::suspend` records the reason, and `StoreOpaque::resume_fiber`
/// reads it to decide where the suspended fiber is parked.
#[derive(Debug)]
enum SuspendReason {
    /// Waiting for an event on `set`; the fiber is stored in the set's
    /// `waiting` map under `thread`.
    Waiting {
        set: TableId<WaitableSet>,
        thread: QualifiedThreadId,
    },
    /// A worker fiber finished its item; it is cached for reuse, or disposed
    /// if a worker is already cached.
    NeedWork,
    /// The thread yielded; the fiber is re-queued as low-priority work.
    Yielding { thread: QualifiedThreadId },
    /// The thread suspended explicitly; the fiber is stored in the thread's
    /// state as `GuestThreadState::Suspended`.
    ExplicitlySuspending { thread: QualifiedThreadId },
}
581
/// The kind of work a [`GuestCall`] performs (see `handle_guest_call`).
enum GuestCallKind {
    /// Deliver an event to the thread through its callback.
    DeliverEvent {
        /// Instance used to fetch the event and handle the callback's code.
        instance: Instance,
        /// Waitable set to take a ready event from, if any.
        set: Option<TableId<WaitableSet>>,
    },
    /// Start a call; `GuestCall::is_ready` holds it back while the target
    /// instance has `do_not_enter` set or non-zero backpressure.
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
    /// Start a call; `GuestCall::is_ready` always reports this as ready.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
600
601impl fmt::Debug for GuestCallKind {
602 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
603 match self {
604 Self::DeliverEvent { instance, set } => f
605 .debug_struct("DeliverEvent")
606 .field("instance", instance)
607 .field("set", set)
608 .finish(),
609 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
610 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
611 }
612 }
613}
614
/// A unit of guest work: what to do (`kind`) and on behalf of which thread.
#[derive(Debug)]
struct GuestCall {
    /// The guest thread (and its task) this call belongs to.
    thread: QualifiedThreadId,
    /// What the call does when executed.
    kind: GuestCallKind,
}
621
622impl GuestCall {
623 fn is_ready(&self, state: &mut ConcurrentState) -> Result<bool> {
633 let task_instance = state.get_mut(self.thread.task)?.instance;
634 let state = state.instance_state(task_instance);
635
636 let ready = match &self.kind {
637 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
638 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
639 GuestCallKind::StartExplicit(_) => true,
640 };
641 log::trace!(
642 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
643 state.do_not_enter,
644 state.backpressure
645 );
646 Ok(ready)
647 }
648}
649
/// An item handed to the worker fiber (see `run_on_worker`).
enum WorkerItem {
    /// Run a guest call via `handle_guest_call`.
    GuestCall(GuestCall),
    /// Run an arbitrary function against the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
655
/// Parameters of a deferred poll of a waitable set (see `WorkItem::Poll`).
#[derive(Debug)]
struct PollParams {
    /// Instance used when the resulting event is delivered.
    instance: Instance,
    /// The thread that asked to poll.
    thread: QualifiedThreadId,
    /// The waitable set being polled.
    set: TableId<WaitableSet>,
}
667
/// An item on the high- or low-priority work queue, processed by
/// `handle_work_item`.
enum WorkItem {
    /// Add a future to the store's set of futures being polled.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// Resume a previously suspended fiber.
    ResumeFiber(StoreFiber<'static>),
    /// Run a guest call if it is ready; otherwise park it as pending on its
    /// instance.
    GuestCall(GuestCall),
    /// Check a waitable set and deliver either a ready event or `Event::None`.
    Poll(PollParams),
    /// Run a function on the worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
682
683impl fmt::Debug for WorkItem {
684 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
685 match self {
686 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
687 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
688 Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
689 Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
690 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
691 }
692 }
693}
694
/// Outcome of a wait.
///
/// The derived ordering follows declaration order, so
/// `Cancelled < Completed`.
///
/// NOTE(review): no use site is visible in this part of the file -- confirm
/// the exact meaning of each variant against the callers.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    Cancelled,
    Completed,
}
701
/// Poll `future` once and, if it is not ready yet, suspend the calling fiber
/// until it completes; then return its output.
///
/// * `store` - the store the call is running in.
/// * `future` - the host future to drive to completion.
/// * `caller_instance` - instance recorded on the `HostTask` created to track
///   the future.
///
/// If there is no current guest thread the future is polled exactly once and
/// must already be ready.
///
/// # Errors
///
/// Returns the future's own error, or any error from table lookups or from
/// suspending the fiber.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
    caller_instance: RuntimeComponentInstanceIndex,
) -> Result<R> {
    let state = store.concurrent_state_mut();

    // No current guest thread: poll once with a no-op waker. The `Pending`
    // arm is `unreachable!()`, so callers on this path must supply a future
    // that completes on its first poll.
    let Some(caller) = state.guest_thread else {
        return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
            Poll::Ready(result) => result,
            Poll::Pending => {
                unreachable!()
            }
        };
    };

    // Save whatever is currently in the caller's `result` slot; the slot is
    // borrowed to carry this call's result and restored before returning.
    let old_result = state
        .get_mut(caller.task)
        .with_context(|| format!("bad handle: {caller:?}"))?
        .result
        .take();

    // Create a host task to represent the pending future.
    let task = state.push(HostTask::new(caller_instance, None))?;

    log::trace!("new host task child of {caller:?}: {task:?}");

    // Wrap the future so that, on completion, it stores its output in the
    // caller's `result` slot and flags the host task as `Returned`.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            state.get_mut(caller.task)?.result = Some(Box::new(result) as _);

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // Poll once with the store installed in TLS, since the wrapper above
    // reaches the store through `tls::get`.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        Poll::Ready(result) => {
            result?;
            // Completed immediately: the host task is no longer needed.
            log::trace!("delete host task {task:?} (already ready)");
            store.concurrent_state_mut().delete(task)?;
        }
        Poll::Pending => {
            let state = store.concurrent_state_mut();
            // Hand the future over to the store so it keeps being polled.
            state.push_future(future);

            // Join the host task to the caller's sync-call set and suspend
            // this fiber until that set has an event.
            let set = state.get_mut(caller.task)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
            })?;
        }
    }

    // Take the result the wrapper stored, restoring the previous slot
    // contents, and downcast it back to `R`. Both `unwrap`s rely on the
    // wrapper above having stored a `Box<R>`.
    Ok(*mem::replace(
        &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
        old_result,
    )
    .unwrap()
    .downcast()
    .unwrap())
}
806
807fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
809 match call.kind {
810 GuestCallKind::DeliverEvent { instance, set } => {
811 let (event, waitable) = instance
812 .get_event(store, call.thread.task, set, true)?
813 .unwrap();
814 let state = store.concurrent_state_mut();
815 let task = state.get_mut(call.thread.task)?;
816 let runtime_instance = task.instance;
817 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
818
819 log::trace!(
820 "use callback to deliver event {event:?} to {:?} for {waitable:?}",
821 call.thread,
822 );
823
824 let old_thread = state.guest_thread.replace(call.thread);
825 log::trace!(
826 "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
827 call.thread
828 );
829
830 store.maybe_push_call_context(call.thread.task)?;
831
832 let state = store.concurrent_state_mut();
833 state.enter_instance(runtime_instance);
834
835 let callback = state.get_mut(call.thread.task)?.callback.take().unwrap();
836
837 let code = callback(store, runtime_instance, event, handle)?;
838
839 let state = store.concurrent_state_mut();
840
841 state.get_mut(call.thread.task)?.callback = Some(callback);
842 state.exit_instance(runtime_instance)?;
843
844 store.maybe_pop_call_context(call.thread.task)?;
845
846 instance.handle_callback_code(store, call.thread, runtime_instance, code)?;
847
848 store.concurrent_state_mut().guest_thread = old_thread;
849 log::trace!("GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread");
850 }
851 GuestCallKind::StartImplicit(fun) => {
852 fun(store)?;
853 }
854 GuestCallKind::StartExplicit(fun) => {
855 fun(store)?;
856 }
857 }
858
859 Ok(())
860}
861
862impl<T> Store<T> {
863 pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
865 where
866 T: Send + 'static,
867 {
868 self.as_context_mut().run_concurrent(fun).await
869 }
870
871 #[doc(hidden)]
872 pub fn assert_concurrent_state_empty(&mut self) {
873 self.as_context_mut().assert_concurrent_state_empty();
874 }
875
876 pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
878 where
879 T: 'static,
880 {
881 self.as_context_mut().spawn(task)
882 }
883}
884
885impl<T> StoreContextMut<'_, T> {
    /// Assert that no concurrency-related state is left in the store.
    ///
    /// Checks, in order: the guest tables of all components, the concurrent
    /// state table, both work queues, the current guest thread, the set of
    /// pending futures, every instance's `pending` map, and the global
    /// error-context reference counts.
    ///
    /// # Panics
    ///
    /// Panics on the first check that fails, and also if the futures set has
    /// been taken out of the state (the `unwrap` below).
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(self) {
        let store = self.0;
        store
            .store_data_mut()
            .components
            .assert_guest_tables_empty();
        let state = store.concurrent_state_mut();
        assert!(
            state.table.get_mut().is_empty(),
            "non-empty table: {:?}",
            state.table.get_mut()
        );
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        assert!(state.guest_thread.is_none());
        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
        assert!(
            state
                .instance_states
                .iter()
                .all(|(_, state)| state.pending.is_empty())
        );
        assert!(state.global_error_context_ref_counts.is_empty());
    }
919
920 pub fn spawn(mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
930 where
931 T: 'static,
932 {
933 let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
934 self.spawn_with_accessor(accessor, task)
935 }
936
937 fn spawn_with_accessor<D>(
940 self,
941 accessor: Accessor<T, D>,
942 task: impl AccessorTask<T, D, Result<()>>,
943 ) -> JoinHandle
944 where
945 T: 'static,
946 D: HasData + ?Sized,
947 {
948 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
952 self.0
953 .concurrent_state_mut()
954 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
955 handle
956 }
957
958 pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1039 where
1040 T: Send + 'static,
1041 {
1042 self.do_run_concurrent(fun, false).await
1043 }
1044
1045 pub(super) async fn run_concurrent_trap_on_idle<R>(
1046 self,
1047 fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1048 ) -> Result<R>
1049 where
1050 T: Send + 'static,
1051 {
1052 self.do_run_concurrent(fun, true).await
1053 }
1054
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Calls `fun` with a fresh `Accessor`, pins the future it returns in
    /// place, and drives it with `poll_until`. `trap_on_idle` is forwarded to
    /// `poll_until` unchanged.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // NOTE(review): defined outside this view; by its name it rejects
        // nested `run_concurrent` calls -- confirm.
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Guard that drops `value` with the store installed in TLS, so that
        // the destructor of the future can still reach the store.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: `value` is dropped exactly once, here; `drop`
                    // runs at most once and the field is never used again.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: `dropper` is only accessed through this reference and its
        // `value` field is never moved before `Dropper::drop` runs, so the
        // future stays at a fixed address for as long as it is pinned.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1097
    /// Drive the store's event loop until `future` completes, returning its
    /// output.
    ///
    /// Each iteration polls `future`, then the store's set of pending futures,
    /// then drains the high-priority work queue (or the low-priority one if
    /// the former is empty) and handles every drained item.
    ///
    /// # Errors
    ///
    /// Fails if a polled store future or a work item fails, or -- when
    /// `trap_on_idle` is set -- with `Trap::AsyncDeadlock` if nothing is left
    /// to do while `future` is still pending.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Guard that puts the futures set back into the store when dropped,
        // including on early exit.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take the futures set out of the store so it can be polled while
            // the futures inside reach the store through TLS.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = pin!(reset.futures.as_mut().unwrap().next());

            let result = future::poll_fn(|cx| {
                // First, the caller's future: if it is ready we are done.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(Either::Left(value)));
                }

                // Next, the store's futures. An error from any of them ends
                // the loop. `Ready(true)` means one completed, `Ready(false)`
                // means the set is empty.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Drain the high-priority queue; fall back to the low-priority
                // queue only when the former is empty.
                let state = reset.store.0.concurrent_state_mut();
                let ready = mem::take(&mut state.high_priority);
                let ready = if ready.is_empty() {
                    let ready = mem::take(&mut state.low_priority);
                    if ready.is_empty() {
                        // No work items at all: decide based on the outcome of
                        // polling the store's futures.
                        return match next {
                            Poll::Ready(true) => {
                                // A store future completed: go round the outer
                                // loop again with an empty batch.
                                Poll::Ready(Ok(Either::Right(Vec::new())))
                            }
                            Poll::Ready(false) => {
                                // Nothing is pending in the store. Poll the
                                // caller's future one last time in case the
                                // work above unblocked it.
                                if let Poll::Ready(value) =
                                    tls::set(reset.store.0, || future.as_mut().poll(cx))
                                {
                                    Poll::Ready(Ok(Either::Left(value)))
                                } else {
                                    if trap_on_idle {
                                        // Idle with an unfinished future:
                                        // report a deadlock.
                                        Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
                                    } else {
                                        // Otherwise wait for the caller's
                                        // future to wake this task.
                                        Poll::Pending
                                    }
                                }
                            }
                            // Store futures are still pending: wait for them.
                            Poll::Pending => Poll::Pending,
                        };
                    } else {
                        ready
                    }
                } else {
                    ready
                };

                Poll::Ready(Ok(Either::Right(ready)))
            })
            .await;

            // Return the futures set to the store before handling work items,
            // which push further futures into it (see `WorkItem::PushFuture`).
            drop(reset);

            match result? {
                Either::Left(value) => break Ok(value),
                Either::Right(ready) => {
                    // Guard that cleans up any work items left unprocessed if
                    // handling one of them fails or this future is dropped
                    // part-way: fibers are disposed and futures are dropped
                    // with the store installed in TLS.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1265
1266 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1268 where
1269 T: Send,
1270 {
1271 log::trace!("handle work item {item:?}");
1272 match item {
1273 WorkItem::PushFuture(future) => {
1274 self.0
1275 .concurrent_state_mut()
1276 .futures
1277 .get_mut()
1278 .as_mut()
1279 .unwrap()
1280 .push(future.into_inner());
1281 }
1282 WorkItem::ResumeFiber(fiber) => {
1283 self.0.resume_fiber(fiber).await?;
1284 }
1285 WorkItem::GuestCall(call) => {
1286 let state = self.0.concurrent_state_mut();
1287 if call.is_ready(state)? {
1288 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1289 } else {
1290 let task = state.get_mut(call.thread.task)?;
1291 if !task.starting_sent {
1292 task.starting_sent = true;
1293 if let GuestCallKind::StartImplicit(_) = &call.kind {
1294 Waitable::Guest(call.thread.task).set_event(
1295 state,
1296 Some(Event::Subtask {
1297 status: Status::Starting,
1298 }),
1299 )?;
1300 }
1301 }
1302
1303 let runtime_instance = state.get_mut(call.thread.task)?.instance;
1304 state
1305 .instance_state(runtime_instance)
1306 .pending
1307 .insert(call.thread, call.kind);
1308 }
1309 }
1310 WorkItem::Poll(params) => {
1311 let state = self.0.concurrent_state_mut();
1312 if state.get_mut(params.thread.task)?.event.is_some()
1313 || !state.get_mut(params.set)?.ready.is_empty()
1314 {
1315 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1318 thread: params.thread,
1319 kind: GuestCallKind::DeliverEvent {
1320 instance: params.instance,
1321 set: Some(params.set),
1322 },
1323 }));
1324 } else {
1325 state.get_mut(params.thread.task)?.event = Some(Event::None);
1328 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1329 thread: params.thread,
1330 kind: GuestCallKind::DeliverEvent {
1331 instance: params.instance,
1332 set: Some(params.set),
1333 },
1334 }));
1335 }
1336 }
1337 WorkItem::WorkerFunction(fun) => {
1338 self.run_on_worker(WorkerItem::Function(fun)).await?;
1339 }
1340 }
1341
1342 Ok(())
1343 }
1344
1345 async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1347 where
1348 T: Send,
1349 {
1350 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1351 fiber
1352 } else {
1353 fiber::make_fiber(self.0, move |store| {
1354 loop {
1355 match store.concurrent_state_mut().worker_item.take().unwrap() {
1356 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1357 WorkerItem::Function(fun) => fun.into_inner()(store)?,
1358 }
1359
1360 store.suspend(SuspendReason::NeedWork)?;
1361 }
1362 })?
1363 };
1364
1365 let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1366 assert!(worker_item.is_none());
1367 *worker_item = Some(item);
1368
1369 self.0.resume_fiber(worker).await
1370 }
1371
1372 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1377 where
1378 T: 'static,
1379 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1380 + Send
1381 + Sync
1382 + 'static,
1383 R: Send + Sync + 'static,
1384 {
1385 let token = StoreToken::new(self);
1386 async move {
1387 let mut accessor = Accessor::new(token);
1388 closure(&mut accessor).await
1389 }
1390 }
1391}
1392
1393impl StoreOpaque {
    /// Resume `fiber` and, if it suspends again rather than exiting, park it
    /// according to the `SuspendReason` it recorded.
    ///
    /// The current guest thread is saved before the fiber runs and restored
    /// afterwards.
    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
        let old_thread = self.concurrent_state_mut().guest_thread;
        log::trace!("resume_fiber: save current thread {old_thread:?}");

        // `None` here means the fiber ran to completion.
        let fiber = fiber::resolve_or_release(self, fiber).await?;

        let state = self.concurrent_state_mut();

        // Restore the thread that was current before the fiber ran and mark
        // it as running again.
        state.guest_thread = old_thread;
        if let Some(ref ot) = old_thread {
            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
        }
        log::trace!("resume_fiber: restore current thread {old_thread:?}");

        if let Some(mut fiber) = fiber {
            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
            // A fiber that suspended must have recorded why (see `suspend`).
            match state.suspend_reason.take().unwrap() {
                SuspendReason::NeedWork => {
                    // Keep at most one idle worker fiber; dispose of extras.
                    if state.worker.is_none() {
                        state.worker = Some(fiber);
                    } else {
                        fiber.dispose(self);
                    }
                }
                SuspendReason::Yielding { thread, .. } => {
                    // Resume later, after other queued work has run.
                    state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
                    state.push_low_priority(WorkItem::ResumeFiber(fiber));
                }
                SuspendReason::ExplicitlySuspending { thread, .. } => {
                    // The fiber is stored with the thread until something
                    // takes it out again.
                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
                }
                SuspendReason::Waiting { set, thread } => {
                    // Register the fiber as waiting on the set; a thread may
                    // be registered at most once.
                    let old = state
                        .get_mut(set)?
                        .waiting
                        .insert(thread, WaitMode::Fiber(fiber));
                    assert!(old.is_none());
                }
            };
        } else {
            log::trace!("resume_fiber: fiber has exited");
        }

        Ok(())
    }
1442
    /// Suspend the current fiber, recording `reason` so that `resume_fiber`
    /// knows where to park it.
    ///
    /// Returns once the fiber has been resumed.
    ///
    /// # Panics
    ///
    /// Panics if a suspend reason is already recorded.
    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
        log::trace!("suspend fiber: {reason:?}");

        // Every reason except `NeedWork` is on behalf of a guest thread whose
        // task has a call context to pop now and push back after resuming.
        let task = match &reason {
            SuspendReason::Yielding { thread, .. }
            | SuspendReason::Waiting { thread, .. }
            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
            SuspendReason::NeedWork => None,
        };

        // Remember the current guest thread so it can be restored on resume.
        let old_guest_thread = if let Some(task) = task {
            self.maybe_pop_call_context(task)?;
            self.concurrent_state_mut().guest_thread
        } else {
            None
        };

        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
        assert!(suspend_reason.is_none());
        *suspend_reason = Some(reason);

        // Yield control to whoever resumed this fiber.
        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;

        // Resumed: restore the thread and call context saved above.
        if let Some(task) = task {
            self.concurrent_state_mut().guest_thread = old_guest_thread;
            self.maybe_push_call_context(task)?;
        }

        Ok(())
    }
1481
1482 fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1486 let task = self.concurrent_state_mut().get_mut(guest_task)?;
1487
1488 if !task.returned_or_cancelled() {
1489 log::trace!("push call context for {guest_task:?}");
1490 let call_context = task.call_context.take().unwrap();
1491 self.component_resource_state().0.push(call_context);
1492 }
1493 Ok(())
1494 }
1495
1496 fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1500 if !self
1501 .concurrent_state_mut()
1502 .get_mut(guest_task)?
1503 .returned_or_cancelled()
1504 {
1505 log::trace!("pop call context for {guest_task:?}");
1506 let call_context = Some(self.component_resource_state().0.pop().unwrap());
1507 self.concurrent_state_mut()
1508 .get_mut(guest_task)?
1509 .call_context = call_context;
1510 }
1511 Ok(())
1512 }
1513
1514 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1515 let state = self.concurrent_state_mut();
1516 let caller = state.guest_thread.unwrap();
1517 let old_set = waitable.common(state)?.set;
1518 let set = state.get_mut(caller.task)?.sync_call_set;
1519 waitable.join(state, Some(set))?;
1520 self.suspend(SuspendReason::Waiting {
1521 set,
1522 thread: caller,
1523 })?;
1524 let state = self.concurrent_state_mut();
1525 waitable.join(state, old_set)
1526 }
1527}
1528
1529impl Instance {
1530 fn get_event(
1533 self,
1534 store: &mut StoreOpaque,
1535 guest_task: TableId<GuestTask>,
1536 set: Option<TableId<WaitableSet>>,
1537 cancellable: bool,
1538 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1539 let state = store.concurrent_state_mut();
1540
1541 if let Some(event) = state.get_mut(guest_task)?.event.take() {
1542 log::trace!("deliver event {event:?} to {guest_task:?}");
1543
1544 if cancellable || !matches!(event, Event::Cancelled) {
1545 return Ok(Some((event, None)));
1546 } else {
1547 state.get_mut(guest_task)?.event = Some(event);
1548 }
1549 }
1550
1551 Ok(
1552 if let Some((set, waitable)) = set
1553 .and_then(|set| {
1554 state
1555 .get_mut(set)
1556 .map(|v| v.ready.pop_first().map(|v| (set, v)))
1557 .transpose()
1558 })
1559 .transpose()?
1560 {
1561 let common = waitable.common(state)?;
1562 let handle = common.handle.unwrap();
1563 let event = common.event.take().unwrap();
1564
1565 log::trace!(
1566 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1567 );
1568
1569 waitable.on_delivery(store, self, event);
1570
1571 Some((event, Some((waitable, handle))))
1572 } else {
1573 None
1574 },
1575 )
1576 }
1577
    /// Act on the code a guest callback returned for `guest_thread`.
    ///
    /// `code` is split by `unpack_callback_code` into a callback code (see
    /// `callback_code`) and a waitable-set handle, which is only used for
    /// `WAIT` and `POLL`.
    ///
    /// # Errors
    ///
    /// * `Trap::NoAsyncResult` if the thread exits, it was the task's last
    ///   thread, and the task has neither returned nor been cancelled;
    /// * an error for waitable-set handle `0` or an unknown code;
    /// * any error from table lookups.
    fn handle_callback_code(
        self,
        store: &mut StoreOpaque,
        guest_thread: QualifiedThreadId,
        runtime_instance: RuntimeComponentInstanceIndex,
        code: u32,
    ) -> Result<()> {
        let (code, set) = unpack_callback_code(code);

        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");

        let state = store.concurrent_state_mut();

        // Resolve a guest waitable-set handle to its `TableId`. Handle 0 is
        // rejected up front.
        let get_set = |store, handle| {
            if handle == 0 {
                bail!("invalid waitable-set handle");
            }

            let set = self.id().get_mut(store).guest_tables().0[runtime_instance]
                .waitable_set_rep(handle)?;

            Ok(TableId::<WaitableSet>::new(set))
        };

        match code {
            callback_code::EXIT => {
                log::trace!("implicit thread {guest_thread:?} completed");
                self.cleanup_thread(store, guest_thread, runtime_instance)?;
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                // The last thread of a task may not exit before the task has
                // produced a result or been cancelled.
                if task.threads.is_empty() && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
                match &task.caller {
                    Caller::Host { .. } => {
                        // Host-called tasks are deleted here once nothing
                        // else needs them.
                        if task.ready_to_delete() {
                            Waitable::Guest(guest_thread.task)
                                .delete_from(store.concurrent_state_mut())?;
                        }
                    }
                    Caller::Guest { .. } => {
                        // Guest-called tasks are only marked as exited and
                        // lose their callback.
                        task.exited = true;
                        task.callback = None;
                    }
                }
            }
            callback_code::YIELD => {
                // Re-run the callback with `Event::None` at low priority, so
                // that other queued work gets to run first.
                let task = state.get_mut(guest_thread.task)?;
                assert!(task.event.is_none());
                task.event = Some(Event::None);
                state.push_low_priority(WorkItem::GuestCall(GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::DeliverEvent {
                        instance: self,
                        set: None,
                    },
                }));
            }
            callback_code::WAIT | callback_code::POLL => {
                let set = get_set(store, set)?;
                let state = store.concurrent_state_mut();

                if state.get_mut(guest_thread.task)?.event.is_some()
                    || !state.get_mut(set)?.ready.is_empty()
                {
                    // An event is already available: deliver it as soon as
                    // possible.
                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
                        thread: guest_thread,
                        kind: GuestCallKind::DeliverEvent {
                            instance: self,
                            set: Some(set),
                        },
                    }));
                } else {
                    match code {
                        callback_code::POLL => {
                            // Check again after other queued work has run.
                            state.push_low_priority(WorkItem::Poll(PollParams {
                                instance: self,
                                thread: guest_thread,
                                set,
                            }));
                        }
                        callback_code::WAIT => {
                            // Record the set on the thread and register the
                            // thread as waiting on it; neither may already be
                            // set.
                            // NOTE(review): `wake_on_cancel` presumably lets a
                            // cancellation interrupt this wait -- confirm.
                            let old = state
                                .get_mut(guest_thread.thread)?
                                .wake_on_cancel
                                .replace(set);
                            assert!(old.is_none());
                            let old = state
                                .get_mut(set)?
                                .waiting
                                .insert(guest_thread, WaitMode::Callback(self));
                            assert!(old.is_none());
                        }
                        // The outer arm only admits `WAIT` and `POLL`.
                        _ => unreachable!(),
                    }
                }
            }
            _ => bail!("unsupported callback code: {code}"),
        }

        Ok(())
    }
1693
1694 fn cleanup_thread(
1695 self,
1696 store: &mut StoreOpaque,
1697 guest_thread: QualifiedThreadId,
1698 runtime_instance: RuntimeComponentInstanceIndex,
1699 ) -> Result<()> {
1700 let guest_id = store
1701 .concurrent_state_mut()
1702 .get_mut(guest_thread.thread)?
1703 .instance_rep;
1704 self.id().get_mut(store).guest_tables().0[runtime_instance]
1705 .guest_thread_remove(guest_id.unwrap())?;
1706
1707 store.concurrent_state_mut().delete(guest_thread.thread)?;
1708 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1709 task.threads.remove(&guest_thread.thread);
1710 Ok(())
1711 }
1712
1713 unsafe fn queue_call<T: 'static>(
1720 self,
1721 mut store: StoreContextMut<T>,
1722 guest_thread: QualifiedThreadId,
1723 callee: SendSyncPtr<VMFuncRef>,
1724 param_count: usize,
1725 result_count: usize,
1726 flags: Option<InstanceFlags>,
1727 async_: bool,
1728 callback: Option<SendSyncPtr<VMFuncRef>>,
1729 post_return: Option<SendSyncPtr<VMFuncRef>>,
1730 ) -> Result<()> {
1731 unsafe fn make_call<T: 'static>(
1746 store: StoreContextMut<T>,
1747 guest_thread: QualifiedThreadId,
1748 callee: SendSyncPtr<VMFuncRef>,
1749 param_count: usize,
1750 result_count: usize,
1751 flags: Option<InstanceFlags>,
1752 ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
1753 + Send
1754 + Sync
1755 + 'static
1756 + use<T> {
1757 let token = StoreToken::new(store);
1758 move |store: &mut dyn VMStore| {
1759 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1760
1761 store
1762 .concurrent_state_mut()
1763 .get_mut(guest_thread.thread)?
1764 .state = GuestThreadState::Running;
1765 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1766 let may_enter_after_call = task.call_post_return_automatically();
1767 let lower = task.lower_params.take().unwrap();
1768
1769 lower(store, &mut storage[..param_count])?;
1770
1771 let mut store = token.as_context_mut(store);
1772
1773 unsafe {
1776 if let Some(mut flags) = flags {
1777 flags.set_may_enter(false);
1778 }
1779 crate::Func::call_unchecked_raw(
1780 &mut store,
1781 callee.as_non_null(),
1782 NonNull::new(
1783 &mut storage[..param_count.max(result_count)]
1784 as *mut [MaybeUninit<ValRaw>] as _,
1785 )
1786 .unwrap(),
1787 )?;
1788 if let Some(mut flags) = flags {
1789 flags.set_may_enter(may_enter_after_call);
1790 }
1791 }
1792
1793 Ok(storage)
1794 }
1795 }
1796
1797 let call = unsafe {
1801 make_call(
1802 store.as_context_mut(),
1803 guest_thread,
1804 callee,
1805 param_count,
1806 result_count,
1807 flags,
1808 )
1809 };
1810
1811 let callee_instance = store
1812 .0
1813 .concurrent_state_mut()
1814 .get_mut(guest_thread.task)?
1815 .instance;
1816 let fun = if callback.is_some() {
1817 assert!(async_);
1818
1819 Box::new(move |store: &mut dyn VMStore| {
1820 self.add_guest_thread_to_instance_table(
1821 guest_thread.thread,
1822 store,
1823 callee_instance,
1824 )?;
1825 let old_thread = store
1826 .concurrent_state_mut()
1827 .guest_thread
1828 .replace(guest_thread);
1829 log::trace!(
1830 "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
1831 );
1832
1833 store.maybe_push_call_context(guest_thread.task)?;
1834
1835 store.concurrent_state_mut().enter_instance(callee_instance);
1836
1837 let storage = call(store)?;
1844
1845 store
1846 .concurrent_state_mut()
1847 .exit_instance(callee_instance)?;
1848
1849 store.maybe_pop_call_context(guest_thread.task)?;
1850
1851 let state = store.concurrent_state_mut();
1852 state.guest_thread = old_thread;
1853 old_thread
1854 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
1855 log::trace!("stackless call: restored {old_thread:?} as current thread");
1856
1857 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
1860
1861 self.handle_callback_code(store, guest_thread, callee_instance, code)?;
1862
1863 Ok(())
1864 }) as Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>
1865 } else {
1866 let token = StoreToken::new(store.as_context_mut());
1867 Box::new(move |store: &mut dyn VMStore| {
1868 self.add_guest_thread_to_instance_table(
1869 guest_thread.thread,
1870 store,
1871 callee_instance,
1872 )?;
1873 let old_thread = store
1874 .concurrent_state_mut()
1875 .guest_thread
1876 .replace(guest_thread);
1877 log::trace!(
1878 "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
1879 );
1880 let mut flags = self.id().get(store).instance_flags(callee_instance);
1881
1882 store.maybe_push_call_context(guest_thread.task)?;
1883
1884 if !async_ {
1888 store.concurrent_state_mut().enter_instance(callee_instance);
1889 }
1890
1891 let storage = call(store)?;
1898
1899 self.cleanup_thread(store, guest_thread, callee_instance)?;
1901
1902 if async_ {
1903 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1904 if task.threads.is_empty() && !task.returned_or_cancelled() {
1905 bail!(Trap::NoAsyncResult);
1906 }
1907 } else {
1908 let lift = {
1914 let state = store.concurrent_state_mut();
1915 state.exit_instance(callee_instance)?;
1916
1917 assert!(state.get_mut(guest_thread.task)?.result.is_none());
1918
1919 state
1920 .get_mut(guest_thread.task)?
1921 .lift_result
1922 .take()
1923 .unwrap()
1924 };
1925
1926 let result = (lift.lift)(store, unsafe {
1929 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
1930 &storage[..result_count],
1931 )
1932 })?;
1933
1934 let post_return_arg = match result_count {
1935 0 => ValRaw::i32(0),
1936 1 => unsafe { storage[0].assume_init() },
1939 _ => unreachable!(),
1940 };
1941
1942 if store
1943 .concurrent_state_mut()
1944 .get_mut(guest_thread.task)?
1945 .call_post_return_automatically()
1946 {
1947 unsafe {
1948 flags.set_may_leave(false);
1949 flags.set_needs_post_return(false);
1950 }
1951
1952 if let Some(func) = post_return {
1953 let mut store = token.as_context_mut(store);
1954
1955 unsafe {
1961 crate::Func::call_unchecked_raw(
1962 &mut store,
1963 func.as_non_null(),
1964 slice::from_ref(&post_return_arg).into(),
1965 )?;
1966 }
1967 }
1968
1969 unsafe {
1970 flags.set_may_leave(true);
1971 flags.set_may_enter(true);
1972 }
1973 }
1974
1975 self.task_complete(
1976 store,
1977 guest_thread.task,
1978 result,
1979 Status::Returned,
1980 post_return_arg,
1981 )?;
1982 }
1983
1984 store.maybe_pop_call_context(guest_thread.task)?;
1985
1986 let state = store.concurrent_state_mut();
1987 let task = state.get_mut(guest_thread.task)?;
1988
1989 match &task.caller {
1990 Caller::Host { .. } => {
1991 if task.ready_to_delete() {
1992 Waitable::Guest(guest_thread.task).delete_from(state)?;
1993 }
1994 }
1995 Caller::Guest { .. } => {
1996 task.exited = true;
1997 }
1998 }
1999
2000 Ok(())
2001 })
2002 };
2003
2004 store
2005 .0
2006 .concurrent_state_mut()
2007 .push_high_priority(WorkItem::GuestCall(GuestCall {
2008 thread: guest_thread,
2009 kind: GuestCallKind::StartImplicit(fun),
2010 }));
2011
2012 Ok(())
2013 }
2014
2015 unsafe fn prepare_call<T: 'static>(
2028 self,
2029 mut store: StoreContextMut<T>,
2030 start: *mut VMFuncRef,
2031 return_: *mut VMFuncRef,
2032 caller_instance: RuntimeComponentInstanceIndex,
2033 callee_instance: RuntimeComponentInstanceIndex,
2034 task_return_type: TypeTupleIndex,
2035 memory: *mut VMMemoryDefinition,
2036 string_encoding: u8,
2037 caller_info: CallerInfo,
2038 ) -> Result<()> {
2039 self.id().get(store.0).check_may_leave(caller_instance)?;
2040
2041 enum ResultInfo {
2042 Heap { results: u32 },
2043 Stack { result_count: u32 },
2044 }
2045
2046 let result_info = match &caller_info {
2047 CallerInfo::Async {
2048 has_result: true,
2049 params,
2050 } => ResultInfo::Heap {
2051 results: params.last().unwrap().get_u32(),
2052 },
2053 CallerInfo::Async {
2054 has_result: false, ..
2055 } => ResultInfo::Stack { result_count: 0 },
2056 CallerInfo::Sync {
2057 result_count,
2058 params,
2059 } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2060 results: params.last().unwrap().get_u32(),
2061 },
2062 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2063 result_count: *result_count,
2064 },
2065 };
2066
2067 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2068
2069 let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2073 let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2074 let token = StoreToken::new(store.as_context_mut());
2075 let state = store.0.concurrent_state_mut();
2076 let old_thread = state.guest_thread.take();
2077 let new_task = GuestTask::new(
2078 state,
2079 Box::new(move |store, dst| {
2080 let mut store = token.as_context_mut(store);
2081 assert!(dst.len() <= MAX_FLAT_PARAMS);
2082 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2083 let count = match caller_info {
2084 CallerInfo::Async { params, has_result } => {
2088 let params = ¶ms[..params.len() - usize::from(has_result)];
2089 for (param, src) in params.iter().zip(&mut src) {
2090 src.write(*param);
2091 }
2092 params.len()
2093 }
2094
2095 CallerInfo::Sync { params, .. } => {
2097 for (param, src) in params.iter().zip(&mut src) {
2098 src.write(*param);
2099 }
2100 params.len()
2101 }
2102 };
2103 unsafe {
2110 crate::Func::call_unchecked_raw(
2111 &mut store,
2112 start.as_non_null(),
2113 NonNull::new(
2114 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2115 )
2116 .unwrap(),
2117 )?;
2118 }
2119 dst.copy_from_slice(&src[..dst.len()]);
2120 let state = store.0.concurrent_state_mut();
2121 Waitable::Guest(state.guest_thread.unwrap().task).set_event(
2122 state,
2123 Some(Event::Subtask {
2124 status: Status::Started,
2125 }),
2126 )?;
2127 Ok(())
2128 }),
2129 LiftResult {
2130 lift: Box::new(move |store, src| {
2131 let mut store = token.as_context_mut(store);
2134 let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
2136 my_src.push(ValRaw::u32(*results));
2137 }
2138 unsafe {
2145 crate::Func::call_unchecked_raw(
2146 &mut store,
2147 return_.as_non_null(),
2148 my_src.as_mut_slice().into(),
2149 )?;
2150 }
2151 let state = store.0.concurrent_state_mut();
2152 let thread = state.guest_thread.unwrap();
2153 if sync_caller {
2154 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2155 if let ResultInfo::Stack { result_count } = &result_info {
2156 match result_count {
2157 0 => None,
2158 1 => Some(my_src[0]),
2159 _ => unreachable!(),
2160 }
2161 } else {
2162 None
2163 },
2164 );
2165 }
2166 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2167 }),
2168 ty: task_return_type,
2169 memory: NonNull::new(memory).map(SendSyncPtr::new),
2170 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2171 },
2172 Caller::Guest {
2173 thread: old_thread.unwrap(),
2174 instance: caller_instance,
2175 },
2176 None,
2177 callee_instance,
2178 )?;
2179
2180 let guest_task = state.push(new_task)?;
2181 let new_thread = GuestThread::new_implicit(guest_task);
2182 let guest_thread = state.push(new_thread)?;
2183 state.get_mut(guest_task)?.threads.insert(guest_thread);
2184
2185 let state = store.0.concurrent_state_mut();
2186 if let Some(old_thread) = old_thread {
2187 if !state.may_enter(guest_task) {
2188 bail!(crate::Trap::CannotEnterComponent);
2189 }
2190
2191 state.get_mut(old_thread.task)?.subtasks.insert(guest_task);
2192 };
2193
2194 state.guest_thread = Some(QualifiedThreadId {
2197 task: guest_task,
2198 thread: guest_thread,
2199 });
2200 log::trace!(
2201 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2202 );
2203
2204 Ok(())
2205 }
2206
2207 unsafe fn call_callback<T>(
2212 self,
2213 mut store: StoreContextMut<T>,
2214 callee_instance: RuntimeComponentInstanceIndex,
2215 function: SendSyncPtr<VMFuncRef>,
2216 event: Event,
2217 handle: u32,
2218 may_enter_after_call: bool,
2219 ) -> Result<u32> {
2220 let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2221
2222 let (ordinal, result) = event.parts();
2223 let params = &mut [
2224 ValRaw::u32(ordinal),
2225 ValRaw::u32(handle),
2226 ValRaw::u32(result),
2227 ];
2228 unsafe {
2233 flags.set_may_enter(false);
2234 crate::Func::call_unchecked_raw(
2235 &mut store,
2236 function.as_non_null(),
2237 params.as_mut_slice().into(),
2238 )?;
2239 flags.set_may_enter(may_enter_after_call);
2240 }
2241 Ok(params[0].get_u32())
2242 }
2243
2244 unsafe fn start_call<T: 'static>(
2257 self,
2258 mut store: StoreContextMut<T>,
2259 callback: *mut VMFuncRef,
2260 post_return: *mut VMFuncRef,
2261 callee: *mut VMFuncRef,
2262 param_count: u32,
2263 result_count: u32,
2264 flags: u32,
2265 storage: Option<&mut [MaybeUninit<ValRaw>]>,
2266 ) -> Result<u32> {
2267 let token = StoreToken::new(store.as_context_mut());
2268 let async_caller = storage.is_none();
2269 let state = store.0.concurrent_state_mut();
2270 let guest_thread = state.guest_thread.unwrap();
2271 let may_enter_after_call = state
2272 .get_mut(guest_thread.task)?
2273 .call_post_return_automatically();
2274 let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2275 let param_count = usize::try_from(param_count).unwrap();
2276 assert!(param_count <= MAX_FLAT_PARAMS);
2277 let result_count = usize::try_from(result_count).unwrap();
2278 assert!(result_count <= MAX_FLAT_RESULTS);
2279
2280 let task = state.get_mut(guest_thread.task)?;
2281 if !callback.is_null() {
2282 let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2286 task.callback = Some(Box::new(move |store, runtime_instance, event, handle| {
2287 let store = token.as_context_mut(store);
2288 unsafe {
2289 self.call_callback::<T>(
2290 store,
2291 runtime_instance,
2292 callback,
2293 event,
2294 handle,
2295 may_enter_after_call,
2296 )
2297 }
2298 }));
2299 }
2300
2301 let Caller::Guest {
2302 thread: caller,
2303 instance: runtime_instance,
2304 } = &task.caller
2305 else {
2306 unreachable!()
2309 };
2310 let caller = *caller;
2311 let caller_instance = *runtime_instance;
2312
2313 let callee_instance = task.instance;
2314
2315 let instance_flags = if callback.is_null() {
2316 None
2317 } else {
2318 Some(self.id().get(store.0).instance_flags(callee_instance))
2319 };
2320
2321 unsafe {
2323 self.queue_call(
2324 store.as_context_mut(),
2325 guest_thread,
2326 callee,
2327 param_count,
2328 result_count,
2329 instance_flags,
2330 (flags & START_FLAG_ASYNC_CALLEE) != 0,
2331 NonNull::new(callback).map(SendSyncPtr::new),
2332 NonNull::new(post_return).map(SendSyncPtr::new),
2333 )?;
2334 }
2335
2336 let state = store.0.concurrent_state_mut();
2337
2338 let guest_waitable = Waitable::Guest(guest_thread.task);
2341 let old_set = guest_waitable.common(state)?.set;
2342 let set = state.get_mut(caller.task)?.sync_call_set;
2343 guest_waitable.join(state, Some(set))?;
2344
2345 let (status, waitable) = loop {
2361 store.0.suspend(SuspendReason::Waiting {
2362 set,
2363 thread: caller,
2364 })?;
2365
2366 let state = store.0.concurrent_state_mut();
2367
2368 log::trace!("taking event for {:?}", guest_thread.task);
2369 let event = guest_waitable.take_event(state)?;
2370 let Some(Event::Subtask { status }) = event else {
2371 unreachable!();
2372 };
2373
2374 log::trace!("status {status:?} for {:?}", guest_thread.task);
2375
2376 if status == Status::Returned {
2377 break (status, None);
2379 } else if async_caller {
2380 let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2384 .subtask_insert_guest(guest_thread.task.rep())?;
2385 store
2386 .0
2387 .concurrent_state_mut()
2388 .get_mut(guest_thread.task)?
2389 .common
2390 .handle = Some(handle);
2391 break (status, Some(handle));
2392 } else {
2393 }
2397 };
2398
2399 let state = store.0.concurrent_state_mut();
2400
2401 guest_waitable.join(state, old_set)?;
2402
2403 if let Some(storage) = storage {
2404 let task = state.get_mut(guest_thread.task)?;
2408 if let Some(result) = task.sync_result.take() {
2409 if let Some(result) = result {
2410 storage[0] = MaybeUninit::new(result);
2411 }
2412
2413 if task.exited {
2414 if task.ready_to_delete() {
2415 Waitable::Guest(guest_thread.task).delete_from(state)?;
2416 }
2417 }
2418 }
2419 }
2420
2421 state.guest_thread = Some(caller);
2423 state.get_mut(caller.thread)?.state = GuestThreadState::Running;
2424 log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2425
2426 Ok(status.pack(waitable))
2427 }
2428
2429 pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2441 self,
2442 mut store: StoreContextMut<T>,
2443 future: impl Future<Output = Result<R>> + Send + 'static,
2444 caller_instance: RuntimeComponentInstanceIndex,
2445 lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
2446 ) -> Result<Option<u32>> {
2447 let token = StoreToken::new(store.as_context_mut());
2448 let state = store.0.concurrent_state_mut();
2449 let caller = state.guest_thread.unwrap();
2450
2451 let (join_handle, future) = JoinHandle::run(async move {
2454 let mut future = pin!(future);
2455 let mut call_context = None;
2456 future::poll_fn(move |cx| {
2457 tls::get(|store| {
2460 if let Some(call_context) = call_context.take() {
2461 token
2462 .as_context_mut(store)
2463 .0
2464 .component_resource_state()
2465 .0
2466 .push(call_context);
2467 }
2468 });
2469
2470 let result = future.as_mut().poll(cx);
2471
2472 if result.is_pending() {
2473 tls::get(|store| {
2476 call_context = Some(
2477 token
2478 .as_context_mut(store)
2479 .0
2480 .component_resource_state()
2481 .0
2482 .pop()
2483 .unwrap(),
2484 );
2485 });
2486 }
2487 result
2488 })
2489 .await
2490 });
2491
2492 let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;
2496
2497 log::trace!("new host task child of {caller:?}: {task:?}");
2498
2499 let mut future = Box::pin(future);
2500
2501 let poll = tls::set(store.0, || {
2506 future
2507 .as_mut()
2508 .poll(&mut Context::from_waker(&Waker::noop()))
2509 });
2510
2511 Ok(match poll {
2512 Poll::Ready(None) => unreachable!(),
2513 Poll::Ready(Some(result)) => {
2514 lower(store.as_context_mut(), result?)?;
2517 log::trace!("delete host task {task:?} (already ready)");
2518 store.0.concurrent_state_mut().delete(task)?;
2519 None
2520 }
2521 Poll::Pending => {
2522 let future =
2530 Box::pin(async move {
2531 let result = match future.await {
2532 Some(result) => result?,
2533 None => return Ok(()),
2535 };
2536 tls::get(move |store| {
2537 store.concurrent_state_mut().push_high_priority(
2543 WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
2544 lower(token.as_context_mut(store), result)?;
2545 let state = store.concurrent_state_mut();
2546 state.get_mut(task)?.join_handle.take();
2547 Waitable::Host(task).set_event(
2548 state,
2549 Some(Event::Subtask {
2550 status: Status::Returned,
2551 }),
2552 )
2553 }))),
2554 );
2555 Ok(())
2556 })
2557 });
2558
2559 store.0.concurrent_state_mut().push_future(future);
2560 let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2561 .subtask_insert_host(task.rep())?;
2562 store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2563 log::trace!(
2564 "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
2565 );
2566 Some(handle)
2567 }
2568 })
2569 }
2570
2571 pub(crate) fn task_return(
2574 self,
2575 store: &mut dyn VMStore,
2576 caller: RuntimeComponentInstanceIndex,
2577 ty: TypeTupleIndex,
2578 options: OptionsIndex,
2579 storage: &[ValRaw],
2580 ) -> Result<()> {
2581 self.id().get(store).check_may_leave(caller)?;
2582 let state = store.concurrent_state_mut();
2583 let guest_thread = state.guest_thread.unwrap();
2584 let lift = state
2585 .get_mut(guest_thread.task)?
2586 .lift_result
2587 .take()
2588 .ok_or_else(|| {
2589 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2590 })?;
2591 assert!(state.get_mut(guest_thread.task)?.result.is_none());
2592
2593 let CanonicalOptions {
2594 string_encoding,
2595 data_model,
2596 ..
2597 } = &self.id().get(store).component().env_component().options[options];
2598
2599 let invalid = ty != lift.ty
2600 || string_encoding != &lift.string_encoding
2601 || match data_model {
2602 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2603 Some(memory) => {
2604 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2605 let actual = self.id().get(store).runtime_memory(memory);
2606 expected != actual.as_ptr()
2607 }
2608 None => false,
2611 },
2612 CanonicalOptionsDataModel::Gc { .. } => true,
2614 };
2615
2616 if invalid {
2617 bail!("invalid `task.return` signature and/or options for current task");
2618 }
2619
2620 log::trace!("task.return for {guest_thread:?}");
2621
2622 let result = (lift.lift)(store, storage)?;
2623 self.task_complete(
2624 store,
2625 guest_thread.task,
2626 result,
2627 Status::Returned,
2628 ValRaw::i32(0),
2629 )
2630 }
2631
2632 pub(crate) fn task_cancel(
2634 self,
2635 store: &mut StoreOpaque,
2636 caller: RuntimeComponentInstanceIndex,
2637 ) -> Result<()> {
2638 self.id().get(store).check_may_leave(caller)?;
2639 let state = store.concurrent_state_mut();
2640 let guest_thread = state.guest_thread.unwrap();
2641 let task = state.get_mut(guest_thread.task)?;
2642 if !task.cancel_sent {
2643 bail!("`task.cancel` called by task which has not been cancelled")
2644 }
2645 _ = task.lift_result.take().ok_or_else(|| {
2646 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2647 })?;
2648
2649 assert!(task.result.is_none());
2650
2651 log::trace!("task.cancel for {guest_thread:?}");
2652
2653 self.task_complete(
2654 store,
2655 guest_thread.task,
2656 Box::new(DummyResult),
2657 Status::ReturnCancelled,
2658 ValRaw::i32(0),
2659 )
2660 }
2661
2662 fn task_complete(
2668 self,
2669 store: &mut StoreOpaque,
2670 guest_task: TableId<GuestTask>,
2671 result: Box<dyn Any + Send + Sync>,
2672 status: Status,
2673 post_return_arg: ValRaw,
2674 ) -> Result<()> {
2675 if store
2676 .concurrent_state_mut()
2677 .get_mut(guest_task)?
2678 .call_post_return_automatically()
2679 {
2680 let (calls, host_table, _, instance) =
2681 store.component_resource_state_with_instance(self);
2682 ResourceTables {
2683 calls,
2684 host_table: Some(host_table),
2685 guest: Some(instance.guest_tables()),
2686 }
2687 .exit_call()?;
2688 } else {
2689 let function_index = store
2694 .concurrent_state_mut()
2695 .get_mut(guest_task)?
2696 .function_index
2697 .unwrap();
2698 self.id()
2699 .get_mut(store)
2700 .post_return_arg_set(function_index, post_return_arg);
2701 }
2702
2703 let state = store.concurrent_state_mut();
2704 let task = state.get_mut(guest_task)?;
2705
2706 if let Caller::Host { tx, .. } = &mut task.caller {
2707 if let Some(tx) = tx.take() {
2708 _ = tx.send(result);
2709 }
2710 } else {
2711 task.result = Some(result);
2712 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2713 }
2714
2715 Ok(())
2716 }
2717
2718 pub(crate) fn waitable_set_new(
2720 self,
2721 store: &mut StoreOpaque,
2722 caller_instance: RuntimeComponentInstanceIndex,
2723 ) -> Result<u32> {
2724 self.id().get_mut(store).check_may_leave(caller_instance)?;
2725 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2726 let handle = self.id().get_mut(store).guest_tables().0[caller_instance]
2727 .waitable_set_insert(set.rep())?;
2728 log::trace!("new waitable set {set:?} (handle {handle})");
2729 Ok(handle)
2730 }
2731
2732 pub(crate) fn waitable_set_drop(
2734 self,
2735 store: &mut StoreOpaque,
2736 caller_instance: RuntimeComponentInstanceIndex,
2737 set: u32,
2738 ) -> Result<()> {
2739 self.id().get_mut(store).check_may_leave(caller_instance)?;
2740 let rep =
2741 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_remove(set)?;
2742
2743 log::trace!("drop waitable set {rep} (handle {set})");
2744
2745 let set = store
2746 .concurrent_state_mut()
2747 .delete(TableId::<WaitableSet>::new(rep))?;
2748
2749 if !set.waiting.is_empty() {
2750 bail!("cannot drop waitable set with waiters");
2751 }
2752
2753 Ok(())
2754 }
2755
2756 pub(crate) fn waitable_join(
2758 self,
2759 store: &mut StoreOpaque,
2760 caller_instance: RuntimeComponentInstanceIndex,
2761 waitable_handle: u32,
2762 set_handle: u32,
2763 ) -> Result<()> {
2764 let mut instance = self.id().get_mut(store);
2765 instance.check_may_leave(caller_instance)?;
2766 let waitable =
2767 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2768
2769 let set = if set_handle == 0 {
2770 None
2771 } else {
2772 let set = instance.guest_tables().0[caller_instance].waitable_set_rep(set_handle)?;
2773
2774 Some(TableId::<WaitableSet>::new(set))
2775 };
2776
2777 log::trace!(
2778 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2779 );
2780
2781 waitable.join(store.concurrent_state_mut(), set)
2782 }
2783
2784 pub(crate) fn subtask_drop(
2786 self,
2787 store: &mut StoreOpaque,
2788 caller_instance: RuntimeComponentInstanceIndex,
2789 task_id: u32,
2790 ) -> Result<()> {
2791 self.id().get_mut(store).check_may_leave(caller_instance)?;
2792 self.waitable_join(store, caller_instance, task_id, 0)?;
2793
2794 let (rep, is_host) =
2795 self.id().get_mut(store).guest_tables().0[caller_instance].subtask_remove(task_id)?;
2796
2797 let concurrent_state = store.concurrent_state_mut();
2798 let (waitable, expected_caller_instance, delete) = if is_host {
2799 let id = TableId::<HostTask>::new(rep);
2800 let task = concurrent_state.get_mut(id)?;
2801 if task.join_handle.is_some() {
2802 bail!("cannot drop a subtask which has not yet resolved");
2803 }
2804 (Waitable::Host(id), task.caller_instance, true)
2805 } else {
2806 let id = TableId::<GuestTask>::new(rep);
2807 let task = concurrent_state.get_mut(id)?;
2808 if task.lift_result.is_some() {
2809 bail!("cannot drop a subtask which has not yet resolved");
2810 }
2811 if let Caller::Guest { instance, .. } = &task.caller {
2812 (Waitable::Guest(id), *instance, task.exited)
2813 } else {
2814 unreachable!()
2815 }
2816 };
2817
2818 waitable.common(concurrent_state)?.handle = None;
2819
2820 if waitable.take_event(concurrent_state)?.is_some() {
2821 bail!("cannot drop a subtask with an undelivered event");
2822 }
2823
2824 if delete {
2825 waitable.delete_from(concurrent_state)?;
2826 }
2827
2828 assert_eq!(expected_caller_instance, caller_instance);
2832 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
2833 Ok(())
2834 }
2835
2836 pub(crate) fn waitable_set_wait(
2838 self,
2839 store: &mut StoreOpaque,
2840 caller: RuntimeComponentInstanceIndex,
2841 options: OptionsIndex,
2842 set: u32,
2843 payload: u32,
2844 ) -> Result<u32> {
2845 self.id().get(store).check_may_leave(caller)?;
2846 let &CanonicalOptions {
2847 cancellable,
2848 instance: caller_instance,
2849 ..
2850 } = &self.id().get(store).component().env_component().options[options];
2851 let rep =
2852 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2853
2854 self.waitable_check(
2855 store,
2856 cancellable,
2857 WaitableCheck::Wait(WaitableCheckParams {
2858 set: TableId::new(rep),
2859 options,
2860 payload,
2861 }),
2862 )
2863 }
2864
2865 pub(crate) fn waitable_set_poll(
2867 self,
2868 store: &mut StoreOpaque,
2869 caller: RuntimeComponentInstanceIndex,
2870 options: OptionsIndex,
2871 set: u32,
2872 payload: u32,
2873 ) -> Result<u32> {
2874 self.id().get(store).check_may_leave(caller)?;
2875 let &CanonicalOptions {
2876 cancellable,
2877 instance: caller_instance,
2878 ..
2879 } = &self.id().get(store).component().env_component().options[options];
2880 let rep =
2881 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2882
2883 self.waitable_check(
2884 store,
2885 cancellable,
2886 WaitableCheck::Poll(WaitableCheckParams {
2887 set: TableId::new(rep),
2888 options,
2889 payload,
2890 }),
2891 )
2892 }
2893
2894 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
2896 let thread_id = store
2897 .concurrent_state_mut()
2898 .guest_thread
2899 .ok_or_else(|| anyhow!("no current thread"))?
2900 .thread;
2901 Ok(store
2903 .concurrent_state_mut()
2904 .get_mut(thread_id)?
2905 .instance_rep
2906 .unwrap())
2907 }
2908
2909 pub(crate) fn thread_new_indirect<T: 'static>(
2911 self,
2912 mut store: StoreContextMut<T>,
2913 runtime_instance: RuntimeComponentInstanceIndex,
2914 _func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex,
2916 start_func_idx: u32,
2917 context: i32,
2918 ) -> Result<u32> {
2919 self.id().get(store.0).check_may_leave(runtime_instance)?;
2920
2921 log::trace!("creating new thread");
2922
2923 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
2924 let instance = self.id().get_mut(store.0);
2925 let callee = instance
2926 .index_runtime_func_table(start_func_table_idx, start_func_idx as u64)?
2927 .ok_or_else(|| {
2928 anyhow!("the start function index points to an uninitialized function")
2929 })?;
2930 if callee.type_index(store.0) != start_func_ty.type_index() {
2931 bail!(
2932 "start function does not match expected type (currently only `(i32) -> ()` is supported)"
2933 );
2934 }
2935
2936 let token = StoreToken::new(store.as_context_mut());
2937 let start_func = Box::new(
2938 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
2939 let old_thread = store
2940 .concurrent_state_mut()
2941 .guest_thread
2942 .replace(guest_thread);
2943 log::trace!(
2944 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
2945 );
2946
2947 store.maybe_push_call_context(guest_thread.task)?;
2948
2949 let mut store = token.as_context_mut(store);
2950 let mut params = [ValRaw::i32(context)];
2951 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
2954
2955 store.0.maybe_pop_call_context(guest_thread.task)?;
2956
2957 self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
2958 log::trace!("explicit thread {guest_thread:?} completed");
2959 let state = store.0.concurrent_state_mut();
2960 let task = state.get_mut(guest_thread.task)?;
2961 if task.threads.is_empty() && !task.returned_or_cancelled() {
2962 bail!(Trap::NoAsyncResult);
2963 }
2964 state.guest_thread = old_thread;
2965 old_thread
2966 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2967 if state.get_mut(guest_thread.task)?.ready_to_delete() {
2968 Waitable::Guest(guest_thread.task).delete_from(state)?;
2969 }
2970 log::trace!("thread start: restored {old_thread:?} as current thread");
2971
2972 Ok(())
2973 },
2974 );
2975
2976 let state = store.0.concurrent_state_mut();
2977 let current_thread = state.guest_thread.unwrap();
2978 let parent_task = current_thread.task;
2979
2980 let new_thread = GuestThread::new_explicit(parent_task, start_func);
2981 let thread_id = state.push(new_thread)?;
2982 state.get_mut(parent_task)?.threads.insert(thread_id);
2983
2984 log::trace!("new thread with id {thread_id:?} created");
2985
2986 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
2987 }
2988
2989 pub(crate) fn resume_suspended_thread(
2990 self,
2991 store: &mut StoreOpaque,
2992 runtime_instance: RuntimeComponentInstanceIndex,
2993 thread_idx: u32,
2994 high_priority: bool,
2995 ) -> Result<()> {
2996 let thread_id =
2997 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
2998 let state = store.concurrent_state_mut();
2999 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3000 let thread = state.get_mut(guest_thread.thread)?;
3001
3002 match mem::replace(&mut thread.state, GuestThreadState::Running) {
3003 GuestThreadState::NotStartedExplicit(start_func) => {
3004 log::trace!("starting thread {guest_thread:?}");
3005 let guest_call = WorkItem::GuestCall(GuestCall {
3006 thread: guest_thread,
3007 kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3008 start_func(store, guest_thread)
3009 })),
3010 });
3011 store
3012 .concurrent_state_mut()
3013 .push_work_item(guest_call, high_priority);
3014 }
3015 GuestThreadState::Suspended(fiber) => {
3016 log::trace!("resuming thread {thread_id:?} that was suspended");
3017 store
3018 .concurrent_state_mut()
3019 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3020 }
3021 _ => {
3022 bail!("cannot resume thread which is not suspended");
3023 }
3024 }
3025 Ok(())
3026 }
3027
3028 fn add_guest_thread_to_instance_table(
3029 self,
3030 thread_id: TableId<GuestThread>,
3031 store: &mut StoreOpaque,
3032 runtime_instance: RuntimeComponentInstanceIndex,
3033 ) -> Result<u32> {
3034 let guest_id = self.id().get_mut(store).guest_tables().0[runtime_instance]
3035 .guest_thread_insert(thread_id.rep())?;
3036 store
3037 .concurrent_state_mut()
3038 .get_mut(thread_id)?
3039 .instance_rep = Some(guest_id);
3040 Ok(guest_id)
3041 }
3042
3043 pub(crate) fn suspension_intrinsic(
3046 self,
3047 store: &mut StoreOpaque,
3048 caller: RuntimeComponentInstanceIndex,
3049 cancellable: bool,
3050 yielding: bool,
3051 to_thread: Option<u32>,
3052 ) -> Result<WaitResult> {
3053 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3055 return Ok(WaitResult::Cancelled);
3056 }
3057
3058 self.id().get(store).check_may_leave(caller)?;
3059
3060 if let Some(thread) = to_thread {
3061 self.resume_suspended_thread(store, caller, thread, true)?;
3062 }
3063
3064 let state = store.concurrent_state_mut();
3065 let guest_thread = state.guest_thread.unwrap();
3066 let reason = if yielding {
3067 SuspendReason::Yielding {
3068 thread: guest_thread,
3069 }
3070 } else {
3071 SuspendReason::ExplicitlySuspending {
3072 thread: guest_thread,
3073 }
3074 };
3075
3076 store.suspend(reason)?;
3077
3078 if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3079 Ok(WaitResult::Cancelled)
3080 } else {
3081 Ok(WaitResult::Completed)
3082 }
3083 }
3084
/// Shared implementation of the `WaitableCheck::Wait` and
/// `WaitableCheck::Poll` operations for the current guest thread.
///
/// In order, this:
///
/// 1. Yields the current thread once (`SuspendReason::Yielding`).
/// 2. For `Wait` only: if the task has no deliverable event and the set has
///    nothing ready, suspends with `SuspendReason::Waiting`. A `Cancelled`
///    event does not count as deliverable unless `cancellable` is set. When
///    `cancellable`, the set is first recorded in the thread's
///    `wake_on_cancel`.
/// 3. Fetches the event via `get_event` and writes the waitable handle and the
///    event result as two little-endian 4-byte values at `params.payload`.
///
/// Returns the event's ordinal. For `Poll` with nothing to deliver, the parts
/// of `Event::None` and a handle of `0` are used.
///
/// Panics if there is no current guest thread, or if a `Wait` completes
/// without `get_event` producing an event.
fn waitable_check(
    self,
    store: &mut StoreOpaque,
    cancellable: bool,
    check: WaitableCheck,
) -> Result<u32> {
    let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();

    // Both variants carry a set; only `Wait` is allowed to block on it.
    let (wait, set) = match &check {
        WaitableCheck::Wait(params) => (true, Some(params.set)),
        WaitableCheck::Poll(params) => (false, Some(params.set)),
    };

    log::trace!("waitable check for {guest_thread:?}; set {set:?}");
    // Yield once before inspecting any state.
    store.suspend(SuspendReason::Yielding {
        thread: guest_thread,
    })?;

    log::trace!("waitable check for {guest_thread:?}; set {set:?}");

    let state = store.concurrent_state_mut();
    let task = state.get_mut(guest_thread.task)?;

    if wait {
        let set = set.unwrap();

        // Block only if there is nothing to deliver: no task-level event (a
        // `Cancelled` event is ignored when not cancellable) and no ready
        // waitable in the set.
        if (task.event.is_none()
            || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
            && state.get_mut(set)?.ready.is_empty()
        {
            if cancellable {
                // Record which set this thread is parked on so that a
                // cancellation can find and wake it.
                let old = state
                    .get_mut(guest_thread.thread)?
                    .wake_on_cancel
                    .replace(set);
                assert!(old.is_none());
            }

            store.suspend(SuspendReason::Waiting {
                set,
                thread: guest_thread,
            })?;
        }
    }

    log::trace!("waitable check for {guest_thread:?}; set {set:?}, part two");

    let result = match check {
        WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => {
            let event =
                self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;

            let (ordinal, handle, result) = if wait {
                // A wait only gets here once an event is available.
                let (event, waitable) = event.unwrap();
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                let (ordinal, result) = event.parts();
                (ordinal, handle, result)
            } else {
                if let Some((event, waitable)) = event {
                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                    let (ordinal, result) = event.parts();
                    (ordinal, handle, result)
                } else {
                    log::trace!(
                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
                        guest_thread.task,
                        params.set
                    );
                    let (ordinal, result) = Event::None.parts();
                    (ordinal, 0, result)
                }
            };
            // Store `(handle, result)` at the guest-provided payload address
            // after validating that the pair fits in memory.
            let memory = self.options_memory_mut(store, params.options);
            let ptr =
                func::validate_inbounds::<(u32, u32)>(memory, &ValRaw::u32(params.payload))?;
            memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
            memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
            Ok(ordinal)
        }
    };

    result
}
3173
/// Cancel the subtask identified by handle `task_id` in `caller_instance`'s
/// table, returning a status code as `u32`.
///
/// Outcomes visible in this function:
///
/// - Host subtask with a live join handle: the handle is aborted and
///   `Status::ReturnCancelled` is returned.
/// - Guest subtask whose parameters have not been lowered yet: its lowering
///   and lifting closures are dropped, it is removed from the callee
///   instance's pending map, and `Status::StartCancelled` is returned.
/// - Guest subtask that has started but not returned or been cancelled: a
///   `Cancelled` event is posted to it. If one of its threads registered
///   `wake_on_cancel`, that thread is queued at high priority and the caller
///   yields. If the task is still unfinished afterwards, `BLOCKED` is
///   returned when `async_`; otherwise the caller waits for the subtask's
///   event.
/// - In all remaining cases the subtask's pending event is taken and must be a
///   `Subtask` event with status `Returned` or `ReturnCancelled`.
///
/// # Errors
///
/// Fails with "`subtask.cancel` called after terminal status delivered" when
/// no such terminal event is available, or when a not-yet-started subtask was
/// not found in the pending map.
///
/// # Panics
///
/// Panics if the subtask's recorded caller instance differs from
/// `caller_instance`, or if a guest subtask's caller is not `Caller::Guest`.
pub(crate) fn subtask_cancel(
    self,
    store: &mut StoreOpaque,
    caller_instance: RuntimeComponentInstanceIndex,
    async_: bool,
    task_id: u32,
) -> Result<u32> {
    self.id().get(store).check_may_leave(caller_instance)?;
    let (rep, is_host) =
        self.id().get_mut(store).guest_tables().0[caller_instance].subtask_rep(task_id)?;
    // Resolve the handle to a typed waitable plus the instance recorded as
    // its caller.
    let (waitable, expected_caller_instance) = if is_host {
        let id = TableId::<HostTask>::new(rep);
        (
            Waitable::Host(id),
            store.concurrent_state_mut().get_mut(id)?.caller_instance,
        )
    } else {
        let id = TableId::<GuestTask>::new(rep);
        if let Caller::Guest { instance, .. } =
            &store.concurrent_state_mut().get_mut(id)?.caller
        {
            (Waitable::Guest(id), *instance)
        } else {
            unreachable!()
        }
    };
    assert_eq!(expected_caller_instance, caller_instance);

    log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

    let concurrent_state = store.concurrent_state_mut();
    if let Waitable::Host(host_task) = waitable {
        // Taking the handle means a second cancel falls through to the
        // event check at the bottom instead of aborting again.
        if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
            handle.abort();
            return Ok(Status::ReturnCancelled as u32);
        }
    } else {
        let caller = concurrent_state.guest_thread.unwrap();
        let guest_task = TableId::<GuestTask>::new(rep);
        let task = concurrent_state.get_mut(guest_task)?;
        if !task.already_lowered_parameters() {
            // The callee never started: drop its closures and remove it from
            // the callee instance's pending map.
            task.lower_params = None;
            task.lift_result = None;

            let callee_instance = task.instance;

            let pending = &mut concurrent_state.instance_state(callee_instance).pending;
            let pending_count = pending.len();
            pending.retain(|thread, _| thread.task != guest_task);
            if pending.len() == pending_count {
                // Nothing was removed, so the task was not pending.
                bail!("`subtask.cancel` called after terminal status delivered");
            }
            return Ok(Status::StartCancelled as u32);
        } else if !task.returned_or_cancelled() {
            // The callee is running: post a cancellation event to it.
            task.cancel_sent = true;
            task.event = Some(Event::Cancelled);
            for thread in task.threads.clone() {
                let thread = QualifiedThreadId {
                    task: guest_task,
                    thread,
                };
                // Only a thread that registered `wake_on_cancel` is woken.
                if let Some(set) = concurrent_state
                    .get_mut(thread.thread)
                    .unwrap()
                    .wake_on_cancel
                    .take()
                {
                    let item = match concurrent_state
                        .get_mut(set)?
                        .waiting
                        .remove(&thread)
                        .unwrap()
                    {
                        WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
                        WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
                            thread,
                            kind: GuestCallKind::DeliverEvent {
                                instance,
                                set: None,
                            },
                        }),
                    };
                    concurrent_state.push_high_priority(item);

                    // Yield the caller so the woken thread gets to run, then
                    // stop after the first such thread.
                    store.suspend(SuspendReason::Yielding { thread: caller })?;
                    break;
                }
            }

            let concurrent_state = store.concurrent_state_mut();
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.returned_or_cancelled() {
                if async_ {
                    return Ok(BLOCKED);
                } else {
                    store.wait_for_event(Waitable::Guest(guest_task))?;
                }
            }
        }
    }

    // Whatever path led here, a terminal subtask event must now be pending.
    let event = waitable.take_event(store.concurrent_state_mut())?;
    if let Some(Event::Subtask {
        status: status @ (Status::Returned | Status::ReturnCancelled),
    }) = event
    {
        Ok(status as u32)
    } else {
        bail!("`subtask.cancel` called after terminal status delivered");
    }
}
3297
3298 pub(crate) fn context_get(
3299 self,
3300 store: &mut StoreOpaque,
3301 caller: RuntimeComponentInstanceIndex,
3302 slot: u32,
3303 ) -> Result<u32> {
3304 self.id().get(store).check_may_leave(caller)?;
3305 store.concurrent_state_mut().context_get(slot)
3306 }
3307
3308 pub(crate) fn context_set(
3309 self,
3310 store: &mut StoreOpaque,
3311 caller: RuntimeComponentInstanceIndex,
3312 slot: u32,
3313 value: u32,
3314 ) -> Result<()> {
3315 self.id().get(store).check_may_leave(caller)?;
3316 store.concurrent_state_mut().context_set(slot, value)
3317 }
3318}
3319
/// Store-level entry points for component-model async operations.
///
/// The implementation for `StoreInner<T>` in this file forwards each method to
/// the corresponding operation on `Instance`, wrapping the store in a
/// `StoreContextMut` where one is required.
pub trait VMComponentAsyncStore {
    /// Prepare a call from `caller_instance` into `callee_instance`.
    ///
    /// `result_count` doubles as a sentinel: the `StoreInner` implementation
    /// treats `PREPARE_ASYNC_NO_RESULT` and `PREPARE_ASYNC_WITH_RESULT` as an
    /// async caller (without and with a result) and any other value as the
    /// result count of a sync caller.
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads of `storage_len` `ValRaw` items; the
    /// `StoreInner` implementation copies them into a `Vec`. Requirements on
    /// the remaining raw pointers are set by `Instance::prepare_call`, which is
    /// not visible here.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        string_encoding: u8,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// Start a previously prepared call on behalf of a sync caller, using
    /// `storage` as the in/out value buffer.
    ///
    /// # Safety
    ///
    /// `storage` must be valid for reads and writes of `storage_len` items;
    /// the `StoreInner` implementation builds a mutable slice from it.
    /// Requirements on `callback` and `callee` are set by
    /// `Instance::start_call`, which is not visible here.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// Start a previously prepared call on behalf of an async caller and
    /// return the `u32` produced by `Instance::start_call`.
    ///
    /// # Safety
    ///
    /// Requirements on the raw function pointers are set by
    /// `Instance::start_call`, which is not visible here.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// Write to the future handle `future` from guest memory at `address`.
    /// Returns an encoded result code.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Read from the future handle `future` into guest memory at `address`.
    /// Returns an encoded result code.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// Drop the writable end `writer` of a future.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Write up to `count` items to the stream handle `stream` from guest
    /// memory at `address`. Returns an encoded result code.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Read up to `count` items from the stream handle `stream` into guest
    /// memory at `address`. Returns an encoded result code.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Variant of `stream_write` that also passes the payload's size and
    /// alignment (as a `FlatAbi` in the `StoreInner` implementation).
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Variant of `stream_read` that also passes the payload's size and
    /// alignment (as a `FlatAbi` in the `StoreInner` implementation).
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// Drop the writable end `writer` of a stream.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// Retrieve the debug message of the error context `err_ctx_handle`;
    /// `debug_msg_address` is the guest address passed through for the result.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// Create a new guest thread whose start function is entry
    /// `start_func_idx` of table `start_func_table_idx`, returning a `u32`
    /// (presumably the new thread's handle; confirm against
    /// `Instance::thread_new_indirect`).
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
3489
3490impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3492 unsafe fn prepare_call(
3493 &mut self,
3494 instance: Instance,
3495 memory: *mut VMMemoryDefinition,
3496 start: *mut VMFuncRef,
3497 return_: *mut VMFuncRef,
3498 caller_instance: RuntimeComponentInstanceIndex,
3499 callee_instance: RuntimeComponentInstanceIndex,
3500 task_return_type: TypeTupleIndex,
3501 string_encoding: u8,
3502 result_count_or_max_if_async: u32,
3503 storage: *mut ValRaw,
3504 storage_len: usize,
3505 ) -> Result<()> {
3506 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3510
3511 unsafe {
3512 instance.prepare_call(
3513 StoreContextMut(self),
3514 start,
3515 return_,
3516 caller_instance,
3517 callee_instance,
3518 task_return_type,
3519 memory,
3520 string_encoding,
3521 match result_count_or_max_if_async {
3522 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3523 params,
3524 has_result: false,
3525 },
3526 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3527 params,
3528 has_result: true,
3529 },
3530 result_count => CallerInfo::Sync {
3531 params,
3532 result_count,
3533 },
3534 },
3535 )
3536 }
3537 }
3538
3539 unsafe fn sync_start(
3540 &mut self,
3541 instance: Instance,
3542 callback: *mut VMFuncRef,
3543 callee: *mut VMFuncRef,
3544 param_count: u32,
3545 storage: *mut MaybeUninit<ValRaw>,
3546 storage_len: usize,
3547 ) -> Result<()> {
3548 unsafe {
3549 instance
3550 .start_call(
3551 StoreContextMut(self),
3552 callback,
3553 ptr::null_mut(),
3554 callee,
3555 param_count,
3556 1,
3557 START_FLAG_ASYNC_CALLEE,
3558 Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3562 )
3563 .map(drop)
3564 }
3565 }
3566
3567 unsafe fn async_start(
3568 &mut self,
3569 instance: Instance,
3570 callback: *mut VMFuncRef,
3571 post_return: *mut VMFuncRef,
3572 callee: *mut VMFuncRef,
3573 param_count: u32,
3574 result_count: u32,
3575 flags: u32,
3576 ) -> Result<u32> {
3577 unsafe {
3578 instance.start_call(
3579 StoreContextMut(self),
3580 callback,
3581 post_return,
3582 callee,
3583 param_count,
3584 result_count,
3585 flags,
3586 None,
3587 )
3588 }
3589 }
3590
3591 fn future_write(
3592 &mut self,
3593 instance: Instance,
3594 caller: RuntimeComponentInstanceIndex,
3595 ty: TypeFutureTableIndex,
3596 options: OptionsIndex,
3597 future: u32,
3598 address: u32,
3599 ) -> Result<u32> {
3600 instance.id().get(self).check_may_leave(caller)?;
3601 instance
3602 .guest_write(
3603 StoreContextMut(self),
3604 TransmitIndex::Future(ty),
3605 options,
3606 None,
3607 future,
3608 address,
3609 1,
3610 )
3611 .map(|result| result.encode())
3612 }
3613
3614 fn future_read(
3615 &mut self,
3616 instance: Instance,
3617 caller: RuntimeComponentInstanceIndex,
3618 ty: TypeFutureTableIndex,
3619 options: OptionsIndex,
3620 future: u32,
3621 address: u32,
3622 ) -> Result<u32> {
3623 instance.id().get(self).check_may_leave(caller)?;
3624 instance
3625 .guest_read(
3626 StoreContextMut(self),
3627 TransmitIndex::Future(ty),
3628 options,
3629 None,
3630 future,
3631 address,
3632 1,
3633 )
3634 .map(|result| result.encode())
3635 }
3636
3637 fn stream_write(
3638 &mut self,
3639 instance: Instance,
3640 caller: RuntimeComponentInstanceIndex,
3641 ty: TypeStreamTableIndex,
3642 options: OptionsIndex,
3643 stream: u32,
3644 address: u32,
3645 count: u32,
3646 ) -> Result<u32> {
3647 instance.id().get(self).check_may_leave(caller)?;
3648 instance
3649 .guest_write(
3650 StoreContextMut(self),
3651 TransmitIndex::Stream(ty),
3652 options,
3653 None,
3654 stream,
3655 address,
3656 count,
3657 )
3658 .map(|result| result.encode())
3659 }
3660
3661 fn stream_read(
3662 &mut self,
3663 instance: Instance,
3664 caller: RuntimeComponentInstanceIndex,
3665 ty: TypeStreamTableIndex,
3666 options: OptionsIndex,
3667 stream: u32,
3668 address: u32,
3669 count: u32,
3670 ) -> Result<u32> {
3671 instance.id().get(self).check_may_leave(caller)?;
3672 instance
3673 .guest_read(
3674 StoreContextMut(self),
3675 TransmitIndex::Stream(ty),
3676 options,
3677 None,
3678 stream,
3679 address,
3680 count,
3681 )
3682 .map(|result| result.encode())
3683 }
3684
3685 fn future_drop_writable(
3686 &mut self,
3687 instance: Instance,
3688 caller: RuntimeComponentInstanceIndex,
3689 ty: TypeFutureTableIndex,
3690 writer: u32,
3691 ) -> Result<()> {
3692 instance.id().get(self).check_may_leave(caller)?;
3693 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
3694 }
3695
3696 fn flat_stream_write(
3697 &mut self,
3698 instance: Instance,
3699 caller: RuntimeComponentInstanceIndex,
3700 ty: TypeStreamTableIndex,
3701 options: OptionsIndex,
3702 payload_size: u32,
3703 payload_align: u32,
3704 stream: u32,
3705 address: u32,
3706 count: u32,
3707 ) -> Result<u32> {
3708 instance.id().get(self).check_may_leave(caller)?;
3709 instance
3710 .guest_write(
3711 StoreContextMut(self),
3712 TransmitIndex::Stream(ty),
3713 options,
3714 Some(FlatAbi {
3715 size: payload_size,
3716 align: payload_align,
3717 }),
3718 stream,
3719 address,
3720 count,
3721 )
3722 .map(|result| result.encode())
3723 }
3724
3725 fn flat_stream_read(
3726 &mut self,
3727 instance: Instance,
3728 caller: RuntimeComponentInstanceIndex,
3729 ty: TypeStreamTableIndex,
3730 options: OptionsIndex,
3731 payload_size: u32,
3732 payload_align: u32,
3733 stream: u32,
3734 address: u32,
3735 count: u32,
3736 ) -> Result<u32> {
3737 instance.id().get(self).check_may_leave(caller)?;
3738 instance
3739 .guest_read(
3740 StoreContextMut(self),
3741 TransmitIndex::Stream(ty),
3742 options,
3743 Some(FlatAbi {
3744 size: payload_size,
3745 align: payload_align,
3746 }),
3747 stream,
3748 address,
3749 count,
3750 )
3751 .map(|result| result.encode())
3752 }
3753
3754 fn stream_drop_writable(
3755 &mut self,
3756 instance: Instance,
3757 caller: RuntimeComponentInstanceIndex,
3758 ty: TypeStreamTableIndex,
3759 writer: u32,
3760 ) -> Result<()> {
3761 instance.id().get(self).check_may_leave(caller)?;
3762 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
3763 }
3764
3765 fn error_context_debug_message(
3766 &mut self,
3767 instance: Instance,
3768 caller: RuntimeComponentInstanceIndex,
3769 ty: TypeComponentLocalErrorContextTableIndex,
3770 options: OptionsIndex,
3771 err_ctx_handle: u32,
3772 debug_msg_address: u32,
3773 ) -> Result<()> {
3774 instance.id().get(self).check_may_leave(caller)?;
3775 instance.error_context_debug_message(
3776 StoreContextMut(self),
3777 ty,
3778 options,
3779 err_ctx_handle,
3780 debug_msg_address,
3781 )
3782 }
3783
3784 fn thread_new_indirect(
3785 &mut self,
3786 instance: Instance,
3787 caller: RuntimeComponentInstanceIndex,
3788 func_ty_idx: TypeFuncIndex,
3789 start_func_table_idx: RuntimeTableIndex,
3790 start_func_idx: u32,
3791 context: i32,
3792 ) -> Result<u32> {
3793 instance.thread_new_indirect(
3794 StoreContextMut(self),
3795 caller,
3796 func_ty_idx,
3797 start_func_table_idx,
3798 start_func_idx,
3799 context,
3800 )
3801 }
3802}
3803
/// Boxed, pinned, `Send + 'static` future resolving to `Result<()>`.
///
/// `ConcurrentState::futures` holds a `FuturesUnordered` of these.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
3805
/// Table entry for a subtask implemented by the host.
struct HostTask {
    /// Waitable bookkeeping (pending event, owning set, handle).
    common: WaitableCommon,
    /// Instance recorded as the caller; `subtask_cancel` asserts it matches
    /// the instance issuing the cancel.
    caller_instance: RuntimeComponentInstanceIndex,
    /// Handle used to abort the task; `subtask_cancel` takes it out and calls
    /// `abort` on it.
    join_handle: Option<JoinHandle>,
}
3812
3813impl HostTask {
3814 fn new(
3815 caller_instance: RuntimeComponentInstanceIndex,
3816 join_handle: Option<JoinHandle>,
3817 ) -> Self {
3818 Self {
3819 common: WaitableCommon::default(),
3820 caller_instance,
3821 join_handle,
3822 }
3823 }
3824}
3825
3826impl TableDebug for HostTask {
3827 fn type_name() -> &'static str {
3828 "HostTask"
3829 }
3830}
3831
/// Boxed, thread-safe callback stored in `GuestTask::callback`.
///
/// It receives the store, a component instance index, an `Event` and a `u32`,
/// and returns a `u32`. NOTE(review): the trailing `u32` argument is
/// presumably a waitable handle and the return value a callback code; confirm
/// against the call site.
type CallbackFn = Box<
    dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
        + Send
        + Sync
        + 'static,
>;
3838
/// Who called a `GuestTask`.
enum Caller {
    /// The task was called by the host.
    Host {
        /// Optional channel for a `LiftedResult`. It is `None` for subtasks
        /// re-parented to the host by `GuestTask::dispose`.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// Shared sender; `GuestTask::dispose` clones it into each
        /// re-parented subtask. NOTE(review): presumably signals task exit;
        /// confirm against the receiving side.
        exit_tx: Arc<oneshot::Sender<()>>,
        /// When `true`, `GuestTask::new` starts the task with
        /// `HostFutureState::Live`; otherwise `NotApplicable`.
        host_future_present: bool,
        /// Value reported by `GuestTask::call_post_return_automatically` for
        /// host callers.
        call_post_return_automatically: bool,
    },
    /// The task was called by another guest task.
    Guest {
        /// The calling thread, qualified by its task.
        thread: QualifiedThreadId,
        /// The calling instance; `subtask_cancel` asserts it matches the
        /// instance issuing the cancel.
        instance: RuntimeComponentInstanceIndex,
    },
}
3870
/// Everything stored in `GuestTask::lift_result`.
///
/// The task keeps this as `Some` until `returned_or_cancelled` becomes true.
struct LiftResult {
    /// Closure turning raw values into a boxed result (see `RawLift`).
    lift: RawLift,
    /// Tuple type index associated with the result.
    ty: TypeTupleIndex,
    /// Optional linear memory pointer. NOTE(review): presumably the memory
    /// used while lifting; confirm against the consumer.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding associated with the result.
    string_encoding: StringEncoding,
}
3879
/// A guest thread id paired with the id of the task that owns it.
///
/// Ordering is derived, so it compares `task` first and then `thread`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
struct QualifiedThreadId {
    /// The owning task.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
3889
3890impl QualifiedThreadId {
3891 fn qualify(
3892 state: &mut ConcurrentState,
3893 thread: TableId<GuestThread>,
3894 ) -> Result<QualifiedThreadId> {
3895 Ok(QualifiedThreadId {
3896 task: state.get_mut(thread)?.parent_task,
3897 thread,
3898 })
3899 }
3900}
3901
3902impl fmt::Debug for QualifiedThreadId {
3903 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3904 f.debug_tuple("QualifiedThreadId")
3905 .field(&self.task.rep())
3906 .field(&self.thread.rep())
3907 .finish()
3908 }
3909}
3910
/// Lifecycle state of a `GuestThread`.
enum GuestThreadState {
    /// Initial state of threads built by `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state of threads built by `GuestThread::new_explicit`; holds
    /// the start function that `resume_suspended_thread` later queues.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// The thread is running, or has been queued to run.
    Running,
    /// The thread is suspended; holds the fiber to resume.
    Suspended(StoreFiber<'static>),
    /// NOTE(review): not set anywhere in the visible code; presumably a
    /// thread waiting to be scheduled. Confirm.
    Pending,
    /// NOTE(review): not set anywhere in the visible code; presumably a
    /// thread that has finished. Confirm.
    Completed,
}
/// Table entry for a guest thread belonging to a `GuestTask`.
pub struct GuestThread {
    /// Two `u32` context values, zero-initialised by both constructors.
    context: [u32; 2],
    /// The task that owns this thread.
    parent_task: TableId<GuestTask>,
    /// Waitable set this thread is parked on in a cancellable wait, if any.
    /// `subtask_cancel` and `Waitable::mark_ready` take it when waking the
    /// thread.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// Current lifecycle state.
    state: GuestThreadState,
    /// Index of this thread in an instance's guest-thread table, once
    /// `add_guest_thread_to_instance_table` has registered it.
    instance_rep: Option<u32>,
}
3936
3937impl GuestThread {
3938 fn from_instance(
3941 state: Pin<&mut ComponentInstance>,
3942 caller_instance: RuntimeComponentInstanceIndex,
3943 guest_thread: u32,
3944 ) -> Result<TableId<Self>> {
3945 let rep = state.guest_tables().0[caller_instance].guest_thread_rep(guest_thread)?;
3946 Ok(TableId::new(rep))
3947 }
3948
3949 fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
3950 Self {
3951 context: [0; 2],
3952 parent_task,
3953 wake_on_cancel: None,
3954 state: GuestThreadState::NotStartedImplicit,
3955 instance_rep: None,
3956 }
3957 }
3958
3959 fn new_explicit(
3960 parent_task: TableId<GuestTask>,
3961 start_func: Box<
3962 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
3963 >,
3964 ) -> Self {
3965 Self {
3966 context: [0; 2],
3967 parent_task,
3968 wake_on_cancel: None,
3969 state: GuestThreadState::NotStartedExplicit(start_func),
3970 instance_rep: None,
3971 }
3972 }
3973}
3974
3975impl TableDebug for GuestThread {
3976 fn type_name() -> &'static str {
3977 "GuestThread"
3978 }
3979}
3980
/// Tracks a task's synchronously delivered result.
enum SyncResult {
    /// No result has been produced yet (the initial state).
    NotProduced,
    /// A result has been produced; the payload may be absent.
    Produced(Option<ValRaw>),
    /// `take` has already been called.
    Taken,
}
3986
3987impl SyncResult {
3988 fn take(&mut self) -> Option<Option<ValRaw>> {
3989 match mem::replace(self, SyncResult::Taken) {
3990 SyncResult::NotProduced => None,
3991 SyncResult::Produced(val) => Some(val),
3992 SyncResult::Taken => {
3993 panic!("attempted to take a synchronous result that was already taken")
3994 }
3995 }
3996 }
3997}
3998
/// State of the host-side future associated with a `GuestTask`, if any.
///
/// A task is not `ready_to_delete` while this is `Live`.
#[derive(Debug)]
enum HostFutureState {
    /// Initial state for guest callers and for host callers without
    /// `host_future_present`.
    NotApplicable,
    /// Initial state for host callers with `host_future_present`.
    Live,
    /// NOTE(review): not set in the visible code; presumably the host future
    /// has been dropped. Confirm.
    Dropped,
}
4005
/// Table entry for a task running in a guest component instance.
pub(crate) struct GuestTask {
    /// Waitable bookkeeping (pending event, owning set, handle).
    common: WaitableCommon,
    /// Parameter-lowering closure. It is `Some` until the parameters have been
    /// lowered (see `already_lowered_parameters`); `subtask_cancel` also
    /// clears it when cancelling before start.
    lower_params: Option<RawLower>,
    /// Result-lifting state. It is `Some` until the task has returned or been
    /// cancelled (see `returned_or_cancelled`).
    lift_result: Option<LiftResult>,
    /// Lifted result, if any; starts as `None`.
    result: Option<LiftedResult>,
    /// Optional callback supplied at construction.
    callback: Option<CallbackFn>,
    /// Who called this task.
    caller: Caller,
    /// Call context; starts as `Some(CallContext::default())`.
    call_context: Option<CallContext>,
    /// State of the synchronously delivered result.
    sync_result: SyncResult,
    /// Set to `true` by `subtask_cancel` when it posts a cancellation.
    cancel_sent: bool,
    /// Starts as `false`. NOTE(review): presumably records that a "starting"
    /// status was reported; confirm.
    starting_sent: bool,
    /// Subtasks of this task; `dispose` re-parents them to this task's caller.
    subtasks: HashSet<TableId<GuestTask>>,
    /// Waitable set created in `new` and deleted in `dispose`.
    sync_call_set: TableId<WaitableSet>,
    /// Component instance this task runs in.
    instance: RuntimeComponentInstanceIndex,
    /// Task-level pending event, e.g. the `Event::Cancelled` posted by
    /// `subtask_cancel`.
    event: Option<Event>,
    /// Starts as `None`. NOTE(review): presumably the exported function being
    /// called; confirm.
    function_index: Option<ExportIndex>,
    /// When set on a subtask, the parent's `dispose` deletes that subtask.
    exited: bool,
    /// Threads of this task; must be empty when the task is disposed.
    threads: HashSet<TableId<GuestThread>>,
    /// State of the host future, if the caller has one.
    host_future_state: HostFutureState,
}
4060
4061impl GuestTask {
4062 fn already_lowered_parameters(&self) -> bool {
4063 self.lower_params.is_none()
4065 }
4066 fn returned_or_cancelled(&self) -> bool {
4067 self.lift_result.is_none()
4069 }
4070 fn ready_to_delete(&self) -> bool {
4071 let threads_completed = self.threads.is_empty();
4072 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4073 let pending_completion_event = matches!(
4074 self.common.event,
4075 Some(Event::Subtask {
4076 status: Status::Returned | Status::ReturnCancelled
4077 })
4078 );
4079 let ready = threads_completed
4080 && !has_sync_result
4081 && !pending_completion_event
4082 && !matches!(self.host_future_state, HostFutureState::Live);
4083 log::trace!(
4084 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4085 threads_completed,
4086 has_sync_result,
4087 pending_completion_event,
4088 self.host_future_state
4089 );
4090 ready
4091 }
4092 fn new(
4093 state: &mut ConcurrentState,
4094 lower_params: RawLower,
4095 lift_result: LiftResult,
4096 caller: Caller,
4097 callback: Option<CallbackFn>,
4098 component_instance: RuntimeComponentInstanceIndex,
4099 ) -> Result<Self> {
4100 let sync_call_set = state.push(WaitableSet::default())?;
4101 let host_future_state = match &caller {
4102 Caller::Guest { .. } => HostFutureState::NotApplicable,
4103 Caller::Host {
4104 host_future_present,
4105 ..
4106 } => {
4107 if *host_future_present {
4108 HostFutureState::Live
4109 } else {
4110 HostFutureState::NotApplicable
4111 }
4112 }
4113 };
4114 Ok(Self {
4115 common: WaitableCommon::default(),
4116 lower_params: Some(lower_params),
4117 lift_result: Some(lift_result),
4118 result: None,
4119 callback,
4120 caller,
4121 call_context: Some(CallContext::default()),
4122 sync_result: SyncResult::NotProduced,
4123 cancel_sent: false,
4124 starting_sent: false,
4125 subtasks: HashSet::new(),
4126 sync_call_set,
4127 instance: component_instance,
4128 event: None,
4129 function_index: None,
4130 exited: false,
4131 threads: HashSet::new(),
4132 host_future_state,
4133 })
4134 }
4135
4136 fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4139 for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4142 if let Some(Event::Subtask {
4143 status: Status::Returned | Status::ReturnCancelled,
4144 }) = waitable.common(state)?.event
4145 {
4146 waitable.delete_from(state)?;
4147 }
4148 }
4149
4150 assert!(self.threads.is_empty());
4151
4152 state.delete(self.sync_call_set)?;
4153
4154 match &self.caller {
4156 Caller::Guest {
4157 thread,
4158 instance: runtime_instance,
4159 } => {
4160 let task_mut = state.get_mut(thread.task)?;
4161 let present = task_mut.subtasks.remove(&me);
4162 assert!(present);
4163
4164 for subtask in &self.subtasks {
4165 task_mut.subtasks.insert(*subtask);
4166 }
4167
4168 for subtask in &self.subtasks {
4169 state.get_mut(*subtask)?.caller = Caller::Guest {
4170 thread: *thread,
4171 instance: *runtime_instance,
4172 };
4173 }
4174 }
4175 Caller::Host { exit_tx, .. } => {
4176 for subtask in &self.subtasks {
4177 state.get_mut(*subtask)?.caller = Caller::Host {
4178 tx: None,
4179 exit_tx: exit_tx.clone(),
4183 host_future_present: false,
4184 call_post_return_automatically: true,
4185 };
4186 }
4187 }
4188 }
4189
4190 for subtask in self.subtasks {
4191 if state.get_mut(subtask)?.exited {
4192 Waitable::Guest(subtask).delete_from(state)?;
4193 }
4194 }
4195
4196 Ok(())
4197 }
4198
4199 fn call_post_return_automatically(&self) -> bool {
4200 matches!(
4201 self.caller,
4202 Caller::Guest { .. }
4203 | Caller::Host {
4204 call_post_return_automatically: true,
4205 ..
4206 }
4207 )
4208 }
4209}
4210
4211impl TableDebug for GuestTask {
4212 fn type_name() -> &'static str {
4213 "GuestTask"
4214 }
4215}
4216
/// Bookkeeping shared by every kind of waitable (host task, guest task,
/// transmit handle).
#[derive(Default)]
struct WaitableCommon {
    /// Event waiting to be delivered for this waitable, if any.
    event: Option<Event>,
    /// Waitable set this waitable currently belongs to, if any.
    set: Option<TableId<WaitableSet>>,
    /// NOTE(review): presumably the guest-visible handle for this waitable;
    /// not read in the visible code. Confirm.
    handle: Option<u32>,
}
4227
/// Typed id of something that can be waited on.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A subtask implemented by the host.
    Host(TableId<HostTask>),
    /// A subtask implemented by a guest.
    Guest(TableId<GuestTask>),
    /// A stream or future end (see `Waitable::from_instance`).
    Transmit(TableId<TransmitHandle>),
}
4238
4239impl Waitable {
4240 fn from_instance(
4243 state: Pin<&mut ComponentInstance>,
4244 caller_instance: RuntimeComponentInstanceIndex,
4245 waitable: u32,
4246 ) -> Result<Self> {
4247 use crate::runtime::vm::component::Waitable;
4248
4249 let (waitable, kind) = state.guest_tables().0[caller_instance].waitable_rep(waitable)?;
4250
4251 Ok(match kind {
4252 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4253 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4254 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4255 })
4256 }
4257
4258 fn rep(&self) -> u32 {
4260 match self {
4261 Self::Host(id) => id.rep(),
4262 Self::Guest(id) => id.rep(),
4263 Self::Transmit(id) => id.rep(),
4264 }
4265 }
4266
4267 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4271 log::trace!("waitable {self:?} join set {set:?}",);
4272
4273 let old = mem::replace(&mut self.common(state)?.set, set);
4274
4275 if let Some(old) = old {
4276 match *self {
4277 Waitable::Host(id) => state.remove_child(id, old),
4278 Waitable::Guest(id) => state.remove_child(id, old),
4279 Waitable::Transmit(id) => state.remove_child(id, old),
4280 }?;
4281
4282 state.get_mut(old)?.ready.remove(self);
4283 }
4284
4285 if let Some(set) = set {
4286 match *self {
4287 Waitable::Host(id) => state.add_child(id, set),
4288 Waitable::Guest(id) => state.add_child(id, set),
4289 Waitable::Transmit(id) => state.add_child(id, set),
4290 }?;
4291
4292 if self.common(state)?.event.is_some() {
4293 self.mark_ready(state)?;
4294 }
4295 }
4296
4297 Ok(())
4298 }
4299
4300 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4302 Ok(match self {
4303 Self::Host(id) => &mut state.get_mut(*id)?.common,
4304 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4305 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4306 })
4307 }
4308
4309 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4313 log::trace!("set event for {self:?}: {event:?}");
4314 self.common(state)?.event = event;
4315 self.mark_ready(state)
4316 }
4317
4318 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4320 let common = self.common(state)?;
4321 let event = common.event.take();
4322 if let Some(set) = self.common(state)?.set {
4323 state.get_mut(set)?.ready.remove(self);
4324 }
4325
4326 Ok(event)
4327 }
4328
4329 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4333 if let Some(set) = self.common(state)?.set {
4334 state.get_mut(set)?.ready.insert(*self);
4335 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4336 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4337 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4338
4339 let item = match mode {
4340 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4341 WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4342 thread,
4343 kind: GuestCallKind::DeliverEvent {
4344 instance,
4345 set: Some(set),
4346 },
4347 }),
4348 };
4349 state.push_high_priority(item);
4350 }
4351 }
4352 Ok(())
4353 }
4354
4355 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4357 match self {
4358 Self::Host(task) => {
4359 log::trace!("delete host task {task:?}");
4360 state.delete(*task)?;
4361 }
4362 Self::Guest(task) => {
4363 log::trace!("delete guest task {task:?}");
4364 state.delete(*task)?.dispose(state, *task)?;
4365 }
4366 Self::Transmit(task) => {
4367 state.delete(*task)?;
4368 }
4369 }
4370
4371 Ok(())
4372 }
4373}
4374
4375impl fmt::Debug for Waitable {
4376 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4377 match self {
4378 Self::Host(id) => write!(f, "{id:?}"),
4379 Self::Guest(id) => write!(f, "{id:?}"),
4380 Self::Transmit(id) => write!(f, "{id:?}"),
4381 }
4382 }
4383}
4384
/// A set of waitables together with the threads waiting on it.
#[derive(Default)]
struct WaitableSet {
    /// Members that have been marked ready (see `Waitable::mark_ready`).
    ready: BTreeSet<Waitable>,
    /// Threads waiting on this set and how each should be woken.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
}
4393
4394impl TableDebug for WaitableSet {
4395 fn type_name() -> &'static str {
4396 "WaitableSet"
4397 }
4398}
4399
4400type RawLower =
4402 Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4403
4404type RawLift = Box<
4406 dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4407>;
4408
/// Type-erased call result; the receiver downcasts it back to the concrete
/// result type it expects.
type LiftedResult = Box<dyn Any + Send + Sync>;
4413
/// Zero-sized marker type.
///
/// NOTE(review): its uses are outside this part of the file; presumably it
/// stands in for a `LiftedResult` when there is no real value to carry —
/// confirm against the call sites.
struct DummyResult;
4417
4418#[derive(Default)]
4420struct InstanceState {
4421 backpressure: u16,
4423 do_not_enter: bool,
4425 pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4428}
4429
4430pub struct ConcurrentState {
4432 guest_thread: Option<QualifiedThreadId>,
4434
4435 futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4440 table: AlwaysMut<ResourceTable>,
4442 instance_states: HashMap<RuntimeComponentInstanceIndex, InstanceState>,
4448 high_priority: Vec<WorkItem>,
4450 low_priority: Vec<WorkItem>,
4452 suspend_reason: Option<SuspendReason>,
4456 worker: Option<StoreFiber<'static>>,
4460 worker_item: Option<WorkerItem>,
4462
4463 global_error_context_ref_counts:
4476 BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4477}
4478
4479impl Default for ConcurrentState {
4480 fn default() -> Self {
4481 Self {
4482 guest_thread: None,
4483 table: AlwaysMut::new(ResourceTable::new()),
4484 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4485 instance_states: HashMap::new(),
4486 high_priority: Vec::new(),
4487 low_priority: Vec::new(),
4488 suspend_reason: None,
4489 worker: None,
4490 worker_item: None,
4491 global_error_context_ref_counts: BTreeMap::new(),
4492 }
4493 }
4494}
4495
4496impl ConcurrentState {
4497 pub(crate) fn take_fibers_and_futures(
4514 &mut self,
4515 fibers: &mut Vec<StoreFiber<'static>>,
4516 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4517 ) {
4518 for entry in self.table.get_mut().iter_mut() {
4519 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4520 for mode in mem::take(&mut set.waiting).into_values() {
4521 if let WaitMode::Fiber(fiber) = mode {
4522 fibers.push(fiber);
4523 }
4524 }
4525 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4526 if let GuestThreadState::Suspended(fiber) =
4527 mem::replace(&mut thread.state, GuestThreadState::Completed)
4528 {
4529 fibers.push(fiber);
4530 }
4531 }
4532 }
4533
4534 if let Some(fiber) = self.worker.take() {
4535 fibers.push(fiber);
4536 }
4537
4538 let mut take_items = |list| {
4539 for item in mem::take(list) {
4540 match item {
4541 WorkItem::ResumeFiber(fiber) => {
4542 fibers.push(fiber);
4543 }
4544 WorkItem::PushFuture(future) => {
4545 self.futures
4546 .get_mut()
4547 .as_mut()
4548 .unwrap()
4549 .push(future.into_inner());
4550 }
4551 _ => {}
4552 }
4553 }
4554 };
4555
4556 take_items(&mut self.high_priority);
4557 take_items(&mut self.low_priority);
4558
4559 if let Some(them) = self.futures.get_mut().take() {
4560 futures.push(them);
4561 }
4562 }
4563
4564 fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState {
4565 self.instance_states.entry(instance).or_default()
4566 }
4567
4568 fn push<V: Send + Sync + 'static>(
4569 &mut self,
4570 value: V,
4571 ) -> Result<TableId<V>, ResourceTableError> {
4572 self.table.get_mut().push(value).map(TableId::from)
4573 }
4574
4575 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4576 self.table.get_mut().get_mut(&Resource::from(id))
4577 }
4578
4579 pub fn add_child<T: 'static, U: 'static>(
4580 &mut self,
4581 child: TableId<T>,
4582 parent: TableId<U>,
4583 ) -> Result<(), ResourceTableError> {
4584 self.table
4585 .get_mut()
4586 .add_child(Resource::from(child), Resource::from(parent))
4587 }
4588
4589 pub fn remove_child<T: 'static, U: 'static>(
4590 &mut self,
4591 child: TableId<T>,
4592 parent: TableId<U>,
4593 ) -> Result<(), ResourceTableError> {
4594 self.table
4595 .get_mut()
4596 .remove_child(Resource::from(child), Resource::from(parent))
4597 }
4598
4599 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4600 self.table.get_mut().delete(Resource::from(id))
4601 }
4602
4603 fn push_future(&mut self, future: HostTaskFuture) {
4604 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4611 }
4612
4613 fn push_high_priority(&mut self, item: WorkItem) {
4614 log::trace!("push high priority: {item:?}");
4615 self.high_priority.push(item);
4616 }
4617
4618 fn push_low_priority(&mut self, item: WorkItem) {
4619 log::trace!("push low priority: {item:?}");
4620 self.low_priority.push(item);
4621 }
4622
4623 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4624 if high_priority {
4625 self.push_high_priority(item);
4626 } else {
4627 self.push_low_priority(item);
4628 }
4629 }
4630
4631 fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4641 let guest_instance = self.get_mut(guest_task).unwrap().instance;
4642
4643 loop {
4651 let next_thread = match &self.get_mut(guest_task).unwrap().caller {
4652 Caller::Host { .. } => break true,
4653 Caller::Guest { thread, instance } => {
4654 if *instance == guest_instance {
4655 break false;
4656 } else {
4657 *thread
4658 }
4659 }
4660 };
4661 guest_task = next_thread.task;
4662 }
4663 }
4664
4665 fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) {
4669 self.instance_state(instance).do_not_enter = true;
4670 }
4671
4672 fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4676 self.instance_state(instance).do_not_enter = false;
4677 self.partition_pending(instance)
4678 }
4679
4680 fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4685 for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
4686 let call = GuestCall { thread, kind };
4687 if call.is_ready(self)? {
4688 self.push_high_priority(WorkItem::GuestCall(call));
4689 } else {
4690 self.instance_state(instance)
4691 .pending
4692 .insert(call.thread, call.kind);
4693 }
4694 }
4695
4696 Ok(())
4697 }
4698
4699 pub(crate) fn backpressure_modify(
4701 &mut self,
4702 caller_instance: RuntimeComponentInstanceIndex,
4703 modify: impl FnOnce(u16) -> Option<u16>,
4704 ) -> Result<()> {
4705 let state = self.instance_state(caller_instance);
4706 let old = state.backpressure;
4707 let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?;
4708 state.backpressure = new;
4709
4710 if old > 0 && new == 0 {
4711 self.partition_pending(caller_instance)?;
4714 }
4715
4716 Ok(())
4717 }
4718
4719 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4721 let thread = self.guest_thread.unwrap();
4722 let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4723 log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4724 Ok(val)
4725 }
4726
4727 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4729 let thread = self.guest_thread.unwrap();
4730 log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4731 self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4732 Ok(())
4733 }
4734
4735 fn take_pending_cancellation(&mut self) -> bool {
4738 let thread = self.guest_thread.unwrap();
4739 if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4740 assert!(matches!(event, Event::Cancelled));
4741 true
4742 } else {
4743 false
4744 }
4745 }
4746}
4747
4748fn for_any_lower<
4751 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4752>(
4753 fun: F,
4754) -> F {
4755 fun
4756}
4757
4758fn for_any_lift<
4760 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4761>(
4762 fun: F,
4763) -> F {
4764 fun
4765}
4766
4767fn checked<F: Future + Send + 'static>(
4772 id: StoreId,
4773 fut: F,
4774) -> impl Future<Output = F::Output> + Send + 'static {
4775 async move {
4776 let mut fut = pin!(fut);
4777 future::poll_fn(move |cx| {
4778 let message = "\
4779 `Future`s which depend on asynchronous component tasks, streams, or \
4780 futures to complete may only be polled from the event loop of the \
4781 store to which they belong. Please use \
4782 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
4783 ";
4784 tls::try_get(|store| {
4785 let matched = match store {
4786 tls::TryGet::Some(store) => store.id() == id,
4787 tls::TryGet::Taken | tls::TryGet::None => false,
4788 };
4789
4790 if !matched {
4791 panic!("{message}")
4792 }
4793 });
4794 fut.as_mut().poll(cx)
4795 })
4796 .await
4797 }
4798}
4799
4800fn check_recursive_run() {
4803 tls::try_get(|store| {
4804 if !matches!(store, tls::TryGet::None) {
4805 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
4806 }
4807 });
4808}
4809
/// Splits a packed callback return value into its two parts.
///
/// Returns `(low, high)`, where `low` is the lowest 4 bits of `code` and
/// `high` is the remaining upper 28 bits shifted down.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    const LOW_BITS: u32 = 4;
    const LOW_MASK: u32 = (1 << LOW_BITS) - 1;

    let low = code & LOW_MASK;
    let high = code >> LOW_BITS;
    (low, high)
}
4813
4814struct WaitableCheckParams {
4818 set: TableId<WaitableSet>,
4819 options: OptionsIndex,
4820 payload: u32,
4821}
4822
4823enum WaitableCheck {
4825 Wait(WaitableCheckParams),
4826 Poll(WaitableCheckParams),
4827}
4828
4829pub(crate) struct PreparedCall<R> {
4831 handle: Func,
4833 thread: QualifiedThreadId,
4835 param_count: usize,
4837 rx: oneshot::Receiver<LiftedResult>,
4840 exit_rx: oneshot::Receiver<()>,
4843 _phantom: PhantomData<R>,
4844}
4845
4846impl<R> PreparedCall<R> {
4847 pub(crate) fn task_id(&self) -> TaskId {
4849 TaskId {
4850 task: self.thread.task,
4851 }
4852 }
4853}
4854
4855pub(crate) struct TaskId {
4857 task: TableId<GuestTask>,
4858}
4859
4860impl TaskId {
4861 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
4867 let task = store.0.concurrent_state_mut().get_mut(self.task)?;
4868 if !task.already_lowered_parameters() {
4869 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
4870 } else {
4871 task.host_future_state = HostFutureState::Dropped;
4872 if task.ready_to_delete() {
4873 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
4874 }
4875 }
4876 Ok(())
4877 }
4878}
4879
4880pub(crate) fn prepare_call<T, R>(
4886 mut store: StoreContextMut<T>,
4887 handle: Func,
4888 param_count: usize,
4889 host_future_present: bool,
4890 call_post_return_automatically: bool,
4891 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
4892 + Send
4893 + Sync
4894 + 'static,
4895 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4896 + Send
4897 + Sync
4898 + 'static,
4899) -> Result<PreparedCall<R>> {
4900 let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
4901
4902 let instance = handle.instance().id().get(store.0);
4903 let options = &instance.component().env_component().options[options];
4904 let task_return_type = instance.component().types()[ty].results;
4905 let component_instance = raw_options.instance;
4906 let callback = options.callback.map(|i| instance.runtime_callback(i));
4907 let memory = options
4908 .memory()
4909 .map(|i| instance.runtime_memory(i))
4910 .map(SendSyncPtr::new);
4911 let string_encoding = options.string_encoding;
4912 let token = StoreToken::new(store.as_context_mut());
4913 let state = store.0.concurrent_state_mut();
4914
4915 assert!(state.guest_thread.is_none());
4916
4917 let (tx, rx) = oneshot::channel();
4918 let (exit_tx, exit_rx) = oneshot::channel();
4919
4920 let mut task = GuestTask::new(
4921 state,
4922 Box::new(for_any_lower(move |store, params| {
4923 lower_params(handle, token.as_context_mut(store), params)
4924 })),
4925 LiftResult {
4926 lift: Box::new(for_any_lift(move |store, result| {
4927 lift_result(handle, store, result)
4928 })),
4929 ty: task_return_type,
4930 memory,
4931 string_encoding,
4932 },
4933 Caller::Host {
4934 tx: Some(tx),
4935 exit_tx: Arc::new(exit_tx),
4936 host_future_present,
4937 call_post_return_automatically,
4938 },
4939 callback.map(|callback| {
4940 let callback = SendSyncPtr::new(callback);
4941 let instance = handle.instance();
4942 Box::new(
4943 move |store: &mut dyn VMStore, runtime_instance, event, handle| {
4944 let store = token.as_context_mut(store);
4945 unsafe {
4948 instance.call_callback(
4949 store,
4950 runtime_instance,
4951 callback,
4952 event,
4953 handle,
4954 call_post_return_automatically,
4955 )
4956 }
4957 },
4958 ) as CallbackFn
4959 }),
4960 component_instance,
4961 )?;
4962 task.function_index = Some(handle.index());
4963
4964 let task = state.push(task)?;
4965 let thread = state.push(GuestThread::new_implicit(task))?;
4966 state.get_mut(task)?.threads.insert(thread);
4967
4968 Ok(PreparedCall {
4969 handle,
4970 thread: QualifiedThreadId { task, thread },
4971 param_count,
4972 rx,
4973 exit_rx,
4974 _phantom: PhantomData,
4975 })
4976}
4977
4978pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
4985 mut store: StoreContextMut<T>,
4986 prepared: PreparedCall<R>,
4987) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
4988 let PreparedCall {
4989 handle,
4990 thread,
4991 param_count,
4992 rx,
4993 exit_rx,
4994 ..
4995 } = prepared;
4996
4997 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
4998
4999 Ok(checked(
5000 store.0.id(),
5001 rx.map(move |result| {
5002 result
5003 .map(|v| (*v.downcast().unwrap(), exit_rx))
5004 .map_err(anyhow::Error::from)
5005 }),
5006 ))
5007}
5008
5009fn queue_call0<T: 'static>(
5012 store: StoreContextMut<T>,
5013 handle: Func,
5014 guest_thread: QualifiedThreadId,
5015 param_count: usize,
5016) -> Result<()> {
5017 let (_options, flags, _ty, raw_options) = handle.abi_info(store.0);
5018 let is_concurrent = raw_options.async_;
5019 let callback = raw_options.callback;
5020 let instance = handle.instance();
5021 let callee = handle.lifted_core_func(store.0);
5022 let post_return = handle.post_return_core_func(store.0);
5023 let callback = callback.map(|i| {
5024 let instance = instance.id().get(store.0);
5025 SendSyncPtr::new(instance.runtime_callback(i))
5026 });
5027
5028 log::trace!("queueing call {guest_thread:?}");
5029
5030 let instance_flags = if callback.is_none() {
5031 None
5032 } else {
5033 Some(flags)
5034 };
5035
5036 unsafe {
5040 instance.queue_call(
5041 store,
5042 guest_thread,
5043 SendSyncPtr::new(callee),
5044 param_count,
5045 1,
5046 instance_flags,
5047 is_concurrent,
5048 callback,
5049 post_return.map(SendSyncPtr::new),
5050 )
5051 }
5052}