1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext};
5use crate::component::matching::InstanceType;
6use crate::component::types;
7use crate::component::values::ErrorContextAny;
8use crate::component::{
9 AsAccessor, ComponentInstanceId, ComponentType, FutureAny, Instance, Lift, Lower, StreamAny,
10 Val, WasmList,
11};
12use crate::store::{StoreOpaque, StoreToken};
13use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
14use crate::vm::{AlwaysMut, VMStore};
15use crate::{AsContext, AsContextMut, StoreContextMut, ValRaw};
16use crate::{
17 Error, Result, bail,
18 error::{Context as _, format_err},
19};
20use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
21use core::fmt;
22use core::future;
23use core::iter;
24use core::marker::PhantomData;
25use core::mem::{self, MaybeUninit};
26use core::pin::Pin;
27use core::task::{Context, Poll, Waker, ready};
28use futures::channel::oneshot;
29use futures::{FutureExt as _, stream};
30use std::any::{Any, TypeId};
31use std::boxed::Box;
32use std::io::Cursor;
33use std::string::String;
34use std::sync::{Arc, Mutex};
35use std::vec::Vec;
36use wasmtime_environ::component::{
37 CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
38 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
39 TypeFutureTableIndex, TypeStreamTableIndex,
40};
41
42pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
43
44mod buffers;
45
46#[derive(Copy, Clone, Debug)]
49pub enum TransmitKind {
50 Stream,
51 Future,
52}
53
54#[derive(Copy, Clone, Debug, PartialEq)]
56pub enum ReturnCode {
57 Blocked,
58 Completed(u32),
59 Dropped(u32),
60 Cancelled(u32),
61}
62
63impl ReturnCode {
64 pub fn encode(&self) -> u32 {
69 const BLOCKED: u32 = 0xffff_ffff;
70 const COMPLETED: u32 = 0x0;
71 const DROPPED: u32 = 0x1;
72 const CANCELLED: u32 = 0x2;
73 match self {
74 ReturnCode::Blocked => BLOCKED,
75 ReturnCode::Completed(n) => {
76 debug_assert!(*n < (1 << 28));
77 (n << 4) | COMPLETED
78 }
79 ReturnCode::Dropped(n) => {
80 debug_assert!(*n < (1 << 28));
81 (n << 4) | DROPPED
82 }
83 ReturnCode::Cancelled(n) => {
84 debug_assert!(*n < (1 << 28));
85 (n << 4) | CANCELLED
86 }
87 }
88 }
89
90 fn completed(kind: TransmitKind, count: u32) -> Self {
93 Self::Completed(if let TransmitKind::Future = kind {
94 0
95 } else {
96 count
97 })
98 }
99}
100
101#[derive(Copy, Clone, Debug)]
106pub enum TransmitIndex {
107 Stream(TypeStreamTableIndex),
108 Future(TypeFutureTableIndex),
109}
110
111impl TransmitIndex {
112 pub fn kind(&self) -> TransmitKind {
113 match self {
114 TransmitIndex::Stream(_) => TransmitKind::Stream,
115 TransmitIndex::Future(_) => TransmitKind::Future,
116 }
117 }
118}
119
120fn payload(ty: TransmitIndex, types: &ComponentTypes) -> Option<InterfaceType> {
123 match ty {
124 TransmitIndex::Future(ty) => types[types[ty].ty].payload,
125 TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
126 }
127}
128
129fn get_mut_by_index_from(
132 handle_table: &mut HandleTable,
133 ty: TransmitIndex,
134 index: u32,
135) -> Result<(u32, &mut TransmitLocalState)> {
136 match ty {
137 TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
138 TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
139 }
140}
141
142fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
143 mut store: StoreContextMut<U>,
144 instance: Instance,
145 options: OptionsIndex,
146 ty: TransmitIndex,
147 address: usize,
148 count: usize,
149 buffer: &mut B,
150) -> Result<()> {
151 let count = buffer.remaining().len().min(count);
152
153 let lower = &mut if T::MAY_REQUIRE_REALLOC {
154 LowerContext::new
155 } else {
156 LowerContext::new_without_realloc
157 }(store.as_context_mut(), options, instance);
158
159 if address % usize::try_from(T::ALIGN32)? != 0 {
160 bail!("read pointer not aligned");
161 }
162 lower
163 .as_slice_mut()
164 .get_mut(address..)
165 .and_then(|b| b.get_mut(..T::SIZE32 * count))
166 .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))?;
167
168 if let Some(ty) = payload(ty, lower.types) {
169 T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
170 }
171
172 buffer.skip(count);
173
174 Ok(())
175}
176
177fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
178 lift: &mut LiftContext<'_>,
179 ty: Option<InterfaceType>,
180 buffer: &mut B,
181 address: usize,
182 count: usize,
183) -> Result<()> {
184 let count = count.min(buffer.remaining_capacity());
185 if T::IS_RUST_UNIT_TYPE {
186 buffer.extend(
190 iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
191 )
192 } else {
193 let ty = ty.unwrap();
194 if address % usize::try_from(T::ALIGN32)? != 0 {
195 bail!("write pointer not aligned");
196 }
197 lift.memory()
198 .get(address..)
199 .and_then(|b| b.get(..T::SIZE32 * count))
200 .ok_or_else(|| crate::format_err!("write pointer out of bounds of memory"))?;
201
202 let list = &WasmList::new(address, count, lift, ty)?;
203 T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
204 }
205 Ok(())
206}
207
208#[derive(Debug, PartialEq, Eq, PartialOrd)]
210pub(super) struct ErrorContextState {
211 pub(crate) debug_msg: String,
213}
214
215#[derive(Debug, Clone, Copy, PartialEq, Eq)]
218pub(super) struct FlatAbi {
219 pub(super) size: u32,
220 pub(super) align: u32,
221}
222
223pub struct Destination<'a, T, B> {
225 id: TableId<TransmitState>,
226 buffer: &'a mut B,
227 host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
228 _phantom: PhantomData<fn() -> T>,
229}
230
231impl<'a, T, B> Destination<'a, T, B> {
232 pub fn reborrow(&mut self) -> Destination<'_, T, B> {
234 Destination {
235 id: self.id,
236 buffer: &mut *self.buffer,
237 host_buffer: self.host_buffer.as_deref_mut(),
238 _phantom: PhantomData,
239 }
240 }
241
242 pub fn take_buffer(&mut self) -> B
248 where
249 B: Default,
250 {
251 mem::take(self.buffer)
252 }
253
254 pub fn set_buffer(&mut self, buffer: B) {
264 *self.buffer = buffer;
265 }
266
267 pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
284 let transmit = store
285 .as_context_mut()
286 .0
287 .concurrent_state_mut()
288 .get_mut(self.id)
289 .unwrap();
290
291 if let &ReadState::GuestReady { count, .. } = &transmit.read {
292 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
293 unreachable!()
294 };
295
296 Some(count - guest_offset)
297 } else {
298 None
299 }
300 }
301}
302
303impl<'a, B> Destination<'a, u8, B> {
304 pub fn as_direct<D>(
315 mut self,
316 store: StoreContextMut<'a, D>,
317 capacity: usize,
318 ) -> DirectDestination<'a, D> {
319 if let Some(buffer) = self.host_buffer.as_deref_mut() {
320 buffer.set_position(0);
321 if buffer.get_mut().is_empty() {
322 buffer.get_mut().resize(capacity, 0);
323 }
324 }
325
326 DirectDestination {
327 id: self.id,
328 host_buffer: self.host_buffer,
329 store,
330 }
331 }
332}
333
334pub struct DirectDestination<'a, D: 'static> {
337 id: TableId<TransmitState>,
338 host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
339 store: StoreContextMut<'a, D>,
340}
341
342impl<D: 'static> std::io::Write for DirectDestination<'_, D> {
343 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
344 let rem = self.remaining();
345 let n = rem.len().min(buf.len());
346 rem[..n].copy_from_slice(&buf[..n]);
347 self.mark_written(n);
348 Ok(n)
349 }
350
351 fn flush(&mut self) -> std::io::Result<()> {
352 Ok(())
353 }
354}
355
356impl<D: 'static> DirectDestination<'_, D> {
357 pub fn remaining(&mut self) -> &mut [u8] {
359 if let Some(buffer) = self.host_buffer.as_deref_mut() {
360 buffer.get_mut()
361 } else {
362 let transmit = self
363 .store
364 .as_context_mut()
365 .0
366 .concurrent_state_mut()
367 .get_mut(self.id)
368 .unwrap();
369
370 let &ReadState::GuestReady {
371 address,
372 count,
373 options,
374 instance,
375 ..
376 } = &transmit.read
377 else {
378 unreachable!();
379 };
380
381 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
382 unreachable!()
383 };
384
385 instance
386 .options_memory_mut(self.store.0, options)
387 .get_mut((address + guest_offset)..)
388 .and_then(|b| b.get_mut(..(count - guest_offset)))
389 .unwrap()
390 }
391 }
392
393 pub fn mark_written(&mut self, count: usize) {
398 if let Some(buffer) = self.host_buffer.as_deref_mut() {
399 buffer.set_position(
400 buffer
401 .position()
402 .checked_add(u64::try_from(count).unwrap())
403 .unwrap(),
404 );
405 } else {
406 let transmit = self
407 .store
408 .as_context_mut()
409 .0
410 .concurrent_state_mut()
411 .get_mut(self.id)
412 .unwrap();
413
414 let ReadState::GuestReady {
415 count: read_count, ..
416 } = &transmit.read
417 else {
418 unreachable!();
419 };
420
421 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
422 unreachable!()
423 };
424
425 if *guest_offset + count > *read_count {
426 panic!(
427 "write count ({count}) must be less than or equal to read count ({read_count})"
428 )
429 } else {
430 *guest_offset += count;
431 }
432 }
433 }
434}
435
436#[derive(Copy, Clone, Debug)]
438pub enum StreamResult {
439 Completed,
442 Cancelled,
447 Dropped,
450}
451
452pub trait StreamProducer<D>: Send + 'static {
454 type Item;
456
457 type Buffer: WriteBuffer<Self::Item> + Default;
459
460 fn poll_produce<'a>(
596 self: Pin<&mut Self>,
597 cx: &mut Context<'_>,
598 store: StoreContextMut<'a, D>,
599 destination: Destination<'a, Self::Item, Self::Buffer>,
600 finish: bool,
601 ) -> Poll<Result<StreamResult>>;
602
603 fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
609 Err(me)
610 }
611}
612
613impl<T, D> StreamProducer<D> for iter::Empty<T>
614where
615 T: Send + Sync + 'static,
616{
617 type Item = T;
618 type Buffer = Option<Self::Item>;
619
620 fn poll_produce<'a>(
621 self: Pin<&mut Self>,
622 _: &mut Context<'_>,
623 _: StoreContextMut<'a, D>,
624 _: Destination<'a, Self::Item, Self::Buffer>,
625 _: bool,
626 ) -> Poll<Result<StreamResult>> {
627 Poll::Ready(Ok(StreamResult::Dropped))
628 }
629}
630
631impl<T, D> StreamProducer<D> for stream::Empty<T>
632where
633 T: Send + Sync + 'static,
634{
635 type Item = T;
636 type Buffer = Option<Self::Item>;
637
638 fn poll_produce<'a>(
639 self: Pin<&mut Self>,
640 _: &mut Context<'_>,
641 _: StoreContextMut<'a, D>,
642 _: Destination<'a, Self::Item, Self::Buffer>,
643 _: bool,
644 ) -> Poll<Result<StreamResult>> {
645 Poll::Ready(Ok(StreamResult::Dropped))
646 }
647}
648
649impl<T, D> StreamProducer<D> for Vec<T>
650where
651 T: Unpin + Send + Sync + 'static,
652{
653 type Item = T;
654 type Buffer = VecBuffer<T>;
655
656 fn poll_produce<'a>(
657 self: Pin<&mut Self>,
658 _: &mut Context<'_>,
659 _: StoreContextMut<'a, D>,
660 mut dst: Destination<'a, Self::Item, Self::Buffer>,
661 _: bool,
662 ) -> Poll<Result<StreamResult>> {
663 dst.set_buffer(mem::take(self.get_mut()).into());
664 Poll::Ready(Ok(StreamResult::Dropped))
665 }
666}
667
668impl<T, D> StreamProducer<D> for Box<[T]>
669where
670 T: Unpin + Send + Sync + 'static,
671{
672 type Item = T;
673 type Buffer = VecBuffer<T>;
674
675 fn poll_produce<'a>(
676 self: Pin<&mut Self>,
677 _: &mut Context<'_>,
678 _: StoreContextMut<'a, D>,
679 mut dst: Destination<'a, Self::Item, Self::Buffer>,
680 _: bool,
681 ) -> Poll<Result<StreamResult>> {
682 dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
683 Poll::Ready(Ok(StreamResult::Dropped))
684 }
685}
686
687#[cfg(feature = "component-model-async-bytes")]
688impl<D> StreamProducer<D> for bytes::Bytes {
689 type Item = u8;
690 type Buffer = Cursor<Self>;
691
692 fn poll_produce<'a>(
693 mut self: Pin<&mut Self>,
694 _: &mut Context<'_>,
695 mut store: StoreContextMut<'a, D>,
696 mut dst: Destination<'a, Self::Item, Self::Buffer>,
697 _: bool,
698 ) -> Poll<Result<StreamResult>> {
699 let cap = dst.remaining(&mut store);
700 let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
701 dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
703 return Poll::Ready(Ok(StreamResult::Dropped));
704 };
705 let cap = cap.into();
706 dst.set_buffer(Cursor::new(self.split_off(cap)));
708 let mut dst = dst.as_direct(store, cap);
709 dst.remaining().copy_from_slice(&self);
710 dst.mark_written(cap);
711 Poll::Ready(Ok(StreamResult::Dropped))
712 }
713}
714
715#[cfg(feature = "component-model-async-bytes")]
716impl<D> StreamProducer<D> for bytes::BytesMut {
717 type Item = u8;
718 type Buffer = Cursor<Self>;
719
720 fn poll_produce<'a>(
721 mut self: Pin<&mut Self>,
722 _: &mut Context<'_>,
723 mut store: StoreContextMut<'a, D>,
724 mut dst: Destination<'a, Self::Item, Self::Buffer>,
725 _: bool,
726 ) -> Poll<Result<StreamResult>> {
727 let cap = dst.remaining(&mut store);
728 let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
729 dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
731 return Poll::Ready(Ok(StreamResult::Dropped));
732 };
733 let cap = cap.into();
734 dst.set_buffer(Cursor::new(self.split_off(cap)));
736 let mut dst = dst.as_direct(store, cap);
737 dst.remaining().copy_from_slice(&self);
738 dst.mark_written(cap);
739 Poll::Ready(Ok(StreamResult::Dropped))
740 }
741}
742
743pub struct Source<'a, T> {
745 id: TableId<TransmitState>,
746 host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
747}
748
749impl<'a, T> Source<'a, T> {
750 pub fn reborrow(&mut self) -> Source<'_, T> {
752 Source {
753 id: self.id,
754 host_buffer: self.host_buffer.as_deref_mut(),
755 }
756 }
757
758 pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
760 where
761 T: func::Lift + 'static,
762 B: ReadBuffer<T>,
763 {
764 if let Some(input) = &mut self.host_buffer {
765 let count = input.remaining().len().min(buffer.remaining_capacity());
766 buffer.move_from(*input, count);
767 } else {
768 let store = store.as_context_mut();
769 let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
770
771 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
772 unreachable!();
773 };
774
775 let &WriteState::GuestReady {
776 ty,
777 address,
778 count,
779 options,
780 instance,
781 ..
782 } = &transmit.write
783 else {
784 unreachable!()
785 };
786
787 let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
788 let ty = payload(ty, cx.types);
789 let old_remaining = buffer.remaining_capacity();
790 lift::<T, B>(
791 cx,
792 ty,
793 buffer,
794 address + (T::SIZE32 * guest_offset),
795 count - guest_offset,
796 )?;
797
798 let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
799
800 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
801 unreachable!();
802 };
803
804 *guest_offset += old_remaining - buffer.remaining_capacity();
805 }
806
807 Ok(())
808 }
809
810 pub fn remaining(&self, mut store: impl AsContextMut) -> usize
813 where
814 T: 'static,
815 {
816 let transmit = store
817 .as_context_mut()
818 .0
819 .concurrent_state_mut()
820 .get_mut(self.id)
821 .unwrap();
822
823 if let &WriteState::GuestReady { count, .. } = &transmit.write {
824 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
825 unreachable!()
826 };
827
828 count - guest_offset
829 } else if let Some(host_buffer) = &self.host_buffer {
830 host_buffer.remaining().len()
831 } else {
832 unreachable!()
833 }
834 }
835}
836
837impl<'a> Source<'a, u8> {
838 pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
840 DirectSource {
841 id: self.id,
842 host_buffer: self.host_buffer,
843 store,
844 }
845 }
846}
847
848pub struct DirectSource<'a, D: 'static> {
851 id: TableId<TransmitState>,
852 host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
853 store: StoreContextMut<'a, D>,
854}
855
856impl<D: 'static> std::io::Read for DirectSource<'_, D> {
857 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
858 let rem = self.remaining();
859 let n = rem.len().min(buf.len());
860 buf[..n].copy_from_slice(&rem[..n]);
861 self.mark_read(n);
862 Ok(n)
863 }
864}
865
866impl<D: 'static> DirectSource<'_, D> {
867 pub fn remaining(&mut self) -> &[u8] {
869 if let Some(buffer) = self.host_buffer.as_deref_mut() {
870 buffer.remaining()
871 } else {
872 let transmit = self
873 .store
874 .as_context_mut()
875 .0
876 .concurrent_state_mut()
877 .get_mut(self.id)
878 .unwrap();
879
880 let &WriteState::GuestReady {
881 address,
882 count,
883 options,
884 instance,
885 ..
886 } = &transmit.write
887 else {
888 unreachable!()
889 };
890
891 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
892 unreachable!()
893 };
894
895 instance
896 .options_memory(self.store.0, options)
897 .get((address + guest_offset)..)
898 .and_then(|b| b.get(..(count - guest_offset)))
899 .unwrap()
900 }
901 }
902
903 pub fn mark_read(&mut self, count: usize) {
908 if let Some(buffer) = self.host_buffer.as_deref_mut() {
909 buffer.skip(count);
910 } else {
911 let transmit = self
912 .store
913 .as_context_mut()
914 .0
915 .concurrent_state_mut()
916 .get_mut(self.id)
917 .unwrap();
918
919 let WriteState::GuestReady {
920 count: write_count, ..
921 } = &transmit.write
922 else {
923 unreachable!()
924 };
925
926 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
927 unreachable!()
928 };
929
930 if *guest_offset + count > *write_count {
931 panic!(
932 "read count ({count}) must be less than or equal to write count ({write_count})"
933 )
934 } else {
935 *guest_offset += count;
936 }
937 }
938 }
939}
940
941pub trait StreamConsumer<D>: Send + 'static {
943 type Item;
945
946 fn poll_consume(
1029 self: Pin<&mut Self>,
1030 cx: &mut Context<'_>,
1031 store: StoreContextMut<D>,
1032 source: Source<'_, Self::Item>,
1033 finish: bool,
1034 ) -> Poll<Result<StreamResult>>;
1035}
1036
1037pub trait FutureProducer<D>: Send + 'static {
1039 type Item;
1041
1042 fn poll_produce(
1052 self: Pin<&mut Self>,
1053 cx: &mut Context<'_>,
1054 store: StoreContextMut<D>,
1055 finish: bool,
1056 ) -> Poll<Result<Option<Self::Item>>>;
1057}
1058
1059impl<T, E, D, Fut> FutureProducer<D> for Fut
1060where
1061 E: Into<Error>,
1062 Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
1063{
1064 type Item = T;
1065
1066 fn poll_produce<'a>(
1067 self: Pin<&mut Self>,
1068 cx: &mut Context<'_>,
1069 _: StoreContextMut<'a, D>,
1070 finish: bool,
1071 ) -> Poll<Result<Option<T>>> {
1072 match self.poll(cx) {
1073 Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
1074 Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
1075 Poll::Pending if finish => Poll::Ready(Ok(None)),
1076 Poll::Pending => Poll::Pending,
1077 }
1078 }
1079}
1080
1081pub trait FutureConsumer<D>: Send + 'static {
1083 type Item;
1085
1086 fn poll_consume(
1098 self: Pin<&mut Self>,
1099 cx: &mut Context<'_>,
1100 store: StoreContextMut<D>,
1101 source: Source<'_, Self::Item>,
1102 finish: bool,
1103 ) -> Poll<Result<()>>;
1104}
1105
1106pub struct FutureReader<T> {
1113 id: TableId<TransmitHandle>,
1114 _phantom: PhantomData<T>,
1115}
1116
1117impl<T> FutureReader<T> {
1118 pub fn new<S: AsContextMut>(
1126 mut store: S,
1127 producer: impl FutureProducer<S::Data, Item = T>,
1128 ) -> Self
1129 where
1130 T: func::Lower + func::Lift + Send + Sync + 'static,
1131 {
1132 assert!(store.as_context().0.concurrency_support());
1133
1134 struct Producer<P>(P);
1135
1136 impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
1137 for Producer<P>
1138 {
1139 type Item = P::Item;
1140 type Buffer = Option<P::Item>;
1141
1142 fn poll_produce<'a>(
1143 self: Pin<&mut Self>,
1144 cx: &mut Context<'_>,
1145 store: StoreContextMut<D>,
1146 mut destination: Destination<'a, Self::Item, Self::Buffer>,
1147 finish: bool,
1148 ) -> Poll<Result<StreamResult>> {
1149 let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1152
1153 Poll::Ready(Ok(
1154 if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
1155 destination.set_buffer(Some(value));
1156
1157 StreamResult::Completed
1164 } else {
1165 StreamResult::Cancelled
1166 },
1167 ))
1168 }
1169 }
1170
1171 Self::new_(
1172 store
1173 .as_context_mut()
1174 .new_transmit(TransmitKind::Future, Producer(producer)),
1175 )
1176 }
1177
1178 pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1179 Self {
1180 id,
1181 _phantom: PhantomData,
1182 }
1183 }
1184
1185 pub(super) fn id(&self) -> TableId<TransmitHandle> {
1186 self.id
1187 }
1188
1189 pub fn pipe<S: AsContextMut>(
1191 self,
1192 mut store: S,
1193 consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
1194 ) where
1195 T: func::Lift + 'static,
1196 {
1197 struct Consumer<C>(C);
1198
1199 impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
1200 for Consumer<C>
1201 {
1202 type Item = T;
1203
1204 fn poll_consume(
1205 self: Pin<&mut Self>,
1206 cx: &mut Context<'_>,
1207 mut store: StoreContextMut<D>,
1208 mut source: Source<Self::Item>,
1209 finish: bool,
1210 ) -> Poll<Result<StreamResult>> {
1211 let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1214
1215 ready!(consumer.poll_consume(
1216 cx,
1217 store.as_context_mut(),
1218 source.reborrow(),
1219 finish
1220 ))?;
1221
1222 Poll::Ready(Ok(if source.remaining(store) == 0 {
1223 StreamResult::Completed
1229 } else {
1230 StreamResult::Cancelled
1231 }))
1232 }
1233 }
1234
1235 store
1236 .as_context_mut()
1237 .set_consumer(self.id, TransmitKind::Future, Consumer(consumer));
1238 }
1239
1240 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1242 let id = lift_index_to_future(cx, ty, index)?;
1243 Ok(Self::new_(id))
1244 }
1245
1246 pub fn close(&mut self, mut store: impl AsContextMut) {
1259 future_close(store.as_context_mut().0, &mut self.id)
1260 }
1261
1262 pub fn close_with(&mut self, accessor: impl AsAccessor) {
1264 accessor.as_accessor().with(|access| self.close(access))
1265 }
1266
1267 pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1273 where
1274 A: AsAccessor,
1275 {
1276 GuardedFutureReader::new(accessor, self)
1277 }
1278
1279 pub fn try_into_future_any(self, store: impl AsContextMut) -> Result<FutureAny>
1286 where
1287 T: ComponentType + 'static,
1288 {
1289 FutureAny::try_from_future_reader(store, self)
1290 }
1291
1292 pub fn try_from_future_any(future: FutureAny) -> Result<Self>
1299 where
1300 T: ComponentType + 'static,
1301 {
1302 future.try_into_future_reader()
1303 }
1304}
1305
1306impl<T> fmt::Debug for FutureReader<T> {
1307 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1308 f.debug_struct("FutureReader")
1309 .field("id", &self.id)
1310 .finish()
1311 }
1312}
1313
1314pub(super) fn future_close(store: &mut StoreOpaque, id: &mut TableId<TransmitHandle>) {
1315 let id = mem::replace(id, TableId::new(u32::MAX));
1316 store.host_drop_reader(id, TransmitKind::Future).unwrap();
1317}
1318
1319pub(super) fn lift_index_to_future(
1321 cx: &mut LiftContext<'_>,
1322 ty: InterfaceType,
1323 index: u32,
1324) -> Result<TableId<TransmitHandle>> {
1325 match ty {
1326 InterfaceType::Future(src) => {
1327 let handle_table = cx
1328 .instance_mut()
1329 .table_for_transmit(TransmitIndex::Future(src));
1330 let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
1331 if is_done {
1332 bail!("cannot lift future after being notified that the writable end dropped");
1333 }
1334 let id = TableId::<TransmitHandle>::new(rep);
1335 let concurrent_state = cx.concurrent_state_mut();
1336 let future = concurrent_state.get_mut(id)?;
1337 future.common.handle = None;
1338 let state = future.state;
1339
1340 if concurrent_state.get_mut(state)?.done {
1341 bail!("cannot lift future after previous read succeeded");
1342 }
1343
1344 Ok(id)
1345 }
1346 _ => func::bad_type_info(),
1347 }
1348}
1349
1350pub(super) fn lower_future_to_index<U>(
1352 id: TableId<TransmitHandle>,
1353 cx: &mut LowerContext<'_, U>,
1354 ty: InterfaceType,
1355) -> Result<u32> {
1356 match ty {
1357 InterfaceType::Future(dst) => {
1358 let concurrent_state = cx.store.0.concurrent_state_mut();
1359 let state = concurrent_state.get_mut(id)?.state;
1360 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1361
1362 let handle = cx
1363 .instance_mut()
1364 .table_for_transmit(TransmitIndex::Future(dst))
1365 .future_insert_read(dst, rep)?;
1366
1367 cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1368
1369 Ok(handle)
1370 }
1371 _ => func::bad_type_info(),
1372 }
1373}
1374
1375unsafe impl<T: ComponentType> ComponentType for FutureReader<T> {
1378 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1379
1380 type Lower = <u32 as func::ComponentType>::Lower;
1381
1382 fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1383 match ty {
1384 InterfaceType::Future(ty) => {
1385 let ty = types.types[*ty].ty;
1386 types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1387 }
1388 other => bail!("expected `future`, found `{}`", func::desc(other)),
1389 }
1390 }
1391}
1392
1393unsafe impl<T: ComponentType> func::Lower for FutureReader<T> {
1395 fn linear_lower_to_flat<U>(
1396 &self,
1397 cx: &mut LowerContext<'_, U>,
1398 ty: InterfaceType,
1399 dst: &mut MaybeUninit<Self::Lower>,
1400 ) -> Result<()> {
1401 lower_future_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1402 }
1403
1404 fn linear_lower_to_memory<U>(
1405 &self,
1406 cx: &mut LowerContext<'_, U>,
1407 ty: InterfaceType,
1408 offset: usize,
1409 ) -> Result<()> {
1410 lower_future_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1411 cx,
1412 InterfaceType::U32,
1413 offset,
1414 )
1415 }
1416}
1417
1418unsafe impl<T: ComponentType> func::Lift for FutureReader<T> {
1420 fn linear_lift_from_flat(
1421 cx: &mut LiftContext<'_>,
1422 ty: InterfaceType,
1423 src: &Self::Lower,
1424 ) -> Result<Self> {
1425 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1426 Self::lift_from_index(cx, ty, index)
1427 }
1428
1429 fn linear_lift_from_memory(
1430 cx: &mut LiftContext<'_>,
1431 ty: InterfaceType,
1432 bytes: &[u8],
1433 ) -> Result<Self> {
1434 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1435 Self::lift_from_index(cx, ty, index)
1436 }
1437}
1438
1439pub struct GuardedFutureReader<T, A>
1445where
1446 A: AsAccessor,
1447{
1448 reader: Option<FutureReader<T>>,
1452 accessor: A,
1453}
1454
1455impl<T, A> GuardedFutureReader<T, A>
1456where
1457 A: AsAccessor,
1458{
1459 pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1467 assert!(
1468 accessor
1469 .as_accessor()
1470 .with(|a| a.as_context().0.concurrency_support())
1471 );
1472 Self {
1473 reader: Some(reader),
1474 accessor,
1475 }
1476 }
1477
1478 pub fn into_future(self) -> FutureReader<T> {
1481 self.into()
1482 }
1483}
1484
1485impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1486where
1487 A: AsAccessor,
1488{
1489 fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1490 guard.reader.take().unwrap()
1491 }
1492}
1493
1494impl<T, A> Drop for GuardedFutureReader<T, A>
1495where
1496 A: AsAccessor,
1497{
1498 fn drop(&mut self) {
1499 if let Some(reader) = &mut self.reader {
1500 reader.close_with(&self.accessor)
1501 }
1502 }
1503}
1504
1505pub struct StreamReader<T> {
1512 id: TableId<TransmitHandle>,
1513 _phantom: PhantomData<T>,
1514}
1515
1516impl<T> StreamReader<T> {
1517 pub fn new<S: AsContextMut>(
1525 mut store: S,
1526 producer: impl StreamProducer<S::Data, Item = T>,
1527 ) -> Self
1528 where
1529 T: func::Lower + func::Lift + Send + Sync + 'static,
1530 {
1531 assert!(store.as_context().0.concurrency_support());
1532 Self::new_(
1533 store
1534 .as_context_mut()
1535 .new_transmit(TransmitKind::Stream, producer),
1536 )
1537 }
1538
1539 pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1540 Self {
1541 id,
1542 _phantom: PhantomData,
1543 }
1544 }
1545
1546 pub(super) fn id(&self) -> TableId<TransmitHandle> {
1547 self.id
1548 }
1549
1550 pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
1567 let store = store.as_context_mut();
1568 let state = store.0.concurrent_state_mut();
1569 let id = state.get_mut(self.id).unwrap().state;
1570 if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
1571 match try_into(TypeId::of::<V>()) {
1572 Some(result) => {
1573 self.close(store);
1574 Ok(*result.downcast::<V>().unwrap())
1575 }
1576 None => Err(self),
1577 }
1578 } else {
1579 Err(self)
1580 }
1581 }
1582
1583 pub fn pipe<S: AsContextMut>(
1585 self,
1586 mut store: S,
1587 consumer: impl StreamConsumer<S::Data, Item = T>,
1588 ) where
1589 T: 'static,
1590 {
1591 store
1592 .as_context_mut()
1593 .set_consumer(self.id, TransmitKind::Stream, consumer);
1594 }
1595
1596 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1598 let id = lift_index_to_stream(cx, ty, index)?;
1599 Ok(Self::new_(id))
1600 }
1601
1602 pub fn close(&mut self, mut store: impl AsContextMut) {
1612 stream_close(store.as_context_mut().0, &mut self.id);
1613 }
1614
1615 pub fn close_with(&mut self, accessor: impl AsAccessor) {
1617 accessor.as_accessor().with(|access| self.close(access))
1618 }
1619
1620 pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1626 where
1627 A: AsAccessor,
1628 {
1629 GuardedStreamReader::new(accessor, self)
1630 }
1631
1632 pub fn try_into_stream_any(self, store: impl AsContextMut) -> Result<StreamAny>
1639 where
1640 T: ComponentType + 'static,
1641 {
1642 StreamAny::try_from_stream_reader(store, self)
1643 }
1644
1645 pub fn try_from_stream_any(stream: StreamAny) -> Result<Self>
1652 where
1653 T: ComponentType + 'static,
1654 {
1655 stream.try_into_stream_reader()
1656 }
1657}
1658
1659impl<T> fmt::Debug for StreamReader<T> {
1660 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1661 f.debug_struct("StreamReader")
1662 .field("id", &self.id)
1663 .finish()
1664 }
1665}
1666
1667pub(super) fn stream_close(store: &mut StoreOpaque, id: &mut TableId<TransmitHandle>) {
1668 let id = mem::replace(id, TableId::new(u32::MAX));
1669 store.host_drop_reader(id, TransmitKind::Stream).unwrap();
1670}
1671
1672pub(super) fn lift_index_to_stream(
1674 cx: &mut LiftContext<'_>,
1675 ty: InterfaceType,
1676 index: u32,
1677) -> Result<TableId<TransmitHandle>> {
1678 match ty {
1679 InterfaceType::Stream(src) => {
1680 let handle_table = cx
1681 .instance_mut()
1682 .table_for_transmit(TransmitIndex::Stream(src));
1683 let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1684 if is_done {
1685 bail!("cannot lift stream after being notified that the writable end dropped");
1686 }
1687 let id = TableId::<TransmitHandle>::new(rep);
1688 cx.concurrent_state_mut().get_mut(id)?.common.handle = None;
1689 Ok(id)
1690 }
1691 _ => func::bad_type_info(),
1692 }
1693}
1694
1695pub(super) fn lower_stream_to_index<U>(
1697 id: TableId<TransmitHandle>,
1698 cx: &mut LowerContext<'_, U>,
1699 ty: InterfaceType,
1700) -> Result<u32> {
1701 match ty {
1702 InterfaceType::Stream(dst) => {
1703 let concurrent_state = cx.store.0.concurrent_state_mut();
1704 let state = concurrent_state.get_mut(id)?.state;
1705 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1706
1707 let handle = cx
1708 .instance_mut()
1709 .table_for_transmit(TransmitIndex::Stream(dst))
1710 .stream_insert_read(dst, rep)?;
1711
1712 cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1713
1714 Ok(handle)
1715 }
1716 _ => func::bad_type_info(),
1717 }
1718}
1719
1720unsafe impl<T: ComponentType> ComponentType for StreamReader<T> {
1723 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1724
1725 type Lower = <u32 as func::ComponentType>::Lower;
1726
1727 fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1728 match ty {
1729 InterfaceType::Stream(ty) => {
1730 let ty = types.types[*ty].ty;
1731 types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1732 }
1733 other => bail!("expected `stream`, found `{}`", func::desc(other)),
1734 }
1735 }
1736}
1737
1738unsafe impl<T: ComponentType> func::Lower for StreamReader<T> {
1740 fn linear_lower_to_flat<U>(
1741 &self,
1742 cx: &mut LowerContext<'_, U>,
1743 ty: InterfaceType,
1744 dst: &mut MaybeUninit<Self::Lower>,
1745 ) -> Result<()> {
1746 lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1747 }
1748
1749 fn linear_lower_to_memory<U>(
1750 &self,
1751 cx: &mut LowerContext<'_, U>,
1752 ty: InterfaceType,
1753 offset: usize,
1754 ) -> Result<()> {
1755 lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1756 cx,
1757 InterfaceType::U32,
1758 offset,
1759 )
1760 }
1761}
1762
1763unsafe impl<T: ComponentType> func::Lift for StreamReader<T> {
1765 fn linear_lift_from_flat(
1766 cx: &mut LiftContext<'_>,
1767 ty: InterfaceType,
1768 src: &Self::Lower,
1769 ) -> Result<Self> {
1770 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1771 Self::lift_from_index(cx, ty, index)
1772 }
1773
1774 fn linear_lift_from_memory(
1775 cx: &mut LiftContext<'_>,
1776 ty: InterfaceType,
1777 bytes: &[u8],
1778 ) -> Result<Self> {
1779 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1780 Self::lift_from_index(cx, ty, index)
1781 }
1782}
1783
1784pub struct GuardedStreamReader<T, A>
1790where
1791 A: AsAccessor,
1792{
1793 reader: Option<StreamReader<T>>,
1797 accessor: A,
1798}
1799
1800impl<T, A> GuardedStreamReader<T, A>
1801where
1802 A: AsAccessor,
1803{
1804 pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1813 assert!(
1814 accessor
1815 .as_accessor()
1816 .with(|a| a.as_context().0.concurrency_support())
1817 );
1818 Self {
1819 reader: Some(reader),
1820 accessor,
1821 }
1822 }
1823
1824 pub fn into_stream(self) -> StreamReader<T> {
1827 self.into()
1828 }
1829}
1830
1831impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1832where
1833 A: AsAccessor,
1834{
1835 fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1836 guard.reader.take().unwrap()
1837 }
1838}
1839
1840impl<T, A> Drop for GuardedStreamReader<T, A>
1841where
1842 A: AsAccessor,
1843{
1844 fn drop(&mut self) {
1845 if let Some(reader) = &mut self.reader {
1846 reader.close_with(&self.accessor)
1847 }
1848 }
1849}
1850
1851pub struct ErrorContext {
1853 rep: u32,
1854}
1855
1856impl ErrorContext {
1857 pub(crate) fn new(rep: u32) -> Self {
1858 Self { rep }
1859 }
1860
1861 pub fn into_val(self) -> Val {
1863 Val::ErrorContext(ErrorContextAny(self.rep))
1864 }
1865
1866 pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1868 let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1869 bail!("expected `error-context`; got `{}`", value.desc());
1870 };
1871 Ok(Self::new(*rep))
1872 }
1873
1874 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1875 match ty {
1876 InterfaceType::ErrorContext(src) => {
1877 let rep = cx
1878 .instance_mut()
1879 .table_for_error_context(src)
1880 .error_context_rep(index)?;
1881
1882 Ok(Self { rep })
1883 }
1884 _ => func::bad_type_info(),
1885 }
1886 }
1887}
1888
1889pub(crate) fn lower_error_context_to_index<U>(
1890 rep: u32,
1891 cx: &mut LowerContext<'_, U>,
1892 ty: InterfaceType,
1893) -> Result<u32> {
1894 match ty {
1895 InterfaceType::ErrorContext(dst) => {
1896 let tbl = cx.instance_mut().table_for_error_context(dst);
1897 tbl.error_context_insert(rep)
1898 }
1899 _ => func::bad_type_info(),
1900 }
1901}
1902unsafe impl func::ComponentType for ErrorContext {
1905 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1906
1907 type Lower = <u32 as func::ComponentType>::Lower;
1908
1909 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1910 match ty {
1911 InterfaceType::ErrorContext(_) => Ok(()),
1912 other => bail!("expected `error`, found `{}`", func::desc(other)),
1913 }
1914 }
1915}
1916
1917unsafe impl func::Lower for ErrorContext {
1919 fn linear_lower_to_flat<T>(
1920 &self,
1921 cx: &mut LowerContext<'_, T>,
1922 ty: InterfaceType,
1923 dst: &mut MaybeUninit<Self::Lower>,
1924 ) -> Result<()> {
1925 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1926 cx,
1927 InterfaceType::U32,
1928 dst,
1929 )
1930 }
1931
1932 fn linear_lower_to_memory<T>(
1933 &self,
1934 cx: &mut LowerContext<'_, T>,
1935 ty: InterfaceType,
1936 offset: usize,
1937 ) -> Result<()> {
1938 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1939 cx,
1940 InterfaceType::U32,
1941 offset,
1942 )
1943 }
1944}
1945
1946unsafe impl func::Lift for ErrorContext {
1948 fn linear_lift_from_flat(
1949 cx: &mut LiftContext<'_>,
1950 ty: InterfaceType,
1951 src: &Self::Lower,
1952 ) -> Result<Self> {
1953 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1954 Self::lift_from_index(cx, ty, index)
1955 }
1956
1957 fn linear_lift_from_memory(
1958 cx: &mut LiftContext<'_>,
1959 ty: InterfaceType,
1960 bytes: &[u8],
1961 ) -> Result<Self> {
1962 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1963 Self::lift_from_index(cx, ty, index)
1964 }
1965}
1966
1967pub(super) struct TransmitHandle {
1969 pub(super) common: WaitableCommon,
1970 state: TableId<TransmitState>,
1972}
1973
1974impl TransmitHandle {
1975 fn new(state: TableId<TransmitState>) -> Self {
1976 Self {
1977 common: WaitableCommon::default(),
1978 state,
1979 }
1980 }
1981}
1982
1983impl TableDebug for TransmitHandle {
1984 fn type_name() -> &'static str {
1985 "TransmitHandle"
1986 }
1987}
1988
1989struct TransmitState {
1991 write_handle: TableId<TransmitHandle>,
1993 read_handle: TableId<TransmitHandle>,
1995 write: WriteState,
1997 read: ReadState,
1999 done: bool,
2001 pub(super) origin: TransmitOrigin,
2004}
2005
2006#[derive(Copy, Clone)]
2007pub(super) enum TransmitOrigin {
2008 Host,
2009 GuestFuture(ComponentInstanceId, TypeFutureTableIndex),
2010 GuestStream(ComponentInstanceId, TypeStreamTableIndex),
2011}
2012
2013impl TransmitState {
2014 fn new(origin: TransmitOrigin) -> Self {
2015 Self {
2016 write_handle: TableId::new(u32::MAX),
2017 read_handle: TableId::new(u32::MAX),
2018 read: ReadState::Open,
2019 write: WriteState::Open,
2020 done: false,
2021 origin,
2022 }
2023 }
2024}
2025
2026impl TableDebug for TransmitState {
2027 fn type_name() -> &'static str {
2028 "TransmitState"
2029 }
2030}
2031
2032impl TransmitOrigin {
2033 fn guest(id: ComponentInstanceId, index: TransmitIndex) -> Self {
2034 match index {
2035 TransmitIndex::Future(ty) => TransmitOrigin::GuestFuture(id, ty),
2036 TransmitIndex::Stream(ty) => TransmitOrigin::GuestStream(id, ty),
2037 }
2038 }
2039}
2040
2041type PollStream = Box<
2042 dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
2043>;
2044
2045type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
2046
2047enum WriteState {
2049 Open,
2051 GuestReady {
2053 instance: Instance,
2054 caller: RuntimeComponentInstanceIndex,
2055 ty: TransmitIndex,
2056 flat_abi: Option<FlatAbi>,
2057 options: OptionsIndex,
2058 address: usize,
2059 count: usize,
2060 handle: u32,
2061 },
2062 HostReady {
2064 produce: PollStream,
2065 try_into: TryInto,
2066 guest_offset: usize,
2067 cancel: bool,
2068 cancel_waker: Option<Waker>,
2069 },
2070 Dropped,
2072}
2073
2074impl fmt::Debug for WriteState {
2075 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2076 match self {
2077 Self::Open => f.debug_tuple("Open").finish(),
2078 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2079 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2080 Self::Dropped => f.debug_tuple("Dropped").finish(),
2081 }
2082 }
2083}
2084
2085enum ReadState {
2087 Open,
2089 GuestReady {
2091 ty: TransmitIndex,
2092 caller: RuntimeComponentInstanceIndex,
2093 flat_abi: Option<FlatAbi>,
2094 instance: Instance,
2095 options: OptionsIndex,
2096 address: usize,
2097 count: usize,
2098 handle: u32,
2099 },
2100 HostReady {
2102 consume: PollStream,
2103 guest_offset: usize,
2104 cancel: bool,
2105 cancel_waker: Option<Waker>,
2106 },
2107 HostToHost {
2109 accept: Box<
2110 dyn for<'a> Fn(
2111 &'a mut UntypedWriteBuffer<'a>,
2112 )
2113 -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
2114 + Send
2115 + Sync,
2116 >,
2117 buffer: Vec<u8>,
2118 limit: usize,
2119 },
2120 Dropped,
2122}
2123
2124impl fmt::Debug for ReadState {
2125 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2126 match self {
2127 Self::Open => f.debug_tuple("Open").finish(),
2128 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2129 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2130 Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
2131 Self::Dropped => f.debug_tuple("Dropped").finish(),
2132 }
2133 }
2134}
2135
2136fn return_code(kind: TransmitKind, state: StreamResult, guest_offset: usize) -> ReturnCode {
2137 let count = guest_offset.try_into().unwrap();
2138 match state {
2139 StreamResult::Dropped => ReturnCode::Dropped(count),
2140 StreamResult::Completed => ReturnCode::completed(kind, count),
2141 StreamResult::Cancelled => ReturnCode::Cancelled(count),
2142 }
2143}
2144
2145impl StoreOpaque {
2146 fn pipe_from_guest(
2147 &mut self,
2148 kind: TransmitKind,
2149 id: TableId<TransmitState>,
2150 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2151 ) {
2152 let future = async move {
2153 let stream_state = future.await?;
2154 tls::get(|store| {
2155 let state = store.concurrent_state_mut();
2156 let transmit = state.get_mut(id)?;
2157 let ReadState::HostReady {
2158 consume,
2159 guest_offset,
2160 ..
2161 } = mem::replace(&mut transmit.read, ReadState::Open)
2162 else {
2163 unreachable!();
2164 };
2165 let code = return_code(kind, stream_state, guest_offset);
2166 transmit.read = match stream_state {
2167 StreamResult::Dropped => ReadState::Dropped,
2168 StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2169 consume,
2170 guest_offset: 0,
2171 cancel: false,
2172 cancel_waker: None,
2173 },
2174 };
2175 let WriteState::GuestReady { ty, handle, .. } =
2176 mem::replace(&mut transmit.write, WriteState::Open)
2177 else {
2178 unreachable!();
2179 };
2180 state.send_write_result(ty, id, handle, code)?;
2181 Ok(())
2182 })
2183 };
2184
2185 self.concurrent_state_mut().push_future(future.boxed());
2186 }
2187
2188 fn pipe_to_guest(
2189 &mut self,
2190 kind: TransmitKind,
2191 id: TableId<TransmitState>,
2192 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2193 ) {
2194 let future = async move {
2195 let stream_state = future.await?;
2196 tls::get(|store| {
2197 let state = store.concurrent_state_mut();
2198 let transmit = state.get_mut(id)?;
2199 let WriteState::HostReady {
2200 produce,
2201 try_into,
2202 guest_offset,
2203 ..
2204 } = mem::replace(&mut transmit.write, WriteState::Open)
2205 else {
2206 unreachable!();
2207 };
2208 let code = return_code(kind, stream_state, guest_offset);
2209 transmit.write = match stream_state {
2210 StreamResult::Dropped => WriteState::Dropped,
2211 StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2212 produce,
2213 try_into,
2214 guest_offset: 0,
2215 cancel: false,
2216 cancel_waker: None,
2217 },
2218 };
2219 let ReadState::GuestReady { ty, handle, .. } =
2220 mem::replace(&mut transmit.read, ReadState::Open)
2221 else {
2222 unreachable!();
2223 };
2224 state.send_read_result(ty, id, handle, code)?;
2225 Ok(())
2226 })
2227 };
2228
2229 self.concurrent_state_mut().push_future(future.boxed());
2230 }
2231
2232 fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
2234 let state = self.concurrent_state_mut();
2235 let transmit_id = state.get_mut(id)?.state;
2236 let transmit = state
2237 .get_mut(transmit_id)
2238 .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2239 log::trace!(
2240 "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2241 transmit.read,
2242 transmit.write
2243 );
2244
2245 transmit.read = ReadState::Dropped;
2246
2247 let new_state = if let WriteState::Dropped = &transmit.write {
2250 WriteState::Dropped
2251 } else {
2252 WriteState::Open
2253 };
2254
2255 let write_handle = transmit.write_handle;
2256
2257 match mem::replace(&mut transmit.write, new_state) {
2258 WriteState::GuestReady { ty, handle, .. } => {
2261 state.update_event(
2262 write_handle.rep(),
2263 match ty {
2264 TransmitIndex::Future(ty) => Event::FutureWrite {
2265 code: ReturnCode::Dropped(0),
2266 pending: Some((ty, handle)),
2267 },
2268 TransmitIndex::Stream(ty) => Event::StreamWrite {
2269 code: ReturnCode::Dropped(0),
2270 pending: Some((ty, handle)),
2271 },
2272 },
2273 )?;
2274 }
2275
2276 WriteState::HostReady { .. } => {}
2277
2278 WriteState::Open => {
2279 state.update_event(
2280 write_handle.rep(),
2281 match kind {
2282 TransmitKind::Future => Event::FutureWrite {
2283 code: ReturnCode::Dropped(0),
2284 pending: None,
2285 },
2286 TransmitKind::Stream => Event::StreamWrite {
2287 code: ReturnCode::Dropped(0),
2288 pending: None,
2289 },
2290 },
2291 )?;
2292 }
2293
2294 WriteState::Dropped => {
2295 log::trace!("host_drop_reader delete {transmit_id:?}");
2296 state.delete_transmit(transmit_id)?;
2297 }
2298 }
2299 Ok(())
2300 }
2301
2302 fn host_drop_writer(
2304 &mut self,
2305 id: TableId<TransmitHandle>,
2306 on_drop_open: Option<fn() -> Result<()>>,
2307 ) -> Result<()> {
2308 let state = self.concurrent_state_mut();
2309 let transmit_id = state.get_mut(id)?.state;
2310 let transmit = state
2311 .get_mut(transmit_id)
2312 .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2313 log::trace!(
2314 "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2315 transmit.read,
2316 transmit.write
2317 );
2318
2319 match &mut transmit.write {
2321 WriteState::GuestReady { .. } => {
2322 unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2323 }
2324 WriteState::HostReady { .. } => {}
2325 v @ WriteState::Open => {
2326 if let (Some(on_drop_open), false) = (
2327 on_drop_open,
2328 transmit.done || matches!(transmit.read, ReadState::Dropped),
2329 ) {
2330 on_drop_open()?;
2331 } else {
2332 *v = WriteState::Dropped;
2333 }
2334 }
2335 WriteState::Dropped => unreachable!("write state is already dropped"),
2336 }
2337
2338 let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
2339
2340 let new_state = if let ReadState::Dropped = &transmit.read {
2346 ReadState::Dropped
2347 } else {
2348 ReadState::Open
2349 };
2350
2351 let read_handle = transmit.read_handle;
2352
2353 match mem::replace(&mut transmit.read, new_state) {
2355 ReadState::GuestReady { ty, handle, .. } => {
2359 self.concurrent_state_mut().update_event(
2361 read_handle.rep(),
2362 match ty {
2363 TransmitIndex::Future(ty) => Event::FutureRead {
2364 code: ReturnCode::Dropped(0),
2365 pending: Some((ty, handle)),
2366 },
2367 TransmitIndex::Stream(ty) => Event::StreamRead {
2368 code: ReturnCode::Dropped(0),
2369 pending: Some((ty, handle)),
2370 },
2371 },
2372 )?;
2373 }
2374
2375 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2376
2377 ReadState::Open => {
2379 self.concurrent_state_mut().update_event(
2380 read_handle.rep(),
2381 match on_drop_open {
2382 Some(_) => Event::FutureRead {
2383 code: ReturnCode::Dropped(0),
2384 pending: None,
2385 },
2386 None => Event::StreamRead {
2387 code: ReturnCode::Dropped(0),
2388 pending: None,
2389 },
2390 },
2391 )?;
2392 }
2393
2394 ReadState::Dropped => {
2397 log::trace!("host_drop_writer delete {transmit_id:?}");
2398 self.concurrent_state_mut().delete_transmit(transmit_id)?;
2399 }
2400 }
2401 Ok(())
2402 }
2403
2404 pub(super) fn transmit_origin(
2405 &mut self,
2406 id: TableId<TransmitHandle>,
2407 ) -> Result<TransmitOrigin> {
2408 let state = self.concurrent_state_mut();
2409 let state_id = state.get_mut(id)?.state;
2410 Ok(state.get_mut(state_id)?.origin)
2411 }
2412}
2413
2414impl<T> StoreContextMut<'_, T> {
2415 fn new_transmit<P: StreamProducer<T>>(
2416 mut self,
2417 kind: TransmitKind,
2418 producer: P,
2419 ) -> TableId<TransmitHandle>
2420 where
2421 P::Item: func::Lower,
2422 {
2423 let token = StoreToken::new(self.as_context_mut());
2424 let state = self.0.concurrent_state_mut();
2425 let (_, read) = state.new_transmit(TransmitOrigin::Host).unwrap();
2426 let producer = Arc::new(Mutex::new(Some((Box::pin(producer), P::Buffer::default()))));
2427 let id = state.get_mut(read).unwrap().state;
2428 let mut dropped = false;
2429 let produce = Box::new({
2430 let producer = producer.clone();
2431 move || {
2432 let producer = producer.clone();
2433 async move {
2434 let (mut mine, mut buffer) = producer.lock().unwrap().take().unwrap();
2435
2436 let (result, cancelled) = if buffer.remaining().is_empty() {
2437 future::poll_fn(|cx| {
2438 tls::get(|store| {
2439 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2440
2441 let &WriteState::HostReady { cancel, .. } = &transmit.write else {
2442 unreachable!();
2443 };
2444
2445 let mut host_buffer =
2446 if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
2447 Some(Cursor::new(mem::take(buffer)))
2448 } else {
2449 None
2450 };
2451
2452 let poll = mine.as_mut().poll_produce(
2453 cx,
2454 token.as_context_mut(store),
2455 Destination {
2456 id,
2457 buffer: &mut buffer,
2458 host_buffer: host_buffer.as_mut(),
2459 _phantom: PhantomData,
2460 },
2461 cancel,
2462 );
2463
2464 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2465
2466 let host_offset = if let (
2467 Some(host_buffer),
2468 ReadState::HostToHost { buffer, limit, .. },
2469 ) = (host_buffer, &mut transmit.read)
2470 {
2471 *limit = usize::try_from(host_buffer.position()).unwrap();
2472 *buffer = host_buffer.into_inner();
2473 *limit
2474 } else {
2475 0
2476 };
2477
2478 {
2479 let WriteState::HostReady {
2480 guest_offset,
2481 cancel,
2482 cancel_waker,
2483 ..
2484 } = &mut transmit.write
2485 else {
2486 unreachable!();
2487 };
2488
2489 if poll.is_pending() {
2490 if !buffer.remaining().is_empty()
2491 || *guest_offset > 0
2492 || host_offset > 0
2493 {
2494 return Poll::Ready(Err(format_err!(
2495 "StreamProducer::poll_produce returned Poll::Pending \
2496 after producing at least one item"
2497 )));
2498 }
2499 *cancel_waker = Some(cx.waker().clone());
2500 } else {
2501 *cancel_waker = None;
2502 *cancel = false;
2503 }
2504 }
2505
2506 poll.map(|v| v.map(|result| (result, cancel)))
2507 })
2508 })
2509 .await?
2510 } else {
2511 (StreamResult::Completed, false)
2512 };
2513
2514 let (guest_offset, host_offset, count) = tls::get(|store| {
2515 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2516 let (count, host_offset) = match &transmit.read {
2517 &ReadState::GuestReady { count, .. } => (count, 0),
2518 &ReadState::HostToHost { limit, .. } => (1, limit),
2519 _ => unreachable!(),
2520 };
2521 let guest_offset = match &transmit.write {
2522 &WriteState::HostReady { guest_offset, .. } => guest_offset,
2523 _ => unreachable!(),
2524 };
2525 (guest_offset, host_offset, count)
2526 });
2527
2528 match result {
2529 StreamResult::Completed => {
2530 if count > 1
2531 && buffer.remaining().is_empty()
2532 && guest_offset == 0
2533 && host_offset == 0
2534 {
2535 bail!(
2536 "StreamProducer::poll_produce returned StreamResult::Completed \
2537 without producing any items"
2538 );
2539 }
2540 }
2541 StreamResult::Cancelled => {
2542 if !cancelled {
2543 bail!(
2544 "StreamProducer::poll_produce returned StreamResult::Cancelled \
2545 without being given a `finish` parameter value of true"
2546 );
2547 }
2548 }
2549 StreamResult::Dropped => {
2550 dropped = true;
2551 }
2552 }
2553
2554 let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;
2555
2556 *producer.lock().unwrap() = Some((mine, buffer));
2557
2558 if write_buffer {
2559 write(token, id, producer.clone(), kind).await?;
2560 }
2561
2562 Ok(if dropped {
2563 if producer.lock().unwrap().as_ref().unwrap().1.remaining().is_empty()
2564 {
2565 StreamResult::Dropped
2566 } else {
2567 StreamResult::Completed
2568 }
2569 } else {
2570 result
2571 })
2572 }
2573 .boxed()
2574 }
2575 });
2576 let try_into = Box::new(move |ty| {
2577 let (mine, buffer) = producer.lock().unwrap().take().unwrap();
2578 match P::try_into(mine, ty) {
2579 Ok(value) => Some(value),
2580 Err(mine) => {
2581 *producer.lock().unwrap() = Some((mine, buffer));
2582 None
2583 }
2584 }
2585 });
2586 state.get_mut(id).unwrap().write = WriteState::HostReady {
2587 produce,
2588 try_into,
2589 guest_offset: 0,
2590 cancel: false,
2591 cancel_waker: None,
2592 };
2593 read
2594 }
2595
2596 fn set_consumer<C: StreamConsumer<T>>(
2597 mut self,
2598 id: TableId<TransmitHandle>,
2599 kind: TransmitKind,
2600 consumer: C,
2601 ) {
2602 let token = StoreToken::new(self.as_context_mut());
2603 let state = self.0.concurrent_state_mut();
2604 let id = state.get_mut(id).unwrap().state;
2605 let transmit = state.get_mut(id).unwrap();
2606 let consumer = Arc::new(Mutex::new(Some(Box::pin(consumer))));
2607 let consume_with_buffer = {
2608 let consumer = consumer.clone();
2609 async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
2610 let mut mine = consumer.lock().unwrap().take().unwrap();
2611
2612 let host_buffer_remaining_before =
2613 host_buffer.as_deref_mut().map(|v| v.remaining().len());
2614
2615 let (result, cancelled) = future::poll_fn(|cx| {
2616 tls::get(|store| {
2617 let cancel = match &store.concurrent_state_mut().get_mut(id).unwrap().read {
2618 &ReadState::HostReady { cancel, .. } => cancel,
2619 ReadState::Open => false,
2620 _ => unreachable!(),
2621 };
2622
2623 let poll = mine.as_mut().poll_consume(
2624 cx,
2625 token.as_context_mut(store),
2626 Source {
2627 id,
2628 host_buffer: host_buffer.as_deref_mut(),
2629 },
2630 cancel,
2631 );
2632
2633 if let ReadState::HostReady {
2634 cancel_waker,
2635 cancel,
2636 ..
2637 } = &mut store.concurrent_state_mut().get_mut(id).unwrap().read
2638 {
2639 if poll.is_pending() {
2640 *cancel_waker = Some(cx.waker().clone());
2641 } else {
2642 *cancel_waker = None;
2643 *cancel = false;
2644 }
2645 }
2646
2647 poll.map(|v| v.map(|result| (result, cancel)))
2648 })
2649 })
2650 .await?;
2651
2652 let (guest_offset, count) = tls::get(|store| {
2653 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2654 (
2655 match &transmit.read {
2656 &ReadState::HostReady { guest_offset, .. } => guest_offset,
2657 ReadState::Open => 0,
2658 _ => unreachable!(),
2659 },
2660 match &transmit.write {
2661 &WriteState::GuestReady { count, .. } => count,
2662 WriteState::HostReady { .. } => host_buffer_remaining_before.unwrap(),
2663 _ => unreachable!(),
2664 },
2665 )
2666 });
2667
2668 match result {
2669 StreamResult::Completed => {
2670 if count > 0
2671 && guest_offset == 0
2672 && host_buffer_remaining_before
2673 .zip(host_buffer.map(|v| v.remaining().len()))
2674 .map(|(before, after)| before == after)
2675 .unwrap_or(false)
2676 {
2677 bail!(
2678 "StreamConsumer::poll_consume returned StreamResult::Completed \
2679 without consuming any items"
2680 );
2681 }
2682
2683 if let TransmitKind::Future = kind {
2684 tls::get(|store| {
2685 store.concurrent_state_mut().get_mut(id).unwrap().done = true;
2686 });
2687 }
2688 }
2689 StreamResult::Cancelled => {
2690 if !cancelled {
2691 bail!(
2692 "StreamConsumer::poll_consume returned StreamResult::Cancelled \
2693 without being given a `finish` parameter value of true"
2694 );
2695 }
2696 }
2697 StreamResult::Dropped => {}
2698 }
2699
2700 *consumer.lock().unwrap() = Some(mine);
2701
2702 Ok(result)
2703 }
2704 };
2705 let consume = {
2706 let consume = consume_with_buffer.clone();
2707 Box::new(move || {
2708 let consume = consume.clone();
2709 async move { consume(None).await }.boxed()
2710 })
2711 };
2712
2713 match &transmit.write {
2714 WriteState::Open => {
2715 transmit.read = ReadState::HostReady {
2716 consume,
2717 guest_offset: 0,
2718 cancel: false,
2719 cancel_waker: None,
2720 };
2721 }
2722 &WriteState::GuestReady { .. } => {
2723 let future = consume();
2724 transmit.read = ReadState::HostReady {
2725 consume,
2726 guest_offset: 0,
2727 cancel: false,
2728 cancel_waker: None,
2729 };
2730 self.0.pipe_from_guest(kind, id, future);
2731 }
2732 WriteState::HostReady { .. } => {
2733 let WriteState::HostReady { produce, .. } = mem::replace(
2734 &mut transmit.write,
2735 WriteState::HostReady {
2736 produce: Box::new(|| unreachable!()),
2737 try_into: Box::new(|_| unreachable!()),
2738 guest_offset: 0,
2739 cancel: false,
2740 cancel_waker: None,
2741 },
2742 ) else {
2743 unreachable!();
2744 };
2745
2746 transmit.read = ReadState::HostToHost {
2747 accept: Box::new(move |input| {
2748 let consume = consume_with_buffer.clone();
2749 async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
2750 }),
2751 buffer: Vec::new(),
2752 limit: 0,
2753 };
2754
2755 let future = async move {
2756 loop {
2757 if tls::get(|store| {
2758 crate::error::Ok(matches!(
2759 store.concurrent_state_mut().get_mut(id)?.read,
2760 ReadState::Dropped
2761 ))
2762 })? {
2763 break Ok(());
2764 }
2765
2766 match produce().await? {
2767 StreamResult::Completed | StreamResult::Cancelled => {}
2768 StreamResult::Dropped => break Ok(()),
2769 }
2770
2771 if let TransmitKind::Future = kind {
2772 break Ok(());
2773 }
2774 }
2775 }
2776 .map(move |result| {
2777 tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
2778 result
2779 });
2780
2781 state.push_future(Box::pin(future));
2782 }
2783 WriteState::Dropped => {
2784 let reader = transmit.read_handle;
2785 self.0.host_drop_reader(reader, kind).unwrap();
2786 }
2787 }
2788 }
2789}
2790
2791async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
2792 token: StoreToken<D>,
2793 id: TableId<TransmitState>,
2794 pair: Arc<Mutex<Option<(P, B)>>>,
2795 kind: TransmitKind,
2796) -> Result<()> {
2797 let (read, guest_offset) = tls::get(|store| {
2798 let transmit = store.concurrent_state_mut().get_mut(id)?;
2799
2800 let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
2801 Some(guest_offset)
2802 } else {
2803 None
2804 };
2805
2806 crate::error::Ok((
2807 mem::replace(&mut transmit.read, ReadState::Open),
2808 guest_offset,
2809 ))
2810 })?;
2811
2812 match read {
2813 ReadState::GuestReady {
2814 ty,
2815 flat_abi,
2816 options,
2817 address,
2818 count,
2819 handle,
2820 instance,
2821 caller,
2822 } => {
2823 let guest_offset = guest_offset.unwrap();
2824
2825 if let TransmitKind::Future = kind {
2826 tls::get(|store| {
2827 store.concurrent_state_mut().get_mut(id)?.done = true;
2828 crate::error::Ok(())
2829 })?;
2830 }
2831
2832 let old_remaining = pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2833 let accept = {
2834 let pair = pair.clone();
2835 move |mut store: StoreContextMut<D>| {
2836 lower::<T, B, D>(
2837 store.as_context_mut(),
2838 instance,
2839 options,
2840 ty,
2841 address + (T::SIZE32 * guest_offset),
2842 count - guest_offset,
2843 &mut pair.lock().unwrap().as_mut().unwrap().1,
2844 )?;
2845 crate::error::Ok(())
2846 }
2847 };
2848
2849 if guest_offset < count {
2850 if T::MAY_REQUIRE_REALLOC {
2851 let (tx, rx) = oneshot::channel();
2856 tls::get(move |store| {
2857 store
2858 .concurrent_state_mut()
2859 .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2860 move |store| {
2861 _ = tx.send(accept(token.as_context_mut(store))?);
2862 Ok(())
2863 },
2864 ))))
2865 });
2866 rx.await?
2867 } else {
2868 tls::get(|store| accept(token.as_context_mut(store)))?
2873 };
2874 }
2875
2876 tls::get(|store| {
2877 let count =
2878 old_remaining - pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2879
2880 let transmit = store.concurrent_state_mut().get_mut(id)?;
2881
2882 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2883 unreachable!();
2884 };
2885
2886 *guest_offset += count;
2887
2888 transmit.read = ReadState::GuestReady {
2889 ty,
2890 flat_abi,
2891 options,
2892 address,
2893 count,
2894 handle,
2895 instance,
2896 caller,
2897 };
2898
2899 crate::error::Ok(())
2900 })?;
2901
2902 Ok(())
2903 }
2904
2905 ReadState::HostToHost {
2906 accept,
2907 mut buffer,
2908 limit,
2909 } => {
2910 let mut state = StreamResult::Completed;
2911 let mut position = 0;
2912
2913 while !matches!(state, StreamResult::Dropped) && position < limit {
2914 let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
2915 state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
2916 (buffer, position, _) = slice_buffer.into_parts();
2917 }
2918
2919 {
2920 let (mine, mut buffer) = pair.lock().unwrap().take().unwrap();
2921
2922 while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
2923 state = accept(&mut UntypedWriteBuffer::new(&mut buffer)).await?;
2924 }
2925
2926 *pair.lock().unwrap() = Some((mine, buffer));
2927 }
2928
2929 tls::get(|store| {
2930 store.concurrent_state_mut().get_mut(id)?.read = match state {
2931 StreamResult::Dropped => ReadState::Dropped,
2932 StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
2933 accept,
2934 buffer,
2935 limit: 0,
2936 },
2937 };
2938
2939 crate::error::Ok(())
2940 })?;
2941 Ok(())
2942 }
2943
2944 _ => unreachable!(),
2945 }
2946}
2947
2948impl Instance {
2949 fn consume(
2952 self,
2953 store: &mut dyn VMStore,
2954 kind: TransmitKind,
2955 transmit_id: TableId<TransmitState>,
2956 consume: PollStream,
2957 guest_offset: usize,
2958 cancel: bool,
2959 ) -> Result<ReturnCode> {
2960 let mut future = consume();
2961 store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
2962 consume,
2963 guest_offset,
2964 cancel,
2965 cancel_waker: None,
2966 };
2967 let poll = tls::set(store, || {
2968 future
2969 .as_mut()
2970 .poll(&mut Context::from_waker(&Waker::noop()))
2971 });
2972
2973 Ok(match poll {
2974 Poll::Ready(state) => {
2975 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
2976 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
2977 unreachable!();
2978 };
2979 let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2980 transmit.write = WriteState::Open;
2981 code
2982 }
2983 Poll::Pending => {
2984 store.pipe_from_guest(kind, transmit_id, future);
2985 ReturnCode::Blocked
2986 }
2987 })
2988 }
2989
2990 fn produce(
2993 self,
2994 store: &mut dyn VMStore,
2995 kind: TransmitKind,
2996 transmit_id: TableId<TransmitState>,
2997 produce: PollStream,
2998 try_into: TryInto,
2999 guest_offset: usize,
3000 cancel: bool,
3001 ) -> Result<ReturnCode> {
3002 let mut future = produce();
3003 store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
3004 produce,
3005 try_into,
3006 guest_offset,
3007 cancel,
3008 cancel_waker: None,
3009 };
3010 let poll = tls::set(store, || {
3011 future
3012 .as_mut()
3013 .poll(&mut Context::from_waker(&Waker::noop()))
3014 });
3015
3016 Ok(match poll {
3017 Poll::Ready(state) => {
3018 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3019 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3020 unreachable!();
3021 };
3022 let code = return_code(kind, state?, mem::replace(guest_offset, 0));
3023 transmit.read = ReadState::Open;
3024 code
3025 }
3026 Poll::Pending => {
3027 store.pipe_to_guest(kind, transmit_id, future);
3028 ReturnCode::Blocked
3029 }
3030 })
3031 }
3032
3033 pub(super) fn guest_drop_writable(
3035 self,
3036 store: &mut StoreOpaque,
3037 ty: TransmitIndex,
3038 writer: u32,
3039 ) -> Result<()> {
3040 let table = self.id().get_mut(store).table_for_transmit(ty);
3041 let transmit_rep = match ty {
3042 TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
3043 TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
3044 };
3045
3046 let id = TableId::<TransmitHandle>::new(transmit_rep);
3047 log::trace!("guest_drop_writable: drop writer {id:?}");
3048 match ty {
3049 TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
3050 TransmitIndex::Future(_) => store.host_drop_writer(
3051 id,
3052 Some(|| {
3053 Err(format_err!(
3054 "cannot drop future write end without first writing a value"
3055 ))
3056 }),
3057 ),
3058 }
3059 }
3060
3061 fn copy<T: 'static>(
3064 self,
3065 mut store: StoreContextMut<T>,
3066 flat_abi: Option<FlatAbi>,
3067 write_caller: RuntimeComponentInstanceIndex,
3068 write_ty: TransmitIndex,
3069 write_options: OptionsIndex,
3070 write_address: usize,
3071 read_caller: RuntimeComponentInstanceIndex,
3072 read_ty: TransmitIndex,
3073 read_options: OptionsIndex,
3074 read_address: usize,
3075 count: usize,
3076 rep: u32,
3077 ) -> Result<()> {
3078 let types = self.id().get(store.0).component().types();
3079 match (write_ty, read_ty) {
3080 (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
3081 assert_eq!(count, 1);
3082
3083 let payload = types[types[write_ty].ty].payload;
3084
3085 if write_caller == read_caller && !allow_intra_component_read_write(payload) {
3086 bail!(
3087 "cannot read from and write to intra-component future with non-numeric payload"
3088 )
3089 }
3090
3091 let val = payload
3092 .map(|ty| {
3093 let lift =
3094 &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);
3095
3096 let abi = lift.types.canonical_abi(&ty);
3097 if write_address % usize::try_from(abi.align32)? != 0 {
3099 bail!("write pointer not aligned");
3100 }
3101
3102 let bytes = lift
3103 .memory()
3104 .get(write_address..)
3105 .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
3106 .ok_or_else(|| {
3107 crate::format_err!("write pointer out of bounds of memory")
3108 })?;
3109
3110 Val::load(lift, ty, bytes)
3111 })
3112 .transpose()?;
3113
3114 if let Some(val) = val {
3115 let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
3116 let types = lower.types;
3117 let ty = types[types[read_ty].ty].payload.unwrap();
3118 let ptr = func::validate_inbounds_dynamic(
3119 types.canonical_abi(&ty),
3120 lower.as_slice_mut(),
3121 &ValRaw::u32(read_address.try_into().unwrap()),
3122 )?;
3123 val.store(lower, ty, ptr)?;
3124 }
3125 }
3126 (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
3127 if write_caller == read_caller
3128 && !allow_intra_component_read_write(types[types[write_ty].ty].payload)
3129 {
3130 bail!(
3131 "cannot read from and write to intra-component stream with non-numeric payload"
3132 )
3133 }
3134
3135 if let Some(flat_abi) = flat_abi {
3136 let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
3138 if length_in_bytes > 0 {
3139 if write_address % usize::try_from(flat_abi.align)? != 0 {
3140 bail!("write pointer not aligned");
3141 }
3142 if read_address % usize::try_from(flat_abi.align)? != 0 {
3143 bail!("read pointer not aligned");
3144 }
3145
3146 let store_opaque = store.0.store_opaque_mut();
3147
3148 {
3149 let src = self
3150 .options_memory(store_opaque, write_options)
3151 .get(write_address..)
3152 .and_then(|b| b.get(..length_in_bytes))
3153 .ok_or_else(|| {
3154 crate::format_err!("write pointer out of bounds of memory")
3155 })?
3156 .as_ptr();
3157 let dst = self
3158 .options_memory_mut(store_opaque, read_options)
3159 .get_mut(read_address..)
3160 .and_then(|b| b.get_mut(..length_in_bytes))
3161 .ok_or_else(|| {
3162 crate::format_err!("read pointer out of bounds of memory")
3163 })?
3164 .as_mut_ptr();
3165 unsafe {
3168 if write_caller == read_caller {
3169 src.copy_to(dst, length_in_bytes)
3173 } else {
3174 src.copy_to_nonoverlapping(dst, length_in_bytes)
3179 }
3180 }
3181 }
3182 }
3183 } else {
3184 let store_opaque = store.0.store_opaque_mut();
3185 let lift = &mut LiftContext::new(store_opaque, write_options, self);
3186 let ty = lift.types[lift.types[write_ty].ty].payload.unwrap();
3187 let abi = lift.types.canonical_abi(&ty);
3188 let size = usize::try_from(abi.size32).unwrap();
3189 if write_address % usize::try_from(abi.align32)? != 0 {
3190 bail!("write pointer not aligned");
3191 }
3192 let bytes = lift
3193 .memory()
3194 .get(write_address..)
3195 .and_then(|b| b.get(..size * count))
3196 .ok_or_else(|| {
3197 crate::format_err!("write pointer out of bounds of memory")
3198 })?;
3199
3200 let values = (0..count)
3201 .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
3202 .collect::<Result<Vec<_>>>()?;
3203
3204 let id = TableId::<TransmitHandle>::new(rep);
3205 log::trace!("copy values {values:?} for {id:?}");
3206
3207 let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
3208 let ty = lower.types[lower.types[read_ty].ty].payload.unwrap();
3209 let abi = lower.types.canonical_abi(&ty);
3210 if read_address % usize::try_from(abi.align32)? != 0 {
3211 bail!("read pointer not aligned");
3212 }
3213 let size = usize::try_from(abi.size32).unwrap();
3214 lower
3215 .as_slice_mut()
3216 .get_mut(read_address..)
3217 .and_then(|b| b.get_mut(..size * count))
3218 .ok_or_else(|| {
3219 crate::format_err!("read pointer out of bounds of memory")
3220 })?;
3221 let mut ptr = read_address;
3222 for value in values {
3223 value.store(lower, ty, ptr)?;
3224 ptr += size
3225 }
3226 }
3227 }
3228 _ => unreachable!(),
3229 }
3230
3231 Ok(())
3232 }
3233
3234 fn check_bounds(
3235 self,
3236 store: &StoreOpaque,
3237 options: OptionsIndex,
3238 ty: TransmitIndex,
3239 address: usize,
3240 count: usize,
3241 ) -> Result<()> {
3242 let types = self.id().get(store).component().types();
3243 let size = usize::try_from(
3244 match ty {
3245 TransmitIndex::Future(ty) => types[types[ty].ty]
3246 .payload
3247 .map(|ty| types.canonical_abi(&ty).size32),
3248 TransmitIndex::Stream(ty) => types[types[ty].ty]
3249 .payload
3250 .map(|ty| types.canonical_abi(&ty).size32),
3251 }
3252 .unwrap_or(0),
3253 )
3254 .unwrap();
3255
3256 if count > 0 && size > 0 {
3257 self.options_memory(store, options)
3258 .get(address..)
3259 .and_then(|b| b.get(..(size * count)))
3260 .map(drop)
3261 .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))
3262 } else {
3263 Ok(())
3264 }
3265 }
3266
3267 pub(super) fn guest_write<T: 'static>(
3269 self,
3270 mut store: StoreContextMut<T>,
3271 caller: RuntimeComponentInstanceIndex,
3272 ty: TransmitIndex,
3273 options: OptionsIndex,
3274 flat_abi: Option<FlatAbi>,
3275 handle: u32,
3276 address: u32,
3277 count: u32,
3278 ) -> Result<ReturnCode> {
3279 if !self.options(store.0, options).async_ {
3280 store.0.check_blocking()?;
3284 }
3285
3286 let address = usize::try_from(address).unwrap();
3287 let count = usize::try_from(count).unwrap();
3288 self.check_bounds(store.0, options, ty, address, count)?;
3289 let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3290 let TransmitLocalState::Write { done } = *state else {
3291 bail!(
3292 "invalid handle {handle}; expected `Write`; got {:?}",
3293 *state
3294 );
3295 };
3296
3297 if done {
3298 bail!("cannot write to stream after being notified that the readable end dropped");
3299 }
3300
3301 *state = TransmitLocalState::Busy;
3302 let transmit_handle = TableId::<TransmitHandle>::new(rep);
3303 let concurrent_state = store.0.concurrent_state_mut();
3304 let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3305 let transmit = concurrent_state.get_mut(transmit_id)?;
3306 log::trace!(
3307 "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3308 transmit.read
3309 );
3310
3311 if transmit.done {
3312 bail!("cannot write to future after previous write succeeded or readable end dropped");
3313 }
3314
3315 let new_state = if let ReadState::Dropped = &transmit.read {
3316 ReadState::Dropped
3317 } else {
3318 ReadState::Open
3319 };
3320
3321 let set_guest_ready = |me: &mut ConcurrentState| {
3322 let transmit = me.get_mut(transmit_id)?;
3323 assert!(
3324 matches!(&transmit.write, WriteState::Open),
3325 "expected `WriteState::Open`; got `{:?}`",
3326 transmit.write
3327 );
3328 transmit.write = WriteState::GuestReady {
3329 instance: self,
3330 caller,
3331 ty,
3332 flat_abi,
3333 options,
3334 address,
3335 count,
3336 handle,
3337 };
3338 Ok::<_, crate::Error>(())
3339 };
3340
3341 let mut result = match mem::replace(&mut transmit.read, new_state) {
3342 ReadState::GuestReady {
3343 ty: read_ty,
3344 flat_abi: read_flat_abi,
3345 options: read_options,
3346 address: read_address,
3347 count: read_count,
3348 handle: read_handle,
3349 instance: read_instance,
3350 caller: read_caller,
3351 } => {
3352 assert_eq!(flat_abi, read_flat_abi);
3353
3354 if let TransmitIndex::Future(_) = ty {
3355 transmit.done = true;
3356 }
3357
3358 let write_complete = count == 0 || read_count > 0;
3380 let read_complete = count > 0;
3381 let read_buffer_remaining = count < read_count;
3382
3383 let read_handle_rep = transmit.read_handle.rep();
3384
3385 let count = count.min(read_count);
3386
3387 self.copy(
3388 store.as_context_mut(),
3389 flat_abi,
3390 caller,
3391 ty,
3392 options,
3393 address,
3394 read_caller,
3395 read_ty,
3396 read_options,
3397 read_address,
3398 count,
3399 rep,
3400 )?;
3401
3402 let instance = self.id().get_mut(store.0);
3403 let types = instance.component().types();
3404 let item_size = payload(ty, types)
3405 .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3406 .unwrap_or(0);
3407 let concurrent_state = store.0.concurrent_state_mut();
3408 if read_complete {
3409 let count = u32::try_from(count).unwrap();
3410 let total = if let Some(Event::StreamRead {
3411 code: ReturnCode::Completed(old_total),
3412 ..
3413 }) = concurrent_state.take_event(read_handle_rep)?
3414 {
3415 count + old_total
3416 } else {
3417 count
3418 };
3419
3420 let code = ReturnCode::completed(ty.kind(), total);
3421
3422 concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
3423 }
3424
3425 if read_buffer_remaining {
3426 let transmit = concurrent_state.get_mut(transmit_id)?;
3427 transmit.read = ReadState::GuestReady {
3428 ty: read_ty,
3429 flat_abi: read_flat_abi,
3430 options: read_options,
3431 address: read_address + (count * item_size),
3432 count: read_count - count,
3433 handle: read_handle,
3434 instance: read_instance,
3435 caller: read_caller,
3436 };
3437 }
3438
3439 if write_complete {
3440 ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3441 } else {
3442 set_guest_ready(concurrent_state)?;
3443 ReturnCode::Blocked
3444 }
3445 }
3446
3447 ReadState::HostReady {
3448 consume,
3449 guest_offset,
3450 cancel,
3451 cancel_waker,
3452 } => {
3453 assert!(cancel_waker.is_none());
3454 assert!(!cancel);
3455 assert_eq!(0, guest_offset);
3456
3457 if let TransmitIndex::Future(_) = ty {
3458 transmit.done = true;
3459 }
3460
3461 set_guest_ready(concurrent_state)?;
3462 self.consume(store.0, ty.kind(), transmit_id, consume, 0, false)?
3463 }
3464
3465 ReadState::HostToHost { .. } => unreachable!(),
3466
3467 ReadState::Open => {
3468 set_guest_ready(concurrent_state)?;
3469 ReturnCode::Blocked
3470 }
3471
3472 ReadState::Dropped => {
3473 if let TransmitIndex::Future(_) = ty {
3474 transmit.done = true;
3475 }
3476
3477 ReturnCode::Dropped(0)
3478 }
3479 };
3480
3481 if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3482 result = self.wait_for_write(store.0, transmit_handle)?;
3483 }
3484
3485 if result != ReturnCode::Blocked {
3486 *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3487 TransmitLocalState::Write {
3488 done: matches!(
3489 (result, ty),
3490 (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3491 ),
3492 };
3493 }
3494
3495 log::trace!(
3496 "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3497 );
3498
3499 Ok(result)
3500 }
3501
3502 pub(super) fn guest_read<T: 'static>(
3504 self,
3505 mut store: StoreContextMut<T>,
3506 caller: RuntimeComponentInstanceIndex,
3507 ty: TransmitIndex,
3508 options: OptionsIndex,
3509 flat_abi: Option<FlatAbi>,
3510 handle: u32,
3511 address: u32,
3512 count: u32,
3513 ) -> Result<ReturnCode> {
3514 if !self.options(store.0, options).async_ {
3515 store.0.check_blocking()?;
3519 }
3520
3521 let address = usize::try_from(address).unwrap();
3522 let count = usize::try_from(count).unwrap();
3523 self.check_bounds(store.0, options, ty, address, count)?;
3524 let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3525 let TransmitLocalState::Read { done } = *state else {
3526 bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
3527 };
3528
3529 if done {
3530 bail!("cannot read from stream after being notified that the writable end dropped");
3531 }
3532
3533 *state = TransmitLocalState::Busy;
3534 let transmit_handle = TableId::<TransmitHandle>::new(rep);
3535 let concurrent_state = store.0.concurrent_state_mut();
3536 let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3537 let transmit = concurrent_state.get_mut(transmit_id)?;
3538 log::trace!(
3539 "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3540 transmit.write
3541 );
3542
3543 if transmit.done {
3544 bail!("cannot read from future after previous read succeeded");
3545 }
3546
3547 let new_state = if let WriteState::Dropped = &transmit.write {
3548 WriteState::Dropped
3549 } else {
3550 WriteState::Open
3551 };
3552
3553 let set_guest_ready = |me: &mut ConcurrentState| {
3554 let transmit = me.get_mut(transmit_id)?;
3555 assert!(
3556 matches!(&transmit.read, ReadState::Open),
3557 "expected `ReadState::Open`; got `{:?}`",
3558 transmit.read
3559 );
3560 transmit.read = ReadState::GuestReady {
3561 ty,
3562 flat_abi,
3563 options,
3564 address,
3565 count,
3566 handle,
3567 instance: self,
3568 caller,
3569 };
3570 Ok::<_, crate::Error>(())
3571 };
3572
3573 let mut result = match mem::replace(&mut transmit.write, new_state) {
3574 WriteState::GuestReady {
3575 instance: _,
3576 ty: write_ty,
3577 flat_abi: write_flat_abi,
3578 options: write_options,
3579 address: write_address,
3580 count: write_count,
3581 handle: write_handle,
3582 caller: write_caller,
3583 } => {
3584 assert_eq!(flat_abi, write_flat_abi);
3585
3586 if let TransmitIndex::Future(_) = ty {
3587 transmit.done = true;
3588 }
3589
3590 let write_handle_rep = transmit.write_handle.rep();
3591
3592 let write_complete = write_count == 0 || count > 0;
3597 let read_complete = write_count > 0;
3598 let write_buffer_remaining = count < write_count;
3599
3600 let count = count.min(write_count);
3601
3602 self.copy(
3603 store.as_context_mut(),
3604 flat_abi,
3605 write_caller,
3606 write_ty,
3607 write_options,
3608 write_address,
3609 caller,
3610 ty,
3611 options,
3612 address,
3613 count,
3614 rep,
3615 )?;
3616
3617 let instance = self.id().get_mut(store.0);
3618 let types = instance.component().types();
3619 let item_size = payload(ty, types)
3620 .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3621 .unwrap_or(0);
3622 let concurrent_state = store.0.concurrent_state_mut();
3623
3624 if write_complete {
3625 let count = u32::try_from(count).unwrap();
3626 let total = if let Some(Event::StreamWrite {
3627 code: ReturnCode::Completed(old_total),
3628 ..
3629 }) = concurrent_state.take_event(write_handle_rep)?
3630 {
3631 count + old_total
3632 } else {
3633 count
3634 };
3635
3636 let code = ReturnCode::completed(ty.kind(), total);
3637
3638 concurrent_state.send_write_result(
3639 write_ty,
3640 transmit_id,
3641 write_handle,
3642 code,
3643 )?;
3644 }
3645
3646 if write_buffer_remaining {
3647 let transmit = concurrent_state.get_mut(transmit_id)?;
3648 transmit.write = WriteState::GuestReady {
3649 instance: self,
3650 caller: write_caller,
3651 ty: write_ty,
3652 flat_abi: write_flat_abi,
3653 options: write_options,
3654 address: write_address + (count * item_size),
3655 count: write_count - count,
3656 handle: write_handle,
3657 };
3658 }
3659
3660 if read_complete {
3661 ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3662 } else {
3663 set_guest_ready(concurrent_state)?;
3664 ReturnCode::Blocked
3665 }
3666 }
3667
3668 WriteState::HostReady {
3669 produce,
3670 try_into,
3671 guest_offset,
3672 cancel,
3673 cancel_waker,
3674 } => {
3675 assert!(cancel_waker.is_none());
3676 assert!(!cancel);
3677 assert_eq!(0, guest_offset);
3678
3679 set_guest_ready(concurrent_state)?;
3680
3681 let code =
3682 self.produce(store.0, ty.kind(), transmit_id, produce, try_into, 0, false)?;
3683
3684 if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) {
3685 store.0.concurrent_state_mut().get_mut(transmit_id)?.done = true;
3686 }
3687
3688 code
3689 }
3690
3691 WriteState::Open => {
3692 set_guest_ready(concurrent_state)?;
3693 ReturnCode::Blocked
3694 }
3695
3696 WriteState::Dropped => ReturnCode::Dropped(0),
3697 };
3698
3699 if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3700 result = self.wait_for_read(store.0, transmit_handle)?;
3701 }
3702
3703 if result != ReturnCode::Blocked {
3704 *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3705 TransmitLocalState::Read {
3706 done: matches!(
3707 (result, ty),
3708 (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3709 ),
3710 };
3711 }
3712
3713 log::trace!(
3714 "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3715 );
3716
3717 Ok(result)
3718 }
3719
3720 fn wait_for_write(
3721 self,
3722 store: &mut StoreOpaque,
3723 handle: TableId<TransmitHandle>,
3724 ) -> Result<ReturnCode> {
3725 let waitable = Waitable::Transmit(handle);
3726 store.wait_for_event(waitable)?;
3727 let event = waitable.take_event(store.concurrent_state_mut())?;
3728 if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3729 event
3730 {
3731 waitable.on_delivery(store, self, event);
3732 Ok(code)
3733 } else {
3734 unreachable!()
3735 }
3736 }
3737
3738 fn cancel_write(
3740 self,
3741 store: &mut StoreOpaque,
3742 transmit_id: TableId<TransmitState>,
3743 async_: bool,
3744 ) -> Result<ReturnCode> {
3745 let state = store.concurrent_state_mut();
3746 let transmit = state.get_mut(transmit_id)?;
3747 log::trace!(
3748 "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3749 transmit.read,
3750 transmit.write
3751 );
3752
3753 let code = if let Some(event) =
3754 Waitable::Transmit(transmit.write_handle).take_event(state)?
3755 {
3756 let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3757 unreachable!();
3758 };
3759 match (code, event) {
3760 (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3761 ReturnCode::Cancelled(count)
3762 }
3763 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3764 _ => unreachable!(),
3765 }
3766 } else if let ReadState::HostReady {
3767 cancel,
3768 cancel_waker,
3769 ..
3770 } = &mut state.get_mut(transmit_id)?.read
3771 {
3772 *cancel = true;
3773 if let Some(waker) = cancel_waker.take() {
3774 waker.wake();
3775 }
3776
3777 if async_ {
3778 ReturnCode::Blocked
3779 } else {
3780 let handle = store
3781 .concurrent_state_mut()
3782 .get_mut(transmit_id)?
3783 .write_handle;
3784 self.wait_for_write(store, handle)?
3785 }
3786 } else {
3787 ReturnCode::Cancelled(0)
3788 };
3789
3790 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3791
3792 match &transmit.write {
3793 WriteState::GuestReady { .. } => {
3794 transmit.write = WriteState::Open;
3795 }
3796 WriteState::HostReady { .. } => todo!("support host write cancellation"),
3797 WriteState::Open | WriteState::Dropped => {}
3798 }
3799
3800 log::trace!("cancelled write {transmit_id:?}: {code:?}");
3801
3802 Ok(code)
3803 }
3804
3805 fn wait_for_read(
3806 self,
3807 store: &mut StoreOpaque,
3808 handle: TableId<TransmitHandle>,
3809 ) -> Result<ReturnCode> {
3810 let waitable = Waitable::Transmit(handle);
3811 store.wait_for_event(waitable)?;
3812 let event = waitable.take_event(store.concurrent_state_mut())?;
3813 if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
3814 event
3815 {
3816 waitable.on_delivery(store, self, event);
3817 Ok(code)
3818 } else {
3819 unreachable!()
3820 }
3821 }
3822
3823 fn cancel_read(
3825 self,
3826 store: &mut StoreOpaque,
3827 transmit_id: TableId<TransmitState>,
3828 async_: bool,
3829 ) -> Result<ReturnCode> {
3830 let state = store.concurrent_state_mut();
3831 let transmit = state.get_mut(transmit_id)?;
3832 log::trace!(
3833 "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3834 transmit.read,
3835 transmit.write
3836 );
3837
3838 let code = if let Some(event) =
3839 Waitable::Transmit(transmit.read_handle).take_event(state)?
3840 {
3841 let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3842 unreachable!();
3843 };
3844 match (code, event) {
3845 (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3846 ReturnCode::Cancelled(count)
3847 }
3848 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3849 _ => unreachable!(),
3850 }
3851 } else if let WriteState::HostReady {
3852 cancel,
3853 cancel_waker,
3854 ..
3855 } = &mut state.get_mut(transmit_id)?.write
3856 {
3857 *cancel = true;
3858 if let Some(waker) = cancel_waker.take() {
3859 waker.wake();
3860 }
3861
3862 if async_ {
3863 ReturnCode::Blocked
3864 } else {
3865 let handle = store
3866 .concurrent_state_mut()
3867 .get_mut(transmit_id)?
3868 .read_handle;
3869 self.wait_for_read(store, handle)?
3870 }
3871 } else {
3872 ReturnCode::Cancelled(0)
3873 };
3874
3875 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3876
3877 match &transmit.read {
3878 ReadState::GuestReady { .. } => {
3879 transmit.read = ReadState::Open;
3880 }
3881 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
3882 todo!("support host read cancellation")
3883 }
3884 ReadState::Open | ReadState::Dropped => {}
3885 }
3886
3887 log::trace!("cancelled read {transmit_id:?}: {code:?}");
3888
3889 Ok(code)
3890 }
3891
3892 fn guest_cancel_write(
3894 self,
3895 store: &mut StoreOpaque,
3896 ty: TransmitIndex,
3897 async_: bool,
3898 writer: u32,
3899 ) -> Result<ReturnCode> {
3900 if !async_ {
3901 store.check_blocking()?;
3905 }
3906
3907 let (rep, state) =
3908 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
3909 let id = TableId::<TransmitHandle>::new(rep);
3910 log::trace!("guest cancel write {id:?} (handle {writer})");
3911 match state {
3912 TransmitLocalState::Write { .. } => {
3913 bail!("stream or future write cancelled when no write is pending")
3914 }
3915 TransmitLocalState::Read { .. } => {
3916 bail!("passed read end to `{{stream|future}}.cancel-write`")
3917 }
3918 TransmitLocalState::Busy => {}
3919 }
3920 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3921 let code = self.cancel_write(store, transmit_id, async_)?;
3922 if !matches!(code, ReturnCode::Blocked) {
3923 let state =
3924 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
3925 .1;
3926 if let TransmitLocalState::Busy = state {
3927 *state = TransmitLocalState::Write { done: false };
3928 }
3929 }
3930 Ok(code)
3931 }
3932
3933 fn guest_cancel_read(
3935 self,
3936 store: &mut StoreOpaque,
3937 ty: TransmitIndex,
3938 async_: bool,
3939 reader: u32,
3940 ) -> Result<ReturnCode> {
3941 if !async_ {
3942 store.check_blocking()?;
3946 }
3947
3948 let (rep, state) =
3949 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
3950 let id = TableId::<TransmitHandle>::new(rep);
3951 log::trace!("guest cancel read {id:?} (handle {reader})");
3952 match state {
3953 TransmitLocalState::Read { .. } => {
3954 bail!("stream or future read cancelled when no read is pending")
3955 }
3956 TransmitLocalState::Write { .. } => {
3957 bail!("passed write end to `{{stream|future}}.cancel-read`")
3958 }
3959 TransmitLocalState::Busy => {}
3960 }
3961 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3962 let code = self.cancel_read(store, transmit_id, async_)?;
3963 if !matches!(code, ReturnCode::Blocked) {
3964 let state =
3965 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
3966 .1;
3967 if let TransmitLocalState::Busy = state {
3968 *state = TransmitLocalState::Read { done: false };
3969 }
3970 }
3971 Ok(code)
3972 }
3973
3974 fn guest_drop_readable(
3976 self,
3977 store: &mut StoreOpaque,
3978 ty: TransmitIndex,
3979 reader: u32,
3980 ) -> Result<()> {
3981 let table = self.id().get_mut(store).table_for_transmit(ty);
3982 let (rep, _is_done) = match ty {
3983 TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
3984 TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
3985 };
3986 let kind = match ty {
3987 TransmitIndex::Stream(_) => TransmitKind::Stream,
3988 TransmitIndex::Future(_) => TransmitKind::Future,
3989 };
3990 let id = TableId::<TransmitHandle>::new(rep);
3991 log::trace!("guest_drop_readable: drop reader {id:?}");
3992 store.host_drop_reader(id, kind)
3993 }
3994
3995 pub(crate) fn error_context_new(
3997 self,
3998 store: &mut StoreOpaque,
3999 caller: RuntimeComponentInstanceIndex,
4000 ty: TypeComponentLocalErrorContextTableIndex,
4001 options: OptionsIndex,
4002 debug_msg_address: u32,
4003 debug_msg_len: u32,
4004 ) -> Result<u32> {
4005 self.id().get(store).check_may_leave(caller)?;
4006 let lift_ctx = &mut LiftContext::new(store, options, self);
4007 let debug_msg = String::linear_lift_from_flat(
4008 lift_ctx,
4009 InterfaceType::String,
4010 &[ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)],
4011 )?;
4012
4013 let err_ctx = ErrorContextState { debug_msg };
4015 let state = store.concurrent_state_mut();
4016 let table_id = state.push(err_ctx)?;
4017 let global_ref_count_idx =
4018 TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
4019
4020 let _ = state
4022 .global_error_context_ref_counts
4023 .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
4024
4025 let local_idx = self
4032 .id()
4033 .get_mut(store)
4034 .table_for_error_context(ty)
4035 .error_context_insert(table_id.rep())?;
4036
4037 Ok(local_idx)
4038 }
4039
4040 pub(super) fn error_context_debug_message<T>(
4042 self,
4043 store: StoreContextMut<T>,
4044 ty: TypeComponentLocalErrorContextTableIndex,
4045 options: OptionsIndex,
4046 err_ctx_handle: u32,
4047 debug_msg_address: u32,
4048 ) -> Result<()> {
4049 let handle_table_id_rep = self
4051 .id()
4052 .get_mut(store.0)
4053 .table_for_error_context(ty)
4054 .error_context_rep(err_ctx_handle)?;
4055
4056 let state = store.0.concurrent_state_mut();
4057 let ErrorContextState { debug_msg } =
4059 state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
4060 let debug_msg = debug_msg.clone();
4061
4062 let lower_cx = &mut LowerContext::new(store, options, self);
4063 let debug_msg_address = usize::try_from(debug_msg_address)?;
4064 let offset = lower_cx
4066 .as_slice_mut()
4067 .get(debug_msg_address..)
4068 .and_then(|b| b.get(..debug_msg.bytes().len()))
4069 .map(|_| debug_msg_address)
4070 .ok_or_else(|| crate::format_err!("invalid debug message pointer: out of bounds"))?;
4071 debug_msg
4072 .as_str()
4073 .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
4074
4075 Ok(())
4076 }
4077
4078 pub(crate) fn future_cancel_read(
4080 self,
4081 store: &mut StoreOpaque,
4082 caller: RuntimeComponentInstanceIndex,
4083 ty: TypeFutureTableIndex,
4084 async_: bool,
4085 reader: u32,
4086 ) -> Result<u32> {
4087 self.id().get(store).check_may_leave(caller)?;
4088 self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
4089 .map(|v| v.encode())
4090 }
4091
4092 pub(crate) fn future_cancel_write(
4094 self,
4095 store: &mut StoreOpaque,
4096 caller: RuntimeComponentInstanceIndex,
4097 ty: TypeFutureTableIndex,
4098 async_: bool,
4099 writer: u32,
4100 ) -> Result<u32> {
4101 self.id().get(store).check_may_leave(caller)?;
4102 self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
4103 .map(|v| v.encode())
4104 }
4105
4106 pub(crate) fn stream_cancel_read(
4108 self,
4109 store: &mut StoreOpaque,
4110 caller: RuntimeComponentInstanceIndex,
4111 ty: TypeStreamTableIndex,
4112 async_: bool,
4113 reader: u32,
4114 ) -> Result<u32> {
4115 self.id().get(store).check_may_leave(caller)?;
4116 self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
4117 .map(|v| v.encode())
4118 }
4119
4120 pub(crate) fn stream_cancel_write(
4122 self,
4123 store: &mut StoreOpaque,
4124 caller: RuntimeComponentInstanceIndex,
4125 ty: TypeStreamTableIndex,
4126 async_: bool,
4127 writer: u32,
4128 ) -> Result<u32> {
4129 self.id().get(store).check_may_leave(caller)?;
4130 self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
4131 .map(|v| v.encode())
4132 }
4133
4134 pub(crate) fn future_drop_readable(
4136 self,
4137 store: &mut StoreOpaque,
4138 caller: RuntimeComponentInstanceIndex,
4139 ty: TypeFutureTableIndex,
4140 reader: u32,
4141 ) -> Result<()> {
4142 self.id().get(store).check_may_leave(caller)?;
4143 self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
4144 }
4145
4146 pub(crate) fn stream_drop_readable(
4148 self,
4149 store: &mut StoreOpaque,
4150 caller: RuntimeComponentInstanceIndex,
4151 ty: TypeStreamTableIndex,
4152 reader: u32,
4153 ) -> Result<()> {
4154 self.id().get(store).check_may_leave(caller)?;
4155 self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
4156 }
4157
4158 fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
4162 let (write, read) = store
4163 .concurrent_state_mut()
4164 .new_transmit(TransmitOrigin::guest(self.id().instance(), ty))?;
4165
4166 let table = self.id().get_mut(store).table_for_transmit(ty);
4167 let (read_handle, write_handle) = match ty {
4168 TransmitIndex::Future(ty) => (
4169 table.future_insert_read(ty, read.rep())?,
4170 table.future_insert_write(ty, write.rep())?,
4171 ),
4172 TransmitIndex::Stream(ty) => (
4173 table.stream_insert_read(ty, read.rep())?,
4174 table.stream_insert_write(ty, write.rep())?,
4175 ),
4176 };
4177
4178 let state = store.concurrent_state_mut();
4179 state.get_mut(read)?.common.handle = Some(read_handle);
4180 state.get_mut(write)?.common.handle = Some(write_handle);
4181
4182 Ok(ResourcePair {
4183 write: write_handle,
4184 read: read_handle,
4185 })
4186 }
4187
4188 pub(crate) fn error_context_drop(
4190 self,
4191 store: &mut StoreOpaque,
4192 caller: RuntimeComponentInstanceIndex,
4193 ty: TypeComponentLocalErrorContextTableIndex,
4194 error_context: u32,
4195 ) -> Result<()> {
4196 let instance = self.id().get_mut(store);
4197 instance.check_may_leave(caller)?;
4198
4199 let local_handle_table = instance.table_for_error_context(ty);
4200
4201 let rep = local_handle_table.error_context_drop(error_context)?;
4202
4203 let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
4204
4205 let state = store.concurrent_state_mut();
4206 let GlobalErrorContextRefCount(global_ref_count) = state
4207 .global_error_context_ref_counts
4208 .get_mut(&global_ref_count_idx)
4209 .expect("retrieve concurrent state for error context during drop");
4210
4211 assert!(*global_ref_count >= 1);
4213 *global_ref_count -= 1;
4214 if *global_ref_count == 0 {
4215 state
4216 .global_error_context_ref_counts
4217 .remove(&global_ref_count_idx);
4218
4219 state
4220 .delete(TableId::<ErrorContextState>::new(rep))
4221 .context("deleting component-global error context data")?;
4222 }
4223
4224 Ok(())
4225 }
4226
4227 fn guest_transfer(
4230 self,
4231 store: &mut StoreOpaque,
4232 src_idx: u32,
4233 src: TransmitIndex,
4234 dst: TransmitIndex,
4235 ) -> Result<u32> {
4236 let mut instance = self.id().get_mut(store);
4237 let src_table = instance.as_mut().table_for_transmit(src);
4238 let (rep, is_done) = match src {
4239 TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
4240 TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
4241 };
4242 if is_done {
4243 bail!("cannot lift after being notified that the writable end dropped");
4244 }
4245 let dst_table = instance.table_for_transmit(dst);
4246 let handle = match dst {
4247 TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
4248 TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
4249 }?;
4250 store
4251 .concurrent_state_mut()
4252 .get_mut(TableId::<TransmitHandle>::new(rep))?
4253 .common
4254 .handle = Some(handle);
4255 Ok(handle)
4256 }
4257
4258 pub(crate) fn future_new(
4260 self,
4261 store: &mut StoreOpaque,
4262 caller: RuntimeComponentInstanceIndex,
4263 ty: TypeFutureTableIndex,
4264 ) -> Result<ResourcePair> {
4265 self.id().get(store).check_may_leave(caller)?;
4266 self.guest_new(store, TransmitIndex::Future(ty))
4267 }
4268
4269 pub(crate) fn stream_new(
4271 self,
4272 store: &mut StoreOpaque,
4273 caller: RuntimeComponentInstanceIndex,
4274 ty: TypeStreamTableIndex,
4275 ) -> Result<ResourcePair> {
4276 self.id().get(store).check_may_leave(caller)?;
4277 self.guest_new(store, TransmitIndex::Stream(ty))
4278 }
4279
4280 pub(crate) fn future_transfer(
4283 self,
4284 store: &mut StoreOpaque,
4285 src_idx: u32,
4286 src: TypeFutureTableIndex,
4287 dst: TypeFutureTableIndex,
4288 ) -> Result<u32> {
4289 self.guest_transfer(
4290 store,
4291 src_idx,
4292 TransmitIndex::Future(src),
4293 TransmitIndex::Future(dst),
4294 )
4295 }
4296
4297 pub(crate) fn stream_transfer(
4300 self,
4301 store: &mut StoreOpaque,
4302 src_idx: u32,
4303 src: TypeStreamTableIndex,
4304 dst: TypeStreamTableIndex,
4305 ) -> Result<u32> {
4306 self.guest_transfer(
4307 store,
4308 src_idx,
4309 TransmitIndex::Stream(src),
4310 TransmitIndex::Stream(dst),
4311 )
4312 }
4313
4314 pub(crate) fn error_context_transfer(
4316 self,
4317 store: &mut StoreOpaque,
4318 src_idx: u32,
4319 src: TypeComponentLocalErrorContextTableIndex,
4320 dst: TypeComponentLocalErrorContextTableIndex,
4321 ) -> Result<u32> {
4322 let mut instance = self.id().get_mut(store);
4323 let rep = instance
4324 .as_mut()
4325 .table_for_error_context(src)
4326 .error_context_rep(src_idx)?;
4327 let dst_idx = instance
4328 .table_for_error_context(dst)
4329 .error_context_insert(rep)?;
4330
4331 let global_ref_count = store
4335 .concurrent_state_mut()
4336 .global_error_context_ref_counts
4337 .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4338 .context("global ref count present for existing (sub)component error context")?;
4339 global_ref_count.0 += 1;
4340
4341 Ok(dst_idx)
4342 }
4343}
4344
4345impl ComponentInstance {
4346 fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4347 let (states, types) = self.instance_states();
4348 let runtime_instance = match ty {
4349 TransmitIndex::Stream(ty) => types[ty].instance,
4350 TransmitIndex::Future(ty) => types[ty].instance,
4351 };
4352 states[runtime_instance].handle_table()
4353 }
4354
4355 fn table_for_error_context(
4356 self: Pin<&mut Self>,
4357 ty: TypeComponentLocalErrorContextTableIndex,
4358 ) -> &mut HandleTable {
4359 let (states, types) = self.instance_states();
4360 let runtime_instance = types[ty].instance;
4361 states[runtime_instance].handle_table()
4362 }
4363
4364 fn get_mut_by_index(
4365 self: Pin<&mut Self>,
4366 ty: TransmitIndex,
4367 index: u32,
4368 ) -> Result<(u32, &mut TransmitLocalState)> {
4369 get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4370 }
4371}
4372
4373impl ConcurrentState {
4374 fn send_write_result(
4375 &mut self,
4376 ty: TransmitIndex,
4377 id: TableId<TransmitState>,
4378 handle: u32,
4379 code: ReturnCode,
4380 ) -> Result<()> {
4381 let write_handle = self.get_mut(id)?.write_handle.rep();
4382 self.set_event(
4383 write_handle,
4384 match ty {
4385 TransmitIndex::Future(ty) => Event::FutureWrite {
4386 code,
4387 pending: Some((ty, handle)),
4388 },
4389 TransmitIndex::Stream(ty) => Event::StreamWrite {
4390 code,
4391 pending: Some((ty, handle)),
4392 },
4393 },
4394 )
4395 }
4396
4397 fn send_read_result(
4398 &mut self,
4399 ty: TransmitIndex,
4400 id: TableId<TransmitState>,
4401 handle: u32,
4402 code: ReturnCode,
4403 ) -> Result<()> {
4404 let read_handle = self.get_mut(id)?.read_handle.rep();
4405 self.set_event(
4406 read_handle,
4407 match ty {
4408 TransmitIndex::Future(ty) => Event::FutureRead {
4409 code,
4410 pending: Some((ty, handle)),
4411 },
4412 TransmitIndex::Stream(ty) => Event::StreamRead {
4413 code,
4414 pending: Some((ty, handle)),
4415 },
4416 },
4417 )
4418 }
4419
4420 fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4421 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4422 }
4423
4424 fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4425 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4426 }
4427
4428 fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4439 let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4440
4441 fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
4442 let (ReturnCode::Completed(count)
4443 | ReturnCode::Dropped(count)
4444 | ReturnCode::Cancelled(count)) = old
4445 else {
4446 unreachable!()
4447 };
4448
4449 match new {
4450 ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
4451 ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
4452 _ => unreachable!(),
4453 }
4454 }
4455
4456 let event = match (waitable.take_event(self)?, event) {
4457 (None, _) => event,
4458 (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4459 (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4460 (
4461 Some(Event::StreamWrite {
4462 code: old_code,
4463 pending: old_pending,
4464 }),
4465 Event::StreamWrite { code, pending },
4466 ) => Event::StreamWrite {
4467 code: update_code(old_code, code),
4468 pending: old_pending.or(pending),
4469 },
4470 (
4471 Some(Event::StreamRead {
4472 code: old_code,
4473 pending: old_pending,
4474 }),
4475 Event::StreamRead { code, pending },
4476 ) => Event::StreamRead {
4477 code: update_code(old_code, code),
4478 pending: old_pending.or(pending),
4479 },
4480 _ => unreachable!(),
4481 };
4482
4483 waitable.set_event(self, Some(event))
4484 }
4485
4486 fn new_transmit(
4489 &mut self,
4490 origin: TransmitOrigin,
4491 ) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4492 let state_id = self.push(TransmitState::new(origin))?;
4493
4494 let write = self.push(TransmitHandle::new(state_id))?;
4495 let read = self.push(TransmitHandle::new(state_id))?;
4496
4497 let state = self.get_mut(state_id)?;
4498 state.write_handle = write;
4499 state.read_handle = read;
4500
4501 log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4502
4503 Ok((write, read))
4504 }
4505
4506 fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4508 let state = self.delete(state_id)?;
4509 self.delete(state.write_handle)?;
4510 self.delete(state.read_handle)?;
4511
4512 log::trace!(
4513 "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4514 state.write_handle,
4515 state.read_handle,
4516 );
4517
4518 Ok(())
4519 }
4520}
4521
4522pub(crate) struct ResourcePair {
4523 pub(crate) write: u32,
4524 pub(crate) read: u32,
4525}
4526
4527impl Waitable {
4528 pub(super) fn on_delivery(&self, store: &mut StoreOpaque, instance: Instance, event: Event) {
4531 match event {
4532 Event::FutureRead {
4533 pending: Some((ty, handle)),
4534 ..
4535 }
4536 | Event::FutureWrite {
4537 pending: Some((ty, handle)),
4538 ..
4539 } => {
4540 let instance = instance.id().get_mut(store);
4541 let runtime_instance = instance.component().types()[ty].instance;
4542 let (rep, state) = instance.instance_states().0[runtime_instance]
4543 .handle_table()
4544 .future_rep(ty, handle)
4545 .unwrap();
4546 assert_eq!(rep, self.rep());
4547 assert_eq!(*state, TransmitLocalState::Busy);
4548 *state = match event {
4549 Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
4550 Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
4551 _ => unreachable!(),
4552 };
4553 }
4554 Event::StreamRead {
4555 pending: Some((ty, handle)),
4556 code,
4557 }
4558 | Event::StreamWrite {
4559 pending: Some((ty, handle)),
4560 code,
4561 } => {
4562 let instance = instance.id().get_mut(store);
4563 let runtime_instance = instance.component().types()[ty].instance;
4564 let (rep, state) = instance.instance_states().0[runtime_instance]
4565 .handle_table()
4566 .stream_rep(ty, handle)
4567 .unwrap();
4568 assert_eq!(rep, self.rep());
4569 assert_eq!(*state, TransmitLocalState::Busy);
4570 let done = matches!(code, ReturnCode::Dropped(_));
4571 *state = match event {
4572 Event::StreamRead { .. } => TransmitLocalState::Read { done },
4573 Event::StreamWrite { .. } => TransmitLocalState::Write { done },
4574 _ => unreachable!(),
4575 };
4576
4577 let transmit_handle = TableId::<TransmitHandle>::new(rep);
4578 let state = store.concurrent_state_mut();
4579 let transmit_id = state.get_mut(transmit_handle).unwrap().state;
4580 let transmit = state.get_mut(transmit_id).unwrap();
4581
4582 match event {
4583 Event::StreamRead { .. } => {
4584 transmit.read = ReadState::Open;
4585 }
4586 Event::StreamWrite { .. } => transmit.write = WriteState::Open,
4587 _ => unreachable!(),
4588 };
4589 }
4590 _ => {}
4591 }
4592 }
4593}
4594
4595fn allow_intra_component_read_write(ty: Option<InterfaceType>) -> bool {
4599 matches!(
4600 ty,
4601 None | Some(
4602 InterfaceType::S8
4603 | InterfaceType::U8
4604 | InterfaceType::S16
4605 | InterfaceType::U16
4606 | InterfaceType::S32
4607 | InterfaceType::U32
4608 | InterfaceType::S64
4609 | InterfaceType::U64
4610 | InterfaceType::Float32
4611 | InterfaceType::Float64
4612 )
4613 )
4614}
4615
4616#[cfg(test)]
4617mod tests {
4618 use super::*;
4619 use crate::{Engine, Store};
4620 use core::future::pending;
4621 use core::pin::pin;
4622 use std::sync::LazyLock;
4623
4624 static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);
4625
4626 fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
4627 where
4628 T: FutureProducer<()>,
4629 {
4630 rx.poll_produce(
4631 &mut Context::from_waker(Waker::noop()),
4632 Store::new(&ENGINE, ()).as_context_mut(),
4633 finish,
4634 )
4635 }
4636
4637 #[test]
4638 fn future_producer() {
4639 let mut fut = pin!(async { crate::error::Ok(()) });
4640 assert!(matches!(
4641 poll_future_producer(fut.as_mut(), false),
4642 Poll::Ready(Ok(Some(()))),
4643 ));
4644
4645 let mut fut = pin!(async { crate::error::Ok(()) });
4646 assert!(matches!(
4647 poll_future_producer(fut.as_mut(), true),
4648 Poll::Ready(Ok(Some(()))),
4649 ));
4650
4651 let mut fut = pin!(pending::<Result<()>>());
4652 assert!(matches!(
4653 poll_future_producer(fut.as_mut(), false),
4654 Poll::Pending,
4655 ));
4656 assert!(matches!(
4657 poll_future_producer(fut.as_mut(), true),
4658 Poll::Ready(Ok(None)),
4659 ));
4660
4661 let (tx, rx) = oneshot::channel();
4662 let mut rx = pin!(rx);
4663 assert!(matches!(
4664 poll_future_producer(rx.as_mut(), false),
4665 Poll::Pending,
4666 ));
4667 assert!(matches!(
4668 poll_future_producer(rx.as_mut(), true),
4669 Poll::Ready(Ok(None)),
4670 ));
4671 tx.send(()).unwrap();
4672 assert!(matches!(
4673 poll_future_producer(rx.as_mut(), true),
4674 Poll::Ready(Ok(Some(()))),
4675 ));
4676
4677 let (tx, rx) = oneshot::channel();
4678 let mut rx = pin!(rx);
4679 tx.send(()).unwrap();
4680 assert!(matches!(
4681 poll_future_producer(rx.as_mut(), false),
4682 Poll::Ready(Ok(Some(()))),
4683 ));
4684
4685 let (tx, rx) = oneshot::channel::<()>();
4686 let mut rx = pin!(rx);
4687 drop(tx);
4688 assert!(matches!(
4689 poll_future_producer(rx.as_mut(), false),
4690 Poll::Ready(Err(..)),
4691 ));
4692
4693 let (tx, rx) = oneshot::channel::<()>();
4694 let mut rx = pin!(rx);
4695 drop(tx);
4696 assert!(matches!(
4697 poll_future_producer(rx.as_mut(), true),
4698 Poll::Ready(Err(..)),
4699 ));
4700 }
4701}