1use super::table::{TableDebug, TableId};
2use super::{
3 Event, GlobalErrorContextRefCount, LocalErrorContextRefCount, StateTable, Waitable,
4 WaitableCommon, WaitableState,
5};
6use crate::component::concurrent::{ConcurrentState, WorkItem};
7use crate::component::func::{self, LiftContext, LowerContext, Options};
8use crate::component::matching::InstanceType;
9use crate::component::values::{ErrorContextAny, FutureAny, StreamAny};
10use crate::component::{AsAccessor, Instance, Lower, Val, WasmList, WasmStr};
11use crate::store::{StoreOpaque, StoreToken};
12use crate::vm::VMStore;
13use crate::{AsContextMut, StoreContextMut, ValRaw};
14use anyhow::{Context, Result, anyhow, bail};
15use buffers::Extender;
16use buffers::UntypedWriteBuffer;
17use futures::channel::oneshot;
18use std::boxed::Box;
19use std::fmt;
20use std::future;
21use std::iter;
22use std::marker::PhantomData;
23use std::mem::{self, MaybeUninit};
24use std::string::{String, ToString};
25use std::sync::{Arc, Mutex};
26use std::task::{Poll, Waker};
27use std::vec::Vec;
28use wasmtime_environ::component::{
29 CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
30 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
31 TypeFutureTableIndex, TypeStreamTableIndex,
32};
33
34pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
35
36mod buffers;
37
38#[derive(Copy, Clone, Debug)]
41enum TransmitKind {
42 Stream,
43 Future,
44}
45
46#[derive(Copy, Clone, Debug, PartialEq)]
48pub enum ReturnCode {
49 Blocked,
50 Completed(u32),
51 Dropped(u32),
52 Cancelled(u32),
53}
54
55impl ReturnCode {
56 pub fn encode(&self) -> u32 {
61 const BLOCKED: u32 = 0xffff_ffff;
62 const COMPLETED: u32 = 0x0;
63 const DROPPED: u32 = 0x1;
64 const CANCELLED: u32 = 0x2;
65 match self {
66 ReturnCode::Blocked => BLOCKED,
67 ReturnCode::Completed(n) => {
68 debug_assert!(*n < (1 << 28));
69 (n << 4) | COMPLETED
70 }
71 ReturnCode::Dropped(n) => {
72 debug_assert!(*n < (1 << 28));
73 (n << 4) | DROPPED
74 }
75 ReturnCode::Cancelled(n) => {
76 debug_assert!(*n < (1 << 28));
77 (n << 4) | CANCELLED
78 }
79 }
80 }
81
82 fn completed(kind: TransmitKind, count: u32) -> Self {
85 Self::Completed(if let TransmitKind::Future = kind {
86 0
87 } else {
88 count
89 })
90 }
91}
92
93#[derive(Copy, Clone, Debug)]
98pub(super) enum TableIndex {
99 Stream(TypeStreamTableIndex),
100 Future(TypeFutureTableIndex),
101}
102
103impl TableIndex {
104 fn kind(&self) -> TransmitKind {
105 match self {
106 TableIndex::Stream(_) => TransmitKind::Stream,
107 TableIndex::Future(_) => TransmitKind::Future,
108 }
109 }
110}
111
112enum PostWrite {
114 Continue,
116 Drop,
118}
119
120struct HostResult<B> {
122 buffer: B,
124 dropped: bool,
126}
127
128fn payload(ty: TableIndex, types: &Arc<ComponentTypes>) -> Option<InterfaceType> {
131 match ty {
132 TableIndex::Future(ty) => types[types[ty].ty].payload,
133 TableIndex::Stream(ty) => types[types[ty].ty].payload,
134 }
135}
136
137fn get_mut_by_index_from(
140 state_table: &mut StateTable<WaitableState>,
141 ty: TableIndex,
142 index: u32,
143) -> Result<(u32, &mut StreamFutureState)> {
144 Ok(match ty {
145 TableIndex::Stream(ty) => {
146 let (rep, WaitableState::Stream(actual_ty, state)) =
147 state_table.get_mut_by_index(index)?
148 else {
149 bail!("invalid stream handle");
150 };
151 if *actual_ty != ty {
152 bail!("invalid stream handle");
153 }
154 (rep, state)
155 }
156 TableIndex::Future(ty) => {
157 let (rep, WaitableState::Future(actual_ty, state)) =
158 state_table.get_mut_by_index(index)?
159 else {
160 bail!("invalid future handle");
161 };
162 if *actual_ty != ty {
163 bail!("invalid future handle");
164 }
165 (rep, state)
166 }
167 })
168}
169
170fn waitable_state(ty: TableIndex, state: StreamFutureState) -> WaitableState {
172 match ty {
173 TableIndex::Stream(ty) => WaitableState::Stream(ty, state),
174 TableIndex::Future(ty) => WaitableState::Future(ty, state),
175 }
176}
177
178fn accept_reader<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
181 mut store: StoreContextMut<U>,
182 instance: Instance,
183 reader: Reader,
184 mut buffer: B,
185 kind: TransmitKind,
186) -> Result<(HostResult<B>, ReturnCode)> {
187 Ok(match reader {
188 Reader::Guest {
189 options,
190 ty,
191 address,
192 count,
193 } => {
194 let types = instance.id().get(store.0).component().types().clone();
195 let count = buffer.remaining().len().min(count);
196
197 let lower = &mut if T::MAY_REQUIRE_REALLOC {
198 LowerContext::new
199 } else {
200 LowerContext::new_without_realloc
201 }(store.as_context_mut(), options, &types, instance);
202
203 if address % usize::try_from(T::ALIGN32)? != 0 {
204 bail!("read pointer not aligned");
205 }
206 lower
207 .as_slice_mut()
208 .get_mut(address..)
209 .and_then(|b| b.get_mut(..T::SIZE32 * count))
210 .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
211
212 if let Some(ty) = payload(ty, &types) {
213 T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
214 }
215
216 buffer.skip(count);
217 (
218 HostResult {
219 buffer,
220 dropped: false,
221 },
222 ReturnCode::completed(kind, count.try_into().unwrap()),
223 )
224 }
225 Reader::Host { accept } => {
226 let count = buffer.remaining().len();
227 let mut untyped = UntypedWriteBuffer::new(&mut buffer);
228 let count = accept(&mut untyped, count);
229 (
230 HostResult {
231 buffer,
232 dropped: false,
233 },
234 ReturnCode::completed(kind, count.try_into().unwrap()),
235 )
236 }
237 Reader::End => (
238 HostResult {
239 buffer,
240 dropped: true,
241 },
242 ReturnCode::Dropped(0),
243 ),
244 })
245}
246
247fn accept_writer<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
250 writer: Writer,
251 mut buffer: B,
252 kind: TransmitKind,
253) -> Result<(HostResult<B>, ReturnCode)> {
254 Ok(match writer {
255 Writer::Guest {
256 lift,
257 ty,
258 address,
259 count,
260 } => {
261 let count = count.min(buffer.remaining_capacity());
262 if T::IS_RUST_UNIT_TYPE {
263 buffer.extend(
267 iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() })
268 .take(count),
269 )
270 } else {
271 let ty = ty.unwrap();
272 if address % usize::try_from(T::ALIGN32)? != 0 {
273 bail!("write pointer not aligned");
274 }
275 lift.memory()
276 .get(address..)
277 .and_then(|b| b.get(..T::SIZE32 * count))
278 .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
279
280 let list = &WasmList::new(address, count, lift, ty)?;
281 T::linear_lift_into_from_memory(lift, list, &mut Extender(&mut buffer))?
282 }
283 (
284 HostResult {
285 buffer,
286 dropped: false,
287 },
288 ReturnCode::completed(kind, count.try_into().unwrap()),
289 )
290 }
291 Writer::Host {
292 buffer: input,
293 count,
294 } => {
295 let count = count.min(buffer.remaining_capacity());
296 buffer.move_from(input.get_mut::<T>(), count);
297 (
298 HostResult {
299 buffer,
300 dropped: false,
301 },
302 ReturnCode::completed(kind, count.try_into().unwrap()),
303 )
304 }
305 Writer::End => (
306 HostResult {
307 buffer,
308 dropped: true,
309 },
310 ReturnCode::Dropped(0),
311 ),
312 })
313}
314
315async fn watch_reader(accessor: impl AsAccessor, instance: Instance, id: TableId<TransmitHandle>) {
318 future::poll_fn(|cx| {
319 accessor
320 .as_accessor()
321 .with(|mut access| {
322 let concurrent_state = instance.concurrent_state_mut(access.as_context_mut().0);
323 let state_id = concurrent_state.get(id)?.state;
324 let state = concurrent_state.get_mut(state_id)?;
325 anyhow::Ok(if matches!(&state.read, ReadState::Dropped) {
326 Poll::Ready(())
327 } else {
328 state.reader_watcher = Some(cx.waker().clone());
329 Poll::Pending
330 })
331 })
332 .unwrap_or(Poll::Ready(()))
333 })
334 .await
335}
336
337async fn watch_writer(accessor: impl AsAccessor, instance: Instance, id: TableId<TransmitHandle>) {
340 future::poll_fn(|cx| {
341 accessor
342 .as_accessor()
343 .with(|mut access| {
344 let concurrent_state = instance.concurrent_state_mut(access.as_context_mut().0);
345 let state_id = concurrent_state.get(id)?.state;
346 let state = concurrent_state.get_mut(state_id)?;
347 anyhow::Ok(
348 if matches!(
349 &state.write,
350 WriteState::Dropped
351 | WriteState::GuestReady {
352 post_write: PostWrite::Drop,
353 ..
354 }
355 | WriteState::HostReady {
356 post_write: PostWrite::Drop,
357 ..
358 }
359 ) {
360 Poll::Ready(())
361 } else {
362 state.writer_watcher = Some(cx.waker().clone());
363 Poll::Pending
364 },
365 )
366 })
367 .unwrap_or(Poll::Ready(()))
368 })
369 .await
370}
371
372#[derive(Debug, Eq, PartialEq)]
375pub(super) enum StreamFutureState {
376 Write {
378 done: bool,
382 },
383 Read {
385 done: bool,
389 },
390 Busy,
392}
393
394#[derive(Debug, PartialEq, Eq, PartialOrd)]
396pub(super) struct ErrorContextState {
397 pub(crate) debug_msg: String,
399}
400
401#[derive(Debug, Clone, Copy, PartialEq, Eq)]
404pub(super) struct FlatAbi {
405 pub(super) size: u32,
406 pub(super) align: u32,
407}
408
409pub struct FutureWriter<T> {
416 default: fn() -> T,
417 id: TableId<TransmitHandle>,
418 instance: Instance,
419}
420
421impl<T> FutureWriter<T> {
422 fn new(default: fn() -> T, id: TableId<TransmitHandle>, instance: Instance) -> Self {
423 Self {
424 default,
425 id,
426 instance,
427 }
428 }
429
430 pub async fn write(self, accessor: impl AsAccessor, value: T) -> bool
441 where
442 T: func::Lower + Send + Sync + 'static,
443 {
444 self.guard(accessor).write(value).await
445 }
446
447 async fn write_(&mut self, accessor: impl AsAccessor, value: T) -> bool
450 where
451 T: func::Lower + Send + Sync + 'static,
452 {
453 let accessor = accessor.as_accessor();
454
455 let result = self
456 .instance
457 .host_write_async(accessor, self.id, Some(value), TransmitKind::Future)
458 .await;
459
460 match result {
461 Ok(HostResult { dropped, .. }) => !dropped,
462 Err(_) => todo!("guarantee buffer recovery if `host_write` fails"),
463 }
464 }
465
466 pub async fn watch_reader(&mut self, accessor: impl AsAccessor) {
476 watch_reader(accessor, self.instance, self.id).await
477 }
478
479 pub fn close(&mut self, mut store: impl AsContextMut)
487 where
488 T: func::Lower + Send + Sync + 'static,
489 {
490 let id = mem::replace(&mut self.id, TableId::new(0));
491 let default = self.default;
492 self.instance
493 .host_drop_writer(store.as_context_mut(), id, Some(&move || Ok(default())))
494 .unwrap();
495 }
496
497 pub fn close_with(&mut self, accessor: impl AsAccessor)
499 where
500 T: func::Lower + Send + Sync + 'static,
501 {
502 accessor.as_accessor().with(|access| self.close(access))
503 }
504
505 pub fn guard<A>(self, accessor: A) -> GuardedFutureWriter<T, A>
511 where
512 T: func::Lower + Send + Sync + 'static,
513 A: AsAccessor,
514 {
515 GuardedFutureWriter::new(accessor, self)
516 }
517}
518
519pub struct GuardedFutureWriter<T, A>
525where
526 T: func::Lower + Send + Sync + 'static,
527 A: AsAccessor,
528{
529 writer: Option<FutureWriter<T>>,
533 accessor: A,
534}
535
536impl<T, A> GuardedFutureWriter<T, A>
537where
538 T: func::Lower + Send + Sync + 'static,
539 A: AsAccessor,
540{
541 pub fn new(accessor: A, writer: FutureWriter<T>) -> Self {
544 Self {
545 writer: Some(writer),
546 accessor,
547 }
548 }
549
550 pub async fn write(mut self, value: T) -> bool
552 where
553 T: func::Lower + Send + Sync + 'static,
554 {
555 self.writer
556 .as_mut()
557 .unwrap()
558 .write_(&self.accessor, value)
559 .await
560 }
561
562 pub async fn watch_reader(&mut self) {
564 self.writer
565 .as_mut()
566 .unwrap()
567 .watch_reader(&self.accessor)
568 .await
569 }
570
571 pub fn into_future(self) -> FutureWriter<T> {
574 self.into()
575 }
576}
577
578impl<T, A> From<GuardedFutureWriter<T, A>> for FutureWriter<T>
579where
580 T: func::Lower + Send + Sync + 'static,
581 A: AsAccessor,
582{
583 fn from(mut guard: GuardedFutureWriter<T, A>) -> Self {
584 guard.writer.take().unwrap()
585 }
586}
587
588impl<T, A> Drop for GuardedFutureWriter<T, A>
589where
590 T: func::Lower + Send + Sync + 'static,
591 A: AsAccessor,
592{
593 fn drop(&mut self) {
594 if let Some(writer) = &mut self.writer {
595 writer.close_with(&self.accessor)
596 }
597 }
598}
599
600pub struct FutureReader<T> {
607 instance: Instance,
608 id: TableId<TransmitHandle>,
609 _phantom: PhantomData<T>,
610}
611
612impl<T> FutureReader<T> {
613 fn new(id: TableId<TransmitHandle>, instance: Instance) -> Self {
614 Self {
615 instance,
616 id,
617 _phantom: PhantomData,
618 }
619 }
620
621 pub async fn read(self, accessor: impl AsAccessor) -> Option<T>
634 where
635 T: func::Lift + Send + 'static,
636 {
637 self.guard(accessor).read().await
638 }
639
640 async fn read_(&mut self, accessor: impl AsAccessor) -> Option<T>
641 where
642 T: func::Lift + Send + 'static,
643 {
644 let accessor = accessor.as_accessor();
645
646 let result = self
647 .instance
648 .host_read_async(accessor, self.id, None, TransmitKind::Future)
649 .await;
650
651 if let Ok(HostResult {
652 mut buffer,
653 dropped: false,
654 }) = result
655 {
656 buffer.take()
657 } else {
658 None
659 }
660 }
661
662 pub async fn watch_writer(&mut self, accessor: impl AsAccessor) {
672 watch_writer(accessor, self.instance, self.id).await;
673 }
674
675 pub fn into_val(self) -> Val {
678 Val::Future(FutureAny(self.id.rep()))
679 }
680
681 pub fn from_val(
683 mut store: impl AsContextMut<Data: Send>,
684 instance: Instance,
685 value: &Val,
686 ) -> Result<Self> {
687 let Val::Future(FutureAny(rep)) = value else {
688 bail!("expected `future`; got `{}`", value.desc());
689 };
690 let store = store.as_context_mut();
691 let id = TableId::<TransmitHandle>::new(*rep);
692 instance.concurrent_state_mut(store.0).get(id)?; Ok(Self::new(id, instance))
694 }
695
696 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
698 match ty {
699 InterfaceType::Future(src) => {
700 let state_table = cx
701 .instance_mut()
702 .concurrent_state_mut()
703 .state_table(TableIndex::Future(src));
704 let (rep, state) =
705 get_mut_by_index_from(state_table, TableIndex::Future(src), index)?;
706
707 match state {
708 StreamFutureState::Read { .. } => {
709 state_table.remove_by_index(index)?;
710 }
711 StreamFutureState::Write { .. } => bail!("cannot transfer write end of future"),
712 StreamFutureState::Busy => bail!("cannot transfer busy future"),
713 }
714
715 let id = TableId::<TransmitHandle>::new(rep);
716 let concurrent_state = cx.instance_mut().concurrent_state_mut();
717 let state = concurrent_state.get(id)?.state;
718
719 if concurrent_state.get(state)?.done {
720 bail!("cannot lift future after previous read succeeded");
721 }
722
723 Ok(Self::new(id, cx.instance_handle()))
724 }
725 _ => func::bad_type_info(),
726 }
727 }
728
729 pub fn close(&mut self, mut store: impl AsContextMut) {
737 let id = mem::replace(&mut self.id, TableId::new(0));
739 self.instance
740 .host_drop_reader(
741 store.as_context_mut().0.traitobj_mut(),
742 id,
743 TransmitKind::Future,
744 )
745 .unwrap();
746 }
747
748 pub fn close_with(&mut self, accessor: impl AsAccessor) {
750 accessor.as_accessor().with(|access| self.close(access))
751 }
752
753 pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
759 where
760 A: AsAccessor,
761 {
762 GuardedFutureReader::new(accessor, self)
763 }
764}
765
766impl<T> fmt::Debug for FutureReader<T> {
767 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
768 f.debug_struct("FutureReader")
769 .field("id", &self.id)
770 .field("instance", &self.instance)
771 .finish()
772 }
773}
774
775pub(crate) fn lower_future_to_index<U>(
777 rep: u32,
778 cx: &mut LowerContext<'_, U>,
779 ty: InterfaceType,
780) -> Result<u32> {
781 match ty {
782 InterfaceType::Future(dst) => {
783 let concurrent_state = cx.instance_mut().concurrent_state_mut();
784 let state = concurrent_state
785 .get(TableId::<TransmitHandle>::new(rep))?
786 .state;
787 let rep = concurrent_state.get(state)?.read_handle.rep();
788
789 concurrent_state
790 .state_table(TableIndex::Future(dst))
791 .insert(
792 rep,
793 WaitableState::Future(dst, StreamFutureState::Read { done: false }),
794 )
795 }
796 _ => func::bad_type_info(),
797 }
798}
799
800unsafe impl<T: Send + Sync> func::ComponentType for FutureReader<T> {
803 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
804
805 type Lower = <u32 as func::ComponentType>::Lower;
806
807 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
808 match ty {
809 InterfaceType::Future(_) => Ok(()),
810 other => bail!("expected `future`, found `{}`", func::desc(other)),
811 }
812 }
813}
814
815unsafe impl<T: Send + Sync> func::Lower for FutureReader<T> {
817 fn linear_lower_to_flat<U>(
818 &self,
819 cx: &mut LowerContext<'_, U>,
820 ty: InterfaceType,
821 dst: &mut MaybeUninit<Self::Lower>,
822 ) -> Result<()> {
823 lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
824 cx,
825 InterfaceType::U32,
826 dst,
827 )
828 }
829
830 fn linear_lower_to_memory<U>(
831 &self,
832 cx: &mut LowerContext<'_, U>,
833 ty: InterfaceType,
834 offset: usize,
835 ) -> Result<()> {
836 lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
837 cx,
838 InterfaceType::U32,
839 offset,
840 )
841 }
842}
843
844unsafe impl<T: Send + Sync> func::Lift for FutureReader<T> {
846 fn linear_lift_from_flat(
847 cx: &mut LiftContext<'_>,
848 ty: InterfaceType,
849 src: &Self::Lower,
850 ) -> Result<Self> {
851 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
852 Self::lift_from_index(cx, ty, index)
853 }
854
855 fn linear_lift_from_memory(
856 cx: &mut LiftContext<'_>,
857 ty: InterfaceType,
858 bytes: &[u8],
859 ) -> Result<Self> {
860 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
861 Self::lift_from_index(cx, ty, index)
862 }
863}
864
865pub struct GuardedFutureReader<T, A>
871where
872 A: AsAccessor,
873{
874 reader: Option<FutureReader<T>>,
878 accessor: A,
879}
880
881impl<T, A> GuardedFutureReader<T, A>
882where
883 A: AsAccessor,
884{
885 pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
887 Self {
888 reader: Some(reader),
889 accessor,
890 }
891 }
892
893 pub async fn read(mut self) -> Option<T>
895 where
896 T: func::Lift + Send + 'static,
897 {
898 self.reader.as_mut().unwrap().read_(&self.accessor).await
899 }
900
901 pub async fn watch_writer(&mut self) {
903 self.reader
904 .as_mut()
905 .unwrap()
906 .watch_writer(&self.accessor)
907 .await
908 }
909
910 pub fn into_future(self) -> FutureReader<T> {
913 self.into()
914 }
915}
916
917impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
918where
919 A: AsAccessor,
920{
921 fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
922 guard.reader.take().unwrap()
923 }
924}
925
926impl<T, A> Drop for GuardedFutureReader<T, A>
927where
928 A: AsAccessor,
929{
930 fn drop(&mut self) {
931 if let Some(reader) = &mut self.reader {
932 reader.close_with(&self.accessor)
933 }
934 }
935}
936
937pub struct StreamWriter<T> {
944 instance: Instance,
945 id: TableId<TransmitHandle>,
946 closed: bool,
947 _phantom: PhantomData<T>,
948}
949
950impl<T> StreamWriter<T> {
951 fn new(id: TableId<TransmitHandle>, instance: Instance) -> Self {
952 Self {
953 instance,
954 id,
955 closed: false,
956 _phantom: PhantomData,
957 }
958 }
959
960 pub fn is_closed(&self) -> bool {
963 self.closed
964 }
965
966 pub async fn write<B>(&mut self, accessor: impl AsAccessor, buffer: B) -> B
984 where
985 T: func::Lower + 'static,
986 B: WriteBuffer<T>,
987 {
988 let result = self
989 .instance
990 .host_write_async(
991 accessor.as_accessor(),
992 self.id,
993 buffer,
994 TransmitKind::Stream,
995 )
996 .await;
997
998 match result {
999 Ok(HostResult { buffer, dropped }) => {
1000 if self.closed {
1001 debug_assert!(dropped);
1002 }
1003 self.closed = dropped;
1004 buffer
1005 }
1006 Err(_) => todo!("guarantee buffer recovery if `host_write` fails"),
1007 }
1008 }
1009
1010 pub async fn write_all<B>(&mut self, accessor: impl AsAccessor, mut buffer: B) -> B
1023 where
1024 T: func::Lower + 'static,
1025 B: WriteBuffer<T>,
1026 {
1027 let accessor = accessor.as_accessor();
1028 while !self.is_closed() && buffer.remaining().len() > 0 {
1029 buffer = self.write(accessor, buffer).await;
1030 }
1031 buffer
1032 }
1033
1034 pub async fn watch_reader(&mut self, accessor: impl AsAccessor) {
1041 watch_reader(accessor, self.instance, self.id).await
1042 }
1043
1044 pub fn close(&mut self, mut store: impl AsContextMut) {
1052 let id = mem::replace(&mut self.id, TableId::new(0));
1054 self.instance
1055 .host_drop_writer(store.as_context_mut(), id, None::<&dyn Fn() -> Result<()>>)
1056 .unwrap()
1057 }
1058
1059 pub fn close_with(&mut self, accessor: impl AsAccessor) {
1061 accessor.as_accessor().with(|access| self.close(access))
1062 }
1063
1064 pub fn guard<A>(self, accessor: A) -> GuardedStreamWriter<T, A>
1070 where
1071 A: AsAccessor,
1072 {
1073 GuardedStreamWriter::new(accessor, self)
1074 }
1075}
1076
1077pub struct GuardedStreamWriter<T, A>
1083where
1084 A: AsAccessor,
1085{
1086 writer: Option<StreamWriter<T>>,
1090 accessor: A,
1091}
1092
1093impl<T, A> GuardedStreamWriter<T, A>
1094where
1095 A: AsAccessor,
1096{
1097 pub fn new(accessor: A, writer: StreamWriter<T>) -> Self {
1099 Self {
1100 writer: Some(writer),
1101 accessor,
1102 }
1103 }
1104
1105 pub fn is_closed(&self) -> bool {
1107 self.writer.as_ref().unwrap().is_closed()
1108 }
1109
1110 pub async fn write<B>(&mut self, buffer: B) -> B
1112 where
1113 T: func::Lower + 'static,
1114 B: WriteBuffer<T>,
1115 {
1116 self.writer
1117 .as_mut()
1118 .unwrap()
1119 .write(&self.accessor, buffer)
1120 .await
1121 }
1122
1123 pub async fn write_all<B>(&mut self, buffer: B) -> B
1125 where
1126 T: func::Lower + 'static,
1127 B: WriteBuffer<T>,
1128 {
1129 self.writer
1130 .as_mut()
1131 .unwrap()
1132 .write_all(&self.accessor, buffer)
1133 .await
1134 }
1135
1136 pub async fn watch_reader(&mut self) {
1138 self.writer
1139 .as_mut()
1140 .unwrap()
1141 .watch_reader(&self.accessor)
1142 .await
1143 }
1144
1145 pub fn into_stream(self) -> StreamWriter<T> {
1148 self.into()
1149 }
1150}
1151
1152impl<T, A> From<GuardedStreamWriter<T, A>> for StreamWriter<T>
1153where
1154 A: AsAccessor,
1155{
1156 fn from(mut guard: GuardedStreamWriter<T, A>) -> Self {
1157 guard.writer.take().unwrap()
1158 }
1159}
1160
1161impl<T, A> Drop for GuardedStreamWriter<T, A>
1162where
1163 A: AsAccessor,
1164{
1165 fn drop(&mut self) {
1166 if let Some(writer) = &mut self.writer {
1167 writer.close_with(&self.accessor)
1168 }
1169 }
1170}
1171
1172pub struct StreamReader<T> {
1179 instance: Instance,
1180 id: TableId<TransmitHandle>,
1181 closed: bool,
1182 _phantom: PhantomData<T>,
1183}
1184
1185impl<T> StreamReader<T> {
1186 fn new(id: TableId<TransmitHandle>, instance: Instance) -> Self {
1187 Self {
1188 instance,
1189 id,
1190 closed: false,
1191 _phantom: PhantomData,
1192 }
1193 }
1194
1195 pub fn is_closed(&self) -> bool {
1198 self.closed
1199 }
1200
1201 pub async fn read<B>(&mut self, accessor: impl AsAccessor, buffer: B) -> B
1214 where
1215 T: func::Lift + 'static,
1216 B: ReadBuffer<T> + Send + 'static,
1217 {
1218 let result = self
1219 .instance
1220 .host_read_async(
1221 accessor.as_accessor(),
1222 self.id,
1223 buffer,
1224 TransmitKind::Stream,
1225 )
1226 .await;
1227
1228 match result {
1229 Ok(HostResult { buffer, dropped }) => {
1230 if self.closed {
1231 debug_assert!(dropped);
1232 }
1233 self.closed = dropped;
1234 buffer
1235 }
1236 Err(_) => {
1237 todo!("guarantee buffer recovery if `host_read` fails")
1238 }
1239 }
1240 }
1241
1242 pub async fn watch_writer(&mut self, accessor: impl AsAccessor) {
1249 watch_writer(accessor, self.instance, self.id).await
1250 }
1251
1252 pub fn into_val(self) -> Val {
1255 Val::Stream(StreamAny(self.id.rep()))
1256 }
1257
1258 pub fn from_val(
1260 mut store: impl AsContextMut<Data: Send>,
1261 instance: Instance,
1262 value: &Val,
1263 ) -> Result<Self> {
1264 let Val::Stream(StreamAny(rep)) = value else {
1265 bail!("expected `stream`; got `{}`", value.desc());
1266 };
1267 let store = store.as_context_mut();
1268 let id = TableId::<TransmitHandle>::new(*rep);
1269 instance.concurrent_state_mut(store.0).get(id)?; Ok(Self::new(id, instance))
1271 }
1272
1273 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1275 match ty {
1276 InterfaceType::Stream(src) => {
1277 let state_table = cx
1278 .instance_mut()
1279 .concurrent_state_mut()
1280 .state_table(TableIndex::Stream(src));
1281 let (rep, state) =
1282 get_mut_by_index_from(state_table, TableIndex::Stream(src), index)?;
1283
1284 match state {
1285 StreamFutureState::Read { done: true } => bail!(
1286 "cannot lift stream after being notified that the writable end dropped"
1287 ),
1288 StreamFutureState::Read { done: false } => {
1289 state_table.remove_by_index(index)?;
1290 }
1291 StreamFutureState::Write { .. } => bail!("cannot transfer write end of stream"),
1292 StreamFutureState::Busy => bail!("cannot transfer busy stream"),
1293 }
1294
1295 let id = TableId::<TransmitHandle>::new(rep);
1296
1297 Ok(Self::new(id, cx.instance_handle()))
1298 }
1299 _ => func::bad_type_info(),
1300 }
1301 }
1302
1303 pub fn close(&mut self, mut store: impl AsContextMut) {
1311 let id = mem::replace(&mut self.id, TableId::new(0));
1313 self.instance
1314 .host_drop_reader(
1315 store.as_context_mut().0.traitobj_mut(),
1316 id,
1317 TransmitKind::Stream,
1318 )
1319 .unwrap()
1320 }
1321
1322 pub fn close_with(&mut self, accessor: impl AsAccessor) {
1324 accessor.as_accessor().with(|access| self.close(access))
1325 }
1326
1327 pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1333 where
1334 A: AsAccessor,
1335 {
1336 GuardedStreamReader::new(accessor, self)
1337 }
1338}
1339
1340impl<T> fmt::Debug for StreamReader<T> {
1341 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1342 f.debug_struct("StreamReader")
1343 .field("id", &self.id)
1344 .field("instance", &self.instance)
1345 .finish()
1346 }
1347}
1348
1349pub(crate) fn lower_stream_to_index<U>(
1351 rep: u32,
1352 cx: &mut LowerContext<'_, U>,
1353 ty: InterfaceType,
1354) -> Result<u32> {
1355 match ty {
1356 InterfaceType::Stream(dst) => {
1357 let concurrent_state = cx.instance_mut().concurrent_state_mut();
1358 let state = concurrent_state
1359 .get(TableId::<TransmitHandle>::new(rep))?
1360 .state;
1361 let rep = concurrent_state.get(state)?.read_handle.rep();
1362
1363 concurrent_state
1364 .state_table(TableIndex::Stream(dst))
1365 .insert(
1366 rep,
1367 WaitableState::Stream(dst, StreamFutureState::Read { done: false }),
1368 )
1369 }
1370 _ => func::bad_type_info(),
1371 }
1372}
1373
1374unsafe impl<T: Send + Sync> func::ComponentType for StreamReader<T> {
1377 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1378
1379 type Lower = <u32 as func::ComponentType>::Lower;
1380
1381 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1382 match ty {
1383 InterfaceType::Stream(_) => Ok(()),
1384 other => bail!("expected `stream`, found `{}`", func::desc(other)),
1385 }
1386 }
1387}
1388
1389unsafe impl<T: Send + Sync> func::Lower for StreamReader<T> {
1391 fn linear_lower_to_flat<U>(
1392 &self,
1393 cx: &mut LowerContext<'_, U>,
1394 ty: InterfaceType,
1395 dst: &mut MaybeUninit<Self::Lower>,
1396 ) -> Result<()> {
1397 lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1398 cx,
1399 InterfaceType::U32,
1400 dst,
1401 )
1402 }
1403
1404 fn linear_lower_to_memory<U>(
1405 &self,
1406 cx: &mut LowerContext<'_, U>,
1407 ty: InterfaceType,
1408 offset: usize,
1409 ) -> Result<()> {
1410 lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1411 cx,
1412 InterfaceType::U32,
1413 offset,
1414 )
1415 }
1416}
1417
1418unsafe impl<T: Send + Sync> func::Lift for StreamReader<T> {
1420 fn linear_lift_from_flat(
1421 cx: &mut LiftContext<'_>,
1422 ty: InterfaceType,
1423 src: &Self::Lower,
1424 ) -> Result<Self> {
1425 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1426 Self::lift_from_index(cx, ty, index)
1427 }
1428
1429 fn linear_lift_from_memory(
1430 cx: &mut LiftContext<'_>,
1431 ty: InterfaceType,
1432 bytes: &[u8],
1433 ) -> Result<Self> {
1434 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1435 Self::lift_from_index(cx, ty, index)
1436 }
1437}
1438
1439pub struct GuardedStreamReader<T, A>
1445where
1446 A: AsAccessor,
1447{
1448 reader: Option<StreamReader<T>>,
1452 accessor: A,
1453}
1454
1455impl<T, A> GuardedStreamReader<T, A>
1456where
1457 A: AsAccessor,
1458{
1459 pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1462 Self {
1463 reader: Some(reader),
1464 accessor,
1465 }
1466 }
1467
1468 pub fn is_closed(&self) -> bool {
1470 self.reader.as_ref().unwrap().is_closed()
1471 }
1472
1473 pub async fn read<B>(&mut self, buffer: B) -> B
1475 where
1476 T: func::Lift + 'static,
1477 B: ReadBuffer<T> + Send + 'static,
1478 {
1479 self.reader
1480 .as_mut()
1481 .unwrap()
1482 .read(&self.accessor, buffer)
1483 .await
1484 }
1485
1486 pub async fn watch_writer(&mut self) {
1488 self.reader
1489 .as_mut()
1490 .unwrap()
1491 .watch_writer(&self.accessor)
1492 .await
1493 }
1494
1495 pub fn into_stream(self) -> StreamReader<T> {
1498 self.into()
1499 }
1500}
1501
1502impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1503where
1504 A: AsAccessor,
1505{
1506 fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1507 guard.reader.take().unwrap()
1508 }
1509}
1510
1511impl<T, A> Drop for GuardedStreamReader<T, A>
1512where
1513 A: AsAccessor,
1514{
1515 fn drop(&mut self) {
1516 if let Some(reader) = &mut self.reader {
1517 reader.close_with(&self.accessor)
1518 }
1519 }
1520}
1521
1522pub struct ErrorContext {
1524 rep: u32,
1525}
1526
1527impl ErrorContext {
1528 pub(crate) fn new(rep: u32) -> Self {
1529 Self { rep }
1530 }
1531
1532 pub fn into_val(self) -> Val {
1534 Val::ErrorContext(ErrorContextAny(self.rep))
1535 }
1536
1537 pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1539 let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1540 bail!("expected `error-context`; got `{}`", value.desc());
1541 };
1542 Ok(Self::new(*rep))
1543 }
1544
1545 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1546 match ty {
1547 InterfaceType::ErrorContext(src) => {
1548 let (rep, _) = cx
1549 .instance_mut()
1550 .concurrent_state_mut()
1551 .error_context_tables
1552 .get_mut(src)
1553 .expect("error context table index present in (sub)component table during lift")
1554 .get_mut_by_index(index)?;
1555
1556 Ok(Self { rep })
1557 }
1558 _ => func::bad_type_info(),
1559 }
1560 }
1561}
1562
1563pub(crate) fn lower_error_context_to_index<U>(
1564 rep: u32,
1565 cx: &mut LowerContext<'_, U>,
1566 ty: InterfaceType,
1567) -> Result<u32> {
1568 match ty {
1569 InterfaceType::ErrorContext(dst) => {
1570 let tbl = cx
1571 .instance_mut()
1572 .concurrent_state_mut()
1573 .error_context_tables
1574 .get_mut(dst)
1575 .expect("error context table index present in (sub)component table during lower");
1576
1577 if let Some((dst_idx, dst_state)) = tbl.get_mut_by_rep(rep) {
1578 dst_state.0 += 1;
1579 Ok(dst_idx)
1580 } else {
1581 tbl.insert(rep, LocalErrorContextRefCount(1))
1582 }
1583 }
1584 _ => func::bad_type_info(),
1585 }
1586}
1587
1588unsafe impl func::ComponentType for ErrorContext {
1591 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1592
1593 type Lower = <u32 as func::ComponentType>::Lower;
1594
1595 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1596 match ty {
1597 InterfaceType::ErrorContext(_) => Ok(()),
1598 other => bail!("expected `error`, found `{}`", func::desc(other)),
1599 }
1600 }
1601}
1602
1603unsafe impl func::Lower for ErrorContext {
1605 fn linear_lower_to_flat<T>(
1606 &self,
1607 cx: &mut LowerContext<'_, T>,
1608 ty: InterfaceType,
1609 dst: &mut MaybeUninit<Self::Lower>,
1610 ) -> Result<()> {
1611 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1612 cx,
1613 InterfaceType::U32,
1614 dst,
1615 )
1616 }
1617
1618 fn linear_lower_to_memory<T>(
1619 &self,
1620 cx: &mut LowerContext<'_, T>,
1621 ty: InterfaceType,
1622 offset: usize,
1623 ) -> Result<()> {
1624 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1625 cx,
1626 InterfaceType::U32,
1627 offset,
1628 )
1629 }
1630}
1631
1632unsafe impl func::Lift for ErrorContext {
1634 fn linear_lift_from_flat(
1635 cx: &mut LiftContext<'_>,
1636 ty: InterfaceType,
1637 src: &Self::Lower,
1638 ) -> Result<Self> {
1639 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1640 Self::lift_from_index(cx, ty, index)
1641 }
1642
1643 fn linear_lift_from_memory(
1644 cx: &mut LiftContext<'_>,
1645 ty: InterfaceType,
1646 bytes: &[u8],
1647 ) -> Result<Self> {
1648 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1649 Self::lift_from_index(cx, ty, index)
1650 }
1651}
1652
1653pub(super) struct TransmitHandle {
1655 pub(super) common: WaitableCommon,
1656 state: TableId<TransmitState>,
1658}
1659
1660impl TransmitHandle {
1661 fn new(state: TableId<TransmitState>) -> Self {
1662 Self {
1663 common: WaitableCommon::default(),
1664 state,
1665 }
1666 }
1667}
1668
1669impl TableDebug for TransmitHandle {
1670 fn type_name() -> &'static str {
1671 "TransmitHandle"
1672 }
1673}
1674
1675struct TransmitState {
1677 write_handle: TableId<TransmitHandle>,
1679 read_handle: TableId<TransmitHandle>,
1681 write: WriteState,
1683 read: ReadState,
1685 writer_watcher: Option<Waker>,
1691 reader_watcher: Option<Waker>,
1693 done: bool,
1695}
1696
1697impl Default for TransmitState {
1698 fn default() -> Self {
1699 Self {
1700 write_handle: TableId::new(0),
1701 read_handle: TableId::new(0),
1702 read: ReadState::Open,
1703 write: WriteState::Open,
1704 reader_watcher: None,
1705 writer_watcher: None,
1706 done: false,
1707 }
1708 }
1709}
1710
1711impl TableDebug for TransmitState {
1712 fn type_name() -> &'static str {
1713 "TransmitState"
1714 }
1715}
1716
1717enum WriteState {
1719 Open,
1721 GuestReady {
1723 ty: TableIndex,
1724 flat_abi: Option<FlatAbi>,
1725 options: Options,
1726 address: usize,
1727 count: usize,
1728 handle: u32,
1729 post_write: PostWrite,
1730 },
1731 HostReady {
1733 accept:
1734 Box<dyn FnOnce(&mut dyn VMStore, Instance, Reader) -> Result<ReturnCode> + Send + Sync>,
1735 post_write: PostWrite,
1736 },
1737 Dropped,
1739}
1740
1741impl fmt::Debug for WriteState {
1742 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1743 match self {
1744 Self::Open => f.debug_tuple("Open").finish(),
1745 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1746 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1747 Self::Dropped => f.debug_tuple("Dropped").finish(),
1748 }
1749 }
1750}
1751
1752enum ReadState {
1754 Open,
1756 GuestReady {
1758 ty: TableIndex,
1759 flat_abi: Option<FlatAbi>,
1760 options: Options,
1761 address: usize,
1762 count: usize,
1763 handle: u32,
1764 },
1765 HostReady {
1767 accept: Box<dyn FnOnce(Writer) -> Result<ReturnCode> + Send + Sync>,
1768 },
1769 Dropped,
1771}
1772
1773impl fmt::Debug for ReadState {
1774 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1775 match self {
1776 Self::Open => f.debug_tuple("Open").finish(),
1777 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1778 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1779 Self::Dropped => f.debug_tuple("Dropped").finish(),
1780 }
1781 }
1782}
1783
1784enum Writer<'a> {
1788 Guest {
1790 lift: &'a mut LiftContext<'a>,
1791 ty: Option<InterfaceType>,
1792 address: usize,
1793 count: usize,
1794 },
1795 Host {
1797 buffer: &'a mut UntypedWriteBuffer<'a>,
1798 count: usize,
1799 },
1800 End,
1802}
1803
1804enum Reader<'a> {
1808 Guest {
1810 options: &'a Options,
1811 ty: TableIndex,
1812 address: usize,
1813 count: usize,
1814 },
1815 Host {
1817 accept: Box<dyn FnOnce(&mut UntypedWriteBuffer, usize) -> usize + 'a>,
1818 },
1819 End,
1821}
1822
1823impl Instance {
1824 pub fn future<T: func::Lower + func::Lift + Send + Sync + 'static>(
1831 self,
1832 mut store: impl AsContextMut,
1833 default: fn() -> T,
1834 ) -> Result<(FutureWriter<T>, FutureReader<T>)> {
1835 let (write, read) = self
1836 .concurrent_state_mut(store.as_context_mut().0)
1837 .new_transmit()?;
1838
1839 Ok((
1840 FutureWriter::new(default, write, self),
1841 FutureReader::new(read, self),
1842 ))
1843 }
1844
1845 pub fn stream<T: func::Lower + func::Lift + Send + 'static>(
1848 self,
1849 mut store: impl AsContextMut,
1850 ) -> Result<(StreamWriter<T>, StreamReader<T>)> {
1851 let (write, read) = self
1852 .concurrent_state_mut(store.as_context_mut().0)
1853 .new_transmit()?;
1854
1855 Ok((
1856 StreamWriter::new(write, self),
1857 StreamReader::new(read, self),
1858 ))
1859 }
1860
1861 fn host_write<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U>(
1863 self,
1864 mut store: StoreContextMut<U>,
1865 id: TableId<TransmitHandle>,
1866 mut buffer: B,
1867 kind: TransmitKind,
1868 post_write: PostWrite,
1869 ) -> Result<Result<HostResult<B>, oneshot::Receiver<HostResult<B>>>> {
1870 let transmit_id = self.concurrent_state_mut(store.0).get(id)?.state;
1871 let transmit = self
1872 .concurrent_state_mut(store.0)
1873 .get_mut(transmit_id)
1874 .with_context(|| format!("retrieving state for transmit [{transmit_id:?}]"))?;
1875 log::trace!("host_write state {transmit_id:?}; {:?}", transmit.read);
1876
1877 let new_state = if let ReadState::Dropped = &transmit.read {
1878 ReadState::Dropped
1879 } else {
1880 ReadState::Open
1881 };
1882
1883 if matches!(post_write, PostWrite::Drop) && !matches!(transmit.read, ReadState::Open) {
1884 transmit.write = WriteState::Dropped;
1885 }
1886
1887 Ok(match mem::replace(&mut transmit.read, new_state) {
1888 ReadState::Open => {
1889 assert!(matches!(&transmit.write, WriteState::Open));
1890
1891 let token = StoreToken::new(store.as_context_mut());
1892 let (tx, rx) = oneshot::channel();
1893 let state = WriteState::HostReady {
1894 accept: Box::new(move |store, instance, reader| {
1895 let (result, code) = accept_reader::<T, B, U>(
1896 token.as_context_mut(store),
1897 instance,
1898 reader,
1899 buffer,
1900 kind,
1901 )?;
1902 _ = tx.send(result);
1903 Ok(code)
1904 }),
1905 post_write,
1906 };
1907 self.concurrent_state_mut(store.0)
1908 .get_mut(transmit_id)?
1909 .write = state;
1910
1911 Err(rx)
1912 }
1913
1914 ReadState::GuestReady {
1915 ty,
1916 flat_abi: _,
1917 options,
1918 address,
1919 count,
1920 handle,
1921 ..
1922 } => {
1923 if let TransmitKind::Future = kind {
1924 transmit.done = true;
1925 }
1926
1927 let read_handle = transmit.read_handle;
1928 let accept = move |mut store: StoreContextMut<U>| {
1929 let (result, code) = accept_reader::<T, B, U>(
1930 store.as_context_mut(),
1931 self,
1932 Reader::Guest {
1933 options: &options,
1934 ty,
1935 address,
1936 count,
1937 },
1938 buffer,
1939 kind,
1940 )?;
1941
1942 self.concurrent_state_mut(store.0).set_event(
1943 read_handle.rep(),
1944 match ty {
1945 TableIndex::Future(ty) => Event::FutureRead {
1946 code,
1947 pending: Some((ty, handle)),
1948 },
1949 TableIndex::Stream(ty) => Event::StreamRead {
1950 code,
1951 pending: Some((ty, handle)),
1952 },
1953 },
1954 )?;
1955
1956 anyhow::Ok(result)
1957 };
1958
1959 if T::MAY_REQUIRE_REALLOC {
1960 let (tx, rx) = oneshot::channel();
1965 let token = StoreToken::new(store.as_context_mut());
1966 self.concurrent_state_mut(store.0).push_high_priority(
1967 WorkItem::WorkerFunction(Mutex::new(Box::new(move |store, _| {
1968 _ = tx.send(accept(token.as_context_mut(store))?);
1969 Ok(())
1970 }))),
1971 );
1972 Err(rx)
1973 } else {
1974 Ok(accept(store)?)
1979 }
1980 }
1981
1982 ReadState::HostReady { accept } => {
1983 let count = buffer.remaining().len();
1984 let mut untyped = UntypedWriteBuffer::new(&mut buffer);
1985 let code = accept(Writer::Host {
1986 buffer: &mut untyped,
1987 count,
1988 })?;
1989 let (ReturnCode::Completed(_) | ReturnCode::Dropped(_)) = code else {
1990 unreachable!()
1991 };
1992
1993 Ok(HostResult {
1994 buffer,
1995 dropped: false,
1996 })
1997 }
1998
1999 ReadState::Dropped => Ok(HostResult {
2000 buffer,
2001 dropped: true,
2002 }),
2003 })
2004 }
2005
2006 async fn host_write_async<T: func::Lower + Send + 'static, B: WriteBuffer<T>>(
2008 self,
2009 accessor: impl AsAccessor,
2010 id: TableId<TransmitHandle>,
2011 buffer: B,
2012 kind: TransmitKind,
2013 ) -> Result<HostResult<B>> {
2014 match accessor.as_accessor().with(move |mut access| {
2015 self.host_write(
2016 access.as_context_mut(),
2017 id,
2018 buffer,
2019 kind,
2020 PostWrite::Continue,
2021 )
2022 })? {
2023 Ok(result) => Ok(result),
2024 Err(rx) => Ok(rx.await?),
2025 }
2026 }
2027
2028 fn host_read<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
2030 self,
2031 store: StoreContextMut<U>,
2032 id: TableId<TransmitHandle>,
2033 mut buffer: B,
2034 kind: TransmitKind,
2035 ) -> Result<Result<HostResult<B>, oneshot::Receiver<HostResult<B>>>> {
2036 let transmit_id = self.concurrent_state_mut(store.0).get(id)?.state;
2037 let transmit = self
2038 .concurrent_state_mut(store.0)
2039 .get_mut(transmit_id)
2040 .with_context(|| format!("retrieving state for transmit [{transmit_id:?}]"))?;
2041 log::trace!("host_read state {transmit_id:?}; {:?}", transmit.write);
2042
2043 let new_state = if let WriteState::Dropped = &transmit.write {
2044 WriteState::Dropped
2045 } else {
2046 WriteState::Open
2047 };
2048
2049 Ok(match mem::replace(&mut transmit.write, new_state) {
2050 WriteState::Open => {
2051 assert!(matches!(&transmit.read, ReadState::Open));
2052
2053 let (tx, rx) = oneshot::channel();
2054 transmit.read = ReadState::HostReady {
2055 accept: Box::new(move |writer| {
2056 let (result, code) = accept_writer::<T, B, U>(writer, buffer, kind)?;
2057 _ = tx.send(result);
2058 Ok(code)
2059 }),
2060 };
2061
2062 Err(rx)
2063 }
2064
2065 WriteState::GuestReady {
2066 ty,
2067 flat_abi: _,
2068 options,
2069 address,
2070 count,
2071 handle,
2072 post_write,
2073 ..
2074 } => {
2075 if let TableIndex::Future(_) = ty {
2076 transmit.done = true;
2077 }
2078
2079 let write_handle = transmit.write_handle;
2080 let lift = &mut LiftContext::new(store.0.store_opaque_mut(), &options, self);
2081 let (result, code) = accept_writer::<T, B, U>(
2082 Writer::Guest {
2083 ty: payload(ty, lift.types),
2084 lift,
2085 address,
2086 count,
2087 },
2088 buffer,
2089 kind,
2090 )?;
2091
2092 let state = self.concurrent_state_mut(store.0);
2093 let pending = if let PostWrite::Drop = post_write {
2094 state.get_mut(transmit_id)?.write = WriteState::Dropped;
2095 false
2096 } else {
2097 true
2098 };
2099
2100 state.set_event(
2101 write_handle.rep(),
2102 match ty {
2103 TableIndex::Future(ty) => Event::FutureWrite {
2104 code,
2105 pending: pending.then_some((ty, handle)),
2106 },
2107 TableIndex::Stream(ty) => Event::StreamWrite {
2108 code,
2109 pending: pending.then_some((ty, handle)),
2110 },
2111 },
2112 )?;
2113
2114 Ok(result)
2115 }
2116
2117 WriteState::HostReady { accept, post_write } => {
2118 accept(
2119 store.0.traitobj_mut(),
2120 self,
2121 Reader::Host {
2122 accept: Box::new(|input, count| {
2123 let count = count.min(buffer.remaining_capacity());
2124 buffer.move_from(input.get_mut::<T>(), count);
2125 count
2126 }),
2127 },
2128 )?;
2129
2130 if let PostWrite::Drop = post_write {
2131 self.concurrent_state_mut(store.0)
2132 .get_mut(transmit_id)?
2133 .write = WriteState::Dropped;
2134 }
2135
2136 Ok(HostResult {
2137 buffer,
2138 dropped: false,
2139 })
2140 }
2141
2142 WriteState::Dropped => Ok(HostResult {
2143 buffer,
2144 dropped: true,
2145 }),
2146 })
2147 }
2148
2149 async fn host_read_async<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
2151 self,
2152 accessor: impl AsAccessor,
2153 id: TableId<TransmitHandle>,
2154 buffer: B,
2155 kind: TransmitKind,
2156 ) -> Result<HostResult<B>> {
2157 match accessor
2158 .as_accessor()
2159 .with(move |mut access| self.host_read(access.as_context_mut(), id, buffer, kind))?
2160 {
2161 Ok(result) => Ok(result),
2162 Err(rx) => Ok(rx.await?),
2163 }
2164 }
2165
2166 fn host_drop_reader(
2168 self,
2169 store: &mut dyn VMStore,
2170 id: TableId<TransmitHandle>,
2171 kind: TransmitKind,
2172 ) -> Result<()> {
2173 let transmit_id = self.concurrent_state_mut(store).get(id)?.state;
2174 let state = self.concurrent_state_mut(store);
2175 let transmit = state
2176 .get_mut(transmit_id)
2177 .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2178 log::trace!(
2179 "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2180 transmit.read,
2181 transmit.write
2182 );
2183
2184 transmit.read = ReadState::Dropped;
2185 if let Some(waker) = transmit.reader_watcher.take() {
2186 waker.wake();
2187 }
2188
2189 let new_state = if let WriteState::Dropped = &transmit.write {
2192 WriteState::Dropped
2193 } else {
2194 WriteState::Open
2195 };
2196
2197 let write_handle = transmit.write_handle;
2198
2199 match mem::replace(&mut transmit.write, new_state) {
2200 WriteState::GuestReady {
2203 ty,
2204 handle,
2205 post_write,
2206 ..
2207 } => {
2208 if let PostWrite::Drop = post_write {
2209 state.delete_transmit(transmit_id)?;
2210 } else {
2211 state.update_event(
2212 write_handle.rep(),
2213 match ty {
2214 TableIndex::Future(ty) => Event::FutureWrite {
2215 code: ReturnCode::Dropped(0),
2216 pending: Some((ty, handle)),
2217 },
2218 TableIndex::Stream(ty) => Event::StreamWrite {
2219 code: ReturnCode::Dropped(0),
2220 pending: Some((ty, handle)),
2221 },
2222 },
2223 )?;
2224 };
2225 }
2226
2227 WriteState::HostReady { accept, .. } => {
2228 accept(store, self, Reader::End)?;
2229 }
2230
2231 WriteState::Open => {
2232 state.update_event(
2233 write_handle.rep(),
2234 match kind {
2235 TransmitKind::Future => Event::FutureWrite {
2236 code: ReturnCode::Dropped(0),
2237 pending: None,
2238 },
2239 TransmitKind::Stream => Event::StreamWrite {
2240 code: ReturnCode::Dropped(0),
2241 pending: None,
2242 },
2243 },
2244 )?;
2245 }
2246
2247 WriteState::Dropped => {
2248 log::trace!("host_drop_reader delete {transmit_id:?}");
2249 state.delete_transmit(transmit_id)?;
2250 }
2251 }
2252 Ok(())
2253 }
2254
2255 fn host_drop_writer<T: func::Lower + Send + 'static, U>(
2257 self,
2258 mut store: StoreContextMut<U>,
2259 id: TableId<TransmitHandle>,
2260 default: Option<&dyn Fn() -> Result<T>>,
2261 ) -> Result<()> {
2262 let transmit_id = self.concurrent_state_mut(store.0).get(id)?.state;
2263 let transmit = self
2264 .concurrent_state_mut(store.0)
2265 .get_mut(transmit_id)
2266 .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2267 log::trace!(
2268 "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2269 transmit.read,
2270 transmit.write
2271 );
2272
2273 if let Some(waker) = transmit.writer_watcher.take() {
2274 waker.wake();
2275 }
2276
2277 match &mut transmit.write {
2279 WriteState::GuestReady { .. } => {
2280 unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2281 }
2282 WriteState::HostReady { post_write, .. } => {
2283 *post_write = PostWrite::Drop;
2284 }
2285 v @ WriteState::Open => {
2286 if let (Some(default), false) = (
2287 default,
2288 transmit.done || matches!(transmit.read, ReadState::Dropped),
2289 ) {
2290 _ = self.host_write(
2293 store.as_context_mut(),
2294 id,
2295 Some(default()?),
2296 TransmitKind::Future,
2297 PostWrite::Drop,
2298 )?;
2299 } else {
2300 *v = WriteState::Dropped;
2301 }
2302 }
2303 WriteState::Dropped => unreachable!("write state is already dropped"),
2304 }
2305
2306 let transmit = self.concurrent_state_mut(store.0).get_mut(transmit_id)?;
2307
2308 let new_state = if let ReadState::Dropped = &transmit.read {
2314 ReadState::Dropped
2315 } else {
2316 ReadState::Open
2317 };
2318
2319 let read_handle = transmit.read_handle;
2320
2321 match mem::replace(&mut transmit.read, new_state) {
2323 ReadState::GuestReady { ty, handle, .. } => {
2327 self.concurrent_state_mut(store.0).update_event(
2329 read_handle.rep(),
2330 match ty {
2331 TableIndex::Future(ty) => Event::FutureRead {
2332 code: ReturnCode::Dropped(0),
2333 pending: Some((ty, handle)),
2334 },
2335 TableIndex::Stream(ty) => Event::StreamRead {
2336 code: ReturnCode::Dropped(0),
2337 pending: Some((ty, handle)),
2338 },
2339 },
2340 )?;
2341 }
2342
2343 ReadState::HostReady { accept } => {
2346 accept(Writer::End)?;
2347 }
2348
2349 ReadState::Open => {
2351 self.concurrent_state_mut(store.0).update_event(
2352 read_handle.rep(),
2353 match default {
2354 Some(_) => Event::FutureRead {
2355 code: ReturnCode::Dropped(0),
2356 pending: None,
2357 },
2358 None => Event::StreamRead {
2359 code: ReturnCode::Dropped(0),
2360 pending: None,
2361 },
2362 },
2363 )?;
2364 }
2365
2366 ReadState::Dropped => {
2369 log::trace!("host_drop_writer delete {transmit_id:?}");
2370 self.concurrent_state_mut(store.0)
2371 .delete_transmit(transmit_id)?;
2372 }
2373 }
2374 Ok(())
2375 }
2376
2377 pub(super) fn guest_drop_writable<T>(
2379 self,
2380 store: StoreContextMut<T>,
2381 ty: TableIndex,
2382 writer: u32,
2383 ) -> Result<()> {
2384 let (transmit_rep, state) = self
2385 .concurrent_state_mut(store.0)
2386 .state_table(ty)
2387 .remove_by_index(writer)
2388 .context("failed to find writer")?;
2389 let (state, kind) = match state {
2390 WaitableState::Stream(_, state) => (state, TransmitKind::Stream),
2391 WaitableState::Future(_, state) => (state, TransmitKind::Future),
2392 _ => {
2393 bail!("invalid stream or future handle");
2394 }
2395 };
2396 match state {
2397 StreamFutureState::Write { .. } => {}
2398 StreamFutureState::Read { .. } => {
2399 bail!("passed read end to `{{stream|future}}.drop-writable`")
2400 }
2401 StreamFutureState::Busy => bail!("cannot drop busy stream or future"),
2402 }
2403
2404 let id = TableId::<TransmitHandle>::new(transmit_rep);
2405 log::trace!("guest_drop_writable: drop writer {id:?}");
2406 match kind {
2407 TransmitKind::Stream => {
2408 self.host_drop_writer(store, id, None::<&dyn Fn() -> Result<()>>)
2409 }
2410 TransmitKind::Future => self.host_drop_writer(
2411 store,
2412 id,
2413 Some(&|| {
2414 Err::<(), _>(anyhow!(
2415 "cannot drop future write end without first writing a value"
2416 ))
2417 }),
2418 ),
2419 }
2420 }
2421
2422 fn copy<T: 'static>(
2425 self,
2426 mut store: StoreContextMut<T>,
2427 flat_abi: Option<FlatAbi>,
2428 write_ty: TableIndex,
2429 write_options: &Options,
2430 write_address: usize,
2431 read_ty: TableIndex,
2432 read_options: &Options,
2433 read_address: usize,
2434 count: usize,
2435 rep: u32,
2436 ) -> Result<()> {
2437 let types = self.id().get(store.0).component().types().clone();
2438 match (write_ty, read_ty) {
2439 (TableIndex::Future(write_ty), TableIndex::Future(read_ty)) => {
2440 assert_eq!(count, 1);
2441
2442 let val = types[types[write_ty].ty]
2443 .payload
2444 .map(|ty| {
2445 let abi = types.canonical_abi(&ty);
2446 if write_address % usize::try_from(abi.align32)? != 0 {
2448 bail!("write pointer not aligned");
2449 }
2450
2451 let lift =
2452 &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);
2453 let bytes = lift
2454 .memory()
2455 .get(write_address..)
2456 .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
2457 .ok_or_else(|| {
2458 anyhow::anyhow!("write pointer out of bounds of memory")
2459 })?;
2460
2461 Val::load(lift, ty, bytes)
2462 })
2463 .transpose()?;
2464
2465 if let Some(val) = val {
2466 let lower =
2467 &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
2468 let ty = types[types[read_ty].ty].payload.unwrap();
2469 let ptr = func::validate_inbounds_dynamic(
2470 types.canonical_abi(&ty),
2471 lower.as_slice_mut(),
2472 &ValRaw::u32(read_address.try_into().unwrap()),
2473 )?;
2474 val.store(lower, ty, ptr)?;
2475 }
2476 }
2477 (TableIndex::Stream(write_ty), TableIndex::Stream(read_ty)) => {
2478 if let Some(flat_abi) = flat_abi {
2479 let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
2481 if length_in_bytes > 0 {
2482 if write_address % usize::try_from(flat_abi.align)? != 0 {
2483 bail!("write pointer not aligned");
2484 }
2485 if read_address % usize::try_from(flat_abi.align)? != 0 {
2486 bail!("read pointer not aligned");
2487 }
2488
2489 let store_opaque = store.0.store_opaque_mut();
2490
2491 {
2492 let src = write_options
2493 .memory(store_opaque)
2494 .get(write_address..)
2495 .and_then(|b| b.get(..length_in_bytes))
2496 .ok_or_else(|| {
2497 anyhow::anyhow!("write pointer out of bounds of memory")
2498 })?
2499 .as_ptr();
2500 let dst = read_options
2501 .memory_mut(store_opaque)
2502 .get_mut(read_address..)
2503 .and_then(|b| b.get_mut(..length_in_bytes))
2504 .ok_or_else(|| {
2505 anyhow::anyhow!("read pointer out of bounds of memory")
2506 })?
2507 .as_mut_ptr();
2508 unsafe { src.copy_to(dst, length_in_bytes) };
2511 }
2512 }
2513 } else {
2514 let store_opaque = store.0.store_opaque_mut();
2515 let lift = &mut LiftContext::new(store_opaque, write_options, self);
2516 let ty = types[types[write_ty].ty].payload.unwrap();
2517 let abi = lift.types.canonical_abi(&ty);
2518 let size = usize::try_from(abi.size32).unwrap();
2519 if write_address % usize::try_from(abi.align32)? != 0 {
2520 bail!("write pointer not aligned");
2521 }
2522 let bytes = lift
2523 .memory()
2524 .get(write_address..)
2525 .and_then(|b| b.get(..size * count))
2526 .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
2527
2528 let values = (0..count)
2529 .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
2530 .collect::<Result<Vec<_>>>()?;
2531
2532 let id = TableId::<TransmitHandle>::new(rep);
2533 log::trace!("copy values {values:?} for {id:?}");
2534
2535 let lower =
2536 &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
2537 let ty = types[types[read_ty].ty].payload.unwrap();
2538 let abi = lower.types.canonical_abi(&ty);
2539 if read_address % usize::try_from(abi.align32)? != 0 {
2540 bail!("read pointer not aligned");
2541 }
2542 let size = usize::try_from(abi.size32).unwrap();
2543 lower
2544 .as_slice_mut()
2545 .get_mut(read_address..)
2546 .and_then(|b| b.get_mut(..size * count))
2547 .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
2548 let mut ptr = read_address;
2549 for value in values {
2550 value.store(lower, ty, ptr)?;
2551 ptr += size
2552 }
2553 }
2554 }
2555 _ => unreachable!(),
2556 }
2557
2558 Ok(())
2559 }
2560
2561 pub(super) fn guest_write<T: 'static>(
2563 self,
2564 mut store: StoreContextMut<T>,
2565 ty: TableIndex,
2566 options: OptionsIndex,
2567 flat_abi: Option<FlatAbi>,
2568 handle: u32,
2569 address: u32,
2570 count: u32,
2571 ) -> Result<ReturnCode> {
2572 let address = usize::try_from(address).unwrap();
2573 let count = usize::try_from(count).unwrap();
2574 let options = Options::new_index(store.0, self, options);
2575 if !options.async_() {
2576 bail!("synchronous stream and future writes not yet supported");
2577 }
2578 let concurrent_state = self.concurrent_state_mut(store.0);
2579 let (rep, state) = concurrent_state.get_mut_by_index(ty, handle)?;
2580 let StreamFutureState::Write { done } = *state else {
2581 bail!(
2582 "invalid handle {handle}; expected `Write`; got {:?}",
2583 *state
2584 );
2585 };
2586
2587 if done {
2588 bail!("cannot write to stream after being notified that the readable end dropped");
2589 }
2590
2591 *state = StreamFutureState::Busy;
2592 let transmit_handle = TableId::<TransmitHandle>::new(rep);
2593 let transmit_id = concurrent_state.get(transmit_handle)?.state;
2594 let transmit = concurrent_state.get_mut(transmit_id)?;
2595 log::trace!(
2596 "guest_write {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
2597 transmit.read
2598 );
2599
2600 if transmit.done {
2601 bail!("cannot write to future after previous write succeeded or readable end dropped");
2602 }
2603
2604 let new_state = if let ReadState::Dropped = &transmit.read {
2605 ReadState::Dropped
2606 } else {
2607 ReadState::Open
2608 };
2609
2610 let set_guest_ready = |me: &mut ConcurrentState| {
2611 let transmit = me.get_mut(transmit_id)?;
2612 assert!(matches!(&transmit.write, WriteState::Open));
2613 transmit.write = WriteState::GuestReady {
2614 ty,
2615 flat_abi,
2616 options,
2617 address,
2618 count,
2619 handle,
2620 post_write: PostWrite::Continue,
2621 };
2622 Ok::<_, crate::Error>(())
2623 };
2624
2625 let result = match mem::replace(&mut transmit.read, new_state) {
2626 ReadState::GuestReady {
2627 ty: read_ty,
2628 flat_abi: read_flat_abi,
2629 options: read_options,
2630 address: read_address,
2631 count: read_count,
2632 handle: read_handle,
2633 } => {
2634 assert_eq!(flat_abi, read_flat_abi);
2635
2636 if let TableIndex::Future(_) = ty {
2637 transmit.done = true;
2638 }
2639
2640 let write_complete = count == 0 || read_count > 0;
2662 let read_complete = count > 0;
2663 let read_buffer_remaining = count < read_count;
2664
2665 let read_handle_rep = transmit.read_handle.rep();
2666
2667 let count = count.min(read_count);
2668
2669 self.copy(
2670 store.as_context_mut(),
2671 flat_abi,
2672 ty,
2673 &options,
2674 address,
2675 read_ty,
2676 &read_options,
2677 read_address,
2678 count,
2679 rep,
2680 )?;
2681
2682 let instance = self.id().get_mut(store.0);
2683 let types = instance.component().types();
2684 let item_size = payload(ty, types)
2685 .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
2686 .unwrap_or(0);
2687 let concurrent_state = instance.concurrent_state_mut();
2688 if read_complete {
2689 let count = u32::try_from(count).unwrap();
2690 let total = if let Some(Event::StreamRead {
2691 code: ReturnCode::Completed(old_total),
2692 ..
2693 }) = concurrent_state.take_event(read_handle_rep)?
2694 {
2695 count + old_total
2696 } else {
2697 count
2698 };
2699
2700 let code = ReturnCode::completed(ty.kind(), total);
2701
2702 concurrent_state.set_event(
2703 read_handle_rep,
2704 match read_ty {
2705 TableIndex::Future(ty) => Event::FutureRead {
2706 code,
2707 pending: Some((ty, read_handle)),
2708 },
2709 TableIndex::Stream(ty) => Event::StreamRead {
2710 code,
2711 pending: Some((ty, read_handle)),
2712 },
2713 },
2714 )?;
2715 }
2716
2717 if read_buffer_remaining {
2718 let transmit = concurrent_state.get_mut(transmit_id)?;
2719 transmit.read = ReadState::GuestReady {
2720 ty: read_ty,
2721 flat_abi: read_flat_abi,
2722 options: read_options,
2723 address: read_address + (count * item_size),
2724 count: read_count - count,
2725 handle: read_handle,
2726 };
2727 }
2728
2729 if write_complete {
2730 ReturnCode::completed(ty.kind(), count.try_into().unwrap())
2731 } else {
2732 set_guest_ready(concurrent_state)?;
2733 ReturnCode::Blocked
2734 }
2735 }
2736
2737 ReadState::HostReady { accept } => {
2738 if let TableIndex::Future(_) = ty {
2739 transmit.done = true;
2740 }
2741
2742 let lift = &mut LiftContext::new(store.0.store_opaque_mut(), &options, self);
2743 accept(Writer::Guest {
2744 ty: payload(ty, lift.types),
2745 lift,
2746 address,
2747 count,
2748 })?
2749 }
2750
2751 ReadState::Open => {
2752 set_guest_ready(concurrent_state)?;
2753 ReturnCode::Blocked
2754 }
2755
2756 ReadState::Dropped => {
2757 if let TableIndex::Future(_) = ty {
2758 transmit.done = true;
2759 }
2760
2761 ReturnCode::Dropped(0)
2762 }
2763 };
2764
2765 if result != ReturnCode::Blocked {
2766 let state = self.concurrent_state_mut(store.0);
2767 *state.get_mut_by_index(ty, handle)?.1 = StreamFutureState::Write {
2768 done: matches!(
2769 (result, ty),
2770 (ReturnCode::Dropped(_), TableIndex::Stream(_))
2771 ),
2772 };
2773 }
2774
2775 Ok(result)
2776 }
2777
2778 pub(super) fn guest_read<T: 'static>(
2780 self,
2781 mut store: StoreContextMut<T>,
2782 ty: TableIndex,
2783 options: OptionsIndex,
2784 flat_abi: Option<FlatAbi>,
2785 handle: u32,
2786 address: u32,
2787 count: u32,
2788 ) -> Result<ReturnCode> {
2789 let address = usize::try_from(address).unwrap();
2790 let options = Options::new_index(store.0, self, options);
2791 if !options.async_() {
2792 bail!("synchronous stream and future reads not yet supported");
2793 }
2794 let concurrent_state = self.concurrent_state_mut(store.0);
2795 let (rep, state) = concurrent_state.get_mut_by_index(ty, handle)?;
2796 let StreamFutureState::Read { done } = *state else {
2797 bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
2798 };
2799
2800 if done {
2801 bail!("cannot read from stream after being notified that the writable end dropped");
2802 }
2803
2804 *state = StreamFutureState::Busy;
2805 let transmit_handle = TableId::<TransmitHandle>::new(rep);
2806 let transmit_id = concurrent_state.get(transmit_handle)?.state;
2807 let transmit = concurrent_state.get_mut(transmit_id)?;
2808 log::trace!(
2809 "guest_read {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
2810 transmit.write
2811 );
2812
2813 if transmit.done {
2814 bail!("cannot read from future after previous read succeeded");
2815 }
2816
2817 let new_state = if let WriteState::Dropped = &transmit.write {
2818 WriteState::Dropped
2819 } else {
2820 WriteState::Open
2821 };
2822
2823 let set_guest_ready = |me: &mut ConcurrentState| {
2824 let transmit = me.get_mut(transmit_id)?;
2825 assert!(matches!(&transmit.read, ReadState::Open));
2826 transmit.read = ReadState::GuestReady {
2827 ty,
2828 flat_abi,
2829 options,
2830 address,
2831 count: usize::try_from(count).unwrap(),
2832 handle,
2833 };
2834 Ok::<_, crate::Error>(())
2835 };
2836
2837 let result = match mem::replace(&mut transmit.write, new_state) {
2838 WriteState::GuestReady {
2839 ty: write_ty,
2840 flat_abi: write_flat_abi,
2841 options: write_options,
2842 address: write_address,
2843 count: write_count,
2844 handle: write_handle,
2845 post_write,
2846 } => {
2847 assert_eq!(flat_abi, write_flat_abi);
2848
2849 if let TableIndex::Future(_) = ty {
2850 transmit.done = true;
2851 }
2852
2853 let write_handle_rep = transmit.write_handle.rep();
2854
2855 let count = usize::try_from(count).unwrap();
2860
2861 let write_complete = write_count == 0 || count > 0;
2862 let read_complete = write_count > 0;
2863 let write_buffer_remaining = count < write_count;
2864
2865 let count = count.min(write_count);
2866
2867 self.copy(
2868 store.as_context_mut(),
2869 flat_abi,
2870 write_ty,
2871 &write_options,
2872 write_address,
2873 ty,
2874 &options,
2875 address,
2876 count,
2877 rep,
2878 )?;
2879
2880 let instance = self.id().get_mut(store.0);
2881 let types = instance.component().types();
2882 let item_size = payload(ty, types)
2883 .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
2884 .unwrap_or(0);
2885 let concurrent_state = instance.concurrent_state_mut();
2886 let pending = if let PostWrite::Drop = post_write {
2887 concurrent_state.get_mut(transmit_id)?.write = WriteState::Dropped;
2888 false
2889 } else {
2890 true
2891 };
2892
2893 if write_complete {
2894 let count = u32::try_from(count).unwrap();
2895 let total = if let Some(Event::StreamWrite {
2896 code: ReturnCode::Completed(old_total),
2897 ..
2898 }) = concurrent_state.take_event(write_handle_rep)?
2899 {
2900 count + old_total
2901 } else {
2902 count
2903 };
2904
2905 let code = ReturnCode::completed(ty.kind(), total);
2906
2907 concurrent_state.set_event(
2908 write_handle_rep,
2909 match write_ty {
2910 TableIndex::Future(ty) => Event::FutureWrite {
2911 code,
2912 pending: pending.then_some((ty, write_handle)),
2913 },
2914 TableIndex::Stream(ty) => Event::StreamWrite {
2915 code,
2916 pending: pending.then_some((ty, write_handle)),
2917 },
2918 },
2919 )?;
2920 }
2921
2922 if write_buffer_remaining {
2923 let transmit = concurrent_state.get_mut(transmit_id)?;
2924 transmit.write = WriteState::GuestReady {
2925 ty: write_ty,
2926 flat_abi: write_flat_abi,
2927 options: write_options,
2928 address: write_address + (count * item_size),
2929 count: write_count - count,
2930 handle: write_handle,
2931 post_write,
2932 };
2933 }
2934
2935 if read_complete {
2936 ReturnCode::completed(ty.kind(), count.try_into().unwrap())
2937 } else {
2938 set_guest_ready(concurrent_state)?;
2939 ReturnCode::Blocked
2940 }
2941 }
2942
2943 WriteState::HostReady { accept, post_write } => {
2944 if let TableIndex::Future(_) = ty {
2945 transmit.done = true;
2946 }
2947
2948 let code = accept(
2949 store.0.traitobj_mut(),
2950 self,
2951 Reader::Guest {
2952 options: &options,
2953 ty,
2954 address,
2955 count: count.try_into().unwrap(),
2956 },
2957 )?;
2958
2959 if let PostWrite::Drop = post_write {
2960 self.concurrent_state_mut(store.0)
2961 .get_mut(transmit_id)?
2962 .write = WriteState::Dropped;
2963 }
2964
2965 code
2966 }
2967
2968 WriteState::Open => {
2969 set_guest_ready(concurrent_state)?;
2970 ReturnCode::Blocked
2971 }
2972
2973 WriteState::Dropped => ReturnCode::Dropped(0),
2974 };
2975
2976 if result != ReturnCode::Blocked {
2977 let state = self.concurrent_state_mut(store.0);
2978 *state.get_mut_by_index(ty, handle)?.1 = StreamFutureState::Read {
2979 done: matches!(
2980 (result, ty),
2981 (ReturnCode::Dropped(_), TableIndex::Stream(_))
2982 ),
2983 };
2984 }
2985
2986 Ok(result)
2987 }
2988
2989 fn guest_drop_readable(
2991 self,
2992 store: &mut dyn VMStore,
2993 ty: TableIndex,
2994 reader: u32,
2995 ) -> Result<()> {
2996 let concurrent_state = self.concurrent_state_mut(store);
2997 let (rep, state) = concurrent_state.state_table(ty).remove_by_index(reader)?;
2998 let (state, kind) = match state {
2999 WaitableState::Stream(_, state) => (state, TransmitKind::Stream),
3000 WaitableState::Future(_, state) => (state, TransmitKind::Future),
3001 _ => {
3002 bail!("invalid stream or future handle");
3003 }
3004 };
3005 match state {
3006 StreamFutureState::Read { .. } => {}
3007 StreamFutureState::Write { .. } => {
3008 bail!("passed write end to `{{stream|future}}.drop-readable`")
3009 }
3010 StreamFutureState::Busy => bail!("cannot drop busy stream or future"),
3011 }
3012 let id = TableId::<TransmitHandle>::new(rep);
3013 log::trace!("guest_drop_readable: drop reader {id:?}");
3014 self.host_drop_reader(store, id, kind)
3015 }
3016
3017 pub(crate) fn error_context_new(
3019 self,
3020 store: &mut StoreOpaque,
3021 ty: TypeComponentLocalErrorContextTableIndex,
3022 options: OptionsIndex,
3023 debug_msg_address: u32,
3024 debug_msg_len: u32,
3025 ) -> Result<u32> {
3026 let options = Options::new_index(store, self, options);
3027 let lift_ctx = &mut LiftContext::new(store, &options, self);
3028 let address = usize::try_from(debug_msg_address)?;
3030 let len = usize::try_from(debug_msg_len)?;
3031 lift_ctx
3032 .memory()
3033 .get(address..)
3034 .and_then(|b| b.get(..len))
3035 .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3036 let message = WasmStr::new(address, len, lift_ctx)?;
3037
3038 let err_ctx = ErrorContextState {
3040 debug_msg: message
3041 .to_str_from_memory(options.memory(store))?
3042 .to_string(),
3043 };
3044 let state = self.concurrent_state_mut(store);
3045 let table_id = state.push(err_ctx)?;
3046 let global_ref_count_idx =
3047 TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
3048
3049 let _ = state
3051 .global_error_context_ref_counts
3052 .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
3053
3054 let local_tbl = &mut state.error_context_tables[ty];
3061
3062 assert!(
3063 !local_tbl.has_handle(table_id.rep()),
3064 "newly created error context state already tracked by component"
3065 );
3066 let local_idx = local_tbl.insert(table_id.rep(), LocalErrorContextRefCount(1))?;
3067
3068 Ok(local_idx)
3069 }
3070
3071 pub(super) fn error_context_debug_message<T>(
3073 self,
3074 store: StoreContextMut<T>,
3075 ty: TypeComponentLocalErrorContextTableIndex,
3076 options: OptionsIndex,
3077 err_ctx_handle: u32,
3078 debug_msg_address: u32,
3079 ) -> Result<()> {
3080 let state = self.concurrent_state_mut(store.0);
3082 let (state_table_id_rep, _) = state
3083 .error_context_tables
3084 .get_mut(ty)
3085 .context("error context table index present in (sub)component lookup during debug_msg")?
3086 .get_mut_by_index(err_ctx_handle)?;
3087
3088 let ErrorContextState { debug_msg } =
3090 state.get_mut(TableId::<ErrorContextState>::new(state_table_id_rep))?;
3091 let debug_msg = debug_msg.clone();
3092
3093 let options = Options::new_index(store.0, self, options);
3094 let types = self.id().get(store.0).component().types().clone();
3095 let lower_cx = &mut LowerContext::new(store, &options, &types, self);
3096 let debug_msg_address = usize::try_from(debug_msg_address)?;
3097 let offset = lower_cx
3099 .as_slice_mut()
3100 .get(debug_msg_address..)
3101 .and_then(|b| b.get(..debug_msg.bytes().len()))
3102 .map(|_| debug_msg_address)
3103 .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3104 debug_msg
3105 .as_str()
3106 .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
3107
3108 Ok(())
3109 }
3110
3111 pub(crate) fn future_drop_readable(
3113 self,
3114 store: &mut dyn VMStore,
3115 ty: TypeFutureTableIndex,
3116 reader: u32,
3117 ) -> Result<()> {
3118 self.guest_drop_readable(store, TableIndex::Future(ty), reader)
3119 }
3120
3121 pub(crate) fn stream_drop_readable(
3123 self,
3124 store: &mut dyn VMStore,
3125 ty: TypeStreamTableIndex,
3126 reader: u32,
3127 ) -> Result<()> {
3128 self.guest_drop_readable(store, TableIndex::Stream(ty), reader)
3129 }
3130}
3131
3132impl ConcurrentState {
3133 fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
3134 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
3135 }
3136
3137 fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
3138 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
3139 }
3140
3141 fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
3152 let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
3153
3154 fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
3155 let (ReturnCode::Completed(count)
3156 | ReturnCode::Dropped(count)
3157 | ReturnCode::Cancelled(count)) = old
3158 else {
3159 unreachable!()
3160 };
3161
3162 match new {
3163 ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
3164 ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
3165 _ => unreachable!(),
3166 }
3167 }
3168
3169 let event = match (waitable.take_event(self)?, event) {
3170 (None, _) => event,
3171 (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
3172 (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
3173 (
3174 Some(Event::StreamWrite {
3175 code: old_code,
3176 pending: old_pending,
3177 }),
3178 Event::StreamWrite { code, pending },
3179 ) => Event::StreamWrite {
3180 code: update_code(old_code, code),
3181 pending: old_pending.or(pending),
3182 },
3183 (
3184 Some(Event::StreamRead {
3185 code: old_code,
3186 pending: old_pending,
3187 }),
3188 Event::StreamRead { code, pending },
3189 ) => Event::StreamRead {
3190 code: update_code(old_code, code),
3191 pending: old_pending.or(pending),
3192 },
3193 _ => unreachable!(),
3194 };
3195
3196 waitable.set_event(self, Some(event))
3197 }
3198
3199 fn get_mut_by_index(
3200 &mut self,
3201 ty: TableIndex,
3202 index: u32,
3203 ) -> Result<(u32, &mut StreamFutureState)> {
3204 get_mut_by_index_from(self.state_table(ty), ty, index)
3205 }
3206
3207 fn new_transmit(&mut self) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
3210 let state_id = self.push(TransmitState::default())?;
3211
3212 let write = self.push(TransmitHandle::new(state_id))?;
3213 let read = self.push(TransmitHandle::new(state_id))?;
3214
3215 let state = self.get_mut(state_id)?;
3216 state.write_handle = write;
3217 state.read_handle = read;
3218
3219 log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
3220
3221 Ok((write, read))
3222 }
3223
3224 fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
3226 let state = self.delete(state_id)?;
3227 self.delete(state.write_handle)?;
3228 self.delete(state.read_handle)?;
3229
3230 log::trace!(
3231 "delete transmit: state {state_id:?}; write {:?}; read {:?}",
3232 state.write_handle,
3233 state.read_handle,
3234 );
3235
3236 Ok(())
3237 }
3238
3239 fn state_table(&mut self, ty: TableIndex) -> &mut StateTable<WaitableState> {
3240 let runtime_instance = match ty {
3241 TableIndex::Stream(ty) => self.component.types()[ty].instance,
3242 TableIndex::Future(ty) => self.component.types()[ty].instance,
3243 };
3244 &mut self.waitable_tables[runtime_instance]
3245 }
3246
3247 fn guest_new(&mut self, ty: TableIndex) -> Result<ResourcePair> {
3251 let (write, read) = self.new_transmit()?;
3252 let read = self.state_table(ty).insert(
3253 read.rep(),
3254 waitable_state(ty, StreamFutureState::Read { done: false }),
3255 )?;
3256 let write = self.state_table(ty).insert(
3257 write.rep(),
3258 waitable_state(ty, StreamFutureState::Write { done: false }),
3259 )?;
3260 Ok(ResourcePair { write, read })
3261 }
3262
3263 fn host_cancel_write(&mut self, rep: u32) -> Result<ReturnCode> {
3269 let transmit_id = TableId::<TransmitState>::new(rep);
3270 let transmit = self.get_mut(transmit_id)?;
3271 log::trace!(
3272 "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3273 transmit.read,
3274 transmit.write
3275 );
3276
3277 let code = if let Some(event) =
3278 Waitable::Transmit(transmit.write_handle).take_event(self)?
3279 {
3280 let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3281 unreachable!();
3282 };
3283 match (code, event) {
3284 (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3285 ReturnCode::Cancelled(count)
3286 }
3287 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3288 _ => unreachable!(),
3289 }
3290 } else {
3291 ReturnCode::Cancelled(0)
3292 };
3293
3294 let transmit = self.get_mut(transmit_id)?;
3295
3296 match &transmit.write {
3297 WriteState::GuestReady { .. } | WriteState::HostReady { .. } => {
3298 transmit.write = WriteState::Open;
3299 }
3300
3301 WriteState::Open | WriteState::Dropped => {}
3302 }
3303
3304 log::trace!("cancelled write {transmit_id:?}");
3305
3306 Ok(code)
3307 }
3308
3309 fn host_cancel_read(&mut self, rep: u32) -> Result<ReturnCode> {
3315 let transmit_id = TableId::<TransmitState>::new(rep);
3316 let transmit = self.get_mut(transmit_id)?;
3317 log::trace!(
3318 "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3319 transmit.read,
3320 transmit.write
3321 );
3322
3323 let code = if let Some(event) = Waitable::Transmit(transmit.read_handle).take_event(self)? {
3324 let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3325 unreachable!();
3326 };
3327 match (code, event) {
3328 (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3329 ReturnCode::Cancelled(count)
3330 }
3331 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3332 _ => unreachable!(),
3333 }
3334 } else {
3335 ReturnCode::Cancelled(0)
3336 };
3337
3338 let transmit = self.get_mut(transmit_id)?;
3339
3340 match &transmit.read {
3341 ReadState::GuestReady { .. } | ReadState::HostReady { .. } => {
3342 transmit.read = ReadState::Open;
3343 }
3344
3345 ReadState::Open | ReadState::Dropped => {}
3346 }
3347
3348 log::trace!("cancelled read {transmit_id:?}");
3349
3350 Ok(code)
3351 }
3352
3353 fn guest_cancel_write(
3355 &mut self,
3356 ty: TableIndex,
3357 writer: u32,
3358 _async_: bool,
3359 ) -> Result<ReturnCode> {
3360 let (rep, WaitableState::Stream(_, state) | WaitableState::Future(_, state)) =
3361 self.state_table(ty).get_mut_by_index(writer)?
3362 else {
3363 bail!("invalid stream or future handle");
3364 };
3365 let id = TableId::<TransmitHandle>::new(rep);
3366 log::trace!("guest cancel write {id:?} (handle {writer})");
3367 match state {
3368 StreamFutureState::Write { .. } => {
3369 bail!("stream or future write cancelled when no write is pending")
3370 }
3371 StreamFutureState::Read { .. } => {
3372 bail!("passed read end to `{{stream|future}}.cancel-write`")
3373 }
3374 StreamFutureState::Busy => {
3375 *state = StreamFutureState::Write { done: false };
3376 }
3377 }
3378 let rep = self.get(id)?.state.rep();
3379 self.host_cancel_write(rep)
3380 }
3381
3382 fn guest_cancel_read(
3384 &mut self,
3385 ty: TableIndex,
3386 reader: u32,
3387 _async_: bool,
3388 ) -> Result<ReturnCode> {
3389 let (rep, WaitableState::Stream(_, state) | WaitableState::Future(_, state)) =
3390 self.state_table(ty).get_mut_by_index(reader)?
3391 else {
3392 bail!("invalid stream or future handle");
3393 };
3394 let id = TableId::<TransmitHandle>::new(rep);
3395 log::trace!("guest cancel read {id:?} (handle {reader})");
3396 match state {
3397 StreamFutureState::Read { .. } => {
3398 bail!("stream or future read cancelled when no read is pending")
3399 }
3400 StreamFutureState::Write { .. } => {
3401 bail!("passed write end to `{{stream|future}}.cancel-read`")
3402 }
3403 StreamFutureState::Busy => {
3404 *state = StreamFutureState::Read { done: false };
3405 }
3406 }
3407 let rep = self.get(id)?.state.rep();
3408 self.host_cancel_read(rep)
3409 }
3410
3411 pub(crate) fn error_context_drop(
3413 &mut self,
3414 ty: TypeComponentLocalErrorContextTableIndex,
3415 error_context: u32,
3416 ) -> Result<()> {
3417 let local_state_table = self
3418 .error_context_tables
3419 .get_mut(ty)
3420 .context("error context table index present in (sub)component table during drop")?;
3421
3422 let (rep, local_ref_removed) = {
3424 let (rep, LocalErrorContextRefCount(local_ref_count)) =
3425 local_state_table.get_mut_by_index(error_context)?;
3426 assert!(*local_ref_count > 0);
3427 *local_ref_count -= 1;
3428 let mut local_ref_removed = false;
3429 if *local_ref_count == 0 {
3430 local_ref_removed = true;
3431 local_state_table
3432 .remove_by_index(error_context)
3433 .context("removing error context from component-local tracking")?;
3434 }
3435 (rep, local_ref_removed)
3436 };
3437
3438 let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
3439
3440 let GlobalErrorContextRefCount(global_ref_count) = self
3441 .global_error_context_ref_counts
3442 .get_mut(&global_ref_count_idx)
3443 .expect("retrieve concurrent state for error context during drop");
3444
3445 assert!(*global_ref_count >= 1);
3447 *global_ref_count -= 1;
3448 if *global_ref_count == 0 {
3449 assert!(local_ref_removed);
3450
3451 self.global_error_context_ref_counts
3452 .remove(&global_ref_count_idx);
3453
3454 self.delete(TableId::<ErrorContextState>::new(rep))
3455 .context("deleting component-global error context data")?;
3456 }
3457
3458 Ok(())
3459 }
3460
3461 fn guest_transfer<U: PartialEq + Eq + std::fmt::Debug>(
3464 &mut self,
3465 src_idx: u32,
3466 src: U,
3467 src_instance: RuntimeComponentInstanceIndex,
3468 dst: U,
3469 dst_instance: RuntimeComponentInstanceIndex,
3470 match_state: impl Fn(&mut WaitableState) -> Result<(U, &mut StreamFutureState)>,
3471 make_state: impl Fn(U, StreamFutureState) -> WaitableState,
3472 ) -> Result<u32> {
3473 let src_table = &mut self.waitable_tables[src_instance];
3474 let (_rep, src_state) = src_table.get_mut_by_index(src_idx)?;
3475 let (src_ty, _) = match_state(src_state)?;
3476 if src_ty != src {
3477 bail!("invalid future handle");
3478 }
3479
3480 let src_table = &mut self.waitable_tables[src_instance];
3481 let (rep, src_state) = src_table.get_mut_by_index(src_idx)?;
3482 let (_, src_state) = match_state(src_state)?;
3483
3484 match src_state {
3485 StreamFutureState::Read { done: true } => {
3486 bail!("cannot lift stream after being notified that the writable end dropped")
3487 }
3488 StreamFutureState::Read { done: false } => {
3489 src_table.remove_by_index(src_idx)?;
3490
3491 let dst_table = &mut self.waitable_tables[dst_instance];
3492 dst_table.insert(
3493 rep,
3494 make_state(dst, StreamFutureState::Read { done: false }),
3495 )
3496 }
3497 StreamFutureState::Write { .. } => {
3498 bail!("cannot transfer write end of stream or future")
3499 }
3500 StreamFutureState::Busy => bail!("cannot transfer busy stream or future"),
3501 }
3502 }
3503
3504 pub(crate) fn future_new(&mut self, ty: TypeFutureTableIndex) -> Result<ResourcePair> {
3506 self.guest_new(TableIndex::Future(ty))
3507 }
3508
3509 pub(crate) fn future_cancel_write(
3511 &mut self,
3512 ty: TypeFutureTableIndex,
3513 async_: bool,
3514 writer: u32,
3515 ) -> Result<u32> {
3516 self.guest_cancel_write(TableIndex::Future(ty), writer, async_)
3517 .map(|result| result.encode())
3518 }
3519
3520 pub(crate) fn future_cancel_read(
3522 &mut self,
3523 ty: TypeFutureTableIndex,
3524 async_: bool,
3525 reader: u32,
3526 ) -> Result<u32> {
3527 self.guest_cancel_read(TableIndex::Future(ty), reader, async_)
3528 .map(|result| result.encode())
3529 }
3530
3531 pub(crate) fn stream_new(&mut self, ty: TypeStreamTableIndex) -> Result<ResourcePair> {
3533 self.guest_new(TableIndex::Stream(ty))
3534 }
3535
3536 pub(crate) fn stream_cancel_write(
3538 &mut self,
3539 ty: TypeStreamTableIndex,
3540 async_: bool,
3541 writer: u32,
3542 ) -> Result<u32> {
3543 self.guest_cancel_write(TableIndex::Stream(ty), writer, async_)
3544 .map(|result| result.encode())
3545 }
3546
3547 pub(crate) fn stream_cancel_read(
3549 &mut self,
3550 ty: TypeStreamTableIndex,
3551 async_: bool,
3552 reader: u32,
3553 ) -> Result<u32> {
3554 self.guest_cancel_read(TableIndex::Stream(ty), reader, async_)
3555 .map(|result| result.encode())
3556 }
3557
3558 pub(crate) fn future_transfer(
3561 &mut self,
3562 src_idx: u32,
3563 src: TypeFutureTableIndex,
3564 dst: TypeFutureTableIndex,
3565 ) -> Result<u32> {
3566 self.guest_transfer(
3567 src_idx,
3568 src,
3569 self.component.types()[src].instance,
3570 dst,
3571 self.component.types()[dst].instance,
3572 |state| {
3573 if let WaitableState::Future(ty, state) = state {
3574 Ok((*ty, state))
3575 } else {
3576 Err(anyhow!("invalid future handle"))
3577 }
3578 },
3579 WaitableState::Future,
3580 )
3581 }
3582
3583 pub(crate) fn stream_transfer(
3586 &mut self,
3587 src_idx: u32,
3588 src: TypeStreamTableIndex,
3589 dst: TypeStreamTableIndex,
3590 ) -> Result<u32> {
3591 self.guest_transfer(
3592 src_idx,
3593 src,
3594 self.component.types()[src].instance,
3595 dst,
3596 self.component.types()[dst].instance,
3597 |state| {
3598 if let WaitableState::Stream(ty, state) = state {
3599 Ok((*ty, state))
3600 } else {
3601 Err(anyhow!("invalid stream handle"))
3602 }
3603 },
3604 WaitableState::Stream,
3605 )
3606 }
3607
3608 pub(crate) fn error_context_transfer(
3610 &mut self,
3611 src_idx: u32,
3612 src: TypeComponentLocalErrorContextTableIndex,
3613 dst: TypeComponentLocalErrorContextTableIndex,
3614 ) -> Result<u32> {
3615 let (rep, _) = {
3616 let rep = self
3617 .error_context_tables
3618 .get_mut(src)
3619 .context("error context table index present in (sub)component lookup")?
3620 .get_mut_by_index(src_idx)?;
3621 rep
3622 };
3623 let dst = self
3624 .error_context_tables
3625 .get_mut(dst)
3626 .context("error context table index present in (sub)component lookup")?;
3627
3628 let updated_count = if let Some((dst_idx, count)) = dst.get_mut_by_rep(rep) {
3630 (*count).0 += 1;
3631 dst_idx
3632 } else {
3633 dst.insert(rep, LocalErrorContextRefCount(1))?
3634 };
3635
3636 let global_ref_count = self
3640 .global_error_context_ref_counts
3641 .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
3642 .context("global ref count present for existing (sub)component error context")?;
3643 global_ref_count.0 += 1;
3644
3645 Ok(updated_count)
3646 }
3647}
3648
3649pub(crate) struct ResourcePair {
3650 pub(crate) write: u32,
3651 pub(crate) read: u32,
3652}