1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext};
5use crate::component::matching::InstanceType;
6use crate::component::types;
7use crate::component::values::ErrorContextAny;
8use crate::component::{
9 AsAccessor, ComponentInstanceId, ComponentType, FutureAny, Instance, Lift, Lower, StreamAny,
10 Val, WasmList,
11};
12use crate::store::{StoreOpaque, StoreToken};
13use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
14use crate::vm::{AlwaysMut, VMStore};
15use crate::{AsContextMut, StoreContextMut, ValRaw};
16use anyhow::{Context as _, Error, Result, anyhow, bail};
17use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
18use core::fmt;
19use core::future;
20use core::iter;
21use core::marker::PhantomData;
22use core::mem::{self, MaybeUninit};
23use core::pin::Pin;
24use core::task::{Context, Poll, Waker, ready};
25use futures::channel::oneshot;
26use futures::{FutureExt as _, stream};
27use std::any::{Any, TypeId};
28use std::boxed::Box;
29use std::io::Cursor;
30use std::string::String;
31use std::sync::{Arc, Mutex};
32use std::vec::Vec;
33use wasmtime_environ::component::{
34 CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
35 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
36 TypeFutureTableIndex, TypeStreamTableIndex,
37};
38
39pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
40
41mod buffers;
42
/// Distinguishes the two flavors of transmit handled by this module.
#[derive(Copy, Clone, Debug)]
pub enum TransmitKind {
    /// The transmit is a stream.
    Stream,
    /// The transmit is a future.
    Future,
}
50
/// Outcome of a transmit operation, convertible to a single `u32` via
/// `ReturnCode::encode`.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ReturnCode {
    /// No count is carried; encoded as the sentinel `0xffff_ffff`.
    Blocked,
    /// Carries a count; encoded with tag `0x0` in the low four bits.
    Completed(u32),
    /// Carries a count; encoded with tag `0x1` in the low four bits.
    Dropped(u32),
    /// Carries a count; encoded with tag `0x2` in the low four bits.
    Cancelled(u32),
}
59
60impl ReturnCode {
61 pub fn encode(&self) -> u32 {
66 const BLOCKED: u32 = 0xffff_ffff;
67 const COMPLETED: u32 = 0x0;
68 const DROPPED: u32 = 0x1;
69 const CANCELLED: u32 = 0x2;
70 match self {
71 ReturnCode::Blocked => BLOCKED,
72 ReturnCode::Completed(n) => {
73 debug_assert!(*n < (1 << 28));
74 (n << 4) | COMPLETED
75 }
76 ReturnCode::Dropped(n) => {
77 debug_assert!(*n < (1 << 28));
78 (n << 4) | DROPPED
79 }
80 ReturnCode::Cancelled(n) => {
81 debug_assert!(*n < (1 << 28));
82 (n << 4) | CANCELLED
83 }
84 }
85 }
86
87 fn completed(kind: TransmitKind, count: u32) -> Self {
90 Self::Completed(if let TransmitKind::Future = kind {
91 0
92 } else {
93 count
94 })
95 }
96}
97
/// A type-table index for either a stream or a future, tagged by which of the
/// two it refers to.
#[derive(Copy, Clone, Debug)]
pub enum TransmitIndex {
    /// Index into the stream type table.
    Stream(TypeStreamTableIndex),
    /// Index into the future type table.
    Future(TypeFutureTableIndex),
}
107
108impl TransmitIndex {
109 pub fn kind(&self) -> TransmitKind {
110 match self {
111 TransmitIndex::Stream(_) => TransmitKind::Stream,
112 TransmitIndex::Future(_) => TransmitKind::Future,
113 }
114 }
115}
116
117fn payload(ty: TransmitIndex, types: &ComponentTypes) -> Option<InterfaceType> {
120 match ty {
121 TransmitIndex::Future(ty) => types[types[ty].ty].payload,
122 TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
123 }
124}
125
126fn get_mut_by_index_from(
129 handle_table: &mut HandleTable,
130 ty: TransmitIndex,
131 index: u32,
132) -> Result<(u32, &mut TransmitLocalState)> {
133 match ty {
134 TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
135 TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
136 }
137}
138
139fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
140 mut store: StoreContextMut<U>,
141 instance: Instance,
142 options: OptionsIndex,
143 ty: TransmitIndex,
144 address: usize,
145 count: usize,
146 buffer: &mut B,
147) -> Result<()> {
148 let count = buffer.remaining().len().min(count);
149
150 let lower = &mut if T::MAY_REQUIRE_REALLOC {
151 LowerContext::new
152 } else {
153 LowerContext::new_without_realloc
154 }(store.as_context_mut(), options, instance);
155
156 if address % usize::try_from(T::ALIGN32)? != 0 {
157 bail!("read pointer not aligned");
158 }
159 lower
160 .as_slice_mut()
161 .get_mut(address..)
162 .and_then(|b| b.get_mut(..T::SIZE32 * count))
163 .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
164
165 if let Some(ty) = payload(ty, lower.types) {
166 T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
167 }
168
169 buffer.skip(count);
170
171 Ok(())
172}
173
174fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
175 lift: &mut LiftContext<'_>,
176 ty: Option<InterfaceType>,
177 buffer: &mut B,
178 address: usize,
179 count: usize,
180) -> Result<()> {
181 let count = count.min(buffer.remaining_capacity());
182 if T::IS_RUST_UNIT_TYPE {
183 buffer.extend(
187 iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
188 )
189 } else {
190 let ty = ty.unwrap();
191 if address % usize::try_from(T::ALIGN32)? != 0 {
192 bail!("write pointer not aligned");
193 }
194 lift.memory()
195 .get(address..)
196 .and_then(|b| b.get(..T::SIZE32 * count))
197 .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
198
199 let list = &WasmList::new(address, count, lift, ty)?;
200 T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
201 }
202 Ok(())
203}
204
/// State stored for an error context.
#[derive(Debug, PartialEq, Eq, PartialOrd)]
pub(super) struct ErrorContextState {
    /// Debug message associated with the error context.
    pub(crate) debug_msg: String,
}
211
/// Size and alignment of a flat (plain data) payload.
// NOTE(review): units are presumably bytes — confirm at the use sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct FlatAbi {
    /// Size of the payload.
    pub(super) size: u32,
    /// Alignment of the payload.
    pub(super) align: u32,
}
219
/// Handle passed to `StreamProducer::poll_produce` describing where produced
/// items go.
pub struct Destination<'a, T, B> {
    /// Transmit this destination belongs to.
    id: TableId<TransmitState>,
    /// Producer-side buffer, replaceable via `take_buffer` / `set_buffer`.
    buffer: &'a mut B,
    /// Optional host-side byte buffer; when present, `as_direct` writes into
    /// it instead of guest memory.
    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
    /// Ties the destination to item type `T` without storing a `T`.
    _phantom: PhantomData<fn() -> T>,
}
227
228impl<'a, T, B> Destination<'a, T, B> {
229 pub fn reborrow(&mut self) -> Destination<'_, T, B> {
231 Destination {
232 id: self.id,
233 buffer: &mut *self.buffer,
234 host_buffer: self.host_buffer.as_deref_mut(),
235 _phantom: PhantomData,
236 }
237 }
238
239 pub fn take_buffer(&mut self) -> B
245 where
246 B: Default,
247 {
248 mem::take(self.buffer)
249 }
250
251 pub fn set_buffer(&mut self, buffer: B) {
261 *self.buffer = buffer;
262 }
263
264 pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
281 let transmit = store
282 .as_context_mut()
283 .0
284 .concurrent_state_mut()
285 .get_mut(self.id)
286 .unwrap();
287
288 if let &ReadState::GuestReady { count, .. } = &transmit.read {
289 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
290 unreachable!()
291 };
292
293 Some(count - guest_offset)
294 } else {
295 None
296 }
297 }
298}
299
300impl<'a, B> Destination<'a, u8, B> {
301 pub fn as_direct<D>(
312 mut self,
313 store: StoreContextMut<'a, D>,
314 capacity: usize,
315 ) -> DirectDestination<'a, D> {
316 if let Some(buffer) = self.host_buffer.as_deref_mut() {
317 buffer.set_position(0);
318 if buffer.get_mut().is_empty() {
319 buffer.get_mut().resize(capacity, 0);
320 }
321 }
322
323 DirectDestination {
324 id: self.id,
325 host_buffer: self.host_buffer,
326 store,
327 }
328 }
329}
330
/// Byte-oriented view of a `Destination` that writes straight into the
/// reader's storage (a host buffer if present, otherwise guest memory).
pub struct DirectDestination<'a, D: 'static> {
    /// Transmit this destination belongs to.
    id: TableId<TransmitState>,
    /// Host-side byte buffer, if the bytes are not going to guest memory.
    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
    /// Store used to reach the transmit state and guest memory.
    store: StoreContextMut<'a, D>,
}
338
339impl<D: 'static> std::io::Write for DirectDestination<'_, D> {
340 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
341 let rem = self.remaining();
342 let n = rem.len().min(buf.len());
343 rem[..n].copy_from_slice(&buf[..n]);
344 self.mark_written(n);
345 Ok(n)
346 }
347
348 fn flush(&mut self) -> std::io::Result<()> {
349 Ok(())
350 }
351}
352
353impl<D: 'static> DirectDestination<'_, D> {
354 pub fn remaining(&mut self) -> &mut [u8] {
356 if let Some(buffer) = self.host_buffer.as_deref_mut() {
357 buffer.get_mut()
358 } else {
359 let transmit = self
360 .store
361 .as_context_mut()
362 .0
363 .concurrent_state_mut()
364 .get_mut(self.id)
365 .unwrap();
366
367 let &ReadState::GuestReady {
368 address,
369 count,
370 options,
371 instance,
372 ..
373 } = &transmit.read
374 else {
375 unreachable!();
376 };
377
378 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
379 unreachable!()
380 };
381
382 instance
383 .options_memory_mut(self.store.0, options)
384 .get_mut((address + guest_offset)..)
385 .and_then(|b| b.get_mut(..(count - guest_offset)))
386 .unwrap()
387 }
388 }
389
390 pub fn mark_written(&mut self, count: usize) {
395 if let Some(buffer) = self.host_buffer.as_deref_mut() {
396 buffer.set_position(
397 buffer
398 .position()
399 .checked_add(u64::try_from(count).unwrap())
400 .unwrap(),
401 );
402 } else {
403 let transmit = self
404 .store
405 .as_context_mut()
406 .0
407 .concurrent_state_mut()
408 .get_mut(self.id)
409 .unwrap();
410
411 let ReadState::GuestReady {
412 count: read_count, ..
413 } = &transmit.read
414 else {
415 unreachable!();
416 };
417
418 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
419 unreachable!()
420 };
421
422 if *guest_offset + count > *read_count {
423 panic!(
424 "write count ({count}) must be less than or equal to read count ({read_count})"
425 )
426 } else {
427 *guest_offset += count;
428 }
429 }
430 }
431}
432
/// Outcome reported by `StreamProducer::poll_produce` and
/// `StreamConsumer::poll_consume`.
// NOTE(review): the per-variant descriptions below are inferred from how the
// implementations in this file use them — confirm against the callers.
#[derive(Copy, Clone, Debug)]
pub enum StreamResult {
    /// The operation finished normally.
    Completed,
    /// The operation was cut short; the future adapters in this file return
    /// this when a `finish` request ended the operation without a value
    /// having been transferred.
    Cancelled,
    /// The producer or consumer is done; the in-file producers return this
    /// once they have handed over everything they will ever produce.
    Dropped,
}
448
/// A host-side source of items for a stream.
pub trait StreamProducer<D>: Send + 'static {
    /// Type of item this producer yields.
    type Item;

    /// Buffer type through which items are handed to the `Destination`.
    type Buffer: WriteBuffer<Self::Item> + Default;

    /// Attempt to produce items into `destination`.
    ///
    /// Returns `Poll::Pending` when nothing can be produced yet, or a
    /// `StreamResult` describing how the attempt ended.
    // NOTE(review): `finish` presumably asks the producer to wrap up instead
    // of returning `Pending`, as the `FutureProducer` blanket impl in this
    // file does — confirm against the caller.
    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<'a, D>,
        destination: Destination<'a, Self::Item, Self::Buffer>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;

    /// Attempt to convert the producer into a boxed value of the type
    /// identified by `_ty`.
    ///
    /// The default implementation declines, returning the producer unchanged
    /// in `Err`.
    fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
        Err(me)
    }
}
609
610impl<T, D> StreamProducer<D> for iter::Empty<T>
611where
612 T: Send + Sync + 'static,
613{
614 type Item = T;
615 type Buffer = Option<Self::Item>;
616
617 fn poll_produce<'a>(
618 self: Pin<&mut Self>,
619 _: &mut Context<'_>,
620 _: StoreContextMut<'a, D>,
621 _: Destination<'a, Self::Item, Self::Buffer>,
622 _: bool,
623 ) -> Poll<Result<StreamResult>> {
624 Poll::Ready(Ok(StreamResult::Dropped))
625 }
626}
627
628impl<T, D> StreamProducer<D> for stream::Empty<T>
629where
630 T: Send + Sync + 'static,
631{
632 type Item = T;
633 type Buffer = Option<Self::Item>;
634
635 fn poll_produce<'a>(
636 self: Pin<&mut Self>,
637 _: &mut Context<'_>,
638 _: StoreContextMut<'a, D>,
639 _: Destination<'a, Self::Item, Self::Buffer>,
640 _: bool,
641 ) -> Poll<Result<StreamResult>> {
642 Poll::Ready(Ok(StreamResult::Dropped))
643 }
644}
645
646impl<T, D> StreamProducer<D> for Vec<T>
647where
648 T: Unpin + Send + Sync + 'static,
649{
650 type Item = T;
651 type Buffer = VecBuffer<T>;
652
653 fn poll_produce<'a>(
654 self: Pin<&mut Self>,
655 _: &mut Context<'_>,
656 _: StoreContextMut<'a, D>,
657 mut dst: Destination<'a, Self::Item, Self::Buffer>,
658 _: bool,
659 ) -> Poll<Result<StreamResult>> {
660 dst.set_buffer(mem::take(self.get_mut()).into());
661 Poll::Ready(Ok(StreamResult::Dropped))
662 }
663}
664
665impl<T, D> StreamProducer<D> for Box<[T]>
666where
667 T: Unpin + Send + Sync + 'static,
668{
669 type Item = T;
670 type Buffer = VecBuffer<T>;
671
672 fn poll_produce<'a>(
673 self: Pin<&mut Self>,
674 _: &mut Context<'_>,
675 _: StoreContextMut<'a, D>,
676 mut dst: Destination<'a, Self::Item, Self::Buffer>,
677 _: bool,
678 ) -> Poll<Result<StreamResult>> {
679 dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
680 Poll::Ready(Ok(StreamResult::Dropped))
681 }
682}
683
/// `bytes::Bytes` produces its contents in a single step and then reports
/// `StreamResult::Dropped`.
#[cfg(feature = "component-model-async-bytes")]
impl<D> StreamProducer<D> for bytes::Bytes {
    type Item = u8;
    type Buffer = Cursor<Self>;

    /// Write as many bytes as the destination can take directly, and leave
    /// the rest (possibly nothing) in the destination's buffer.
    ///
    /// If the destination reports no known non-zero capacity, all bytes are
    /// placed in the buffer instead.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        mut store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _: bool,
    ) -> Poll<Result<StreamResult>> {
        let cap = dst.remaining(&mut store);
        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
            // Capacity unknown or zero: buffer everything.
            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
            return Poll::Ready(Ok(StreamResult::Dropped));
        };
        let cap = cap.get();
        // Never write more than we actually hold: `split_off` panics when
        // asked to split past the end, and `copy_from_slice` requires equal
        // lengths, so the destination's capacity must be clamped to our own
        // length before either is used.
        let len = cap.min(self.len());
        // Whatever does not fit directly is buffered; `self` keeps the first
        // `len` bytes.
        dst.set_buffer(Cursor::new(self.split_off(len)));
        let mut dst = dst.as_direct(store, cap);
        dst.remaining()[..len].copy_from_slice(&self);
        dst.mark_written(len);
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
711
/// `bytes::BytesMut` produces its contents in a single step and then reports
/// `StreamResult::Dropped`.
#[cfg(feature = "component-model-async-bytes")]
impl<D> StreamProducer<D> for bytes::BytesMut {
    type Item = u8;
    type Buffer = Cursor<Self>;

    /// Write as many bytes as the destination can take directly, and leave
    /// the rest (possibly nothing) in the destination's buffer.
    ///
    /// If the destination reports no known non-zero capacity, all bytes are
    /// placed in the buffer instead.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        mut store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _: bool,
    ) -> Poll<Result<StreamResult>> {
        let cap = dst.remaining(&mut store);
        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
            // Capacity unknown or zero: buffer everything.
            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
            return Poll::Ready(Ok(StreamResult::Dropped));
        };
        let cap = cap.get();
        // Never write more than we actually hold: `split_off` panics when
        // asked to split past the end, and `copy_from_slice` requires equal
        // lengths, so the destination's capacity must be clamped to our own
        // length before either is used.
        let len = cap.min(self.len());
        // Whatever does not fit directly is buffered; `self` keeps the first
        // `len` bytes.
        dst.set_buffer(Cursor::new(self.split_off(len)));
        let mut dst = dst.as_direct(store, cap);
        dst.remaining()[..len].copy_from_slice(&self);
        dst.mark_written(len);
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
739
/// Handle passed to consumers describing where items can be read from.
pub struct Source<'a, T> {
    /// Transmit this source belongs to.
    id: TableId<TransmitState>,
    /// Host-side buffer holding the items, if they are not in guest memory.
    host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
}
745
impl<'a, T> Source<'a, T> {
    /// Produce a shorter-lived `Source` borrowing from this one, so it can be
    /// handed to a callee without giving up `self`.
    pub fn reborrow(&mut self) -> Source<'_, T> {
        Source {
            id: self.id,
            host_buffer: self.host_buffer.as_deref_mut(),
        }
    }

    /// Move as many items as fit from this source into `buffer`.
    ///
    /// With a host buffer, items are moved out of it directly. Otherwise they
    /// are lifted from the guest memory described by the write end's
    /// `GuestReady` state, and the read end's `guest_offset` is advanced by
    /// the number of items that were actually stored in `buffer`.
    ///
    /// # Errors
    ///
    /// Fails if the transmit cannot be found or if lifting from guest memory
    /// fails.
    ///
    /// # Panics
    ///
    /// On the guest path, panics if the read end is not `HostReady` or the
    /// write end is not `GuestReady`.
    pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
    where
        T: func::Lift + 'static,
        B: ReadBuffer<T>,
    {
        if let Some(input) = &mut self.host_buffer {
            // Host-to-host: bounded by both what is available and what fits.
            let count = input.remaining().len().min(buffer.remaining_capacity());
            buffer.move_from(*input, count);
        } else {
            let store = store.as_context_mut();
            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;

            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
                unreachable!();
            };

            let &WriteState::GuestReady {
                ty,
                address,
                count,
                options,
                instance,
                ..
            } = &transmit.write
            else {
                unreachable!()
            };

            let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
            let ty = payload(ty, cx.types);
            // Remember the capacity beforehand so the number of items
            // consumed can be derived afterwards.
            let old_remaining = buffer.remaining_capacity();
            lift::<T, B>(
                cx,
                ty,
                buffer,
                // Skip the items already consumed by earlier reads.
                address + (T::SIZE32 * guest_offset),
                count - guest_offset,
            )?;

            // Look the transmit up again: the earlier borrow had to end
            // before the store could be used for lifting.
            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;

            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
                unreachable!();
            };

            *guest_offset += old_remaining - buffer.remaining_capacity();
        }

        Ok(())
    }

    /// Number of items still available to read from this source.
    ///
    /// While the write end is `GuestReady` this is `count - guest_offset`;
    /// otherwise it is the number of items left in the host buffer.
    ///
    /// # Panics
    ///
    /// Panics if the transmit cannot be found, if the write end is
    /// `GuestReady` while the read end is not `HostReady`, or if the write
    /// end is not `GuestReady` and there is no host buffer.
    pub fn remaining(&self, mut store: impl AsContextMut) -> usize
    where
        T: 'static,
    {
        let transmit = store
            .as_context_mut()
            .0
            .concurrent_state_mut()
            .get_mut(self.id)
            .unwrap();

        if let &WriteState::GuestReady { count, .. } = &transmit.write {
            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
                unreachable!()
            };

            count - guest_offset
        } else if let Some(host_buffer) = &self.host_buffer {
            host_buffer.remaining().len()
        } else {
            unreachable!()
        }
    }
}
833
834impl<'a> Source<'a, u8> {
835 pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
837 DirectSource {
838 id: self.id,
839 host_buffer: self.host_buffer,
840 store,
841 }
842 }
843}
844
/// Byte-oriented view of a `Source` that reads straight from the writer's
/// storage (a host buffer if present, otherwise guest memory).
pub struct DirectSource<'a, D: 'static> {
    /// Transmit this source belongs to.
    id: TableId<TransmitState>,
    /// Host-side buffer holding the bytes, if they are not in guest memory.
    host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
    /// Store used to reach the transmit state and guest memory.
    store: StoreContextMut<'a, D>,
}
852
853impl<D: 'static> std::io::Read for DirectSource<'_, D> {
854 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
855 let rem = self.remaining();
856 let n = rem.len().min(buf.len());
857 buf[..n].copy_from_slice(&rem[..n]);
858 self.mark_read(n);
859 Ok(n)
860 }
861}
862
impl<D: 'static> DirectSource<'_, D> {
    /// Return the not-yet-read bytes of the writer's storage.
    ///
    /// With a host buffer this is whatever that buffer reports as remaining.
    /// Otherwise it is the guest memory range starting `guest_offset` bytes
    /// past the write address and ending at the write's `count`.
    ///
    /// # Panics
    ///
    /// On the guest path, panics if the transmit is missing from the store,
    /// if the write end is not `GuestReady` or the read end is not
    /// `HostReady`, or if the computed range is outside guest memory.
    pub fn remaining(&mut self) -> &[u8] {
        if let Some(buffer) = self.host_buffer.as_deref_mut() {
            buffer.remaining()
        } else {
            let transmit = self
                .store
                .as_context_mut()
                .0
                .concurrent_state_mut()
                .get_mut(self.id)
                .unwrap();

            let &WriteState::GuestReady {
                address,
                count,
                options,
                instance,
                ..
            } = &transmit.write
            else {
                unreachable!()
            };

            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
                unreachable!()
            };

            instance
                .options_memory(self.store.0, options)
                .get((address + guest_offset)..)
                .and_then(|b| b.get(..(count - guest_offset)))
                .unwrap()
        }
    }

    /// Record that `count` more bytes have been consumed from the slice
    /// previously obtained from `remaining()`.
    ///
    /// With a host buffer the bytes are skipped in that buffer; otherwise the
    /// read end's `guest_offset` is advanced.
    ///
    /// # Panics
    ///
    /// On the guest path, panics if `count` would push `guest_offset` past
    /// the write's `count`, or if the transmit is not in the expected state.
    pub fn mark_read(&mut self, count: usize) {
        if let Some(buffer) = self.host_buffer.as_deref_mut() {
            buffer.skip(count);
        } else {
            let transmit = self
                .store
                .as_context_mut()
                .0
                .concurrent_state_mut()
                .get_mut(self.id)
                .unwrap();

            let WriteState::GuestReady {
                count: write_count, ..
            } = &transmit.write
            else {
                unreachable!()
            };

            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
                unreachable!()
            };

            if *guest_offset + count > *write_count {
                panic!(
                    "read count ({count}) must be less than or equal to write count ({write_count})"
                )
            } else {
                *guest_offset += count;
            }
        }
    }
}
937
/// A host-side sink for the items of a stream.
pub trait StreamConsumer<D>: Send + 'static {
    /// Type of item this consumer accepts.
    type Item;

    /// Attempt to consume items from `source`.
    ///
    /// Returns `Poll::Pending` when nothing can be consumed yet, or a
    /// `StreamResult` describing how the attempt ended.
    // NOTE(review): `finish` presumably asks the consumer to wrap up instead
    // of returning `Pending` — confirm against the caller.
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;
}
1033
/// A host-side source of the single value of a future.
pub trait FutureProducer<D>: Send + 'static {
    /// Type of the value this producer yields.
    type Item;

    /// Attempt to produce the value.
    ///
    /// `Ok(Some(value))` delivers the value. `Ok(None)` reports that no value
    /// was produced; the blanket implementation for Rust futures in this file
    /// returns it only when `finish` is `true` and the future is still
    /// pending.
    fn poll_produce(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        finish: bool,
    ) -> Poll<Result<Option<Self::Item>>>;
}
1055
1056impl<T, E, D, Fut> FutureProducer<D> for Fut
1057where
1058 E: Into<Error>,
1059 Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
1060{
1061 type Item = T;
1062
1063 fn poll_produce<'a>(
1064 self: Pin<&mut Self>,
1065 cx: &mut Context<'_>,
1066 _: StoreContextMut<'a, D>,
1067 finish: bool,
1068 ) -> Poll<Result<Option<T>>> {
1069 match self.poll(cx) {
1070 Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
1071 Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
1072 Poll::Pending if finish => Poll::Ready(Ok(None)),
1073 Poll::Pending => Poll::Pending,
1074 }
1075 }
1076}
1077
/// A host-side sink for the single value of a future.
pub trait FutureConsumer<D>: Send + 'static {
    /// Type of the value this consumer accepts.
    type Item;

    /// Attempt to consume the value from `source`.
    ///
    /// Returns `Poll::Pending` when the consumer cannot make progress yet.
    // NOTE(review): `finish` presumably asks the consumer to wrap up instead
    // of returning `Pending` — confirm against the caller.
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<()>>;
}
1102
/// Host-side handle to the readable end of a future carrying a `T`.
pub struct FutureReader<T> {
    /// Table id of the underlying transmit handle.
    id: TableId<TransmitHandle>,
    /// Records the payload type without storing a value of it.
    _phantom: PhantomData<T>,
}
1113
impl<T> FutureReader<T> {
    /// Create a new future whose value is supplied by `producer`, returning
    /// its readable end.
    pub fn new<S: AsContextMut>(
        mut store: S,
        producer: impl FutureProducer<S::Data, Item = T>,
    ) -> Self
    where
        T: func::Lower + func::Lift + Send + Sync + 'static,
    {
        // Adapter presenting a `FutureProducer` as a `StreamProducer`, which
        // is what `new_transmit` is given below.
        struct Producer<P>(P);

        impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
            for Producer<P>
        {
            type Item = P::Item;
            type Buffer = Option<P::Item>;

            fn poll_produce<'a>(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                store: StoreContextMut<D>,
                mut destination: Destination<'a, Self::Item, Self::Buffer>,
                finish: bool,
            ) -> Poll<Result<StreamResult>> {
                // SAFETY: this projects the pin from the wrapper to its only
                // field. `Producer` is private to this function and nothing
                // here moves the field out of it.
                // NOTE(review): confirm no other code can move the inner
                // producer once it has been pinned.
                let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };

                Poll::Ready(Ok(
                    if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
                        // Hand the value over through the destination buffer.
                        destination.set_buffer(Some(value));

                        StreamResult::Completed
                    } else {
                        // No value was produced.
                        StreamResult::Cancelled
                    },
                ))
            }
        }

        Self::new_(
            store
                .as_context_mut()
                .new_transmit(TransmitKind::Future, Producer(producer)),
        )
    }

    /// Wrap an existing transmit handle id.
    pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
        Self {
            id,
            _phantom: PhantomData,
        }
    }

    /// Table id of the underlying transmit handle.
    pub(super) fn id(&self) -> TableId<TransmitHandle> {
        self.id
    }

    /// Attach `consumer` as the consumer of this future, giving up the
    /// reader.
    pub fn pipe<S: AsContextMut>(
        self,
        mut store: S,
        consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
    ) where
        T: func::Lift + 'static,
    {
        // Adapter presenting a `FutureConsumer` as a `StreamConsumer`, which
        // is what `set_consumer` is given below.
        struct Consumer<C>(C);

        impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
            for Consumer<C>
        {
            type Item = T;

            fn poll_consume(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                mut store: StoreContextMut<D>,
                mut source: Source<Self::Item>,
                finish: bool,
            ) -> Poll<Result<StreamResult>> {
                // SAFETY: this projects the pin from the wrapper to its only
                // field. `Consumer` is private to this function and nothing
                // here moves the field out of it.
                // NOTE(review): confirm no other code can move the inner
                // consumer once it has been pinned.
                let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };

                ready!(consumer.poll_consume(
                    cx,
                    store.as_context_mut(),
                    source.reborrow(),
                    finish
                ))?;

                // Nothing left in the source means the value was taken.
                Poll::Ready(Ok(if source.remaining(store) == 0 {
                    StreamResult::Completed
                } else {
                    StreamResult::Cancelled
                }))
            }
        }

        store
            .as_context_mut()
            .set_consumer(self.id, TransmitKind::Future, Consumer(consumer));
    }

    /// Build a reader from a guest handle `index` of future type `ty`.
    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
        let id = lift_index_to_future(cx, ty, index)?;
        Ok(Self::new_(id))
    }

    /// Close this reader via `future_close`.
    ///
    /// Afterwards `self` holds a placeholder id (`u32::MAX`).
    ///
    /// # Panics
    ///
    /// Panics if the store fails to drop the reader.
    pub fn close(&mut self, mut store: impl AsContextMut) {
        future_close(store.as_context_mut().0, &mut self.id)
    }

    /// Like `close`, but obtains store access through `accessor`.
    pub fn close_with(&mut self, accessor: impl AsAccessor) {
        accessor.as_accessor().with(|access| self.close(access))
    }

    /// Wrap this reader in a guard that closes it on drop.
    pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
    where
        A: AsAccessor,
    {
        GuardedFutureReader::new(accessor, self)
    }

    /// Attempt to convert this reader into a `FutureAny`.
    pub fn try_into_future_any(self, store: impl AsContextMut) -> Result<FutureAny>
    where
        T: ComponentType + 'static,
    {
        FutureAny::try_from_future_reader(store, self)
    }

    /// Attempt to convert a `FutureAny` into a reader for payload type `T`.
    pub fn try_from_future_any(future: FutureAny) -> Result<Self>
    where
        T: ComponentType + 'static,
    {
        future.try_into_future_reader()
    }
}
1294
1295impl<T> fmt::Debug for FutureReader<T> {
1296 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1297 f.debug_struct("FutureReader")
1298 .field("id", &self.id)
1299 .finish()
1300 }
1301}
1302
1303pub(super) fn future_close(store: &mut StoreOpaque, id: &mut TableId<TransmitHandle>) {
1304 let id = mem::replace(id, TableId::new(u32::MAX));
1305 store.host_drop_reader(id, TransmitKind::Future).unwrap();
1306}
1307
1308pub(super) fn lift_index_to_future(
1310 cx: &mut LiftContext<'_>,
1311 ty: InterfaceType,
1312 index: u32,
1313) -> Result<TableId<TransmitHandle>> {
1314 match ty {
1315 InterfaceType::Future(src) => {
1316 let handle_table = cx
1317 .instance_mut()
1318 .table_for_transmit(TransmitIndex::Future(src));
1319 let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
1320 if is_done {
1321 bail!("cannot lift future after being notified that the writable end dropped");
1322 }
1323 let id = TableId::<TransmitHandle>::new(rep);
1324 let concurrent_state = cx.concurrent_state_mut();
1325 let future = concurrent_state.get_mut(id)?;
1326 future.common.handle = None;
1327 let state = future.state;
1328
1329 if concurrent_state.get_mut(state)?.done {
1330 bail!("cannot lift future after previous read succeeded");
1331 }
1332
1333 Ok(id)
1334 }
1335 _ => func::bad_type_info(),
1336 }
1337}
1338
1339pub(super) fn lower_future_to_index<U>(
1341 id: TableId<TransmitHandle>,
1342 cx: &mut LowerContext<'_, U>,
1343 ty: InterfaceType,
1344) -> Result<u32> {
1345 match ty {
1346 InterfaceType::Future(dst) => {
1347 let concurrent_state = cx.store.0.concurrent_state_mut();
1348 let state = concurrent_state.get_mut(id)?.state;
1349 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1350
1351 let handle = cx
1352 .instance_mut()
1353 .table_for_transmit(TransmitIndex::Future(dst))
1354 .future_insert_read(dst, rep)?;
1355
1356 cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1357
1358 Ok(handle)
1359 }
1360 _ => func::bad_type_info(),
1361 }
1362}
1363
1364unsafe impl<T: ComponentType> ComponentType for FutureReader<T> {
1367 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1368
1369 type Lower = <u32 as func::ComponentType>::Lower;
1370
1371 fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1372 match ty {
1373 InterfaceType::Future(ty) => {
1374 let ty = types.types[*ty].ty;
1375 types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1376 }
1377 other => bail!("expected `future`, found `{}`", func::desc(other)),
1378 }
1379 }
1380}
1381
1382unsafe impl<T: ComponentType> func::Lower for FutureReader<T> {
1384 fn linear_lower_to_flat<U>(
1385 &self,
1386 cx: &mut LowerContext<'_, U>,
1387 ty: InterfaceType,
1388 dst: &mut MaybeUninit<Self::Lower>,
1389 ) -> Result<()> {
1390 lower_future_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1391 }
1392
1393 fn linear_lower_to_memory<U>(
1394 &self,
1395 cx: &mut LowerContext<'_, U>,
1396 ty: InterfaceType,
1397 offset: usize,
1398 ) -> Result<()> {
1399 lower_future_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1400 cx,
1401 InterfaceType::U32,
1402 offset,
1403 )
1404 }
1405}
1406
1407unsafe impl<T: ComponentType> func::Lift for FutureReader<T> {
1409 fn linear_lift_from_flat(
1410 cx: &mut LiftContext<'_>,
1411 ty: InterfaceType,
1412 src: &Self::Lower,
1413 ) -> Result<Self> {
1414 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1415 Self::lift_from_index(cx, ty, index)
1416 }
1417
1418 fn linear_lift_from_memory(
1419 cx: &mut LiftContext<'_>,
1420 ty: InterfaceType,
1421 bytes: &[u8],
1422 ) -> Result<Self> {
1423 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1424 Self::lift_from_index(cx, ty, index)
1425 }
1426}
1427
/// A `FutureReader` paired with an accessor so that the reader is closed
/// automatically when the guard is dropped.
pub struct GuardedFutureReader<T, A>
where
    A: AsAccessor,
{
    /// The guarded reader; `None` once it has been taken back out of the
    /// guard, in which case dropping the guard does nothing.
    reader: Option<FutureReader<T>>,
    /// Accessor used to close the reader on drop.
    accessor: A,
}
1443
1444impl<T, A> GuardedFutureReader<T, A>
1445where
1446 A: AsAccessor,
1447{
1448 pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1450 Self {
1451 reader: Some(reader),
1452 accessor,
1453 }
1454 }
1455
1456 pub fn into_future(self) -> FutureReader<T> {
1459 self.into()
1460 }
1461}
1462
1463impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1464where
1465 A: AsAccessor,
1466{
1467 fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1468 guard.reader.take().unwrap()
1469 }
1470}
1471
1472impl<T, A> Drop for GuardedFutureReader<T, A>
1473where
1474 A: AsAccessor,
1475{
1476 fn drop(&mut self) {
1477 if let Some(reader) = &mut self.reader {
1478 reader.close_with(&self.accessor)
1479 }
1480 }
1481}
1482
/// Host-side handle to the readable end of a stream carrying items of type
/// `T`.
pub struct StreamReader<T> {
    /// Table id of the underlying transmit handle.
    id: TableId<TransmitHandle>,
    /// Records the item type without storing a value of it.
    _phantom: PhantomData<T>,
}
1493
impl<T> StreamReader<T> {
    /// Create a new stream whose items are supplied by `producer`, returning
    /// its readable end.
    pub fn new<S: AsContextMut>(
        mut store: S,
        producer: impl StreamProducer<S::Data, Item = T>,
    ) -> Self
    where
        T: func::Lower + func::Lift + Send + Sync + 'static,
    {
        Self::new_(
            store
                .as_context_mut()
                .new_transmit(TransmitKind::Stream, producer),
        )
    }

    /// Wrap an existing transmit handle id.
    pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
        Self {
            id,
            _phantom: PhantomData,
        }
    }

    /// Table id of the underlying transmit handle.
    pub(super) fn id(&self) -> TableId<TransmitHandle> {
        self.id
    }

    /// Attempt to recover a value of type `V` from the write end of this
    /// stream.
    ///
    /// This only succeeds while the write end is in the `HostReady` state and
    /// its `try_into` hook yields a value for `V`; the reader is then closed
    /// and the value returned. In every other case the reader is handed back
    /// unchanged in `Err`.
    ///
    /// # Panics
    ///
    /// Panics if the transmit cannot be found in the store, or if the hook
    /// returns a value that is not actually a `V`.
    pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
        let store = store.as_context_mut();
        let state = store.0.concurrent_state_mut();
        let id = state.get_mut(self.id).unwrap().state;
        if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
            match try_into(TypeId::of::<V>()) {
                Some(result) => {
                    // The value has been recovered; the stream is finished.
                    self.close(store);
                    Ok(*result.downcast::<V>().unwrap())
                }
                None => Err(self),
            }
        } else {
            Err(self)
        }
    }

    /// Attach `consumer` as the consumer of this stream, giving up the
    /// reader.
    pub fn pipe<S: AsContextMut>(
        self,
        mut store: S,
        consumer: impl StreamConsumer<S::Data, Item = T>,
    ) where
        T: 'static,
    {
        store
            .as_context_mut()
            .set_consumer(self.id, TransmitKind::Stream, consumer);
    }

    /// Build a reader from a guest handle `index` of stream type `ty`.
    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
        let id = lift_index_to_stream(cx, ty, index)?;
        Ok(Self::new_(id))
    }

    /// Close this reader via `stream_close`.
    ///
    /// Afterwards `self` holds a placeholder id (`u32::MAX`).
    ///
    /// # Panics
    ///
    /// Panics if the store fails to drop the reader.
    pub fn close(&mut self, mut store: impl AsContextMut) {
        stream_close(store.as_context_mut().0, &mut self.id);
    }

    /// Like `close`, but obtains store access through `accessor`.
    pub fn close_with(&mut self, accessor: impl AsAccessor) {
        accessor.as_accessor().with(|access| self.close(access))
    }

    /// Wrap this reader in a guard that closes it on drop.
    pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
    where
        A: AsAccessor,
    {
        GuardedStreamReader::new(accessor, self)
    }

    /// Attempt to convert this reader into a `StreamAny`.
    pub fn try_into_stream_any(self, store: impl AsContextMut) -> Result<StreamAny>
    where
        T: ComponentType + 'static,
    {
        StreamAny::try_from_stream_reader(store, self)
    }

    /// Attempt to convert a `StreamAny` into a reader for item type `T`.
    pub fn try_from_stream_any(stream: StreamAny) -> Result<Self>
    where
        T: ComponentType + 'static,
    {
        stream.try_into_stream_reader()
    }
}
1629
1630impl<T> fmt::Debug for StreamReader<T> {
1631 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1632 f.debug_struct("StreamReader")
1633 .field("id", &self.id)
1634 .finish()
1635 }
1636}
1637
1638pub(super) fn stream_close(store: &mut StoreOpaque, id: &mut TableId<TransmitHandle>) {
1639 let id = mem::replace(id, TableId::new(u32::MAX));
1640 store.host_drop_reader(id, TransmitKind::Stream).unwrap();
1641}
1642
1643pub(super) fn lift_index_to_stream(
1645 cx: &mut LiftContext<'_>,
1646 ty: InterfaceType,
1647 index: u32,
1648) -> Result<TableId<TransmitHandle>> {
1649 match ty {
1650 InterfaceType::Stream(src) => {
1651 let handle_table = cx
1652 .instance_mut()
1653 .table_for_transmit(TransmitIndex::Stream(src));
1654 let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1655 if is_done {
1656 bail!("cannot lift stream after being notified that the writable end dropped");
1657 }
1658 let id = TableId::<TransmitHandle>::new(rep);
1659 cx.concurrent_state_mut().get_mut(id)?.common.handle = None;
1660 Ok(id)
1661 }
1662 _ => func::bad_type_info(),
1663 }
1664}
1665
1666pub(super) fn lower_stream_to_index<U>(
1668 id: TableId<TransmitHandle>,
1669 cx: &mut LowerContext<'_, U>,
1670 ty: InterfaceType,
1671) -> Result<u32> {
1672 match ty {
1673 InterfaceType::Stream(dst) => {
1674 let concurrent_state = cx.store.0.concurrent_state_mut();
1675 let state = concurrent_state.get_mut(id)?.state;
1676 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1677
1678 let handle = cx
1679 .instance_mut()
1680 .table_for_transmit(TransmitIndex::Stream(dst))
1681 .stream_insert_read(dst, rep)?;
1682
1683 cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1684
1685 Ok(handle)
1686 }
1687 _ => func::bad_type_info(),
1688 }
1689}
1690
1691unsafe impl<T: ComponentType> ComponentType for StreamReader<T> {
1694 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1695
1696 type Lower = <u32 as func::ComponentType>::Lower;
1697
1698 fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1699 match ty {
1700 InterfaceType::Stream(ty) => {
1701 let ty = types.types[*ty].ty;
1702 types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1703 }
1704 other => bail!("expected `stream`, found `{}`", func::desc(other)),
1705 }
1706 }
1707}
1708
1709unsafe impl<T: ComponentType> func::Lower for StreamReader<T> {
1711 fn linear_lower_to_flat<U>(
1712 &self,
1713 cx: &mut LowerContext<'_, U>,
1714 ty: InterfaceType,
1715 dst: &mut MaybeUninit<Self::Lower>,
1716 ) -> Result<()> {
1717 lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1718 }
1719
1720 fn linear_lower_to_memory<U>(
1721 &self,
1722 cx: &mut LowerContext<'_, U>,
1723 ty: InterfaceType,
1724 offset: usize,
1725 ) -> Result<()> {
1726 lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1727 cx,
1728 InterfaceType::U32,
1729 offset,
1730 )
1731 }
1732}
1733
1734unsafe impl<T: ComponentType> func::Lift for StreamReader<T> {
1736 fn linear_lift_from_flat(
1737 cx: &mut LiftContext<'_>,
1738 ty: InterfaceType,
1739 src: &Self::Lower,
1740 ) -> Result<Self> {
1741 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1742 Self::lift_from_index(cx, ty, index)
1743 }
1744
1745 fn linear_lift_from_memory(
1746 cx: &mut LiftContext<'_>,
1747 ty: InterfaceType,
1748 bytes: &[u8],
1749 ) -> Result<Self> {
1750 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1751 Self::lift_from_index(cx, ty, index)
1752 }
1753}
1754
/// A `StreamReader` bundled with an accessor so that the reader is closed
/// automatically when the guard is dropped.
///
/// The `Drop` impl calls `close_with(&accessor)` on the reader if it is still
/// present; converting the guard back into a `StreamReader` (via `From` /
/// `into_stream`) takes the reader out so that nothing is closed.
pub struct GuardedStreamReader<T, A>
where
    A: AsAccessor,
{
    // `Some` while the guard owns the reader; becomes `None` once the reader
    // has been moved out by the `From` conversion.
    reader: Option<StreamReader<T>>,
    // Accessor handed to `close_with` when the guard is dropped.
    accessor: A,
}
1770
1771impl<T, A> GuardedStreamReader<T, A>
1772where
1773 A: AsAccessor,
1774{
1775 pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1778 Self {
1779 reader: Some(reader),
1780 accessor,
1781 }
1782 }
1783
1784 pub fn into_stream(self) -> StreamReader<T> {
1787 self.into()
1788 }
1789}
1790
1791impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1792where
1793 A: AsAccessor,
1794{
1795 fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1796 guard.reader.take().unwrap()
1797 }
1798}
1799
1800impl<T, A> Drop for GuardedStreamReader<T, A>
1801where
1802 A: AsAccessor,
1803{
1804 fn drop(&mut self) {
1805 if let Some(reader) = &mut self.reader {
1806 reader.close_with(&self.accessor)
1807 }
1808 }
1809}
1810
/// Host-side representation of a component-model `error-context` value.
pub struct ErrorContext {
    // Raw representation ("rep") of the error context. It is the value wrapped
    // in `ErrorContextAny` by `into_val` and the value inserted into / read
    // from per-instance error-context tables when lowering / lifting.
    rep: u32,
}
1815
1816impl ErrorContext {
1817 pub(crate) fn new(rep: u32) -> Self {
1818 Self { rep }
1819 }
1820
1821 pub fn into_val(self) -> Val {
1823 Val::ErrorContext(ErrorContextAny(self.rep))
1824 }
1825
1826 pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1828 let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1829 bail!("expected `error-context`; got `{}`", value.desc());
1830 };
1831 Ok(Self::new(*rep))
1832 }
1833
1834 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1835 match ty {
1836 InterfaceType::ErrorContext(src) => {
1837 let rep = cx
1838 .instance_mut()
1839 .table_for_error_context(src)
1840 .error_context_rep(index)?;
1841
1842 Ok(Self { rep })
1843 }
1844 _ => func::bad_type_info(),
1845 }
1846 }
1847}
1848
1849pub(crate) fn lower_error_context_to_index<U>(
1850 rep: u32,
1851 cx: &mut LowerContext<'_, U>,
1852 ty: InterfaceType,
1853) -> Result<u32> {
1854 match ty {
1855 InterfaceType::ErrorContext(dst) => {
1856 let tbl = cx.instance_mut().table_for_error_context(dst);
1857 tbl.error_context_insert(rep)
1858 }
1859 _ => func::bad_type_info(),
1860 }
1861}
1862unsafe impl func::ComponentType for ErrorContext {
1865 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1866
1867 type Lower = <u32 as func::ComponentType>::Lower;
1868
1869 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1870 match ty {
1871 InterfaceType::ErrorContext(_) => Ok(()),
1872 other => bail!("expected `error`, found `{}`", func::desc(other)),
1873 }
1874 }
1875}
1876
1877unsafe impl func::Lower for ErrorContext {
1879 fn linear_lower_to_flat<T>(
1880 &self,
1881 cx: &mut LowerContext<'_, T>,
1882 ty: InterfaceType,
1883 dst: &mut MaybeUninit<Self::Lower>,
1884 ) -> Result<()> {
1885 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1886 cx,
1887 InterfaceType::U32,
1888 dst,
1889 )
1890 }
1891
1892 fn linear_lower_to_memory<T>(
1893 &self,
1894 cx: &mut LowerContext<'_, T>,
1895 ty: InterfaceType,
1896 offset: usize,
1897 ) -> Result<()> {
1898 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1899 cx,
1900 InterfaceType::U32,
1901 offset,
1902 )
1903 }
1904}
1905
1906unsafe impl func::Lift for ErrorContext {
1908 fn linear_lift_from_flat(
1909 cx: &mut LiftContext<'_>,
1910 ty: InterfaceType,
1911 src: &Self::Lower,
1912 ) -> Result<Self> {
1913 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1914 Self::lift_from_index(cx, ty, index)
1915 }
1916
1917 fn linear_lift_from_memory(
1918 cx: &mut LiftContext<'_>,
1919 ty: InterfaceType,
1920 bytes: &[u8],
1921 ) -> Result<Self> {
1922 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1923 Self::lift_from_index(cx, ty, index)
1924 }
1925}
1926
/// One end (read or write) of a stream or future transmit, as stored in the
/// concurrent-state table.
pub(super) struct TransmitHandle {
    // Waitable bookkeeping shared with other waitables; its `handle` field is
    // set when the end is lowered into a guest table and cleared when lifted.
    pub(super) common: WaitableCommon,
    // Id of the `TransmitState` shared by both ends of this transmit.
    state: TableId<TransmitState>,
}
1933
1934impl TransmitHandle {
1935 fn new(state: TableId<TransmitState>) -> Self {
1936 Self {
1937 common: WaitableCommon::default(),
1938 state,
1939 }
1940 }
1941}
1942
impl TableDebug for TransmitHandle {
    /// Name reported for this entry type by the table's debug facilities.
    fn type_name() -> &'static str {
        "TransmitHandle"
    }
}
1948
/// State shared by the read and write ends of a single stream or future.
struct TransmitState {
    // Table id of the `TransmitHandle` for the write end
    // (`u32::MAX` placeholder until assigned; see `TransmitState::new`).
    write_handle: TableId<TransmitHandle>,
    // Table id of the `TransmitHandle` for the read end
    // (`u32::MAX` placeholder until assigned; see `TransmitState::new`).
    read_handle: TableId<TransmitHandle>,
    // Current state of the write end.
    write: WriteState,
    // Current state of the read end.
    read: ReadState,
    // Set to `true` for futures once a value has been delivered/consumed;
    // consulted by `host_drop_writer` when deciding whether dropping an
    // `Open` writer is allowed.
    done: bool,
    // Who created this transmit (host or a particular guest instance/type).
    pub(super) origin: TransmitOrigin,
}
1965
/// Records where a transmit (stream or future) was created.
#[derive(Copy, Clone)]
pub(super) enum TransmitOrigin {
    /// Created by the host (see `new_transmit`).
    Host,
    /// Created by a guest as a future, identified by the component instance
    /// and the future table type index.
    GuestFuture(ComponentInstanceId, TypeFutureTableIndex),
    /// Created by a guest as a stream, identified by the component instance
    /// and the stream table type index.
    GuestStream(ComponentInstanceId, TypeStreamTableIndex),
}
1972
1973impl TransmitState {
1974 fn new(origin: TransmitOrigin) -> Self {
1975 Self {
1976 write_handle: TableId::new(u32::MAX),
1977 read_handle: TableId::new(u32::MAX),
1978 read: ReadState::Open,
1979 write: WriteState::Open,
1980 done: false,
1981 origin,
1982 }
1983 }
1984}
1985
impl TableDebug for TransmitState {
    /// Name reported for this entry type by the table's debug facilities.
    fn type_name() -> &'static str {
        "TransmitState"
    }
}
1991
1992impl TransmitOrigin {
1993 fn guest(id: ComponentInstanceId, index: TransmitIndex) -> Self {
1994 match index {
1995 TransmitIndex::Future(ty) => TransmitOrigin::GuestFuture(id, ty),
1996 TransmitIndex::Stream(ty) => TransmitOrigin::GuestStream(id, ty),
1997 }
1998 }
1999}
2000
/// Boxed, re-callable factory for a host produce/consume operation.
///
/// Each call returns a fresh boxed future resolving to the `StreamResult` of
/// one produce (or consume) step.
type PollStream = Box<
    dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
>;
2004
/// Boxed callback that attempts to convert a host producer into a value of the
/// type identified by the given `TypeId`, returning `None` if the conversion
/// is not possible (see its construction in `new_transmit`).
type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
2006
/// State of the write end of a stream or future.
enum WriteState {
    /// The write end is open, but no write is pending.
    Open,
    /// A guest write is pending.
    GuestReady {
        // Instance and caller that issued the write.
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        // Stream/future type index of the write.
        ty: TransmitIndex,
        flat_abi: Option<FlatAbi>,
        options: OptionsIndex,
        // Guest address and item count of the write.
        // NOTE(review): `address` is presumably a linear-memory offset — confirm.
        address: usize,
        count: usize,
        // Guest handle reported back with the write result/event.
        handle: u32,
    },
    /// The write end is owned by a host producer.
    HostReady {
        // Starts one produce step.
        produce: PollStream,
        // Attempts to convert the producer into another type.
        try_into: TryInto,
        // Number of items already delivered to the guest during the current
        // operation (advanced by `write`, reset to 0 when a result is
        // reported).
        guest_offset: usize,
        // Flag forwarded as the last argument of `poll_produce`.
        cancel: bool,
        // Waker stored when `poll_produce` last returned `Poll::Pending`.
        cancel_waker: Option<Waker>,
    },
    /// The write end has been dropped.
    Dropped,
}
2033
2034impl fmt::Debug for WriteState {
2035 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2036 match self {
2037 Self::Open => f.debug_tuple("Open").finish(),
2038 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2039 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2040 Self::Dropped => f.debug_tuple("Dropped").finish(),
2041 }
2042 }
2043}
2044
/// State of the read end of a stream or future.
enum ReadState {
    /// The read end is open, but no read is pending.
    Open,
    /// A guest read is pending.
    GuestReady {
        // Stream/future type index of the read.
        ty: TransmitIndex,
        // Caller and instance that issued the read.
        caller: RuntimeComponentInstanceIndex,
        flat_abi: Option<FlatAbi>,
        instance: Instance,
        options: OptionsIndex,
        // Base guest address and item capacity of the read buffer; `write`
        // lowers items starting at `address + SIZE32 * guest_offset`.
        address: usize,
        count: usize,
        // Guest handle reported back with the read result/event.
        handle: u32,
    },
    /// The read end is owned by a host consumer (guest or no writer yet).
    HostReady {
        // Starts one consume step.
        consume: PollStream,
        // Items consumed so far in the current operation; reported in the
        // return code and reset to 0 afterwards.
        guest_offset: usize,
        // Flag forwarded as the last argument of `poll_consume`.
        cancel: bool,
        // Waker stored when `poll_consume` last returned `Poll::Pending`.
        cancel_waker: Option<Waker>,
    },
    /// Both ends are owned by the host: a host producer feeds a host consumer.
    HostToHost {
        // Feeds the consumer from the given (type-erased) write buffer.
        accept: Box<
            dyn for<'a> Fn(
                    &'a mut UntypedWriteBuffer<'a>,
                )
                    -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
                + Send
                + Sync,
        >,
        // Scratch byte buffer lent to the producer (wrapped in a `Cursor`).
        buffer: Vec<u8>,
        // Cursor position after the last produce step, i.e. how many bytes of
        // `buffer` are valid for the consumer.
        limit: usize,
    },
    /// The read end has been dropped.
    Dropped,
}
2083
2084impl fmt::Debug for ReadState {
2085 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2086 match self {
2087 Self::Open => f.debug_tuple("Open").finish(),
2088 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2089 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2090 Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
2091 Self::Dropped => f.debug_tuple("Dropped").finish(),
2092 }
2093 }
2094}
2095
2096fn return_code(kind: TransmitKind, state: StreamResult, guest_offset: usize) -> ReturnCode {
2097 let count = guest_offset.try_into().unwrap();
2098 match state {
2099 StreamResult::Dropped => ReturnCode::Dropped(count),
2100 StreamResult::Completed => ReturnCode::completed(kind, count),
2101 StreamResult::Cancelled => ReturnCode::Cancelled(count),
2102 }
2103}
2104
2105impl StoreOpaque {
2106 fn pipe_from_guest(
2107 &mut self,
2108 kind: TransmitKind,
2109 id: TableId<TransmitState>,
2110 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2111 ) {
2112 let future = async move {
2113 let stream_state = future.await?;
2114 tls::get(|store| {
2115 let state = store.concurrent_state_mut();
2116 let transmit = state.get_mut(id)?;
2117 let ReadState::HostReady {
2118 consume,
2119 guest_offset,
2120 ..
2121 } = mem::replace(&mut transmit.read, ReadState::Open)
2122 else {
2123 unreachable!();
2124 };
2125 let code = return_code(kind, stream_state, guest_offset);
2126 transmit.read = match stream_state {
2127 StreamResult::Dropped => ReadState::Dropped,
2128 StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2129 consume,
2130 guest_offset: 0,
2131 cancel: false,
2132 cancel_waker: None,
2133 },
2134 };
2135 let WriteState::GuestReady { ty, handle, .. } =
2136 mem::replace(&mut transmit.write, WriteState::Open)
2137 else {
2138 unreachable!();
2139 };
2140 state.send_write_result(ty, id, handle, code)?;
2141 Ok(())
2142 })
2143 };
2144
2145 self.concurrent_state_mut().push_future(future.boxed());
2146 }
2147
2148 fn pipe_to_guest(
2149 &mut self,
2150 kind: TransmitKind,
2151 id: TableId<TransmitState>,
2152 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2153 ) {
2154 let future = async move {
2155 let stream_state = future.await?;
2156 tls::get(|store| {
2157 let state = store.concurrent_state_mut();
2158 let transmit = state.get_mut(id)?;
2159 let WriteState::HostReady {
2160 produce,
2161 try_into,
2162 guest_offset,
2163 ..
2164 } = mem::replace(&mut transmit.write, WriteState::Open)
2165 else {
2166 unreachable!();
2167 };
2168 let code = return_code(kind, stream_state, guest_offset);
2169 transmit.write = match stream_state {
2170 StreamResult::Dropped => WriteState::Dropped,
2171 StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2172 produce,
2173 try_into,
2174 guest_offset: 0,
2175 cancel: false,
2176 cancel_waker: None,
2177 },
2178 };
2179 let ReadState::GuestReady { ty, handle, .. } =
2180 mem::replace(&mut transmit.read, ReadState::Open)
2181 else {
2182 unreachable!();
2183 };
2184 state.send_read_result(ty, id, handle, code)?;
2185 Ok(())
2186 })
2187 };
2188
2189 self.concurrent_state_mut().push_future(future.boxed());
2190 }
2191
2192 fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
2194 let state = self.concurrent_state_mut();
2195 let transmit_id = state.get_mut(id)?.state;
2196 let transmit = state
2197 .get_mut(transmit_id)
2198 .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2199 log::trace!(
2200 "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2201 transmit.read,
2202 transmit.write
2203 );
2204
2205 transmit.read = ReadState::Dropped;
2206
2207 let new_state = if let WriteState::Dropped = &transmit.write {
2210 WriteState::Dropped
2211 } else {
2212 WriteState::Open
2213 };
2214
2215 let write_handle = transmit.write_handle;
2216
2217 match mem::replace(&mut transmit.write, new_state) {
2218 WriteState::GuestReady { ty, handle, .. } => {
2221 state.update_event(
2222 write_handle.rep(),
2223 match ty {
2224 TransmitIndex::Future(ty) => Event::FutureWrite {
2225 code: ReturnCode::Dropped(0),
2226 pending: Some((ty, handle)),
2227 },
2228 TransmitIndex::Stream(ty) => Event::StreamWrite {
2229 code: ReturnCode::Dropped(0),
2230 pending: Some((ty, handle)),
2231 },
2232 },
2233 )?;
2234 }
2235
2236 WriteState::HostReady { .. } => {}
2237
2238 WriteState::Open => {
2239 state.update_event(
2240 write_handle.rep(),
2241 match kind {
2242 TransmitKind::Future => Event::FutureWrite {
2243 code: ReturnCode::Dropped(0),
2244 pending: None,
2245 },
2246 TransmitKind::Stream => Event::StreamWrite {
2247 code: ReturnCode::Dropped(0),
2248 pending: None,
2249 },
2250 },
2251 )?;
2252 }
2253
2254 WriteState::Dropped => {
2255 log::trace!("host_drop_reader delete {transmit_id:?}");
2256 state.delete_transmit(transmit_id)?;
2257 }
2258 }
2259 Ok(())
2260 }
2261
2262 fn host_drop_writer(
2264 &mut self,
2265 id: TableId<TransmitHandle>,
2266 on_drop_open: Option<fn() -> Result<()>>,
2267 ) -> Result<()> {
2268 let state = self.concurrent_state_mut();
2269 let transmit_id = state.get_mut(id)?.state;
2270 let transmit = state
2271 .get_mut(transmit_id)
2272 .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2273 log::trace!(
2274 "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2275 transmit.read,
2276 transmit.write
2277 );
2278
2279 match &mut transmit.write {
2281 WriteState::GuestReady { .. } => {
2282 unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2283 }
2284 WriteState::HostReady { .. } => {}
2285 v @ WriteState::Open => {
2286 if let (Some(on_drop_open), false) = (
2287 on_drop_open,
2288 transmit.done || matches!(transmit.read, ReadState::Dropped),
2289 ) {
2290 on_drop_open()?;
2291 } else {
2292 *v = WriteState::Dropped;
2293 }
2294 }
2295 WriteState::Dropped => unreachable!("write state is already dropped"),
2296 }
2297
2298 let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
2299
2300 let new_state = if let ReadState::Dropped = &transmit.read {
2306 ReadState::Dropped
2307 } else {
2308 ReadState::Open
2309 };
2310
2311 let read_handle = transmit.read_handle;
2312
2313 match mem::replace(&mut transmit.read, new_state) {
2315 ReadState::GuestReady { ty, handle, .. } => {
2319 self.concurrent_state_mut().update_event(
2321 read_handle.rep(),
2322 match ty {
2323 TransmitIndex::Future(ty) => Event::FutureRead {
2324 code: ReturnCode::Dropped(0),
2325 pending: Some((ty, handle)),
2326 },
2327 TransmitIndex::Stream(ty) => Event::StreamRead {
2328 code: ReturnCode::Dropped(0),
2329 pending: Some((ty, handle)),
2330 },
2331 },
2332 )?;
2333 }
2334
2335 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2336
2337 ReadState::Open => {
2339 self.concurrent_state_mut().update_event(
2340 read_handle.rep(),
2341 match on_drop_open {
2342 Some(_) => Event::FutureRead {
2343 code: ReturnCode::Dropped(0),
2344 pending: None,
2345 },
2346 None => Event::StreamRead {
2347 code: ReturnCode::Dropped(0),
2348 pending: None,
2349 },
2350 },
2351 )?;
2352 }
2353
2354 ReadState::Dropped => {
2357 log::trace!("host_drop_writer delete {transmit_id:?}");
2358 self.concurrent_state_mut().delete_transmit(transmit_id)?;
2359 }
2360 }
2361 Ok(())
2362 }
2363
2364 pub(super) fn transmit_origin(
2365 &mut self,
2366 id: TableId<TransmitHandle>,
2367 ) -> Result<TransmitOrigin> {
2368 let state = self.concurrent_state_mut();
2369 let state_id = state.get_mut(id)?.state;
2370 Ok(state.get_mut(state_id)?.origin)
2371 }
2372}
2373
impl<T> StoreContextMut<'_, T> {
    /// Creates a new host-originated transmit whose write end is driven by
    /// `producer`, and returns the id of its read handle.
    ///
    /// The producer and a default-constructed `P::Buffer` are stored together
    /// in a shared `Arc<Mutex<Option<..>>>` slot. Two closures are built around
    /// that slot and installed as `WriteState::HostReady`:
    ///
    /// * `produce` — runs one produce step (see inline comments);
    /// * `try_into` — attempts `P::try_into` on the producer, putting it back
    ///   in the slot if the conversion fails.
    ///
    /// # Panics
    ///
    /// Panics (via `unwrap`) if creating the transmit or looking up its state
    /// fails.
    fn new_transmit<P: StreamProducer<T>>(
        mut self,
        kind: TransmitKind,
        producer: P,
    ) -> TableId<TransmitHandle>
    where
        P::Item: func::Lower,
    {
        let token = StoreToken::new(self.as_context_mut());
        let state = self.0.concurrent_state_mut();
        let (_, read) = state.new_transmit(TransmitOrigin::Host).unwrap();
        let producer = Arc::new(Mutex::new(Some((Box::pin(producer), P::Buffer::default()))));
        let id = state.get_mut(read).unwrap().state;
        // Captured by value into each future created by `produce`, so every
        // produce step starts with its own `false` copy.
        let mut dropped = false;
        let produce = Box::new({
            let producer = producer.clone();
            move || {
                let producer = producer.clone();
                async move {
                    // Take the producer and its buffer out of the shared slot
                    // for the duration of this step.
                    let (mut mine, mut buffer) = producer.lock().unwrap().take().unwrap();

                    // Only poll the producer when nothing is left over in the
                    // buffer from a previous step; otherwise flush the
                    // leftovers first and report `Completed`.
                    let (result, cancelled) = if buffer.remaining().is_empty() {
                        future::poll_fn(|cx| {
                            tls::get(|store| {
                                let transmit = store.concurrent_state_mut().get_mut(id).unwrap();

                                let &WriteState::HostReady { cancel, .. } = &transmit.write else {
                                    unreachable!();
                                };

                                // For host-to-host transfers, lend the
                                // reader's byte buffer to the producer.
                                let mut host_buffer =
                                    if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
                                        Some(Cursor::new(mem::take(buffer)))
                                    } else {
                                        None
                                    };

                                let poll = mine.as_mut().poll_produce(
                                    cx,
                                    token.as_context_mut(store),
                                    Destination {
                                        id,
                                        buffer: &mut buffer,
                                        host_buffer: host_buffer.as_mut(),
                                        _phantom: PhantomData,
                                    },
                                    cancel,
                                );

                                let transmit = store.concurrent_state_mut().get_mut(id).unwrap();

                                // Return the lent buffer and record how many
                                // bytes the producer wrote into it.
                                let host_offset = if let (
                                    Some(host_buffer),
                                    ReadState::HostToHost { buffer, limit, .. },
                                ) = (host_buffer, &mut transmit.read)
                                {
                                    *limit = usize::try_from(host_buffer.position()).unwrap();
                                    *buffer = host_buffer.into_inner();
                                    *limit
                                } else {
                                    0
                                };

                                {
                                    let WriteState::HostReady {
                                        guest_offset,
                                        cancel,
                                        cancel_waker,
                                        ..
                                    } = &mut transmit.write
                                    else {
                                        unreachable!();
                                    };

                                    if poll.is_pending() {
                                        // A producer must not report `Pending`
                                        // after having produced something.
                                        if !buffer.remaining().is_empty()
                                            || *guest_offset > 0
                                            || host_offset > 0
                                        {
                                            return Poll::Ready(Err(anyhow!(
                                                "StreamProducer::poll_produce returned Poll::Pending \
                                                 after producing at least one item"
                                            )));
                                        }
                                        // Remember the waker while the
                                        // producer is pending.
                                        *cancel_waker = Some(cx.waker().clone());
                                    } else {
                                        *cancel_waker = None;
                                        *cancel = false;
                                    }
                                }

                                // Pair the result with the `cancel` value that
                                // was passed to `poll_produce`.
                                poll.map(|v| v.map(|result| (result, cancel)))
                            })
                        })
                        .await?
                    } else {
                        (StreamResult::Completed, false)
                    };

                    // Snapshot the progress made so far and the reader's
                    // capacity (`1` is used for host-to-host reads).
                    let (guest_offset, host_offset, count) = tls::get(|store| {
                        let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
                        let (count, host_offset) = match &transmit.read {
                            &ReadState::GuestReady { count, .. } => (count, 0),
                            &ReadState::HostToHost { limit, .. } => (1, limit),
                            _ => unreachable!(),
                        };
                        let guest_offset = match &transmit.write {
                            &WriteState::HostReady { guest_offset, .. } => guest_offset,
                            _ => unreachable!(),
                        };
                        (guest_offset, host_offset, count)
                    });

                    // Validate the producer's result against what it did.
                    match result {
                        StreamResult::Completed => {
                            if count > 1
                                && buffer.remaining().is_empty()
                                && guest_offset == 0
                                && host_offset == 0
                            {
                                bail!(
                                    "StreamProducer::poll_produce returned StreamResult::Completed \
                                     without producing any items"
                                );
                            }
                        }
                        StreamResult::Cancelled => {
                            if !cancelled {
                                bail!(
                                    "StreamProducer::poll_produce returned StreamResult::Cancelled \
                                     without being given a `finish` parameter value of true"
                                );
                            }
                        }
                        StreamResult::Dropped => {
                            dropped = true;
                        }
                    }

                    let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;

                    // Put the producer back before delivering buffered items,
                    // since `write` accesses the shared slot itself.
                    *producer.lock().unwrap() = Some((mine, buffer));

                    if write_buffer {
                        write(token, id, producer.clone(), kind).await?;
                    }

                    // A dropped producer is only reported as `Dropped` once
                    // its buffer has been fully delivered; until then the
                    // step counts as `Completed`.
                    Ok(if dropped {
                        if producer.lock().unwrap().as_ref().unwrap().1.remaining().is_empty()
                        {
                            StreamResult::Dropped
                        } else {
                            StreamResult::Completed
                        }
                    } else {
                        result
                    })
                }
                .boxed()
            }
        });
        let try_into = Box::new(move |ty| {
            let (mine, buffer) = producer.lock().unwrap().take().unwrap();
            match P::try_into(mine, ty) {
                Ok(value) => Some(value),
                Err(mine) => {
                    // Conversion refused: restore the producer and buffer.
                    *producer.lock().unwrap() = Some((mine, buffer));
                    None
                }
            }
        });
        state.get_mut(id).unwrap().write = WriteState::HostReady {
            produce,
            try_into,
            guest_offset: 0,
            cancel: false,
            cancel_waker: None,
        };
        read
    }

    /// Attaches the host `consumer` to the read end identified by `id`.
    ///
    /// What happens depends on the current write state:
    ///
    /// * `Open` — the read state becomes `HostReady`;
    /// * `GuestReady` — a consume step is started immediately and handed to
    ///   `pipe_from_guest`;
    /// * `HostReady` — the host producer is taken over, the read state becomes
    ///   `HostToHost`, and a background future repeatedly calls `produce`
    ///   until either side drops (or after one step for futures), then deletes
    ///   the transmit;
    /// * `Dropped` — the reader is dropped via `host_drop_reader`.
    ///
    /// # Panics
    ///
    /// Panics (via `unwrap`) if `id` or its state cannot be found, or if
    /// `host_drop_reader` fails in the `Dropped` case.
    fn set_consumer<C: StreamConsumer<T>>(
        mut self,
        id: TableId<TransmitHandle>,
        kind: TransmitKind,
        consumer: C,
    ) {
        let token = StoreToken::new(self.as_context_mut());
        let state = self.0.concurrent_state_mut();
        // From here on `id` refers to the shared `TransmitState`.
        let id = state.get_mut(id).unwrap().state;
        let transmit = state.get_mut(id).unwrap();
        let consumer = Arc::new(Mutex::new(Some(Box::pin(consumer))));
        // One consume step; `host_buffer` is `Some` only for host-to-host
        // transfers.
        let consume_with_buffer = {
            let consumer = consumer.clone();
            async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
                // Take the consumer out of the shared slot for this step.
                let mut mine = consumer.lock().unwrap().take().unwrap();

                let host_buffer_remaining_before =
                    host_buffer.as_deref_mut().map(|v| v.remaining().len());

                let (result, cancelled) = future::poll_fn(|cx| {
                    tls::get(|store| {
                        let cancel = match &store.concurrent_state_mut().get_mut(id).unwrap().read {
                            &ReadState::HostReady { cancel, .. } => cancel,
                            ReadState::Open => false,
                            _ => unreachable!(),
                        };

                        let poll = mine.as_mut().poll_consume(
                            cx,
                            token.as_context_mut(store),
                            Source {
                                id,
                                host_buffer: host_buffer.as_deref_mut(),
                            },
                            cancel,
                        );

                        // Remember the waker while the consumer is pending;
                        // clear it (and the cancel flag) once it is ready.
                        if let ReadState::HostReady {
                            cancel_waker,
                            cancel,
                            ..
                        } = &mut store.concurrent_state_mut().get_mut(id).unwrap().read
                        {
                            if poll.is_pending() {
                                *cancel_waker = Some(cx.waker().clone());
                            } else {
                                *cancel_waker = None;
                                *cancel = false;
                            }
                        }

                        // Pair the result with the `cancel` value that was
                        // passed to `poll_consume`.
                        poll.map(|v| v.map(|result| (result, cancel)))
                    })
                })
                .await?;

                // Items consumed so far and number of items that were
                // available to consume.
                let (guest_offset, count) = tls::get(|store| {
                    let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
                    (
                        match &transmit.read {
                            &ReadState::HostReady { guest_offset, .. } => guest_offset,
                            ReadState::Open => 0,
                            _ => unreachable!(),
                        },
                        match &transmit.write {
                            &WriteState::GuestReady { count, .. } => count,
                            WriteState::HostReady { .. } => host_buffer_remaining_before.unwrap(),
                            _ => unreachable!(),
                        },
                    )
                });

                match result {
                    StreamResult::Completed => {
                        // The last clause is only true when a host buffer was
                        // supplied and its remaining length did not change.
                        if count > 0
                            && guest_offset == 0
                            && host_buffer_remaining_before
                                .zip(host_buffer.map(|v| v.remaining().len()))
                                .map(|(before, after)| before == after)
                                .unwrap_or(false)
                        {
                            bail!(
                                "StreamConsumer::poll_consume returned StreamResult::Completed \
                                 without consuming any items"
                            );
                        }

                        // A completed consume on a future marks it as done.
                        if let TransmitKind::Future = kind {
                            tls::get(|store| {
                                store.concurrent_state_mut().get_mut(id).unwrap().done = true;
                            });
                        }
                    }
                    StreamResult::Cancelled => {
                        if !cancelled {
                            bail!(
                                "StreamConsumer::poll_consume returned StreamResult::Cancelled \
                                 without being given a `finish` parameter value of true"
                            );
                        }
                    }
                    StreamResult::Dropped => {}
                }

                // Return the consumer to the shared slot for the next step.
                *consumer.lock().unwrap() = Some(mine);

                Ok(result)
            }
        };
        // Buffer-less variant used when the writer is a guest.
        let consume = {
            let consume = consume_with_buffer.clone();
            Box::new(move || {
                let consume = consume.clone();
                async move { consume(None).await }.boxed()
            })
        };

        match &transmit.write {
            WriteState::Open => {
                transmit.read = ReadState::HostReady {
                    consume,
                    guest_offset: 0,
                    cancel: false,
                    cancel_waker: None,
                };
            }
            &WriteState::GuestReady { .. } => {
                // A guest write is already pending: start consuming it now.
                // The future is created before `consume` is moved into the
                // read state.
                let future = consume();
                transmit.read = ReadState::HostReady {
                    consume,
                    guest_offset: 0,
                    cancel: false,
                    cancel_waker: None,
                };
                self.0.pipe_from_guest(kind, id, future);
            }
            WriteState::HostReady { .. } => {
                // Take ownership of `produce`, leaving a `HostReady` state
                // with placeholder closures in its place.
                let WriteState::HostReady { produce, .. } = mem::replace(
                    &mut transmit.write,
                    WriteState::HostReady {
                        produce: Box::new(|| unreachable!()),
                        try_into: Box::new(|_| unreachable!()),
                        guest_offset: 0,
                        cancel: false,
                        cancel_waker: None,
                    },
                ) else {
                    unreachable!();
                };

                transmit.read = ReadState::HostToHost {
                    accept: Box::new(move |input| {
                        let consume = consume_with_buffer.clone();
                        async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
                    }),
                    buffer: Vec::new(),
                    limit: 0,
                };

                // Drive the producer until one side drops (or, for futures,
                // after a single step).
                let future = async move {
                    loop {
                        if tls::get(|store| {
                            anyhow::Ok(matches!(
                                store.concurrent_state_mut().get_mut(id)?.read,
                                ReadState::Dropped
                            ))
                        })? {
                            break Ok(());
                        }

                        match produce().await? {
                            StreamResult::Completed | StreamResult::Cancelled => {}
                            StreamResult::Dropped => break Ok(()),
                        }

                        if let TransmitKind::Future = kind {
                            break Ok(());
                        }
                    }
                }
                // Delete the transmit once the loop ends, whether it succeeded
                // or failed; a deletion error takes precedence over `result`.
                .map(move |result| {
                    tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
                    result
                });

                state.push_future(Box::pin(future));
            }
            WriteState::Dropped => {
                // The writer is already gone: drop the reader right away.
                let reader = transmit.read_handle;
                self.0.host_drop_reader(reader, kind).unwrap();
            }
        }
    }
}
2750
/// Delivers the items buffered by a host producer to the current reader of
/// transmit `id`.
///
/// `pair` is the shared slot holding the producer and its buffer; only the
/// buffer (`.1`) is used here. The read state is temporarily taken (replaced
/// with `Open`) and restored before returning.
///
/// * `ReadState::GuestReady` — items are lowered into the guest's buffer,
///   starting after the `guest_offset` items already written, and
///   `guest_offset` is advanced by the number of items taken from the buffer.
///   For futures the transmit is marked `done`.
/// * `ReadState::HostToHost` — first the bytes in `buffer[..limit]`, then the
///   producer's own buffer, are fed to the consumer's `accept` callback until
///   exhausted or the consumer reports `Dropped`.
///
/// Any other read state hits `unreachable!()`.
async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
    token: StoreToken<D>,
    id: TableId<TransmitState>,
    pair: Arc<Mutex<Option<(P, B)>>>,
    kind: TransmitKind,
) -> Result<()> {
    // Take the read state and, if the writer is a host producer, its current
    // guest offset.
    let (read, guest_offset) = tls::get(|store| {
        let transmit = store.concurrent_state_mut().get_mut(id)?;

        let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
            Some(guest_offset)
        } else {
            None
        };

        anyhow::Ok((
            mem::replace(&mut transmit.read, ReadState::Open),
            guest_offset,
        ))
    })?;

    match read {
        ReadState::GuestReady {
            ty,
            flat_abi,
            options,
            address,
            count,
            handle,
            instance,
            caller,
        } => {
            let guest_offset = guest_offset.unwrap();

            if let TransmitKind::Future = kind {
                tls::get(|store| {
                    store.concurrent_state_mut().get_mut(id)?.done = true;
                    anyhow::Ok(())
                })?;
            }

            // Remember how many items were buffered so the number actually
            // delivered can be computed afterwards.
            let old_remaining = pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
            // Lowers as many buffered items as fit into the unwritten part of
            // the guest's buffer.
            let accept = {
                let pair = pair.clone();
                move |mut store: StoreContextMut<D>| {
                    lower::<T, B, D>(
                        store.as_context_mut(),
                        instance,
                        options,
                        ty,
                        address + (T::SIZE32 * guest_offset),
                        count - guest_offset,
                        &mut pair.lock().unwrap().as_mut().unwrap().1,
                    )?;
                    anyhow::Ok(())
                }
            };

            if guest_offset < count {
                if T::MAY_REQUIRE_REALLOC {
                    // Run the lowering as a high-priority worker work item and
                    // wait for its result through a oneshot channel.
                    // NOTE(review): presumably needed because lowering may call
                    // into the guest (realloc) — confirm.
                    let (tx, rx) = oneshot::channel();
                    tls::get(move |store| {
                        store
                            .concurrent_state_mut()
                            .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
                                move |store| {
                                    _ = tx.send(accept(token.as_context_mut(store))?);
                                    Ok(())
                                },
                            ))))
                    });
                    rx.await?
                } else {
                    // No realloc required: lower directly.
                    tls::get(|store| accept(token.as_context_mut(store)))?
                };
            }

            tls::get(|store| {
                // Number of items removed from the producer's buffer.
                let count =
                    old_remaining - pair.lock().unwrap().as_mut().unwrap().1.remaining().len();

                let transmit = store.concurrent_state_mut().get_mut(id)?;

                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
                    unreachable!();
                };

                *guest_offset += count;

                // Restore the guest read that was taken at the top.
                transmit.read = ReadState::GuestReady {
                    ty,
                    flat_abi,
                    options,
                    address,
                    count,
                    handle,
                    instance,
                    caller,
                };

                anyhow::Ok(())
            })?;

            Ok(())
        }

        ReadState::HostToHost {
            accept,
            mut buffer,
            limit,
        } => {
            let mut state = StreamResult::Completed;
            let mut position = 0;

            // First hand over the bytes the producer wrote into the shared
            // byte buffer (`buffer[position..limit]`).
            while !matches!(state, StreamResult::Dropped) && position < limit {
                let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
                state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
                (buffer, position, _) = slice_buffer.into_parts();
            }

            // Then hand over whatever remains in the producer's own buffer.
            {
                let (mine, mut buffer) = pair.lock().unwrap().take().unwrap();

                while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
                    state = accept(&mut UntypedWriteBuffer::new(&mut buffer)).await?;
                }

                *pair.lock().unwrap() = Some((mine, buffer));
            }

            // Restore the read state (with the byte buffer marked empty), or
            // record that the consumer dropped.
            tls::get(|store| {
                store.concurrent_state_mut().get_mut(id)?.read = match state {
                    StreamResult::Dropped => ReadState::Dropped,
                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
                        accept,
                        buffer,
                        limit: 0,
                    },
                };

                anyhow::Ok(())
            })?;
            Ok(())
        }

        _ => unreachable!(),
    }
}
2907
2908impl Instance {
2909 fn consume(
2912 self,
2913 store: &mut dyn VMStore,
2914 kind: TransmitKind,
2915 transmit_id: TableId<TransmitState>,
2916 consume: PollStream,
2917 guest_offset: usize,
2918 cancel: bool,
2919 ) -> Result<ReturnCode> {
2920 let mut future = consume();
2921 store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
2922 consume,
2923 guest_offset,
2924 cancel,
2925 cancel_waker: None,
2926 };
2927 let poll = tls::set(store, || {
2928 future
2929 .as_mut()
2930 .poll(&mut Context::from_waker(&Waker::noop()))
2931 });
2932
2933 Ok(match poll {
2934 Poll::Ready(state) => {
2935 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
2936 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
2937 unreachable!();
2938 };
2939 let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2940 transmit.write = WriteState::Open;
2941 code
2942 }
2943 Poll::Pending => {
2944 store.pipe_from_guest(kind, transmit_id, future);
2945 ReturnCode::Blocked
2946 }
2947 })
2948 }
2949
2950 fn produce(
2953 self,
2954 store: &mut dyn VMStore,
2955 kind: TransmitKind,
2956 transmit_id: TableId<TransmitState>,
2957 produce: PollStream,
2958 try_into: TryInto,
2959 guest_offset: usize,
2960 cancel: bool,
2961 ) -> Result<ReturnCode> {
2962 let mut future = produce();
2963 store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
2964 produce,
2965 try_into,
2966 guest_offset,
2967 cancel,
2968 cancel_waker: None,
2969 };
2970 let poll = tls::set(store, || {
2971 future
2972 .as_mut()
2973 .poll(&mut Context::from_waker(&Waker::noop()))
2974 });
2975
2976 Ok(match poll {
2977 Poll::Ready(state) => {
2978 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
2979 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2980 unreachable!();
2981 };
2982 let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2983 transmit.read = ReadState::Open;
2984 code
2985 }
2986 Poll::Pending => {
2987 store.pipe_to_guest(kind, transmit_id, future);
2988 ReturnCode::Blocked
2989 }
2990 })
2991 }
2992
2993 pub(super) fn guest_drop_writable(
2995 self,
2996 store: &mut StoreOpaque,
2997 ty: TransmitIndex,
2998 writer: u32,
2999 ) -> Result<()> {
3000 let table = self.id().get_mut(store).table_for_transmit(ty);
3001 let transmit_rep = match ty {
3002 TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
3003 TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
3004 };
3005
3006 let id = TableId::<TransmitHandle>::new(transmit_rep);
3007 log::trace!("guest_drop_writable: drop writer {id:?}");
3008 match ty {
3009 TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
3010 TransmitIndex::Future(_) => store.host_drop_writer(
3011 id,
3012 Some(|| {
3013 Err(anyhow!(
3014 "cannot drop future write end without first writing a value"
3015 ))
3016 }),
3017 ),
3018 }
3019 }
3020
    /// Copy `count` payload items from a writer's linear memory to a reader's
    /// linear memory.
    ///
    /// The `write_*` arguments describe the source (calling component
    /// instance, stream/future type, canonical options and byte address) and
    /// the `read_*` arguments describe the destination in the same way.
    ///
    /// * For futures, `count` must be 1; the single payload value (if the
    ///   future has a payload type) is lifted from the writer and lowered into
    ///   the reader.
    /// * For streams with `flat_abi` present, the payload is copied as raw
    ///   bytes (`flat_abi.size * count` bytes) after alignment and bounds
    ///   checks on both sides.
    /// * For streams without `flat_abi`, each item is lifted to a `Val` from
    ///   the writer's memory and then stored into the reader's memory.
    ///
    /// `rep` is only used for trace logging.
    ///
    /// # Errors
    ///
    /// Returns an error if both ends belong to the same component instance and
    /// `allow_intra_component_read_write` rejects the payload type, if either
    /// pointer is misaligned or out of bounds, or if lifting/lowering fails.
    ///
    /// # Panics
    ///
    /// Panics if `write_ty` and `read_ty` are not the same kind (both futures
    /// or both streams), or if `count != 1` for a future.
    fn copy<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        flat_abi: Option<FlatAbi>,
        write_caller: RuntimeComponentInstanceIndex,
        write_ty: TransmitIndex,
        write_options: OptionsIndex,
        write_address: usize,
        read_caller: RuntimeComponentInstanceIndex,
        read_ty: TransmitIndex,
        read_options: OptionsIndex,
        read_address: usize,
        count: usize,
        rep: u32,
    ) -> Result<()> {
        let types = self.id().get(store.0).component().types();
        match (write_ty, read_ty) {
            (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
                // A future carries exactly one value.
                assert_eq!(count, 1);

                let payload = types[types[write_ty].ty].payload;

                if write_caller == read_caller && !allow_intra_component_read_write(payload) {
                    bail!(
                        "cannot read from and write to intra-component future with non-numeric payload"
                    )
                }

                // Lift the value out of the writer's memory, if the future
                // has a payload type at all.
                let val = payload
                    .map(|ty| {
                        let lift =
                            &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);

                        let abi = lift.types.canonical_abi(&ty);
                        if write_address % usize::try_from(abi.align32)? != 0 {
                            bail!("write pointer not aligned");
                        }

                        let bytes = lift
                            .memory()
                            .get(write_address..)
                            .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
                            .ok_or_else(|| {
                                anyhow::anyhow!("write pointer out of bounds of memory")
                            })?;

                        Val::load(lift, ty, bytes)
                    })
                    .transpose()?;

                // Lower the lifted value into the reader's memory;
                // `validate_inbounds_dynamic` validates the destination
                // pointer before the store.
                if let Some(val) = val {
                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
                    let types = lower.types;
                    let ty = types[types[read_ty].ty].payload.unwrap();
                    let ptr = func::validate_inbounds_dynamic(
                        types.canonical_abi(&ty),
                        lower.as_slice_mut(),
                        &ValRaw::u32(read_address.try_into().unwrap()),
                    )?;
                    val.store(lower, ty, ptr)?;
                }
            }
            (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
                if write_caller == read_caller
                    && !allow_intra_component_read_write(types[types[write_ty].ty].payload)
                {
                    bail!(
                        "cannot read from and write to intra-component stream with non-numeric payload"
                    )
                }

                if let Some(flat_abi) = flat_abi {
                    // Fast path: the payload is copied as raw bytes.
                    let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
                    // Zero-length copies skip the alignment and bounds checks.
                    if length_in_bytes > 0 {
                        if write_address % usize::try_from(flat_abi.align)? != 0 {
                            bail!("write pointer not aligned");
                        }
                        if read_address % usize::try_from(flat_abi.align)? != 0 {
                            bail!("read pointer not aligned");
                        }

                        let store_opaque = store.0.store_opaque_mut();

                        {
                            // Both regions are bounds-checked to exactly
                            // `length_in_bytes` bytes before being turned
                            // into raw pointers.
                            let src = self
                                .options_memory(store_opaque, write_options)
                                .get(write_address..)
                                .and_then(|b| b.get(..length_in_bytes))
                                .ok_or_else(|| {
                                    anyhow::anyhow!("write pointer out of bounds of memory")
                                })?
                                .as_ptr();
                            let dst = self
                                .options_memory_mut(store_opaque, read_options)
                                .get_mut(read_address..)
                                .and_then(|b| b.get_mut(..length_in_bytes))
                                .ok_or_else(|| {
                                    anyhow::anyhow!("read pointer out of bounds of memory")
                                })?
                                .as_mut_ptr();
                            // SAFETY: `src` and `dst` each point at
                            // `length_in_bytes` in-bounds bytes, as checked
                            // above. When both ends belong to the same caller
                            // the regions may overlap, so the overlap-tolerant
                            // `copy_to` is used. NOTE(review): the
                            // non-overlapping variant presumes that different
                            // callers never share a linear memory — confirm.
                            unsafe {
                                if write_caller == read_caller {
                                    src.copy_to(dst, length_in_bytes)
                                } else {
                                    src.copy_to_nonoverlapping(dst, length_in_bytes)
                                }
                            }
                        }
                    }
                } else {
                    // Slow path: lift every item to a `Val`, then lower each
                    // one into the reader's memory.
                    let store_opaque = store.0.store_opaque_mut();
                    let lift = &mut LiftContext::new(store_opaque, write_options, self);
                    let ty = lift.types[lift.types[write_ty].ty].payload.unwrap();
                    let abi = lift.types.canonical_abi(&ty);
                    let size = usize::try_from(abi.size32).unwrap();
                    if write_address % usize::try_from(abi.align32)? != 0 {
                        bail!("write pointer not aligned");
                    }
                    let bytes = lift
                        .memory()
                        .get(write_address..)
                        .and_then(|b| b.get(..size * count))
                        .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;

                    let values = (0..count)
                        .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
                        .collect::<Result<Vec<_>>>()?;

                    let id = TableId::<TransmitHandle>::new(rep);
                    log::trace!("copy values {values:?} for {id:?}");

                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
                    let ty = lower.types[lower.types[read_ty].ty].payload.unwrap();
                    let abi = lower.types.canonical_abi(&ty);
                    if read_address % usize::try_from(abi.align32)? != 0 {
                        bail!("read pointer not aligned");
                    }
                    let size = usize::try_from(abi.size32).unwrap();
                    // Bounds-check the whole destination region up front; the
                    // resulting slice itself is not needed.
                    lower
                        .as_slice_mut()
                        .get_mut(read_address..)
                        .and_then(|b| b.get_mut(..size * count))
                        .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
                    let mut ptr = read_address;
                    for value in values {
                        value.store(lower, ty, ptr)?;
                        ptr += size
                    }
                }
            }
            // The two ends of a transmit are always of the same kind.
            _ => unreachable!(),
        }

        Ok(())
    }
3189
3190 fn check_bounds(
3191 self,
3192 store: &StoreOpaque,
3193 options: OptionsIndex,
3194 ty: TransmitIndex,
3195 address: usize,
3196 count: usize,
3197 ) -> Result<()> {
3198 let types = self.id().get(store).component().types();
3199 let size = usize::try_from(
3200 match ty {
3201 TransmitIndex::Future(ty) => types[types[ty].ty]
3202 .payload
3203 .map(|ty| types.canonical_abi(&ty).size32),
3204 TransmitIndex::Stream(ty) => types[types[ty].ty]
3205 .payload
3206 .map(|ty| types.canonical_abi(&ty).size32),
3207 }
3208 .unwrap_or(0),
3209 )
3210 .unwrap();
3211
3212 if count > 0 && size > 0 {
3213 self.options_memory(store, options)
3214 .get(address..)
3215 .and_then(|b| b.get(..(size * count)))
3216 .map(drop)
3217 .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))
3218 } else {
3219 Ok(())
3220 }
3221 }
3222
    /// Write `count` items starting at `address` in the guest's memory to the
    /// stream or future whose writable end is the guest handle `handle`.
    ///
    /// What happens depends on the current state of the read end:
    ///
    /// * `ReadState::GuestReady`: a guest read is pending, so items are copied
    ///   directly between the two guests' memories via `copy`.
    /// * `ReadState::HostReady`: a host consumer is registered; the write is
    ///   recorded as `WriteState::GuestReady` and `consume` is invoked.
    /// * `ReadState::Open`: nobody is reading yet; the write is recorded and
    ///   `ReturnCode::Blocked` is produced.
    /// * `ReadState::Dropped`: `ReturnCode::Dropped(0)` is produced.
    ///
    /// If the result is `Blocked` and `options` is not async, the call waits
    /// for the write event via `wait_for_write` instead of returning
    /// `Blocked`.
    ///
    /// # Errors
    ///
    /// Fails if the buffer is out of bounds, if `handle` is not an idle
    /// writable end, if the end was already notified that the reader dropped,
    /// if the future has already been completed, or if any of the delegated
    /// operations fail.
    pub(super) fn guest_write<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        caller: RuntimeComponentInstanceIndex,
        ty: TransmitIndex,
        options: OptionsIndex,
        flat_abi: Option<FlatAbi>,
        handle: u32,
        address: u32,
        count: u32,
    ) -> Result<ReturnCode> {
        // A synchronous write may need to block, so check up front whether
        // blocking is currently permitted.
        if !self.options(store.0, options).async_ {
            store.0.concurrent_state_mut().check_blocking()?;
        }

        let address = usize::try_from(address).unwrap();
        let count = usize::try_from(count).unwrap();
        self.check_bounds(store.0, options, ty, address, count)?;
        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
        let TransmitLocalState::Write { done } = *state else {
            bail!(
                "invalid handle {handle}; expected `Write`; got {:?}",
                *state
            );
        };

        if done {
            bail!("cannot write to stream after being notified that the readable end dropped");
        }

        // Mark the handle as having an operation in flight; it is reset at
        // the end of this function unless the result is `Blocked`.
        *state = TransmitLocalState::Busy;
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let concurrent_state = store.0.concurrent_state_mut();
        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
        let transmit = concurrent_state.get_mut(transmit_id)?;
        log::trace!(
            "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
            transmit.read
        );

        if transmit.done {
            bail!("cannot write to future after previous write succeeded or readable end dropped");
        }

        // The read state is taken out below with `mem::replace`; this is the
        // value left in its place (a dropped reader stays dropped).
        let new_state = if let ReadState::Dropped = &transmit.read {
            ReadState::Dropped
        } else {
            ReadState::Open
        };

        // Record this write as pending so that a later read (or host
        // consumer) can pick it up.
        let set_guest_ready = |me: &mut ConcurrentState| {
            let transmit = me.get_mut(transmit_id)?;
            assert!(
                matches!(&transmit.write, WriteState::Open),
                "expected `WriteState::Open`; got `{:?}`",
                transmit.write
            );
            transmit.write = WriteState::GuestReady {
                instance: self,
                caller,
                ty,
                flat_abi,
                options,
                address,
                count,
                handle,
            };
            Ok::<_, crate::Error>(())
        };

        let mut result = match mem::replace(&mut transmit.read, new_state) {
            // A guest read is already pending: copy directly between guests.
            ReadState::GuestReady {
                ty: read_ty,
                flat_abi: read_flat_abi,
                options: read_options,
                address: read_address,
                count: read_count,
                handle: read_handle,
                instance: read_instance,
                caller: read_caller,
            } => {
                assert_eq!(flat_abi, read_flat_abi);

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                // The write finishes now if it is zero-length or if the
                // reader supplied a non-empty buffer; otherwise it stays
                // pending.
                let write_complete = count == 0 || read_count > 0;
                // The pending read is only completed by a non-empty write.
                let read_complete = count > 0;
                // Whether the reader's buffer still has room after this copy.
                let read_buffer_remaining = count < read_count;

                let read_handle_rep = transmit.read_handle.rep();

                // Only as many items as both sides can handle are copied.
                let count = count.min(read_count);

                self.copy(
                    store.as_context_mut(),
                    flat_abi,
                    caller,
                    ty,
                    options,
                    address,
                    read_caller,
                    read_ty,
                    read_options,
                    read_address,
                    count,
                    rep,
                )?;

                let instance = self.id().get_mut(store.0);
                let types = instance.component().types();
                let item_size = payload(ty, types)
                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
                    .unwrap_or(0);
                let concurrent_state = store.0.concurrent_state_mut();
                if read_complete {
                    let count = u32::try_from(count).unwrap();
                    // If a completed stream-read event is still queued for
                    // the reader, fold its item count into the new total.
                    let total = if let Some(Event::StreamRead {
                        code: ReturnCode::Completed(old_total),
                        ..
                    }) = concurrent_state.take_event(read_handle_rep)?
                    {
                        count + old_total
                    } else {
                        count
                    };

                    let code = ReturnCode::completed(ty.kind(), total);

                    concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
                }

                if read_buffer_remaining {
                    // Keep the read registered, advanced past the items that
                    // were just copied.
                    let transmit = concurrent_state.get_mut(transmit_id)?;
                    transmit.read = ReadState::GuestReady {
                        ty: read_ty,
                        flat_abi: read_flat_abi,
                        options: read_options,
                        address: read_address + (count * item_size),
                        count: read_count - count,
                        handle: read_handle,
                        instance: read_instance,
                        caller: read_caller,
                    };
                }

                if write_complete {
                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
                } else {
                    set_guest_ready(concurrent_state)?;
                    ReturnCode::Blocked
                }
            }

            // A host consumer is registered: record the write and hand it to
            // the consumer.
            ReadState::HostReady {
                consume,
                guest_offset,
                cancel,
                cancel_waker,
            } => {
                assert!(cancel_waker.is_none());
                assert!(!cancel);
                assert_eq!(0, guest_offset);

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                set_guest_ready(concurrent_state)?;
                self.consume(store.0, ty.kind(), transmit_id, consume, 0, false)?
            }

            ReadState::HostToHost { .. } => unreachable!(),

            // No reader yet: leave the write pending.
            ReadState::Open => {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }

            ReadState::Dropped => {
                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                ReturnCode::Dropped(0)
            }
        };

        // Synchronous callers wait for the write event rather than observing
        // `Blocked`.
        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
            result = self.wait_for_write(store.0, transmit_handle)?;
        }

        // The operation is finished: return the handle to the `Write` state,
        // remembering for streams whether the reader was reported dropped.
        if result != ReturnCode::Blocked {
            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
                TransmitLocalState::Write {
                    done: matches!(
                        (result, ty),
                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
                    ),
                };
        }

        log::trace!(
            "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
        );

        Ok(result)
    }
3457
    /// Read up to `count` items into the guest's memory at `address` from the
    /// stream or future whose readable end is the guest handle `handle`.
    ///
    /// What happens depends on the current state of the write end:
    ///
    /// * `WriteState::GuestReady`: a guest write is pending, so items are
    ///   copied directly between the two guests' memories via `copy`.
    /// * `WriteState::HostReady`: a host producer is registered; the read is
    ///   recorded as `ReadState::GuestReady` and `produce` is invoked.
    /// * `WriteState::Open`: nobody is writing yet; the read is recorded and
    ///   `ReturnCode::Blocked` is produced.
    /// * `WriteState::Dropped`: `ReturnCode::Dropped(0)` is produced.
    ///
    /// If the result is `Blocked` and `options` is not async, the call waits
    /// for the read event via `wait_for_read` instead of returning `Blocked`.
    ///
    /// # Errors
    ///
    /// Fails if the buffer is out of bounds, if `handle` is not an idle
    /// readable end, if the end was already notified that the writer dropped,
    /// if the future has already been read, or if any of the delegated
    /// operations fail.
    pub(super) fn guest_read<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        caller: RuntimeComponentInstanceIndex,
        ty: TransmitIndex,
        options: OptionsIndex,
        flat_abi: Option<FlatAbi>,
        handle: u32,
        address: u32,
        count: u32,
    ) -> Result<ReturnCode> {
        // A synchronous read may need to block, so check up front whether
        // blocking is currently permitted.
        if !self.options(store.0, options).async_ {
            store.0.concurrent_state_mut().check_blocking()?;
        }

        let address = usize::try_from(address).unwrap();
        let count = usize::try_from(count).unwrap();
        self.check_bounds(store.0, options, ty, address, count)?;
        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
        let TransmitLocalState::Read { done } = *state else {
            bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
        };

        if done {
            bail!("cannot read from stream after being notified that the writable end dropped");
        }

        // Mark the handle as having an operation in flight; it is reset at
        // the end of this function unless the result is `Blocked`.
        *state = TransmitLocalState::Busy;
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let concurrent_state = store.0.concurrent_state_mut();
        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
        let transmit = concurrent_state.get_mut(transmit_id)?;
        log::trace!(
            "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
            transmit.write
        );

        if transmit.done {
            bail!("cannot read from future after previous read succeeded");
        }

        // The write state is taken out below with `mem::replace`; this is the
        // value left in its place (a dropped writer stays dropped).
        let new_state = if let WriteState::Dropped = &transmit.write {
            WriteState::Dropped
        } else {
            WriteState::Open
        };

        // Record this read as pending so that a later write (or host
        // producer) can satisfy it.
        let set_guest_ready = |me: &mut ConcurrentState| {
            let transmit = me.get_mut(transmit_id)?;
            assert!(
                matches!(&transmit.read, ReadState::Open),
                "expected `ReadState::Open`; got `{:?}`",
                transmit.read
            );
            transmit.read = ReadState::GuestReady {
                ty,
                flat_abi,
                options,
                address,
                count,
                handle,
                instance: self,
                caller,
            };
            Ok::<_, crate::Error>(())
        };

        let mut result = match mem::replace(&mut transmit.write, new_state) {
            // A guest write is already pending: copy directly between guests.
            WriteState::GuestReady {
                instance: _,
                ty: write_ty,
                flat_abi: write_flat_abi,
                options: write_options,
                address: write_address,
                count: write_count,
                handle: write_handle,
                caller: write_caller,
            } => {
                assert_eq!(flat_abi, write_flat_abi);

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                let write_handle_rep = transmit.write_handle.rep();

                // The pending write finishes now if it is zero-length or if
                // this read supplied a non-empty buffer.
                let write_complete = write_count == 0 || count > 0;
                // This read only completes if the pending write is non-empty.
                let read_complete = write_count > 0;
                // Whether the writer still has items left after this copy.
                let write_buffer_remaining = count < write_count;

                // Only as many items as both sides can handle are copied.
                let count = count.min(write_count);

                self.copy(
                    store.as_context_mut(),
                    flat_abi,
                    write_caller,
                    write_ty,
                    write_options,
                    write_address,
                    caller,
                    ty,
                    options,
                    address,
                    count,
                    rep,
                )?;

                let instance = self.id().get_mut(store.0);
                let types = instance.component().types();
                let item_size = payload(ty, types)
                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
                    .unwrap_or(0);
                let concurrent_state = store.0.concurrent_state_mut();

                if write_complete {
                    let count = u32::try_from(count).unwrap();
                    // If a completed stream-write event is still queued for
                    // the writer, fold its item count into the new total.
                    let total = if let Some(Event::StreamWrite {
                        code: ReturnCode::Completed(old_total),
                        ..
                    }) = concurrent_state.take_event(write_handle_rep)?
                    {
                        count + old_total
                    } else {
                        count
                    };

                    let code = ReturnCode::completed(ty.kind(), total);

                    concurrent_state.send_write_result(
                        write_ty,
                        transmit_id,
                        write_handle,
                        code,
                    )?;
                }

                if write_buffer_remaining {
                    // Keep the write registered, advanced past the items that
                    // were just copied.
                    let transmit = concurrent_state.get_mut(transmit_id)?;
                    transmit.write = WriteState::GuestReady {
                        instance: self,
                        caller: write_caller,
                        ty: write_ty,
                        flat_abi: write_flat_abi,
                        options: write_options,
                        address: write_address + (count * item_size),
                        count: write_count - count,
                        handle: write_handle,
                    };
                }

                if read_complete {
                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
                } else {
                    set_guest_ready(concurrent_state)?;
                    ReturnCode::Blocked
                }
            }

            // A host producer is registered: record the read and ask the
            // producer for data.
            WriteState::HostReady {
                produce,
                try_into,
                guest_offset,
                cancel,
                cancel_waker,
            } => {
                assert!(cancel_waker.is_none());
                assert!(!cancel);
                assert_eq!(0, guest_offset);

                set_guest_ready(concurrent_state)?;

                let code =
                    self.produce(store.0, ty.kind(), transmit_id, produce, try_into, 0, false)?;

                // A future is only finished once the read actually completed.
                if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) {
                    store.0.concurrent_state_mut().get_mut(transmit_id)?.done = true;
                }

                code
            }

            // No writer yet: leave the read pending.
            WriteState::Open => {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }

            WriteState::Dropped => ReturnCode::Dropped(0),
        };

        // Synchronous callers wait for the read event rather than observing
        // `Blocked`.
        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
            result = self.wait_for_read(store.0, transmit_handle)?;
        }

        // The operation is finished: return the handle to the `Read` state,
        // remembering for streams whether the writer was reported dropped.
        if result != ReturnCode::Blocked {
            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
                TransmitLocalState::Read {
                    done: matches!(
                        (result, ty),
                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
                    ),
                };
        }

        log::trace!(
            "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
        );

        Ok(result)
    }
3675
3676 fn wait_for_write(
3677 self,
3678 store: &mut StoreOpaque,
3679 handle: TableId<TransmitHandle>,
3680 ) -> Result<ReturnCode> {
3681 let waitable = Waitable::Transmit(handle);
3682 store.wait_for_event(waitable)?;
3683 let event = waitable.take_event(store.concurrent_state_mut())?;
3684 if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3685 event
3686 {
3687 waitable.on_delivery(store, self, event);
3688 Ok(code)
3689 } else {
3690 unreachable!()
3691 }
3692 }
3693
3694 fn cancel_write(
3696 self,
3697 store: &mut StoreOpaque,
3698 transmit_id: TableId<TransmitState>,
3699 async_: bool,
3700 ) -> Result<ReturnCode> {
3701 let state = store.concurrent_state_mut();
3702 let transmit = state.get_mut(transmit_id)?;
3703 log::trace!(
3704 "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3705 transmit.read,
3706 transmit.write
3707 );
3708
3709 let code = if let Some(event) =
3710 Waitable::Transmit(transmit.write_handle).take_event(state)?
3711 {
3712 let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3713 unreachable!();
3714 };
3715 match (code, event) {
3716 (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3717 ReturnCode::Cancelled(count)
3718 }
3719 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3720 _ => unreachable!(),
3721 }
3722 } else if let ReadState::HostReady {
3723 cancel,
3724 cancel_waker,
3725 ..
3726 } = &mut state.get_mut(transmit_id)?.read
3727 {
3728 *cancel = true;
3729 if let Some(waker) = cancel_waker.take() {
3730 waker.wake();
3731 }
3732
3733 if async_ {
3734 ReturnCode::Blocked
3735 } else {
3736 let handle = store
3737 .concurrent_state_mut()
3738 .get_mut(transmit_id)?
3739 .write_handle;
3740 self.wait_for_write(store, handle)?
3741 }
3742 } else {
3743 ReturnCode::Cancelled(0)
3744 };
3745
3746 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3747
3748 match &transmit.write {
3749 WriteState::GuestReady { .. } => {
3750 transmit.write = WriteState::Open;
3751 }
3752 WriteState::HostReady { .. } => todo!("support host write cancellation"),
3753 WriteState::Open | WriteState::Dropped => {}
3754 }
3755
3756 log::trace!("cancelled write {transmit_id:?}: {code:?}");
3757
3758 Ok(code)
3759 }
3760
3761 fn wait_for_read(
3762 self,
3763 store: &mut StoreOpaque,
3764 handle: TableId<TransmitHandle>,
3765 ) -> Result<ReturnCode> {
3766 let waitable = Waitable::Transmit(handle);
3767 store.wait_for_event(waitable)?;
3768 let event = waitable.take_event(store.concurrent_state_mut())?;
3769 if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
3770 event
3771 {
3772 waitable.on_delivery(store, self, event);
3773 Ok(code)
3774 } else {
3775 unreachable!()
3776 }
3777 }
3778
    /// Cancel a pending read on the transmit identified by `transmit_id`.
    ///
    /// The resulting code is determined as follows:
    ///
    /// * If a read event is already queued for the read handle, it is
    ///   consumed: a `Completed(n)` stream read becomes `Cancelled(n)`, while
    ///   `Dropped(_)` and a completed future read are returned unchanged.
    /// * Otherwise, if a host writer is registered (`WriteState::HostReady`),
    ///   it is flagged for cancellation and its cancel waker (if any) is
    ///   woken. Async callers then get `Blocked`; synchronous callers wait
    ///   for the read event.
    /// * Otherwise `Cancelled(0)` is returned.
    ///
    /// Afterwards a pending guest read is reset to `ReadState::Open`.
    ///
    /// # Errors
    ///
    /// Fails if `transmit_id` cannot be looked up or if waiting for the read
    /// event fails.
    ///
    /// # Panics
    ///
    /// Panics (`todo!`) if the read end is in `ReadState::HostReady` or
    /// `ReadState::HostToHost`, which is not yet supported.
    fn cancel_read(
        self,
        store: &mut StoreOpaque,
        transmit_id: TableId<TransmitState>,
        async_: bool,
    ) -> Result<ReturnCode> {
        let state = store.concurrent_state_mut();
        let transmit = state.get_mut(transmit_id)?;
        log::trace!(
            "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
            transmit.read,
            transmit.write
        );

        let code = if let Some(event) =
            Waitable::Transmit(transmit.read_handle).take_event(state)?
        {
            // The read already produced an event; translate it.
            let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
                unreachable!();
            };
            match (code, event) {
                (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
                    ReturnCode::Cancelled(count)
                }
                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
                _ => unreachable!(),
            }
        } else if let WriteState::HostReady {
            cancel,
            cancel_waker,
            ..
        } = &mut state.get_mut(transmit_id)?.write
        {
            // Ask the host writer to stop and wake it so it notices.
            *cancel = true;
            if let Some(waker) = cancel_waker.take() {
                waker.wake();
            }

            if async_ {
                ReturnCode::Blocked
            } else {
                let handle = store
                    .concurrent_state_mut()
                    .get_mut(transmit_id)?
                    .read_handle;
                self.wait_for_read(store, handle)?
            }
        } else {
            ReturnCode::Cancelled(0)
        };

        let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;

        match &transmit.read {
            // The guest's read is no longer pending.
            ReadState::GuestReady { .. } => {
                transmit.read = ReadState::Open;
            }
            ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
                todo!("support host read cancellation")
            }
            ReadState::Open | ReadState::Dropped => {}
        }

        log::trace!("cancelled read {transmit_id:?}: {code:?}");

        Ok(code)
    }
3847
3848 fn guest_cancel_write(
3850 self,
3851 store: &mut StoreOpaque,
3852 ty: TransmitIndex,
3853 async_: bool,
3854 writer: u32,
3855 ) -> Result<ReturnCode> {
3856 if !async_ {
3857 store.concurrent_state_mut().check_blocking()?;
3861 }
3862
3863 let (rep, state) =
3864 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
3865 let id = TableId::<TransmitHandle>::new(rep);
3866 log::trace!("guest cancel write {id:?} (handle {writer})");
3867 match state {
3868 TransmitLocalState::Write { .. } => {
3869 bail!("stream or future write cancelled when no write is pending")
3870 }
3871 TransmitLocalState::Read { .. } => {
3872 bail!("passed read end to `{{stream|future}}.cancel-write`")
3873 }
3874 TransmitLocalState::Busy => {}
3875 }
3876 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3877 let code = self.cancel_write(store, transmit_id, async_)?;
3878 if !matches!(code, ReturnCode::Blocked) {
3879 let state =
3880 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
3881 .1;
3882 if let TransmitLocalState::Busy = state {
3883 *state = TransmitLocalState::Write { done: false };
3884 }
3885 }
3886 Ok(code)
3887 }
3888
3889 fn guest_cancel_read(
3891 self,
3892 store: &mut StoreOpaque,
3893 ty: TransmitIndex,
3894 async_: bool,
3895 reader: u32,
3896 ) -> Result<ReturnCode> {
3897 if !async_ {
3898 store.concurrent_state_mut().check_blocking()?;
3902 }
3903
3904 let (rep, state) =
3905 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
3906 let id = TableId::<TransmitHandle>::new(rep);
3907 log::trace!("guest cancel read {id:?} (handle {reader})");
3908 match state {
3909 TransmitLocalState::Read { .. } => {
3910 bail!("stream or future read cancelled when no read is pending")
3911 }
3912 TransmitLocalState::Write { .. } => {
3913 bail!("passed write end to `{{stream|future}}.cancel-read`")
3914 }
3915 TransmitLocalState::Busy => {}
3916 }
3917 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3918 let code = self.cancel_read(store, transmit_id, async_)?;
3919 if !matches!(code, ReturnCode::Blocked) {
3920 let state =
3921 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
3922 .1;
3923 if let TransmitLocalState::Busy = state {
3924 *state = TransmitLocalState::Read { done: false };
3925 }
3926 }
3927 Ok(code)
3928 }
3929
3930 fn guest_drop_readable(
3932 self,
3933 store: &mut StoreOpaque,
3934 ty: TransmitIndex,
3935 reader: u32,
3936 ) -> Result<()> {
3937 let table = self.id().get_mut(store).table_for_transmit(ty);
3938 let (rep, _is_done) = match ty {
3939 TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
3940 TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
3941 };
3942 let kind = match ty {
3943 TransmitIndex::Stream(_) => TransmitKind::Stream,
3944 TransmitIndex::Future(_) => TransmitKind::Future,
3945 };
3946 let id = TableId::<TransmitHandle>::new(rep);
3947 log::trace!("guest_drop_readable: drop reader {id:?}");
3948 store.host_drop_reader(id, kind)
3949 }
3950
3951 pub(crate) fn error_context_new(
3953 self,
3954 store: &mut StoreOpaque,
3955 caller: RuntimeComponentInstanceIndex,
3956 ty: TypeComponentLocalErrorContextTableIndex,
3957 options: OptionsIndex,
3958 debug_msg_address: u32,
3959 debug_msg_len: u32,
3960 ) -> Result<u32> {
3961 self.id().get(store).check_may_leave(caller)?;
3962 let lift_ctx = &mut LiftContext::new(store, options, self);
3963 let debug_msg = String::linear_lift_from_flat(
3964 lift_ctx,
3965 InterfaceType::String,
3966 &[ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)],
3967 )?;
3968
3969 let err_ctx = ErrorContextState { debug_msg };
3971 let state = store.concurrent_state_mut();
3972 let table_id = state.push(err_ctx)?;
3973 let global_ref_count_idx =
3974 TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
3975
3976 let _ = state
3978 .global_error_context_ref_counts
3979 .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
3980
3981 let local_idx = self
3988 .id()
3989 .get_mut(store)
3990 .table_for_error_context(ty)
3991 .error_context_insert(table_id.rep())?;
3992
3993 Ok(local_idx)
3994 }
3995
3996 pub(super) fn error_context_debug_message<T>(
3998 self,
3999 store: StoreContextMut<T>,
4000 ty: TypeComponentLocalErrorContextTableIndex,
4001 options: OptionsIndex,
4002 err_ctx_handle: u32,
4003 debug_msg_address: u32,
4004 ) -> Result<()> {
4005 let handle_table_id_rep = self
4007 .id()
4008 .get_mut(store.0)
4009 .table_for_error_context(ty)
4010 .error_context_rep(err_ctx_handle)?;
4011
4012 let state = store.0.concurrent_state_mut();
4013 let ErrorContextState { debug_msg } =
4015 state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
4016 let debug_msg = debug_msg.clone();
4017
4018 let lower_cx = &mut LowerContext::new(store, options, self);
4019 let debug_msg_address = usize::try_from(debug_msg_address)?;
4020 let offset = lower_cx
4022 .as_slice_mut()
4023 .get(debug_msg_address..)
4024 .and_then(|b| b.get(..debug_msg.bytes().len()))
4025 .map(|_| debug_msg_address)
4026 .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
4027 debug_msg
4028 .as_str()
4029 .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
4030
4031 Ok(())
4032 }
4033
4034 pub(crate) fn future_cancel_read(
4036 self,
4037 store: &mut StoreOpaque,
4038 caller: RuntimeComponentInstanceIndex,
4039 ty: TypeFutureTableIndex,
4040 async_: bool,
4041 reader: u32,
4042 ) -> Result<u32> {
4043 self.id().get(store).check_may_leave(caller)?;
4044 self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
4045 .map(|v| v.encode())
4046 }
4047
4048 pub(crate) fn future_cancel_write(
4050 self,
4051 store: &mut StoreOpaque,
4052 caller: RuntimeComponentInstanceIndex,
4053 ty: TypeFutureTableIndex,
4054 async_: bool,
4055 writer: u32,
4056 ) -> Result<u32> {
4057 self.id().get(store).check_may_leave(caller)?;
4058 self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
4059 .map(|v| v.encode())
4060 }
4061
4062 pub(crate) fn stream_cancel_read(
4064 self,
4065 store: &mut StoreOpaque,
4066 caller: RuntimeComponentInstanceIndex,
4067 ty: TypeStreamTableIndex,
4068 async_: bool,
4069 reader: u32,
4070 ) -> Result<u32> {
4071 self.id().get(store).check_may_leave(caller)?;
4072 self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
4073 .map(|v| v.encode())
4074 }
4075
4076 pub(crate) fn stream_cancel_write(
4078 self,
4079 store: &mut StoreOpaque,
4080 caller: RuntimeComponentInstanceIndex,
4081 ty: TypeStreamTableIndex,
4082 async_: bool,
4083 writer: u32,
4084 ) -> Result<u32> {
4085 self.id().get(store).check_may_leave(caller)?;
4086 self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
4087 .map(|v| v.encode())
4088 }
4089
4090 pub(crate) fn future_drop_readable(
4092 self,
4093 store: &mut StoreOpaque,
4094 caller: RuntimeComponentInstanceIndex,
4095 ty: TypeFutureTableIndex,
4096 reader: u32,
4097 ) -> Result<()> {
4098 self.id().get(store).check_may_leave(caller)?;
4099 self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
4100 }
4101
4102 pub(crate) fn stream_drop_readable(
4104 self,
4105 store: &mut StoreOpaque,
4106 caller: RuntimeComponentInstanceIndex,
4107 ty: TypeStreamTableIndex,
4108 reader: u32,
4109 ) -> Result<()> {
4110 self.id().get(store).check_may_leave(caller)?;
4111 self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
4112 }
4113
4114 fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
4118 let (write, read) = store
4119 .concurrent_state_mut()
4120 .new_transmit(TransmitOrigin::guest(self.id().instance(), ty))?;
4121
4122 let table = self.id().get_mut(store).table_for_transmit(ty);
4123 let (read_handle, write_handle) = match ty {
4124 TransmitIndex::Future(ty) => (
4125 table.future_insert_read(ty, read.rep())?,
4126 table.future_insert_write(ty, write.rep())?,
4127 ),
4128 TransmitIndex::Stream(ty) => (
4129 table.stream_insert_read(ty, read.rep())?,
4130 table.stream_insert_write(ty, write.rep())?,
4131 ),
4132 };
4133
4134 let state = store.concurrent_state_mut();
4135 state.get_mut(read)?.common.handle = Some(read_handle);
4136 state.get_mut(write)?.common.handle = Some(write_handle);
4137
4138 Ok(ResourcePair {
4139 write: write_handle,
4140 read: read_handle,
4141 })
4142 }
4143
4144 pub(crate) fn error_context_drop(
4146 self,
4147 store: &mut StoreOpaque,
4148 caller: RuntimeComponentInstanceIndex,
4149 ty: TypeComponentLocalErrorContextTableIndex,
4150 error_context: u32,
4151 ) -> Result<()> {
4152 let instance = self.id().get_mut(store);
4153 instance.check_may_leave(caller)?;
4154
4155 let local_handle_table = instance.table_for_error_context(ty);
4156
4157 let rep = local_handle_table.error_context_drop(error_context)?;
4158
4159 let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
4160
4161 let state = store.concurrent_state_mut();
4162 let GlobalErrorContextRefCount(global_ref_count) = state
4163 .global_error_context_ref_counts
4164 .get_mut(&global_ref_count_idx)
4165 .expect("retrieve concurrent state for error context during drop");
4166
4167 assert!(*global_ref_count >= 1);
4169 *global_ref_count -= 1;
4170 if *global_ref_count == 0 {
4171 state
4172 .global_error_context_ref_counts
4173 .remove(&global_ref_count_idx);
4174
4175 state
4176 .delete(TableId::<ErrorContextState>::new(rep))
4177 .context("deleting component-global error context data")?;
4178 }
4179
4180 Ok(())
4181 }
4182
4183 fn guest_transfer(
4186 self,
4187 store: &mut StoreOpaque,
4188 src_idx: u32,
4189 src: TransmitIndex,
4190 dst: TransmitIndex,
4191 ) -> Result<u32> {
4192 let mut instance = self.id().get_mut(store);
4193 let src_table = instance.as_mut().table_for_transmit(src);
4194 let (rep, is_done) = match src {
4195 TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
4196 TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
4197 };
4198 if is_done {
4199 bail!("cannot lift after being notified that the writable end dropped");
4200 }
4201 let dst_table = instance.table_for_transmit(dst);
4202 let handle = match dst {
4203 TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
4204 TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
4205 }?;
4206 store
4207 .concurrent_state_mut()
4208 .get_mut(TableId::<TransmitHandle>::new(rep))?
4209 .common
4210 .handle = Some(handle);
4211 Ok(handle)
4212 }
4213
4214 pub(crate) fn future_new(
4216 self,
4217 store: &mut StoreOpaque,
4218 caller: RuntimeComponentInstanceIndex,
4219 ty: TypeFutureTableIndex,
4220 ) -> Result<ResourcePair> {
4221 self.id().get(store).check_may_leave(caller)?;
4222 self.guest_new(store, TransmitIndex::Future(ty))
4223 }
4224
4225 pub(crate) fn stream_new(
4227 self,
4228 store: &mut StoreOpaque,
4229 caller: RuntimeComponentInstanceIndex,
4230 ty: TypeStreamTableIndex,
4231 ) -> Result<ResourcePair> {
4232 self.id().get(store).check_may_leave(caller)?;
4233 self.guest_new(store, TransmitIndex::Stream(ty))
4234 }
4235
4236 pub(crate) fn future_transfer(
4239 self,
4240 store: &mut StoreOpaque,
4241 src_idx: u32,
4242 src: TypeFutureTableIndex,
4243 dst: TypeFutureTableIndex,
4244 ) -> Result<u32> {
4245 self.guest_transfer(
4246 store,
4247 src_idx,
4248 TransmitIndex::Future(src),
4249 TransmitIndex::Future(dst),
4250 )
4251 }
4252
4253 pub(crate) fn stream_transfer(
4256 self,
4257 store: &mut StoreOpaque,
4258 src_idx: u32,
4259 src: TypeStreamTableIndex,
4260 dst: TypeStreamTableIndex,
4261 ) -> Result<u32> {
4262 self.guest_transfer(
4263 store,
4264 src_idx,
4265 TransmitIndex::Stream(src),
4266 TransmitIndex::Stream(dst),
4267 )
4268 }
4269
4270 pub(crate) fn error_context_transfer(
4272 self,
4273 store: &mut StoreOpaque,
4274 src_idx: u32,
4275 src: TypeComponentLocalErrorContextTableIndex,
4276 dst: TypeComponentLocalErrorContextTableIndex,
4277 ) -> Result<u32> {
4278 let mut instance = self.id().get_mut(store);
4279 let rep = instance
4280 .as_mut()
4281 .table_for_error_context(src)
4282 .error_context_rep(src_idx)?;
4283 let dst_idx = instance
4284 .table_for_error_context(dst)
4285 .error_context_insert(rep)?;
4286
4287 let global_ref_count = store
4291 .concurrent_state_mut()
4292 .global_error_context_ref_counts
4293 .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4294 .context("global ref count present for existing (sub)component error context")?;
4295 global_ref_count.0 += 1;
4296
4297 Ok(dst_idx)
4298 }
4299}
4300
4301impl ComponentInstance {
4302 fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4303 let (states, types) = self.instance_states();
4304 let runtime_instance = match ty {
4305 TransmitIndex::Stream(ty) => types[ty].instance,
4306 TransmitIndex::Future(ty) => types[ty].instance,
4307 };
4308 states[runtime_instance].handle_table()
4309 }
4310
4311 fn table_for_error_context(
4312 self: Pin<&mut Self>,
4313 ty: TypeComponentLocalErrorContextTableIndex,
4314 ) -> &mut HandleTable {
4315 let (states, types) = self.instance_states();
4316 let runtime_instance = types[ty].instance;
4317 states[runtime_instance].handle_table()
4318 }
4319
4320 fn get_mut_by_index(
4321 self: Pin<&mut Self>,
4322 ty: TransmitIndex,
4323 index: u32,
4324 ) -> Result<(u32, &mut TransmitLocalState)> {
4325 get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4326 }
4327}
4328
4329impl ConcurrentState {
4330 fn send_write_result(
4331 &mut self,
4332 ty: TransmitIndex,
4333 id: TableId<TransmitState>,
4334 handle: u32,
4335 code: ReturnCode,
4336 ) -> Result<()> {
4337 let write_handle = self.get_mut(id)?.write_handle.rep();
4338 self.set_event(
4339 write_handle,
4340 match ty {
4341 TransmitIndex::Future(ty) => Event::FutureWrite {
4342 code,
4343 pending: Some((ty, handle)),
4344 },
4345 TransmitIndex::Stream(ty) => Event::StreamWrite {
4346 code,
4347 pending: Some((ty, handle)),
4348 },
4349 },
4350 )
4351 }
4352
4353 fn send_read_result(
4354 &mut self,
4355 ty: TransmitIndex,
4356 id: TableId<TransmitState>,
4357 handle: u32,
4358 code: ReturnCode,
4359 ) -> Result<()> {
4360 let read_handle = self.get_mut(id)?.read_handle.rep();
4361 self.set_event(
4362 read_handle,
4363 match ty {
4364 TransmitIndex::Future(ty) => Event::FutureRead {
4365 code,
4366 pending: Some((ty, handle)),
4367 },
4368 TransmitIndex::Stream(ty) => Event::StreamRead {
4369 code,
4370 pending: Some((ty, handle)),
4371 },
4372 },
4373 )
4374 }
4375
4376 fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4377 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4378 }
4379
4380 fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4381 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4382 }
4383
4384 fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4395 let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4396
4397 fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
4398 let (ReturnCode::Completed(count)
4399 | ReturnCode::Dropped(count)
4400 | ReturnCode::Cancelled(count)) = old
4401 else {
4402 unreachable!()
4403 };
4404
4405 match new {
4406 ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
4407 ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
4408 _ => unreachable!(),
4409 }
4410 }
4411
4412 let event = match (waitable.take_event(self)?, event) {
4413 (None, _) => event,
4414 (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4415 (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4416 (
4417 Some(Event::StreamWrite {
4418 code: old_code,
4419 pending: old_pending,
4420 }),
4421 Event::StreamWrite { code, pending },
4422 ) => Event::StreamWrite {
4423 code: update_code(old_code, code),
4424 pending: old_pending.or(pending),
4425 },
4426 (
4427 Some(Event::StreamRead {
4428 code: old_code,
4429 pending: old_pending,
4430 }),
4431 Event::StreamRead { code, pending },
4432 ) => Event::StreamRead {
4433 code: update_code(old_code, code),
4434 pending: old_pending.or(pending),
4435 },
4436 _ => unreachable!(),
4437 };
4438
4439 waitable.set_event(self, Some(event))
4440 }
4441
4442 fn new_transmit(
4445 &mut self,
4446 origin: TransmitOrigin,
4447 ) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4448 let state_id = self.push(TransmitState::new(origin))?;
4449
4450 let write = self.push(TransmitHandle::new(state_id))?;
4451 let read = self.push(TransmitHandle::new(state_id))?;
4452
4453 let state = self.get_mut(state_id)?;
4454 state.write_handle = write;
4455 state.read_handle = read;
4456
4457 log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4458
4459 Ok((write, read))
4460 }
4461
4462 fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4464 let state = self.delete(state_id)?;
4465 self.delete(state.write_handle)?;
4466 self.delete(state.read_handle)?;
4467
4468 log::trace!(
4469 "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4470 state.write_handle,
4471 state.read_handle,
4472 );
4473
4474 Ok(())
4475 }
4476}
4477
/// Pair of guest-visible handles for the two ends of a newly created future
/// or stream.
pub(crate) struct ResourcePair {
    /// Handle of the writable end.
    pub(crate) write: u32,
    /// Handle of the readable end.
    pub(crate) read: u32,
}
4482
4483impl Waitable {
4484 pub(super) fn on_delivery(&self, store: &mut StoreOpaque, instance: Instance, event: Event) {
4487 match event {
4488 Event::FutureRead {
4489 pending: Some((ty, handle)),
4490 ..
4491 }
4492 | Event::FutureWrite {
4493 pending: Some((ty, handle)),
4494 ..
4495 } => {
4496 let instance = instance.id().get_mut(store);
4497 let runtime_instance = instance.component().types()[ty].instance;
4498 let (rep, state) = instance.instance_states().0[runtime_instance]
4499 .handle_table()
4500 .future_rep(ty, handle)
4501 .unwrap();
4502 assert_eq!(rep, self.rep());
4503 assert_eq!(*state, TransmitLocalState::Busy);
4504 *state = match event {
4505 Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
4506 Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
4507 _ => unreachable!(),
4508 };
4509 }
4510 Event::StreamRead {
4511 pending: Some((ty, handle)),
4512 code,
4513 }
4514 | Event::StreamWrite {
4515 pending: Some((ty, handle)),
4516 code,
4517 } => {
4518 let instance = instance.id().get_mut(store);
4519 let runtime_instance = instance.component().types()[ty].instance;
4520 let (rep, state) = instance.instance_states().0[runtime_instance]
4521 .handle_table()
4522 .stream_rep(ty, handle)
4523 .unwrap();
4524 assert_eq!(rep, self.rep());
4525 assert_eq!(*state, TransmitLocalState::Busy);
4526 let done = matches!(code, ReturnCode::Dropped(_));
4527 *state = match event {
4528 Event::StreamRead { .. } => TransmitLocalState::Read { done },
4529 Event::StreamWrite { .. } => TransmitLocalState::Write { done },
4530 _ => unreachable!(),
4531 };
4532
4533 let transmit_handle = TableId::<TransmitHandle>::new(rep);
4534 let state = store.concurrent_state_mut();
4535 let transmit_id = state.get_mut(transmit_handle).unwrap().state;
4536 let transmit = state.get_mut(transmit_id).unwrap();
4537
4538 match event {
4539 Event::StreamRead { .. } => {
4540 transmit.read = ReadState::Open;
4541 }
4542 Event::StreamWrite { .. } => transmit.write = WriteState::Open,
4543 _ => unreachable!(),
4544 };
4545 }
4546 _ => {}
4547 }
4548 }
4549}
4550
4551fn allow_intra_component_read_write(ty: Option<InterfaceType>) -> bool {
4555 matches!(
4556 ty,
4557 None | Some(
4558 InterfaceType::S8
4559 | InterfaceType::U8
4560 | InterfaceType::S16
4561 | InterfaceType::U16
4562 | InterfaceType::S32
4563 | InterfaceType::U32
4564 | InterfaceType::S64
4565 | InterfaceType::U64
4566 | InterfaceType::Float32
4567 | InterfaceType::Float64
4568 )
4569 )
4570}
4571
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Engine, Store};
    use core::future::pending;
    use core::pin::pin;
    use std::sync::LazyLock;

    /// Engine shared by every store created in these tests.
    static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);

    /// Polls `rx` once as a `FutureProducer`, using a no-op waker and a
    /// throwaway store; `finish` is forwarded to `poll_produce`.
    fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
    where
        T: FutureProducer<()>,
    {
        let mut cx = Context::from_waker(Waker::noop());
        let mut store = Store::new(&ENGINE, ());
        rx.poll_produce(&mut cx, store.as_context_mut(), finish)
    }

    #[test]
    fn future_producer() {
        // An immediately-ready future yields its value whether or not
        // `finish` is set.
        {
            let mut fut = pin!(async { anyhow::Ok(()) });
            assert!(matches!(
                poll_future_producer(fut.as_mut(), false),
                Poll::Ready(Ok(Some(()))),
            ));
        }
        {
            let mut fut = pin!(async { anyhow::Ok(()) });
            assert!(matches!(
                poll_future_producer(fut.as_mut(), true),
                Poll::Ready(Ok(Some(()))),
            ));
        }

        // A future that never resolves stays pending, and reports `None` once
        // `finish` is requested.
        {
            let mut fut = pin!(pending::<Result<()>>());
            assert!(matches!(
                poll_future_producer(fut.as_mut(), false),
                Poll::Pending,
            ));
            assert!(matches!(
                poll_future_producer(fut.as_mut(), true),
                Poll::Ready(Ok(None)),
            ));
        }

        // A oneshot receiver behaves the same while empty, and still delivers
        // a value sent after an earlier `finish` poll returned `None`.
        {
            let (tx, rx) = oneshot::channel();
            let mut rx = pin!(rx);
            assert!(matches!(
                poll_future_producer(rx.as_mut(), false),
                Poll::Pending,
            ));
            assert!(matches!(
                poll_future_producer(rx.as_mut(), true),
                Poll::Ready(Ok(None)),
            ));
            tx.send(()).unwrap();
            assert!(matches!(
                poll_future_producer(rx.as_mut(), true),
                Poll::Ready(Ok(Some(()))),
            ));
        }

        // A value sent before the first poll is produced right away.
        {
            let (tx, rx) = oneshot::channel();
            let mut rx = pin!(rx);
            tx.send(()).unwrap();
            assert!(matches!(
                poll_future_producer(rx.as_mut(), false),
                Poll::Ready(Ok(Some(()))),
            ));
        }

        // A dropped sender surfaces as an error whether or not `finish` is
        // set.
        {
            let (tx, rx) = oneshot::channel::<()>();
            let mut rx = pin!(rx);
            drop(tx);
            assert!(matches!(
                poll_future_producer(rx.as_mut(), false),
                Poll::Ready(Err(..)),
            ));
        }
        {
            let (tx, rx) = oneshot::channel::<()>();
            let mut rx = pin!(rx);
            drop(tx);
            assert!(matches!(
                poll_future_producer(rx.as_mut(), true),
                Poll::Ready(Err(..)),
            ));
        }
    }
}