1use super::table::{TableDebug, TableId};
2use super::{
3 Event, GlobalErrorContextRefCount, HostTaskOutput, LocalErrorContextRefCount, StateTable,
4 Waitable, WaitableCommon, WaitableState,
5};
6use crate::component::concurrent::{ConcurrentState, tls};
7use crate::component::func::{self, LiftContext, LowerContext, Options};
8use crate::component::matching::InstanceType;
9use crate::component::values::{ErrorContextAny, FutureAny, StreamAny};
10use crate::component::{Instance, Lower, Val, WasmList, WasmStr};
11use crate::store::{StoreOpaque, StoreToken};
12use crate::vm::{VMFuncRef, VMMemoryDefinition, VMStore};
13use crate::{AsContextMut, StoreContextMut, ValRaw};
14use anyhow::{Context, Result, anyhow, bail};
15use buffers::Extender;
16use buffers::UntypedWriteBuffer;
17use futures::channel::{mpsc, oneshot};
18use futures::future::{self, FutureExt};
19use futures::stream::StreamExt;
20use std::boxed::Box;
21use std::fmt;
22use std::future::Future;
23use std::iter;
24use std::marker::PhantomData;
25use std::mem::{self, MaybeUninit};
26use std::ptr::NonNull;
27use std::string::{String, ToString};
28use std::sync::{Arc, Mutex};
29use std::task::{Poll, Waker};
30use std::vec::Vec;
31use wasmtime_environ::component::{
32 CanonicalAbiInfo, ComponentTypes, InterfaceType, RuntimeComponentInstanceIndex, StringEncoding,
33 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
34 TypeFutureTableIndex, TypeStreamTableIndex,
35};
36
37pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
38
39mod buffers;
40
/// Distinguishes the two kinds of transmit this module handles.
#[derive(Copy, Clone, Debug)]
enum TransmitKind {
    /// The transmit is a `stream`.
    Stream,
    /// The transmit is a `future`.
    Future,
}
48
/// Outcome of a read or write operation, convertible to a packed `u32` via
/// `ReturnCode::encode`.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ReturnCode {
    /// The operation did not complete; encoded as `0xffff_ffff`.
    Blocked,
    /// The operation completed; the payload is an item count.
    Completed(u32),
    /// The other end was dropped; the payload is an item count.
    Dropped(u32),
    /// The operation was cancelled; the payload is an item count.
    Cancelled(u32),
}
57
58impl ReturnCode {
59 pub fn encode(&self) -> u32 {
64 const BLOCKED: u32 = 0xffff_ffff;
65 const COMPLETED: u32 = 0x0;
66 const DROPPED: u32 = 0x1;
67 const CANCELLED: u32 = 0x2;
68 match self {
69 ReturnCode::Blocked => BLOCKED,
70 ReturnCode::Completed(n) => {
71 debug_assert!(*n < (1 << 28));
72 (n << 4) | COMPLETED
73 }
74 ReturnCode::Dropped(n) => {
75 debug_assert!(*n < (1 << 28));
76 (n << 4) | DROPPED
77 }
78 ReturnCode::Cancelled(n) => {
79 debug_assert!(*n < (1 << 28));
80 (n << 4) | CANCELLED
81 }
82 }
83 }
84
85 fn completed(kind: TransmitKind, count: u32) -> Self {
88 Self::Completed(if let TransmitKind::Future = kind {
89 0
90 } else {
91 count
92 })
93 }
94}
95
/// A type-table index for either a stream or a future, used to select the
/// matching state table and to look up the payload type.
#[derive(Copy, Clone, Debug)]
pub(super) enum TableIndex {
    /// Index of a stream table.
    Stream(TypeStreamTableIndex),
    /// Index of a future table.
    Future(TypeFutureTableIndex),
}
105
106impl TableIndex {
107 fn kind(&self) -> TransmitKind {
108 match self {
109 TableIndex::Stream(_) => TransmitKind::Stream,
110 TableIndex::Future(_) => TransmitKind::Future,
111 }
112 }
113}
114
/// Action associated with a write, stored alongside a pending write in
/// `WriteState`.
///
/// NOTE(review): presumably describes what happens to the write end once the
/// write has been delivered — confirm against the code that consumes it.
enum PostWrite {
    /// Used for ordinary `WriteEvent::Write` requests.
    Continue,
    /// Presumably: drop the write end after the write — TODO confirm.
    Drop,
}
122
/// Result of a host-initiated read or write, delivered over a oneshot channel.
struct HostResult<B> {
    /// The buffer passed to the operation, handed back to the caller.
    buffer: B,
    /// `true` when the operation found the other end gone (`Reader::End` /
    /// `Writer::End`), `false` otherwise.
    dropped: bool,
}
130
131fn payload(ty: TableIndex, types: &Arc<ComponentTypes>) -> Option<InterfaceType> {
134 match ty {
135 TableIndex::Future(ty) => types[types[ty].ty].payload,
136 TableIndex::Stream(ty) => types[types[ty].ty].payload,
137 }
138}
139
140fn get_mut_by_index_from(
143 state_table: &mut StateTable<WaitableState>,
144 ty: TableIndex,
145 index: u32,
146) -> Result<(u32, &mut StreamFutureState)> {
147 Ok(match ty {
148 TableIndex::Stream(ty) => {
149 let (rep, WaitableState::Stream(actual_ty, state)) =
150 state_table.get_mut_by_index(index)?
151 else {
152 bail!("invalid stream handle");
153 };
154 if *actual_ty != ty {
155 bail!("invalid stream handle");
156 }
157 (rep, state)
158 }
159 TableIndex::Future(ty) => {
160 let (rep, WaitableState::Future(actual_ty, state)) =
161 state_table.get_mut_by_index(index)?
162 else {
163 bail!("invalid future handle");
164 };
165 if *actual_ty != ty {
166 bail!("invalid future handle");
167 }
168 (rep, state)
169 }
170 })
171}
172
173fn waitable_state(ty: TableIndex, state: StreamFutureState) -> WaitableState {
175 match ty {
176 TableIndex::Stream(ty) => WaitableState::Stream(ty, state),
177 TableIndex::Future(ty) => WaitableState::Future(ty, state),
178 }
179}
180
/// Builds the callback through which a reader accepts the items held in a
/// host-owned write `buffer`.
///
/// The returned closure is invoked with a `Reader` describing who is reading:
///
/// * `Reader::Guest` — items are lowered into guest linear memory at `address`.
/// * `Reader::Host` — the host reader's `accept` callback pulls items directly.
/// * `Reader::End` — there is no reader; nothing is transferred.
///
/// On every path the buffer is sent back through `tx` wrapped in a
/// `HostResult`; `dropped` is `true` only for `Reader::End`. A failed send
/// (receiver gone) is ignored.
///
/// # Errors
///
/// The closure fails if the guest address is misaligned for `T`, if the
/// destination range lies outside linear memory, or if lowering fails.
fn accept_reader<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
    store: StoreContextMut<U>,
    mut buffer: B,
    tx: oneshot::Sender<HostResult<B>>,
    kind: TransmitKind,
) -> impl FnOnce(&mut dyn VMStore, Instance, Reader) -> Result<ReturnCode>
+ Send
+ Sync
+ 'static
+ use<T, B, U> {
    // The token lets the closure recover a typed store context from the
    // `&mut dyn VMStore` it is later called with.
    let token = StoreToken::new(store);
    move |store, instance, reader| {
        let code = match reader {
            Reader::Guest {
                options,
                ty,
                address,
                count,
            } => {
                let mut store = token.as_context_mut(store);
                let types = instance.id().get(store.0).component().types().clone();
                // Never transfer more items than the buffer still holds.
                let count = buffer.remaining().len().min(count);

                let lower =
                    &mut LowerContext::new(store.as_context_mut(), options, &types, instance);
                if address % usize::try_from(T::ALIGN32)? != 0 {
                    bail!("read pointer not aligned");
                }
                // Bounds-check the whole destination range up front; the
                // resulting slice itself is not used.
                lower
                    .as_slice_mut()
                    .get_mut(address..)
                    .and_then(|b| b.get_mut(..T::SIZE32 * count))
                    .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;

                // Nothing is written to memory when the type has no payload.
                if let Some(ty) = payload(ty, &types) {
                    T::linear_store_list_to_memory(
                        lower,
                        ty,
                        address,
                        &buffer.remaining()[..count],
                    )?;
                }

                // Mark the transferred items as consumed before returning the
                // buffer to the writer.
                buffer.skip(count);
                _ = tx.send(HostResult {
                    buffer,
                    dropped: false,
                });
                ReturnCode::completed(kind, count.try_into().unwrap())
            }
            Reader::Host { accept } => {
                let count = buffer.remaining().len();
                let mut untyped = UntypedWriteBuffer::new(&mut buffer);
                // `accept` reports how many items the host reader took.
                let count = accept(&mut untyped, count);
                _ = tx.send(HostResult {
                    buffer,
                    dropped: false,
                });
                ReturnCode::completed(kind, count.try_into().unwrap())
            }
            Reader::End => {
                _ = tx.send(HostResult {
                    buffer,
                    dropped: true,
                });
                ReturnCode::Dropped(0)
            }
        };

        Ok(code)
    }
}
260
/// Builds the callback through which a writer delivers items into a
/// host-owned read `buffer`.
///
/// The returned closure is invoked with a `Writer` describing who is writing:
///
/// * `Writer::Guest` — items are lifted from guest linear memory at `address`.
/// * `Writer::Host` — items are moved out of the host writer's buffer.
/// * `Writer::End` — there is no writer; nothing is transferred.
///
/// On every path the buffer is sent back through `tx` wrapped in a
/// `HostResult`; `dropped` is `true` only for `Writer::End`. A failed send
/// (receiver gone) is ignored.
///
/// # Errors
///
/// The closure fails if the guest address is misaligned for `T`, if the source
/// range lies outside linear memory, or if lifting fails.
///
/// # Panics
///
/// The guest path panics if `T` is not the unit type and no payload type was
/// supplied (`ty` is `None`).
fn accept_writer<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
    mut buffer: B,
    tx: oneshot::Sender<HostResult<B>>,
    kind: TransmitKind,
) -> impl FnOnce(Writer) -> Result<ReturnCode> + Send + Sync + 'static {
    move |writer| {
        // Despite its name, this binding holds the resulting `ReturnCode`.
        let count = match writer {
            Writer::Guest {
                lift,
                ty,
                address,
                count,
            } => {
                // Never accept more items than the buffer has room for.
                let count = count.min(buffer.remaining_capacity());
                if T::IS_RUST_UNIT_TYPE {
                    buffer.extend(
                        // SAFETY: this branch only runs when
                        // `T::IS_RUST_UNIT_TYPE` is set, which presumably means
                        // `T` is `()`: a zero-sized type with no bytes to
                        // initialize. NOTE(review): confirm against the
                        // definition of `IS_RUST_UNIT_TYPE`.
                        iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() })
                            .take(count),
                    )
                } else {
                    let ty = ty.unwrap();
                    if address % usize::try_from(T::ALIGN32)? != 0 {
                        bail!("write pointer not aligned");
                    }
                    // Bounds-check the whole source range up front; the
                    // resulting slice itself is not used.
                    lift.memory()
                        .get(address..)
                        .and_then(|b| b.get(..T::SIZE32 * count))
                        .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;

                    let list = &WasmList::new(address, count, lift, ty)?;
                    T::linear_lift_into_from_memory(lift, list, &mut Extender(&mut buffer))?
                }
                _ = tx.send(HostResult {
                    buffer,
                    dropped: false,
                });
                ReturnCode::completed(kind, count.try_into().unwrap())
            }
            Writer::Host {
                buffer: input,
                count,
            } => {
                // Never accept more items than the buffer has room for.
                let count = count.min(buffer.remaining_capacity());
                buffer.move_from(input.get_mut::<T>(), count);
                _ = tx.send(HostResult {
                    buffer,
                    dropped: false,
                });
                ReturnCode::completed(kind, count.try_into().unwrap())
            }
            Writer::End => {
                _ = tx.send(HostResult {
                    buffer,
                    dropped: true,
                });
                ReturnCode::Dropped(0)
            }
        };

        Ok(count)
    }
}
333
/// State of a stream or future handle as stored in a waitable state table.
#[derive(Debug, Eq, PartialEq)]
pub(super) enum StreamFutureState {
    /// The handle is the write end.
    Write {
        /// NOTE(review): presumably set once the write end has learned that
        /// the other end is finished — confirm against the code that sets it.
        done: bool,
    },
    /// The handle is the read end.
    Read {
        /// When `true`, lifting a stream fails with "cannot lift stream after
        /// being notified that the writable end dropped".
        done: bool,
    },
    /// The handle is busy; busy handles cannot be transferred.
    Busy,
}
355
/// State stored for an error context.
#[derive(Debug, PartialEq, Eq, PartialOrd)]
pub(super) struct ErrorContextState {
    /// Debug message associated with the error context.
    pub(crate) debug_msg: String,
}
362
/// Size and alignment pair describing a payload's flat layout.
///
/// NOTE(review): units are presumably bytes — confirm at the construction
/// site.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct FlatAbi {
    /// Size of one element.
    pub(super) size: u32,
    /// Alignment of one element.
    pub(super) align: u32,
}
370
/// Request sent from a host-side writer handle to its write event loop.
enum WriteEvent<B> {
    /// Write the contents of `buffer`.
    Write {
        /// Buffer holding the items to write.
        buffer: B,
        /// Channel on which the buffer and outcome are reported back.
        tx: oneshot::Sender<HostResult<B>>,
    },
    /// The writer handle was dropped. The optional closure produces a final
    /// buffer to write first: `FutureWriter` supplies one (built from its
    /// default value) when it is dropped without having written;
    /// `StreamWriter` always sends `None`.
    Drop(Option<Box<dyn FnOnce() -> B + Send + Sync>>),
    /// Register `tx` as a watcher; sent by the `watch_reader` methods.
    Watch { tx: oneshot::Sender<()> },
}
387
/// Request sent from a host-side reader handle to its read event loop.
enum ReadEvent<B> {
    /// Read into `buffer`.
    Read {
        /// Buffer that receives the items read.
        buffer: B,
        /// Channel on which the buffer and outcome are reported back.
        tx: oneshot::Sender<HostResult<B>>,
    },
    /// The reader handle was dropped.
    Drop,
    /// Register `tx` as a watcher; sent by the `watch_writer` methods.
    Watch { tx: oneshot::Sender<()> },
}
404
405fn send<T>(tx: &mut mpsc::Sender<T>, value: T) {
412 if let Err(e) = tx.try_send(value) {
413 if e.is_full() {
414 unreachable!();
415 }
416 }
417}
418
/// Wraps a handle that is being watched, paired with the future returned by
/// `watch`.
///
/// Call `into_inner` to get the handle back; doing so also completes the
/// paired future.
pub struct Watch<T> {
    /// The wrapped handle.
    inner: T,
    /// State shared with the paired watch future.
    waker: Arc<Mutex<WatchState>>,
}
429
/// State shared between a `Watch` and its paired future.
enum WatchState {
    /// Initial state: the future has not registered a waker.
    Idle,
    /// The future returned `Pending` and left this waker to be notified.
    Waiting(Waker),
    /// `Watch::into_inner` was called; the future resolves on its next poll.
    Done,
}
435
436impl<T> Watch<T> {
437 pub fn into_inner(self) -> T {
442 let state = mem::replace(&mut *self.waker.lock().unwrap(), WatchState::Done);
443 if let WatchState::Waiting(waker) = state {
444 waker.wake();
445 }
446 self.inner
447 }
448}
449
450fn watch<T: Send + 'static>(
454 instance: Instance,
455 mut rx: oneshot::Receiver<()>,
456 inner: T,
457) -> (impl Future<Output = ()> + Send + 'static, Watch<T>) {
458 let waker = Arc::new(Mutex::new(WatchState::Idle));
459 (
460 super::checked(
461 instance,
462 future::poll_fn({
463 let waker = waker.clone();
464
465 move |cx| {
466 if rx.poll_unpin(cx).is_ready() {
467 return Poll::Ready(());
468 }
469 let mut state = waker.lock().unwrap();
470 match *state {
471 WatchState::Done => Poll::Ready(()),
472 _ => {
473 *state = WatchState::Waiting(cx.waker().clone());
474 Poll::Pending
475 }
476 }
477 }
478 }),
479 ),
480 Watch { waker, inner },
481 )
482}
483
/// Host-owned write end of a future.
pub struct FutureWriter<T: 'static> {
    /// Producer of a default value, used if this writer is dropped without
    /// writing; cleared once `write` has been called.
    default: Option<fn() -> T>,
    /// Instance this future belongs to.
    instance: Instance,
    /// Channel to the write event loop; `None` once taken by `Drop`.
    tx: Option<mpsc::Sender<WriteEvent<Option<T>>>>,
}
490
491impl<T> FutureWriter<T> {
492 fn new(
493 default: fn() -> T,
494 tx: Option<mpsc::Sender<WriteEvent<Option<T>>>>,
495 instance: Instance,
496 ) -> Self {
497 Self {
498 default: Some(default),
499 instance,
500 tx,
501 }
502 }
503
504 pub fn write(mut self, value: T) -> impl Future<Output = bool> + Send + 'static
514 where
515 T: Send + 'static,
516 {
517 let (tx, rx) = oneshot::channel();
518 send(
519 &mut self.tx.as_mut().unwrap(),
520 WriteEvent::Write {
521 buffer: Some(value),
522 tx,
523 },
524 );
525 self.default = None;
526 let instance = self.instance;
527 super::checked(
528 instance,
529 rx.map(move |v| {
530 drop(self);
531 match v {
532 Ok(HostResult { dropped, .. }) => !dropped,
533 Err(_) => todo!("guarantee buffer recovery if event loop errors or panics"),
534 }
535 }),
536 )
537 }
538
539 pub fn watch_reader(mut self) -> (impl Future<Output = ()> + Send + 'static, Watch<Self>)
551 where
552 T: Send + 'static,
553 {
554 let (tx, rx) = oneshot::channel();
555 send(&mut self.tx.as_mut().unwrap(), WriteEvent::Watch { tx });
556 let instance = self.instance;
557 watch(instance, rx, self)
558 }
559}
560
561impl<T> Drop for FutureWriter<T> {
562 fn drop(&mut self) {
563 if let Some(mut tx) = self.tx.take() {
564 send(
565 &mut tx,
566 WriteEvent::Drop(self.default.take().map(|v| {
567 Box::new(move || Some(v()))
568 as Box<dyn FnOnce() -> Option<T> + Send + Sync + 'static>
569 })),
570 );
571 }
572 }
573}
574
/// Host-side representation of the read end of a future, identified by its
/// rep. Convertible to a `FutureReader` via `into_reader`.
pub struct HostFuture<T> {
    /// Instance this future belongs to.
    instance: Instance,
    /// Rep of the underlying `TransmitHandle`.
    rep: u32,
    /// Marks the payload type without storing a value.
    _phantom: PhantomData<T>,
}
588
589impl<T> HostFuture<T> {
590 fn new(rep: u32, instance: Instance) -> Self {
592 Self {
593 instance,
594 rep,
595 _phantom: PhantomData,
596 }
597 }
598
599 pub fn into_reader(self, mut store: impl AsContextMut) -> FutureReader<T>
601 where
602 T: func::Lower + func::Lift + Send + Sync + 'static,
603 {
604 FutureReader {
605 instance: self.instance,
606 rep: self.rep,
607 tx: Some(self.instance.start_read_event_loop(
608 store.as_context_mut(),
609 self.rep,
610 TransmitKind::Future,
611 )),
612 }
613 }
614
615 pub fn into_val(self) -> Val {
618 Val::Future(FutureAny(self.rep))
619 }
620
621 pub fn from_val(
623 mut store: impl AsContextMut<Data: Send>,
624 instance: Instance,
625 value: &Val,
626 ) -> Result<Self> {
627 let Val::Future(FutureAny(rep)) = value else {
628 bail!("expected `future`; got `{}`", value.desc());
629 };
630 let store = store.as_context_mut();
631 instance
632 .concurrent_state_mut(store.0)
633 .get(TableId::<TransmitHandle>::new(*rep))?; Ok(Self::new(*rep, instance))
635 }
636
637 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
639 match ty {
640 InterfaceType::Future(src) => {
641 let state_table = cx
642 .instance_mut()
643 .concurrent_state_mut()
644 .state_table(TableIndex::Future(src));
645 let (rep, state) =
646 get_mut_by_index_from(state_table, TableIndex::Future(src), index)?;
647
648 match state {
649 StreamFutureState::Read { .. } => {
650 state_table.remove_by_index(index)?;
651 }
652 StreamFutureState::Write { .. } => bail!("cannot transfer write end of future"),
653 StreamFutureState::Busy => bail!("cannot transfer busy future"),
654 }
655
656 let concurrent_state = cx.instance_mut().concurrent_state_mut();
657 let state = concurrent_state
658 .get(TableId::<TransmitHandle>::new(rep))?
659 .state;
660
661 if concurrent_state.get(state)?.done {
662 bail!("cannot lift future after previous read succeeded");
663 }
664
665 Ok(Self::new(rep, cx.instance_handle()))
666 }
667 _ => func::bad_type_info(),
668 }
669 }
670}
671
672pub(crate) fn lower_future_to_index<U>(
674 rep: u32,
675 cx: &mut LowerContext<'_, U>,
676 ty: InterfaceType,
677) -> Result<u32> {
678 match ty {
679 InterfaceType::Future(dst) => {
680 let concurrent_state = cx.instance_mut().concurrent_state_mut();
681 let state = concurrent_state
682 .get(TableId::<TransmitHandle>::new(rep))?
683 .state;
684 let rep = concurrent_state.get(state)?.read_handle.rep();
685
686 concurrent_state
687 .state_table(TableIndex::Future(dst))
688 .insert(
689 rep,
690 WaitableState::Future(dst, StreamFutureState::Read { done: false }),
691 )
692 }
693 _ => func::bad_type_info(),
694 }
695}
696
697unsafe impl<T: Send + Sync> func::ComponentType for HostFuture<T> {
700 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
701
702 type Lower = <u32 as func::ComponentType>::Lower;
703
704 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
705 match ty {
706 InterfaceType::Future(_) => Ok(()),
707 other => bail!("expected `future`, found `{}`", func::desc(other)),
708 }
709 }
710}
711
712unsafe impl<T: Send + Sync> func::Lower for HostFuture<T> {
714 fn linear_lower_to_flat<U>(
715 &self,
716 cx: &mut LowerContext<'_, U>,
717 ty: InterfaceType,
718 dst: &mut MaybeUninit<Self::Lower>,
719 ) -> Result<()> {
720 lower_future_to_index(self.rep, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
721 }
722
723 fn linear_lower_to_memory<U>(
724 &self,
725 cx: &mut LowerContext<'_, U>,
726 ty: InterfaceType,
727 offset: usize,
728 ) -> Result<()> {
729 lower_future_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
730 cx,
731 InterfaceType::U32,
732 offset,
733 )
734 }
735}
736
737unsafe impl<T: Send + Sync> func::Lift for HostFuture<T> {
739 fn linear_lift_from_flat(
740 cx: &mut LiftContext<'_>,
741 ty: InterfaceType,
742 src: &Self::Lower,
743 ) -> Result<Self> {
744 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
745 Self::lift_from_index(cx, ty, index)
746 }
747
748 fn linear_lift_from_memory(
749 cx: &mut LiftContext<'_>,
750 ty: InterfaceType,
751 bytes: &[u8],
752 ) -> Result<Self> {
753 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
754 Self::lift_from_index(cx, ty, index)
755 }
756}
757
758impl<T> From<FutureReader<T>> for HostFuture<T> {
759 fn from(mut value: FutureReader<T>) -> Self {
760 value.tx.take();
761
762 Self {
763 instance: value.instance,
764 rep: value.rep,
765 _phantom: PhantomData,
766 }
767 }
768}
769
/// Host-owned read end of a future.
pub struct FutureReader<T> {
    /// Instance this future belongs to.
    instance: Instance,
    /// Rep of the underlying `TransmitHandle`.
    rep: u32,
    /// Channel to the read event loop; `None` once detached or dropped.
    tx: Option<mpsc::Sender<ReadEvent<Option<T>>>>,
}
779
780impl<T> FutureReader<T> {
781 fn new(rep: u32, tx: Option<mpsc::Sender<ReadEvent<Option<T>>>>, instance: Instance) -> Self {
782 Self { instance, rep, tx }
783 }
784
785 pub fn read(mut self) -> impl Future<Output = Option<T>> + Send + 'static
794 where
795 T: Send + 'static,
796 {
797 let (tx, rx) = oneshot::channel();
798 send(
799 &mut self.tx.as_mut().unwrap(),
800 ReadEvent::Read { buffer: None, tx },
801 );
802 let instance = self.instance;
803 super::checked(
804 instance,
805 rx.map(move |v| {
806 drop(self);
807
808 if let Ok(HostResult {
809 mut buffer,
810 dropped: false,
811 }) = v
812 {
813 buffer.take()
814 } else {
815 None
816 }
817 }),
818 )
819 }
820
821 pub fn watch_writer(mut self) -> (impl Future<Output = ()> + Send + 'static, Watch<Self>)
833 where
834 T: Send + 'static,
835 {
836 let (tx, rx) = oneshot::channel();
837 send(&mut self.tx.as_mut().unwrap(), ReadEvent::Watch { tx });
838 let instance = self.instance;
839 watch(instance, rx, self)
840 }
841}
842
843impl<T> Drop for FutureReader<T> {
844 fn drop(&mut self) {
845 if let Some(mut tx) = self.tx.take() {
846 send(&mut tx, ReadEvent::Drop);
847 }
848 }
849}
850
/// Host-owned write end of a stream, writing from buffers of type `B`.
pub struct StreamWriter<B> {
    /// Instance this stream belongs to.
    instance: Instance,
    /// Channel to the write event loop; `None` once taken by `Drop`.
    tx: Option<mpsc::Sender<WriteEvent<B>>>,
}
856
857impl<B> StreamWriter<B> {
858 fn new(tx: Option<mpsc::Sender<WriteEvent<B>>>, instance: Instance) -> Self {
859 Self { instance, tx }
860 }
861
862 pub fn write(
879 mut self,
880 buffer: B,
881 ) -> impl Future<Output = (Option<StreamWriter<B>>, B)> + Send + 'static
882 where
883 B: Send + 'static,
884 {
885 let (tx, rx) = oneshot::channel();
886 send(self.tx.as_mut().unwrap(), WriteEvent::Write { buffer, tx });
887 let instance = self.instance;
888 super::checked(
889 instance,
890 rx.map(move |v| match v {
891 Ok(HostResult { buffer, dropped }) => ((!dropped).then_some(self), buffer),
892 Err(_) => todo!("guarantee buffer recovery if event loop errors or panics"),
893 }),
894 )
895 }
896
897 pub fn write_all<T>(
910 self,
911 buffer: B,
912 ) -> impl Future<Output = (Option<StreamWriter<B>>, B)> + Send + 'static
913 where
914 B: WriteBuffer<T>,
915 {
916 let instance = self.instance;
917 super::checked(
918 instance,
919 self.write(buffer).then(|(me, buffer)| async move {
920 if let Some(me) = me {
921 if buffer.remaining().len() > 0 {
922 Box::pin(me.write_all(buffer)).await
925 } else {
926 (Some(me), buffer)
927 }
928 } else {
929 (None, buffer)
930 }
931 }),
932 )
933 }
934
935 pub fn watch_reader(mut self) -> (impl Future<Output = ()> + Send + 'static, Watch<Self>)
947 where
948 B: Send + 'static,
949 {
950 let (tx, rx) = oneshot::channel();
951 send(&mut self.tx.as_mut().unwrap(), WriteEvent::Watch { tx });
952 let instance = self.instance;
953 watch(instance, rx, self)
954 }
955}
956
957impl<T> Drop for StreamWriter<T> {
958 fn drop(&mut self) {
959 if let Some(mut tx) = self.tx.take() {
960 send(&mut tx, WriteEvent::Drop(None));
961 }
962 }
963}
964
/// Host-side representation of the read end of a stream, identified by its
/// rep. Convertible to a `StreamReader` via `into_reader`.
pub struct HostStream<T> {
    /// Instance this stream belongs to.
    instance: Instance,
    /// Rep of the underlying `TransmitHandle`.
    rep: u32,
    /// Marks the payload type without storing a value.
    _phantom: PhantomData<T>,
}
978
979impl<T> HostStream<T> {
980 fn new(rep: u32, instance: Instance) -> Self {
982 Self {
983 instance,
984 rep,
985 _phantom: PhantomData,
986 }
987 }
988
989 pub fn into_reader<B>(self, mut store: impl AsContextMut) -> StreamReader<B>
991 where
992 T: func::Lower + func::Lift + Send + 'static,
993 B: ReadBuffer<T>,
994 {
995 StreamReader {
996 instance: self.instance,
997 rep: self.rep,
998 tx: Some(self.instance.start_read_event_loop(
999 store.as_context_mut(),
1000 self.rep,
1001 TransmitKind::Stream,
1002 )),
1003 }
1004 }
1005
1006 pub fn into_val(self) -> Val {
1009 Val::Stream(StreamAny(self.rep))
1010 }
1011
1012 pub fn from_val(
1014 mut store: impl AsContextMut<Data: Send>,
1015 instance: Instance,
1016 value: &Val,
1017 ) -> Result<Self> {
1018 let Val::Stream(StreamAny(rep)) = value else {
1019 bail!("expected `stream`; got `{}`", value.desc());
1020 };
1021 let store = store.as_context_mut();
1022 instance
1023 .concurrent_state_mut(store.0)
1024 .get(TableId::<TransmitHandle>::new(*rep))?; Ok(Self::new(*rep, instance))
1026 }
1027
1028 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1030 match ty {
1031 InterfaceType::Stream(src) => {
1032 let state_table = cx
1033 .instance_mut()
1034 .concurrent_state_mut()
1035 .state_table(TableIndex::Stream(src));
1036 let (rep, state) =
1037 get_mut_by_index_from(state_table, TableIndex::Stream(src), index)?;
1038
1039 match state {
1040 StreamFutureState::Read { done: true } => bail!(
1041 "cannot lift stream after being notified that the writable end dropped"
1042 ),
1043 StreamFutureState::Read { done: false } => {
1044 state_table.remove_by_index(index)?;
1045 }
1046 StreamFutureState::Write { .. } => bail!("cannot transfer write end of stream"),
1047 StreamFutureState::Busy => bail!("cannot transfer busy stream"),
1048 }
1049
1050 Ok(Self::new(rep, cx.instance_handle()))
1051 }
1052 _ => func::bad_type_info(),
1053 }
1054 }
1055}
1056
1057pub(crate) fn lower_stream_to_index<U>(
1059 rep: u32,
1060 cx: &mut LowerContext<'_, U>,
1061 ty: InterfaceType,
1062) -> Result<u32> {
1063 match ty {
1064 InterfaceType::Stream(dst) => {
1065 let concurrent_state = cx.instance_mut().concurrent_state_mut();
1066 let state = concurrent_state
1067 .get(TableId::<TransmitHandle>::new(rep))?
1068 .state;
1069 let rep = concurrent_state.get(state)?.read_handle.rep();
1070
1071 concurrent_state
1072 .state_table(TableIndex::Stream(dst))
1073 .insert(
1074 rep,
1075 WaitableState::Stream(dst, StreamFutureState::Read { done: false }),
1076 )
1077 }
1078 _ => func::bad_type_info(),
1079 }
1080}
1081
1082unsafe impl<T: Send + Sync> func::ComponentType for HostStream<T> {
1085 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1086
1087 type Lower = <u32 as func::ComponentType>::Lower;
1088
1089 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1090 match ty {
1091 InterfaceType::Stream(_) => Ok(()),
1092 other => bail!("expected `stream`, found `{}`", func::desc(other)),
1093 }
1094 }
1095}
1096
1097unsafe impl<T: Send + Sync> func::Lower for HostStream<T> {
1099 fn linear_lower_to_flat<U>(
1100 &self,
1101 cx: &mut LowerContext<'_, U>,
1102 ty: InterfaceType,
1103 dst: &mut MaybeUninit<Self::Lower>,
1104 ) -> Result<()> {
1105 lower_stream_to_index(self.rep, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1106 }
1107
1108 fn linear_lower_to_memory<U>(
1109 &self,
1110 cx: &mut LowerContext<'_, U>,
1111 ty: InterfaceType,
1112 offset: usize,
1113 ) -> Result<()> {
1114 lower_stream_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1115 cx,
1116 InterfaceType::U32,
1117 offset,
1118 )
1119 }
1120}
1121
1122unsafe impl<T: Send + Sync> func::Lift for HostStream<T> {
1124 fn linear_lift_from_flat(
1125 cx: &mut LiftContext<'_>,
1126 ty: InterfaceType,
1127 src: &Self::Lower,
1128 ) -> Result<Self> {
1129 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1130 Self::lift_from_index(cx, ty, index)
1131 }
1132
1133 fn linear_lift_from_memory(
1134 cx: &mut LiftContext<'_>,
1135 ty: InterfaceType,
1136 bytes: &[u8],
1137 ) -> Result<Self> {
1138 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1139 Self::lift_from_index(cx, ty, index)
1140 }
1141}
1142
1143impl<T, B> From<StreamReader<B>> for HostStream<T> {
1144 fn from(mut value: StreamReader<B>) -> Self {
1145 value.tx.take();
1146
1147 Self {
1148 instance: value.instance,
1149 rep: value.rep,
1150 _phantom: PhantomData,
1151 }
1152 }
1153}
1154
/// Host-owned read end of a stream, reading into buffers of type `B`.
pub struct StreamReader<B> {
    /// Instance this stream belongs to.
    instance: Instance,
    /// Rep of the underlying `TransmitHandle`.
    rep: u32,
    /// Channel to the read event loop; `None` once detached or dropped.
    tx: Option<mpsc::Sender<ReadEvent<B>>>,
}
1164
1165impl<B> StreamReader<B> {
1166 fn new(rep: u32, tx: Option<mpsc::Sender<ReadEvent<B>>>, instance: Instance) -> Self {
1167 Self { instance, rep, tx }
1168 }
1169
1170 pub fn read(
1182 mut self,
1183 buffer: B,
1184 ) -> impl Future<Output = (Option<StreamReader<B>>, B)> + Send + 'static
1185 where
1186 B: Send + 'static,
1187 {
1188 let (tx, rx) = oneshot::channel();
1189 send(self.tx.as_mut().unwrap(), ReadEvent::Read { buffer, tx });
1190 let instance = self.instance;
1191 super::checked(
1192 instance,
1193 rx.map(move |v| match v {
1194 Ok(HostResult { buffer, dropped }) => ((!dropped).then_some(self), buffer),
1195 Err(_) => {
1196 todo!("guarantee buffer recovery if event loop errors or panics")
1197 }
1198 }),
1199 )
1200 }
1201
1202 pub fn watch_writer(mut self) -> (impl Future<Output = ()> + Send + 'static, Watch<Self>)
1214 where
1215 B: Send + 'static,
1216 {
1217 let (tx, rx) = oneshot::channel();
1218 send(&mut self.tx.as_mut().unwrap(), ReadEvent::Watch { tx });
1219 let instance = self.instance;
1220 watch(instance, rx, self)
1221 }
1222}
1223
1224impl<B> Drop for StreamReader<B> {
1225 fn drop(&mut self) {
1226 if let Some(mut tx) = self.tx.take() {
1227 send(&mut tx, ReadEvent::Drop);
1228 }
1229 }
1230}
1231
/// Host-side representation of a component-model `error-context`.
pub struct ErrorContext {
    /// Rep identifying the error context.
    rep: u32,
}
1236
1237impl ErrorContext {
1238 pub(crate) fn new(rep: u32) -> Self {
1239 Self { rep }
1240 }
1241
1242 pub fn into_val(self) -> Val {
1244 Val::ErrorContext(ErrorContextAny(self.rep))
1245 }
1246
1247 pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1249 let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1250 bail!("expected `error-context`; got `{}`", value.desc());
1251 };
1252 Ok(Self::new(*rep))
1253 }
1254
1255 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1256 match ty {
1257 InterfaceType::ErrorContext(src) => {
1258 let (rep, _) = cx
1259 .instance_mut()
1260 .concurrent_state_mut()
1261 .error_context_tables
1262 .get_mut(src)
1263 .expect("error context table index present in (sub)component table during lift")
1264 .get_mut_by_index(index)?;
1265
1266 Ok(Self { rep })
1267 }
1268 _ => func::bad_type_info(),
1269 }
1270 }
1271}
1272
1273pub(crate) fn lower_error_context_to_index<U>(
1274 rep: u32,
1275 cx: &mut LowerContext<'_, U>,
1276 ty: InterfaceType,
1277) -> Result<u32> {
1278 match ty {
1279 InterfaceType::ErrorContext(dst) => {
1280 let tbl = cx
1281 .instance_mut()
1282 .concurrent_state_mut()
1283 .error_context_tables
1284 .get_mut(dst)
1285 .expect("error context table index present in (sub)component table during lower");
1286
1287 if let Some((dst_idx, dst_state)) = tbl.get_mut_by_rep(rep) {
1288 dst_state.0 += 1;
1289 Ok(dst_idx)
1290 } else {
1291 tbl.insert(rep, LocalErrorContextRefCount(1))
1292 }
1293 }
1294 _ => func::bad_type_info(),
1295 }
1296}
1297
1298unsafe impl func::ComponentType for ErrorContext {
1301 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1302
1303 type Lower = <u32 as func::ComponentType>::Lower;
1304
1305 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1306 match ty {
1307 InterfaceType::ErrorContext(_) => Ok(()),
1308 other => bail!("expected `error`, found `{}`", func::desc(other)),
1309 }
1310 }
1311}
1312
1313unsafe impl func::Lower for ErrorContext {
1315 fn linear_lower_to_flat<T>(
1316 &self,
1317 cx: &mut LowerContext<'_, T>,
1318 ty: InterfaceType,
1319 dst: &mut MaybeUninit<Self::Lower>,
1320 ) -> Result<()> {
1321 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1322 cx,
1323 InterfaceType::U32,
1324 dst,
1325 )
1326 }
1327
1328 fn linear_lower_to_memory<T>(
1329 &self,
1330 cx: &mut LowerContext<'_, T>,
1331 ty: InterfaceType,
1332 offset: usize,
1333 ) -> Result<()> {
1334 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1335 cx,
1336 InterfaceType::U32,
1337 offset,
1338 )
1339 }
1340}
1341
1342unsafe impl func::Lift for ErrorContext {
1344 fn linear_lift_from_flat(
1345 cx: &mut LiftContext<'_>,
1346 ty: InterfaceType,
1347 src: &Self::Lower,
1348 ) -> Result<Self> {
1349 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1350 Self::lift_from_index(cx, ty, index)
1351 }
1352
1353 fn linear_lift_from_memory(
1354 cx: &mut LiftContext<'_>,
1355 ty: InterfaceType,
1356 bytes: &[u8],
1357 ) -> Result<Self> {
1358 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1359 Self::lift_from_index(cx, ty, index)
1360 }
1361}
1362
/// One end (read or write) of a stream or future, as stored in the concurrent
/// state table.
pub(super) struct TransmitHandle {
    /// Waitable bookkeeping for this handle.
    pub(super) common: WaitableCommon,
    /// The `TransmitState` this handle belongs to.
    state: TableId<TransmitState>,
}
1369
1370impl TransmitHandle {
1371 fn new(state: TableId<TransmitState>) -> Self {
1372 Self {
1373 common: WaitableCommon::default(),
1374 state,
1375 }
1376 }
1377}
1378
impl TableDebug for TransmitHandle {
    /// Name reported for this type by the `TableDebug` trait.
    fn type_name() -> &'static str {
        "TransmitHandle"
    }
}
1384
/// State shared by the two ends of a stream or future.
struct TransmitState {
    /// Handle of the write end.
    write_handle: TableId<TransmitHandle>,
    /// Handle of the read end.
    read_handle: TableId<TransmitHandle>,
    /// Current state of the write end.
    write: WriteState,
    /// Current state of the read end.
    read: ReadState,
    /// Watcher channel for the writer.
    /// NOTE(review): presumably fired when the write end is dropped — confirm.
    writer_watcher: Option<oneshot::Sender<()>>,
    /// Watcher channel for the reader.
    /// NOTE(review): presumably fired when the read end is dropped — confirm.
    reader_watcher: Option<oneshot::Sender<()>>,
    /// When `true`, lifting a future backed by this state fails with "cannot
    /// lift future after previous read succeeded".
    done: bool,
}
1406
1407impl Default for TransmitState {
1408 fn default() -> Self {
1409 Self {
1410 write_handle: TableId::new(0),
1411 read_handle: TableId::new(0),
1412 read: ReadState::Open,
1413 write: WriteState::Open,
1414 reader_watcher: None,
1415 writer_watcher: None,
1416 done: false,
1417 }
1418 }
1419}
1420
impl TableDebug for TransmitState {
    /// Name reported for this type by the `TableDebug` trait.
    fn type_name() -> &'static str {
        "TransmitState"
    }
}
1426
/// State of the write end of a stream or future.
enum WriteState {
    /// The write end is open with no write pending.
    Open,
    /// A guest write is pending.
    GuestReady {
        /// Table index of the stream or future type.
        ty: TableIndex,
        /// Flat layout of the payload, if any.
        flat_abi: Option<FlatAbi>,
        /// Canonical options of the guest call.
        options: Options,
        /// Address of the guest's buffer in linear memory.
        address: usize,
        /// Number of items in the guest's buffer.
        count: usize,
        /// Guest handle of the write end.
        /// NOTE(review): field meaning inferred from its name — confirm.
        handle: u32,
        /// Action associated with this write.
        post_write: PostWrite,
    },
    /// A host write is pending.
    HostReady {
        /// Callback invoked with the reader that will accept the items.
        accept:
            Box<dyn FnOnce(&mut dyn VMStore, Instance, Reader) -> Result<ReturnCode> + Send + Sync>,
        /// Action associated with this write.
        post_write: PostWrite,
    },
    /// The write end has been dropped.
    Dropped,
}
1450
1451impl fmt::Debug for WriteState {
1452 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1453 match self {
1454 Self::Open => f.debug_tuple("Open").finish(),
1455 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1456 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1457 Self::Dropped => f.debug_tuple("Dropped").finish(),
1458 }
1459 }
1460}
1461
/// State of the read end of a stream or future.
enum ReadState {
    /// The read end is open with no read pending.
    Open,
    /// A guest read is pending.
    GuestReady {
        /// Table index of the stream or future type.
        ty: TableIndex,
        /// Flat layout of the payload, if any.
        flat_abi: Option<FlatAbi>,
        /// Canonical options of the guest call.
        options: Options,
        /// Address of the guest's buffer in linear memory.
        address: usize,
        /// Capacity of the guest's buffer, in items.
        count: usize,
        /// Guest handle of the read end.
        /// NOTE(review): field meaning inferred from its name — confirm.
        handle: u32,
    },
    /// A host read is pending.
    HostReady {
        /// Callback invoked with the writer that will supply the items.
        accept: Box<dyn FnOnce(Writer) -> Result<ReturnCode> + Send + Sync>,
    },
    /// The read end has been dropped.
    Dropped,
}
1482
1483impl fmt::Debug for ReadState {
1484 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1485 match self {
1486 Self::Open => f.debug_tuple("Open").finish(),
1487 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1488 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1489 Self::Dropped => f.debug_tuple("Dropped").finish(),
1490 }
1491 }
1492}
1493
/// Describes the writer handed to a read-side `accept` callback.
enum Writer<'a> {
    /// The writer is a guest; items are lifted from its linear memory.
    Guest {
        /// Context used to lift items out of guest memory.
        lift: &'a mut LiftContext<'a>,
        /// Payload type, or `None` when there is no payload.
        ty: Option<InterfaceType>,
        /// Address of the guest's buffer in linear memory.
        address: usize,
        /// Number of items the guest offers.
        count: usize,
    },
    /// The writer is the host; items are moved out of its buffer.
    Host {
        /// Type-erased view of the host writer's buffer.
        buffer: &'a mut UntypedWriteBuffer<'a>,
        /// Number of items the host offers.
        count: usize,
    },
    /// There is no writer; the read completes as dropped.
    End,
}
1513
/// Describes the reader handed to a write-side `accept` callback.
enum Reader<'a> {
    /// The reader is a guest; items are lowered into its linear memory.
    Guest {
        /// Canonical options of the guest call.
        options: &'a Options,
        /// Table index of the stream or future type.
        ty: TableIndex,
        /// Address of the guest's buffer in linear memory.
        address: usize,
        /// Capacity of the guest's buffer, in items.
        count: usize,
    },
    /// The reader is the host.
    Host {
        /// Callback given the writer's buffer and its remaining item count;
        /// returns how many items it accepted.
        accept: Box<dyn FnOnce(&mut UntypedWriteBuffer, usize) -> usize>,
    },
    /// There is no reader; the write completes as dropped.
    End,
}
1532
1533impl Instance {
1534 pub fn future<T: func::Lower + func::Lift + Send + Sync + 'static>(
1548 self,
1549 default: fn() -> T,
1550 mut store: impl AsContextMut,
1551 ) -> Result<(FutureWriter<T>, FutureReader<T>)> {
1552 let mut store = store.as_context_mut();
1553 let (write, read) = self.concurrent_state_mut(store.0).new_transmit()?;
1554
1555 Ok((
1556 FutureWriter::new(
1557 default,
1558 Some(self.start_write_event_loop(
1559 store.as_context_mut(),
1560 write.rep(),
1561 TransmitKind::Future,
1562 )),
1563 self,
1564 ),
1565 FutureReader::new(
1566 read.rep(),
1567 Some(self.start_read_event_loop(
1568 store.as_context_mut(),
1569 read.rep(),
1570 TransmitKind::Future,
1571 )),
1572 self,
1573 ),
1574 ))
1575 }
1576
1577 pub fn stream<
1580 T: func::Lower + func::Lift + Send + 'static,
1581 W: WriteBuffer<T>,
1582 R: ReadBuffer<T>,
1583 >(
1584 self,
1585 mut store: impl AsContextMut,
1586 ) -> Result<(StreamWriter<W>, StreamReader<R>)> {
1587 let mut store = store.as_context_mut();
1588 let (write, read) = self.concurrent_state_mut(store.0).new_transmit()?;
1589
1590 Ok((
1591 StreamWriter::new(
1592 Some(self.start_write_event_loop(
1593 store.as_context_mut(),
1594 write.rep(),
1595 TransmitKind::Stream,
1596 )),
1597 self,
1598 ),
1599 StreamReader::new(
1600 read.rep(),
1601 Some(self.start_read_event_loop(
1602 store.as_context_mut(),
1603 read.rep(),
1604 TransmitKind::Stream,
1605 )),
1606 self,
1607 ),
1608 ))
1609 }
1610
1611 fn start_write_event_loop<
1621 T: func::Lower + func::Lift + Send + 'static,
1622 B: WriteBuffer<T>,
1623 U,
1624 >(
1625 self,
1626 mut store: StoreContextMut<U>,
1627 rep: u32,
1628 kind: TransmitKind,
1629 ) -> mpsc::Sender<WriteEvent<B>> {
1630 let (tx, mut rx) = mpsc::channel(1);
1631 let id = TableId::<TransmitHandle>::new(rep);
1632 let run_on_drop =
1633 RunOnDrop::new(move || log::trace!("write event loop for {id:?} dropped"));
1634 let token = StoreToken::new(store.as_context_mut());
1635 let task = Box::pin(
1636 async move {
1637 log::trace!("write event loop for {id:?} started");
1638 let mut my_rep = None;
1639 while let Some(event) = rx.next().await {
1640 if my_rep.is_none() {
1641 my_rep = Some(self.get_state_rep(rep)?);
1642 }
1643 let rep = my_rep.unwrap();
1644 match event {
1645 WriteEvent::Write { buffer, tx } => tls::get(|store| {
1646 self.host_write::<_, _, U>(
1647 token.as_context_mut(store),
1648 rep,
1649 buffer,
1650 PostWrite::Continue,
1651 tx,
1652 kind,
1653 )
1654 })?,
1655 WriteEvent::Drop(default) => tls::get(|store| {
1656 if let Some(default) = default {
1657 self.host_write::<_, _, U>(
1658 token.as_context_mut(store),
1659 rep,
1660 default(),
1661 PostWrite::Continue,
1662 oneshot::channel().0,
1663 kind,
1664 )?;
1665 }
1666 self.concurrent_state_mut(store).host_drop_writer(rep, kind)
1667 })?,
1668 WriteEvent::Watch { tx } => tls::get(|store| {
1669 let state =
1670 self.concurrent_state_mut(store)
1671 .get_mut(TableId::<TransmitState>::new(rep))?;
1672 if !matches!(&state.read, ReadState::Dropped) {
1673 state.reader_watcher = Some(tx);
1674 }
1675 Ok::<_, anyhow::Error>(())
1676 })?,
1677 }
1678 }
1679 Ok(())
1680 }
1681 .map(move |v| {
1682 run_on_drop.cancel();
1683 log::trace!("write event loop for {id:?} finished: {v:?}");
1684 HostTaskOutput::Result(v)
1685 }),
1686 );
1687 self.concurrent_state_mut(store.0).push_future(task);
1688 tx
1689 }
1690
1691 fn start_read_event_loop<T: func::Lower + func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
1694 self,
1695 mut store: StoreContextMut<U>,
1696 rep: u32,
1697 kind: TransmitKind,
1698 ) -> mpsc::Sender<ReadEvent<B>> {
1699 let (tx, mut rx) = mpsc::channel(1);
1700 let id = TableId::<TransmitHandle>::new(rep);
1701 let run_on_drop = RunOnDrop::new(move || log::trace!("read event loop for {id:?} dropped"));
1702 let token = StoreToken::new(store.as_context_mut());
1703 let task = Box::pin(
1704 async move {
1705 log::trace!("read event loop for {id:?} started");
1706 let mut my_rep = None;
1707 while let Some(event) = rx.next().await {
1708 if my_rep.is_none() {
1709 my_rep = Some(self.get_state_rep(rep)?);
1710 }
1711 let rep = my_rep.unwrap();
1712 match event {
1713 ReadEvent::Read { buffer, tx } => tls::get(|store| {
1714 self.host_read::<_, _, U>(
1715 token.as_context_mut(store),
1716 rep,
1717 buffer,
1718 tx,
1719 kind,
1720 )
1721 })?,
1722 ReadEvent::Drop => {
1723 tls::get(|store| self.host_drop_reader(store, rep, kind))?
1724 }
1725 ReadEvent::Watch { tx } => tls::get(|store| {
1726 let state =
1727 self.concurrent_state_mut(store)
1728 .get_mut(TableId::<TransmitState>::new(rep))?;
1729 if !matches!(
1730 &state.write,
1731 WriteState::Dropped
1732 | WriteState::GuestReady {
1733 post_write: PostWrite::Drop,
1734 ..
1735 }
1736 | WriteState::HostReady {
1737 post_write: PostWrite::Drop,
1738 ..
1739 }
1740 ) {
1741 state.writer_watcher = Some(tx);
1742 }
1743 Ok::<_, anyhow::Error>(())
1744 })?,
1745 }
1746 }
1747 Ok(())
1748 }
1749 .map(move |v| {
1750 run_on_drop.cancel();
1751 log::trace!("read event loop for {id:?} finished: {v:?}");
1752 HostTaskOutput::Result(v)
1753 }),
1754 );
1755 self.concurrent_state_mut(store.0).push_future(task);
1756 tx
1757 }
1758
/// Write the contents of the host-owned `buffer` to the transmit whose
/// state rep is `transmit_rep`.
///
/// Behaviour depends on the current state of the read end:
/// - `Open`: no reader is waiting, so the write is parked as
///   `WriteState::HostReady` (carrying `post_write`) until a reader arrives.
/// - `GuestReady`: the items are delivered to the waiting guest at once and
///   a read event is queued for it.
/// - `HostReady`: the items are handed to the waiting host reader.
/// - `Dropped`: nothing is written; `tx` receives `dropped: true`.
///
/// If `post_write` is still `PostWrite::Drop` afterwards, the writer is
/// dropped before returning.
///
/// # Errors
///
/// Fails if the transmit state cannot be found or if delivering the items
/// fails.
fn host_write<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U>(
    self,
    mut store: StoreContextMut<U>,
    transmit_rep: u32,
    mut buffer: B,
    mut post_write: PostWrite,
    tx: oneshot::Sender<HostResult<B>>,
    kind: TransmitKind,
) -> Result<()> {
    let mut store = store.as_context_mut();
    let transmit_id = TableId::<TransmitState>::new(transmit_rep);
    let transmit = self
        .concurrent_state_mut(store.0)
        .get_mut(transmit_id)
        .with_context(|| format!("retrieving state for transmit [{transmit_rep}]"))?;
    log::trace!("host_write state {transmit_id:?}; {:?}", transmit.read);

    // The read state is taken out below; this is what is left in its place.
    // A dropped reader stays dropped, anything else resets to `Open`.
    let new_state = if let ReadState::Dropped = &transmit.read {
        ReadState::Dropped
    } else {
        ReadState::Open
    };

    match mem::replace(&mut transmit.read, new_state) {
        ReadState::Open => {
            assert!(matches!(&transmit.write, WriteState::Open));

            let state = WriteState::HostReady {
                accept: Box::new(accept_reader::<T, B, U>(
                    store.as_context_mut(),
                    buffer,
                    tx,
                    kind,
                )),
                post_write,
            };
            self.concurrent_state_mut(store.0)
                .get_mut(transmit_id)?
                .write = state;
            // The requested post-write action now lives in the parked
            // state; clear the local copy so the writer is not dropped
            // at the end of this call.
            post_write = PostWrite::Continue;
        }

        ReadState::GuestReady {
            ty,
            flat_abi: _,
            options,
            address,
            count,
            handle,
            ..
        } => {
            if let TransmitKind::Future = kind {
                transmit.done = true;
            }

            let read_handle = transmit.read_handle;
            // Build the accept callback and invoke it immediately with the
            // waiting guest reader.
            let code = accept_reader::<T, B, U>(store.as_context_mut(), buffer, tx, kind)(
                store.0.traitobj_mut(),
                self,
                Reader::Guest {
                    options: &options,
                    ty,
                    address,
                    count,
                },
            )?;

            // Queue the read event for the guest's read handle.
            self.concurrent_state_mut(store.0).set_event(
                read_handle.rep(),
                match ty {
                    TableIndex::Future(ty) => Event::FutureRead {
                        code,
                        pending: Some((ty, handle)),
                    },
                    TableIndex::Stream(ty) => Event::StreamRead {
                        code,
                        pending: Some((ty, handle)),
                    },
                },
            )?;
        }

        ReadState::HostReady { accept } => {
            let count = buffer.remaining().len();
            let mut untyped = UntypedWriteBuffer::new(&mut buffer);
            let code = accept(Writer::Host {
                buffer: &mut untyped,
                count,
            })?;
            // Any code other than `Completed` or `Dropped` is treated as
            // impossible here.
            let (ReturnCode::Completed(_) | ReturnCode::Dropped(_)) = code else {
                unreachable!()
            };

            // A send failure (receiver gone) is deliberately ignored.
            _ = tx.send(HostResult {
                buffer,
                dropped: false,
            });
        }

        ReadState::Dropped => {
            // Nothing was consumed; the buffer is returned untouched.
            _ = tx.send(HostResult {
                buffer,
                dropped: true,
            });
        }
    }

    if let PostWrite::Drop = post_write {
        self.concurrent_state_mut(store.0)
            .host_drop_writer(transmit_rep, kind)?;
    }

    Ok(())
}
1883
1884 fn host_read<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
1894 self,
1895 mut store: StoreContextMut<U>,
1896 rep: u32,
1897 mut buffer: B,
1898 tx: oneshot::Sender<HostResult<B>>,
1899 kind: TransmitKind,
1900 ) -> Result<()> {
1901 let store = store.as_context_mut();
1902 let transmit_id = TableId::<TransmitState>::new(rep);
1903 let transmit = self
1904 .concurrent_state_mut(store.0)
1905 .get_mut(transmit_id)
1906 .with_context(|| rep.to_string())?;
1907 log::trace!("host_read state {transmit_id:?}; {:?}", transmit.write);
1908
1909 let new_state = if let WriteState::Dropped = &transmit.write {
1910 WriteState::Dropped
1911 } else {
1912 WriteState::Open
1913 };
1914
1915 match mem::replace(&mut transmit.write, new_state) {
1916 WriteState::Open => {
1917 assert!(matches!(&transmit.read, ReadState::Open));
1918
1919 transmit.read = ReadState::HostReady {
1920 accept: Box::new(accept_writer::<T, B, U>(buffer, tx, kind)),
1921 };
1922 }
1923
1924 WriteState::GuestReady {
1925 ty,
1926 flat_abi: _,
1927 options,
1928 address,
1929 count,
1930 handle,
1931 post_write,
1932 ..
1933 } => {
1934 if let TableIndex::Future(_) = ty {
1935 transmit.done = true;
1936 }
1937
1938 let write_handle = transmit.write_handle;
1939 let types = self.id().get(store.0).component().types().clone();
1940 let lift =
1941 &mut LiftContext::new(store.0.store_opaque_mut(), &options, &types, self);
1942 let code = accept_writer::<T, B, U>(buffer, tx, kind)(Writer::Guest {
1943 lift,
1944 ty: payload(ty, &types),
1945 address,
1946 count,
1947 })?;
1948
1949 let state = self.concurrent_state_mut(store.0);
1950 let pending = if let PostWrite::Drop = post_write {
1951 state.get_mut(transmit_id)?.write = WriteState::Dropped;
1952 false
1953 } else {
1954 true
1955 };
1956
1957 state.set_event(
1958 write_handle.rep(),
1959 match ty {
1960 TableIndex::Future(ty) => Event::FutureWrite {
1961 code,
1962 pending: pending.then_some((ty, handle)),
1963 },
1964 TableIndex::Stream(ty) => Event::StreamWrite {
1965 code,
1966 pending: pending.then_some((ty, handle)),
1967 },
1968 },
1969 )?;
1970 }
1971
1972 WriteState::HostReady { accept, post_write } => {
1973 accept(
1974 store.0.traitobj_mut(),
1975 self,
1976 Reader::Host {
1977 accept: Box::new(move |input, count| {
1978 let count = count.min(buffer.remaining_capacity());
1979 buffer.move_from(input.get_mut::<T>(), count);
1980 _ = tx.send(HostResult {
1981 buffer,
1982 dropped: false,
1983 });
1984 count
1985 }),
1986 },
1987 )?;
1988
1989 if let PostWrite::Drop = post_write {
1990 self.concurrent_state_mut(store.0)
1991 .get_mut(transmit_id)?
1992 .write = WriteState::Dropped;
1993 }
1994 }
1995
1996 WriteState::Dropped => {
1997 _ = tx.send(HostResult {
1998 buffer,
1999 dropped: true,
2000 });
2001 }
2002 }
2003
2004 Ok(())
2005 }
2006
/// Drop the read end of the transmit whose state rep is `transmit_rep`.
///
/// The read state becomes `Dropped` and any reader watcher is cleared. What
/// happens next depends on the write end:
/// - `GuestReady`: the pending guest write is cancelled; the guest is
///   notified with `Dropped(0)`, or the transmit is deleted if that write
///   was marked `PostWrite::Drop`.
/// - `HostReady`: the parked host write is invoked with `Reader::End`.
/// - `Open`: a `Dropped(0)` write event with no pending handle is recorded.
/// - `Dropped`: both ends are gone, so the transmit is deleted.
///
/// # Errors
///
/// Fails if the transmit state cannot be found or an event/table update
/// fails.
fn host_drop_reader(
    self,
    store: &mut dyn VMStore,
    transmit_rep: u32,
    kind: TransmitKind,
) -> Result<()> {
    let transmit_id = TableId::<TransmitState>::new(transmit_rep);
    let state = self.concurrent_state_mut(store);
    let transmit = state
        .get_mut(transmit_id)
        .with_context(|| format!("error closing reader {transmit_rep}"))?;
    log::trace!(
        "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
        transmit.read,
        transmit.write
    );

    transmit.read = ReadState::Dropped;
    transmit.reader_watcher = None;

    // The write state is taken out below; a dropped writer stays dropped,
    // anything else resets to `Open`.
    let new_state = if let WriteState::Dropped = &transmit.write {
        WriteState::Dropped
    } else {
        WriteState::Open
    };

    let write_handle = transmit.write_handle;

    match mem::replace(&mut transmit.write, new_state) {
        WriteState::GuestReady {
            ty,
            handle,
            post_write,
            ..
        } => {
            if let PostWrite::Drop = post_write {
                state.delete_transmit(transmit_id)?;
            } else {
                state.update_event(
                    write_handle.rep(),
                    match ty {
                        TableIndex::Future(ty) => Event::FutureWrite {
                            code: ReturnCode::Dropped(0),
                            pending: Some((ty, handle)),
                        },
                        TableIndex::Stream(ty) => Event::StreamWrite {
                            code: ReturnCode::Dropped(0),
                            pending: Some((ty, handle)),
                        },
                    },
                )?;
            };
        }

        WriteState::HostReady { accept, .. } => {
            accept(store, self, Reader::End)?;
        }

        WriteState::Open => {
            state.update_event(
                write_handle.rep(),
                match kind {
                    TransmitKind::Future => Event::FutureWrite {
                        code: ReturnCode::Dropped(0),
                        pending: None,
                    },
                    TransmitKind::Stream => Event::StreamWrite {
                        code: ReturnCode::Dropped(0),
                        pending: None,
                    },
                },
            )?;
        }

        WriteState::Dropped => {
            log::trace!("host_drop_reader delete {transmit_rep}");
            state.delete_transmit(transmit_id)?;
        }
    }
    Ok(())
}
2098
2099 fn copy<T: 'static>(
2102 self,
2103 mut store: StoreContextMut<T>,
2104 flat_abi: Option<FlatAbi>,
2105 write_ty: TableIndex,
2106 write_options: &Options,
2107 write_address: usize,
2108 read_ty: TableIndex,
2109 read_options: &Options,
2110 read_address: usize,
2111 count: usize,
2112 rep: u32,
2113 ) -> Result<()> {
2114 let types = self.id().get(store.0).component().types().clone();
2115 match (write_ty, read_ty) {
2116 (TableIndex::Future(write_ty), TableIndex::Future(read_ty)) => {
2117 assert_eq!(count, 1);
2118
2119 let val = types[types[write_ty].ty]
2120 .payload
2121 .map(|ty| {
2122 let abi = types.canonical_abi(&ty);
2123 if write_address % usize::try_from(abi.align32)? != 0 {
2125 bail!("write pointer not aligned");
2126 }
2127
2128 let lift = &mut LiftContext::new(
2129 store.0.store_opaque_mut(),
2130 write_options,
2131 &types,
2132 self,
2133 );
2134 let bytes = lift
2135 .memory()
2136 .get(write_address..)
2137 .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
2138 .ok_or_else(|| {
2139 anyhow::anyhow!("write pointer out of bounds of memory")
2140 })?;
2141
2142 Val::load(lift, ty, bytes)
2143 })
2144 .transpose()?;
2145
2146 if let Some(val) = val {
2147 let lower =
2148 &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
2149 let ty = types[types[read_ty].ty].payload.unwrap();
2150 let ptr = func::validate_inbounds_dynamic(
2151 types.canonical_abi(&ty),
2152 lower.as_slice_mut(),
2153 &ValRaw::u32(read_address.try_into().unwrap()),
2154 )?;
2155 val.store(lower, ty, ptr)?;
2156 }
2157 }
2158 (TableIndex::Stream(write_ty), TableIndex::Stream(read_ty)) => {
2159 if let Some(flat_abi) = flat_abi {
2160 let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
2162 if length_in_bytes > 0 {
2163 if write_address % usize::try_from(flat_abi.align)? != 0 {
2164 bail!("write pointer not aligned");
2165 }
2166 if read_address % usize::try_from(flat_abi.align)? != 0 {
2167 bail!("read pointer not aligned");
2168 }
2169
2170 let store_opaque = store.0.store_opaque_mut();
2171
2172 {
2173 let src = write_options
2174 .memory(store_opaque)
2175 .get(write_address..)
2176 .and_then(|b| b.get(..length_in_bytes))
2177 .ok_or_else(|| {
2178 anyhow::anyhow!("write pointer out of bounds of memory")
2179 })?
2180 .as_ptr();
2181 let dst = read_options
2182 .memory_mut(store_opaque)
2183 .get_mut(read_address..)
2184 .and_then(|b| b.get_mut(..length_in_bytes))
2185 .ok_or_else(|| {
2186 anyhow::anyhow!("read pointer out of bounds of memory")
2187 })?
2188 .as_mut_ptr();
2189 unsafe { src.copy_to(dst, length_in_bytes) };
2192 }
2193 }
2194 } else {
2195 let store_opaque = store.0.store_opaque_mut();
2196 let lift = &mut LiftContext::new(store_opaque, write_options, &types, self);
2197 let ty = types[types[write_ty].ty].payload.unwrap();
2198 let abi = lift.types.canonical_abi(&ty);
2199 let size = usize::try_from(abi.size32).unwrap();
2200 if write_address % usize::try_from(abi.align32)? != 0 {
2201 bail!("write pointer not aligned");
2202 }
2203 let bytes = lift
2204 .memory()
2205 .get(write_address..)
2206 .and_then(|b| b.get(..size * count))
2207 .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
2208
2209 let values = (0..count)
2210 .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
2211 .collect::<Result<Vec<_>>>()?;
2212
2213 let id = TableId::<TransmitHandle>::new(rep);
2214 log::trace!("copy values {values:?} for {id:?}");
2215
2216 let lower =
2217 &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
2218 let ty = types[types[read_ty].ty].payload.unwrap();
2219 let abi = lower.types.canonical_abi(&ty);
2220 if read_address % usize::try_from(abi.align32)? != 0 {
2221 bail!("read pointer not aligned");
2222 }
2223 let size = usize::try_from(abi.size32).unwrap();
2224 lower
2225 .as_slice_mut()
2226 .get_mut(read_address..)
2227 .and_then(|b| b.get_mut(..size * count))
2228 .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
2229 let mut ptr = read_address;
2230 for value in values {
2231 value.store(lower, ty, ptr)?;
2232 ptr += size
2233 }
2234 }
2235 }
2236 _ => unreachable!(),
2237 }
2238
2239 Ok(())
2240 }
2241
/// Write to the stream or future `handle` of type `ty` on behalf of a
/// guest.
///
/// Returns `ReturnCode::Blocked` when the write was parked to wait for a
/// reader; otherwise returns the code describing how the write ended.
///
/// # Errors
///
/// Fails for synchronous writes (`async_ == false`, not yet supported), for
/// a handle that is not an idle write end, for writes after the reader was
/// reported dropped or after the future completed, and for any failure
/// while copying.
///
/// # Safety
///
/// NOTE(review): `memory` and `realloc` are passed straight to
/// `Options::new` inside an `unsafe` block; presumably they must be null or
/// valid pointers belonging to `store` -- confirm against `Options::new`.
pub(super) unsafe fn guest_write<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    memory: *mut VMMemoryDefinition,
    realloc: *mut VMFuncRef,
    string_encoding: u8,
    async_: bool,
    ty: TableIndex,
    flat_abi: Option<FlatAbi>,
    handle: u32,
    address: u32,
    count: u32,
) -> Result<ReturnCode> {
    if !async_ {
        bail!("synchronous stream and future writes not yet supported");
    }

    let address = usize::try_from(address).unwrap();
    let count = usize::try_from(count).unwrap();
    // SAFETY: NOTE(review): relies on the caller upholding this function's
    // safety contract for `memory` and `realloc` -- confirm.
    let options = unsafe {
        Options::new(
            store.0.store_opaque().id(),
            NonNull::new(memory),
            NonNull::new(realloc),
            StringEncoding::from_u8(string_encoding).unwrap(),
            true,
            None,
        )
    };
    let concurrent_state = self.concurrent_state_mut(store.0);
    let (rep, state) = concurrent_state.get_mut_by_index(ty, handle)?;
    let StreamFutureState::Write { done } = *state else {
        bail!(
            "invalid handle {handle}; expected `Write`; got {:?}",
            *state
        );
    };

    if done {
        bail!("cannot write to stream after being notified that the readable end dropped");
    }

    // Mark the handle busy for the duration of the write; it is restored to
    // `Write` at the end unless the write blocks.
    *state = StreamFutureState::Busy;
    let transmit_handle = TableId::<TransmitHandle>::new(rep);
    let transmit_id = concurrent_state.get(transmit_handle)?.state;
    let transmit = concurrent_state.get_mut(transmit_id)?;
    log::trace!(
        "guest_write {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
        transmit.read
    );

    if transmit.done {
        bail!("cannot write to future after previous write succeeded or readable end dropped");
    }

    // The read state is taken out below; a dropped reader stays dropped,
    // anything else resets to `Open`.
    let new_state = if let ReadState::Dropped = &transmit.read {
        ReadState::Dropped
    } else {
        ReadState::Open
    };

    // Parks this write so that a later read can pick it up.
    let set_guest_ready = |me: &mut ConcurrentState| {
        let transmit = me.get_mut(transmit_id)?;
        assert!(matches!(&transmit.write, WriteState::Open));
        transmit.write = WriteState::GuestReady {
            ty,
            flat_abi,
            options,
            address,
            count,
            handle,
            post_write: PostWrite::Continue,
        };
        Ok::<_, crate::Error>(())
    };

    let result = match mem::replace(&mut transmit.read, new_state) {
        // A guest reader is already waiting: copy directly between guests.
        ReadState::GuestReady {
            ty: read_ty,
            flat_abi: read_flat_abi,
            options: read_options,
            address: read_address,
            count: read_count,
            handle: read_handle,
        } => {
            assert_eq!(flat_abi, read_flat_abi);

            if let TableIndex::Future(_) = ty {
                transmit.done = true;
            }

            // The write completes now if it is zero-length or the reader
            // has room; the read completes only for a non-empty write; the
            // reader stays parked if it has capacity left over.
            let write_complete = count == 0 || read_count > 0;
            let read_complete = count > 0;
            let read_buffer_remaining = count < read_count;

            let read_handle_rep = transmit.read_handle.rep();

            // Number of items actually transferred.
            let count = count.min(read_count);

            self.copy(
                store.as_context_mut(),
                flat_abi,
                ty,
                &options,
                address,
                read_ty,
                &read_options,
                read_address,
                count,
                rep,
            )?;

            let instance = self.id().get_mut(store.0);
            let types = instance.component().types();
            let item_size = payload(ty, types)
                .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
                .unwrap_or(0);
            let concurrent_state = instance.concurrent_state_mut();
            if read_complete {
                let count = u32::try_from(count).unwrap();
                // If a completed stream read event is already queued for
                // the reader, fold this transfer into its total.
                let total = if let Some(Event::StreamRead {
                    code: ReturnCode::Completed(old_total),
                    ..
                }) = concurrent_state.take_event(read_handle_rep)?
                {
                    count + old_total
                } else {
                    count
                };

                let code = ReturnCode::completed(ty.kind(), total);

                concurrent_state.set_event(
                    read_handle_rep,
                    match read_ty {
                        TableIndex::Future(ty) => Event::FutureRead {
                            code,
                            pending: Some((ty, read_handle)),
                        },
                        TableIndex::Stream(ty) => Event::StreamRead {
                            code,
                            pending: Some((ty, read_handle)),
                        },
                    },
                )?;
            }

            if read_buffer_remaining {
                // Re-park the reader, advanced past the items just written.
                let transmit = concurrent_state.get_mut(transmit_id)?;
                transmit.read = ReadState::GuestReady {
                    ty: read_ty,
                    flat_abi: read_flat_abi,
                    options: read_options,
                    address: read_address + (count * item_size),
                    count: read_count - count,
                    handle: read_handle,
                };
            }

            if write_complete {
                ReturnCode::completed(ty.kind(), count.try_into().unwrap())
            } else {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }
        }

        // A host reader is waiting: hand it this guest writer.
        ReadState::HostReady { accept } => {
            if let TableIndex::Future(_) = ty {
                transmit.done = true;
            }

            let types = self.id().get(store.0).component().types().clone();
            let lift =
                &mut LiftContext::new(store.0.store_opaque_mut(), &options, &types, self);
            accept(Writer::Guest {
                lift,
                ty: payload(ty, &types),
                address,
                count,
            })?
        }

        // Nobody is reading yet: park the write.
        ReadState::Open => {
            set_guest_ready(concurrent_state)?;
            ReturnCode::Blocked
        }

        ReadState::Dropped => {
            if let TableIndex::Future(_) = ty {
                transmit.done = true;
            }

            ReturnCode::Dropped(0)
        }
    };

    if result != ReturnCode::Blocked {
        // Return the handle to `Write`; `done` is set only when a stream
        // write reported that the reader was dropped.
        let state = self.concurrent_state_mut(store.0);
        *state.get_mut_by_index(ty, handle)?.1 = StreamFutureState::Write {
            done: matches!(
                (result, ty),
                (ReturnCode::Dropped(_), TableIndex::Stream(_))
            ),
        };
    }

    Ok(result)
}
2478
/// Read from the stream or future `handle` of type `ty` on behalf of a
/// guest.
///
/// Returns `ReturnCode::Blocked` when the read was parked to wait for a
/// writer; otherwise returns the code describing how the read ended.
///
/// # Errors
///
/// Fails for synchronous reads (`async_ == false`, not yet supported), for
/// a handle that is not an idle read end, for reads after the writer was
/// reported dropped or after the future completed, and for any failure
/// while copying.
///
/// # Safety
///
/// NOTE(review): `memory` and `realloc` are passed straight to
/// `Options::new` inside an `unsafe` block; presumably they must be null or
/// valid pointers belonging to `store` -- confirm against `Options::new`.
pub(super) unsafe fn guest_read<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    memory: *mut VMMemoryDefinition,
    realloc: *mut VMFuncRef,
    string_encoding: u8,
    async_: bool,
    ty: TableIndex,
    flat_abi: Option<FlatAbi>,
    handle: u32,
    address: u32,
    count: u32,
) -> Result<ReturnCode> {
    if !async_ {
        bail!("synchronous stream and future reads not yet supported");
    }

    let address = usize::try_from(address).unwrap();
    // SAFETY: NOTE(review): relies on the caller upholding this function's
    // safety contract for `memory` and `realloc` -- confirm.
    let options = unsafe {
        Options::new(
            store.0.store_opaque().id(),
            NonNull::new(memory),
            NonNull::new(realloc),
            StringEncoding::from_u8(string_encoding).unwrap(),
            true,
            None,
        )
    };
    let concurrent_state = self.concurrent_state_mut(store.0);
    let (rep, state) = concurrent_state.get_mut_by_index(ty, handle)?;
    let StreamFutureState::Read { done } = *state else {
        bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
    };

    if done {
        bail!("cannot read from stream after being notified that the writable end dropped");
    }

    // Mark the handle busy for the duration of the read; it is restored to
    // `Read` at the end unless the read blocks.
    *state = StreamFutureState::Busy;
    let transmit_handle = TableId::<TransmitHandle>::new(rep);
    let transmit_id = concurrent_state.get(transmit_handle)?.state;
    let transmit = concurrent_state.get_mut(transmit_id)?;
    log::trace!(
        "guest_read {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
        transmit.write
    );

    if transmit.done {
        bail!("cannot read from future after previous read succeeded");
    }

    // The write state is taken out below; a dropped writer stays dropped,
    // anything else resets to `Open`.
    let new_state = if let WriteState::Dropped = &transmit.write {
        WriteState::Dropped
    } else {
        WriteState::Open
    };

    // Parks this read so that a later write can pick it up.
    let set_guest_ready = |me: &mut ConcurrentState| {
        let transmit = me.get_mut(transmit_id)?;
        assert!(matches!(&transmit.read, ReadState::Open));
        transmit.read = ReadState::GuestReady {
            ty,
            flat_abi,
            options,
            address,
            count: usize::try_from(count).unwrap(),
            handle,
        };
        Ok::<_, crate::Error>(())
    };

    let result = match mem::replace(&mut transmit.write, new_state) {
        // A guest writer is already waiting: copy directly between guests.
        WriteState::GuestReady {
            ty: write_ty,
            flat_abi: write_flat_abi,
            options: write_options,
            address: write_address,
            count: write_count,
            handle: write_handle,
            post_write,
        } => {
            assert_eq!(flat_abi, write_flat_abi);

            if let TableIndex::Future(_) = ty {
                transmit.done = true;
            }

            let write_handle_rep = transmit.write_handle.rep();

            let count = usize::try_from(count).unwrap();

            // The write completes if it is zero-length or the reader has
            // room; the read completes only for a non-empty write; the
            // writer stays parked if it has items left over.
            let write_complete = write_count == 0 || count > 0;
            let read_complete = write_count > 0;
            let write_buffer_remaining = count < write_count;

            // Number of items actually transferred.
            let count = count.min(write_count);

            self.copy(
                store.as_context_mut(),
                flat_abi,
                write_ty,
                &write_options,
                write_address,
                ty,
                &options,
                address,
                count,
                rep,
            )?;

            let instance = self.id().get_mut(store.0);
            let types = instance.component().types();
            let item_size = payload(ty, types)
                .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
                .unwrap_or(0);
            let concurrent_state = instance.concurrent_state_mut();
            // When the write end is going away, the event carries no
            // pending handle.
            let pending = if let PostWrite::Drop = post_write {
                concurrent_state.get_mut(transmit_id)?.write = WriteState::Dropped;
                false
            } else {
                true
            };

            if write_complete {
                let count = u32::try_from(count).unwrap();
                // If a completed stream write event is already queued for
                // the writer, fold this transfer into its total.
                let total = if let Some(Event::StreamWrite {
                    code: ReturnCode::Completed(old_total),
                    ..
                }) = concurrent_state.take_event(write_handle_rep)?
                {
                    count + old_total
                } else {
                    count
                };

                let code = ReturnCode::completed(ty.kind(), total);

                concurrent_state.set_event(
                    write_handle_rep,
                    match write_ty {
                        TableIndex::Future(ty) => Event::FutureWrite {
                            code,
                            pending: pending.then_some((ty, write_handle)),
                        },
                        TableIndex::Stream(ty) => Event::StreamWrite {
                            code,
                            pending: pending.then_some((ty, write_handle)),
                        },
                    },
                )?;
            }

            if write_buffer_remaining {
                // Re-park the writer, advanced past the items just read.
                let transmit = concurrent_state.get_mut(transmit_id)?;
                transmit.write = WriteState::GuestReady {
                    ty: write_ty,
                    flat_abi: write_flat_abi,
                    options: write_options,
                    address: write_address + (count * item_size),
                    count: write_count - count,
                    handle: write_handle,
                    post_write,
                };
            }

            if read_complete {
                ReturnCode::completed(ty.kind(), count.try_into().unwrap())
            } else {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }
        }

        // A host write is parked: invoke it with this guest reader.
        WriteState::HostReady { accept, post_write } => {
            if let TableIndex::Future(_) = ty {
                transmit.done = true;
            }

            let code = accept(
                store.0.traitobj_mut(),
                self,
                Reader::Guest {
                    options: &options,
                    ty,
                    address,
                    count: count.try_into().unwrap(),
                },
            )?;

            if let PostWrite::Drop = post_write {
                self.concurrent_state_mut(store.0)
                    .get_mut(transmit_id)?
                    .write = WriteState::Dropped;
            }

            code
        }

        // Nobody is writing yet: park the read.
        WriteState::Open => {
            set_guest_ready(concurrent_state)?;
            ReturnCode::Blocked
        }

        WriteState::Dropped => ReturnCode::Dropped(0),
    };

    if result != ReturnCode::Blocked {
        // Return the handle to `Read`; `done` is set only when a stream
        // read reported that the writer was dropped.
        let state = self.concurrent_state_mut(store.0);
        *state.get_mut_by_index(ty, handle)?.1 = StreamFutureState::Read {
            done: matches!(
                (result, ty),
                (ReturnCode::Dropped(_), TableIndex::Stream(_))
            ),
        };
    }

    Ok(result)
}
2707
2708 fn guest_drop_readable(
2710 self,
2711 store: &mut dyn VMStore,
2712 ty: TableIndex,
2713 reader: u32,
2714 ) -> Result<()> {
2715 let concurrent_state = self.concurrent_state_mut(store);
2716 let (rep, state) = concurrent_state.state_table(ty).remove_by_index(reader)?;
2717 let (state, kind) = match state {
2718 WaitableState::Stream(_, state) => (state, TransmitKind::Stream),
2719 WaitableState::Future(_, state) => (state, TransmitKind::Future),
2720 _ => {
2721 bail!("invalid stream or future handle");
2722 }
2723 };
2724 match state {
2725 StreamFutureState::Read { .. } => {}
2726 StreamFutureState::Write { .. } => {
2727 bail!("passed write end to `{{stream|future}}.drop-readable`")
2728 }
2729 StreamFutureState::Busy => bail!("cannot drop busy stream or future"),
2730 }
2731 let id = TableId::<TransmitHandle>::new(rep);
2732 let rep = concurrent_state.get(id)?.state.rep();
2733 log::trace!("guest_drop_readable: drop reader {id:?}");
2734 self.host_drop_reader(store, rep, kind)
2735 }
2736
/// Create a new error context whose debug message is read from guest
/// memory, returning the new handle in the local table for `ty`.
///
/// The message is stored in the global state table, its global reference
/// count is set to 1, and it is inserted into the calling component's local
/// error-context table with a local reference count of 1.
///
/// # Errors
///
/// Fails if `string_encoding` is not a valid encoding, if the message
/// pointer/length is out of bounds or the string cannot be decoded, or if a
/// table insertion fails.
///
/// # Panics
///
/// Panics if the freshly created state is somehow already present in the
/// local table.
///
/// # Safety
///
/// NOTE(review): `memory` and `realloc` are passed straight to
/// `Options::new` inside an `unsafe` block; presumably they must be null or
/// valid pointers belonging to `store` -- confirm against `Options::new`.
pub(crate) unsafe fn error_context_new(
    self,
    store: &mut StoreOpaque,
    memory: *mut VMMemoryDefinition,
    realloc: *mut VMFuncRef,
    string_encoding: u8,
    ty: TypeComponentLocalErrorContextTableIndex,
    debug_msg_address: u32,
    debug_msg_len: u32,
) -> Result<u32> {
    // SAFETY: NOTE(review): relies on the caller upholding this function's
    // safety contract for `memory` and `realloc` -- confirm.
    let options = unsafe {
        Options::new(
            store.id(),
            NonNull::new(memory),
            NonNull::new(realloc),
            StringEncoding::from_u8(string_encoding).ok_or_else(|| {
                anyhow::anyhow!("failed to convert u8 string encoding [{string_encoding}]")
            })?,
            false,
            None,
        )
    };
    let types = self.id().get(store).component().types().clone();
    let lift_ctx = &mut LiftContext::new(store, &options, &types, self);
    // Validate the (address, len) pair against guest memory before building
    // the string view.
    let address = usize::try_from(debug_msg_address)?;
    let len = usize::try_from(debug_msg_len)?;
    lift_ctx
        .memory()
        .get(address..)
        .and_then(|b| b.get(..len))
        .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
    let message = WasmStr::new(address, len, lift_ctx)?;

    // Copy the message out of guest memory into an owned string.
    let err_ctx = ErrorContextState {
        debug_msg: message
            .to_str_from_memory(options.memory(store))?
            .to_string(),
    };
    let state = self.concurrent_state_mut(store);
    let table_id = state.push(err_ctx)?;
    let global_ref_count_idx =
        TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());

    // Any previous entry for this index is discarded.
    let _ = state
        .global_error_context_ref_counts
        .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));

    let local_tbl = &mut state.error_context_tables[ty];

    assert!(
        !local_tbl.has_handle(table_id.rep()),
        "newly created error context state already tracked by component"
    );
    let local_idx = local_tbl.insert(table_id.rep(), LocalErrorContextRefCount(1))?;

    Ok(local_idx)
}
2809
/// Lower the debug message of the error context `err_ctx_handle` into guest
/// memory at `debug_msg_address`.
///
/// # Errors
///
/// Fails if the local table for `ty` or the handle cannot be found, if
/// `string_encoding` is not a valid encoding, if the destination address is
/// out of bounds, or if lowering the string fails.
///
/// # Safety
///
/// NOTE(review): `memory` and `realloc` are passed straight to
/// `Options::new` inside an `unsafe` block; presumably they must be null or
/// valid pointers belonging to `store` -- confirm against `Options::new`.
pub(super) unsafe fn error_context_debug_message<T>(
    self,
    store: StoreContextMut<T>,
    memory: *mut VMMemoryDefinition,
    realloc: *mut VMFuncRef,
    string_encoding: u8,
    ty: TypeComponentLocalErrorContextTableIndex,
    err_ctx_handle: u32,
    debug_msg_address: u32,
) -> Result<()> {
    let id = store.0.store_opaque().id();
    // Resolve the guest handle to the rep of the global error-context state.
    let state = self.concurrent_state_mut(store.0);
    let (state_table_id_rep, _) = state
        .error_context_tables
        .get_mut(ty)
        .context("error context table index present in (sub)component lookup during debug_msg")?
        .get_mut_by_index(err_ctx_handle)?;

    // Clone the message so the borrow of the state ends before lowering.
    let ErrorContextState { debug_msg } =
        state.get_mut(TableId::<ErrorContextState>::new(state_table_id_rep))?;
    let debug_msg = debug_msg.clone();

    // SAFETY: NOTE(review): relies on the caller upholding this function's
    // safety contract for `memory` and `realloc` -- confirm.
    let options = unsafe {
        Options::new(
            id,
            NonNull::new(memory),
            NonNull::new(realloc),
            StringEncoding::from_u8(string_encoding).ok_or_else(|| {
                anyhow::anyhow!("failed to convert u8 string encoding [{string_encoding}]")
            })?,
            false,
            None,
        )
    };
    let types = self.id().get(store.0).component().types().clone();
    let lower_cx = &mut LowerContext::new(store, &options, &types, self);
    let debug_msg_address = usize::try_from(debug_msg_address)?;
    // Bounds pre-check on the destination, sized by the message's UTF-8
    // byte length.
    // NOTE(review): it is not visible here whether that length matches what
    // `linear_lower_to_memory` actually writes at `offset` -- confirm.
    let offset = lower_cx
        .as_slice_mut()
        .get(debug_msg_address..)
        .and_then(|b| b.get(..debug_msg.bytes().len()))
        .map(|_| debug_msg_address)
        .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
    debug_msg
        .as_str()
        .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;

    Ok(())
}
2868
2869 pub(crate) fn future_drop_readable(
2871 self,
2872 store: &mut dyn VMStore,
2873 ty: TypeFutureTableIndex,
2874 reader: u32,
2875 ) -> Result<()> {
2876 self.guest_drop_readable(store, TableIndex::Future(ty), reader)
2877 }
2878
2879 pub(crate) fn stream_drop_readable(
2881 self,
2882 store: &mut dyn VMStore,
2883 ty: TypeStreamTableIndex,
2884 reader: u32,
2885 ) -> Result<()> {
2886 self.guest_drop_readable(store, TableIndex::Stream(ty), reader)
2887 }
2888
2889 fn get_state_rep(&self, rep: u32) -> Result<u32> {
2891 tls::get(|store| {
2892 let transmit_handle = TableId::<TransmitHandle>::new(rep);
2893 Ok(self
2894 .concurrent_state_mut(store)
2895 .get(transmit_handle)
2896 .with_context(|| format!("stream or future {transmit_handle:?} not found"))?
2897 .state
2898 .rep())
2899 })
2900 }
2901}
2902
/// Guard that invokes a closure when it goes out of scope, unless it has
/// been disarmed with [`RunOnDrop::cancel`].
struct RunOnDrop<F: FnOnce()>(Option<F>);

impl<F: FnOnce()> RunOnDrop<F> {
    /// Arms a new guard that will call `fun` on drop.
    fn new(fun: F) -> Self {
        RunOnDrop(Some(fun))
    }

    /// Disarms the guard: the closure is discarded without being called.
    fn cancel(mut self) {
        drop(self.0.take());
    }
}

impl<F: FnOnce()> Drop for RunOnDrop<F> {
    fn drop(&mut self) {
        // The slot is empty only after `cancel`; otherwise run the closure
        // exactly once.
        let Some(fun) = self.0.take() else {
            return;
        };
        fun();
    }
}
2923
2924impl ConcurrentState {
2925 fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
2926 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
2927 }
2928
2929 fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
2930 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
2931 }
2932
2933 fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
2944 let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
2945
2946 fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
2947 let (ReturnCode::Completed(count)
2948 | ReturnCode::Dropped(count)
2949 | ReturnCode::Cancelled(count)) = old
2950 else {
2951 unreachable!()
2952 };
2953
2954 match new {
2955 ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
2956 ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
2957 _ => unreachable!(),
2958 }
2959 }
2960
2961 let event = match (waitable.take_event(self)?, event) {
2962 (None, _) => event,
2963 (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
2964 (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
2965 (
2966 Some(Event::StreamWrite {
2967 code: old_code,
2968 pending: old_pending,
2969 }),
2970 Event::StreamWrite { code, pending },
2971 ) => Event::StreamWrite {
2972 code: update_code(old_code, code),
2973 pending: old_pending.or(pending),
2974 },
2975 (
2976 Some(Event::StreamRead {
2977 code: old_code,
2978 pending: old_pending,
2979 }),
2980 Event::StreamRead { code, pending },
2981 ) => Event::StreamRead {
2982 code: update_code(old_code, code),
2983 pending: old_pending.or(pending),
2984 },
2985 _ => unreachable!(),
2986 };
2987
2988 waitable.set_event(self, Some(event))
2989 }
2990
2991 fn get_mut_by_index(
2992 &mut self,
2993 ty: TableIndex,
2994 index: u32,
2995 ) -> Result<(u32, &mut StreamFutureState)> {
2996 get_mut_by_index_from(self.state_table(ty), ty, index)
2997 }
2998
2999 fn new_transmit(&mut self) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
3002 let state_id = self.push(TransmitState::default())?;
3003
3004 let write = self.push(TransmitHandle::new(state_id))?;
3005 let read = self.push(TransmitHandle::new(state_id))?;
3006
3007 let state = self.get_mut(state_id)?;
3008 state.write_handle = write;
3009 state.read_handle = read;
3010
3011 log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
3012
3013 Ok((write, read))
3014 }
3015
3016 fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
3018 let state = self.delete(state_id)?;
3019 self.delete(state.write_handle)?;
3020 self.delete(state.read_handle)?;
3021
3022 log::trace!(
3023 "delete transmit: state {state_id:?}; write {:?}; read {:?}",
3024 state.write_handle,
3025 state.read_handle,
3026 );
3027
3028 Ok(())
3029 }
3030
3031 fn state_table(&mut self, ty: TableIndex) -> &mut StateTable<WaitableState> {
3032 let runtime_instance = match ty {
3033 TableIndex::Stream(ty) => self.component.types()[ty].instance,
3034 TableIndex::Future(ty) => self.component.types()[ty].instance,
3035 };
3036 &mut self.waitable_tables[runtime_instance]
3037 }
3038
3039 fn guest_new(&mut self, ty: TableIndex) -> Result<ResourcePair> {
3043 let (write, read) = self.new_transmit()?;
3044 let read = self.state_table(ty).insert(
3045 read.rep(),
3046 waitable_state(ty, StreamFutureState::Read { done: false }),
3047 )?;
3048 let write = self.state_table(ty).insert(
3049 write.rep(),
3050 waitable_state(ty, StreamFutureState::Write { done: false }),
3051 )?;
3052 Ok(ResourcePair { write, read })
3053 }
3054
3055 fn host_cancel_write(&mut self, rep: u32) -> Result<ReturnCode> {
3061 let transmit_id = TableId::<TransmitState>::new(rep);
3062 let transmit = self.get_mut(transmit_id)?;
3063 log::trace!(
3064 "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3065 transmit.read,
3066 transmit.write
3067 );
3068
3069 let code = if let Some(event) =
3070 Waitable::Transmit(transmit.write_handle).take_event(self)?
3071 {
3072 let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3073 unreachable!();
3074 };
3075 match (code, event) {
3076 (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3077 ReturnCode::Cancelled(count)
3078 }
3079 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3080 _ => unreachable!(),
3081 }
3082 } else {
3083 ReturnCode::Cancelled(0)
3084 };
3085
3086 let transmit = self.get_mut(transmit_id)?;
3087
3088 match &transmit.write {
3089 WriteState::GuestReady { .. } | WriteState::HostReady { .. } => {
3090 transmit.write = WriteState::Open;
3091 }
3092
3093 WriteState::Open | WriteState::Dropped => {}
3094 }
3095
3096 log::trace!("cancelled write {transmit_id:?}");
3097
3098 Ok(code)
3099 }
3100
3101 fn host_cancel_read(&mut self, rep: u32) -> Result<ReturnCode> {
3107 let transmit_id = TableId::<TransmitState>::new(rep);
3108 let transmit = self.get_mut(transmit_id)?;
3109 log::trace!(
3110 "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3111 transmit.read,
3112 transmit.write
3113 );
3114
3115 let code = if let Some(event) = Waitable::Transmit(transmit.read_handle).take_event(self)? {
3116 let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3117 unreachable!();
3118 };
3119 match (code, event) {
3120 (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3121 ReturnCode::Cancelled(count)
3122 }
3123 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3124 _ => unreachable!(),
3125 }
3126 } else {
3127 ReturnCode::Cancelled(0)
3128 };
3129
3130 let transmit = self.get_mut(transmit_id)?;
3131
3132 match &transmit.read {
3133 ReadState::GuestReady { .. } | ReadState::HostReady { .. } => {
3134 transmit.read = ReadState::Open;
3135 }
3136
3137 ReadState::Open | ReadState::Dropped => {}
3138 }
3139
3140 log::trace!("cancelled read {transmit_id:?}");
3141
3142 Ok(code)
3143 }
3144
3145 fn host_drop_writer(&mut self, transmit_rep: u32, kind: TransmitKind) -> Result<()> {
3151 let transmit_id = TableId::<TransmitState>::new(transmit_rep);
3152 let transmit = self
3153 .get_mut(transmit_id)
3154 .with_context(|| format!("error closing writer {transmit_rep}"))?;
3155 log::trace!(
3156 "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
3157 transmit.read,
3158 transmit.write
3159 );
3160
3161 transmit.writer_watcher = None;
3162
3163 match &mut transmit.write {
3165 WriteState::GuestReady { post_write, .. } => {
3166 *post_write = PostWrite::Drop;
3167 }
3168 WriteState::HostReady { post_write, .. } => {
3169 *post_write = PostWrite::Drop;
3170 }
3171 v @ WriteState::Open => {
3172 if let (TransmitKind::Future, false) = (
3173 kind,
3174 transmit.done || matches!(transmit.read, ReadState::Dropped),
3175 ) {
3176 bail!("cannot drop future write end without first writing a value")
3177 }
3178
3179 *v = WriteState::Dropped;
3180 }
3181 WriteState::Dropped => unreachable!("write state is already dropped"),
3182 }
3183
3184 let new_state = if let ReadState::Dropped = &transmit.read {
3190 ReadState::Dropped
3191 } else {
3192 ReadState::Open
3193 };
3194
3195 let read_handle = transmit.read_handle;
3196
3197 match mem::replace(&mut transmit.read, new_state) {
3199 ReadState::GuestReady { ty, handle, .. } => {
3203 self.update_event(
3205 read_handle.rep(),
3206 match ty {
3207 TableIndex::Future(ty) => Event::FutureRead {
3208 code: ReturnCode::Dropped(0),
3209 pending: Some((ty, handle)),
3210 },
3211 TableIndex::Stream(ty) => Event::StreamRead {
3212 code: ReturnCode::Dropped(0),
3213 pending: Some((ty, handle)),
3214 },
3215 },
3216 )?;
3217 }
3218
3219 ReadState::HostReady { accept } => {
3222 accept(Writer::End)?;
3223 }
3224
3225 ReadState::Open => {
3227 self.update_event(
3228 read_handle.rep(),
3229 match kind {
3230 TransmitKind::Future => Event::FutureRead {
3231 code: ReturnCode::Dropped(0),
3232 pending: None,
3233 },
3234 TransmitKind::Stream => Event::StreamRead {
3235 code: ReturnCode::Dropped(0),
3236 pending: None,
3237 },
3238 },
3239 )?;
3240 }
3241
3242 ReadState::Dropped => {
3245 log::trace!("host_drop_writer delete {transmit_rep}");
3246 self.delete_transmit(transmit_id)?;
3247 }
3248 }
3249 Ok(())
3250 }
3251
3252 fn guest_cancel_write(
3254 &mut self,
3255 ty: TableIndex,
3256 writer: u32,
3257 _async_: bool,
3258 ) -> Result<ReturnCode> {
3259 let (rep, WaitableState::Stream(_, state) | WaitableState::Future(_, state)) =
3260 self.state_table(ty).get_mut_by_index(writer)?
3261 else {
3262 bail!("invalid stream or future handle");
3263 };
3264 let id = TableId::<TransmitHandle>::new(rep);
3265 log::trace!("guest cancel write {id:?} (handle {writer})");
3266 match state {
3267 StreamFutureState::Write { .. } => {
3268 bail!("stream or future write cancelled when no write is pending")
3269 }
3270 StreamFutureState::Read { .. } => {
3271 bail!("passed read end to `{{stream|future}}.cancel-write`")
3272 }
3273 StreamFutureState::Busy => {
3274 *state = StreamFutureState::Write { done: false };
3275 }
3276 }
3277 let rep = self.get(id)?.state.rep();
3278 self.host_cancel_write(rep)
3279 }
3280
3281 fn guest_cancel_read(
3283 &mut self,
3284 ty: TableIndex,
3285 reader: u32,
3286 _async_: bool,
3287 ) -> Result<ReturnCode> {
3288 let (rep, WaitableState::Stream(_, state) | WaitableState::Future(_, state)) =
3289 self.state_table(ty).get_mut_by_index(reader)?
3290 else {
3291 bail!("invalid stream or future handle");
3292 };
3293 let id = TableId::<TransmitHandle>::new(rep);
3294 log::trace!("guest cancel read {id:?} (handle {reader})");
3295 match state {
3296 StreamFutureState::Read { .. } => {
3297 bail!("stream or future read cancelled when no read is pending")
3298 }
3299 StreamFutureState::Write { .. } => {
3300 bail!("passed write end to `{{stream|future}}.cancel-read`")
3301 }
3302 StreamFutureState::Busy => {
3303 *state = StreamFutureState::Read { done: false };
3304 }
3305 }
3306 let rep = self.get(id)?.state.rep();
3307 self.host_cancel_read(rep)
3308 }
3309
3310 fn guest_drop_writable(&mut self, ty: TableIndex, writer: u32) -> Result<()> {
3312 let (transmit_rep, state) = self
3313 .state_table(ty)
3314 .remove_by_index(writer)
3315 .context("failed to find writer")?;
3316 let (state, kind) = match state {
3317 WaitableState::Stream(_, state) => (state, TransmitKind::Stream),
3318 WaitableState::Future(_, state) => (state, TransmitKind::Future),
3319 _ => {
3320 bail!("invalid stream or future handle");
3321 }
3322 };
3323 match state {
3324 StreamFutureState::Write { .. } => {}
3325 StreamFutureState::Read { .. } => {
3326 bail!("passed read end to `{{stream|future}}.drop-writable`")
3327 }
3328 StreamFutureState::Busy => bail!("cannot drop busy stream or future"),
3329 }
3330
3331 let id = TableId::<TransmitHandle>::new(transmit_rep);
3332 let transmit_rep = self.get(id)?.state.rep();
3333 log::trace!("guest_drop_writable: drop writer {id:?}");
3334 self.host_drop_writer(transmit_rep, kind)
3335 }
3336
3337 pub(crate) fn error_context_drop(
3339 &mut self,
3340 ty: TypeComponentLocalErrorContextTableIndex,
3341 error_context: u32,
3342 ) -> Result<()> {
3343 let local_state_table = self
3344 .error_context_tables
3345 .get_mut(ty)
3346 .context("error context table index present in (sub)component table during drop")?;
3347
3348 let (rep, local_ref_removed) = {
3350 let (rep, LocalErrorContextRefCount(local_ref_count)) =
3351 local_state_table.get_mut_by_index(error_context)?;
3352 assert!(*local_ref_count > 0);
3353 *local_ref_count -= 1;
3354 let mut local_ref_removed = false;
3355 if *local_ref_count == 0 {
3356 local_ref_removed = true;
3357 local_state_table
3358 .remove_by_index(error_context)
3359 .context("removing error context from component-local tracking")?;
3360 }
3361 (rep, local_ref_removed)
3362 };
3363
3364 let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
3365
3366 let GlobalErrorContextRefCount(global_ref_count) = self
3367 .global_error_context_ref_counts
3368 .get_mut(&global_ref_count_idx)
3369 .expect("retrieve concurrent state for error context during drop");
3370
3371 assert!(*global_ref_count >= 1);
3373 *global_ref_count -= 1;
3374 if *global_ref_count == 0 {
3375 assert!(local_ref_removed);
3376
3377 self.global_error_context_ref_counts
3378 .remove(&global_ref_count_idx);
3379
3380 self.delete(TableId::<ErrorContextState>::new(rep))
3381 .context("deleting component-global error context data")?;
3382 }
3383
3384 Ok(())
3385 }
3386
3387 fn guest_transfer<U: PartialEq + Eq + std::fmt::Debug>(
3390 &mut self,
3391 src_idx: u32,
3392 src: U,
3393 src_instance: RuntimeComponentInstanceIndex,
3394 dst: U,
3395 dst_instance: RuntimeComponentInstanceIndex,
3396 match_state: impl Fn(&mut WaitableState) -> Result<(U, &mut StreamFutureState)>,
3397 make_state: impl Fn(U, StreamFutureState) -> WaitableState,
3398 ) -> Result<u32> {
3399 let src_table = &mut self.waitable_tables[src_instance];
3400 let (_rep, src_state) = src_table.get_mut_by_index(src_idx)?;
3401 let (src_ty, _) = match_state(src_state)?;
3402 if src_ty != src {
3403 bail!("invalid future handle");
3404 }
3405
3406 let src_table = &mut self.waitable_tables[src_instance];
3407 let (rep, src_state) = src_table.get_mut_by_index(src_idx)?;
3408 let (_, src_state) = match_state(src_state)?;
3409
3410 match src_state {
3411 StreamFutureState::Read { done: true } => {
3412 bail!("cannot lift stream after being notified that the writable end dropped")
3413 }
3414 StreamFutureState::Read { done: false } => {
3415 src_table.remove_by_index(src_idx)?;
3416
3417 let dst_table = &mut self.waitable_tables[dst_instance];
3418 dst_table.insert(
3419 rep,
3420 make_state(dst, StreamFutureState::Read { done: false }),
3421 )
3422 }
3423 StreamFutureState::Write { .. } => {
3424 bail!("cannot transfer write end of stream or future")
3425 }
3426 StreamFutureState::Busy => bail!("cannot transfer busy stream or future"),
3427 }
3428 }
3429
3430 pub(crate) fn future_new(&mut self, ty: TypeFutureTableIndex) -> Result<ResourcePair> {
3432 self.guest_new(TableIndex::Future(ty))
3433 }
3434
3435 pub(crate) fn future_cancel_write(
3437 &mut self,
3438 ty: TypeFutureTableIndex,
3439 async_: bool,
3440 writer: u32,
3441 ) -> Result<u32> {
3442 self.guest_cancel_write(TableIndex::Future(ty), writer, async_)
3443 .map(|result| result.encode())
3444 }
3445
3446 pub(crate) fn future_cancel_read(
3448 &mut self,
3449 ty: TypeFutureTableIndex,
3450 async_: bool,
3451 reader: u32,
3452 ) -> Result<u32> {
3453 self.guest_cancel_read(TableIndex::Future(ty), reader, async_)
3454 .map(|result| result.encode())
3455 }
3456
3457 pub(crate) fn future_drop_writable(
3459 &mut self,
3460 ty: TypeFutureTableIndex,
3461 writer: u32,
3462 ) -> Result<()> {
3463 self.guest_drop_writable(TableIndex::Future(ty), writer)
3464 }
3465
3466 pub(crate) fn stream_new(&mut self, ty: TypeStreamTableIndex) -> Result<ResourcePair> {
3468 self.guest_new(TableIndex::Stream(ty))
3469 }
3470
3471 pub(crate) fn stream_cancel_write(
3473 &mut self,
3474 ty: TypeStreamTableIndex,
3475 async_: bool,
3476 writer: u32,
3477 ) -> Result<u32> {
3478 self.guest_cancel_write(TableIndex::Stream(ty), writer, async_)
3479 .map(|result| result.encode())
3480 }
3481
3482 pub(crate) fn stream_cancel_read(
3484 &mut self,
3485 ty: TypeStreamTableIndex,
3486 async_: bool,
3487 reader: u32,
3488 ) -> Result<u32> {
3489 self.guest_cancel_read(TableIndex::Stream(ty), reader, async_)
3490 .map(|result| result.encode())
3491 }
3492
3493 pub(crate) fn stream_drop_writable(
3495 &mut self,
3496 ty: TypeStreamTableIndex,
3497 writer: u32,
3498 ) -> Result<()> {
3499 self.guest_drop_writable(TableIndex::Stream(ty), writer)
3500 }
3501
3502 pub(crate) fn future_transfer(
3505 &mut self,
3506 src_idx: u32,
3507 src: TypeFutureTableIndex,
3508 dst: TypeFutureTableIndex,
3509 ) -> Result<u32> {
3510 self.guest_transfer(
3511 src_idx,
3512 src,
3513 self.component.types()[src].instance,
3514 dst,
3515 self.component.types()[dst].instance,
3516 |state| {
3517 if let WaitableState::Future(ty, state) = state {
3518 Ok((*ty, state))
3519 } else {
3520 Err(anyhow!("invalid future handle"))
3521 }
3522 },
3523 WaitableState::Future,
3524 )
3525 }
3526
3527 pub(crate) fn stream_transfer(
3530 &mut self,
3531 src_idx: u32,
3532 src: TypeStreamTableIndex,
3533 dst: TypeStreamTableIndex,
3534 ) -> Result<u32> {
3535 self.guest_transfer(
3536 src_idx,
3537 src,
3538 self.component.types()[src].instance,
3539 dst,
3540 self.component.types()[dst].instance,
3541 |state| {
3542 if let WaitableState::Stream(ty, state) = state {
3543 Ok((*ty, state))
3544 } else {
3545 Err(anyhow!("invalid stream handle"))
3546 }
3547 },
3548 WaitableState::Stream,
3549 )
3550 }
3551
3552 pub(crate) fn error_context_transfer(
3554 &mut self,
3555 src_idx: u32,
3556 src: TypeComponentLocalErrorContextTableIndex,
3557 dst: TypeComponentLocalErrorContextTableIndex,
3558 ) -> Result<u32> {
3559 let (rep, _) = {
3560 let rep = self
3561 .error_context_tables
3562 .get_mut(src)
3563 .context("error context table index present in (sub)component lookup")?
3564 .get_mut_by_index(src_idx)?;
3565 rep
3566 };
3567 let dst = self
3568 .error_context_tables
3569 .get_mut(dst)
3570 .context("error context table index present in (sub)component lookup")?;
3571
3572 let updated_count = if let Some((dst_idx, count)) = dst.get_mut_by_rep(rep) {
3574 (*count).0 += 1;
3575 dst_idx
3576 } else {
3577 dst.insert(rep, LocalErrorContextRefCount(1))?
3578 };
3579
3580 let global_ref_count = self
3584 .global_error_context_ref_counts
3585 .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
3586 .context("global ref count present for existing (sub)component error context")?;
3587 global_ref_count.0 += 1;
3588
3589 Ok(updated_count)
3590 }
3591}
3592
/// Table indices of the two ends of a newly created stream or future, as
/// returned by `guest_new`.
pub(crate) struct ResourcePair {
    /// Index of the writable end in the owning waitable table.
    pub(crate) write: u32,
    /// Index of the readable end in the owning waitable table.
    pub(crate) read: u32,
}