1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext};
5use crate::component::matching::InstanceType;
6use crate::component::values::{ErrorContextAny, FutureAny, StreamAny};
7use crate::component::{AsAccessor, Instance, Lift, Lower, Val, WasmList};
8use crate::store::{StoreOpaque, StoreToken};
9use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
10use crate::vm::{AlwaysMut, VMStore};
11use crate::{AsContextMut, StoreContextMut, ValRaw};
12use anyhow::{Context as _, Error, Result, anyhow, bail};
13use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
14use core::fmt;
15use core::future;
16use core::iter;
17use core::marker::PhantomData;
18use core::mem::{self, MaybeUninit};
19use core::pin::Pin;
20use core::task::{Context, Poll, Waker, ready};
21use futures::channel::oneshot;
22use futures::{FutureExt as _, stream};
23use std::any::{Any, TypeId};
24use std::boxed::Box;
25use std::io::Cursor;
26use std::string::String;
27use std::sync::{Arc, Mutex};
28use std::vec::Vec;
29use wasmtime_environ::component::{
30 CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
31 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
32 TypeFutureTableIndex, TypeStreamTableIndex,
33};
34
35pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
36
37mod buffers;
38
39#[derive(Copy, Clone, Debug)]
42pub enum TransmitKind {
43 Stream,
44 Future,
45}
46
47#[derive(Copy, Clone, Debug, PartialEq)]
49pub enum ReturnCode {
50 Blocked,
51 Completed(u32),
52 Dropped(u32),
53 Cancelled(u32),
54}
55
56impl ReturnCode {
57 pub fn encode(&self) -> u32 {
62 const BLOCKED: u32 = 0xffff_ffff;
63 const COMPLETED: u32 = 0x0;
64 const DROPPED: u32 = 0x1;
65 const CANCELLED: u32 = 0x2;
66 match self {
67 ReturnCode::Blocked => BLOCKED,
68 ReturnCode::Completed(n) => {
69 debug_assert!(*n < (1 << 28));
70 (n << 4) | COMPLETED
71 }
72 ReturnCode::Dropped(n) => {
73 debug_assert!(*n < (1 << 28));
74 (n << 4) | DROPPED
75 }
76 ReturnCode::Cancelled(n) => {
77 debug_assert!(*n < (1 << 28));
78 (n << 4) | CANCELLED
79 }
80 }
81 }
82
83 fn completed(kind: TransmitKind, count: u32) -> Self {
86 Self::Completed(if let TransmitKind::Future = kind {
87 0
88 } else {
89 count
90 })
91 }
92}
93
94#[derive(Copy, Clone, Debug)]
99pub enum TransmitIndex {
100 Stream(TypeStreamTableIndex),
101 Future(TypeFutureTableIndex),
102}
103
104impl TransmitIndex {
105 pub fn kind(&self) -> TransmitKind {
106 match self {
107 TransmitIndex::Stream(_) => TransmitKind::Stream,
108 TransmitIndex::Future(_) => TransmitKind::Future,
109 }
110 }
111}
112
113fn payload(ty: TransmitIndex, types: &ComponentTypes) -> Option<InterfaceType> {
116 match ty {
117 TransmitIndex::Future(ty) => types[types[ty].ty].payload,
118 TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
119 }
120}
121
122fn get_mut_by_index_from(
125 handle_table: &mut HandleTable,
126 ty: TransmitIndex,
127 index: u32,
128) -> Result<(u32, &mut TransmitLocalState)> {
129 match ty {
130 TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
131 TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
132 }
133}
134
135fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
136 mut store: StoreContextMut<U>,
137 instance: Instance,
138 options: OptionsIndex,
139 ty: TransmitIndex,
140 address: usize,
141 count: usize,
142 buffer: &mut B,
143) -> Result<()> {
144 let count = buffer.remaining().len().min(count);
145
146 let lower = &mut if T::MAY_REQUIRE_REALLOC {
147 LowerContext::new
148 } else {
149 LowerContext::new_without_realloc
150 }(store.as_context_mut(), options, instance);
151
152 if address % usize::try_from(T::ALIGN32)? != 0 {
153 bail!("read pointer not aligned");
154 }
155 lower
156 .as_slice_mut()
157 .get_mut(address..)
158 .and_then(|b| b.get_mut(..T::SIZE32 * count))
159 .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
160
161 if let Some(ty) = payload(ty, lower.types) {
162 T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
163 }
164
165 buffer.skip(count);
166
167 Ok(())
168}
169
170fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
171 lift: &mut LiftContext<'_>,
172 ty: Option<InterfaceType>,
173 buffer: &mut B,
174 address: usize,
175 count: usize,
176) -> Result<()> {
177 let count = count.min(buffer.remaining_capacity());
178 if T::IS_RUST_UNIT_TYPE {
179 buffer.extend(
183 iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
184 )
185 } else {
186 let ty = ty.unwrap();
187 if address % usize::try_from(T::ALIGN32)? != 0 {
188 bail!("write pointer not aligned");
189 }
190 lift.memory()
191 .get(address..)
192 .and_then(|b| b.get(..T::SIZE32 * count))
193 .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
194
195 let list = &WasmList::new(address, count, lift, ty)?;
196 T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
197 }
198 Ok(())
199}
200
201#[derive(Debug, PartialEq, Eq, PartialOrd)]
203pub(super) struct ErrorContextState {
204 pub(crate) debug_msg: String,
206}
207
208#[derive(Debug, Clone, Copy, PartialEq, Eq)]
211pub(super) struct FlatAbi {
212 pub(super) size: u32,
213 pub(super) align: u32,
214}
215
216pub struct Destination<'a, T, B> {
218 id: TableId<TransmitState>,
219 buffer: &'a mut B,
220 host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
221 _phantom: PhantomData<fn() -> T>,
222}
223
224impl<'a, T, B> Destination<'a, T, B> {
225 pub fn reborrow(&mut self) -> Destination<'_, T, B> {
227 Destination {
228 id: self.id,
229 buffer: &mut *self.buffer,
230 host_buffer: self.host_buffer.as_deref_mut(),
231 _phantom: PhantomData,
232 }
233 }
234
235 pub fn take_buffer(&mut self) -> B
241 where
242 B: Default,
243 {
244 mem::take(self.buffer)
245 }
246
247 pub fn set_buffer(&mut self, buffer: B) {
257 *self.buffer = buffer;
258 }
259
260 pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
277 let transmit = store
278 .as_context_mut()
279 .0
280 .concurrent_state_mut()
281 .get_mut(self.id)
282 .unwrap();
283
284 if let &ReadState::GuestReady { count, .. } = &transmit.read {
285 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
286 unreachable!()
287 };
288
289 Some(count - guest_offset)
290 } else {
291 None
292 }
293 }
294}
295
296impl<'a, B> Destination<'a, u8, B> {
297 pub fn as_direct<D>(
308 mut self,
309 store: StoreContextMut<'a, D>,
310 capacity: usize,
311 ) -> DirectDestination<'a, D> {
312 if let Some(buffer) = self.host_buffer.as_deref_mut() {
313 buffer.set_position(0);
314 if buffer.get_mut().is_empty() {
315 buffer.get_mut().resize(capacity, 0);
316 }
317 }
318
319 DirectDestination {
320 id: self.id,
321 host_buffer: self.host_buffer,
322 store,
323 }
324 }
325}
326
327pub struct DirectDestination<'a, D: 'static> {
330 id: TableId<TransmitState>,
331 host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
332 store: StoreContextMut<'a, D>,
333}
334
335impl<D: 'static> DirectDestination<'_, D> {
336 pub fn remaining(&mut self) -> &mut [u8] {
338 if let Some(buffer) = self.host_buffer.as_deref_mut() {
339 buffer.get_mut()
340 } else {
341 let transmit = self
342 .store
343 .as_context_mut()
344 .0
345 .concurrent_state_mut()
346 .get_mut(self.id)
347 .unwrap();
348
349 let &ReadState::GuestReady {
350 address,
351 count,
352 options,
353 instance,
354 ..
355 } = &transmit.read
356 else {
357 unreachable!();
358 };
359
360 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
361 unreachable!()
362 };
363
364 instance
365 .options_memory_mut(self.store.0, options)
366 .get_mut((address + guest_offset)..)
367 .and_then(|b| b.get_mut(..(count - guest_offset)))
368 .unwrap()
369 }
370 }
371
372 pub fn mark_written(&mut self, count: usize) {
377 if let Some(buffer) = self.host_buffer.as_deref_mut() {
378 buffer.set_position(
379 buffer
380 .position()
381 .checked_add(u64::try_from(count).unwrap())
382 .unwrap(),
383 );
384 } else {
385 let transmit = self
386 .store
387 .as_context_mut()
388 .0
389 .concurrent_state_mut()
390 .get_mut(self.id)
391 .unwrap();
392
393 let ReadState::GuestReady {
394 count: read_count, ..
395 } = &transmit.read
396 else {
397 unreachable!();
398 };
399
400 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
401 unreachable!()
402 };
403
404 if *guest_offset + count > *read_count {
405 panic!(
406 "write count ({count}) must be less than or equal to read count ({read_count})"
407 )
408 } else {
409 *guest_offset += count;
410 }
411 }
412 }
413}
414
415#[derive(Copy, Clone, Debug)]
417pub enum StreamResult {
418 Completed,
421 Cancelled,
426 Dropped,
429}
430
431pub trait StreamProducer<D>: Send + 'static {
433 type Item;
435
436 type Buffer: WriteBuffer<Self::Item> + Default;
438
439 fn poll_produce<'a>(
575 self: Pin<&mut Self>,
576 cx: &mut Context<'_>,
577 store: StoreContextMut<'a, D>,
578 destination: Destination<'a, Self::Item, Self::Buffer>,
579 finish: bool,
580 ) -> Poll<Result<StreamResult>>;
581
582 fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
588 Err(me)
589 }
590}
591
592impl<T, D> StreamProducer<D> for iter::Empty<T>
593where
594 T: Send + Sync + 'static,
595{
596 type Item = T;
597 type Buffer = Option<Self::Item>;
598
599 fn poll_produce<'a>(
600 self: Pin<&mut Self>,
601 _: &mut Context<'_>,
602 _: StoreContextMut<'a, D>,
603 _: Destination<'a, Self::Item, Self::Buffer>,
604 _: bool,
605 ) -> Poll<Result<StreamResult>> {
606 Poll::Ready(Ok(StreamResult::Dropped))
607 }
608}
609
610impl<T, D> StreamProducer<D> for stream::Empty<T>
611where
612 T: Send + Sync + 'static,
613{
614 type Item = T;
615 type Buffer = Option<Self::Item>;
616
617 fn poll_produce<'a>(
618 self: Pin<&mut Self>,
619 _: &mut Context<'_>,
620 _: StoreContextMut<'a, D>,
621 _: Destination<'a, Self::Item, Self::Buffer>,
622 _: bool,
623 ) -> Poll<Result<StreamResult>> {
624 Poll::Ready(Ok(StreamResult::Dropped))
625 }
626}
627
628impl<T, D> StreamProducer<D> for Vec<T>
629where
630 T: Unpin + Send + Sync + 'static,
631{
632 type Item = T;
633 type Buffer = VecBuffer<T>;
634
635 fn poll_produce<'a>(
636 self: Pin<&mut Self>,
637 _: &mut Context<'_>,
638 _: StoreContextMut<'a, D>,
639 mut dst: Destination<'a, Self::Item, Self::Buffer>,
640 _: bool,
641 ) -> Poll<Result<StreamResult>> {
642 dst.set_buffer(mem::take(self.get_mut()).into());
643 Poll::Ready(Ok(StreamResult::Dropped))
644 }
645}
646
647impl<T, D> StreamProducer<D> for Box<[T]>
648where
649 T: Unpin + Send + Sync + 'static,
650{
651 type Item = T;
652 type Buffer = VecBuffer<T>;
653
654 fn poll_produce<'a>(
655 self: Pin<&mut Self>,
656 _: &mut Context<'_>,
657 _: StoreContextMut<'a, D>,
658 mut dst: Destination<'a, Self::Item, Self::Buffer>,
659 _: bool,
660 ) -> Poll<Result<StreamResult>> {
661 dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
662 Poll::Ready(Ok(StreamResult::Dropped))
663 }
664}
665
666#[cfg(feature = "component-model-async-bytes")]
667impl<D> StreamProducer<D> for bytes::Bytes {
668 type Item = u8;
669 type Buffer = Cursor<Self>;
670
671 fn poll_produce<'a>(
672 mut self: Pin<&mut Self>,
673 _: &mut Context<'_>,
674 mut store: StoreContextMut<'a, D>,
675 mut dst: Destination<'a, Self::Item, Self::Buffer>,
676 _: bool,
677 ) -> Poll<Result<StreamResult>> {
678 let cap = dst.remaining(&mut store);
679 let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
680 dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
682 return Poll::Ready(Ok(StreamResult::Dropped));
683 };
684 let cap = cap.into();
685 dst.set_buffer(Cursor::new(self.split_off(cap)));
687 let mut dst = dst.as_direct(store, cap);
688 dst.remaining().copy_from_slice(&self);
689 dst.mark_written(cap);
690 Poll::Ready(Ok(StreamResult::Dropped))
691 }
692}
693
694#[cfg(feature = "component-model-async-bytes")]
695impl<D> StreamProducer<D> for bytes::BytesMut {
696 type Item = u8;
697 type Buffer = Cursor<Self>;
698
699 fn poll_produce<'a>(
700 mut self: Pin<&mut Self>,
701 _: &mut Context<'_>,
702 mut store: StoreContextMut<'a, D>,
703 mut dst: Destination<'a, Self::Item, Self::Buffer>,
704 _: bool,
705 ) -> Poll<Result<StreamResult>> {
706 let cap = dst.remaining(&mut store);
707 let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
708 dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
710 return Poll::Ready(Ok(StreamResult::Dropped));
711 };
712 let cap = cap.into();
713 dst.set_buffer(Cursor::new(self.split_off(cap)));
715 let mut dst = dst.as_direct(store, cap);
716 dst.remaining().copy_from_slice(&self);
717 dst.mark_written(cap);
718 Poll::Ready(Ok(StreamResult::Dropped))
719 }
720}
721
722pub struct Source<'a, T> {
724 id: TableId<TransmitState>,
725 host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
726}
727
728impl<'a, T> Source<'a, T> {
729 pub fn reborrow(&mut self) -> Source<'_, T> {
731 Source {
732 id: self.id,
733 host_buffer: self.host_buffer.as_deref_mut(),
734 }
735 }
736
737 pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
739 where
740 T: func::Lift + 'static,
741 B: ReadBuffer<T>,
742 {
743 if let Some(input) = &mut self.host_buffer {
744 let count = input.remaining().len().min(buffer.remaining_capacity());
745 buffer.move_from(*input, count);
746 } else {
747 let store = store.as_context_mut();
748 let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
749
750 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
751 unreachable!();
752 };
753
754 let &WriteState::GuestReady {
755 ty,
756 address,
757 count,
758 options,
759 instance,
760 ..
761 } = &transmit.write
762 else {
763 unreachable!()
764 };
765
766 let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
767 let ty = payload(ty, cx.types);
768 let old_remaining = buffer.remaining_capacity();
769 lift::<T, B>(
770 cx,
771 ty,
772 buffer,
773 address + (T::SIZE32 * guest_offset),
774 count - guest_offset,
775 )?;
776
777 let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
778
779 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
780 unreachable!();
781 };
782
783 *guest_offset += old_remaining - buffer.remaining_capacity();
784 }
785
786 Ok(())
787 }
788
789 pub fn remaining(&self, mut store: impl AsContextMut) -> usize
792 where
793 T: 'static,
794 {
795 let transmit = store
796 .as_context_mut()
797 .0
798 .concurrent_state_mut()
799 .get_mut(self.id)
800 .unwrap();
801
802 if let &WriteState::GuestReady { count, .. } = &transmit.write {
803 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
804 unreachable!()
805 };
806
807 count - guest_offset
808 } else if let Some(host_buffer) = &self.host_buffer {
809 host_buffer.remaining().len()
810 } else {
811 unreachable!()
812 }
813 }
814}
815
816impl<'a> Source<'a, u8> {
817 pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
819 DirectSource {
820 id: self.id,
821 host_buffer: self.host_buffer,
822 store,
823 }
824 }
825}
826
827pub struct DirectSource<'a, D: 'static> {
830 id: TableId<TransmitState>,
831 host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
832 store: StoreContextMut<'a, D>,
833}
834
835impl<D: 'static> DirectSource<'_, D> {
836 pub fn remaining(&mut self) -> &[u8] {
838 if let Some(buffer) = self.host_buffer.as_deref_mut() {
839 buffer.remaining()
840 } else {
841 let transmit = self
842 .store
843 .as_context_mut()
844 .0
845 .concurrent_state_mut()
846 .get_mut(self.id)
847 .unwrap();
848
849 let &WriteState::GuestReady {
850 address,
851 count,
852 options,
853 instance,
854 ..
855 } = &transmit.write
856 else {
857 unreachable!()
858 };
859
860 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
861 unreachable!()
862 };
863
864 instance
865 .options_memory(self.store.0, options)
866 .get((address + guest_offset)..)
867 .and_then(|b| b.get(..(count - guest_offset)))
868 .unwrap()
869 }
870 }
871
872 pub fn mark_read(&mut self, count: usize) {
877 if let Some(buffer) = self.host_buffer.as_deref_mut() {
878 buffer.skip(count);
879 } else {
880 let transmit = self
881 .store
882 .as_context_mut()
883 .0
884 .concurrent_state_mut()
885 .get_mut(self.id)
886 .unwrap();
887
888 let WriteState::GuestReady {
889 count: write_count, ..
890 } = &transmit.write
891 else {
892 unreachable!()
893 };
894
895 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
896 unreachable!()
897 };
898
899 if *guest_offset + count > *write_count {
900 panic!(
901 "read count ({count}) must be less than or equal to write count ({write_count})"
902 )
903 } else {
904 *guest_offset += count;
905 }
906 }
907 }
908}
909
910pub trait StreamConsumer<D>: Send + 'static {
912 type Item;
914
915 fn poll_consume(
998 self: Pin<&mut Self>,
999 cx: &mut Context<'_>,
1000 store: StoreContextMut<D>,
1001 source: Source<'_, Self::Item>,
1002 finish: bool,
1003 ) -> Poll<Result<StreamResult>>;
1004}
1005
1006pub trait FutureProducer<D>: Send + 'static {
1008 type Item;
1010
1011 fn poll_produce(
1021 self: Pin<&mut Self>,
1022 cx: &mut Context<'_>,
1023 store: StoreContextMut<D>,
1024 finish: bool,
1025 ) -> Poll<Result<Option<Self::Item>>>;
1026}
1027
1028impl<T, E, D, Fut> FutureProducer<D> for Fut
1029where
1030 E: Into<Error>,
1031 Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
1032{
1033 type Item = T;
1034
1035 fn poll_produce<'a>(
1036 self: Pin<&mut Self>,
1037 cx: &mut Context<'_>,
1038 _: StoreContextMut<'a, D>,
1039 finish: bool,
1040 ) -> Poll<Result<Option<T>>> {
1041 match self.poll(cx) {
1042 Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
1043 Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
1044 Poll::Pending if finish => Poll::Ready(Ok(None)),
1045 Poll::Pending => Poll::Pending,
1046 }
1047 }
1048}
1049
1050pub trait FutureConsumer<D>: Send + 'static {
1052 type Item;
1054
1055 fn poll_consume(
1067 self: Pin<&mut Self>,
1068 cx: &mut Context<'_>,
1069 store: StoreContextMut<D>,
1070 source: Source<'_, Self::Item>,
1071 finish: bool,
1072 ) -> Poll<Result<()>>;
1073}
1074
1075pub struct FutureReader<T> {
1082 id: TableId<TransmitHandle>,
1083 _phantom: PhantomData<T>,
1084}
1085
1086impl<T> FutureReader<T> {
1087 pub fn new<S: AsContextMut>(
1089 mut store: S,
1090 producer: impl FutureProducer<S::Data, Item = T>,
1091 ) -> Self
1092 where
1093 T: func::Lower + func::Lift + Send + Sync + 'static,
1094 {
1095 struct Producer<P>(P);
1096
1097 impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
1098 for Producer<P>
1099 {
1100 type Item = P::Item;
1101 type Buffer = Option<P::Item>;
1102
1103 fn poll_produce<'a>(
1104 self: Pin<&mut Self>,
1105 cx: &mut Context<'_>,
1106 store: StoreContextMut<D>,
1107 mut destination: Destination<'a, Self::Item, Self::Buffer>,
1108 finish: bool,
1109 ) -> Poll<Result<StreamResult>> {
1110 let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1113
1114 Poll::Ready(Ok(
1115 if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
1116 destination.set_buffer(Some(value));
1117
1118 StreamResult::Completed
1125 } else {
1126 StreamResult::Cancelled
1127 },
1128 ))
1129 }
1130 }
1131
1132 Self::new_(
1133 store
1134 .as_context_mut()
1135 .new_transmit(TransmitKind::Future, Producer(producer)),
1136 )
1137 }
1138
1139 fn new_(id: TableId<TransmitHandle>) -> Self {
1140 Self {
1141 id,
1142 _phantom: PhantomData,
1143 }
1144 }
1145
1146 pub fn pipe<S: AsContextMut>(
1148 self,
1149 mut store: S,
1150 consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
1151 ) where
1152 T: func::Lift + 'static,
1153 {
1154 struct Consumer<C>(C);
1155
1156 impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
1157 for Consumer<C>
1158 {
1159 type Item = T;
1160
1161 fn poll_consume(
1162 self: Pin<&mut Self>,
1163 cx: &mut Context<'_>,
1164 mut store: StoreContextMut<D>,
1165 mut source: Source<Self::Item>,
1166 finish: bool,
1167 ) -> Poll<Result<StreamResult>> {
1168 let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1171
1172 ready!(consumer.poll_consume(
1173 cx,
1174 store.as_context_mut(),
1175 source.reborrow(),
1176 finish
1177 ))?;
1178
1179 Poll::Ready(Ok(if source.remaining(store) == 0 {
1180 StreamResult::Completed
1186 } else {
1187 StreamResult::Cancelled
1188 }))
1189 }
1190 }
1191
1192 store
1193 .as_context_mut()
1194 .set_consumer(self.id, TransmitKind::Future, Consumer(consumer));
1195 }
1196
1197 pub fn into_val(self) -> Val {
1200 Val::Future(FutureAny(self.id.rep()))
1201 }
1202
1203 pub fn from_val(mut store: impl AsContextMut<Data: Send>, value: &Val) -> Result<Self> {
1205 let Val::Future(FutureAny(rep)) = value else {
1206 bail!("expected `future`; got `{}`", value.desc());
1207 };
1208 let store = store.as_context_mut();
1209 let id = TableId::<TransmitHandle>::new(*rep);
1210 store.0.concurrent_state_mut().get_mut(id)?; Ok(Self::new_(id))
1212 }
1213
1214 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1216 match ty {
1217 InterfaceType::Future(src) => {
1218 let handle_table = cx
1219 .instance_mut()
1220 .table_for_transmit(TransmitIndex::Future(src));
1221 let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
1222 if is_done {
1223 bail!("cannot lift future after being notified that the writable end dropped");
1224 }
1225 let id = TableId::<TransmitHandle>::new(rep);
1226 let concurrent_state = cx.concurrent_state_mut();
1227 let future = concurrent_state.get_mut(id)?;
1228 future.common.handle = None;
1229 let state = future.state;
1230
1231 if concurrent_state.get_mut(state)?.done {
1232 bail!("cannot lift future after previous read succeeded");
1233 }
1234
1235 Ok(Self::new_(id))
1236 }
1237 _ => func::bad_type_info(),
1238 }
1239 }
1240
1241 pub fn close(&mut self, mut store: impl AsContextMut) {
1249 let id = mem::replace(&mut self.id, TableId::new(u32::MAX));
1251 store
1252 .as_context_mut()
1253 .0
1254 .host_drop_reader(id, TransmitKind::Future)
1255 .unwrap();
1256 }
1257
1258 pub fn close_with(&mut self, accessor: impl AsAccessor) {
1260 accessor.as_accessor().with(|access| self.close(access))
1261 }
1262
1263 pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1269 where
1270 A: AsAccessor,
1271 {
1272 GuardedFutureReader::new(accessor, self)
1273 }
1274}
1275
1276impl<T> fmt::Debug for FutureReader<T> {
1277 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1278 f.debug_struct("FutureReader")
1279 .field("id", &self.id)
1280 .finish()
1281 }
1282}
1283
1284pub(crate) fn lower_future_to_index<U>(
1286 rep: u32,
1287 cx: &mut LowerContext<'_, U>,
1288 ty: InterfaceType,
1289) -> Result<u32> {
1290 match ty {
1291 InterfaceType::Future(dst) => {
1292 let concurrent_state = cx.store.0.concurrent_state_mut();
1293 let id = TableId::<TransmitHandle>::new(rep);
1294 let state = concurrent_state.get_mut(id)?.state;
1295 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1296
1297 let handle = cx
1298 .instance_mut()
1299 .table_for_transmit(TransmitIndex::Future(dst))
1300 .future_insert_read(dst, rep)?;
1301
1302 cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1303
1304 Ok(handle)
1305 }
1306 _ => func::bad_type_info(),
1307 }
1308}
1309
1310unsafe impl<T: Send + Sync> func::ComponentType for FutureReader<T> {
1313 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1314
1315 type Lower = <u32 as func::ComponentType>::Lower;
1316
1317 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1318 match ty {
1319 InterfaceType::Future(_) => Ok(()),
1320 other => bail!("expected `future`, found `{}`", func::desc(other)),
1321 }
1322 }
1323}
1324
1325unsafe impl<T: Send + Sync> func::Lower for FutureReader<T> {
1327 fn linear_lower_to_flat<U>(
1328 &self,
1329 cx: &mut LowerContext<'_, U>,
1330 ty: InterfaceType,
1331 dst: &mut MaybeUninit<Self::Lower>,
1332 ) -> Result<()> {
1333 lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1334 cx,
1335 InterfaceType::U32,
1336 dst,
1337 )
1338 }
1339
1340 fn linear_lower_to_memory<U>(
1341 &self,
1342 cx: &mut LowerContext<'_, U>,
1343 ty: InterfaceType,
1344 offset: usize,
1345 ) -> Result<()> {
1346 lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1347 cx,
1348 InterfaceType::U32,
1349 offset,
1350 )
1351 }
1352}
1353
1354unsafe impl<T: Send + Sync> func::Lift for FutureReader<T> {
1356 fn linear_lift_from_flat(
1357 cx: &mut LiftContext<'_>,
1358 ty: InterfaceType,
1359 src: &Self::Lower,
1360 ) -> Result<Self> {
1361 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1362 Self::lift_from_index(cx, ty, index)
1363 }
1364
1365 fn linear_lift_from_memory(
1366 cx: &mut LiftContext<'_>,
1367 ty: InterfaceType,
1368 bytes: &[u8],
1369 ) -> Result<Self> {
1370 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1371 Self::lift_from_index(cx, ty, index)
1372 }
1373}
1374
1375pub struct GuardedFutureReader<T, A>
1381where
1382 A: AsAccessor,
1383{
1384 reader: Option<FutureReader<T>>,
1388 accessor: A,
1389}
1390
1391impl<T, A> GuardedFutureReader<T, A>
1392where
1393 A: AsAccessor,
1394{
1395 pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1397 Self {
1398 reader: Some(reader),
1399 accessor,
1400 }
1401 }
1402
1403 pub fn into_future(self) -> FutureReader<T> {
1406 self.into()
1407 }
1408}
1409
1410impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1411where
1412 A: AsAccessor,
1413{
1414 fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1415 guard.reader.take().unwrap()
1416 }
1417}
1418
1419impl<T, A> Drop for GuardedFutureReader<T, A>
1420where
1421 A: AsAccessor,
1422{
1423 fn drop(&mut self) {
1424 if let Some(reader) = &mut self.reader {
1425 reader.close_with(&self.accessor)
1426 }
1427 }
1428}
1429
1430pub struct StreamReader<T> {
1437 id: TableId<TransmitHandle>,
1438 _phantom: PhantomData<T>,
1439}
1440
1441impl<T> StreamReader<T> {
1442 pub fn new<S: AsContextMut>(
1444 mut store: S,
1445 producer: impl StreamProducer<S::Data, Item = T>,
1446 ) -> Self
1447 where
1448 T: func::Lower + func::Lift + Send + Sync + 'static,
1449 {
1450 Self::new_(
1451 store
1452 .as_context_mut()
1453 .new_transmit(TransmitKind::Stream, producer),
1454 )
1455 }
1456
1457 fn new_(id: TableId<TransmitHandle>) -> Self {
1458 Self {
1459 id,
1460 _phantom: PhantomData,
1461 }
1462 }
1463
1464 pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
1481 let store = store.as_context_mut();
1482 let state = store.0.concurrent_state_mut();
1483 let id = state.get_mut(self.id).unwrap().state;
1484 if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
1485 match try_into(TypeId::of::<V>()) {
1486 Some(result) => {
1487 self.close(store);
1488 Ok(*result.downcast::<V>().unwrap())
1489 }
1490 None => Err(self),
1491 }
1492 } else {
1493 Err(self)
1494 }
1495 }
1496
1497 pub fn pipe<S: AsContextMut>(
1499 self,
1500 mut store: S,
1501 consumer: impl StreamConsumer<S::Data, Item = T>,
1502 ) where
1503 T: 'static,
1504 {
1505 store
1506 .as_context_mut()
1507 .set_consumer(self.id, TransmitKind::Stream, consumer);
1508 }
1509
1510 pub fn into_val(self) -> Val {
1513 Val::Stream(StreamAny(self.id.rep()))
1514 }
1515
1516 pub fn from_val(mut store: impl AsContextMut<Data: Send>, value: &Val) -> Result<Self> {
1518 let Val::Stream(StreamAny(rep)) = value else {
1519 bail!("expected `stream`; got `{}`", value.desc());
1520 };
1521 let store = store.as_context_mut();
1522 let id = TableId::<TransmitHandle>::new(*rep);
1523 store.0.concurrent_state_mut().get_mut(id)?; Ok(Self::new_(id))
1525 }
1526
1527 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1529 match ty {
1530 InterfaceType::Stream(src) => {
1531 let handle_table = cx
1532 .instance_mut()
1533 .table_for_transmit(TransmitIndex::Stream(src));
1534 let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1535 if is_done {
1536 bail!("cannot lift stream after being notified that the writable end dropped");
1537 }
1538 let id = TableId::<TransmitHandle>::new(rep);
1539 cx.concurrent_state_mut().get_mut(id)?.common.handle = None;
1540 Ok(Self::new_(id))
1541 }
1542 _ => func::bad_type_info(),
1543 }
1544 }
1545
1546 pub fn close(&mut self, mut store: impl AsContextMut) {
1554 let id = mem::replace(&mut self.id, TableId::new(u32::MAX));
1556 store
1557 .as_context_mut()
1558 .0
1559 .host_drop_reader(id, TransmitKind::Stream)
1560 .unwrap()
1561 }
1562
1563 pub fn close_with(&mut self, accessor: impl AsAccessor) {
1565 accessor.as_accessor().with(|access| self.close(access))
1566 }
1567
1568 pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1574 where
1575 A: AsAccessor,
1576 {
1577 GuardedStreamReader::new(accessor, self)
1578 }
1579}
1580
1581impl<T> fmt::Debug for StreamReader<T> {
1582 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1583 f.debug_struct("StreamReader")
1584 .field("id", &self.id)
1585 .finish()
1586 }
1587}
1588
1589pub(crate) fn lower_stream_to_index<U>(
1591 rep: u32,
1592 cx: &mut LowerContext<'_, U>,
1593 ty: InterfaceType,
1594) -> Result<u32> {
1595 match ty {
1596 InterfaceType::Stream(dst) => {
1597 let concurrent_state = cx.store.0.concurrent_state_mut();
1598 let id = TableId::<TransmitHandle>::new(rep);
1599 let state = concurrent_state.get_mut(id)?.state;
1600 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1601
1602 let handle = cx
1603 .instance_mut()
1604 .table_for_transmit(TransmitIndex::Stream(dst))
1605 .stream_insert_read(dst, rep)?;
1606
1607 cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1608
1609 Ok(handle)
1610 }
1611 _ => func::bad_type_info(),
1612 }
1613}
1614
1615unsafe impl<T: Send + Sync> func::ComponentType for StreamReader<T> {
1618 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1619
1620 type Lower = <u32 as func::ComponentType>::Lower;
1621
1622 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1623 match ty {
1624 InterfaceType::Stream(_) => Ok(()),
1625 other => bail!("expected `stream`, found `{}`", func::desc(other)),
1626 }
1627 }
1628}
1629
1630unsafe impl<T: Send + Sync> func::Lower for StreamReader<T> {
1632 fn linear_lower_to_flat<U>(
1633 &self,
1634 cx: &mut LowerContext<'_, U>,
1635 ty: InterfaceType,
1636 dst: &mut MaybeUninit<Self::Lower>,
1637 ) -> Result<()> {
1638 lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1639 cx,
1640 InterfaceType::U32,
1641 dst,
1642 )
1643 }
1644
1645 fn linear_lower_to_memory<U>(
1646 &self,
1647 cx: &mut LowerContext<'_, U>,
1648 ty: InterfaceType,
1649 offset: usize,
1650 ) -> Result<()> {
1651 lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1652 cx,
1653 InterfaceType::U32,
1654 offset,
1655 )
1656 }
1657}
1658
1659unsafe impl<T: Send + Sync> func::Lift for StreamReader<T> {
1661 fn linear_lift_from_flat(
1662 cx: &mut LiftContext<'_>,
1663 ty: InterfaceType,
1664 src: &Self::Lower,
1665 ) -> Result<Self> {
1666 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1667 Self::lift_from_index(cx, ty, index)
1668 }
1669
1670 fn linear_lift_from_memory(
1671 cx: &mut LiftContext<'_>,
1672 ty: InterfaceType,
1673 bytes: &[u8],
1674 ) -> Result<Self> {
1675 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1676 Self::lift_from_index(cx, ty, index)
1677 }
1678}
1679
1680pub struct GuardedStreamReader<T, A>
1686where
1687 A: AsAccessor,
1688{
1689 reader: Option<StreamReader<T>>,
1693 accessor: A,
1694}
1695
1696impl<T, A> GuardedStreamReader<T, A>
1697where
1698 A: AsAccessor,
1699{
1700 pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1703 Self {
1704 reader: Some(reader),
1705 accessor,
1706 }
1707 }
1708
1709 pub fn into_stream(self) -> StreamReader<T> {
1712 self.into()
1713 }
1714}
1715
1716impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1717where
1718 A: AsAccessor,
1719{
1720 fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1721 guard.reader.take().unwrap()
1722 }
1723}
1724
1725impl<T, A> Drop for GuardedStreamReader<T, A>
1726where
1727 A: AsAccessor,
1728{
1729 fn drop(&mut self) {
1730 if let Some(reader) = &mut self.reader {
1731 reader.close_with(&self.accessor)
1732 }
1733 }
1734}
1735
1736pub struct ErrorContext {
1738 rep: u32,
1739}
1740
1741impl ErrorContext {
1742 pub(crate) fn new(rep: u32) -> Self {
1743 Self { rep }
1744 }
1745
1746 pub fn into_val(self) -> Val {
1748 Val::ErrorContext(ErrorContextAny(self.rep))
1749 }
1750
1751 pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1753 let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1754 bail!("expected `error-context`; got `{}`", value.desc());
1755 };
1756 Ok(Self::new(*rep))
1757 }
1758
1759 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1760 match ty {
1761 InterfaceType::ErrorContext(src) => {
1762 let rep = cx
1763 .instance_mut()
1764 .table_for_error_context(src)
1765 .error_context_rep(index)?;
1766
1767 Ok(Self { rep })
1768 }
1769 _ => func::bad_type_info(),
1770 }
1771 }
1772}
1773
1774pub(crate) fn lower_error_context_to_index<U>(
1775 rep: u32,
1776 cx: &mut LowerContext<'_, U>,
1777 ty: InterfaceType,
1778) -> Result<u32> {
1779 match ty {
1780 InterfaceType::ErrorContext(dst) => {
1781 let tbl = cx.instance_mut().table_for_error_context(dst);
1782 tbl.error_context_insert(rep)
1783 }
1784 _ => func::bad_type_info(),
1785 }
1786}
1787unsafe impl func::ComponentType for ErrorContext {
1790 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1791
1792 type Lower = <u32 as func::ComponentType>::Lower;
1793
1794 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1795 match ty {
1796 InterfaceType::ErrorContext(_) => Ok(()),
1797 other => bail!("expected `error`, found `{}`", func::desc(other)),
1798 }
1799 }
1800}
1801
1802unsafe impl func::Lower for ErrorContext {
1804 fn linear_lower_to_flat<T>(
1805 &self,
1806 cx: &mut LowerContext<'_, T>,
1807 ty: InterfaceType,
1808 dst: &mut MaybeUninit<Self::Lower>,
1809 ) -> Result<()> {
1810 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1811 cx,
1812 InterfaceType::U32,
1813 dst,
1814 )
1815 }
1816
1817 fn linear_lower_to_memory<T>(
1818 &self,
1819 cx: &mut LowerContext<'_, T>,
1820 ty: InterfaceType,
1821 offset: usize,
1822 ) -> Result<()> {
1823 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1824 cx,
1825 InterfaceType::U32,
1826 offset,
1827 )
1828 }
1829}
1830
1831unsafe impl func::Lift for ErrorContext {
1833 fn linear_lift_from_flat(
1834 cx: &mut LiftContext<'_>,
1835 ty: InterfaceType,
1836 src: &Self::Lower,
1837 ) -> Result<Self> {
1838 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1839 Self::lift_from_index(cx, ty, index)
1840 }
1841
1842 fn linear_lift_from_memory(
1843 cx: &mut LiftContext<'_>,
1844 ty: InterfaceType,
1845 bytes: &[u8],
1846 ) -> Result<Self> {
1847 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1848 Self::lift_from_index(cx, ty, index)
1849 }
1850}
1851
1852pub(super) struct TransmitHandle {
1854 pub(super) common: WaitableCommon,
1855 state: TableId<TransmitState>,
1857}
1858
1859impl TransmitHandle {
1860 fn new(state: TableId<TransmitState>) -> Self {
1861 Self {
1862 common: WaitableCommon::default(),
1863 state,
1864 }
1865 }
1866}
1867
1868impl TableDebug for TransmitHandle {
1869 fn type_name() -> &'static str {
1870 "TransmitHandle"
1871 }
1872}
1873
1874struct TransmitState {
1876 write_handle: TableId<TransmitHandle>,
1878 read_handle: TableId<TransmitHandle>,
1880 write: WriteState,
1882 read: ReadState,
1884 done: bool,
1886}
1887
1888impl Default for TransmitState {
1889 fn default() -> Self {
1890 Self {
1891 write_handle: TableId::new(u32::MAX),
1892 read_handle: TableId::new(u32::MAX),
1893 read: ReadState::Open,
1894 write: WriteState::Open,
1895 done: false,
1896 }
1897 }
1898}
1899
1900impl TableDebug for TransmitState {
1901 fn type_name() -> &'static str {
1902 "TransmitState"
1903 }
1904}
1905
1906type PollStream = Box<
1907 dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
1908>;
1909
1910type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
1911
1912enum WriteState {
1914 Open,
1916 GuestReady {
1918 instance: Instance,
1919 ty: TransmitIndex,
1920 flat_abi: Option<FlatAbi>,
1921 options: OptionsIndex,
1922 address: usize,
1923 count: usize,
1924 handle: u32,
1925 },
1926 HostReady {
1928 produce: PollStream,
1929 try_into: TryInto,
1930 guest_offset: usize,
1931 cancel: bool,
1932 cancel_waker: Option<Waker>,
1933 },
1934 Dropped,
1936}
1937
1938impl fmt::Debug for WriteState {
1939 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1940 match self {
1941 Self::Open => f.debug_tuple("Open").finish(),
1942 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1943 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1944 Self::Dropped => f.debug_tuple("Dropped").finish(),
1945 }
1946 }
1947}
1948
1949enum ReadState {
1951 Open,
1953 GuestReady {
1955 ty: TransmitIndex,
1956 flat_abi: Option<FlatAbi>,
1957 instance: Instance,
1958 options: OptionsIndex,
1959 address: usize,
1960 count: usize,
1961 handle: u32,
1962 },
1963 HostReady {
1965 consume: PollStream,
1966 guest_offset: usize,
1967 cancel: bool,
1968 cancel_waker: Option<Waker>,
1969 },
1970 HostToHost {
1972 accept: Box<
1973 dyn for<'a> Fn(
1974 &'a mut UntypedWriteBuffer<'a>,
1975 )
1976 -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
1977 + Send
1978 + Sync,
1979 >,
1980 buffer: Vec<u8>,
1981 limit: usize,
1982 },
1983 Dropped,
1985}
1986
1987impl fmt::Debug for ReadState {
1988 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1989 match self {
1990 Self::Open => f.debug_tuple("Open").finish(),
1991 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1992 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1993 Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
1994 Self::Dropped => f.debug_tuple("Dropped").finish(),
1995 }
1996 }
1997}
1998
1999fn return_code(kind: TransmitKind, state: StreamResult, guest_offset: usize) -> ReturnCode {
2000 let count = guest_offset.try_into().unwrap();
2001 match state {
2002 StreamResult::Dropped => ReturnCode::Dropped(count),
2003 StreamResult::Completed => ReturnCode::completed(kind, count),
2004 StreamResult::Cancelled => ReturnCode::Cancelled(count),
2005 }
2006}
2007
2008impl StoreOpaque {
2009 fn pipe_from_guest(
2010 &mut self,
2011 kind: TransmitKind,
2012 id: TableId<TransmitState>,
2013 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2014 ) {
2015 let future = async move {
2016 let stream_state = future.await?;
2017 tls::get(|store| {
2018 let state = store.concurrent_state_mut();
2019 let transmit = state.get_mut(id)?;
2020 let ReadState::HostReady {
2021 consume,
2022 guest_offset,
2023 ..
2024 } = mem::replace(&mut transmit.read, ReadState::Open)
2025 else {
2026 unreachable!();
2027 };
2028 let code = return_code(kind, stream_state, guest_offset);
2029 transmit.read = match stream_state {
2030 StreamResult::Dropped => ReadState::Dropped,
2031 StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2032 consume,
2033 guest_offset: 0,
2034 cancel: false,
2035 cancel_waker: None,
2036 },
2037 };
2038 let WriteState::GuestReady { ty, handle, .. } =
2039 mem::replace(&mut transmit.write, WriteState::Open)
2040 else {
2041 unreachable!();
2042 };
2043 state.send_write_result(ty, id, handle, code)?;
2044 Ok(())
2045 })
2046 };
2047
2048 self.concurrent_state_mut().push_future(future.boxed());
2049 }
2050
2051 fn pipe_to_guest(
2052 &mut self,
2053 kind: TransmitKind,
2054 id: TableId<TransmitState>,
2055 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2056 ) {
2057 let future = async move {
2058 let stream_state = future.await?;
2059 tls::get(|store| {
2060 let state = store.concurrent_state_mut();
2061 let transmit = state.get_mut(id)?;
2062 let WriteState::HostReady {
2063 produce,
2064 try_into,
2065 guest_offset,
2066 ..
2067 } = mem::replace(&mut transmit.write, WriteState::Open)
2068 else {
2069 unreachable!();
2070 };
2071 let code = return_code(kind, stream_state, guest_offset);
2072 transmit.write = match stream_state {
2073 StreamResult::Dropped => WriteState::Dropped,
2074 StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2075 produce,
2076 try_into,
2077 guest_offset: 0,
2078 cancel: false,
2079 cancel_waker: None,
2080 },
2081 };
2082 let ReadState::GuestReady { ty, handle, .. } =
2083 mem::replace(&mut transmit.read, ReadState::Open)
2084 else {
2085 unreachable!();
2086 };
2087 state.send_read_result(ty, id, handle, code)?;
2088 Ok(())
2089 })
2090 };
2091
2092 self.concurrent_state_mut().push_future(future.boxed());
2093 }
2094
2095 fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
2097 let state = self.concurrent_state_mut();
2098 let transmit_id = state.get_mut(id)?.state;
2099 let transmit = state
2100 .get_mut(transmit_id)
2101 .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2102 log::trace!(
2103 "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2104 transmit.read,
2105 transmit.write
2106 );
2107
2108 transmit.read = ReadState::Dropped;
2109
2110 let new_state = if let WriteState::Dropped = &transmit.write {
2113 WriteState::Dropped
2114 } else {
2115 WriteState::Open
2116 };
2117
2118 let write_handle = transmit.write_handle;
2119
2120 match mem::replace(&mut transmit.write, new_state) {
2121 WriteState::GuestReady { ty, handle, .. } => {
2124 state.update_event(
2125 write_handle.rep(),
2126 match ty {
2127 TransmitIndex::Future(ty) => Event::FutureWrite {
2128 code: ReturnCode::Dropped(0),
2129 pending: Some((ty, handle)),
2130 },
2131 TransmitIndex::Stream(ty) => Event::StreamWrite {
2132 code: ReturnCode::Dropped(0),
2133 pending: Some((ty, handle)),
2134 },
2135 },
2136 )?;
2137 }
2138
2139 WriteState::HostReady { .. } => {}
2140
2141 WriteState::Open => {
2142 state.update_event(
2143 write_handle.rep(),
2144 match kind {
2145 TransmitKind::Future => Event::FutureWrite {
2146 code: ReturnCode::Dropped(0),
2147 pending: None,
2148 },
2149 TransmitKind::Stream => Event::StreamWrite {
2150 code: ReturnCode::Dropped(0),
2151 pending: None,
2152 },
2153 },
2154 )?;
2155 }
2156
2157 WriteState::Dropped => {
2158 log::trace!("host_drop_reader delete {transmit_id:?}");
2159 state.delete_transmit(transmit_id)?;
2160 }
2161 }
2162 Ok(())
2163 }
2164
2165 fn host_drop_writer(
2167 &mut self,
2168 id: TableId<TransmitHandle>,
2169 on_drop_open: Option<fn() -> Result<()>>,
2170 ) -> Result<()> {
2171 let state = self.concurrent_state_mut();
2172 let transmit_id = state.get_mut(id)?.state;
2173 let transmit = state
2174 .get_mut(transmit_id)
2175 .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2176 log::trace!(
2177 "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2178 transmit.read,
2179 transmit.write
2180 );
2181
2182 match &mut transmit.write {
2184 WriteState::GuestReady { .. } => {
2185 unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2186 }
2187 WriteState::HostReady { .. } => {}
2188 v @ WriteState::Open => {
2189 if let (Some(on_drop_open), false) = (
2190 on_drop_open,
2191 transmit.done || matches!(transmit.read, ReadState::Dropped),
2192 ) {
2193 on_drop_open()?;
2194 } else {
2195 *v = WriteState::Dropped;
2196 }
2197 }
2198 WriteState::Dropped => unreachable!("write state is already dropped"),
2199 }
2200
2201 let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
2202
2203 let new_state = if let ReadState::Dropped = &transmit.read {
2209 ReadState::Dropped
2210 } else {
2211 ReadState::Open
2212 };
2213
2214 let read_handle = transmit.read_handle;
2215
2216 match mem::replace(&mut transmit.read, new_state) {
2218 ReadState::GuestReady { ty, handle, .. } => {
2222 self.concurrent_state_mut().update_event(
2224 read_handle.rep(),
2225 match ty {
2226 TransmitIndex::Future(ty) => Event::FutureRead {
2227 code: ReturnCode::Dropped(0),
2228 pending: Some((ty, handle)),
2229 },
2230 TransmitIndex::Stream(ty) => Event::StreamRead {
2231 code: ReturnCode::Dropped(0),
2232 pending: Some((ty, handle)),
2233 },
2234 },
2235 )?;
2236 }
2237
2238 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2239
2240 ReadState::Open => {
2242 self.concurrent_state_mut().update_event(
2243 read_handle.rep(),
2244 match on_drop_open {
2245 Some(_) => Event::FutureRead {
2246 code: ReturnCode::Dropped(0),
2247 pending: None,
2248 },
2249 None => Event::StreamRead {
2250 code: ReturnCode::Dropped(0),
2251 pending: None,
2252 },
2253 },
2254 )?;
2255 }
2256
2257 ReadState::Dropped => {
2260 log::trace!("host_drop_writer delete {transmit_id:?}");
2261 self.concurrent_state_mut().delete_transmit(transmit_id)?;
2262 }
2263 }
2264 Ok(())
2265 }
2266}
2267
2268impl<T> StoreContextMut<'_, T> {
2269 fn new_transmit<P: StreamProducer<T>>(
2270 mut self,
2271 kind: TransmitKind,
2272 producer: P,
2273 ) -> TableId<TransmitHandle>
2274 where
2275 P::Item: func::Lower,
2276 {
2277 let token = StoreToken::new(self.as_context_mut());
2278 let state = self.0.concurrent_state_mut();
2279 let (_, read) = state.new_transmit().unwrap();
2280 let producer = Arc::new(Mutex::new(Some((Box::pin(producer), P::Buffer::default()))));
2281 let id = state.get_mut(read).unwrap().state;
2282 let mut dropped = false;
2283 let produce = Box::new({
2284 let producer = producer.clone();
2285 move || {
2286 let producer = producer.clone();
2287 async move {
2288 let (mut mine, mut buffer) = producer.lock().unwrap().take().unwrap();
2289
2290 let (result, cancelled) = if buffer.remaining().is_empty() {
2291 future::poll_fn(|cx| {
2292 tls::get(|store| {
2293 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2294
2295 let &WriteState::HostReady { cancel, .. } = &transmit.write else {
2296 unreachable!();
2297 };
2298
2299 let mut host_buffer =
2300 if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
2301 Some(Cursor::new(mem::take(buffer)))
2302 } else {
2303 None
2304 };
2305
2306 let poll = mine.as_mut().poll_produce(
2307 cx,
2308 token.as_context_mut(store),
2309 Destination {
2310 id,
2311 buffer: &mut buffer,
2312 host_buffer: host_buffer.as_mut(),
2313 _phantom: PhantomData,
2314 },
2315 cancel,
2316 );
2317
2318 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2319
2320 let host_offset = if let (
2321 Some(host_buffer),
2322 ReadState::HostToHost { buffer, limit, .. },
2323 ) = (host_buffer, &mut transmit.read)
2324 {
2325 *limit = usize::try_from(host_buffer.position()).unwrap();
2326 *buffer = host_buffer.into_inner();
2327 *limit
2328 } else {
2329 0
2330 };
2331
2332 {
2333 let WriteState::HostReady {
2334 guest_offset,
2335 cancel,
2336 cancel_waker,
2337 ..
2338 } = &mut transmit.write
2339 else {
2340 unreachable!();
2341 };
2342
2343 if poll.is_pending() {
2344 if !buffer.remaining().is_empty()
2345 || *guest_offset > 0
2346 || host_offset > 0
2347 {
2348 return Poll::Ready(Err(anyhow!(
2349 "StreamProducer::poll_produce returned Poll::Pending \
2350 after producing at least one item"
2351 )));
2352 }
2353 *cancel_waker = Some(cx.waker().clone());
2354 } else {
2355 *cancel_waker = None;
2356 *cancel = false;
2357 }
2358 }
2359
2360 poll.map(|v| v.map(|result| (result, cancel)))
2361 })
2362 })
2363 .await?
2364 } else {
2365 (StreamResult::Completed, false)
2366 };
2367
2368 let (guest_offset, host_offset, count) = tls::get(|store| {
2369 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2370 let (count, host_offset) = match &transmit.read {
2371 &ReadState::GuestReady { count, .. } => (count, 0),
2372 &ReadState::HostToHost { limit, .. } => (1, limit),
2373 _ => unreachable!(),
2374 };
2375 let guest_offset = match &transmit.write {
2376 &WriteState::HostReady { guest_offset, .. } => guest_offset,
2377 _ => unreachable!(),
2378 };
2379 (guest_offset, host_offset, count)
2380 });
2381
2382 match result {
2383 StreamResult::Completed => {
2384 if count > 1
2385 && buffer.remaining().is_empty()
2386 && guest_offset == 0
2387 && host_offset == 0
2388 {
2389 bail!(
2390 "StreamProducer::poll_produce returned StreamResult::Completed \
2391 without producing any items"
2392 );
2393 }
2394 }
2395 StreamResult::Cancelled => {
2396 if !cancelled {
2397 bail!(
2398 "StreamProducer::poll_produce returned StreamResult::Cancelled \
2399 without being given a `finish` parameter value of true"
2400 );
2401 }
2402 }
2403 StreamResult::Dropped => {
2404 dropped = true;
2405 }
2406 }
2407
2408 let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;
2409
2410 *producer.lock().unwrap() = Some((mine, buffer));
2411
2412 if write_buffer {
2413 write( token, id, producer.clone(), kind).await?;
2414 }
2415
2416 Ok(if dropped {
2417 if producer.lock().unwrap().as_ref().unwrap().1.remaining().is_empty()
2418 {
2419 StreamResult::Dropped
2420 } else {
2421 StreamResult::Completed
2422 }
2423 } else {
2424 result
2425 })
2426 }
2427 .boxed()
2428 }
2429 });
2430 let try_into = Box::new(move |ty| {
2431 let (mine, buffer) = producer.lock().unwrap().take().unwrap();
2432 match P::try_into(mine, ty) {
2433 Ok(value) => Some(value),
2434 Err(mine) => {
2435 *producer.lock().unwrap() = Some((mine, buffer));
2436 None
2437 }
2438 }
2439 });
2440 state.get_mut(id).unwrap().write = WriteState::HostReady {
2441 produce,
2442 try_into,
2443 guest_offset: 0,
2444 cancel: false,
2445 cancel_waker: None,
2446 };
2447 read
2448 }
2449
2450 fn set_consumer<C: StreamConsumer<T>>(
2451 mut self,
2452 id: TableId<TransmitHandle>,
2453 kind: TransmitKind,
2454 consumer: C,
2455 ) {
2456 let token = StoreToken::new(self.as_context_mut());
2457 let state = self.0.concurrent_state_mut();
2458 let id = state.get_mut(id).unwrap().state;
2459 let transmit = state.get_mut(id).unwrap();
2460 let consumer = Arc::new(Mutex::new(Some(Box::pin(consumer))));
2461 let consume_with_buffer = {
2462 let consumer = consumer.clone();
2463 async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
2464 let mut mine = consumer.lock().unwrap().take().unwrap();
2465
2466 let host_buffer_remaining_before =
2467 host_buffer.as_deref_mut().map(|v| v.remaining().len());
2468
2469 let (result, cancelled) = future::poll_fn(|cx| {
2470 tls::get(|store| {
2471 let cancel = match &store.concurrent_state_mut().get_mut(id).unwrap().read {
2472 &ReadState::HostReady { cancel, .. } => cancel,
2473 ReadState::Open => false,
2474 _ => unreachable!(),
2475 };
2476
2477 let poll = mine.as_mut().poll_consume(
2478 cx,
2479 token.as_context_mut(store),
2480 Source {
2481 id,
2482 host_buffer: host_buffer.as_deref_mut(),
2483 },
2484 cancel,
2485 );
2486
2487 if let ReadState::HostReady {
2488 cancel_waker,
2489 cancel,
2490 ..
2491 } = &mut store.concurrent_state_mut().get_mut(id).unwrap().read
2492 {
2493 if poll.is_pending() {
2494 *cancel_waker = Some(cx.waker().clone());
2495 } else {
2496 *cancel_waker = None;
2497 *cancel = false;
2498 }
2499 }
2500
2501 poll.map(|v| v.map(|result| (result, cancel)))
2502 })
2503 })
2504 .await?;
2505
2506 let (guest_offset, count) = tls::get(|store| {
2507 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2508 (
2509 match &transmit.read {
2510 &ReadState::HostReady { guest_offset, .. } => guest_offset,
2511 ReadState::Open => 0,
2512 _ => unreachable!(),
2513 },
2514 match &transmit.write {
2515 &WriteState::GuestReady { count, .. } => count,
2516 WriteState::HostReady { .. } => host_buffer_remaining_before.unwrap(),
2517 _ => unreachable!(),
2518 },
2519 )
2520 });
2521
2522 match result {
2523 StreamResult::Completed => {
2524 if count > 0
2525 && guest_offset == 0
2526 && host_buffer_remaining_before
2527 .zip(host_buffer.map(|v| v.remaining().len()))
2528 .map(|(before, after)| before == after)
2529 .unwrap_or(false)
2530 {
2531 bail!(
2532 "StreamConsumer::poll_consume returned StreamResult::Completed \
2533 without consuming any items"
2534 );
2535 }
2536
2537 if let TransmitKind::Future = kind {
2538 tls::get(|store| {
2539 store.concurrent_state_mut().get_mut(id).unwrap().done = true;
2540 });
2541 }
2542 }
2543 StreamResult::Cancelled => {
2544 if !cancelled {
2545 bail!(
2546 "StreamConsumer::poll_consume returned StreamResult::Cancelled \
2547 without being given a `finish` parameter value of true"
2548 );
2549 }
2550 }
2551 StreamResult::Dropped => {}
2552 }
2553
2554 *consumer.lock().unwrap() = Some(mine);
2555
2556 Ok(result)
2557 }
2558 };
2559 let consume = {
2560 let consume = consume_with_buffer.clone();
2561 Box::new(move || {
2562 let consume = consume.clone();
2563 async move { consume(None).await }.boxed()
2564 })
2565 };
2566
2567 match &transmit.write {
2568 WriteState::Open => {
2569 transmit.read = ReadState::HostReady {
2570 consume,
2571 guest_offset: 0,
2572 cancel: false,
2573 cancel_waker: None,
2574 };
2575 }
2576 &WriteState::GuestReady { .. } => {
2577 let future = consume();
2578 transmit.read = ReadState::HostReady {
2579 consume,
2580 guest_offset: 0,
2581 cancel: false,
2582 cancel_waker: None,
2583 };
2584 self.0.pipe_from_guest(kind, id, future);
2585 }
2586 WriteState::HostReady { .. } => {
2587 let WriteState::HostReady { produce, .. } = mem::replace(
2588 &mut transmit.write,
2589 WriteState::HostReady {
2590 produce: Box::new(|| unreachable!()),
2591 try_into: Box::new(|_| unreachable!()),
2592 guest_offset: 0,
2593 cancel: false,
2594 cancel_waker: None,
2595 },
2596 ) else {
2597 unreachable!();
2598 };
2599
2600 transmit.read = ReadState::HostToHost {
2601 accept: Box::new(move |input| {
2602 let consume = consume_with_buffer.clone();
2603 async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
2604 }),
2605 buffer: Vec::new(),
2606 limit: 0,
2607 };
2608
2609 let future = async move {
2610 loop {
2611 if tls::get(|store| {
2612 anyhow::Ok(matches!(
2613 store.concurrent_state_mut().get_mut(id)?.read,
2614 ReadState::Dropped
2615 ))
2616 })? {
2617 break Ok(());
2618 }
2619
2620 match produce().await? {
2621 StreamResult::Completed | StreamResult::Cancelled => {}
2622 StreamResult::Dropped => break Ok(()),
2623 }
2624
2625 if let TransmitKind::Future = kind {
2626 break Ok(());
2627 }
2628 }
2629 }
2630 .map(move |result| {
2631 tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
2632 result
2633 });
2634
2635 state.push_future(Box::pin(future));
2636 }
2637 WriteState::Dropped => {
2638 let reader = transmit.read_handle;
2639 self.0.host_drop_reader(reader, kind).unwrap();
2640 }
2641 }
2642 }
2643}
2644
2645async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
2646 token: StoreToken<D>,
2647 id: TableId<TransmitState>,
2648 pair: Arc<Mutex<Option<(P, B)>>>,
2649 kind: TransmitKind,
2650) -> Result<()> {
2651 let (read, guest_offset) = tls::get(|store| {
2652 let transmit = store.concurrent_state_mut().get_mut(id)?;
2653
2654 let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
2655 Some(guest_offset)
2656 } else {
2657 None
2658 };
2659
2660 anyhow::Ok((
2661 mem::replace(&mut transmit.read, ReadState::Open),
2662 guest_offset,
2663 ))
2664 })?;
2665
2666 match read {
2667 ReadState::GuestReady {
2668 ty,
2669 flat_abi,
2670 options,
2671 address,
2672 count,
2673 handle,
2674 instance,
2675 } => {
2676 let guest_offset = guest_offset.unwrap();
2677
2678 if let TransmitKind::Future = kind {
2679 tls::get(|store| {
2680 store.concurrent_state_mut().get_mut(id)?.done = true;
2681 anyhow::Ok(())
2682 })?;
2683 }
2684
2685 let old_remaining = pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2686 let accept = {
2687 let pair = pair.clone();
2688 move |mut store: StoreContextMut<D>| {
2689 lower::<T, B, D>(
2690 store.as_context_mut(),
2691 instance,
2692 options,
2693 ty,
2694 address + (T::SIZE32 * guest_offset),
2695 count - guest_offset,
2696 &mut pair.lock().unwrap().as_mut().unwrap().1,
2697 )?;
2698 anyhow::Ok(())
2699 }
2700 };
2701
2702 if guest_offset < count {
2703 if T::MAY_REQUIRE_REALLOC {
2704 let (tx, rx) = oneshot::channel();
2709 tls::get(move |store| {
2710 store
2711 .concurrent_state_mut()
2712 .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2713 move |store| {
2714 _ = tx.send(accept(token.as_context_mut(store))?);
2715 Ok(())
2716 },
2717 ))))
2718 });
2719 rx.await?
2720 } else {
2721 tls::get(|store| accept(token.as_context_mut(store)))?
2726 };
2727 }
2728
2729 tls::get(|store| {
2730 let count =
2731 old_remaining - pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2732
2733 let transmit = store.concurrent_state_mut().get_mut(id)?;
2734
2735 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2736 unreachable!();
2737 };
2738
2739 *guest_offset += count;
2740
2741 transmit.read = ReadState::GuestReady {
2742 ty,
2743 flat_abi,
2744 options,
2745 address,
2746 count,
2747 handle,
2748 instance,
2749 };
2750
2751 anyhow::Ok(())
2752 })?;
2753
2754 Ok(())
2755 }
2756
2757 ReadState::HostToHost {
2758 accept,
2759 mut buffer,
2760 limit,
2761 } => {
2762 let mut state = StreamResult::Completed;
2763 let mut position = 0;
2764
2765 while !matches!(state, StreamResult::Dropped) && position < limit {
2766 let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
2767 state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
2768 (buffer, position, _) = slice_buffer.into_parts();
2769 }
2770
2771 {
2772 let (mine, mut buffer) = pair.lock().unwrap().take().unwrap();
2773
2774 while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
2775 state = accept(&mut UntypedWriteBuffer::new(&mut buffer)).await?;
2776 }
2777
2778 *pair.lock().unwrap() = Some((mine, buffer));
2779 }
2780
2781 tls::get(|store| {
2782 store.concurrent_state_mut().get_mut(id)?.read = match state {
2783 StreamResult::Dropped => ReadState::Dropped,
2784 StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
2785 accept,
2786 buffer,
2787 limit: 0,
2788 },
2789 };
2790
2791 anyhow::Ok(())
2792 })?;
2793 Ok(())
2794 }
2795
2796 _ => unreachable!(),
2797 }
2798}
2799
2800impl Instance {
2801 fn consume(
2804 self,
2805 store: &mut dyn VMStore,
2806 kind: TransmitKind,
2807 transmit_id: TableId<TransmitState>,
2808 consume: PollStream,
2809 guest_offset: usize,
2810 cancel: bool,
2811 ) -> Result<ReturnCode> {
2812 let mut future = consume();
2813 store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
2814 consume,
2815 guest_offset,
2816 cancel,
2817 cancel_waker: None,
2818 };
2819 let poll = tls::set(store, || {
2820 future
2821 .as_mut()
2822 .poll(&mut Context::from_waker(&Waker::noop()))
2823 });
2824
2825 Ok(match poll {
2826 Poll::Ready(state) => {
2827 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
2828 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
2829 unreachable!();
2830 };
2831 let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2832 transmit.write = WriteState::Open;
2833 code
2834 }
2835 Poll::Pending => {
2836 store.pipe_from_guest(kind, transmit_id, future);
2837 ReturnCode::Blocked
2838 }
2839 })
2840 }
2841
2842 fn produce(
2845 self,
2846 store: &mut dyn VMStore,
2847 kind: TransmitKind,
2848 transmit_id: TableId<TransmitState>,
2849 produce: PollStream,
2850 try_into: TryInto,
2851 guest_offset: usize,
2852 cancel: bool,
2853 ) -> Result<ReturnCode> {
2854 let mut future = produce();
2855 store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
2856 produce,
2857 try_into,
2858 guest_offset,
2859 cancel,
2860 cancel_waker: None,
2861 };
2862 let poll = tls::set(store, || {
2863 future
2864 .as_mut()
2865 .poll(&mut Context::from_waker(&Waker::noop()))
2866 });
2867
2868 Ok(match poll {
2869 Poll::Ready(state) => {
2870 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
2871 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2872 unreachable!();
2873 };
2874 let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2875 transmit.read = ReadState::Open;
2876 code
2877 }
2878 Poll::Pending => {
2879 store.pipe_to_guest(kind, transmit_id, future);
2880 ReturnCode::Blocked
2881 }
2882 })
2883 }
2884
2885 pub(super) fn guest_drop_writable(
2887 self,
2888 store: &mut StoreOpaque,
2889 ty: TransmitIndex,
2890 writer: u32,
2891 ) -> Result<()> {
2892 let table = self.id().get_mut(store).table_for_transmit(ty);
2893 let transmit_rep = match ty {
2894 TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
2895 TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
2896 };
2897
2898 let id = TableId::<TransmitHandle>::new(transmit_rep);
2899 log::trace!("guest_drop_writable: drop writer {id:?}");
2900 match ty {
2901 TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
2902 TransmitIndex::Future(_) => store.host_drop_writer(
2903 id,
2904 Some(|| {
2905 Err(anyhow!(
2906 "cannot drop future write end without first writing a value"
2907 ))
2908 }),
2909 ),
2910 }
2911 }
2912
2913 fn copy<T: 'static>(
2916 self,
2917 mut store: StoreContextMut<T>,
2918 flat_abi: Option<FlatAbi>,
2919 write_ty: TransmitIndex,
2920 write_options: OptionsIndex,
2921 write_address: usize,
2922 read_ty: TransmitIndex,
2923 read_options: OptionsIndex,
2924 read_address: usize,
2925 count: usize,
2926 rep: u32,
2927 ) -> Result<()> {
2928 let types = self.id().get(store.0).component().types();
2929 match (write_ty, read_ty) {
2930 (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
2931 assert_eq!(count, 1);
2932
2933 let val = types[types[write_ty].ty]
2934 .payload
2935 .map(|ty| {
2936 let lift =
2937 &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);
2938
2939 let abi = lift.types.canonical_abi(&ty);
2940 if write_address % usize::try_from(abi.align32)? != 0 {
2942 bail!("write pointer not aligned");
2943 }
2944
2945 let bytes = lift
2946 .memory()
2947 .get(write_address..)
2948 .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
2949 .ok_or_else(|| {
2950 anyhow::anyhow!("write pointer out of bounds of memory")
2951 })?;
2952
2953 Val::load(lift, ty, bytes)
2954 })
2955 .transpose()?;
2956
2957 if let Some(val) = val {
2958 let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
2959 let types = lower.types;
2960 let ty = types[types[read_ty].ty].payload.unwrap();
2961 let ptr = func::validate_inbounds_dynamic(
2962 types.canonical_abi(&ty),
2963 lower.as_slice_mut(),
2964 &ValRaw::u32(read_address.try_into().unwrap()),
2965 )?;
2966 val.store(lower, ty, ptr)?;
2967 }
2968 }
2969 (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
2970 if let Some(flat_abi) = flat_abi {
2971 let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
2973 if length_in_bytes > 0 {
2974 if write_address % usize::try_from(flat_abi.align)? != 0 {
2975 bail!("write pointer not aligned");
2976 }
2977 if read_address % usize::try_from(flat_abi.align)? != 0 {
2978 bail!("read pointer not aligned");
2979 }
2980
2981 let store_opaque = store.0.store_opaque_mut();
2982
2983 {
2984 let src = self
2985 .options_memory(store_opaque, write_options)
2986 .get(write_address..)
2987 .and_then(|b| b.get(..length_in_bytes))
2988 .ok_or_else(|| {
2989 anyhow::anyhow!("write pointer out of bounds of memory")
2990 })?
2991 .as_ptr();
2992 let dst = self
2993 .options_memory_mut(store_opaque, read_options)
2994 .get_mut(read_address..)
2995 .and_then(|b| b.get_mut(..length_in_bytes))
2996 .ok_or_else(|| {
2997 anyhow::anyhow!("read pointer out of bounds of memory")
2998 })?
2999 .as_mut_ptr();
3000 unsafe { src.copy_to(dst, length_in_bytes) };
3003 }
3004 }
3005 } else {
3006 let store_opaque = store.0.store_opaque_mut();
3007 let lift = &mut LiftContext::new(store_opaque, write_options, self);
3008 let ty = lift.types[lift.types[write_ty].ty].payload.unwrap();
3009 let abi = lift.types.canonical_abi(&ty);
3010 let size = usize::try_from(abi.size32).unwrap();
3011 if write_address % usize::try_from(abi.align32)? != 0 {
3012 bail!("write pointer not aligned");
3013 }
3014 let bytes = lift
3015 .memory()
3016 .get(write_address..)
3017 .and_then(|b| b.get(..size * count))
3018 .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
3019
3020 let values = (0..count)
3021 .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
3022 .collect::<Result<Vec<_>>>()?;
3023
3024 let id = TableId::<TransmitHandle>::new(rep);
3025 log::trace!("copy values {values:?} for {id:?}");
3026
3027 let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
3028 let ty = lower.types[lower.types[read_ty].ty].payload.unwrap();
3029 let abi = lower.types.canonical_abi(&ty);
3030 if read_address % usize::try_from(abi.align32)? != 0 {
3031 bail!("read pointer not aligned");
3032 }
3033 let size = usize::try_from(abi.size32).unwrap();
3034 lower
3035 .as_slice_mut()
3036 .get_mut(read_address..)
3037 .and_then(|b| b.get_mut(..size * count))
3038 .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
3039 let mut ptr = read_address;
3040 for value in values {
3041 value.store(lower, ty, ptr)?;
3042 ptr += size
3043 }
3044 }
3045 }
3046 _ => unreachable!(),
3047 }
3048
3049 Ok(())
3050 }
3051
3052 fn check_bounds(
3053 self,
3054 store: &StoreOpaque,
3055 options: OptionsIndex,
3056 ty: TransmitIndex,
3057 address: usize,
3058 count: usize,
3059 ) -> Result<()> {
3060 let types = self.id().get(store).component().types();
3061 let size = usize::try_from(
3062 match ty {
3063 TransmitIndex::Future(ty) => types[types[ty].ty]
3064 .payload
3065 .map(|ty| types.canonical_abi(&ty).size32),
3066 TransmitIndex::Stream(ty) => types[types[ty].ty]
3067 .payload
3068 .map(|ty| types.canonical_abi(&ty).size32),
3069 }
3070 .unwrap_or(0),
3071 )
3072 .unwrap();
3073
3074 if count > 0 && size > 0 {
3075 self.options_memory(store, options)
3076 .get(address..)
3077 .and_then(|b| b.get(..(size * count)))
3078 .map(drop)
3079 .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))
3080 } else {
3081 Ok(())
3082 }
3083 }
3084
3085 pub(super) fn guest_write<T: 'static>(
3087 self,
3088 mut store: StoreContextMut<T>,
3089 ty: TransmitIndex,
3090 options: OptionsIndex,
3091 flat_abi: Option<FlatAbi>,
3092 handle: u32,
3093 address: u32,
3094 count: u32,
3095 ) -> Result<ReturnCode> {
3096 let address = usize::try_from(address).unwrap();
3097 let count = usize::try_from(count).unwrap();
3098 self.check_bounds(store.0, options, ty, address, count)?;
3099 let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3100 let TransmitLocalState::Write { done } = *state else {
3101 bail!(
3102 "invalid handle {handle}; expected `Write`; got {:?}",
3103 *state
3104 );
3105 };
3106
3107 if done {
3108 bail!("cannot write to stream after being notified that the readable end dropped");
3109 }
3110
3111 *state = TransmitLocalState::Busy;
3112 let transmit_handle = TableId::<TransmitHandle>::new(rep);
3113 let concurrent_state = store.0.concurrent_state_mut();
3114 let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3115 let transmit = concurrent_state.get_mut(transmit_id)?;
3116 log::trace!(
3117 "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3118 transmit.read
3119 );
3120
3121 if transmit.done {
3122 bail!("cannot write to future after previous write succeeded or readable end dropped");
3123 }
3124
3125 let new_state = if let ReadState::Dropped = &transmit.read {
3126 ReadState::Dropped
3127 } else {
3128 ReadState::Open
3129 };
3130
3131 let set_guest_ready = |me: &mut ConcurrentState| {
3132 let transmit = me.get_mut(transmit_id)?;
3133 assert!(
3134 matches!(&transmit.write, WriteState::Open),
3135 "expected `WriteState::Open`; got `{:?}`",
3136 transmit.write
3137 );
3138 transmit.write = WriteState::GuestReady {
3139 instance: self,
3140 ty,
3141 flat_abi,
3142 options,
3143 address,
3144 count,
3145 handle,
3146 };
3147 Ok::<_, crate::Error>(())
3148 };
3149
3150 let mut result = match mem::replace(&mut transmit.read, new_state) {
3151 ReadState::GuestReady {
3152 ty: read_ty,
3153 flat_abi: read_flat_abi,
3154 options: read_options,
3155 address: read_address,
3156 count: read_count,
3157 handle: read_handle,
3158 instance: read_instance,
3159 } => {
3160 assert_eq!(flat_abi, read_flat_abi);
3161
3162 if let TransmitIndex::Future(_) = ty {
3163 transmit.done = true;
3164 }
3165
3166 let write_complete = count == 0 || read_count > 0;
3188 let read_complete = count > 0;
3189 let read_buffer_remaining = count < read_count;
3190
3191 let read_handle_rep = transmit.read_handle.rep();
3192
3193 let count = count.min(read_count);
3194
3195 self.copy(
3196 store.as_context_mut(),
3197 flat_abi,
3198 ty,
3199 options,
3200 address,
3201 read_ty,
3202 read_options,
3203 read_address,
3204 count,
3205 rep,
3206 )?;
3207
3208 let instance = self.id().get_mut(store.0);
3209 let types = instance.component().types();
3210 let item_size = payload(ty, types)
3211 .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3212 .unwrap_or(0);
3213 let concurrent_state = store.0.concurrent_state_mut();
3214 if read_complete {
3215 let count = u32::try_from(count).unwrap();
3216 let total = if let Some(Event::StreamRead {
3217 code: ReturnCode::Completed(old_total),
3218 ..
3219 }) = concurrent_state.take_event(read_handle_rep)?
3220 {
3221 count + old_total
3222 } else {
3223 count
3224 };
3225
3226 let code = ReturnCode::completed(ty.kind(), total);
3227
3228 concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
3229 }
3230
3231 if read_buffer_remaining {
3232 let transmit = concurrent_state.get_mut(transmit_id)?;
3233 transmit.read = ReadState::GuestReady {
3234 ty: read_ty,
3235 flat_abi: read_flat_abi,
3236 options: read_options,
3237 address: read_address + (count * item_size),
3238 count: read_count - count,
3239 handle: read_handle,
3240 instance: read_instance,
3241 };
3242 }
3243
3244 if write_complete {
3245 ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3246 } else {
3247 set_guest_ready(concurrent_state)?;
3248 ReturnCode::Blocked
3249 }
3250 }
3251
3252 ReadState::HostReady {
3253 consume,
3254 guest_offset,
3255 cancel,
3256 cancel_waker,
3257 } => {
3258 assert!(cancel_waker.is_none());
3259 assert!(!cancel);
3260 assert_eq!(0, guest_offset);
3261
3262 if let TransmitIndex::Future(_) = ty {
3263 transmit.done = true;
3264 }
3265
3266 set_guest_ready(concurrent_state)?;
3267 self.consume(store.0, ty.kind(), transmit_id, consume, 0, false)?
3268 }
3269
3270 ReadState::HostToHost { .. } => unreachable!(),
3271
3272 ReadState::Open => {
3273 set_guest_ready(concurrent_state)?;
3274 ReturnCode::Blocked
3275 }
3276
3277 ReadState::Dropped => {
3278 if let TransmitIndex::Future(_) = ty {
3279 transmit.done = true;
3280 }
3281
3282 ReturnCode::Dropped(0)
3283 }
3284 };
3285
3286 if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3287 result = self.wait_for_write(store.0, transmit_handle)?;
3288 }
3289
3290 if result != ReturnCode::Blocked {
3291 *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3292 TransmitLocalState::Write {
3293 done: matches!(
3294 (result, ty),
3295 (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3296 ),
3297 };
3298 }
3299
3300 log::trace!(
3301 "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3302 );
3303
3304 Ok(result)
3305 }
3306
3307 pub(super) fn guest_read<T: 'static>(
3309 self,
3310 mut store: StoreContextMut<T>,
3311 ty: TransmitIndex,
3312 options: OptionsIndex,
3313 flat_abi: Option<FlatAbi>,
3314 handle: u32,
3315 address: u32,
3316 count: u32,
3317 ) -> Result<ReturnCode> {
3318 let address = usize::try_from(address).unwrap();
3319 let count = usize::try_from(count).unwrap();
3320 self.check_bounds(store.0, options, ty, address, count)?;
3321 let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3322 let TransmitLocalState::Read { done } = *state else {
3323 bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
3324 };
3325
3326 if done {
3327 bail!("cannot read from stream after being notified that the writable end dropped");
3328 }
3329
3330 *state = TransmitLocalState::Busy;
3331 let transmit_handle = TableId::<TransmitHandle>::new(rep);
3332 let concurrent_state = store.0.concurrent_state_mut();
3333 let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3334 let transmit = concurrent_state.get_mut(transmit_id)?;
3335 log::trace!(
3336 "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3337 transmit.write
3338 );
3339
3340 if transmit.done {
3341 bail!("cannot read from future after previous read succeeded");
3342 }
3343
3344 let new_state = if let WriteState::Dropped = &transmit.write {
3345 WriteState::Dropped
3346 } else {
3347 WriteState::Open
3348 };
3349
3350 let set_guest_ready = |me: &mut ConcurrentState| {
3351 let transmit = me.get_mut(transmit_id)?;
3352 assert!(
3353 matches!(&transmit.read, ReadState::Open),
3354 "expected `ReadState::Open`; got `{:?}`",
3355 transmit.read
3356 );
3357 transmit.read = ReadState::GuestReady {
3358 ty,
3359 flat_abi,
3360 options,
3361 address,
3362 count,
3363 handle,
3364 instance: self,
3365 };
3366 Ok::<_, crate::Error>(())
3367 };
3368
3369 let mut result = match mem::replace(&mut transmit.write, new_state) {
3370 WriteState::GuestReady {
3371 instance: _,
3372 ty: write_ty,
3373 flat_abi: write_flat_abi,
3374 options: write_options,
3375 address: write_address,
3376 count: write_count,
3377 handle: write_handle,
3378 } => {
3379 assert_eq!(flat_abi, write_flat_abi);
3380
3381 if let TransmitIndex::Future(_) = ty {
3382 transmit.done = true;
3383 }
3384
3385 let write_handle_rep = transmit.write_handle.rep();
3386
3387 let write_complete = write_count == 0 || count > 0;
3392 let read_complete = write_count > 0;
3393 let write_buffer_remaining = count < write_count;
3394
3395 let count = count.min(write_count);
3396
3397 self.copy(
3398 store.as_context_mut(),
3399 flat_abi,
3400 write_ty,
3401 write_options,
3402 write_address,
3403 ty,
3404 options,
3405 address,
3406 count,
3407 rep,
3408 )?;
3409
3410 let instance = self.id().get_mut(store.0);
3411 let types = instance.component().types();
3412 let item_size = payload(ty, types)
3413 .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3414 .unwrap_or(0);
3415 let concurrent_state = store.0.concurrent_state_mut();
3416
3417 if write_complete {
3418 let count = u32::try_from(count).unwrap();
3419 let total = if let Some(Event::StreamWrite {
3420 code: ReturnCode::Completed(old_total),
3421 ..
3422 }) = concurrent_state.take_event(write_handle_rep)?
3423 {
3424 count + old_total
3425 } else {
3426 count
3427 };
3428
3429 let code = ReturnCode::completed(ty.kind(), total);
3430
3431 concurrent_state.send_write_result(
3432 write_ty,
3433 transmit_id,
3434 write_handle,
3435 code,
3436 )?;
3437 }
3438
3439 if write_buffer_remaining {
3440 let transmit = concurrent_state.get_mut(transmit_id)?;
3441 transmit.write = WriteState::GuestReady {
3442 instance: self,
3443 ty: write_ty,
3444 flat_abi: write_flat_abi,
3445 options: write_options,
3446 address: write_address + (count * item_size),
3447 count: write_count - count,
3448 handle: write_handle,
3449 };
3450 }
3451
3452 if read_complete {
3453 ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3454 } else {
3455 set_guest_ready(concurrent_state)?;
3456 ReturnCode::Blocked
3457 }
3458 }
3459
3460 WriteState::HostReady {
3461 produce,
3462 try_into,
3463 guest_offset,
3464 cancel,
3465 cancel_waker,
3466 } => {
3467 assert!(cancel_waker.is_none());
3468 assert!(!cancel);
3469 assert_eq!(0, guest_offset);
3470
3471 if let TransmitIndex::Future(_) = ty {
3472 transmit.done = true;
3473 }
3474
3475 set_guest_ready(concurrent_state)?;
3476
3477 self.produce(store.0, ty.kind(), transmit_id, produce, try_into, 0, false)?
3478 }
3479
3480 WriteState::Open => {
3481 set_guest_ready(concurrent_state)?;
3482 ReturnCode::Blocked
3483 }
3484
3485 WriteState::Dropped => ReturnCode::Dropped(0),
3486 };
3487
3488 if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3489 result = self.wait_for_read(store.0, transmit_handle)?;
3490 }
3491
3492 if result != ReturnCode::Blocked {
3493 *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3494 TransmitLocalState::Read {
3495 done: matches!(
3496 (result, ty),
3497 (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3498 ),
3499 };
3500 }
3501
3502 log::trace!(
3503 "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3504 );
3505
3506 Ok(result)
3507 }
3508
3509 fn wait_for_write(
3510 self,
3511 store: &mut StoreOpaque,
3512 handle: TableId<TransmitHandle>,
3513 ) -> Result<ReturnCode> {
3514 let waitable = Waitable::Transmit(handle);
3515 store.wait_for_event(waitable)?;
3516 let event = waitable.take_event(store.concurrent_state_mut())?;
3517 if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3518 event
3519 {
3520 waitable.on_delivery(store, self, event);
3521 Ok(code)
3522 } else {
3523 unreachable!()
3524 }
3525 }
3526
3527 fn cancel_write(
3529 self,
3530 store: &mut StoreOpaque,
3531 transmit_id: TableId<TransmitState>,
3532 async_: bool,
3533 ) -> Result<ReturnCode> {
3534 let state = store.concurrent_state_mut();
3535 let transmit = state.get_mut(transmit_id)?;
3536 log::trace!(
3537 "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3538 transmit.read,
3539 transmit.write
3540 );
3541
3542 let code = if let Some(event) =
3543 Waitable::Transmit(transmit.write_handle).take_event(state)?
3544 {
3545 let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3546 unreachable!();
3547 };
3548 match (code, event) {
3549 (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3550 ReturnCode::Cancelled(count)
3551 }
3552 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3553 _ => unreachable!(),
3554 }
3555 } else if let ReadState::HostReady {
3556 cancel,
3557 cancel_waker,
3558 ..
3559 } = &mut state.get_mut(transmit_id)?.read
3560 {
3561 *cancel = true;
3562 if let Some(waker) = cancel_waker.take() {
3563 waker.wake();
3564 }
3565
3566 if async_ {
3567 ReturnCode::Blocked
3568 } else {
3569 let handle = store
3570 .concurrent_state_mut()
3571 .get_mut(transmit_id)?
3572 .write_handle;
3573 self.wait_for_write(store, handle)?
3574 }
3575 } else {
3576 ReturnCode::Cancelled(0)
3577 };
3578
3579 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3580
3581 match &transmit.write {
3582 WriteState::GuestReady { .. } => {
3583 transmit.write = WriteState::Open;
3584 }
3585 WriteState::HostReady { .. } => todo!("support host write cancellation"),
3586 WriteState::Open | WriteState::Dropped => {}
3587 }
3588
3589 log::trace!("cancelled write {transmit_id:?}: {code:?}");
3590
3591 Ok(code)
3592 }
3593
3594 fn wait_for_read(
3595 self,
3596 store: &mut StoreOpaque,
3597 handle: TableId<TransmitHandle>,
3598 ) -> Result<ReturnCode> {
3599 let waitable = Waitable::Transmit(handle);
3600 store.wait_for_event(waitable)?;
3601 let event = waitable.take_event(store.concurrent_state_mut())?;
3602 if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
3603 event
3604 {
3605 waitable.on_delivery(store, self, event);
3606 Ok(code)
3607 } else {
3608 unreachable!()
3609 }
3610 }
3611
3612 fn cancel_read(
3614 self,
3615 store: &mut StoreOpaque,
3616 transmit_id: TableId<TransmitState>,
3617 async_: bool,
3618 ) -> Result<ReturnCode> {
3619 let state = store.concurrent_state_mut();
3620 let transmit = state.get_mut(transmit_id)?;
3621 log::trace!(
3622 "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3623 transmit.read,
3624 transmit.write
3625 );
3626
3627 let code = if let Some(event) =
3628 Waitable::Transmit(transmit.read_handle).take_event(state)?
3629 {
3630 let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3631 unreachable!();
3632 };
3633 match (code, event) {
3634 (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3635 ReturnCode::Cancelled(count)
3636 }
3637 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3638 _ => unreachable!(),
3639 }
3640 } else if let WriteState::HostReady {
3641 cancel,
3642 cancel_waker,
3643 ..
3644 } = &mut state.get_mut(transmit_id)?.write
3645 {
3646 *cancel = true;
3647 if let Some(waker) = cancel_waker.take() {
3648 waker.wake();
3649 }
3650
3651 if async_ {
3652 ReturnCode::Blocked
3653 } else {
3654 let handle = store
3655 .concurrent_state_mut()
3656 .get_mut(transmit_id)?
3657 .read_handle;
3658 self.wait_for_read(store, handle)?
3659 }
3660 } else {
3661 ReturnCode::Cancelled(0)
3662 };
3663
3664 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3665
3666 match &transmit.read {
3667 ReadState::GuestReady { .. } => {
3668 transmit.read = ReadState::Open;
3669 }
3670 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
3671 todo!("support host read cancellation")
3672 }
3673 ReadState::Open | ReadState::Dropped => {}
3674 }
3675
3676 log::trace!("cancelled read {transmit_id:?}: {code:?}");
3677
3678 Ok(code)
3679 }
3680
3681 fn guest_cancel_write(
3683 self,
3684 store: &mut StoreOpaque,
3685 ty: TransmitIndex,
3686 async_: bool,
3687 writer: u32,
3688 ) -> Result<ReturnCode> {
3689 let (rep, state) =
3690 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
3691 let id = TableId::<TransmitHandle>::new(rep);
3692 log::trace!("guest cancel write {id:?} (handle {writer})");
3693 match state {
3694 TransmitLocalState::Write { .. } => {
3695 bail!("stream or future write cancelled when no write is pending")
3696 }
3697 TransmitLocalState::Read { .. } => {
3698 bail!("passed read end to `{{stream|future}}.cancel-write`")
3699 }
3700 TransmitLocalState::Busy => {}
3701 }
3702 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3703 let code = self.cancel_write(store, transmit_id, async_)?;
3704 if !matches!(code, ReturnCode::Blocked) {
3705 let state =
3706 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
3707 .1;
3708 if let TransmitLocalState::Busy = state {
3709 *state = TransmitLocalState::Write { done: false };
3710 }
3711 }
3712 Ok(code)
3713 }
3714
3715 fn guest_cancel_read(
3717 self,
3718 store: &mut StoreOpaque,
3719 ty: TransmitIndex,
3720 async_: bool,
3721 reader: u32,
3722 ) -> Result<ReturnCode> {
3723 let (rep, state) =
3724 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
3725 let id = TableId::<TransmitHandle>::new(rep);
3726 log::trace!("guest cancel read {id:?} (handle {reader})");
3727 match state {
3728 TransmitLocalState::Read { .. } => {
3729 bail!("stream or future read cancelled when no read is pending")
3730 }
3731 TransmitLocalState::Write { .. } => {
3732 bail!("passed write end to `{{stream|future}}.cancel-read`")
3733 }
3734 TransmitLocalState::Busy => {}
3735 }
3736 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3737 let code = self.cancel_read(store, transmit_id, async_)?;
3738 if !matches!(code, ReturnCode::Blocked) {
3739 let state =
3740 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
3741 .1;
3742 if let TransmitLocalState::Busy = state {
3743 *state = TransmitLocalState::Read { done: false };
3744 }
3745 }
3746 Ok(code)
3747 }
3748
3749 fn guest_drop_readable(
3751 self,
3752 store: &mut StoreOpaque,
3753 ty: TransmitIndex,
3754 reader: u32,
3755 ) -> Result<()> {
3756 let table = self.id().get_mut(store).table_for_transmit(ty);
3757 let (rep, _is_done) = match ty {
3758 TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
3759 TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
3760 };
3761 let kind = match ty {
3762 TransmitIndex::Stream(_) => TransmitKind::Stream,
3763 TransmitIndex::Future(_) => TransmitKind::Future,
3764 };
3765 let id = TableId::<TransmitHandle>::new(rep);
3766 log::trace!("guest_drop_readable: drop reader {id:?}");
3767 store.host_drop_reader(id, kind)
3768 }
3769
3770 pub(crate) fn error_context_new(
3772 self,
3773 store: &mut StoreOpaque,
3774 caller: RuntimeComponentInstanceIndex,
3775 ty: TypeComponentLocalErrorContextTableIndex,
3776 options: OptionsIndex,
3777 debug_msg_address: u32,
3778 debug_msg_len: u32,
3779 ) -> Result<u32> {
3780 self.id().get(store).check_may_leave(caller)?;
3781 let lift_ctx = &mut LiftContext::new(store, options, self);
3782 let debug_msg = String::linear_lift_from_flat(
3783 lift_ctx,
3784 InterfaceType::String,
3785 &[ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)],
3786 )?;
3787
3788 let err_ctx = ErrorContextState { debug_msg };
3790 let state = store.concurrent_state_mut();
3791 let table_id = state.push(err_ctx)?;
3792 let global_ref_count_idx =
3793 TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
3794
3795 let _ = state
3797 .global_error_context_ref_counts
3798 .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
3799
3800 let local_idx = self
3807 .id()
3808 .get_mut(store)
3809 .table_for_error_context(ty)
3810 .error_context_insert(table_id.rep())?;
3811
3812 Ok(local_idx)
3813 }
3814
3815 pub(super) fn error_context_debug_message<T>(
3817 self,
3818 store: StoreContextMut<T>,
3819 ty: TypeComponentLocalErrorContextTableIndex,
3820 options: OptionsIndex,
3821 err_ctx_handle: u32,
3822 debug_msg_address: u32,
3823 ) -> Result<()> {
3824 let handle_table_id_rep = self
3826 .id()
3827 .get_mut(store.0)
3828 .table_for_error_context(ty)
3829 .error_context_rep(err_ctx_handle)?;
3830
3831 let state = store.0.concurrent_state_mut();
3832 let ErrorContextState { debug_msg } =
3834 state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
3835 let debug_msg = debug_msg.clone();
3836
3837 let lower_cx = &mut LowerContext::new(store, options, self);
3838 let debug_msg_address = usize::try_from(debug_msg_address)?;
3839 let offset = lower_cx
3841 .as_slice_mut()
3842 .get(debug_msg_address..)
3843 .and_then(|b| b.get(..debug_msg.bytes().len()))
3844 .map(|_| debug_msg_address)
3845 .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3846 debug_msg
3847 .as_str()
3848 .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
3849
3850 Ok(())
3851 }
3852
3853 pub(crate) fn future_cancel_read(
3855 self,
3856 store: &mut StoreOpaque,
3857 caller: RuntimeComponentInstanceIndex,
3858 ty: TypeFutureTableIndex,
3859 async_: bool,
3860 reader: u32,
3861 ) -> Result<u32> {
3862 self.id().get(store).check_may_leave(caller)?;
3863 self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
3864 .map(|v| v.encode())
3865 }
3866
3867 pub(crate) fn future_cancel_write(
3869 self,
3870 store: &mut StoreOpaque,
3871 caller: RuntimeComponentInstanceIndex,
3872 ty: TypeFutureTableIndex,
3873 async_: bool,
3874 writer: u32,
3875 ) -> Result<u32> {
3876 self.id().get(store).check_may_leave(caller)?;
3877 self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
3878 .map(|v| v.encode())
3879 }
3880
3881 pub(crate) fn stream_cancel_read(
3883 self,
3884 store: &mut StoreOpaque,
3885 caller: RuntimeComponentInstanceIndex,
3886 ty: TypeStreamTableIndex,
3887 async_: bool,
3888 reader: u32,
3889 ) -> Result<u32> {
3890 self.id().get(store).check_may_leave(caller)?;
3891 self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
3892 .map(|v| v.encode())
3893 }
3894
3895 pub(crate) fn stream_cancel_write(
3897 self,
3898 store: &mut StoreOpaque,
3899 caller: RuntimeComponentInstanceIndex,
3900 ty: TypeStreamTableIndex,
3901 async_: bool,
3902 writer: u32,
3903 ) -> Result<u32> {
3904 self.id().get(store).check_may_leave(caller)?;
3905 self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
3906 .map(|v| v.encode())
3907 }
3908
3909 pub(crate) fn future_drop_readable(
3911 self,
3912 store: &mut StoreOpaque,
3913 caller: RuntimeComponentInstanceIndex,
3914 ty: TypeFutureTableIndex,
3915 reader: u32,
3916 ) -> Result<()> {
3917 self.id().get(store).check_may_leave(caller)?;
3918 self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
3919 }
3920
3921 pub(crate) fn stream_drop_readable(
3923 self,
3924 store: &mut StoreOpaque,
3925 caller: RuntimeComponentInstanceIndex,
3926 ty: TypeStreamTableIndex,
3927 reader: u32,
3928 ) -> Result<()> {
3929 self.id().get(store).check_may_leave(caller)?;
3930 self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
3931 }
3932
3933 fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
3937 let (write, read) = store.concurrent_state_mut().new_transmit()?;
3938
3939 let table = self.id().get_mut(store).table_for_transmit(ty);
3940 let (read_handle, write_handle) = match ty {
3941 TransmitIndex::Future(ty) => (
3942 table.future_insert_read(ty, read.rep())?,
3943 table.future_insert_write(ty, write.rep())?,
3944 ),
3945 TransmitIndex::Stream(ty) => (
3946 table.stream_insert_read(ty, read.rep())?,
3947 table.stream_insert_write(ty, write.rep())?,
3948 ),
3949 };
3950
3951 let state = store.concurrent_state_mut();
3952 state.get_mut(read)?.common.handle = Some(read_handle);
3953 state.get_mut(write)?.common.handle = Some(write_handle);
3954
3955 Ok(ResourcePair {
3956 write: write_handle,
3957 read: read_handle,
3958 })
3959 }
3960
3961 pub(crate) fn error_context_drop(
3963 self,
3964 store: &mut StoreOpaque,
3965 caller: RuntimeComponentInstanceIndex,
3966 ty: TypeComponentLocalErrorContextTableIndex,
3967 error_context: u32,
3968 ) -> Result<()> {
3969 let instance = self.id().get_mut(store);
3970 instance.check_may_leave(caller)?;
3971
3972 let local_handle_table = instance.table_for_error_context(ty);
3973
3974 let rep = local_handle_table.error_context_drop(error_context)?;
3975
3976 let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
3977
3978 let state = store.concurrent_state_mut();
3979 let GlobalErrorContextRefCount(global_ref_count) = state
3980 .global_error_context_ref_counts
3981 .get_mut(&global_ref_count_idx)
3982 .expect("retrieve concurrent state for error context during drop");
3983
3984 assert!(*global_ref_count >= 1);
3986 *global_ref_count -= 1;
3987 if *global_ref_count == 0 {
3988 state
3989 .global_error_context_ref_counts
3990 .remove(&global_ref_count_idx);
3991
3992 state
3993 .delete(TableId::<ErrorContextState>::new(rep))
3994 .context("deleting component-global error context data")?;
3995 }
3996
3997 Ok(())
3998 }
3999
4000 fn guest_transfer(
4003 self,
4004 store: &mut StoreOpaque,
4005 src_idx: u32,
4006 src: TransmitIndex,
4007 dst: TransmitIndex,
4008 ) -> Result<u32> {
4009 let mut instance = self.id().get_mut(store);
4010 let src_table = instance.as_mut().table_for_transmit(src);
4011 let (rep, is_done) = match src {
4012 TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
4013 TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
4014 };
4015 if is_done {
4016 bail!("cannot lift after being notified that the writable end dropped");
4017 }
4018 let dst_table = instance.table_for_transmit(dst);
4019 let handle = match dst {
4020 TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
4021 TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
4022 }?;
4023 store
4024 .concurrent_state_mut()
4025 .get_mut(TableId::<TransmitHandle>::new(rep))?
4026 .common
4027 .handle = Some(handle);
4028 Ok(handle)
4029 }
4030
4031 pub(crate) fn future_new(
4033 self,
4034 store: &mut StoreOpaque,
4035 caller: RuntimeComponentInstanceIndex,
4036 ty: TypeFutureTableIndex,
4037 ) -> Result<ResourcePair> {
4038 self.id().get(store).check_may_leave(caller)?;
4039 self.guest_new(store, TransmitIndex::Future(ty))
4040 }
4041
4042 pub(crate) fn stream_new(
4044 self,
4045 store: &mut StoreOpaque,
4046 caller: RuntimeComponentInstanceIndex,
4047 ty: TypeStreamTableIndex,
4048 ) -> Result<ResourcePair> {
4049 self.id().get(store).check_may_leave(caller)?;
4050 self.guest_new(store, TransmitIndex::Stream(ty))
4051 }
4052
4053 pub(crate) fn future_transfer(
4056 self,
4057 store: &mut StoreOpaque,
4058 src_idx: u32,
4059 src: TypeFutureTableIndex,
4060 dst: TypeFutureTableIndex,
4061 ) -> Result<u32> {
4062 self.guest_transfer(
4063 store,
4064 src_idx,
4065 TransmitIndex::Future(src),
4066 TransmitIndex::Future(dst),
4067 )
4068 }
4069
4070 pub(crate) fn stream_transfer(
4073 self,
4074 store: &mut StoreOpaque,
4075 src_idx: u32,
4076 src: TypeStreamTableIndex,
4077 dst: TypeStreamTableIndex,
4078 ) -> Result<u32> {
4079 self.guest_transfer(
4080 store,
4081 src_idx,
4082 TransmitIndex::Stream(src),
4083 TransmitIndex::Stream(dst),
4084 )
4085 }
4086
4087 pub(crate) fn error_context_transfer(
4089 self,
4090 store: &mut StoreOpaque,
4091 src_idx: u32,
4092 src: TypeComponentLocalErrorContextTableIndex,
4093 dst: TypeComponentLocalErrorContextTableIndex,
4094 ) -> Result<u32> {
4095 let mut instance = self.id().get_mut(store);
4096 let rep = instance
4097 .as_mut()
4098 .table_for_error_context(src)
4099 .error_context_rep(src_idx)?;
4100 let dst_idx = instance
4101 .table_for_error_context(dst)
4102 .error_context_insert(rep)?;
4103
4104 let global_ref_count = store
4108 .concurrent_state_mut()
4109 .global_error_context_ref_counts
4110 .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4111 .context("global ref count present for existing (sub)component error context")?;
4112 global_ref_count.0 += 1;
4113
4114 Ok(dst_idx)
4115 }
4116}
4117
4118impl ComponentInstance {
4119 fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4120 let (tables, types) = self.guest_tables();
4121 let runtime_instance = match ty {
4122 TransmitIndex::Stream(ty) => types[ty].instance,
4123 TransmitIndex::Future(ty) => types[ty].instance,
4124 };
4125 &mut tables[runtime_instance]
4126 }
4127
4128 fn table_for_error_context(
4129 self: Pin<&mut Self>,
4130 ty: TypeComponentLocalErrorContextTableIndex,
4131 ) -> &mut HandleTable {
4132 let (tables, types) = self.guest_tables();
4133 let runtime_instance = types[ty].instance;
4134 &mut tables[runtime_instance]
4135 }
4136
4137 fn get_mut_by_index(
4138 self: Pin<&mut Self>,
4139 ty: TransmitIndex,
4140 index: u32,
4141 ) -> Result<(u32, &mut TransmitLocalState)> {
4142 get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4143 }
4144}
4145
4146impl ConcurrentState {
4147 fn send_write_result(
4148 &mut self,
4149 ty: TransmitIndex,
4150 id: TableId<TransmitState>,
4151 handle: u32,
4152 code: ReturnCode,
4153 ) -> Result<()> {
4154 let write_handle = self.get_mut(id)?.write_handle.rep();
4155 self.set_event(
4156 write_handle,
4157 match ty {
4158 TransmitIndex::Future(ty) => Event::FutureWrite {
4159 code,
4160 pending: Some((ty, handle)),
4161 },
4162 TransmitIndex::Stream(ty) => Event::StreamWrite {
4163 code,
4164 pending: Some((ty, handle)),
4165 },
4166 },
4167 )
4168 }
4169
4170 fn send_read_result(
4171 &mut self,
4172 ty: TransmitIndex,
4173 id: TableId<TransmitState>,
4174 handle: u32,
4175 code: ReturnCode,
4176 ) -> Result<()> {
4177 let read_handle = self.get_mut(id)?.read_handle.rep();
4178 self.set_event(
4179 read_handle,
4180 match ty {
4181 TransmitIndex::Future(ty) => Event::FutureRead {
4182 code,
4183 pending: Some((ty, handle)),
4184 },
4185 TransmitIndex::Stream(ty) => Event::StreamRead {
4186 code,
4187 pending: Some((ty, handle)),
4188 },
4189 },
4190 )
4191 }
4192
4193 fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4194 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4195 }
4196
4197 fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4198 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4199 }
4200
4201 fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4212 let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4213
4214 fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
4215 let (ReturnCode::Completed(count)
4216 | ReturnCode::Dropped(count)
4217 | ReturnCode::Cancelled(count)) = old
4218 else {
4219 unreachable!()
4220 };
4221
4222 match new {
4223 ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
4224 ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
4225 _ => unreachable!(),
4226 }
4227 }
4228
4229 let event = match (waitable.take_event(self)?, event) {
4230 (None, _) => event,
4231 (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4232 (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4233 (
4234 Some(Event::StreamWrite {
4235 code: old_code,
4236 pending: old_pending,
4237 }),
4238 Event::StreamWrite { code, pending },
4239 ) => Event::StreamWrite {
4240 code: update_code(old_code, code),
4241 pending: old_pending.or(pending),
4242 },
4243 (
4244 Some(Event::StreamRead {
4245 code: old_code,
4246 pending: old_pending,
4247 }),
4248 Event::StreamRead { code, pending },
4249 ) => Event::StreamRead {
4250 code: update_code(old_code, code),
4251 pending: old_pending.or(pending),
4252 },
4253 _ => unreachable!(),
4254 };
4255
4256 waitable.set_event(self, Some(event))
4257 }
4258
4259 fn new_transmit(&mut self) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4262 let state_id = self.push(TransmitState::default())?;
4263
4264 let write = self.push(TransmitHandle::new(state_id))?;
4265 let read = self.push(TransmitHandle::new(state_id))?;
4266
4267 let state = self.get_mut(state_id)?;
4268 state.write_handle = write;
4269 state.read_handle = read;
4270
4271 log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4272
4273 Ok((write, read))
4274 }
4275
4276 fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4278 let state = self.delete(state_id)?;
4279 self.delete(state.write_handle)?;
4280 self.delete(state.read_handle)?;
4281
4282 log::trace!(
4283 "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4284 state.write_handle,
4285 state.read_handle,
4286 );
4287
4288 Ok(())
4289 }
4290}
4291
4292pub(crate) struct ResourcePair {
4293 pub(crate) write: u32,
4294 pub(crate) read: u32,
4295}
4296
4297impl Waitable {
4298 pub(super) fn on_delivery(&self, store: &mut StoreOpaque, instance: Instance, event: Event) {
4301 match event {
4302 Event::FutureRead {
4303 pending: Some((ty, handle)),
4304 ..
4305 }
4306 | Event::FutureWrite {
4307 pending: Some((ty, handle)),
4308 ..
4309 } => {
4310 let instance = instance.id().get_mut(store);
4311 let runtime_instance = instance.component().types()[ty].instance;
4312 let (rep, state) = instance.guest_tables().0[runtime_instance]
4313 .future_rep(ty, handle)
4314 .unwrap();
4315 assert_eq!(rep, self.rep());
4316 assert_eq!(*state, TransmitLocalState::Busy);
4317 *state = match event {
4318 Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
4319 Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
4320 _ => unreachable!(),
4321 };
4322 }
4323 Event::StreamRead {
4324 pending: Some((ty, handle)),
4325 code,
4326 }
4327 | Event::StreamWrite {
4328 pending: Some((ty, handle)),
4329 code,
4330 } => {
4331 let instance = instance.id().get_mut(store);
4332 let runtime_instance = instance.component().types()[ty].instance;
4333 let (rep, state) = instance.guest_tables().0[runtime_instance]
4334 .stream_rep(ty, handle)
4335 .unwrap();
4336 assert_eq!(rep, self.rep());
4337 assert_eq!(*state, TransmitLocalState::Busy);
4338 let done = matches!(code, ReturnCode::Dropped(_));
4339 *state = match event {
4340 Event::StreamRead { .. } => TransmitLocalState::Read { done },
4341 Event::StreamWrite { .. } => TransmitLocalState::Write { done },
4342 _ => unreachable!(),
4343 };
4344
4345 let transmit_handle = TableId::<TransmitHandle>::new(rep);
4346 let state = store.concurrent_state_mut();
4347 let transmit_id = state.get_mut(transmit_handle).unwrap().state;
4348 let transmit = state.get_mut(transmit_id).unwrap();
4349
4350 match event {
4351 Event::StreamRead { .. } => {
4352 transmit.read = ReadState::Open;
4353 }
4354 Event::StreamWrite { .. } => transmit.write = WriteState::Open,
4355 _ => unreachable!(),
4356 };
4357 }
4358 _ => {}
4359 }
4360 }
4361}
4362
4363#[cfg(test)]
4364mod tests {
4365 use super::*;
4366 use crate::{Engine, Store};
4367 use core::future::pending;
4368 use core::pin::pin;
4369 use std::sync::LazyLock;
4370
4371 static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);
4372
4373 fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
4374 where
4375 T: FutureProducer<()>,
4376 {
4377 rx.poll_produce(
4378 &mut Context::from_waker(Waker::noop()),
4379 Store::new(&ENGINE, ()).as_context_mut(),
4380 finish,
4381 )
4382 }
4383
4384 #[test]
4385 fn future_producer() {
4386 let mut fut = pin!(async { anyhow::Ok(()) });
4387 assert!(matches!(
4388 poll_future_producer(fut.as_mut(), false),
4389 Poll::Ready(Ok(Some(()))),
4390 ));
4391
4392 let mut fut = pin!(async { anyhow::Ok(()) });
4393 assert!(matches!(
4394 poll_future_producer(fut.as_mut(), true),
4395 Poll::Ready(Ok(Some(()))),
4396 ));
4397
4398 let mut fut = pin!(pending::<Result<()>>());
4399 assert!(matches!(
4400 poll_future_producer(fut.as_mut(), false),
4401 Poll::Pending,
4402 ));
4403 assert!(matches!(
4404 poll_future_producer(fut.as_mut(), true),
4405 Poll::Ready(Ok(None)),
4406 ));
4407
4408 let (tx, rx) = oneshot::channel();
4409 let mut rx = pin!(rx);
4410 assert!(matches!(
4411 poll_future_producer(rx.as_mut(), false),
4412 Poll::Pending,
4413 ));
4414 assert!(matches!(
4415 poll_future_producer(rx.as_mut(), true),
4416 Poll::Ready(Ok(None)),
4417 ));
4418 tx.send(()).unwrap();
4419 assert!(matches!(
4420 poll_future_producer(rx.as_mut(), true),
4421 Poll::Ready(Ok(Some(()))),
4422 ));
4423
4424 let (tx, rx) = oneshot::channel();
4425 let mut rx = pin!(rx);
4426 tx.send(()).unwrap();
4427 assert!(matches!(
4428 poll_future_producer(rx.as_mut(), false),
4429 Poll::Ready(Ok(Some(()))),
4430 ));
4431
4432 let (tx, rx) = oneshot::channel::<()>();
4433 let mut rx = pin!(rx);
4434 drop(tx);
4435 assert!(matches!(
4436 poll_future_producer(rx.as_mut(), false),
4437 Poll::Ready(Err(..)),
4438 ));
4439
4440 let (tx, rx) = oneshot::channel::<()>();
4441 let mut rx = pin!(rx);
4442 drop(tx);
4443 assert!(matches!(
4444 poll_future_producer(rx.as_mut(), true),
4445 Poll::Ready(Err(..)),
4446 ));
4447 }
4448}