1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext};
5use crate::component::matching::InstanceType;
6use crate::component::values::{ErrorContextAny, FutureAny, StreamAny};
7use crate::component::{AsAccessor, Instance, Lift, Lower, Val, WasmList};
8use crate::store::{StoreOpaque, StoreToken};
9use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
10use crate::vm::{AlwaysMut, VMStore};
11use crate::{AsContextMut, StoreContextMut, ValRaw};
12use anyhow::{Context as _, Error, Result, anyhow, bail};
13use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
14use core::fmt;
15use core::future;
16use core::iter;
17use core::marker::PhantomData;
18use core::mem::{self, MaybeUninit};
19use core::pin::Pin;
20use core::task::{Context, Poll, Waker, ready};
21use futures::channel::oneshot;
22use futures::{FutureExt as _, stream};
23use std::any::{Any, TypeId};
24use std::boxed::Box;
25use std::io::Cursor;
26use std::string::String;
27use std::sync::{Arc, Mutex};
28use std::vec::Vec;
29use wasmtime_environ::component::{
30 CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
31 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
32 TypeFutureTableIndex, TypeStreamTableIndex,
33};
34
35pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
36
37mod buffers;
38
/// Distinguishes the two kinds of transmit channel handled in this module.
#[derive(Copy, Clone, Debug)]
pub enum TransmitKind {
    /// The channel is a `stream`.
    Stream,
    /// The channel is a `future`.
    Future,
}
46
/// Result of a read or write operation, convertible to a packed `u32` via
/// [`ReturnCode::encode`].
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ReturnCode {
    /// The operation did not finish; encoded as `0xffff_ffff`.
    Blocked,
    /// The operation completed; the payload is a count that `encode` expects
    /// to fit in 28 bits.
    Completed(u32),
    /// The operation ended with the "dropped" tag; the payload is a count
    /// that `encode` expects to fit in 28 bits.
    Dropped(u32),
    /// The operation ended with the "cancelled" tag; the payload is a count
    /// that `encode` expects to fit in 28 bits.
    Cancelled(u32),
}
55
56impl ReturnCode {
57 pub fn encode(&self) -> u32 {
62 const BLOCKED: u32 = 0xffff_ffff;
63 const COMPLETED: u32 = 0x0;
64 const DROPPED: u32 = 0x1;
65 const CANCELLED: u32 = 0x2;
66 match self {
67 ReturnCode::Blocked => BLOCKED,
68 ReturnCode::Completed(n) => {
69 debug_assert!(*n < (1 << 28));
70 (n << 4) | COMPLETED
71 }
72 ReturnCode::Dropped(n) => {
73 debug_assert!(*n < (1 << 28));
74 (n << 4) | DROPPED
75 }
76 ReturnCode::Cancelled(n) => {
77 debug_assert!(*n < (1 << 28));
78 (n << 4) | CANCELLED
79 }
80 }
81 }
82
83 fn completed(kind: TransmitKind, count: u32) -> Self {
86 Self::Completed(if let TransmitKind::Future = kind {
87 0
88 } else {
89 count
90 })
91 }
92}
93
/// A type-table index for either a stream or a future.
#[derive(Copy, Clone, Debug)]
pub enum TransmitIndex {
    /// Index into the stream type table.
    Stream(TypeStreamTableIndex),
    /// Index into the future type table.
    Future(TypeFutureTableIndex),
}
103
104impl TransmitIndex {
105 pub fn kind(&self) -> TransmitKind {
106 match self {
107 TransmitIndex::Stream(_) => TransmitKind::Stream,
108 TransmitIndex::Future(_) => TransmitKind::Future,
109 }
110 }
111}
112
113fn payload(ty: TransmitIndex, types: &ComponentTypes) -> Option<InterfaceType> {
116 match ty {
117 TransmitIndex::Future(ty) => types[types[ty].ty].payload,
118 TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
119 }
120}
121
122fn get_mut_by_index_from(
125 handle_table: &mut HandleTable,
126 ty: TransmitIndex,
127 index: u32,
128) -> Result<(u32, &mut TransmitLocalState)> {
129 match ty {
130 TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
131 TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
132 }
133}
134
135fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
136 mut store: StoreContextMut<U>,
137 instance: Instance,
138 options: OptionsIndex,
139 ty: TransmitIndex,
140 address: usize,
141 count: usize,
142 buffer: &mut B,
143) -> Result<()> {
144 let count = buffer.remaining().len().min(count);
145
146 let lower = &mut if T::MAY_REQUIRE_REALLOC {
147 LowerContext::new
148 } else {
149 LowerContext::new_without_realloc
150 }(store.as_context_mut(), options, instance);
151
152 if address % usize::try_from(T::ALIGN32)? != 0 {
153 bail!("read pointer not aligned");
154 }
155 lower
156 .as_slice_mut()
157 .get_mut(address..)
158 .and_then(|b| b.get_mut(..T::SIZE32 * count))
159 .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
160
161 if let Some(ty) = payload(ty, lower.types) {
162 T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
163 }
164
165 buffer.skip(count);
166
167 Ok(())
168}
169
170fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
171 lift: &mut LiftContext<'_>,
172 ty: Option<InterfaceType>,
173 buffer: &mut B,
174 address: usize,
175 count: usize,
176) -> Result<()> {
177 let count = count.min(buffer.remaining_capacity());
178 if T::IS_RUST_UNIT_TYPE {
179 buffer.extend(
183 iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
184 )
185 } else {
186 let ty = ty.unwrap();
187 if address % usize::try_from(T::ALIGN32)? != 0 {
188 bail!("write pointer not aligned");
189 }
190 lift.memory()
191 .get(address..)
192 .and_then(|b| b.get(..T::SIZE32 * count))
193 .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
194
195 let list = &WasmList::new(address, count, lift, ty)?;
196 T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
197 }
198 Ok(())
199}
200
/// State stored for an error context.
#[derive(Debug, PartialEq, Eq, PartialOrd)]
pub(super) struct ErrorContextState {
    /// Debug message associated with the error context.
    pub(crate) debug_msg: String,
}
207
/// Size and alignment pair describing a flat ABI layout.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct FlatAbi {
    /// Size of the value (presumably in bytes — TODO confirm).
    pub(super) size: u32,
    /// Alignment of the value (presumably in bytes — TODO confirm).
    pub(super) align: u32,
}
215
/// The place a [`StreamProducer`] delivers items of type `T` to, using a
/// buffer of type `B`.
pub struct Destination<'a, T, B> {
    /// Identifies the transmit state this destination belongs to.
    id: TableId<TransmitState>,
    /// The producer-side buffer; it can be taken or replaced through
    /// `take_buffer` / `set_buffer`.
    buffer: &'a mut B,
    /// Optional host-side byte buffer used by `as_direct`.
    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
    _phantom: PhantomData<fn() -> T>,
}
223
224impl<'a, T, B> Destination<'a, T, B> {
225 pub fn reborrow(&mut self) -> Destination<'_, T, B> {
227 Destination {
228 id: self.id,
229 buffer: &mut *self.buffer,
230 host_buffer: self.host_buffer.as_deref_mut(),
231 _phantom: PhantomData,
232 }
233 }
234
235 pub fn take_buffer(&mut self) -> B
241 where
242 B: Default,
243 {
244 mem::take(self.buffer)
245 }
246
247 pub fn set_buffer(&mut self, buffer: B) {
257 *self.buffer = buffer;
258 }
259
260 pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
277 let transmit = store
278 .as_context_mut()
279 .0
280 .concurrent_state_mut()
281 .get_mut(self.id)
282 .unwrap();
283
284 if let &ReadState::GuestReady { count, .. } = &transmit.read {
285 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
286 unreachable!()
287 };
288
289 Some(count - guest_offset)
290 } else {
291 None
292 }
293 }
294}
295
296impl<'a, B> Destination<'a, u8, B> {
297 pub fn as_direct<D>(
308 mut self,
309 store: StoreContextMut<'a, D>,
310 capacity: usize,
311 ) -> DirectDestination<'a, D> {
312 if let Some(buffer) = self.host_buffer.as_deref_mut() {
313 buffer.set_position(0);
314 if buffer.get_mut().is_empty() {
315 buffer.get_mut().resize(capacity, 0);
316 }
317 }
318
319 DirectDestination {
320 id: self.id,
321 host_buffer: self.host_buffer,
322 store,
323 }
324 }
325}
326
/// A byte destination that can be written in place, created by
/// [`Destination::as_direct`].
pub struct DirectDestination<'a, D: 'static> {
    /// Identifies the transmit state this destination belongs to.
    id: TableId<TransmitState>,
    /// Host-side buffer to write into; when `None`, writes go to the guest's
    /// memory instead.
    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
    /// Store used to reach the transmit state and guest memory.
    store: StoreContextMut<'a, D>,
}
334
335impl<D: 'static> DirectDestination<'_, D> {
336 pub fn remaining(&mut self) -> &mut [u8] {
338 if let Some(buffer) = self.host_buffer.as_deref_mut() {
339 buffer.get_mut()
340 } else {
341 let transmit = self
342 .store
343 .as_context_mut()
344 .0
345 .concurrent_state_mut()
346 .get_mut(self.id)
347 .unwrap();
348
349 let &ReadState::GuestReady {
350 address,
351 count,
352 options,
353 instance,
354 ..
355 } = &transmit.read
356 else {
357 unreachable!();
358 };
359
360 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
361 unreachable!()
362 };
363
364 instance
365 .options_memory_mut(self.store.0, options)
366 .get_mut((address + guest_offset)..)
367 .and_then(|b| b.get_mut(..(count - guest_offset)))
368 .unwrap()
369 }
370 }
371
372 pub fn mark_written(&mut self, count: usize) {
377 if let Some(buffer) = self.host_buffer.as_deref_mut() {
378 buffer.set_position(
379 buffer
380 .position()
381 .checked_add(u64::try_from(count).unwrap())
382 .unwrap(),
383 );
384 } else {
385 let transmit = self
386 .store
387 .as_context_mut()
388 .0
389 .concurrent_state_mut()
390 .get_mut(self.id)
391 .unwrap();
392
393 let ReadState::GuestReady {
394 count: read_count, ..
395 } = &transmit.read
396 else {
397 unreachable!();
398 };
399
400 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
401 unreachable!()
402 };
403
404 if *guest_offset + count > *read_count {
405 panic!(
406 "write count ({count}) must be less than or equal to read count ({read_count})"
407 )
408 } else {
409 *guest_offset += count;
410 }
411 }
412 }
413}
414
/// Outcome reported by [`StreamProducer::poll_produce`] and
/// [`StreamConsumer::poll_consume`].
#[derive(Copy, Clone, Debug)]
pub enum StreamResult {
    /// The operation completed. The future adapters in this file report this
    /// when a value was produced, or when the source was fully consumed.
    Completed,
    /// The operation ended without completing. The future adapters in this
    /// file report this when the producer yielded no value, or when the
    /// source still had items remaining.
    Cancelled,
    /// The implementation is done with the stream. The built-in producers in
    /// this file (`Vec`, `Box<[T]>`, empty iterators, ...) report this once
    /// they have handed over everything they hold.
    Dropped,
}
430
/// A host-side source of items for a stream.
pub trait StreamProducer<D>: Send + 'static {
    /// The type of item produced.
    type Item;

    /// The buffer type used to hand items over via the [`Destination`].
    type Buffer: WriteBuffer<Self::Item> + Default;

    /// Attempts to produce items into `destination`.
    ///
    /// `cx` is the task context for registering wake-ups and `store` gives
    /// access to the store. The exact meaning of `finish` is not visible
    /// here; the future adapter in this file treats `finish == true` as
    /// "do not stay pending" — TODO confirm against the full trait docs.
    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<'a, D>,
        destination: Destination<'a, Self::Item, Self::Buffer>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;

    /// Attempts to convert this producer into a value of the type identified
    /// by `_ty`.
    ///
    /// The default implementation declines the conversion and hands the
    /// producer back unchanged.
    fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
        Err(me)
    }
}
591
592impl<T, D> StreamProducer<D> for iter::Empty<T>
593where
594 T: Send + Sync + 'static,
595{
596 type Item = T;
597 type Buffer = Option<Self::Item>;
598
599 fn poll_produce<'a>(
600 self: Pin<&mut Self>,
601 _: &mut Context<'_>,
602 _: StoreContextMut<'a, D>,
603 _: Destination<'a, Self::Item, Self::Buffer>,
604 _: bool,
605 ) -> Poll<Result<StreamResult>> {
606 Poll::Ready(Ok(StreamResult::Dropped))
607 }
608}
609
610impl<T, D> StreamProducer<D> for stream::Empty<T>
611where
612 T: Send + Sync + 'static,
613{
614 type Item = T;
615 type Buffer = Option<Self::Item>;
616
617 fn poll_produce<'a>(
618 self: Pin<&mut Self>,
619 _: &mut Context<'_>,
620 _: StoreContextMut<'a, D>,
621 _: Destination<'a, Self::Item, Self::Buffer>,
622 _: bool,
623 ) -> Poll<Result<StreamResult>> {
624 Poll::Ready(Ok(StreamResult::Dropped))
625 }
626}
627
628impl<T, D> StreamProducer<D> for Vec<T>
629where
630 T: Unpin + Send + Sync + 'static,
631{
632 type Item = T;
633 type Buffer = VecBuffer<T>;
634
635 fn poll_produce<'a>(
636 self: Pin<&mut Self>,
637 _: &mut Context<'_>,
638 _: StoreContextMut<'a, D>,
639 mut dst: Destination<'a, Self::Item, Self::Buffer>,
640 _: bool,
641 ) -> Poll<Result<StreamResult>> {
642 dst.set_buffer(mem::take(self.get_mut()).into());
643 Poll::Ready(Ok(StreamResult::Dropped))
644 }
645}
646
647impl<T, D> StreamProducer<D> for Box<[T]>
648where
649 T: Unpin + Send + Sync + 'static,
650{
651 type Item = T;
652 type Buffer = VecBuffer<T>;
653
654 fn poll_produce<'a>(
655 self: Pin<&mut Self>,
656 _: &mut Context<'_>,
657 _: StoreContextMut<'a, D>,
658 mut dst: Destination<'a, Self::Item, Self::Buffer>,
659 _: bool,
660 ) -> Poll<Result<StreamResult>> {
661 dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
662 Poll::Ready(Ok(StreamResult::Dropped))
663 }
664}
665
#[cfg(feature = "component-model-async-bytes")]
impl<D> StreamProducer<D> for bytes::Bytes {
    type Item = u8;
    type Buffer = Cursor<Self>;

    /// Delivers the bytes to the destination and reports
    /// [`StreamResult::Dropped`].
    ///
    /// When the destination reports no remaining guest capacity (either
    /// `None` or zero), all bytes are placed in the destination's buffer.
    /// Otherwise as many bytes as fit are copied directly into the
    /// destination and the rest, if any, are placed in its buffer.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        mut store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _: bool,
    ) -> Poll<Result<StreamResult>> {
        let cap = dst.remaining(&mut store);
        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
            // No direct capacity available: buffer everything.
            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
            return Poll::Ready(Ok(StreamResult::Dropped));
        };

        // Never ask for more than we hold: `split_off` panics past the end of
        // the data and `copy_from_slice` panics on a length mismatch.
        let len = cap.get().min(self.len());

        // Keep whatever does not fit in the destination's buffer.
        dst.set_buffer(Cursor::new(self.split_off(len)));
        let mut dst = dst.as_direct(store, len);
        dst.remaining()[..len].copy_from_slice(&self);
        dst.mark_written(len);
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
693
#[cfg(feature = "component-model-async-bytes")]
impl<D> StreamProducer<D> for bytes::BytesMut {
    type Item = u8;
    type Buffer = Cursor<Self>;

    /// Delivers the bytes to the destination and reports
    /// [`StreamResult::Dropped`].
    ///
    /// When the destination reports no remaining guest capacity (either
    /// `None` or zero), all bytes are placed in the destination's buffer.
    /// Otherwise as many bytes as fit are copied directly into the
    /// destination and the rest, if any, are placed in its buffer.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        mut store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _: bool,
    ) -> Poll<Result<StreamResult>> {
        let cap = dst.remaining(&mut store);
        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
            // No direct capacity available: buffer everything.
            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
            return Poll::Ready(Ok(StreamResult::Dropped));
        };

        // Never ask for more than we hold: splitting beyond the stored data
        // leaves fewer than `cap` bytes in `self`, which would make
        // `copy_from_slice` panic on a length mismatch.
        let len = cap.get().min(self.len());

        // Keep whatever does not fit in the destination's buffer.
        dst.set_buffer(Cursor::new(self.split_off(len)));
        let mut dst = dst.as_direct(store, len);
        dst.remaining()[..len].copy_from_slice(&self);
        dst.mark_written(len);
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
721
/// The place a [`StreamConsumer`] or [`FutureConsumer`] reads items of type
/// `T` from.
pub struct Source<'a, T> {
    /// Identifies the transmit state this source belongs to.
    id: TableId<TransmitState>,
    /// Host-side buffer to read from; when `None`, items are lifted from the
    /// guest's memory instead.
    host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
}
727
impl<'a, T> Source<'a, T> {
    /// Produces a shorter-lived `Source` borrowing from this one, so the
    /// original can be used again afterwards.
    pub fn reborrow(&mut self) -> Source<'_, T> {
        Source {
            id: self.id,
            host_buffer: self.host_buffer.as_deref_mut(),
        }
    }

    /// Moves as many items as possible from this source into `buffer`.
    ///
    /// With a host buffer, items are moved from it, limited by `buffer`'s
    /// remaining capacity. Otherwise items are lifted from the guest's write
    /// buffer, starting after those already read, and the read offset is
    /// advanced by the number of items `buffer` accepted.
    ///
    /// # Errors
    ///
    /// Fails if the transmit state cannot be found or if lifting from guest
    /// memory fails.
    pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
    where
        T: func::Lift + 'static,
        B: ReadBuffer<T>,
    {
        if let Some(input) = &mut self.host_buffer {
            let count = input.remaining().len().min(buffer.remaining_capacity());
            buffer.move_from(*input, count);
        } else {
            let store = store.as_context_mut();
            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;

            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
                unreachable!();
            };

            let &WriteState::GuestReady {
                ty,
                address,
                count,
                options,
                instance,
                ..
            } = &transmit.write
            else {
                unreachable!()
            };

            let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
            let ty = payload(ty, cx.types);
            // Remember the capacity before lifting so the number of items
            // actually accepted can be computed afterwards.
            let old_remaining = buffer.remaining_capacity();
            lift::<T, B>(
                cx,
                ty,
                buffer,
                // Skip the items already consumed: the offset is counted in
                // items, the address in bytes.
                address + (T::SIZE32 * guest_offset),
                count - guest_offset,
            )?;

            // The transmit state is looked up again here; the earlier
            // reference is not used past the lift above.
            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;

            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
                unreachable!();
            };

            *guest_offset += old_remaining - buffer.remaining_capacity();
        }

        Ok(())
    }

    /// Returns the number of items still available from this source.
    ///
    /// When the write end is guest-ready this is the guest's item count minus
    /// the items already read; otherwise it is the number of items left in
    /// the host buffer.
    ///
    /// # Panics
    ///
    /// Panics if the transmit state cannot be found, or if neither a
    /// guest-ready write end nor a host buffer is present.
    pub fn remaining(&self, mut store: impl AsContextMut) -> usize
    where
        T: 'static,
    {
        let transmit = store
            .as_context_mut()
            .0
            .concurrent_state_mut()
            .get_mut(self.id)
            .unwrap();

        if let &WriteState::GuestReady { count, .. } = &transmit.write {
            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
                unreachable!()
            };

            count - guest_offset
        } else if let Some(host_buffer) = &self.host_buffer {
            host_buffer.remaining().len()
        } else {
            unreachable!()
        }
    }
}
815
816impl<'a> Source<'a, u8> {
817 pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
819 DirectSource {
820 id: self.id,
821 host_buffer: self.host_buffer,
822 store,
823 }
824 }
825}
826
/// A byte source that can be read in place, created by
/// [`Source::as_direct`].
pub struct DirectSource<'a, D: 'static> {
    /// Identifies the transmit state this source belongs to.
    id: TableId<TransmitState>,
    /// Host-side buffer to read from; when `None`, reads come from the
    /// guest's memory instead.
    host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
    /// Store used to reach the transmit state and guest memory.
    store: StoreContextMut<'a, D>,
}
834
835impl<D: 'static> DirectSource<'_, D> {
836 pub fn remaining(&mut self) -> &[u8] {
838 if let Some(buffer) = self.host_buffer.as_deref_mut() {
839 buffer.remaining()
840 } else {
841 let transmit = self
842 .store
843 .as_context_mut()
844 .0
845 .concurrent_state_mut()
846 .get_mut(self.id)
847 .unwrap();
848
849 let &WriteState::GuestReady {
850 address,
851 count,
852 options,
853 instance,
854 ..
855 } = &transmit.write
856 else {
857 unreachable!()
858 };
859
860 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
861 unreachable!()
862 };
863
864 instance
865 .options_memory(self.store.0, options)
866 .get((address + guest_offset)..)
867 .and_then(|b| b.get(..(count - guest_offset)))
868 .unwrap()
869 }
870 }
871
872 pub fn mark_read(&mut self, count: usize) {
877 if let Some(buffer) = self.host_buffer.as_deref_mut() {
878 buffer.skip(count);
879 } else {
880 let transmit = self
881 .store
882 .as_context_mut()
883 .0
884 .concurrent_state_mut()
885 .get_mut(self.id)
886 .unwrap();
887
888 let WriteState::GuestReady {
889 count: write_count, ..
890 } = &transmit.write
891 else {
892 unreachable!()
893 };
894
895 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
896 unreachable!()
897 };
898
899 if *guest_offset + count > *write_count {
900 panic!(
901 "read count ({count}) must be less than or equal to write count ({write_count})"
902 )
903 } else {
904 *guest_offset += count;
905 }
906 }
907 }
908}
909
/// A host-side sink for the items of a stream.
pub trait StreamConsumer<D>: Send + 'static {
    /// The type of item consumed.
    type Item;

    /// Attempts to consume items from `source`.
    ///
    /// `cx` is the task context for registering wake-ups and `store` gives
    /// access to the store. The exact meaning of `finish` is not visible
    /// here — TODO confirm against the full trait docs.
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;
}
1005
/// A host-side source of the single value of a future.
pub trait FutureProducer<D>: Send + 'static {
    /// The type of value produced.
    type Item;

    /// Attempts to produce the value.
    ///
    /// Returns `Ok(Some(value))` once the value is available. The blanket
    /// implementation for Rust futures in this file returns `Ok(None)` when
    /// `finish` is `true` and the value is not ready yet.
    fn poll_produce(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        finish: bool,
    ) -> Poll<Result<Option<Self::Item>>>;
}
1027
1028impl<T, E, D, Fut> FutureProducer<D> for Fut
1029where
1030 E: Into<Error>,
1031 Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
1032{
1033 type Item = T;
1034
1035 fn poll_produce<'a>(
1036 self: Pin<&mut Self>,
1037 cx: &mut Context<'_>,
1038 _: StoreContextMut<'a, D>,
1039 finish: bool,
1040 ) -> Poll<Result<Option<T>>> {
1041 match self.poll(cx) {
1042 Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
1043 Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
1044 Poll::Pending if finish => Poll::Ready(Ok(None)),
1045 Poll::Pending => Poll::Pending,
1046 }
1047 }
1048}
1049
/// A host-side sink for the single value of a future.
pub trait FutureConsumer<D>: Send + 'static {
    /// The type of value consumed.
    type Item;

    /// Attempts to consume the value from `source`.
    ///
    /// `cx` is the task context for registering wake-ups and `store` gives
    /// access to the store. The exact meaning of `finish` is not visible
    /// here — TODO confirm against the full trait docs.
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<()>>;
}
1074
/// The readable end of a future carrying a value of type `T`.
pub struct FutureReader<T> {
    /// Identifies the transmit handle backing this reader.
    id: TableId<TransmitHandle>,
    _phantom: PhantomData<T>,
}
1085
impl<T> FutureReader<T> {
    /// Creates a new future whose value is supplied by `producer`.
    ///
    /// The producer is wrapped in an adapter implementing [`StreamProducer`]
    /// and registered with the store as a [`TransmitKind::Future`] transmit.
    pub fn new<S: AsContextMut>(
        mut store: S,
        producer: impl FutureProducer<S::Data, Item = T>,
    ) -> Self
    where
        T: func::Lower + func::Lift + Send + Sync + 'static,
    {
        // Adapter that drives a `FutureProducer` through the `StreamProducer`
        // interface.
        struct Producer<P>(P);

        impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
            for Producer<P>
        {
            type Item = P::Item;
            type Buffer = Option<P::Item>;

            fn poll_produce<'a>(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                store: StoreContextMut<D>,
                mut destination: Destination<'a, Self::Item, Self::Buffer>,
                finish: bool,
            ) -> Poll<Result<StreamResult>> {
                // NOTE(review): pin projection onto the wrapper's only field;
                // assumed sound because the field is never moved out of the
                // pinned wrapper — confirm.
                let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };

                Poll::Ready(Ok(
                    if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
                        // A value was produced: hand it over via the buffer.
                        destination.set_buffer(Some(value));

                        StreamResult::Completed
                    } else {
                        // The producer finished without yielding a value.
                        StreamResult::Cancelled
                    },
                ))
            }
        }

        Self::new_(
            store
                .as_context_mut()
                .new_transmit(TransmitKind::Future, Producer(producer)),
        )
    }

    /// Wraps an existing transmit handle id in a reader.
    fn new_(id: TableId<TransmitHandle>) -> Self {
        Self {
            id,
            _phantom: PhantomData,
        }
    }

    /// Attaches `consumer` as the receiver of this future's value.
    ///
    /// The consumer is wrapped in an adapter implementing [`StreamConsumer`]
    /// and installed via `set_consumer` as a [`TransmitKind::Future`]
    /// consumer.
    pub fn pipe<S: AsContextMut>(
        self,
        mut store: S,
        consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
    ) where
        T: func::Lift + 'static,
    {
        // Adapter that drives a `FutureConsumer` through the `StreamConsumer`
        // interface.
        struct Consumer<C>(C);

        impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
            for Consumer<C>
        {
            type Item = T;

            fn poll_consume(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                mut store: StoreContextMut<D>,
                mut source: Source<Self::Item>,
                finish: bool,
            ) -> Poll<Result<StreamResult>> {
                // NOTE(review): pin projection onto the wrapper's only field;
                // assumed sound because the field is never moved out of the
                // pinned wrapper — confirm.
                let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };

                ready!(consumer.poll_consume(
                    cx,
                    store.as_context_mut(),
                    source.reborrow(),
                    finish
                ))?;

                // The consumer took the value only if nothing is left in the
                // source.
                Poll::Ready(Ok(if source.remaining(store) == 0 {
                    StreamResult::Completed
                } else {
                    StreamResult::Cancelled
                }))
            }
        }

        store
            .as_context_mut()
            .set_consumer(self.id, TransmitKind::Future, Consumer(consumer));
    }

    /// Converts this reader into a dynamically-typed [`Val::Future`].
    pub fn into_val(self) -> Val {
        Val::Future(FutureAny(self.id.rep()))
    }

    /// Converts a [`Val::Future`] back into a typed reader.
    ///
    /// # Errors
    ///
    /// Fails if `value` is not a future or if its handle is not present in
    /// the store's concurrent state.
    pub fn from_val(mut store: impl AsContextMut<Data: Send>, value: &Val) -> Result<Self> {
        let Val::Future(FutureAny(rep)) = value else {
            bail!("expected `future`; got `{}`", value.desc());
        };
        let store = store.as_context_mut();
        let id = TableId::<TransmitHandle>::new(*rep);
        // Look the handle up only to validate that it exists.
        store.0.concurrent_state_mut().get_mut(id)?;
        Ok(Self::new_(id))
    }

    /// Lifts a future out of a guest's handle table, transferring the
    /// readable end to the host.
    ///
    /// # Errors
    ///
    /// Fails if the handle cannot be removed from the table, if the guest was
    /// already notified that the writable end dropped, or if a previous read
    /// of this future already succeeded.
    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
        match ty {
            InterfaceType::Future(src) => {
                let handle_table = cx
                    .instance_mut()
                    .table_for_transmit(TransmitIndex::Future(src));
                let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
                if is_done {
                    bail!("cannot lift future after being notified that the writable end dropped");
                }
                let id = TableId::<TransmitHandle>::new(rep);
                let concurrent_state = cx.concurrent_state_mut();
                let future = concurrent_state.get_mut(id)?;
                // The handle no longer lives in a guest table.
                future.common.handle = None;
                let state = future.state;

                if concurrent_state.get_mut(state)?.done {
                    bail!("cannot lift future after previous read succeeded");
                }

                Ok(Self::new_(id))
            }
            _ => func::bad_type_info(),
        }
    }

    /// Closes this reader, dropping the host's readable end.
    ///
    /// The stored id is replaced with `u32::MAX` before the drop is reported
    /// to the store (presumably so that this reader no longer refers to a
    /// live handle — confirm).
    ///
    /// # Panics
    ///
    /// Panics if the store fails to drop the reader.
    pub fn close(&mut self, mut store: impl AsContextMut) {
        let id = mem::replace(&mut self.id, TableId::new(u32::MAX));
        store
            .as_context_mut()
            .0
            .host_drop_reader(id, TransmitKind::Future)
            .unwrap();
    }

    /// Same as [`Self::close`], but obtains store access through `accessor`.
    pub fn close_with(&mut self, accessor: impl AsAccessor) {
        accessor.as_accessor().with(|access| self.close(access))
    }

    /// Wraps this reader in a [`GuardedFutureReader`], which closes it on
    /// drop using `accessor`.
    pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
    where
        A: AsAccessor,
    {
        GuardedFutureReader::new(accessor, self)
    }
}
1275
1276impl<T> fmt::Debug for FutureReader<T> {
1277 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1278 f.debug_struct("FutureReader")
1279 .field("id", &self.id)
1280 .finish()
1281 }
1282}
1283
1284pub(crate) fn lower_future_to_index<U>(
1286 rep: u32,
1287 cx: &mut LowerContext<'_, U>,
1288 ty: InterfaceType,
1289) -> Result<u32> {
1290 match ty {
1291 InterfaceType::Future(dst) => {
1292 let concurrent_state = cx.store.0.concurrent_state_mut();
1293 let id = TableId::<TransmitHandle>::new(rep);
1294 let state = concurrent_state.get_mut(id)?.state;
1295 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1296
1297 let handle = cx
1298 .instance_mut()
1299 .table_for_transmit(TransmitIndex::Future(dst))
1300 .future_insert_read(dst, rep)?;
1301
1302 cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1303
1304 Ok(handle)
1305 }
1306 _ => func::bad_type_info(),
1307 }
1308}
1309
1310unsafe impl<T: Send + Sync> func::ComponentType for FutureReader<T> {
1313 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1314
1315 type Lower = <u32 as func::ComponentType>::Lower;
1316
1317 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1318 match ty {
1319 InterfaceType::Future(_) => Ok(()),
1320 other => bail!("expected `future`, found `{}`", func::desc(other)),
1321 }
1322 }
1323}
1324
1325unsafe impl<T: Send + Sync> func::Lower for FutureReader<T> {
1327 fn linear_lower_to_flat<U>(
1328 &self,
1329 cx: &mut LowerContext<'_, U>,
1330 ty: InterfaceType,
1331 dst: &mut MaybeUninit<Self::Lower>,
1332 ) -> Result<()> {
1333 lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1334 cx,
1335 InterfaceType::U32,
1336 dst,
1337 )
1338 }
1339
1340 fn linear_lower_to_memory<U>(
1341 &self,
1342 cx: &mut LowerContext<'_, U>,
1343 ty: InterfaceType,
1344 offset: usize,
1345 ) -> Result<()> {
1346 lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1347 cx,
1348 InterfaceType::U32,
1349 offset,
1350 )
1351 }
1352}
1353
1354unsafe impl<T: Send + Sync> func::Lift for FutureReader<T> {
1356 fn linear_lift_from_flat(
1357 cx: &mut LiftContext<'_>,
1358 ty: InterfaceType,
1359 src: &Self::Lower,
1360 ) -> Result<Self> {
1361 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1362 Self::lift_from_index(cx, ty, index)
1363 }
1364
1365 fn linear_lift_from_memory(
1366 cx: &mut LiftContext<'_>,
1367 ty: InterfaceType,
1368 bytes: &[u8],
1369 ) -> Result<Self> {
1370 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1371 Self::lift_from_index(cx, ty, index)
1372 }
1373}
1374
/// A [`FutureReader`] paired with an accessor, which closes the reader when
/// the guard is dropped.
pub struct GuardedFutureReader<T, A>
where
    A: AsAccessor,
{
    /// The guarded reader; `None` once it has been taken back out of the
    /// guard.
    reader: Option<FutureReader<T>>,
    /// Accessor used to close the reader on drop.
    accessor: A,
}
1390
1391impl<T, A> GuardedFutureReader<T, A>
1392where
1393 A: AsAccessor,
1394{
1395 pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1397 Self {
1398 reader: Some(reader),
1399 accessor,
1400 }
1401 }
1402
1403 pub fn into_future(self) -> FutureReader<T> {
1406 self.into()
1407 }
1408}
1409
1410impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1411where
1412 A: AsAccessor,
1413{
1414 fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1415 guard.reader.take().unwrap()
1416 }
1417}
1418
1419impl<T, A> Drop for GuardedFutureReader<T, A>
1420where
1421 A: AsAccessor,
1422{
1423 fn drop(&mut self) {
1424 if let Some(reader) = &mut self.reader {
1425 reader.close_with(&self.accessor)
1426 }
1427 }
1428}
1429
/// The readable end of a stream carrying items of type `T`.
pub struct StreamReader<T> {
    /// Identifies the transmit handle backing this reader.
    id: TableId<TransmitHandle>,
    _phantom: PhantomData<T>,
}
1440
impl<T> StreamReader<T> {
    /// Creates a new stream whose items are supplied by `producer`.
    ///
    /// The producer is registered with the store as a
    /// [`TransmitKind::Stream`] transmit.
    pub fn new<S: AsContextMut>(
        mut store: S,
        producer: impl StreamProducer<S::Data, Item = T>,
    ) -> Self
    where
        T: func::Lower + func::Lift + Send + Sync + 'static,
    {
        Self::new_(
            store
                .as_context_mut()
                .new_transmit(TransmitKind::Stream, producer),
        )
    }

    /// Wraps an existing transmit handle id in a reader.
    fn new_(id: TableId<TransmitHandle>) -> Self {
        Self {
            id,
            _phantom: PhantomData,
        }
    }

    /// Attempts to convert this stream into a value of type `V`.
    ///
    /// The conversion is only tried when the write end is in the host-ready
    /// state, by calling the `try_into` callback stored there with `V`'s
    /// `TypeId`. On success the reader is closed and the value returned;
    /// otherwise the reader is handed back unchanged.
    ///
    /// # Panics
    ///
    /// Panics if the reader's state cannot be found, or if the callback
    /// returns a value that is not actually a `V`.
    pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
        let store = store.as_context_mut();
        let state = store.0.concurrent_state_mut();
        let id = state.get_mut(self.id).unwrap().state;
        if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
            match try_into(TypeId::of::<V>()) {
                Some(result) => {
                    self.close(store);
                    Ok(*result.downcast::<V>().unwrap())
                }
                None => Err(self),
            }
        } else {
            Err(self)
        }
    }

    /// Attaches `consumer` as the receiver of this stream's items, installed
    /// via `set_consumer` as a [`TransmitKind::Stream`] consumer.
    pub fn pipe<S: AsContextMut>(
        self,
        mut store: S,
        consumer: impl StreamConsumer<S::Data, Item = T>,
    ) where
        T: 'static,
    {
        store
            .as_context_mut()
            .set_consumer(self.id, TransmitKind::Stream, consumer);
    }

    /// Converts this reader into a dynamically-typed [`Val::Stream`].
    pub fn into_val(self) -> Val {
        Val::Stream(StreamAny(self.id.rep()))
    }

    /// Converts a [`Val::Stream`] back into a typed reader.
    ///
    /// # Errors
    ///
    /// Fails if `value` is not a stream or if its handle is not present in
    /// the store's concurrent state.
    pub fn from_val(mut store: impl AsContextMut<Data: Send>, value: &Val) -> Result<Self> {
        let Val::Stream(StreamAny(rep)) = value else {
            bail!("expected `stream`; got `{}`", value.desc());
        };
        let store = store.as_context_mut();
        let id = TableId::<TransmitHandle>::new(*rep);
        // Look the handle up only to validate that it exists.
        store.0.concurrent_state_mut().get_mut(id)?;
        Ok(Self::new_(id))
    }

    /// Lifts a stream out of a guest's handle table, transferring the
    /// readable end to the host.
    ///
    /// # Errors
    ///
    /// Fails if the handle cannot be removed from the table, if the guest was
    /// already notified that the writable end dropped, or if the handle's
    /// state cannot be found.
    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
        match ty {
            InterfaceType::Stream(src) => {
                let handle_table = cx
                    .instance_mut()
                    .table_for_transmit(TransmitIndex::Stream(src));
                let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
                if is_done {
                    bail!("cannot lift stream after being notified that the writable end dropped");
                }
                let id = TableId::<TransmitHandle>::new(rep);
                // The handle no longer lives in a guest table.
                cx.concurrent_state_mut().get_mut(id)?.common.handle = None;
                Ok(Self::new_(id))
            }
            _ => func::bad_type_info(),
        }
    }

    /// Closes this reader, dropping the host's readable end.
    ///
    /// The stored id is replaced with `u32::MAX` before the drop is reported
    /// to the store (presumably so that this reader no longer refers to a
    /// live handle — confirm).
    ///
    /// # Panics
    ///
    /// Panics if the store fails to drop the reader.
    pub fn close(&mut self, mut store: impl AsContextMut) {
        let id = mem::replace(&mut self.id, TableId::new(u32::MAX));
        store
            .as_context_mut()
            .0
            .host_drop_reader(id, TransmitKind::Stream)
            .unwrap()
    }

    /// Same as [`Self::close`], but obtains store access through `accessor`.
    pub fn close_with(&mut self, accessor: impl AsAccessor) {
        accessor.as_accessor().with(|access| self.close(access))
    }

    /// Wraps this reader in a [`GuardedStreamReader`], which closes it on
    /// drop using `accessor`.
    pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
    where
        A: AsAccessor,
    {
        GuardedStreamReader::new(accessor, self)
    }
}
1580
1581impl<T> fmt::Debug for StreamReader<T> {
1582 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1583 f.debug_struct("StreamReader")
1584 .field("id", &self.id)
1585 .finish()
1586 }
1587}
1588
1589pub(crate) fn lower_stream_to_index<U>(
1591 rep: u32,
1592 cx: &mut LowerContext<'_, U>,
1593 ty: InterfaceType,
1594) -> Result<u32> {
1595 match ty {
1596 InterfaceType::Stream(dst) => {
1597 let concurrent_state = cx.store.0.concurrent_state_mut();
1598 let id = TableId::<TransmitHandle>::new(rep);
1599 let state = concurrent_state.get_mut(id)?.state;
1600 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1601
1602 let handle = cx
1603 .instance_mut()
1604 .table_for_transmit(TransmitIndex::Stream(dst))
1605 .stream_insert_read(dst, rep)?;
1606
1607 cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1608
1609 Ok(handle)
1610 }
1611 _ => func::bad_type_info(),
1612 }
1613}
1614
1615unsafe impl<T: Send + Sync> func::ComponentType for StreamReader<T> {
1618 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1619
1620 type Lower = <u32 as func::ComponentType>::Lower;
1621
1622 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1623 match ty {
1624 InterfaceType::Stream(_) => Ok(()),
1625 other => bail!("expected `stream`, found `{}`", func::desc(other)),
1626 }
1627 }
1628}
1629
1630unsafe impl<T: Send + Sync> func::Lower for StreamReader<T> {
1632 fn linear_lower_to_flat<U>(
1633 &self,
1634 cx: &mut LowerContext<'_, U>,
1635 ty: InterfaceType,
1636 dst: &mut MaybeUninit<Self::Lower>,
1637 ) -> Result<()> {
1638 lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1639 cx,
1640 InterfaceType::U32,
1641 dst,
1642 )
1643 }
1644
1645 fn linear_lower_to_memory<U>(
1646 &self,
1647 cx: &mut LowerContext<'_, U>,
1648 ty: InterfaceType,
1649 offset: usize,
1650 ) -> Result<()> {
1651 lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1652 cx,
1653 InterfaceType::U32,
1654 offset,
1655 )
1656 }
1657}
1658
1659unsafe impl<T: Send + Sync> func::Lift for StreamReader<T> {
1661 fn linear_lift_from_flat(
1662 cx: &mut LiftContext<'_>,
1663 ty: InterfaceType,
1664 src: &Self::Lower,
1665 ) -> Result<Self> {
1666 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1667 Self::lift_from_index(cx, ty, index)
1668 }
1669
1670 fn linear_lift_from_memory(
1671 cx: &mut LiftContext<'_>,
1672 ty: InterfaceType,
1673 bytes: &[u8],
1674 ) -> Result<Self> {
1675 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1676 Self::lift_from_index(cx, ty, index)
1677 }
1678}
1679
/// A [`StreamReader`] paired with an accessor, which closes the reader when
/// the guard is dropped.
pub struct GuardedStreamReader<T, A>
where
    A: AsAccessor,
{
    /// The guarded reader; `None` once it has been taken back out of the
    /// guard.
    reader: Option<StreamReader<T>>,
    /// Accessor used to close the reader on drop.
    accessor: A,
}
1695
1696impl<T, A> GuardedStreamReader<T, A>
1697where
1698 A: AsAccessor,
1699{
1700 pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1703 Self {
1704 reader: Some(reader),
1705 accessor,
1706 }
1707 }
1708
1709 pub fn into_stream(self) -> StreamReader<T> {
1712 self.into()
1713 }
1714}
1715
1716impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1717where
1718 A: AsAccessor,
1719{
1720 fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1721 guard.reader.take().unwrap()
1722 }
1723}
1724
1725impl<T, A> Drop for GuardedStreamReader<T, A>
1726where
1727 A: AsAccessor,
1728{
1729 fn drop(&mut self) {
1730 if let Some(reader) = &mut self.reader {
1731 reader.close_with(&self.accessor)
1732 }
1733 }
1734}
1735
/// Host-side handle to an `error-context` value.
pub struct ErrorContext {
    /// Representation (`rep`) identifying the error context; it is what gets
    /// inserted into, or looked up from, an instance's error-context table.
    rep: u32,
}
1740
1741impl ErrorContext {
1742 pub(crate) fn new(rep: u32) -> Self {
1743 Self { rep }
1744 }
1745
1746 pub fn into_val(self) -> Val {
1748 Val::ErrorContext(ErrorContextAny(self.rep))
1749 }
1750
1751 pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1753 let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1754 bail!("expected `error-context`; got `{}`", value.desc());
1755 };
1756 Ok(Self::new(*rep))
1757 }
1758
1759 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1760 match ty {
1761 InterfaceType::ErrorContext(src) => {
1762 let rep = cx
1763 .instance_mut()
1764 .table_for_error_context(src)
1765 .error_context_rep(index)?;
1766
1767 Ok(Self { rep })
1768 }
1769 _ => func::bad_type_info(),
1770 }
1771 }
1772}
1773
1774pub(crate) fn lower_error_context_to_index<U>(
1775 rep: u32,
1776 cx: &mut LowerContext<'_, U>,
1777 ty: InterfaceType,
1778) -> Result<u32> {
1779 match ty {
1780 InterfaceType::ErrorContext(dst) => {
1781 let tbl = cx.instance_mut().table_for_error_context(dst);
1782 tbl.error_context_insert(rep)
1783 }
1784 _ => func::bad_type_info(),
1785 }
1786}
1787unsafe impl func::ComponentType for ErrorContext {
1790 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1791
1792 type Lower = <u32 as func::ComponentType>::Lower;
1793
1794 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1795 match ty {
1796 InterfaceType::ErrorContext(_) => Ok(()),
1797 other => bail!("expected `error`, found `{}`", func::desc(other)),
1798 }
1799 }
1800}
1801
1802unsafe impl func::Lower for ErrorContext {
1804 fn linear_lower_to_flat<T>(
1805 &self,
1806 cx: &mut LowerContext<'_, T>,
1807 ty: InterfaceType,
1808 dst: &mut MaybeUninit<Self::Lower>,
1809 ) -> Result<()> {
1810 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1811 cx,
1812 InterfaceType::U32,
1813 dst,
1814 )
1815 }
1816
1817 fn linear_lower_to_memory<T>(
1818 &self,
1819 cx: &mut LowerContext<'_, T>,
1820 ty: InterfaceType,
1821 offset: usize,
1822 ) -> Result<()> {
1823 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1824 cx,
1825 InterfaceType::U32,
1826 offset,
1827 )
1828 }
1829}
1830
1831unsafe impl func::Lift for ErrorContext {
1833 fn linear_lift_from_flat(
1834 cx: &mut LiftContext<'_>,
1835 ty: InterfaceType,
1836 src: &Self::Lower,
1837 ) -> Result<Self> {
1838 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1839 Self::lift_from_index(cx, ty, index)
1840 }
1841
1842 fn linear_lift_from_memory(
1843 cx: &mut LiftContext<'_>,
1844 ty: InterfaceType,
1845 bytes: &[u8],
1846 ) -> Result<Self> {
1847 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1848 Self::lift_from_index(cx, ty, index)
1849 }
1850}
1851
/// One end (read or write) of a stream or future transmit, as stored in the
/// concurrent-state table.
pub(super) struct TransmitHandle {
    /// Waitable bookkeeping shared with other waitables (e.g. the guest-visible
    /// `handle` assigned when the end is lowered).
    pub(super) common: WaitableCommon,
    /// Table id of the `TransmitState` shared by both ends of the transmit.
    state: TableId<TransmitState>,
}
1858
1859impl TransmitHandle {
1860 fn new(state: TableId<TransmitState>) -> Self {
1861 Self {
1862 common: WaitableCommon::default(),
1863 state,
1864 }
1865 }
1866}
1867
impl TableDebug for TransmitHandle {
    /// Name used when table ids of this type are formatted for debugging.
    fn type_name() -> &'static str {
        "TransmitHandle"
    }
}
1873
/// State shared by the read and write ends of a single stream or future.
struct TransmitState {
    /// Table id of the write end's `TransmitHandle`.
    write_handle: TableId<TransmitHandle>,
    /// Table id of the read end's `TransmitHandle`.
    read_handle: TableId<TransmitHandle>,
    /// Current state of the write end.
    write: WriteState,
    /// Current state of the read end.
    read: ReadState,
    /// Set to `true` for futures once a value has been consumed or is being
    /// delivered to a guest reader; checked by `host_drop_writer` to decide
    /// whether dropping an open writer is permitted.
    done: bool,
}
1887
1888impl Default for TransmitState {
1889 fn default() -> Self {
1890 Self {
1891 write_handle: TableId::new(u32::MAX),
1892 read_handle: TableId::new(u32::MAX),
1893 read: ReadState::Open,
1894 write: WriteState::Open,
1895 done: false,
1896 }
1897 }
1898}
1899
impl TableDebug for TransmitState {
    /// Name used when table ids of this type are formatted for debugging.
    fn type_name() -> &'static str {
        "TransmitState"
    }
}
1905
/// Boxed factory for a host-side produce or consume operation.
///
/// Each call returns a fresh boxed future which resolves to the
/// `StreamResult` of that operation.
type PollStream = Box<
    dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
>;
1909
/// Boxed callback which attempts to convert a host producer into a value of
/// the type identified by the given `TypeId`, returning `None` if the
/// conversion is not possible.
type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
1911
/// State of the write end of a stream or future.
enum WriteState {
    /// The write end is open, with no write pending.
    Open,
    /// A guest write is pending; the fields record what is needed to complete
    /// it later.
    GuestReady {
        /// Component instance that owns the write.
        instance: Instance,
        /// Runtime component instance index of the writer.
        caller: RuntimeComponentInstanceIndex,
        /// Stream or future type index of the write end.
        ty: TransmitIndex,
        /// Flat ABI description of the payload, if one applies.
        flat_abi: Option<FlatAbi>,
        /// Canonical options used for the write.
        options: OptionsIndex,
        /// Address of the guest's source buffer.
        address: usize,
        /// Number of items the guest offered.
        count: usize,
        /// Guest-visible handle of the write end.
        handle: u32,
    },
    /// The write end is owned by the host and backed by a producer.
    HostReady {
        /// Factory for futures which run the producer once.
        produce: PollStream,
        /// Attempts to convert the producer into another type.
        try_into: TryInto,
        /// Number of items already delivered to the guest reader for the
        /// read currently in progress.
        guest_offset: usize,
        /// Whether cancellation of the in-progress operation was requested;
        /// passed to the producer on its next poll.
        cancel: bool,
        /// Waker saved while the producer is pending; cleared once the
        /// producer returns ready.
        cancel_waker: Option<Waker>,
    },
    /// The write end has been dropped.
    Dropped,
}
1938
1939impl fmt::Debug for WriteState {
1940 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1941 match self {
1942 Self::Open => f.debug_tuple("Open").finish(),
1943 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1944 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1945 Self::Dropped => f.debug_tuple("Dropped").finish(),
1946 }
1947 }
1948}
1949
/// State of the read end of a stream or future.
enum ReadState {
    /// The read end is open, with no read pending.
    Open,
    /// A guest read is pending; the fields record what is needed to complete
    /// it later.
    GuestReady {
        /// Stream or future type index of the read end.
        ty: TransmitIndex,
        /// Runtime component instance index of the reader.
        caller: RuntimeComponentInstanceIndex,
        /// Flat ABI description of the payload, if one applies.
        flat_abi: Option<FlatAbi>,
        /// Component instance that owns the read.
        instance: Instance,
        /// Canonical options used for the read.
        options: OptionsIndex,
        /// Address of the guest's destination buffer.
        address: usize,
        /// Capacity, in items, of the guest's destination buffer.
        count: usize,
        /// Guest-visible handle of the read end.
        handle: u32,
    },
    /// The read end is owned by the host and backed by a consumer.
    HostReady {
        /// Factory for futures which run the consumer once.
        consume: PollStream,
        /// Number of items already consumed from the guest writer for the
        /// write currently in progress.
        guest_offset: usize,
        /// Whether cancellation of the in-progress operation was requested;
        /// passed to the consumer on its next poll.
        cancel: bool,
        /// Waker saved while the consumer is pending; cleared once the
        /// consumer returns ready.
        cancel_waker: Option<Waker>,
    },
    /// Both ends are owned by the host: a host producer feeds a host consumer.
    HostToHost {
        /// Runs the consumer against the given buffer of items.
        accept: Box<
            dyn for<'a> Fn(
                    &'a mut UntypedWriteBuffer<'a>,
                )
                    -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
                + Send
                + Sync,
        >,
        /// Byte buffer lent to the producer (wrapped in a `Cursor`) while it
        /// is polled.
        buffer: Vec<u8>,
        /// Cursor position in `buffer` after the producer's most recent poll.
        limit: usize,
    },
    /// The read end has been dropped.
    Dropped,
}
1988
1989impl fmt::Debug for ReadState {
1990 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1991 match self {
1992 Self::Open => f.debug_tuple("Open").finish(),
1993 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1994 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1995 Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
1996 Self::Dropped => f.debug_tuple("Dropped").finish(),
1997 }
1998 }
1999}
2000
2001fn return_code(kind: TransmitKind, state: StreamResult, guest_offset: usize) -> ReturnCode {
2002 let count = guest_offset.try_into().unwrap();
2003 match state {
2004 StreamResult::Dropped => ReturnCode::Dropped(count),
2005 StreamResult::Completed => ReturnCode::completed(kind, count),
2006 StreamResult::Cancelled => ReturnCode::Cancelled(count),
2007 }
2008}
2009
2010impl StoreOpaque {
2011 fn pipe_from_guest(
2012 &mut self,
2013 kind: TransmitKind,
2014 id: TableId<TransmitState>,
2015 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2016 ) {
2017 let future = async move {
2018 let stream_state = future.await?;
2019 tls::get(|store| {
2020 let state = store.concurrent_state_mut();
2021 let transmit = state.get_mut(id)?;
2022 let ReadState::HostReady {
2023 consume,
2024 guest_offset,
2025 ..
2026 } = mem::replace(&mut transmit.read, ReadState::Open)
2027 else {
2028 unreachable!();
2029 };
2030 let code = return_code(kind, stream_state, guest_offset);
2031 transmit.read = match stream_state {
2032 StreamResult::Dropped => ReadState::Dropped,
2033 StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2034 consume,
2035 guest_offset: 0,
2036 cancel: false,
2037 cancel_waker: None,
2038 },
2039 };
2040 let WriteState::GuestReady { ty, handle, .. } =
2041 mem::replace(&mut transmit.write, WriteState::Open)
2042 else {
2043 unreachable!();
2044 };
2045 state.send_write_result(ty, id, handle, code)?;
2046 Ok(())
2047 })
2048 };
2049
2050 self.concurrent_state_mut().push_future(future.boxed());
2051 }
2052
2053 fn pipe_to_guest(
2054 &mut self,
2055 kind: TransmitKind,
2056 id: TableId<TransmitState>,
2057 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2058 ) {
2059 let future = async move {
2060 let stream_state = future.await?;
2061 tls::get(|store| {
2062 let state = store.concurrent_state_mut();
2063 let transmit = state.get_mut(id)?;
2064 let WriteState::HostReady {
2065 produce,
2066 try_into,
2067 guest_offset,
2068 ..
2069 } = mem::replace(&mut transmit.write, WriteState::Open)
2070 else {
2071 unreachable!();
2072 };
2073 let code = return_code(kind, stream_state, guest_offset);
2074 transmit.write = match stream_state {
2075 StreamResult::Dropped => WriteState::Dropped,
2076 StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2077 produce,
2078 try_into,
2079 guest_offset: 0,
2080 cancel: false,
2081 cancel_waker: None,
2082 },
2083 };
2084 let ReadState::GuestReady { ty, handle, .. } =
2085 mem::replace(&mut transmit.read, ReadState::Open)
2086 else {
2087 unreachable!();
2088 };
2089 state.send_read_result(ty, id, handle, code)?;
2090 Ok(())
2091 })
2092 };
2093
2094 self.concurrent_state_mut().push_future(future.boxed());
2095 }
2096
2097 fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
2099 let state = self.concurrent_state_mut();
2100 let transmit_id = state.get_mut(id)?.state;
2101 let transmit = state
2102 .get_mut(transmit_id)
2103 .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2104 log::trace!(
2105 "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2106 transmit.read,
2107 transmit.write
2108 );
2109
2110 transmit.read = ReadState::Dropped;
2111
2112 let new_state = if let WriteState::Dropped = &transmit.write {
2115 WriteState::Dropped
2116 } else {
2117 WriteState::Open
2118 };
2119
2120 let write_handle = transmit.write_handle;
2121
2122 match mem::replace(&mut transmit.write, new_state) {
2123 WriteState::GuestReady { ty, handle, .. } => {
2126 state.update_event(
2127 write_handle.rep(),
2128 match ty {
2129 TransmitIndex::Future(ty) => Event::FutureWrite {
2130 code: ReturnCode::Dropped(0),
2131 pending: Some((ty, handle)),
2132 },
2133 TransmitIndex::Stream(ty) => Event::StreamWrite {
2134 code: ReturnCode::Dropped(0),
2135 pending: Some((ty, handle)),
2136 },
2137 },
2138 )?;
2139 }
2140
2141 WriteState::HostReady { .. } => {}
2142
2143 WriteState::Open => {
2144 state.update_event(
2145 write_handle.rep(),
2146 match kind {
2147 TransmitKind::Future => Event::FutureWrite {
2148 code: ReturnCode::Dropped(0),
2149 pending: None,
2150 },
2151 TransmitKind::Stream => Event::StreamWrite {
2152 code: ReturnCode::Dropped(0),
2153 pending: None,
2154 },
2155 },
2156 )?;
2157 }
2158
2159 WriteState::Dropped => {
2160 log::trace!("host_drop_reader delete {transmit_id:?}");
2161 state.delete_transmit(transmit_id)?;
2162 }
2163 }
2164 Ok(())
2165 }
2166
2167 fn host_drop_writer(
2169 &mut self,
2170 id: TableId<TransmitHandle>,
2171 on_drop_open: Option<fn() -> Result<()>>,
2172 ) -> Result<()> {
2173 let state = self.concurrent_state_mut();
2174 let transmit_id = state.get_mut(id)?.state;
2175 let transmit = state
2176 .get_mut(transmit_id)
2177 .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2178 log::trace!(
2179 "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2180 transmit.read,
2181 transmit.write
2182 );
2183
2184 match &mut transmit.write {
2186 WriteState::GuestReady { .. } => {
2187 unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2188 }
2189 WriteState::HostReady { .. } => {}
2190 v @ WriteState::Open => {
2191 if let (Some(on_drop_open), false) = (
2192 on_drop_open,
2193 transmit.done || matches!(transmit.read, ReadState::Dropped),
2194 ) {
2195 on_drop_open()?;
2196 } else {
2197 *v = WriteState::Dropped;
2198 }
2199 }
2200 WriteState::Dropped => unreachable!("write state is already dropped"),
2201 }
2202
2203 let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
2204
2205 let new_state = if let ReadState::Dropped = &transmit.read {
2211 ReadState::Dropped
2212 } else {
2213 ReadState::Open
2214 };
2215
2216 let read_handle = transmit.read_handle;
2217
2218 match mem::replace(&mut transmit.read, new_state) {
2220 ReadState::GuestReady { ty, handle, .. } => {
2224 self.concurrent_state_mut().update_event(
2226 read_handle.rep(),
2227 match ty {
2228 TransmitIndex::Future(ty) => Event::FutureRead {
2229 code: ReturnCode::Dropped(0),
2230 pending: Some((ty, handle)),
2231 },
2232 TransmitIndex::Stream(ty) => Event::StreamRead {
2233 code: ReturnCode::Dropped(0),
2234 pending: Some((ty, handle)),
2235 },
2236 },
2237 )?;
2238 }
2239
2240 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2241
2242 ReadState::Open => {
2244 self.concurrent_state_mut().update_event(
2245 read_handle.rep(),
2246 match on_drop_open {
2247 Some(_) => Event::FutureRead {
2248 code: ReturnCode::Dropped(0),
2249 pending: None,
2250 },
2251 None => Event::StreamRead {
2252 code: ReturnCode::Dropped(0),
2253 pending: None,
2254 },
2255 },
2256 )?;
2257 }
2258
2259 ReadState::Dropped => {
2262 log::trace!("host_drop_writer delete {transmit_id:?}");
2263 self.concurrent_state_mut().delete_transmit(transmit_id)?;
2264 }
2265 }
2266 Ok(())
2267 }
2268}
2269
impl<T> StoreContextMut<'_, T> {
    /// Creates a new transmit whose write end is backed by the host
    /// `producer`, and returns the table id of its read end.
    ///
    /// The write state is initialised to `WriteState::HostReady` with a
    /// `produce` closure (runs the producer once and forwards any buffered
    /// items via `write`) and a `try_into` closure (attempts to convert the
    /// producer via `P::try_into`).
    ///
    /// # Panics
    ///
    /// Panics if creating the transmit or looking it up in the table fails.
    fn new_transmit<P: StreamProducer<T>>(
        mut self,
        kind: TransmitKind,
        producer: P,
    ) -> TableId<TransmitHandle>
    where
        P::Item: func::Lower,
    {
        let token = StoreToken::new(self.as_context_mut());
        let state = self.0.concurrent_state_mut();
        let (_, read) = state.new_transmit().unwrap();
        // The producer and its buffer are shared between the `produce` and
        // `try_into` closures; whoever is running takes them out of the
        // `Option` and puts them back when finished.
        let producer = Arc::new(Mutex::new(Some((Box::pin(producer), P::Buffer::default()))));
        let id = state.get_mut(read).unwrap().state;
        let mut dropped = false;
        let produce = Box::new({
            let producer = producer.clone();
            move || {
                let producer = producer.clone();
                async move {
                    let (mut mine, mut buffer) = producer.lock().unwrap().take().unwrap();

                    // Only poll the producer when nothing is left over in the
                    // buffer from an earlier call; otherwise go straight to
                    // delivering the buffered items.
                    let (result, cancelled) = if buffer.remaining().is_empty() {
                        future::poll_fn(|cx| {
                            tls::get(|store| {
                                let transmit = store.concurrent_state_mut().get_mut(id).unwrap();

                                let &WriteState::HostReady { cancel, .. } = &transmit.write else {
                                    unreachable!();
                                };

                                // For a host-to-host transmit, temporarily take
                                // the byte buffer out of the read state so the
                                // producer can write into it through a cursor.
                                let mut host_buffer =
                                    if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
                                        Some(Cursor::new(mem::take(buffer)))
                                    } else {
                                        None
                                    };

                                let poll = mine.as_mut().poll_produce(
                                    cx,
                                    token.as_context_mut(store),
                                    Destination {
                                        id,
                                        buffer: &mut buffer,
                                        host_buffer: host_buffer.as_mut(),
                                        _phantom: PhantomData,
                                    },
                                    cancel,
                                );

                                let transmit = store.concurrent_state_mut().get_mut(id).unwrap();

                                // Return the byte buffer to the read state and
                                // record how far the producer advanced in it.
                                let host_offset = if let (
                                    Some(host_buffer),
                                    ReadState::HostToHost { buffer, limit, .. },
                                ) = (host_buffer, &mut transmit.read)
                                {
                                    *limit = usize::try_from(host_buffer.position()).unwrap();
                                    *buffer = host_buffer.into_inner();
                                    *limit
                                } else {
                                    0
                                };

                                {
                                    let WriteState::HostReady {
                                        guest_offset,
                                        cancel,
                                        cancel_waker,
                                        ..
                                    } = &mut transmit.write
                                    else {
                                        unreachable!();
                                    };

                                    if poll.is_pending() {
                                        // Returning `Pending` after having
                                        // produced anything is rejected as an
                                        // error.
                                        if !buffer.remaining().is_empty()
                                            || *guest_offset > 0
                                            || host_offset > 0
                                        {
                                            return Poll::Ready(Err(anyhow!(
                                                "StreamProducer::poll_produce returned Poll::Pending \
                                                 after producing at least one item"
                                            )));
                                        }
                                        // Save the waker in the write state
                                        // while the producer is pending.
                                        *cancel_waker = Some(cx.waker().clone());
                                    } else {
                                        *cancel_waker = None;
                                        *cancel = false;
                                    }
                                }

                                // Pair the result with the `cancel` flag value
                                // which was passed to this poll.
                                poll.map(|v| v.map(|result| (result, cancel)))
                            })
                        })
                        .await?
                    } else {
                        (StreamResult::Completed, false)
                    };

                    let (guest_offset, host_offset, count) = tls::get(|store| {
                        let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
                        let (count, host_offset) = match &transmit.read {
                            &ReadState::GuestReady { count, .. } => (count, 0),
                            &ReadState::HostToHost { limit, .. } => (1, limit),
                            _ => unreachable!(),
                        };
                        let guest_offset = match &transmit.write {
                            &WriteState::HostReady { guest_offset, .. } => guest_offset,
                            _ => unreachable!(),
                        };
                        (guest_offset, host_offset, count)
                    });

                    match result {
                        StreamResult::Completed => {
                            // NOTE(review): the check only applies when
                            // `count > 1`, so it never fires for the
                            // host-to-host case (where `count` is 1) —
                            // confirm this threshold is intended.
                            if count > 1
                                && buffer.remaining().is_empty()
                                && guest_offset == 0
                                && host_offset == 0
                            {
                                bail!(
                                    "StreamProducer::poll_produce returned StreamResult::Completed \
                                     without producing any items"
                                );
                            }
                        }
                        StreamResult::Cancelled => {
                            if !cancelled {
                                bail!(
                                    "StreamProducer::poll_produce returned StreamResult::Cancelled \
                                     without being given a `finish` parameter value of true"
                                );
                            }
                        }
                        StreamResult::Dropped => {
                            dropped = true;
                        }
                    }

                    let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;

                    // Put the producer and buffer back before calling `write`,
                    // which accesses them through the shared cell.
                    *producer.lock().unwrap() = Some((mine, buffer));

                    if write_buffer {
                        write(token, id, producer.clone(), kind).await?;
                    }

                    // If the producer reported `Dropped` but items remain in
                    // the buffer, report `Completed` instead so the leftover
                    // items can still be delivered.
                    Ok(if dropped {
                        if producer.lock().unwrap().as_ref().unwrap().1.remaining().is_empty()
                        {
                            StreamResult::Dropped
                        } else {
                            StreamResult::Completed
                        }
                    } else {
                        result
                    })
                }
                .boxed()
            }
        });
        let try_into = Box::new(move |ty| {
            let (mine, buffer) = producer.lock().unwrap().take().unwrap();
            match P::try_into(mine, ty) {
                Ok(value) => Some(value),
                Err(mine) => {
                    // Conversion failed: restore the producer and its buffer.
                    *producer.lock().unwrap() = Some((mine, buffer));
                    None
                }
            }
        });
        state.get_mut(id).unwrap().write = WriteState::HostReady {
            produce,
            try_into,
            guest_offset: 0,
            cancel: false,
            cancel_waker: None,
        };
        read
    }

    /// Attaches the host `consumer` as the read end of the transmit whose
    /// handle is `id`.
    ///
    /// The behaviour depends on the current write state:
    ///
    /// * `Open`: the read state becomes `HostReady`.
    /// * `GuestReady`: the read state becomes `HostReady` and a consume
    ///   operation is started immediately via `pipe_from_guest`.
    /// * `HostReady`: the read state becomes `HostToHost` and a background
    ///   future is registered which repeatedly runs the producer until either
    ///   end is dropped (or after one round for a future), then deletes the
    ///   transmit.
    /// * `Dropped`: the read end is dropped via `host_drop_reader`.
    ///
    /// # Panics
    ///
    /// Panics if a table lookup fails or if `host_drop_reader` returns an
    /// error.
    fn set_consumer<C: StreamConsumer<T>>(
        mut self,
        id: TableId<TransmitHandle>,
        kind: TransmitKind,
        consumer: C,
    ) {
        let token = StoreToken::new(self.as_context_mut());
        let state = self.0.concurrent_state_mut();
        let id = state.get_mut(id).unwrap().state;
        let transmit = state.get_mut(id).unwrap();
        // The consumer is shared between invocations; each run takes it out
        // of the `Option` and puts it back when finished.
        let consumer = Arc::new(Mutex::new(Some(Box::pin(consumer))));
        let consume_with_buffer = {
            let consumer = consumer.clone();
            async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
                let mut mine = consumer.lock().unwrap().take().unwrap();

                let host_buffer_remaining_before =
                    host_buffer.as_deref_mut().map(|v| v.remaining().len());

                let (result, cancelled) = future::poll_fn(|cx| {
                    tls::get(|store| {
                        let cancel = match &store.concurrent_state_mut().get_mut(id).unwrap().read {
                            &ReadState::HostReady { cancel, .. } => cancel,
                            ReadState::Open => false,
                            _ => unreachable!(),
                        };

                        let poll = mine.as_mut().poll_consume(
                            cx,
                            token.as_context_mut(store),
                            Source {
                                id,
                                host_buffer: host_buffer.as_deref_mut(),
                            },
                            cancel,
                        );

                        if let ReadState::HostReady {
                            cancel_waker,
                            cancel,
                            ..
                        } = &mut store.concurrent_state_mut().get_mut(id).unwrap().read
                        {
                            if poll.is_pending() {
                                // Save the waker in the read state while the
                                // consumer is pending.
                                *cancel_waker = Some(cx.waker().clone());
                            } else {
                                *cancel_waker = None;
                                *cancel = false;
                            }
                        }

                        // Pair the result with the `cancel` flag value which
                        // was passed to this poll.
                        poll.map(|v| v.map(|result| (result, cancel)))
                    })
                })
                .await?;

                let (guest_offset, count) = tls::get(|store| {
                    let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
                    (
                        match &transmit.read {
                            &ReadState::HostReady { guest_offset, .. } => guest_offset,
                            ReadState::Open => 0,
                            _ => unreachable!(),
                        },
                        match &transmit.write {
                            &WriteState::GuestReady { count, .. } => count,
                            WriteState::HostReady { .. } => host_buffer_remaining_before.unwrap(),
                            _ => unreachable!(),
                        },
                    )
                });

                match result {
                    StreamResult::Completed => {
                        // Reject `Completed` when items were available but the
                        // host buffer's remaining length did not change.
                        if count > 0
                            && guest_offset == 0
                            && host_buffer_remaining_before
                                .zip(host_buffer.map(|v| v.remaining().len()))
                                .map(|(before, after)| before == after)
                                .unwrap_or(false)
                        {
                            bail!(
                                "StreamConsumer::poll_consume returned StreamResult::Completed \
                                 without consuming any items"
                            );
                        }

                        // A future carries a single value, so a completed
                        // consume marks the transmit as done.
                        if let TransmitKind::Future = kind {
                            tls::get(|store| {
                                store.concurrent_state_mut().get_mut(id).unwrap().done = true;
                            });
                        }
                    }
                    StreamResult::Cancelled => {
                        if !cancelled {
                            bail!(
                                "StreamConsumer::poll_consume returned StreamResult::Cancelled \
                                 without being given a `finish` parameter value of true"
                            );
                        }
                    }
                    StreamResult::Dropped => {}
                }

                *consumer.lock().unwrap() = Some(mine);

                Ok(result)
            }
        };
        // Variant of the consume operation with no host buffer, used when the
        // writer is a guest.
        let consume = {
            let consume = consume_with_buffer.clone();
            Box::new(move || {
                let consume = consume.clone();
                async move { consume(None).await }.boxed()
            })
        };

        match &transmit.write {
            WriteState::Open => {
                transmit.read = ReadState::HostReady {
                    consume,
                    guest_offset: 0,
                    cancel: false,
                    cancel_waker: None,
                };
            }
            &WriteState::GuestReady { .. } => {
                // A guest write is already pending: start consuming it now.
                let future = consume();
                transmit.read = ReadState::HostReady {
                    consume,
                    guest_offset: 0,
                    cancel: false,
                    cancel_waker: None,
                };
                self.0.pipe_from_guest(kind, id, future);
            }
            WriteState::HostReady { .. } => {
                // Take ownership of the real `produce` closure, leaving a
                // placeholder `HostReady` state whose closures are never
                // expected to be called.
                let WriteState::HostReady { produce, .. } = mem::replace(
                    &mut transmit.write,
                    WriteState::HostReady {
                        produce: Box::new(|| unreachable!()),
                        try_into: Box::new(|_| unreachable!()),
                        guest_offset: 0,
                        cancel: false,
                        cancel_waker: None,
                    },
                ) else {
                    unreachable!();
                };

                transmit.read = ReadState::HostToHost {
                    accept: Box::new(move |input| {
                        let consume = consume_with_buffer.clone();
                        async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
                    }),
                    buffer: Vec::new(),
                    limit: 0,
                };

                let future = async move {
                    loop {
                        // Stop once the read end has been dropped.
                        if tls::get(|store| {
                            anyhow::Ok(matches!(
                                store.concurrent_state_mut().get_mut(id)?.read,
                                ReadState::Dropped
                            ))
                        })? {
                            break Ok(());
                        }

                        match produce().await? {
                            StreamResult::Completed | StreamResult::Cancelled => {}
                            StreamResult::Dropped => break Ok(()),
                        }

                        // A future transfers at most one value.
                        if let TransmitKind::Future = kind {
                            break Ok(());
                        }
                    }
                }
                .map(move |result| {
                    // Delete the transmit whether the loop succeeded or not; a
                    // deletion error takes precedence over the loop's result.
                    tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
                    result
                });

                state.push_future(Box::pin(future));
            }
            WriteState::Dropped => {
                let reader = transmit.read_handle;
                self.0.host_drop_reader(reader, kind).unwrap();
            }
        }
    }
}
2646
2647async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
2648 token: StoreToken<D>,
2649 id: TableId<TransmitState>,
2650 pair: Arc<Mutex<Option<(P, B)>>>,
2651 kind: TransmitKind,
2652) -> Result<()> {
2653 let (read, guest_offset) = tls::get(|store| {
2654 let transmit = store.concurrent_state_mut().get_mut(id)?;
2655
2656 let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
2657 Some(guest_offset)
2658 } else {
2659 None
2660 };
2661
2662 anyhow::Ok((
2663 mem::replace(&mut transmit.read, ReadState::Open),
2664 guest_offset,
2665 ))
2666 })?;
2667
2668 match read {
2669 ReadState::GuestReady {
2670 ty,
2671 flat_abi,
2672 options,
2673 address,
2674 count,
2675 handle,
2676 instance,
2677 caller,
2678 } => {
2679 let guest_offset = guest_offset.unwrap();
2680
2681 if let TransmitKind::Future = kind {
2682 tls::get(|store| {
2683 store.concurrent_state_mut().get_mut(id)?.done = true;
2684 anyhow::Ok(())
2685 })?;
2686 }
2687
2688 let old_remaining = pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2689 let accept = {
2690 let pair = pair.clone();
2691 move |mut store: StoreContextMut<D>| {
2692 lower::<T, B, D>(
2693 store.as_context_mut(),
2694 instance,
2695 options,
2696 ty,
2697 address + (T::SIZE32 * guest_offset),
2698 count - guest_offset,
2699 &mut pair.lock().unwrap().as_mut().unwrap().1,
2700 )?;
2701 anyhow::Ok(())
2702 }
2703 };
2704
2705 if guest_offset < count {
2706 if T::MAY_REQUIRE_REALLOC {
2707 let (tx, rx) = oneshot::channel();
2712 tls::get(move |store| {
2713 store
2714 .concurrent_state_mut()
2715 .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2716 move |store| {
2717 _ = tx.send(accept(token.as_context_mut(store))?);
2718 Ok(())
2719 },
2720 ))))
2721 });
2722 rx.await?
2723 } else {
2724 tls::get(|store| accept(token.as_context_mut(store)))?
2729 };
2730 }
2731
2732 tls::get(|store| {
2733 let count =
2734 old_remaining - pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2735
2736 let transmit = store.concurrent_state_mut().get_mut(id)?;
2737
2738 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2739 unreachable!();
2740 };
2741
2742 *guest_offset += count;
2743
2744 transmit.read = ReadState::GuestReady {
2745 ty,
2746 flat_abi,
2747 options,
2748 address,
2749 count,
2750 handle,
2751 instance,
2752 caller,
2753 };
2754
2755 anyhow::Ok(())
2756 })?;
2757
2758 Ok(())
2759 }
2760
2761 ReadState::HostToHost {
2762 accept,
2763 mut buffer,
2764 limit,
2765 } => {
2766 let mut state = StreamResult::Completed;
2767 let mut position = 0;
2768
2769 while !matches!(state, StreamResult::Dropped) && position < limit {
2770 let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
2771 state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
2772 (buffer, position, _) = slice_buffer.into_parts();
2773 }
2774
2775 {
2776 let (mine, mut buffer) = pair.lock().unwrap().take().unwrap();
2777
2778 while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
2779 state = accept(&mut UntypedWriteBuffer::new(&mut buffer)).await?;
2780 }
2781
2782 *pair.lock().unwrap() = Some((mine, buffer));
2783 }
2784
2785 tls::get(|store| {
2786 store.concurrent_state_mut().get_mut(id)?.read = match state {
2787 StreamResult::Dropped => ReadState::Dropped,
2788 StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
2789 accept,
2790 buffer,
2791 limit: 0,
2792 },
2793 };
2794
2795 anyhow::Ok(())
2796 })?;
2797 Ok(())
2798 }
2799
2800 _ => unreachable!(),
2801 }
2802}
2803
2804impl Instance {
2805 fn consume(
2808 self,
2809 store: &mut dyn VMStore,
2810 kind: TransmitKind,
2811 transmit_id: TableId<TransmitState>,
2812 consume: PollStream,
2813 guest_offset: usize,
2814 cancel: bool,
2815 ) -> Result<ReturnCode> {
2816 let mut future = consume();
2817 store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
2818 consume,
2819 guest_offset,
2820 cancel,
2821 cancel_waker: None,
2822 };
2823 let poll = tls::set(store, || {
2824 future
2825 .as_mut()
2826 .poll(&mut Context::from_waker(&Waker::noop()))
2827 });
2828
2829 Ok(match poll {
2830 Poll::Ready(state) => {
2831 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
2832 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
2833 unreachable!();
2834 };
2835 let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2836 transmit.write = WriteState::Open;
2837 code
2838 }
2839 Poll::Pending => {
2840 store.pipe_from_guest(kind, transmit_id, future);
2841 ReturnCode::Blocked
2842 }
2843 })
2844 }
2845
2846 fn produce(
2849 self,
2850 store: &mut dyn VMStore,
2851 kind: TransmitKind,
2852 transmit_id: TableId<TransmitState>,
2853 produce: PollStream,
2854 try_into: TryInto,
2855 guest_offset: usize,
2856 cancel: bool,
2857 ) -> Result<ReturnCode> {
2858 let mut future = produce();
2859 store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
2860 produce,
2861 try_into,
2862 guest_offset,
2863 cancel,
2864 cancel_waker: None,
2865 };
2866 let poll = tls::set(store, || {
2867 future
2868 .as_mut()
2869 .poll(&mut Context::from_waker(&Waker::noop()))
2870 });
2871
2872 Ok(match poll {
2873 Poll::Ready(state) => {
2874 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
2875 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2876 unreachable!();
2877 };
2878 let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2879 transmit.read = ReadState::Open;
2880 code
2881 }
2882 Poll::Pending => {
2883 store.pipe_to_guest(kind, transmit_id, future);
2884 ReturnCode::Blocked
2885 }
2886 })
2887 }
2888
2889 pub(super) fn guest_drop_writable(
2891 self,
2892 store: &mut StoreOpaque,
2893 ty: TransmitIndex,
2894 writer: u32,
2895 ) -> Result<()> {
2896 let table = self.id().get_mut(store).table_for_transmit(ty);
2897 let transmit_rep = match ty {
2898 TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
2899 TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
2900 };
2901
2902 let id = TableId::<TransmitHandle>::new(transmit_rep);
2903 log::trace!("guest_drop_writable: drop writer {id:?}");
2904 match ty {
2905 TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
2906 TransmitIndex::Future(_) => store.host_drop_writer(
2907 id,
2908 Some(|| {
2909 Err(anyhow!(
2910 "cannot drop future write end without first writing a value"
2911 ))
2912 }),
2913 ),
2914 }
2915 }
2916
    /// Copy `count` payload items from the writer's linear memory to the
    /// reader's linear memory.
    ///
    /// `write_ty` and `read_ty` must either both be futures or both be
    /// streams; any other pairing hits `unreachable!()`.
    ///
    /// * Futures: `count` must be exactly 1.  If the future has a payload it
    ///   is lifted from `write_address` as a `Val` and lowered to
    ///   `read_address`.
    /// * Streams with `flat_abi` present: the items are moved with a single
    ///   raw byte copy of `flat_abi.size * count` bytes.
    /// * Streams without `flat_abi`: each item is lifted as a `Val` and then
    ///   lowered one at a time.
    ///
    /// `rep` is only used to identify the transmit handle in trace logging.
    ///
    /// # Errors
    ///
    /// Returns an error if a pointer is misaligned or out of bounds, if
    /// lifting/lowering a value fails, or (on the future path and the flat
    /// stream path) if writer and reader are the same component instance and
    /// the payload is non-unit.
    fn copy<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        flat_abi: Option<FlatAbi>,
        write_caller: RuntimeComponentInstanceIndex,
        write_ty: TransmitIndex,
        write_options: OptionsIndex,
        write_address: usize,
        read_caller: RuntimeComponentInstanceIndex,
        read_ty: TransmitIndex,
        read_options: OptionsIndex,
        read_address: usize,
        count: usize,
        rep: u32,
    ) -> Result<()> {
        let types = self.id().get(store.0).component().types();
        match (write_ty, read_ty) {
            (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
                // A future carries at most one value.
                assert_eq!(count, 1);

                let payload = types[types[write_ty].ty].payload;

                if write_caller == read_caller && payload.is_some() {
                    bail!(
                        "cannot read from and write to intra-component future with non-unit payload"
                    )
                }

                // Lift the value (if the future has a payload type) out of
                // the writer's memory after validating alignment and bounds.
                let val = payload
                    .map(|ty| {
                        let lift =
                            &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);

                        let abi = lift.types.canonical_abi(&ty);
                        if write_address % usize::try_from(abi.align32)? != 0 {
                            bail!("write pointer not aligned");
                        }

                        let bytes = lift
                            .memory()
                            .get(write_address..)
                            .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
                            .ok_or_else(|| {
                                anyhow::anyhow!("write pointer out of bounds of memory")
                            })?;

                        Val::load(lift, ty, bytes)
                    })
                    .transpose()?;

                // Lower the lifted value into the reader's memory; the
                // destination pointer is validated by
                // `validate_inbounds_dynamic` using the reader-side type.
                if let Some(val) = val {
                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
                    let types = lower.types;
                    let ty = types[types[read_ty].ty].payload.unwrap();
                    let ptr = func::validate_inbounds_dynamic(
                        types.canonical_abi(&ty),
                        lower.as_slice_mut(),
                        &ValRaw::u32(read_address.try_into().unwrap()),
                    )?;
                    val.store(lower, ty, ptr)?;
                }
            }
            (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
                if let Some(flat_abi) = flat_abi {
                    // Fast path: the payload has a flat representation, so
                    // the items can be moved as raw bytes.
                    if write_caller == read_caller && types[types[write_ty].ty].payload.is_some() {
                        bail!(
                            "cannot read from and write to intra-component stream with non-unit payload"
                        )
                    }

                    let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
                    // Nothing to validate or copy for zero-length transfers.
                    if length_in_bytes > 0 {
                        if write_address % usize::try_from(flat_abi.align)? != 0 {
                            bail!("write pointer not aligned");
                        }
                        if read_address % usize::try_from(flat_abi.align)? != 0 {
                            bail!("read pointer not aligned");
                        }

                        let store_opaque = store.0.store_opaque_mut();

                        {
                            // Raw pointers are taken so that the source and
                            // destination borrows of `store_opaque` do not
                            // need to be alive at the same time.
                            let src = self
                                .options_memory(store_opaque, write_options)
                                .get(write_address..)
                                .and_then(|b| b.get(..length_in_bytes))
                                .ok_or_else(|| {
                                    anyhow::anyhow!("write pointer out of bounds of memory")
                                })?
                                .as_ptr();
                            let dst = self
                                .options_memory_mut(store_opaque, read_options)
                                .get_mut(read_address..)
                                .and_then(|b| b.get_mut(..length_in_bytes))
                                .ok_or_else(|| {
                                    anyhow::anyhow!("read pointer out of bounds of memory")
                                })?
                                .as_mut_ptr();
                            // SAFETY: both `src` and `dst` were bounds-checked
                            // above for `length_in_bytes` bytes, and `copy_to`
                            // tolerates overlapping ranges.
                            // NOTE(review): this also assumes neither memory
                            // is resized or freed between taking the pointers
                            // and the copy; nothing visible here does so.
                            unsafe { src.copy_to(dst, length_in_bytes) };
                        }
                    }
                } else {
                    // Slow path: lift every item to a `Val`, then lower each
                    // one into the reader's memory.
                    let store_opaque = store.0.store_opaque_mut();
                    let lift = &mut LiftContext::new(store_opaque, write_options, self);
                    let ty = lift.types[lift.types[write_ty].ty].payload.unwrap();
                    let abi = lift.types.canonical_abi(&ty);
                    let size = usize::try_from(abi.size32).unwrap();
                    if write_address % usize::try_from(abi.align32)? != 0 {
                        bail!("write pointer not aligned");
                    }
                    let bytes = lift
                        .memory()
                        .get(write_address..)
                        .and_then(|b| b.get(..size * count))
                        .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;

                    // All values are lifted before any is lowered.
                    let values = (0..count)
                        .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
                        .collect::<Result<Vec<_>>>()?;

                    let id = TableId::<TransmitHandle>::new(rep);
                    log::trace!("copy values {values:?} for {id:?}");

                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
                    let ty = lower.types[lower.types[read_ty].ty].payload.unwrap();
                    let abi = lower.types.canonical_abi(&ty);
                    if read_address % usize::try_from(abi.align32)? != 0 {
                        bail!("read pointer not aligned");
                    }
                    let size = usize::try_from(abi.size32).unwrap();
                    // Bounds check only; the resulting slice is discarded.
                    lower
                        .as_slice_mut()
                        .get_mut(read_address..)
                        .and_then(|b| b.get_mut(..size * count))
                        .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
                    let mut ptr = read_address;
                    for value in values {
                        value.store(lower, ty, ptr)?;
                        ptr += size
                    }
                }
            }
            _ => unreachable!(),
        }

        Ok(())
    }
3070
3071 fn check_bounds(
3072 self,
3073 store: &StoreOpaque,
3074 options: OptionsIndex,
3075 ty: TransmitIndex,
3076 address: usize,
3077 count: usize,
3078 ) -> Result<()> {
3079 let types = self.id().get(store).component().types();
3080 let size = usize::try_from(
3081 match ty {
3082 TransmitIndex::Future(ty) => types[types[ty].ty]
3083 .payload
3084 .map(|ty| types.canonical_abi(&ty).size32),
3085 TransmitIndex::Stream(ty) => types[types[ty].ty]
3086 .payload
3087 .map(|ty| types.canonical_abi(&ty).size32),
3088 }
3089 .unwrap_or(0),
3090 )
3091 .unwrap();
3092
3093 if count > 0 && size > 0 {
3094 self.options_memory(store, options)
3095 .get(address..)
3096 .and_then(|b| b.get(..(size * count)))
3097 .map(drop)
3098 .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))
3099 } else {
3100 Ok(())
3101 }
3102 }
3103
    /// Write `count` items located at `address` in the guest's memory to the
    /// stream or future whose writable end is the guest-local `handle`.
    ///
    /// The local handle must be in the `Write` state; it is marked `Busy` for
    /// the duration of the operation and restored to `Write` once a result
    /// other than `ReturnCode::Blocked` is known.
    ///
    /// What happens depends on the current state of the read end:
    ///
    /// * `GuestReady`: up to `min(count, read_count)` items are copied
    ///   directly into the reader's buffer via `copy`.
    /// * `HostReady`: the write is recorded as `WriteState::GuestReady` and
    ///   the host's `consume` callback is invoked through `self.consume`.
    /// * `Open`: the write is recorded as `WriteState::GuestReady` and the
    ///   result is `Blocked`.
    /// * `Dropped`: the result is `Dropped(0)`.
    ///
    /// If the result is `Blocked` and `options` is not async, this waits for
    /// the write to finish via `wait_for_write`.
    ///
    /// # Errors
    ///
    /// Fails if the buffer is out of bounds, the handle is not an idle write
    /// end, the handle was already notified that the reader dropped, the
    /// transmit is already marked done, or copying fails.
    pub(super) fn guest_write<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        caller: RuntimeComponentInstanceIndex,
        ty: TransmitIndex,
        options: OptionsIndex,
        flat_abi: Option<FlatAbi>,
        handle: u32,
        address: u32,
        count: u32,
    ) -> Result<ReturnCode> {
        let address = usize::try_from(address).unwrap();
        let count = usize::try_from(count).unwrap();
        self.check_bounds(store.0, options, ty, address, count)?;
        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
        let TransmitLocalState::Write { done } = *state else {
            bail!(
                "invalid handle {handle}; expected `Write`; got {:?}",
                *state
            );
        };

        if done {
            bail!("cannot write to stream after being notified that the readable end dropped");
        }

        // Mark the handle as having an operation in flight.
        *state = TransmitLocalState::Busy;
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let concurrent_state = store.0.concurrent_state_mut();
        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
        let transmit = concurrent_state.get_mut(transmit_id)?;
        log::trace!(
            "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
            transmit.read
        );

        if transmit.done {
            bail!("cannot write to future after previous write succeeded or readable end dropped");
        }

        // The read state is taken out below with `mem::replace`; this is what
        // is left in its place (`Dropped` is sticky, anything else resets to
        // `Open`).
        let new_state = if let ReadState::Dropped = &transmit.read {
            ReadState::Dropped
        } else {
            ReadState::Open
        };

        // Records this write as pending so that a later read (or the host)
        // can complete it.
        let set_guest_ready = |me: &mut ConcurrentState| {
            let transmit = me.get_mut(transmit_id)?;
            assert!(
                matches!(&transmit.write, WriteState::Open),
                "expected `WriteState::Open`; got `{:?}`",
                transmit.write
            );
            transmit.write = WriteState::GuestReady {
                instance: self,
                caller,
                ty,
                flat_abi,
                options,
                address,
                count,
                handle,
            };
            Ok::<_, crate::Error>(())
        };

        let mut result = match mem::replace(&mut transmit.read, new_state) {
            // A guest read is already pending: copy directly into its buffer.
            ReadState::GuestReady {
                ty: read_ty,
                flat_abi: read_flat_abi,
                options: read_options,
                address: read_address,
                count: read_count,
                handle: read_handle,
                instance: read_instance,
                caller: read_caller,
            } => {
                assert_eq!(flat_abi, read_flat_abi);

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                // The write completes now unless it has items to deliver and
                // the pending read is zero-length.
                let write_complete = count == 0 || read_count > 0;
                // The pending read completes now unless this write is
                // zero-length.
                let read_complete = count > 0;
                // The reader's buffer still has room after this write.
                let read_buffer_remaining = count < read_count;

                let read_handle_rep = transmit.read_handle.rep();

                // Number of items actually transferred.
                let count = count.min(read_count);

                self.copy(
                    store.as_context_mut(),
                    flat_abi,
                    caller,
                    ty,
                    options,
                    address,
                    read_caller,
                    read_ty,
                    read_options,
                    read_address,
                    count,
                    rep,
                )?;

                let instance = self.id().get_mut(store.0);
                let types = instance.component().types();
                let item_size = payload(ty, types)
                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
                    .unwrap_or(0);
                let concurrent_state = store.0.concurrent_state_mut();
                if read_complete {
                    let count = u32::try_from(count).unwrap();
                    // Fold in the item count of any `StreamRead` completion
                    // event that was queued for the reader but not yet
                    // delivered.
                    let total = if let Some(Event::StreamRead {
                        code: ReturnCode::Completed(old_total),
                        ..
                    }) = concurrent_state.take_event(read_handle_rep)?
                    {
                        count + old_total
                    } else {
                        count
                    };

                    let code = ReturnCode::completed(ty.kind(), total);

                    concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
                }

                if read_buffer_remaining {
                    // Re-arm the pending read for the unfilled remainder of
                    // its buffer.
                    let transmit = concurrent_state.get_mut(transmit_id)?;
                    transmit.read = ReadState::GuestReady {
                        ty: read_ty,
                        flat_abi: read_flat_abi,
                        options: read_options,
                        address: read_address + (count * item_size),
                        count: read_count - count,
                        handle: read_handle,
                        instance: read_instance,
                        caller: read_caller,
                    };
                }

                if write_complete {
                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
                } else {
                    set_guest_ready(concurrent_state)?;
                    ReturnCode::Blocked
                }
            }

            // A host reader is waiting: register the write, then let the
            // host's consumer pull from it.
            ReadState::HostReady {
                consume,
                guest_offset,
                cancel,
                cancel_waker,
            } => {
                assert!(cancel_waker.is_none());
                assert!(!cancel);
                assert_eq!(0, guest_offset);

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                set_guest_ready(concurrent_state)?;
                self.consume(store.0, ty.kind(), transmit_id, consume, 0, false)?
            }

            ReadState::HostToHost { .. } => unreachable!(),

            // No reader yet: park the write.
            ReadState::Open => {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }

            ReadState::Dropped => {
                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                ReturnCode::Dropped(0)
            }
        };

        // Synchronous callers wait here until the write is resolved.
        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
            result = self.wait_for_write(store.0, transmit_handle)?;
        }

        // The operation is no longer in flight: restore the handle to
        // `Write`, remembering whether a stream writer has now been told the
        // reader dropped.
        if result != ReturnCode::Blocked {
            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
                TransmitLocalState::Write {
                    done: matches!(
                        (result, ty),
                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
                    ),
                };
        }

        log::trace!(
            "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
        );

        Ok(result)
    }
3331
    /// Read up to `count` items into the guest's memory at `address` from the
    /// stream or future whose readable end is the guest-local `handle`.
    ///
    /// The local handle must be in the `Read` state; it is marked `Busy` for
    /// the duration of the operation and restored to `Read` once a result
    /// other than `ReturnCode::Blocked` is known.
    ///
    /// What happens depends on the current state of the write end:
    ///
    /// * `GuestReady`: up to `min(count, write_count)` items are copied
    ///   directly from the writer's buffer via `copy`.
    /// * `HostReady`: the read is recorded as `ReadState::GuestReady` and the
    ///   host's `produce` callback is invoked through `self.produce`.
    /// * `Open`: the read is recorded as `ReadState::GuestReady` and the
    ///   result is `Blocked`.
    /// * `Dropped`: the result is `Dropped(0)`.
    ///
    /// If the result is `Blocked` and `options` is not async, this waits for
    /// the read to finish via `wait_for_read`.
    ///
    /// # Errors
    ///
    /// Fails if the buffer is out of bounds, the handle is not an idle read
    /// end, the handle was already notified that the writer dropped, the
    /// transmit is already marked done, or copying fails.
    pub(super) fn guest_read<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        caller: RuntimeComponentInstanceIndex,
        ty: TransmitIndex,
        options: OptionsIndex,
        flat_abi: Option<FlatAbi>,
        handle: u32,
        address: u32,
        count: u32,
    ) -> Result<ReturnCode> {
        let address = usize::try_from(address).unwrap();
        let count = usize::try_from(count).unwrap();
        self.check_bounds(store.0, options, ty, address, count)?;
        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
        let TransmitLocalState::Read { done } = *state else {
            bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
        };

        if done {
            bail!("cannot read from stream after being notified that the writable end dropped");
        }

        // Mark the handle as having an operation in flight.
        *state = TransmitLocalState::Busy;
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let concurrent_state = store.0.concurrent_state_mut();
        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
        let transmit = concurrent_state.get_mut(transmit_id)?;
        log::trace!(
            "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
            transmit.write
        );

        if transmit.done {
            bail!("cannot read from future after previous read succeeded");
        }

        // The write state is taken out below with `mem::replace`; this is
        // what is left in its place (`Dropped` is sticky, anything else
        // resets to `Open`).
        let new_state = if let WriteState::Dropped = &transmit.write {
            WriteState::Dropped
        } else {
            WriteState::Open
        };

        // Records this read as pending so that a later write (or the host)
        // can complete it.
        let set_guest_ready = |me: &mut ConcurrentState| {
            let transmit = me.get_mut(transmit_id)?;
            assert!(
                matches!(&transmit.read, ReadState::Open),
                "expected `ReadState::Open`; got `{:?}`",
                transmit.read
            );
            transmit.read = ReadState::GuestReady {
                ty,
                flat_abi,
                options,
                address,
                count,
                handle,
                instance: self,
                caller,
            };
            Ok::<_, crate::Error>(())
        };

        let mut result = match mem::replace(&mut transmit.write, new_state) {
            // A guest write is already pending: copy directly from its
            // buffer.
            WriteState::GuestReady {
                instance: _,
                ty: write_ty,
                flat_abi: write_flat_abi,
                options: write_options,
                address: write_address,
                count: write_count,
                handle: write_handle,
                caller: write_caller,
            } => {
                assert_eq!(flat_abi, write_flat_abi);

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                let write_handle_rep = transmit.write_handle.rep();

                // The pending write completes now unless it has items to
                // deliver and this read is zero-length.
                let write_complete = write_count == 0 || count > 0;
                // This read completes now unless the pending write is
                // zero-length.
                let read_complete = write_count > 0;
                // The writer still has items left after this read.
                let write_buffer_remaining = count < write_count;

                // Number of items actually transferred.
                let count = count.min(write_count);

                self.copy(
                    store.as_context_mut(),
                    flat_abi,
                    write_caller,
                    write_ty,
                    write_options,
                    write_address,
                    caller,
                    ty,
                    options,
                    address,
                    count,
                    rep,
                )?;

                let instance = self.id().get_mut(store.0);
                let types = instance.component().types();
                let item_size = payload(ty, types)
                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
                    .unwrap_or(0);
                let concurrent_state = store.0.concurrent_state_mut();

                if write_complete {
                    let count = u32::try_from(count).unwrap();
                    // Fold in the item count of any `StreamWrite` completion
                    // event that was queued for the writer but not yet
                    // delivered.
                    let total = if let Some(Event::StreamWrite {
                        code: ReturnCode::Completed(old_total),
                        ..
                    }) = concurrent_state.take_event(write_handle_rep)?
                    {
                        count + old_total
                    } else {
                        count
                    };

                    let code = ReturnCode::completed(ty.kind(), total);

                    concurrent_state.send_write_result(
                        write_ty,
                        transmit_id,
                        write_handle,
                        code,
                    )?;
                }

                if write_buffer_remaining {
                    // Re-arm the pending write for the items that were not
                    // consumed by this read.
                    let transmit = concurrent_state.get_mut(transmit_id)?;
                    transmit.write = WriteState::GuestReady {
                        instance: self,
                        caller: write_caller,
                        ty: write_ty,
                        flat_abi: write_flat_abi,
                        options: write_options,
                        address: write_address + (count * item_size),
                        count: write_count - count,
                        handle: write_handle,
                    };
                }

                if read_complete {
                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
                } else {
                    set_guest_ready(concurrent_state)?;
                    ReturnCode::Blocked
                }
            }

            // A host writer is waiting: register the read, then let the
            // host's producer fill it.
            WriteState::HostReady {
                produce,
                try_into,
                guest_offset,
                cancel,
                cancel_waker,
            } => {
                assert!(cancel_waker.is_none());
                assert!(!cancel);
                assert_eq!(0, guest_offset);

                set_guest_ready(concurrent_state)?;

                let code =
                    self.produce(store.0, ty.kind(), transmit_id, produce, try_into, 0, false)?;

                // A future is finished only once the host actually delivered
                // its value.
                if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) {
                    store.0.concurrent_state_mut().get_mut(transmit_id)?.done = true;
                }

                code
            }

            // No writer yet: park the read.
            WriteState::Open => {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }

            WriteState::Dropped => ReturnCode::Dropped(0),
        };

        // Synchronous callers wait here until the read is resolved.
        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
            result = self.wait_for_read(store.0, transmit_handle)?;
        }

        // The operation is no longer in flight: restore the handle to `Read`,
        // remembering whether a stream reader has now been told the writer
        // dropped.
        if result != ReturnCode::Blocked {
            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
                TransmitLocalState::Read {
                    done: matches!(
                        (result, ty),
                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
                    ),
                };
        }

        log::trace!(
            "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
        );

        Ok(result)
    }
3542
3543 fn wait_for_write(
3544 self,
3545 store: &mut StoreOpaque,
3546 handle: TableId<TransmitHandle>,
3547 ) -> Result<ReturnCode> {
3548 let waitable = Waitable::Transmit(handle);
3549 store.wait_for_event(waitable)?;
3550 let event = waitable.take_event(store.concurrent_state_mut())?;
3551 if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3552 event
3553 {
3554 waitable.on_delivery(store, self, event);
3555 Ok(code)
3556 } else {
3557 unreachable!()
3558 }
3559 }
3560
3561 fn cancel_write(
3563 self,
3564 store: &mut StoreOpaque,
3565 transmit_id: TableId<TransmitState>,
3566 async_: bool,
3567 ) -> Result<ReturnCode> {
3568 let state = store.concurrent_state_mut();
3569 let transmit = state.get_mut(transmit_id)?;
3570 log::trace!(
3571 "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3572 transmit.read,
3573 transmit.write
3574 );
3575
3576 let code = if let Some(event) =
3577 Waitable::Transmit(transmit.write_handle).take_event(state)?
3578 {
3579 let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3580 unreachable!();
3581 };
3582 match (code, event) {
3583 (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3584 ReturnCode::Cancelled(count)
3585 }
3586 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3587 _ => unreachable!(),
3588 }
3589 } else if let ReadState::HostReady {
3590 cancel,
3591 cancel_waker,
3592 ..
3593 } = &mut state.get_mut(transmit_id)?.read
3594 {
3595 *cancel = true;
3596 if let Some(waker) = cancel_waker.take() {
3597 waker.wake();
3598 }
3599
3600 if async_ {
3601 ReturnCode::Blocked
3602 } else {
3603 let handle = store
3604 .concurrent_state_mut()
3605 .get_mut(transmit_id)?
3606 .write_handle;
3607 self.wait_for_write(store, handle)?
3608 }
3609 } else {
3610 ReturnCode::Cancelled(0)
3611 };
3612
3613 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3614
3615 match &transmit.write {
3616 WriteState::GuestReady { .. } => {
3617 transmit.write = WriteState::Open;
3618 }
3619 WriteState::HostReady { .. } => todo!("support host write cancellation"),
3620 WriteState::Open | WriteState::Dropped => {}
3621 }
3622
3623 log::trace!("cancelled write {transmit_id:?}: {code:?}");
3624
3625 Ok(code)
3626 }
3627
3628 fn wait_for_read(
3629 self,
3630 store: &mut StoreOpaque,
3631 handle: TableId<TransmitHandle>,
3632 ) -> Result<ReturnCode> {
3633 let waitable = Waitable::Transmit(handle);
3634 store.wait_for_event(waitable)?;
3635 let event = waitable.take_event(store.concurrent_state_mut())?;
3636 if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
3637 event
3638 {
3639 waitable.on_delivery(store, self, event);
3640 Ok(code)
3641 } else {
3642 unreachable!()
3643 }
3644 }
3645
    /// Cancel a pending read on the transmit identified by `transmit_id`.
    ///
    /// The resulting code is determined as follows:
    ///
    /// * If an event is already queued for the read end, it is consumed.  A
    ///   `Completed(n)` stream read becomes `Cancelled(n)`; a `Dropped` code
    ///   or a completed future read is returned unchanged.
    /// * Otherwise, if the write end is `HostReady`, the host writer is asked
    ///   to cancel (its `cancel` flag is set and any `cancel_waker` is
    ///   woken).  With `async_` the result is `Blocked`; without it this
    ///   waits via `wait_for_read`.
    /// * Otherwise the result is `Cancelled(0)`.
    ///
    /// Finally, a `ReadState::GuestReady` is reset to `ReadState::Open`.
    ///
    /// # Panics
    ///
    /// Hits `todo!()` if the read end is `HostReady` or `HostToHost` (host
    /// read cancellation is not supported here).
    fn cancel_read(
        self,
        store: &mut StoreOpaque,
        transmit_id: TableId<TransmitState>,
        async_: bool,
    ) -> Result<ReturnCode> {
        let state = store.concurrent_state_mut();
        let transmit = state.get_mut(transmit_id)?;
        log::trace!(
            "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
            transmit.read,
            transmit.write
        );

        let code = if let Some(event) =
            Waitable::Transmit(transmit.read_handle).take_event(state)?
        {
            // A result was already queued for the reader; cancellation just
            // surfaces it.
            let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
                unreachable!();
            };
            match (code, event) {
                (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
                    ReturnCode::Cancelled(count)
                }
                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
                _ => unreachable!(),
            }
        } else if let WriteState::HostReady {
            cancel,
            cancel_waker,
            ..
        } = &mut state.get_mut(transmit_id)?.write
        {
            // Ask the host-side writer to cancel.
            *cancel = true;
            if let Some(waker) = cancel_waker.take() {
                waker.wake();
            }

            if async_ {
                ReturnCode::Blocked
            } else {
                // Synchronous cancellation: wait until the host responds.
                let handle = store
                    .concurrent_state_mut()
                    .get_mut(transmit_id)?
                    .read_handle;
                self.wait_for_read(store, handle)?
            }
        } else {
            ReturnCode::Cancelled(0)
        };

        let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;

        // Forget any guest read that was parked on this transmit.
        match &transmit.read {
            ReadState::GuestReady { .. } => {
                transmit.read = ReadState::Open;
            }
            ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
                todo!("support host read cancellation")
            }
            ReadState::Open | ReadState::Dropped => {}
        }

        log::trace!("cancelled read {transmit_id:?}: {code:?}");

        Ok(code)
    }
3714
3715 fn guest_cancel_write(
3717 self,
3718 store: &mut StoreOpaque,
3719 ty: TransmitIndex,
3720 async_: bool,
3721 writer: u32,
3722 ) -> Result<ReturnCode> {
3723 let (rep, state) =
3724 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
3725 let id = TableId::<TransmitHandle>::new(rep);
3726 log::trace!("guest cancel write {id:?} (handle {writer})");
3727 match state {
3728 TransmitLocalState::Write { .. } => {
3729 bail!("stream or future write cancelled when no write is pending")
3730 }
3731 TransmitLocalState::Read { .. } => {
3732 bail!("passed read end to `{{stream|future}}.cancel-write`")
3733 }
3734 TransmitLocalState::Busy => {}
3735 }
3736 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3737 let code = self.cancel_write(store, transmit_id, async_)?;
3738 if !matches!(code, ReturnCode::Blocked) {
3739 let state =
3740 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
3741 .1;
3742 if let TransmitLocalState::Busy = state {
3743 *state = TransmitLocalState::Write { done: false };
3744 }
3745 }
3746 Ok(code)
3747 }
3748
3749 fn guest_cancel_read(
3751 self,
3752 store: &mut StoreOpaque,
3753 ty: TransmitIndex,
3754 async_: bool,
3755 reader: u32,
3756 ) -> Result<ReturnCode> {
3757 let (rep, state) =
3758 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
3759 let id = TableId::<TransmitHandle>::new(rep);
3760 log::trace!("guest cancel read {id:?} (handle {reader})");
3761 match state {
3762 TransmitLocalState::Read { .. } => {
3763 bail!("stream or future read cancelled when no read is pending")
3764 }
3765 TransmitLocalState::Write { .. } => {
3766 bail!("passed write end to `{{stream|future}}.cancel-read`")
3767 }
3768 TransmitLocalState::Busy => {}
3769 }
3770 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3771 let code = self.cancel_read(store, transmit_id, async_)?;
3772 if !matches!(code, ReturnCode::Blocked) {
3773 let state =
3774 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
3775 .1;
3776 if let TransmitLocalState::Busy = state {
3777 *state = TransmitLocalState::Read { done: false };
3778 }
3779 }
3780 Ok(code)
3781 }
3782
3783 fn guest_drop_readable(
3785 self,
3786 store: &mut StoreOpaque,
3787 ty: TransmitIndex,
3788 reader: u32,
3789 ) -> Result<()> {
3790 let table = self.id().get_mut(store).table_for_transmit(ty);
3791 let (rep, _is_done) = match ty {
3792 TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
3793 TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
3794 };
3795 let kind = match ty {
3796 TransmitIndex::Stream(_) => TransmitKind::Stream,
3797 TransmitIndex::Future(_) => TransmitKind::Future,
3798 };
3799 let id = TableId::<TransmitHandle>::new(rep);
3800 log::trace!("guest_drop_readable: drop reader {id:?}");
3801 store.host_drop_reader(id, kind)
3802 }
3803
3804 pub(crate) fn error_context_new(
3806 self,
3807 store: &mut StoreOpaque,
3808 caller: RuntimeComponentInstanceIndex,
3809 ty: TypeComponentLocalErrorContextTableIndex,
3810 options: OptionsIndex,
3811 debug_msg_address: u32,
3812 debug_msg_len: u32,
3813 ) -> Result<u32> {
3814 self.id().get(store).check_may_leave(caller)?;
3815 let lift_ctx = &mut LiftContext::new(store, options, self);
3816 let debug_msg = String::linear_lift_from_flat(
3817 lift_ctx,
3818 InterfaceType::String,
3819 &[ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)],
3820 )?;
3821
3822 let err_ctx = ErrorContextState { debug_msg };
3824 let state = store.concurrent_state_mut();
3825 let table_id = state.push(err_ctx)?;
3826 let global_ref_count_idx =
3827 TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
3828
3829 let _ = state
3831 .global_error_context_ref_counts
3832 .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
3833
3834 let local_idx = self
3841 .id()
3842 .get_mut(store)
3843 .table_for_error_context(ty)
3844 .error_context_insert(table_id.rep())?;
3845
3846 Ok(local_idx)
3847 }
3848
3849 pub(super) fn error_context_debug_message<T>(
3851 self,
3852 store: StoreContextMut<T>,
3853 ty: TypeComponentLocalErrorContextTableIndex,
3854 options: OptionsIndex,
3855 err_ctx_handle: u32,
3856 debug_msg_address: u32,
3857 ) -> Result<()> {
3858 let handle_table_id_rep = self
3860 .id()
3861 .get_mut(store.0)
3862 .table_for_error_context(ty)
3863 .error_context_rep(err_ctx_handle)?;
3864
3865 let state = store.0.concurrent_state_mut();
3866 let ErrorContextState { debug_msg } =
3868 state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
3869 let debug_msg = debug_msg.clone();
3870
3871 let lower_cx = &mut LowerContext::new(store, options, self);
3872 let debug_msg_address = usize::try_from(debug_msg_address)?;
3873 let offset = lower_cx
3875 .as_slice_mut()
3876 .get(debug_msg_address..)
3877 .and_then(|b| b.get(..debug_msg.bytes().len()))
3878 .map(|_| debug_msg_address)
3879 .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3880 debug_msg
3881 .as_str()
3882 .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
3883
3884 Ok(())
3885 }
3886
3887 pub(crate) fn future_cancel_read(
3889 self,
3890 store: &mut StoreOpaque,
3891 caller: RuntimeComponentInstanceIndex,
3892 ty: TypeFutureTableIndex,
3893 async_: bool,
3894 reader: u32,
3895 ) -> Result<u32> {
3896 self.id().get(store).check_may_leave(caller)?;
3897 self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
3898 .map(|v| v.encode())
3899 }
3900
3901 pub(crate) fn future_cancel_write(
3903 self,
3904 store: &mut StoreOpaque,
3905 caller: RuntimeComponentInstanceIndex,
3906 ty: TypeFutureTableIndex,
3907 async_: bool,
3908 writer: u32,
3909 ) -> Result<u32> {
3910 self.id().get(store).check_may_leave(caller)?;
3911 self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
3912 .map(|v| v.encode())
3913 }
3914
3915 pub(crate) fn stream_cancel_read(
3917 self,
3918 store: &mut StoreOpaque,
3919 caller: RuntimeComponentInstanceIndex,
3920 ty: TypeStreamTableIndex,
3921 async_: bool,
3922 reader: u32,
3923 ) -> Result<u32> {
3924 self.id().get(store).check_may_leave(caller)?;
3925 self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
3926 .map(|v| v.encode())
3927 }
3928
3929 pub(crate) fn stream_cancel_write(
3931 self,
3932 store: &mut StoreOpaque,
3933 caller: RuntimeComponentInstanceIndex,
3934 ty: TypeStreamTableIndex,
3935 async_: bool,
3936 writer: u32,
3937 ) -> Result<u32> {
3938 self.id().get(store).check_may_leave(caller)?;
3939 self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
3940 .map(|v| v.encode())
3941 }
3942
3943 pub(crate) fn future_drop_readable(
3945 self,
3946 store: &mut StoreOpaque,
3947 caller: RuntimeComponentInstanceIndex,
3948 ty: TypeFutureTableIndex,
3949 reader: u32,
3950 ) -> Result<()> {
3951 self.id().get(store).check_may_leave(caller)?;
3952 self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
3953 }
3954
3955 pub(crate) fn stream_drop_readable(
3957 self,
3958 store: &mut StoreOpaque,
3959 caller: RuntimeComponentInstanceIndex,
3960 ty: TypeStreamTableIndex,
3961 reader: u32,
3962 ) -> Result<()> {
3963 self.id().get(store).check_may_leave(caller)?;
3964 self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
3965 }
3966
3967 fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
3971 let (write, read) = store.concurrent_state_mut().new_transmit()?;
3972
3973 let table = self.id().get_mut(store).table_for_transmit(ty);
3974 let (read_handle, write_handle) = match ty {
3975 TransmitIndex::Future(ty) => (
3976 table.future_insert_read(ty, read.rep())?,
3977 table.future_insert_write(ty, write.rep())?,
3978 ),
3979 TransmitIndex::Stream(ty) => (
3980 table.stream_insert_read(ty, read.rep())?,
3981 table.stream_insert_write(ty, write.rep())?,
3982 ),
3983 };
3984
3985 let state = store.concurrent_state_mut();
3986 state.get_mut(read)?.common.handle = Some(read_handle);
3987 state.get_mut(write)?.common.handle = Some(write_handle);
3988
3989 Ok(ResourcePair {
3990 write: write_handle,
3991 read: read_handle,
3992 })
3993 }
3994
3995 pub(crate) fn error_context_drop(
3997 self,
3998 store: &mut StoreOpaque,
3999 caller: RuntimeComponentInstanceIndex,
4000 ty: TypeComponentLocalErrorContextTableIndex,
4001 error_context: u32,
4002 ) -> Result<()> {
4003 let instance = self.id().get_mut(store);
4004 instance.check_may_leave(caller)?;
4005
4006 let local_handle_table = instance.table_for_error_context(ty);
4007
4008 let rep = local_handle_table.error_context_drop(error_context)?;
4009
4010 let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
4011
4012 let state = store.concurrent_state_mut();
4013 let GlobalErrorContextRefCount(global_ref_count) = state
4014 .global_error_context_ref_counts
4015 .get_mut(&global_ref_count_idx)
4016 .expect("retrieve concurrent state for error context during drop");
4017
4018 assert!(*global_ref_count >= 1);
4020 *global_ref_count -= 1;
4021 if *global_ref_count == 0 {
4022 state
4023 .global_error_context_ref_counts
4024 .remove(&global_ref_count_idx);
4025
4026 state
4027 .delete(TableId::<ErrorContextState>::new(rep))
4028 .context("deleting component-global error context data")?;
4029 }
4030
4031 Ok(())
4032 }
4033
4034 fn guest_transfer(
4037 self,
4038 store: &mut StoreOpaque,
4039 src_idx: u32,
4040 src: TransmitIndex,
4041 dst: TransmitIndex,
4042 ) -> Result<u32> {
4043 let mut instance = self.id().get_mut(store);
4044 let src_table = instance.as_mut().table_for_transmit(src);
4045 let (rep, is_done) = match src {
4046 TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
4047 TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
4048 };
4049 if is_done {
4050 bail!("cannot lift after being notified that the writable end dropped");
4051 }
4052 let dst_table = instance.table_for_transmit(dst);
4053 let handle = match dst {
4054 TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
4055 TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
4056 }?;
4057 store
4058 .concurrent_state_mut()
4059 .get_mut(TableId::<TransmitHandle>::new(rep))?
4060 .common
4061 .handle = Some(handle);
4062 Ok(handle)
4063 }
4064
4065 pub(crate) fn future_new(
4067 self,
4068 store: &mut StoreOpaque,
4069 caller: RuntimeComponentInstanceIndex,
4070 ty: TypeFutureTableIndex,
4071 ) -> Result<ResourcePair> {
4072 self.id().get(store).check_may_leave(caller)?;
4073 self.guest_new(store, TransmitIndex::Future(ty))
4074 }
4075
4076 pub(crate) fn stream_new(
4078 self,
4079 store: &mut StoreOpaque,
4080 caller: RuntimeComponentInstanceIndex,
4081 ty: TypeStreamTableIndex,
4082 ) -> Result<ResourcePair> {
4083 self.id().get(store).check_may_leave(caller)?;
4084 self.guest_new(store, TransmitIndex::Stream(ty))
4085 }
4086
4087 pub(crate) fn future_transfer(
4090 self,
4091 store: &mut StoreOpaque,
4092 src_idx: u32,
4093 src: TypeFutureTableIndex,
4094 dst: TypeFutureTableIndex,
4095 ) -> Result<u32> {
4096 self.guest_transfer(
4097 store,
4098 src_idx,
4099 TransmitIndex::Future(src),
4100 TransmitIndex::Future(dst),
4101 )
4102 }
4103
4104 pub(crate) fn stream_transfer(
4107 self,
4108 store: &mut StoreOpaque,
4109 src_idx: u32,
4110 src: TypeStreamTableIndex,
4111 dst: TypeStreamTableIndex,
4112 ) -> Result<u32> {
4113 self.guest_transfer(
4114 store,
4115 src_idx,
4116 TransmitIndex::Stream(src),
4117 TransmitIndex::Stream(dst),
4118 )
4119 }
4120
4121 pub(crate) fn error_context_transfer(
4123 self,
4124 store: &mut StoreOpaque,
4125 src_idx: u32,
4126 src: TypeComponentLocalErrorContextTableIndex,
4127 dst: TypeComponentLocalErrorContextTableIndex,
4128 ) -> Result<u32> {
4129 let mut instance = self.id().get_mut(store);
4130 let rep = instance
4131 .as_mut()
4132 .table_for_error_context(src)
4133 .error_context_rep(src_idx)?;
4134 let dst_idx = instance
4135 .table_for_error_context(dst)
4136 .error_context_insert(rep)?;
4137
4138 let global_ref_count = store
4142 .concurrent_state_mut()
4143 .global_error_context_ref_counts
4144 .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4145 .context("global ref count present for existing (sub)component error context")?;
4146 global_ref_count.0 += 1;
4147
4148 Ok(dst_idx)
4149 }
4150}
4151
4152impl ComponentInstance {
4153 fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4154 let (tables, types) = self.guest_tables();
4155 let runtime_instance = match ty {
4156 TransmitIndex::Stream(ty) => types[ty].instance,
4157 TransmitIndex::Future(ty) => types[ty].instance,
4158 };
4159 &mut tables[runtime_instance]
4160 }
4161
4162 fn table_for_error_context(
4163 self: Pin<&mut Self>,
4164 ty: TypeComponentLocalErrorContextTableIndex,
4165 ) -> &mut HandleTable {
4166 let (tables, types) = self.guest_tables();
4167 let runtime_instance = types[ty].instance;
4168 &mut tables[runtime_instance]
4169 }
4170
4171 fn get_mut_by_index(
4172 self: Pin<&mut Self>,
4173 ty: TransmitIndex,
4174 index: u32,
4175 ) -> Result<(u32, &mut TransmitLocalState)> {
4176 get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4177 }
4178}
4179
4180impl ConcurrentState {
4181 fn send_write_result(
4182 &mut self,
4183 ty: TransmitIndex,
4184 id: TableId<TransmitState>,
4185 handle: u32,
4186 code: ReturnCode,
4187 ) -> Result<()> {
4188 let write_handle = self.get_mut(id)?.write_handle.rep();
4189 self.set_event(
4190 write_handle,
4191 match ty {
4192 TransmitIndex::Future(ty) => Event::FutureWrite {
4193 code,
4194 pending: Some((ty, handle)),
4195 },
4196 TransmitIndex::Stream(ty) => Event::StreamWrite {
4197 code,
4198 pending: Some((ty, handle)),
4199 },
4200 },
4201 )
4202 }
4203
4204 fn send_read_result(
4205 &mut self,
4206 ty: TransmitIndex,
4207 id: TableId<TransmitState>,
4208 handle: u32,
4209 code: ReturnCode,
4210 ) -> Result<()> {
4211 let read_handle = self.get_mut(id)?.read_handle.rep();
4212 self.set_event(
4213 read_handle,
4214 match ty {
4215 TransmitIndex::Future(ty) => Event::FutureRead {
4216 code,
4217 pending: Some((ty, handle)),
4218 },
4219 TransmitIndex::Stream(ty) => Event::StreamRead {
4220 code,
4221 pending: Some((ty, handle)),
4222 },
4223 },
4224 )
4225 }
4226
4227 fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4228 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4229 }
4230
4231 fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4232 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4233 }
4234
4235 fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4246 let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4247
4248 fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
4249 let (ReturnCode::Completed(count)
4250 | ReturnCode::Dropped(count)
4251 | ReturnCode::Cancelled(count)) = old
4252 else {
4253 unreachable!()
4254 };
4255
4256 match new {
4257 ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
4258 ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
4259 _ => unreachable!(),
4260 }
4261 }
4262
4263 let event = match (waitable.take_event(self)?, event) {
4264 (None, _) => event,
4265 (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4266 (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4267 (
4268 Some(Event::StreamWrite {
4269 code: old_code,
4270 pending: old_pending,
4271 }),
4272 Event::StreamWrite { code, pending },
4273 ) => Event::StreamWrite {
4274 code: update_code(old_code, code),
4275 pending: old_pending.or(pending),
4276 },
4277 (
4278 Some(Event::StreamRead {
4279 code: old_code,
4280 pending: old_pending,
4281 }),
4282 Event::StreamRead { code, pending },
4283 ) => Event::StreamRead {
4284 code: update_code(old_code, code),
4285 pending: old_pending.or(pending),
4286 },
4287 _ => unreachable!(),
4288 };
4289
4290 waitable.set_event(self, Some(event))
4291 }
4292
4293 fn new_transmit(&mut self) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4296 let state_id = self.push(TransmitState::default())?;
4297
4298 let write = self.push(TransmitHandle::new(state_id))?;
4299 let read = self.push(TransmitHandle::new(state_id))?;
4300
4301 let state = self.get_mut(state_id)?;
4302 state.write_handle = write;
4303 state.read_handle = read;
4304
4305 log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4306
4307 Ok((write, read))
4308 }
4309
4310 fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4312 let state = self.delete(state_id)?;
4313 self.delete(state.write_handle)?;
4314 self.delete(state.read_handle)?;
4315
4316 log::trace!(
4317 "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4318 state.write_handle,
4319 state.read_handle,
4320 );
4321
4322 Ok(())
4323 }
4324}
4325
/// Pair of `u32` values returned by `future_new` and `stream_new`, one for
/// each end of the newly created future or stream.
pub(crate) struct ResourcePair {
    /// Value for the write end (presumably a guest handle index; confirm
    /// against `guest_new`).
    pub(crate) write: u32,
    /// Value for the read end (presumably a guest handle index; confirm
    /// against `guest_new`).
    pub(crate) read: u32,
}
4330
4331impl Waitable {
4332 pub(super) fn on_delivery(&self, store: &mut StoreOpaque, instance: Instance, event: Event) {
4335 match event {
4336 Event::FutureRead {
4337 pending: Some((ty, handle)),
4338 ..
4339 }
4340 | Event::FutureWrite {
4341 pending: Some((ty, handle)),
4342 ..
4343 } => {
4344 let instance = instance.id().get_mut(store);
4345 let runtime_instance = instance.component().types()[ty].instance;
4346 let (rep, state) = instance.guest_tables().0[runtime_instance]
4347 .future_rep(ty, handle)
4348 .unwrap();
4349 assert_eq!(rep, self.rep());
4350 assert_eq!(*state, TransmitLocalState::Busy);
4351 *state = match event {
4352 Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
4353 Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
4354 _ => unreachable!(),
4355 };
4356 }
4357 Event::StreamRead {
4358 pending: Some((ty, handle)),
4359 code,
4360 }
4361 | Event::StreamWrite {
4362 pending: Some((ty, handle)),
4363 code,
4364 } => {
4365 let instance = instance.id().get_mut(store);
4366 let runtime_instance = instance.component().types()[ty].instance;
4367 let (rep, state) = instance.guest_tables().0[runtime_instance]
4368 .stream_rep(ty, handle)
4369 .unwrap();
4370 assert_eq!(rep, self.rep());
4371 assert_eq!(*state, TransmitLocalState::Busy);
4372 let done = matches!(code, ReturnCode::Dropped(_));
4373 *state = match event {
4374 Event::StreamRead { .. } => TransmitLocalState::Read { done },
4375 Event::StreamWrite { .. } => TransmitLocalState::Write { done },
4376 _ => unreachable!(),
4377 };
4378
4379 let transmit_handle = TableId::<TransmitHandle>::new(rep);
4380 let state = store.concurrent_state_mut();
4381 let transmit_id = state.get_mut(transmit_handle).unwrap().state;
4382 let transmit = state.get_mut(transmit_id).unwrap();
4383
4384 match event {
4385 Event::StreamRead { .. } => {
4386 transmit.read = ReadState::Open;
4387 }
4388 Event::StreamWrite { .. } => transmit.write = WriteState::Open,
4389 _ => unreachable!(),
4390 };
4391 }
4392 _ => {}
4393 }
4394 }
4395}
4396
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Engine, Store};
    use core::future::pending;
    use core::pin::pin;
    use std::sync::LazyLock;

    // One lazily-created engine shared by every throwaway store below.
    static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);

    /// Polls `producer` exactly once with a no-op waker and a fresh store,
    /// forwarding `finish` to `poll_produce`.
    fn poll_future_producer<T>(
        producer: Pin<&mut T>,
        finish: bool,
    ) -> Poll<Result<Option<T::Item>>>
    where
        T: FutureProducer<()>,
    {
        let mut cx = Context::from_waker(Waker::noop());
        let mut store = Store::new(&ENGINE, ());
        producer.poll_produce(&mut cx, store.as_context_mut(), finish)
    }

    #[test]
    fn future_producer() {
        // An immediately-ready future yields its value whether or not
        // `finish` is set.
        for finish in [false, true] {
            let mut fut = pin!(async { anyhow::Ok(()) });
            let polled = poll_future_producer(fut.as_mut(), finish);
            assert!(matches!(polled, Poll::Ready(Ok(Some(())))));
        }

        // A future that never resolves is pending until `finish` is set, at
        // which point it reports `None`.
        {
            let mut fut = pin!(pending::<Result<()>>());
            let polled = poll_future_producer(fut.as_mut(), false);
            assert!(matches!(polled, Poll::Pending));
            let polled = poll_future_producer(fut.as_mut(), true);
            assert!(matches!(polled, Poll::Ready(Ok(None))));
        }

        // A oneshot receiver behaves the same while empty, and can still
        // deliver a value that is sent after a `finish` poll.
        {
            let (tx, rx) = oneshot::channel();
            let mut rx = pin!(rx);
            let polled = poll_future_producer(rx.as_mut(), false);
            assert!(matches!(polled, Poll::Pending));
            let polled = poll_future_producer(rx.as_mut(), true);
            assert!(matches!(polled, Poll::Ready(Ok(None))));
            tx.send(()).unwrap();
            let polled = poll_future_producer(rx.as_mut(), true);
            assert!(matches!(polled, Poll::Ready(Ok(Some(())))));
        }

        // A value sent before the first poll is produced right away.
        {
            let (tx, rx) = oneshot::channel();
            let mut rx = pin!(rx);
            tx.send(()).unwrap();
            let polled = poll_future_producer(rx.as_mut(), false);
            assert!(matches!(polled, Poll::Ready(Ok(Some(())))));
        }

        // Dropping the sender without sending surfaces as an error,
        // regardless of `finish`.
        for finish in [false, true] {
            let (tx, rx) = oneshot::channel::<()>();
            let mut rx = pin!(rx);
            drop(tx);
            let polled = poll_future_producer(rx.as_mut(), finish);
            assert!(matches!(polled, Poll::Ready(Err(..))));
        }
    }
}