1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext};
5use crate::component::matching::InstanceType;
6use crate::component::types;
7use crate::component::values::ErrorContextAny;
8use crate::component::{
9 AsAccessor, ComponentInstanceId, ComponentType, FutureAny, Instance, Lift, Lower, StreamAny,
10 Val, WasmList,
11};
12use crate::store::{StoreOpaque, StoreToken};
13use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
14use crate::vm::{AlwaysMut, VMStore};
15use crate::{AsContext, AsContextMut, StoreContextMut, ValRaw};
16use crate::{
17 Error, Result, bail,
18 error::{Context as _, format_err},
19};
20use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
21use core::fmt;
22use core::future;
23use core::iter;
24use core::marker::PhantomData;
25use core::mem::{self, MaybeUninit};
26use core::pin::Pin;
27use core::task::{Context, Poll, Waker, ready};
28use futures::channel::oneshot;
29use futures::{FutureExt as _, stream};
30use std::any::{Any, TypeId};
31use std::boxed::Box;
32use std::io::Cursor;
33use std::string::String;
34use std::sync::{Arc, Mutex};
35use std::vec::Vec;
36use wasmtime_environ::component::{
37 CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
38 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
39 TypeFutureTableIndex, TypeStreamTableIndex,
40};
41
42pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
43
44mod buffers;
45
46#[derive(Copy, Clone, Debug)]
49pub enum TransmitKind {
50 Stream,
51 Future,
52}
53
54#[derive(Copy, Clone, Debug, PartialEq)]
56pub enum ReturnCode {
57 Blocked,
58 Completed(u32),
59 Dropped(u32),
60 Cancelled(u32),
61}
62
63impl ReturnCode {
64 pub fn encode(&self) -> u32 {
69 const BLOCKED: u32 = 0xffff_ffff;
70 const COMPLETED: u32 = 0x0;
71 const DROPPED: u32 = 0x1;
72 const CANCELLED: u32 = 0x2;
73 match self {
74 ReturnCode::Blocked => BLOCKED,
75 ReturnCode::Completed(n) => {
76 debug_assert!(*n < (1 << 28));
77 (n << 4) | COMPLETED
78 }
79 ReturnCode::Dropped(n) => {
80 debug_assert!(*n < (1 << 28));
81 (n << 4) | DROPPED
82 }
83 ReturnCode::Cancelled(n) => {
84 debug_assert!(*n < (1 << 28));
85 (n << 4) | CANCELLED
86 }
87 }
88 }
89
90 fn completed(kind: TransmitKind, count: u32) -> Self {
93 Self::Completed(if let TransmitKind::Future = kind {
94 0
95 } else {
96 count
97 })
98 }
99}
100
101#[derive(Copy, Clone, Debug)]
106pub enum TransmitIndex {
107 Stream(TypeStreamTableIndex),
108 Future(TypeFutureTableIndex),
109}
110
111impl TransmitIndex {
112 pub fn kind(&self) -> TransmitKind {
113 match self {
114 TransmitIndex::Stream(_) => TransmitKind::Stream,
115 TransmitIndex::Future(_) => TransmitKind::Future,
116 }
117 }
118}
119
120fn payload(ty: TransmitIndex, types: &ComponentTypes) -> Option<InterfaceType> {
123 match ty {
124 TransmitIndex::Future(ty) => types[types[ty].ty].payload,
125 TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
126 }
127}
128
129fn get_mut_by_index_from(
132 handle_table: &mut HandleTable,
133 ty: TransmitIndex,
134 index: u32,
135) -> Result<(u32, &mut TransmitLocalState)> {
136 match ty {
137 TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
138 TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
139 }
140}
141
142fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
143 mut store: StoreContextMut<U>,
144 instance: Instance,
145 options: OptionsIndex,
146 ty: TransmitIndex,
147 address: usize,
148 count: usize,
149 buffer: &mut B,
150) -> Result<()> {
151 let count = buffer.remaining().len().min(count);
152
153 let lower = &mut if T::MAY_REQUIRE_REALLOC {
154 LowerContext::new
155 } else {
156 LowerContext::new_without_realloc
157 }(store.as_context_mut(), options, instance);
158
159 if address % usize::try_from(T::ALIGN32)? != 0 {
160 bail!("read pointer not aligned");
161 }
162 lower
163 .as_slice_mut()
164 .get_mut(address..)
165 .and_then(|b| b.get_mut(..T::SIZE32 * count))
166 .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))?;
167
168 if let Some(ty) = payload(ty, lower.types) {
169 T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
170 }
171
172 buffer.skip(count);
173
174 Ok(())
175}
176
177fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
178 lift: &mut LiftContext<'_>,
179 ty: Option<InterfaceType>,
180 buffer: &mut B,
181 address: usize,
182 count: usize,
183) -> Result<()> {
184 let count = count.min(buffer.remaining_capacity());
185 if T::IS_RUST_UNIT_TYPE {
186 buffer.extend(
190 iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
191 )
192 } else {
193 let ty = ty.unwrap();
194 if address % usize::try_from(T::ALIGN32)? != 0 {
195 bail!("write pointer not aligned");
196 }
197 lift.memory()
198 .get(address..)
199 .and_then(|b| b.get(..T::SIZE32 * count))
200 .ok_or_else(|| crate::format_err!("write pointer out of bounds of memory"))?;
201
202 let list = &WasmList::new(address, count, lift, ty)?;
203 T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
204 }
205 Ok(())
206}
207
208#[derive(Debug, PartialEq, Eq, PartialOrd)]
210pub(super) struct ErrorContextState {
211 pub(crate) debug_msg: String,
213}
214
215#[derive(Debug, Clone, Copy, PartialEq, Eq)]
218pub(super) struct FlatAbi {
219 pub(super) size: u32,
220 pub(super) align: u32,
221}
222
223pub struct Destination<'a, T, B> {
225 id: TableId<TransmitState>,
226 buffer: &'a mut B,
227 host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
228 _phantom: PhantomData<fn() -> T>,
229}
230
231impl<'a, T, B> Destination<'a, T, B> {
232 pub fn reborrow(&mut self) -> Destination<'_, T, B> {
234 Destination {
235 id: self.id,
236 buffer: &mut *self.buffer,
237 host_buffer: self.host_buffer.as_deref_mut(),
238 _phantom: PhantomData,
239 }
240 }
241
242 pub fn take_buffer(&mut self) -> B
248 where
249 B: Default,
250 {
251 mem::take(self.buffer)
252 }
253
254 pub fn set_buffer(&mut self, buffer: B) {
264 *self.buffer = buffer;
265 }
266
267 pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
284 let transmit = store
285 .as_context_mut()
286 .0
287 .concurrent_state_mut()
288 .get_mut(self.id)
289 .unwrap();
290
291 if let &ReadState::GuestReady { count, .. } = &transmit.read {
292 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
293 unreachable!()
294 };
295
296 Some(count - guest_offset)
297 } else {
298 None
299 }
300 }
301}
302
303impl<'a, B> Destination<'a, u8, B> {
304 pub fn as_direct<D>(
315 mut self,
316 store: StoreContextMut<'a, D>,
317 capacity: usize,
318 ) -> DirectDestination<'a, D> {
319 if let Some(buffer) = self.host_buffer.as_deref_mut() {
320 buffer.set_position(0);
321 if buffer.get_mut().is_empty() {
322 buffer.get_mut().resize(capacity, 0);
323 }
324 }
325
326 DirectDestination {
327 id: self.id,
328 host_buffer: self.host_buffer,
329 store,
330 }
331 }
332}
333
334pub struct DirectDestination<'a, D: 'static> {
337 id: TableId<TransmitState>,
338 host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
339 store: StoreContextMut<'a, D>,
340}
341
342impl<D: 'static> std::io::Write for DirectDestination<'_, D> {
343 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
344 let rem = self.remaining();
345 let n = rem.len().min(buf.len());
346 rem[..n].copy_from_slice(&buf[..n]);
347 self.mark_written(n);
348 Ok(n)
349 }
350
351 fn flush(&mut self) -> std::io::Result<()> {
352 Ok(())
353 }
354}
355
356impl<D: 'static> DirectDestination<'_, D> {
357 pub fn remaining(&mut self) -> &mut [u8] {
359 if let Some(buffer) = self.host_buffer.as_deref_mut() {
360 buffer.get_mut()
361 } else {
362 let transmit = self
363 .store
364 .as_context_mut()
365 .0
366 .concurrent_state_mut()
367 .get_mut(self.id)
368 .unwrap();
369
370 let &ReadState::GuestReady {
371 address,
372 count,
373 options,
374 instance,
375 ..
376 } = &transmit.read
377 else {
378 unreachable!();
379 };
380
381 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
382 unreachable!()
383 };
384
385 instance
386 .options_memory_mut(self.store.0, options)
387 .get_mut((address + guest_offset)..)
388 .and_then(|b| b.get_mut(..(count - guest_offset)))
389 .unwrap()
390 }
391 }
392
393 pub fn mark_written(&mut self, count: usize) {
398 if let Some(buffer) = self.host_buffer.as_deref_mut() {
399 buffer.set_position(
400 buffer
401 .position()
402 .checked_add(u64::try_from(count).unwrap())
403 .unwrap(),
404 );
405 } else {
406 let transmit = self
407 .store
408 .as_context_mut()
409 .0
410 .concurrent_state_mut()
411 .get_mut(self.id)
412 .unwrap();
413
414 let ReadState::GuestReady {
415 count: read_count, ..
416 } = &transmit.read
417 else {
418 unreachable!();
419 };
420
421 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
422 unreachable!()
423 };
424
425 if *guest_offset + count > *read_count {
426 panic!(
427 "write count ({count}) must be less than or equal to read count ({read_count})"
428 )
429 } else {
430 *guest_offset += count;
431 }
432 }
433 }
434}
435
436#[derive(Copy, Clone, Debug)]
438pub enum StreamResult {
439 Completed,
442 Cancelled,
447 Dropped,
450}
451
452pub trait StreamProducer<D>: Send + 'static {
454 type Item;
456
457 type Buffer: WriteBuffer<Self::Item> + Default;
459
460 fn poll_produce<'a>(
596 self: Pin<&mut Self>,
597 cx: &mut Context<'_>,
598 store: StoreContextMut<'a, D>,
599 destination: Destination<'a, Self::Item, Self::Buffer>,
600 finish: bool,
601 ) -> Poll<Result<StreamResult>>;
602
603 fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
609 Err(me)
610 }
611}
612
613impl<T, D> StreamProducer<D> for iter::Empty<T>
614where
615 T: Send + Sync + 'static,
616{
617 type Item = T;
618 type Buffer = Option<Self::Item>;
619
620 fn poll_produce<'a>(
621 self: Pin<&mut Self>,
622 _: &mut Context<'_>,
623 _: StoreContextMut<'a, D>,
624 _: Destination<'a, Self::Item, Self::Buffer>,
625 _: bool,
626 ) -> Poll<Result<StreamResult>> {
627 Poll::Ready(Ok(StreamResult::Dropped))
628 }
629}
630
631impl<T, D> StreamProducer<D> for stream::Empty<T>
632where
633 T: Send + Sync + 'static,
634{
635 type Item = T;
636 type Buffer = Option<Self::Item>;
637
638 fn poll_produce<'a>(
639 self: Pin<&mut Self>,
640 _: &mut Context<'_>,
641 _: StoreContextMut<'a, D>,
642 _: Destination<'a, Self::Item, Self::Buffer>,
643 _: bool,
644 ) -> Poll<Result<StreamResult>> {
645 Poll::Ready(Ok(StreamResult::Dropped))
646 }
647}
648
649impl<T, D> StreamProducer<D> for Vec<T>
650where
651 T: Unpin + Send + Sync + 'static,
652{
653 type Item = T;
654 type Buffer = VecBuffer<T>;
655
656 fn poll_produce<'a>(
657 self: Pin<&mut Self>,
658 _: &mut Context<'_>,
659 _: StoreContextMut<'a, D>,
660 mut dst: Destination<'a, Self::Item, Self::Buffer>,
661 _: bool,
662 ) -> Poll<Result<StreamResult>> {
663 dst.set_buffer(mem::take(self.get_mut()).into());
664 Poll::Ready(Ok(StreamResult::Dropped))
665 }
666}
667
668impl<T, D> StreamProducer<D> for Box<[T]>
669where
670 T: Unpin + Send + Sync + 'static,
671{
672 type Item = T;
673 type Buffer = VecBuffer<T>;
674
675 fn poll_produce<'a>(
676 self: Pin<&mut Self>,
677 _: &mut Context<'_>,
678 _: StoreContextMut<'a, D>,
679 mut dst: Destination<'a, Self::Item, Self::Buffer>,
680 _: bool,
681 ) -> Poll<Result<StreamResult>> {
682 dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
683 Poll::Ready(Ok(StreamResult::Dropped))
684 }
685}
686
687#[cfg(feature = "component-model-async-bytes")]
688impl<D> StreamProducer<D> for bytes::Bytes {
689 type Item = u8;
690 type Buffer = Cursor<Self>;
691
692 fn poll_produce<'a>(
693 mut self: Pin<&mut Self>,
694 _: &mut Context<'_>,
695 mut store: StoreContextMut<'a, D>,
696 mut dst: Destination<'a, Self::Item, Self::Buffer>,
697 _: bool,
698 ) -> Poll<Result<StreamResult>> {
699 let cap = dst.remaining(&mut store);
700 let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
701 dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
703 return Poll::Ready(Ok(StreamResult::Dropped));
704 };
705 let cap = cap.into();
706 dst.set_buffer(Cursor::new(self.split_off(cap)));
708 let mut dst = dst.as_direct(store, cap);
709 dst.remaining().copy_from_slice(&self);
710 dst.mark_written(cap);
711 Poll::Ready(Ok(StreamResult::Dropped))
712 }
713}
714
715#[cfg(feature = "component-model-async-bytes")]
716impl<D> StreamProducer<D> for bytes::BytesMut {
717 type Item = u8;
718 type Buffer = Cursor<Self>;
719
720 fn poll_produce<'a>(
721 mut self: Pin<&mut Self>,
722 _: &mut Context<'_>,
723 mut store: StoreContextMut<'a, D>,
724 mut dst: Destination<'a, Self::Item, Self::Buffer>,
725 _: bool,
726 ) -> Poll<Result<StreamResult>> {
727 let cap = dst.remaining(&mut store);
728 let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
729 dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
731 return Poll::Ready(Ok(StreamResult::Dropped));
732 };
733 let cap = cap.into();
734 dst.set_buffer(Cursor::new(self.split_off(cap)));
736 let mut dst = dst.as_direct(store, cap);
737 dst.remaining().copy_from_slice(&self);
738 dst.mark_written(cap);
739 Poll::Ready(Ok(StreamResult::Dropped))
740 }
741}
742
743pub struct Source<'a, T> {
745 id: TableId<TransmitState>,
746 host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
747}
748
749impl<'a, T> Source<'a, T> {
750 pub fn reborrow(&mut self) -> Source<'_, T> {
752 Source {
753 id: self.id,
754 host_buffer: self.host_buffer.as_deref_mut(),
755 }
756 }
757
758 pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
760 where
761 T: func::Lift + 'static,
762 B: ReadBuffer<T>,
763 {
764 if let Some(input) = &mut self.host_buffer {
765 let count = input.remaining().len().min(buffer.remaining_capacity());
766 buffer.move_from(*input, count);
767 } else {
768 let store = store.as_context_mut();
769 let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
770
771 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
772 unreachable!();
773 };
774
775 let &WriteState::GuestReady {
776 ty,
777 address,
778 count,
779 options,
780 instance,
781 ..
782 } = &transmit.write
783 else {
784 unreachable!()
785 };
786
787 let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
788 let ty = payload(ty, cx.types);
789 let old_remaining = buffer.remaining_capacity();
790 lift::<T, B>(
791 cx,
792 ty,
793 buffer,
794 address + (T::SIZE32 * guest_offset),
795 count - guest_offset,
796 )?;
797
798 let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
799
800 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
801 unreachable!();
802 };
803
804 *guest_offset += old_remaining - buffer.remaining_capacity();
805 }
806
807 Ok(())
808 }
809
810 pub fn remaining(&self, mut store: impl AsContextMut) -> usize
813 where
814 T: 'static,
815 {
816 let transmit = store
817 .as_context_mut()
818 .0
819 .concurrent_state_mut()
820 .get_mut(self.id)
821 .unwrap();
822
823 if let &WriteState::GuestReady { count, .. } = &transmit.write {
824 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
825 unreachable!()
826 };
827
828 count - guest_offset
829 } else if let Some(host_buffer) = &self.host_buffer {
830 host_buffer.remaining().len()
831 } else {
832 unreachable!()
833 }
834 }
835}
836
837impl<'a> Source<'a, u8> {
838 pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
840 DirectSource {
841 id: self.id,
842 host_buffer: self.host_buffer,
843 store,
844 }
845 }
846}
847
848pub struct DirectSource<'a, D: 'static> {
851 id: TableId<TransmitState>,
852 host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
853 store: StoreContextMut<'a, D>,
854}
855
856impl<D: 'static> std::io::Read for DirectSource<'_, D> {
857 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
858 let rem = self.remaining();
859 let n = rem.len().min(buf.len());
860 buf[..n].copy_from_slice(&rem[..n]);
861 self.mark_read(n);
862 Ok(n)
863 }
864}
865
866impl<D: 'static> DirectSource<'_, D> {
867 pub fn remaining(&mut self) -> &[u8] {
869 if let Some(buffer) = self.host_buffer.as_deref_mut() {
870 buffer.remaining()
871 } else {
872 let transmit = self
873 .store
874 .as_context_mut()
875 .0
876 .concurrent_state_mut()
877 .get_mut(self.id)
878 .unwrap();
879
880 let &WriteState::GuestReady {
881 address,
882 count,
883 options,
884 instance,
885 ..
886 } = &transmit.write
887 else {
888 unreachable!()
889 };
890
891 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
892 unreachable!()
893 };
894
895 instance
896 .options_memory(self.store.0, options)
897 .get((address + guest_offset)..)
898 .and_then(|b| b.get(..(count - guest_offset)))
899 .unwrap()
900 }
901 }
902
903 pub fn mark_read(&mut self, count: usize) {
908 if let Some(buffer) = self.host_buffer.as_deref_mut() {
909 buffer.skip(count);
910 } else {
911 let transmit = self
912 .store
913 .as_context_mut()
914 .0
915 .concurrent_state_mut()
916 .get_mut(self.id)
917 .unwrap();
918
919 let WriteState::GuestReady {
920 count: write_count, ..
921 } = &transmit.write
922 else {
923 unreachable!()
924 };
925
926 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
927 unreachable!()
928 };
929
930 if *guest_offset + count > *write_count {
931 panic!(
932 "read count ({count}) must be less than or equal to write count ({write_count})"
933 )
934 } else {
935 *guest_offset += count;
936 }
937 }
938 }
939}
940
941pub trait StreamConsumer<D>: Send + 'static {
943 type Item;
945
946 fn poll_consume(
1029 self: Pin<&mut Self>,
1030 cx: &mut Context<'_>,
1031 store: StoreContextMut<D>,
1032 source: Source<'_, Self::Item>,
1033 finish: bool,
1034 ) -> Poll<Result<StreamResult>>;
1035}
1036
1037pub trait FutureProducer<D>: Send + 'static {
1039 type Item;
1041
1042 fn poll_produce(
1052 self: Pin<&mut Self>,
1053 cx: &mut Context<'_>,
1054 store: StoreContextMut<D>,
1055 finish: bool,
1056 ) -> Poll<Result<Option<Self::Item>>>;
1057}
1058
1059impl<T, E, D, Fut> FutureProducer<D> for Fut
1060where
1061 E: Into<Error>,
1062 Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
1063{
1064 type Item = T;
1065
1066 fn poll_produce<'a>(
1067 self: Pin<&mut Self>,
1068 cx: &mut Context<'_>,
1069 _: StoreContextMut<'a, D>,
1070 finish: bool,
1071 ) -> Poll<Result<Option<T>>> {
1072 match self.poll(cx) {
1073 Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
1074 Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
1075 Poll::Pending if finish => Poll::Ready(Ok(None)),
1076 Poll::Pending => Poll::Pending,
1077 }
1078 }
1079}
1080
1081pub trait FutureConsumer<D>: Send + 'static {
1083 type Item;
1085
1086 fn poll_consume(
1098 self: Pin<&mut Self>,
1099 cx: &mut Context<'_>,
1100 store: StoreContextMut<D>,
1101 source: Source<'_, Self::Item>,
1102 finish: bool,
1103 ) -> Poll<Result<()>>;
1104}
1105
1106pub struct FutureReader<T> {
1113 id: TableId<TransmitHandle>,
1114 _phantom: PhantomData<T>,
1115}
1116
1117impl<T> FutureReader<T> {
1118 pub fn new<S: AsContextMut>(
1126 mut store: S,
1127 producer: impl FutureProducer<S::Data, Item = T>,
1128 ) -> Self
1129 where
1130 T: func::Lower + func::Lift + Send + Sync + 'static,
1131 {
1132 assert!(store.as_context().0.concurrency_support());
1133
1134 struct Producer<P>(P);
1135
1136 impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
1137 for Producer<P>
1138 {
1139 type Item = P::Item;
1140 type Buffer = Option<P::Item>;
1141
1142 fn poll_produce<'a>(
1143 self: Pin<&mut Self>,
1144 cx: &mut Context<'_>,
1145 store: StoreContextMut<D>,
1146 mut destination: Destination<'a, Self::Item, Self::Buffer>,
1147 finish: bool,
1148 ) -> Poll<Result<StreamResult>> {
1149 let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1152
1153 Poll::Ready(Ok(
1154 if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
1155 destination.set_buffer(Some(value));
1156
1157 StreamResult::Completed
1164 } else {
1165 StreamResult::Cancelled
1166 },
1167 ))
1168 }
1169 }
1170
1171 Self::new_(
1172 store
1173 .as_context_mut()
1174 .new_transmit(TransmitKind::Future, Producer(producer)),
1175 )
1176 }
1177
1178 pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1179 Self {
1180 id,
1181 _phantom: PhantomData,
1182 }
1183 }
1184
1185 pub(super) fn id(&self) -> TableId<TransmitHandle> {
1186 self.id
1187 }
1188
1189 pub fn pipe<S: AsContextMut>(
1191 self,
1192 mut store: S,
1193 consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
1194 ) where
1195 T: func::Lift + 'static,
1196 {
1197 struct Consumer<C>(C);
1198
1199 impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
1200 for Consumer<C>
1201 {
1202 type Item = T;
1203
1204 fn poll_consume(
1205 self: Pin<&mut Self>,
1206 cx: &mut Context<'_>,
1207 mut store: StoreContextMut<D>,
1208 mut source: Source<Self::Item>,
1209 finish: bool,
1210 ) -> Poll<Result<StreamResult>> {
1211 let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1214
1215 ready!(consumer.poll_consume(
1216 cx,
1217 store.as_context_mut(),
1218 source.reborrow(),
1219 finish
1220 ))?;
1221
1222 Poll::Ready(Ok(if source.remaining(store) == 0 {
1223 StreamResult::Completed
1229 } else {
1230 StreamResult::Cancelled
1231 }))
1232 }
1233 }
1234
1235 store
1236 .as_context_mut()
1237 .set_consumer(self.id, TransmitKind::Future, Consumer(consumer));
1238 }
1239
1240 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1242 let id = lift_index_to_future(cx, ty, index)?;
1243 Ok(Self::new_(id))
1244 }
1245
1246 pub fn close(&mut self, mut store: impl AsContextMut) {
1261 future_close(store.as_context_mut().0, &mut self.id)
1262 }
1263
1264 pub fn close_with(&mut self, accessor: impl AsAccessor) {
1266 accessor.as_accessor().with(|access| self.close(access))
1267 }
1268
1269 pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1275 where
1276 A: AsAccessor,
1277 {
1278 GuardedFutureReader::new(accessor, self)
1279 }
1280
1281 pub fn try_into_future_any(self, store: impl AsContextMut) -> Result<FutureAny>
1288 where
1289 T: ComponentType + 'static,
1290 {
1291 FutureAny::try_from_future_reader(store, self)
1292 }
1293
1294 pub fn try_from_future_any(future: FutureAny) -> Result<Self>
1301 where
1302 T: ComponentType + 'static,
1303 {
1304 future.try_into_future_reader()
1305 }
1306}
1307
1308impl<T> fmt::Debug for FutureReader<T> {
1309 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1310 f.debug_struct("FutureReader")
1311 .field("id", &self.id)
1312 .finish()
1313 }
1314}
1315
1316pub(super) fn future_close(store: &mut StoreOpaque, id: &mut TableId<TransmitHandle>) {
1317 let id = mem::replace(id, TableId::new(u32::MAX));
1318 store.host_drop_reader(id, TransmitKind::Future).unwrap();
1319}
1320
1321pub(super) fn lift_index_to_future(
1323 cx: &mut LiftContext<'_>,
1324 ty: InterfaceType,
1325 index: u32,
1326) -> Result<TableId<TransmitHandle>> {
1327 match ty {
1328 InterfaceType::Future(src) => {
1329 let handle_table = cx
1330 .instance_mut()
1331 .table_for_transmit(TransmitIndex::Future(src));
1332 let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
1333 if is_done {
1334 bail!("cannot lift future after being notified that the writable end dropped");
1335 }
1336 let id = TableId::<TransmitHandle>::new(rep);
1337 let concurrent_state = cx.concurrent_state_mut();
1338 let future = concurrent_state.get_mut(id)?;
1339 future.common.handle = None;
1340 let state = future.state;
1341
1342 if concurrent_state.get_mut(state)?.done {
1343 bail!("cannot lift future after previous read succeeded");
1344 }
1345
1346 Ok(id)
1347 }
1348 _ => func::bad_type_info(),
1349 }
1350}
1351
1352pub(super) fn lower_future_to_index<U>(
1354 id: TableId<TransmitHandle>,
1355 cx: &mut LowerContext<'_, U>,
1356 ty: InterfaceType,
1357) -> Result<u32> {
1358 match ty {
1359 InterfaceType::Future(dst) => {
1360 let concurrent_state = cx.store.0.concurrent_state_mut();
1361 let state = concurrent_state.get_mut(id)?.state;
1362 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1363
1364 let handle = cx
1365 .instance_mut()
1366 .table_for_transmit(TransmitIndex::Future(dst))
1367 .future_insert_read(dst, rep)?;
1368
1369 cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1370
1371 Ok(handle)
1372 }
1373 _ => func::bad_type_info(),
1374 }
1375}
1376
1377unsafe impl<T: ComponentType> ComponentType for FutureReader<T> {
1380 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1381
1382 type Lower = <u32 as func::ComponentType>::Lower;
1383
1384 fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1385 match ty {
1386 InterfaceType::Future(ty) => {
1387 let ty = types.types[*ty].ty;
1388 types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1389 }
1390 other => bail!("expected `future`, found `{}`", func::desc(other)),
1391 }
1392 }
1393}
1394
1395unsafe impl<T: ComponentType> func::Lower for FutureReader<T> {
1397 fn linear_lower_to_flat<U>(
1398 &self,
1399 cx: &mut LowerContext<'_, U>,
1400 ty: InterfaceType,
1401 dst: &mut MaybeUninit<Self::Lower>,
1402 ) -> Result<()> {
1403 lower_future_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1404 }
1405
1406 fn linear_lower_to_memory<U>(
1407 &self,
1408 cx: &mut LowerContext<'_, U>,
1409 ty: InterfaceType,
1410 offset: usize,
1411 ) -> Result<()> {
1412 lower_future_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1413 cx,
1414 InterfaceType::U32,
1415 offset,
1416 )
1417 }
1418}
1419
1420unsafe impl<T: ComponentType> func::Lift for FutureReader<T> {
1422 fn linear_lift_from_flat(
1423 cx: &mut LiftContext<'_>,
1424 ty: InterfaceType,
1425 src: &Self::Lower,
1426 ) -> Result<Self> {
1427 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1428 Self::lift_from_index(cx, ty, index)
1429 }
1430
1431 fn linear_lift_from_memory(
1432 cx: &mut LiftContext<'_>,
1433 ty: InterfaceType,
1434 bytes: &[u8],
1435 ) -> Result<Self> {
1436 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1437 Self::lift_from_index(cx, ty, index)
1438 }
1439}
1440
1441pub struct GuardedFutureReader<T, A>
1449where
1450 A: AsAccessor,
1451{
1452 reader: Option<FutureReader<T>>,
1456 accessor: A,
1457}
1458
1459impl<T, A> GuardedFutureReader<T, A>
1460where
1461 A: AsAccessor,
1462{
1463 pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1471 assert!(
1472 accessor
1473 .as_accessor()
1474 .with(|a| a.as_context().0.concurrency_support())
1475 );
1476 Self {
1477 reader: Some(reader),
1478 accessor,
1479 }
1480 }
1481
1482 pub fn into_future(self) -> FutureReader<T> {
1485 self.into()
1486 }
1487}
1488
1489impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1490where
1491 A: AsAccessor,
1492{
1493 fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1494 guard.reader.take().unwrap()
1495 }
1496}
1497
1498impl<T, A> Drop for GuardedFutureReader<T, A>
1499where
1500 A: AsAccessor,
1501{
1502 fn drop(&mut self) {
1503 if let Some(reader) = &mut self.reader {
1504 reader.close_with(&self.accessor)
1505 }
1506 }
1507}
1508
1509pub struct StreamReader<T> {
1516 id: TableId<TransmitHandle>,
1517 _phantom: PhantomData<T>,
1518}
1519
1520impl<T> StreamReader<T> {
1521 pub fn new<S: AsContextMut>(
1529 mut store: S,
1530 producer: impl StreamProducer<S::Data, Item = T>,
1531 ) -> Self
1532 where
1533 T: func::Lower + func::Lift + Send + Sync + 'static,
1534 {
1535 assert!(store.as_context().0.concurrency_support());
1536 Self::new_(
1537 store
1538 .as_context_mut()
1539 .new_transmit(TransmitKind::Stream, producer),
1540 )
1541 }
1542
1543 pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1544 Self {
1545 id,
1546 _phantom: PhantomData,
1547 }
1548 }
1549
1550 pub(super) fn id(&self) -> TableId<TransmitHandle> {
1551 self.id
1552 }
1553
1554 pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
1571 let store = store.as_context_mut();
1572 let state = store.0.concurrent_state_mut();
1573 let id = state.get_mut(self.id).unwrap().state;
1574 if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
1575 match try_into(TypeId::of::<V>()) {
1576 Some(result) => {
1577 self.close(store);
1578 Ok(*result.downcast::<V>().unwrap())
1579 }
1580 None => Err(self),
1581 }
1582 } else {
1583 Err(self)
1584 }
1585 }
1586
1587 pub fn pipe<S: AsContextMut>(
1589 self,
1590 mut store: S,
1591 consumer: impl StreamConsumer<S::Data, Item = T>,
1592 ) where
1593 T: 'static,
1594 {
1595 store
1596 .as_context_mut()
1597 .set_consumer(self.id, TransmitKind::Stream, consumer);
1598 }
1599
1600 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1602 let id = lift_index_to_stream(cx, ty, index)?;
1603 Ok(Self::new_(id))
1604 }
1605
1606 pub fn close(&mut self, mut store: impl AsContextMut) {
1616 stream_close(store.as_context_mut().0, &mut self.id);
1617 }
1618
1619 pub fn close_with(&mut self, accessor: impl AsAccessor) {
1621 accessor.as_accessor().with(|access| self.close(access))
1622 }
1623
1624 pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1630 where
1631 A: AsAccessor,
1632 {
1633 GuardedStreamReader::new(accessor, self)
1634 }
1635
1636 pub fn try_into_stream_any(self, store: impl AsContextMut) -> Result<StreamAny>
1643 where
1644 T: ComponentType + 'static,
1645 {
1646 StreamAny::try_from_stream_reader(store, self)
1647 }
1648
1649 pub fn try_from_stream_any(stream: StreamAny) -> Result<Self>
1656 where
1657 T: ComponentType + 'static,
1658 {
1659 stream.try_into_stream_reader()
1660 }
1661}
1662
1663impl<T> fmt::Debug for StreamReader<T> {
1664 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1665 f.debug_struct("StreamReader")
1666 .field("id", &self.id)
1667 .finish()
1668 }
1669}
1670
1671pub(super) fn stream_close(store: &mut StoreOpaque, id: &mut TableId<TransmitHandle>) {
1672 let id = mem::replace(id, TableId::new(u32::MAX));
1673 store.host_drop_reader(id, TransmitKind::Stream).unwrap();
1674}
1675
1676pub(super) fn lift_index_to_stream(
1678 cx: &mut LiftContext<'_>,
1679 ty: InterfaceType,
1680 index: u32,
1681) -> Result<TableId<TransmitHandle>> {
1682 match ty {
1683 InterfaceType::Stream(src) => {
1684 let handle_table = cx
1685 .instance_mut()
1686 .table_for_transmit(TransmitIndex::Stream(src));
1687 let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1688 if is_done {
1689 bail!("cannot lift stream after being notified that the writable end dropped");
1690 }
1691 let id = TableId::<TransmitHandle>::new(rep);
1692 cx.concurrent_state_mut().get_mut(id)?.common.handle = None;
1693 Ok(id)
1694 }
1695 _ => func::bad_type_info(),
1696 }
1697}
1698
1699pub(super) fn lower_stream_to_index<U>(
1701 id: TableId<TransmitHandle>,
1702 cx: &mut LowerContext<'_, U>,
1703 ty: InterfaceType,
1704) -> Result<u32> {
1705 match ty {
1706 InterfaceType::Stream(dst) => {
1707 let concurrent_state = cx.store.0.concurrent_state_mut();
1708 let state = concurrent_state.get_mut(id)?.state;
1709 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1710
1711 let handle = cx
1712 .instance_mut()
1713 .table_for_transmit(TransmitIndex::Stream(dst))
1714 .stream_insert_read(dst, rep)?;
1715
1716 cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1717
1718 Ok(handle)
1719 }
1720 _ => func::bad_type_info(),
1721 }
1722}
1723
1724unsafe impl<T: ComponentType> ComponentType for StreamReader<T> {
1727 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1728
1729 type Lower = <u32 as func::ComponentType>::Lower;
1730
1731 fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1732 match ty {
1733 InterfaceType::Stream(ty) => {
1734 let ty = types.types[*ty].ty;
1735 types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1736 }
1737 other => bail!("expected `stream`, found `{}`", func::desc(other)),
1738 }
1739 }
1740}
1741
1742unsafe impl<T: ComponentType> func::Lower for StreamReader<T> {
1744 fn linear_lower_to_flat<U>(
1745 &self,
1746 cx: &mut LowerContext<'_, U>,
1747 ty: InterfaceType,
1748 dst: &mut MaybeUninit<Self::Lower>,
1749 ) -> Result<()> {
1750 lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1751 }
1752
1753 fn linear_lower_to_memory<U>(
1754 &self,
1755 cx: &mut LowerContext<'_, U>,
1756 ty: InterfaceType,
1757 offset: usize,
1758 ) -> Result<()> {
1759 lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1760 cx,
1761 InterfaceType::U32,
1762 offset,
1763 )
1764 }
1765}
1766
1767unsafe impl<T: ComponentType> func::Lift for StreamReader<T> {
1769 fn linear_lift_from_flat(
1770 cx: &mut LiftContext<'_>,
1771 ty: InterfaceType,
1772 src: &Self::Lower,
1773 ) -> Result<Self> {
1774 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1775 Self::lift_from_index(cx, ty, index)
1776 }
1777
1778 fn linear_lift_from_memory(
1779 cx: &mut LiftContext<'_>,
1780 ty: InterfaceType,
1781 bytes: &[u8],
1782 ) -> Result<Self> {
1783 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1784 Self::lift_from_index(cx, ty, index)
1785 }
1786}
1787
1788pub struct GuardedStreamReader<T, A>
1796where
1797 A: AsAccessor,
1798{
1799 reader: Option<StreamReader<T>>,
1803 accessor: A,
1804}
1805
1806impl<T, A> GuardedStreamReader<T, A>
1807where
1808 A: AsAccessor,
1809{
1810 pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1819 assert!(
1820 accessor
1821 .as_accessor()
1822 .with(|a| a.as_context().0.concurrency_support())
1823 );
1824 Self {
1825 reader: Some(reader),
1826 accessor,
1827 }
1828 }
1829
1830 pub fn into_stream(self) -> StreamReader<T> {
1833 self.into()
1834 }
1835}
1836
1837impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1838where
1839 A: AsAccessor,
1840{
1841 fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1842 guard.reader.take().unwrap()
1843 }
1844}
1845
1846impl<T, A> Drop for GuardedStreamReader<T, A>
1847where
1848 A: AsAccessor,
1849{
1850 fn drop(&mut self) {
1851 if let Some(reader) = &mut self.reader {
1852 reader.close_with(&self.accessor)
1853 }
1854 }
1855}
1856
1857pub struct ErrorContext {
1859 rep: u32,
1860}
1861
1862impl ErrorContext {
1863 pub(crate) fn new(rep: u32) -> Self {
1864 Self { rep }
1865 }
1866
1867 pub fn into_val(self) -> Val {
1869 Val::ErrorContext(ErrorContextAny(self.rep))
1870 }
1871
1872 pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1874 let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1875 bail!("expected `error-context`; got `{}`", value.desc());
1876 };
1877 Ok(Self::new(*rep))
1878 }
1879
1880 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1881 match ty {
1882 InterfaceType::ErrorContext(src) => {
1883 let rep = cx
1884 .instance_mut()
1885 .table_for_error_context(src)
1886 .error_context_rep(index)?;
1887
1888 Ok(Self { rep })
1889 }
1890 _ => func::bad_type_info(),
1891 }
1892 }
1893}
1894
1895pub(crate) fn lower_error_context_to_index<U>(
1896 rep: u32,
1897 cx: &mut LowerContext<'_, U>,
1898 ty: InterfaceType,
1899) -> Result<u32> {
1900 match ty {
1901 InterfaceType::ErrorContext(dst) => {
1902 let tbl = cx.instance_mut().table_for_error_context(dst);
1903 tbl.error_context_insert(rep)
1904 }
1905 _ => func::bad_type_info(),
1906 }
1907}
1908unsafe impl func::ComponentType for ErrorContext {
1911 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1912
1913 type Lower = <u32 as func::ComponentType>::Lower;
1914
1915 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1916 match ty {
1917 InterfaceType::ErrorContext(_) => Ok(()),
1918 other => bail!("expected `error`, found `{}`", func::desc(other)),
1919 }
1920 }
1921}
1922
1923unsafe impl func::Lower for ErrorContext {
1925 fn linear_lower_to_flat<T>(
1926 &self,
1927 cx: &mut LowerContext<'_, T>,
1928 ty: InterfaceType,
1929 dst: &mut MaybeUninit<Self::Lower>,
1930 ) -> Result<()> {
1931 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1932 cx,
1933 InterfaceType::U32,
1934 dst,
1935 )
1936 }
1937
1938 fn linear_lower_to_memory<T>(
1939 &self,
1940 cx: &mut LowerContext<'_, T>,
1941 ty: InterfaceType,
1942 offset: usize,
1943 ) -> Result<()> {
1944 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1945 cx,
1946 InterfaceType::U32,
1947 offset,
1948 )
1949 }
1950}
1951
1952unsafe impl func::Lift for ErrorContext {
1954 fn linear_lift_from_flat(
1955 cx: &mut LiftContext<'_>,
1956 ty: InterfaceType,
1957 src: &Self::Lower,
1958 ) -> Result<Self> {
1959 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1960 Self::lift_from_index(cx, ty, index)
1961 }
1962
1963 fn linear_lift_from_memory(
1964 cx: &mut LiftContext<'_>,
1965 ty: InterfaceType,
1966 bytes: &[u8],
1967 ) -> Result<Self> {
1968 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1969 Self::lift_from_index(cx, ty, index)
1970 }
1971}
1972
1973pub(super) struct TransmitHandle {
1975 pub(super) common: WaitableCommon,
1976 state: TableId<TransmitState>,
1978}
1979
1980impl TransmitHandle {
1981 fn new(state: TableId<TransmitState>) -> Self {
1982 Self {
1983 common: WaitableCommon::default(),
1984 state,
1985 }
1986 }
1987}
1988
1989impl TableDebug for TransmitHandle {
1990 fn type_name() -> &'static str {
1991 "TransmitHandle"
1992 }
1993}
1994
1995struct TransmitState {
1997 write_handle: TableId<TransmitHandle>,
1999 read_handle: TableId<TransmitHandle>,
2001 write: WriteState,
2003 read: ReadState,
2005 done: bool,
2007 pub(super) origin: TransmitOrigin,
2010}
2011
2012#[derive(Copy, Clone)]
2013pub(super) enum TransmitOrigin {
2014 Host,
2015 GuestFuture(ComponentInstanceId, TypeFutureTableIndex),
2016 GuestStream(ComponentInstanceId, TypeStreamTableIndex),
2017}
2018
2019impl TransmitState {
2020 fn new(origin: TransmitOrigin) -> Self {
2021 Self {
2022 write_handle: TableId::new(u32::MAX),
2023 read_handle: TableId::new(u32::MAX),
2024 read: ReadState::Open,
2025 write: WriteState::Open,
2026 done: false,
2027 origin,
2028 }
2029 }
2030}
2031
2032impl TableDebug for TransmitState {
2033 fn type_name() -> &'static str {
2034 "TransmitState"
2035 }
2036}
2037
2038impl TransmitOrigin {
2039 fn guest(id: ComponentInstanceId, index: TransmitIndex) -> Self {
2040 match index {
2041 TransmitIndex::Future(ty) => TransmitOrigin::GuestFuture(id, ty),
2042 TransmitIndex::Stream(ty) => TransmitOrigin::GuestStream(id, ty),
2043 }
2044 }
2045}
2046
2047type PollStream = Box<
2048 dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
2049>;
2050
2051type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
2052
2053enum WriteState {
2055 Open,
2057 GuestReady {
2059 instance: Instance,
2060 caller: RuntimeComponentInstanceIndex,
2061 ty: TransmitIndex,
2062 flat_abi: Option<FlatAbi>,
2063 options: OptionsIndex,
2064 address: usize,
2065 count: usize,
2066 handle: u32,
2067 },
2068 HostReady {
2070 produce: PollStream,
2071 try_into: TryInto,
2072 guest_offset: usize,
2073 cancel: bool,
2074 cancel_waker: Option<Waker>,
2075 },
2076 Dropped,
2078}
2079
2080impl fmt::Debug for WriteState {
2081 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2082 match self {
2083 Self::Open => f.debug_tuple("Open").finish(),
2084 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2085 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2086 Self::Dropped => f.debug_tuple("Dropped").finish(),
2087 }
2088 }
2089}
2090
2091enum ReadState {
2093 Open,
2095 GuestReady {
2097 ty: TransmitIndex,
2098 caller: RuntimeComponentInstanceIndex,
2099 flat_abi: Option<FlatAbi>,
2100 instance: Instance,
2101 options: OptionsIndex,
2102 address: usize,
2103 count: usize,
2104 handle: u32,
2105 },
2106 HostReady {
2108 consume: PollStream,
2109 guest_offset: usize,
2110 cancel: bool,
2111 cancel_waker: Option<Waker>,
2112 },
2113 HostToHost {
2115 accept: Box<
2116 dyn for<'a> Fn(
2117 &'a mut UntypedWriteBuffer<'a>,
2118 )
2119 -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
2120 + Send
2121 + Sync,
2122 >,
2123 buffer: Vec<u8>,
2124 limit: usize,
2125 },
2126 Dropped,
2128}
2129
2130impl fmt::Debug for ReadState {
2131 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2132 match self {
2133 Self::Open => f.debug_tuple("Open").finish(),
2134 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2135 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2136 Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
2137 Self::Dropped => f.debug_tuple("Dropped").finish(),
2138 }
2139 }
2140}
2141
2142fn return_code(kind: TransmitKind, state: StreamResult, guest_offset: usize) -> ReturnCode {
2143 let count = guest_offset.try_into().unwrap();
2144 match state {
2145 StreamResult::Dropped => ReturnCode::Dropped(count),
2146 StreamResult::Completed => ReturnCode::completed(kind, count),
2147 StreamResult::Cancelled => ReturnCode::Cancelled(count),
2148 }
2149}
2150
2151impl StoreOpaque {
2152 fn pipe_from_guest(
2153 &mut self,
2154 kind: TransmitKind,
2155 id: TableId<TransmitState>,
2156 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2157 ) {
2158 let future = async move {
2159 let stream_state = future.await?;
2160 tls::get(|store| {
2161 let state = store.concurrent_state_mut();
2162 let transmit = state.get_mut(id)?;
2163 let ReadState::HostReady {
2164 consume,
2165 guest_offset,
2166 ..
2167 } = mem::replace(&mut transmit.read, ReadState::Open)
2168 else {
2169 unreachable!();
2170 };
2171 let code = return_code(kind, stream_state, guest_offset);
2172 transmit.read = match stream_state {
2173 StreamResult::Dropped => ReadState::Dropped,
2174 StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2175 consume,
2176 guest_offset: 0,
2177 cancel: false,
2178 cancel_waker: None,
2179 },
2180 };
2181 let WriteState::GuestReady { ty, handle, .. } =
2182 mem::replace(&mut transmit.write, WriteState::Open)
2183 else {
2184 unreachable!();
2185 };
2186 state.send_write_result(ty, id, handle, code)?;
2187 Ok(())
2188 })
2189 };
2190
2191 self.concurrent_state_mut().push_future(future.boxed());
2192 }
2193
2194 fn pipe_to_guest(
2195 &mut self,
2196 kind: TransmitKind,
2197 id: TableId<TransmitState>,
2198 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2199 ) {
2200 let future = async move {
2201 let stream_state = future.await?;
2202 tls::get(|store| {
2203 let state = store.concurrent_state_mut();
2204 let transmit = state.get_mut(id)?;
2205 let WriteState::HostReady {
2206 produce,
2207 try_into,
2208 guest_offset,
2209 ..
2210 } = mem::replace(&mut transmit.write, WriteState::Open)
2211 else {
2212 unreachable!();
2213 };
2214 let code = return_code(kind, stream_state, guest_offset);
2215 transmit.write = match stream_state {
2216 StreamResult::Dropped => WriteState::Dropped,
2217 StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2218 produce,
2219 try_into,
2220 guest_offset: 0,
2221 cancel: false,
2222 cancel_waker: None,
2223 },
2224 };
2225 let ReadState::GuestReady { ty, handle, .. } =
2226 mem::replace(&mut transmit.read, ReadState::Open)
2227 else {
2228 unreachable!();
2229 };
2230 state.send_read_result(ty, id, handle, code)?;
2231 Ok(())
2232 })
2233 };
2234
2235 self.concurrent_state_mut().push_future(future.boxed());
2236 }
2237
2238 fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
2240 let state = self.concurrent_state_mut();
2241 let transmit_id = state.get_mut(id)?.state;
2242 let transmit = state
2243 .get_mut(transmit_id)
2244 .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2245 log::trace!(
2246 "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2247 transmit.read,
2248 transmit.write
2249 );
2250
2251 transmit.read = ReadState::Dropped;
2252
2253 let new_state = if let WriteState::Dropped = &transmit.write {
2256 WriteState::Dropped
2257 } else {
2258 WriteState::Open
2259 };
2260
2261 let write_handle = transmit.write_handle;
2262
2263 match mem::replace(&mut transmit.write, new_state) {
2264 WriteState::GuestReady { ty, handle, .. } => {
2267 state.update_event(
2268 write_handle.rep(),
2269 match ty {
2270 TransmitIndex::Future(ty) => Event::FutureWrite {
2271 code: ReturnCode::Dropped(0),
2272 pending: Some((ty, handle)),
2273 },
2274 TransmitIndex::Stream(ty) => Event::StreamWrite {
2275 code: ReturnCode::Dropped(0),
2276 pending: Some((ty, handle)),
2277 },
2278 },
2279 )?;
2280 }
2281
2282 WriteState::HostReady { .. } => {}
2283
2284 WriteState::Open => {
2285 state.update_event(
2286 write_handle.rep(),
2287 match kind {
2288 TransmitKind::Future => Event::FutureWrite {
2289 code: ReturnCode::Dropped(0),
2290 pending: None,
2291 },
2292 TransmitKind::Stream => Event::StreamWrite {
2293 code: ReturnCode::Dropped(0),
2294 pending: None,
2295 },
2296 },
2297 )?;
2298 }
2299
2300 WriteState::Dropped => {
2301 log::trace!("host_drop_reader delete {transmit_id:?}");
2302 state.delete_transmit(transmit_id)?;
2303 }
2304 }
2305 Ok(())
2306 }
2307
2308 fn host_drop_writer(
2310 &mut self,
2311 id: TableId<TransmitHandle>,
2312 on_drop_open: Option<fn() -> Result<()>>,
2313 ) -> Result<()> {
2314 let state = self.concurrent_state_mut();
2315 let transmit_id = state.get_mut(id)?.state;
2316 let transmit = state
2317 .get_mut(transmit_id)
2318 .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2319 log::trace!(
2320 "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2321 transmit.read,
2322 transmit.write
2323 );
2324
2325 match &mut transmit.write {
2327 WriteState::GuestReady { .. } => {
2328 unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2329 }
2330 WriteState::HostReady { .. } => {}
2331 v @ WriteState::Open => {
2332 if let (Some(on_drop_open), false) = (
2333 on_drop_open,
2334 transmit.done || matches!(transmit.read, ReadState::Dropped),
2335 ) {
2336 on_drop_open()?;
2337 } else {
2338 *v = WriteState::Dropped;
2339 }
2340 }
2341 WriteState::Dropped => unreachable!("write state is already dropped"),
2342 }
2343
2344 let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
2345
2346 let new_state = if let ReadState::Dropped = &transmit.read {
2352 ReadState::Dropped
2353 } else {
2354 ReadState::Open
2355 };
2356
2357 let read_handle = transmit.read_handle;
2358
2359 match mem::replace(&mut transmit.read, new_state) {
2361 ReadState::GuestReady { ty, handle, .. } => {
2365 self.concurrent_state_mut().update_event(
2367 read_handle.rep(),
2368 match ty {
2369 TransmitIndex::Future(ty) => Event::FutureRead {
2370 code: ReturnCode::Dropped(0),
2371 pending: Some((ty, handle)),
2372 },
2373 TransmitIndex::Stream(ty) => Event::StreamRead {
2374 code: ReturnCode::Dropped(0),
2375 pending: Some((ty, handle)),
2376 },
2377 },
2378 )?;
2379 }
2380
2381 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2382
2383 ReadState::Open => {
2385 self.concurrent_state_mut().update_event(
2386 read_handle.rep(),
2387 match on_drop_open {
2388 Some(_) => Event::FutureRead {
2389 code: ReturnCode::Dropped(0),
2390 pending: None,
2391 },
2392 None => Event::StreamRead {
2393 code: ReturnCode::Dropped(0),
2394 pending: None,
2395 },
2396 },
2397 )?;
2398 }
2399
2400 ReadState::Dropped => {
2403 log::trace!("host_drop_writer delete {transmit_id:?}");
2404 self.concurrent_state_mut().delete_transmit(transmit_id)?;
2405 }
2406 }
2407 Ok(())
2408 }
2409
2410 pub(super) fn transmit_origin(
2411 &mut self,
2412 id: TableId<TransmitHandle>,
2413 ) -> Result<TransmitOrigin> {
2414 let state = self.concurrent_state_mut();
2415 let state_id = state.get_mut(id)?.state;
2416 Ok(state.get_mut(state_id)?.origin)
2417 }
2418}
2419
2420impl<T> StoreContextMut<'_, T> {
2421 fn new_transmit<P: StreamProducer<T>>(
2422 mut self,
2423 kind: TransmitKind,
2424 producer: P,
2425 ) -> TableId<TransmitHandle>
2426 where
2427 P::Item: func::Lower,
2428 {
2429 let token = StoreToken::new(self.as_context_mut());
2430 let state = self.0.concurrent_state_mut();
2431 let (_, read) = state.new_transmit(TransmitOrigin::Host).unwrap();
2432 let producer = Arc::new(Mutex::new(Some((Box::pin(producer), P::Buffer::default()))));
2433 let id = state.get_mut(read).unwrap().state;
2434 let mut dropped = false;
2435 let produce = Box::new({
2436 let producer = producer.clone();
2437 move || {
2438 let producer = producer.clone();
2439 async move {
2440 let (mut mine, mut buffer) = producer.lock().unwrap().take().unwrap();
2441
2442 let (result, cancelled) = if buffer.remaining().is_empty() {
2443 future::poll_fn(|cx| {
2444 tls::get(|store| {
2445 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2446
2447 let &WriteState::HostReady { cancel, .. } = &transmit.write else {
2448 unreachable!();
2449 };
2450
2451 let mut host_buffer =
2452 if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
2453 Some(Cursor::new(mem::take(buffer)))
2454 } else {
2455 None
2456 };
2457
2458 let poll = mine.as_mut().poll_produce(
2459 cx,
2460 token.as_context_mut(store),
2461 Destination {
2462 id,
2463 buffer: &mut buffer,
2464 host_buffer: host_buffer.as_mut(),
2465 _phantom: PhantomData,
2466 },
2467 cancel,
2468 );
2469
2470 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2471
2472 let host_offset = if let (
2473 Some(host_buffer),
2474 ReadState::HostToHost { buffer, limit, .. },
2475 ) = (host_buffer, &mut transmit.read)
2476 {
2477 *limit = usize::try_from(host_buffer.position()).unwrap();
2478 *buffer = host_buffer.into_inner();
2479 *limit
2480 } else {
2481 0
2482 };
2483
2484 {
2485 let WriteState::HostReady {
2486 guest_offset,
2487 cancel,
2488 cancel_waker,
2489 ..
2490 } = &mut transmit.write
2491 else {
2492 unreachable!();
2493 };
2494
2495 if poll.is_pending() {
2496 if !buffer.remaining().is_empty()
2497 || *guest_offset > 0
2498 || host_offset > 0
2499 {
2500 return Poll::Ready(Err(format_err!(
2501 "StreamProducer::poll_produce returned Poll::Pending \
2502 after producing at least one item"
2503 )));
2504 }
2505 *cancel_waker = Some(cx.waker().clone());
2506 } else {
2507 *cancel_waker = None;
2508 *cancel = false;
2509 }
2510 }
2511
2512 poll.map(|v| v.map(|result| (result, cancel)))
2513 })
2514 })
2515 .await?
2516 } else {
2517 (StreamResult::Completed, false)
2518 };
2519
2520 let (guest_offset, host_offset, count) = tls::get(|store| {
2521 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2522 let (count, host_offset) = match &transmit.read {
2523 &ReadState::GuestReady { count, .. } => (count, 0),
2524 &ReadState::HostToHost { limit, .. } => (1, limit),
2525 _ => unreachable!(),
2526 };
2527 let guest_offset = match &transmit.write {
2528 &WriteState::HostReady { guest_offset, .. } => guest_offset,
2529 _ => unreachable!(),
2530 };
2531 (guest_offset, host_offset, count)
2532 });
2533
2534 match result {
2535 StreamResult::Completed => {
2536 if count > 1
2537 && buffer.remaining().is_empty()
2538 && guest_offset == 0
2539 && host_offset == 0
2540 {
2541 bail!(
2542 "StreamProducer::poll_produce returned StreamResult::Completed \
2543 without producing any items"
2544 );
2545 }
2546 }
2547 StreamResult::Cancelled => {
2548 if !cancelled {
2549 bail!(
2550 "StreamProducer::poll_produce returned StreamResult::Cancelled \
2551 without being given a `finish` parameter value of true"
2552 );
2553 }
2554 }
2555 StreamResult::Dropped => {
2556 dropped = true;
2557 }
2558 }
2559
2560 let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;
2561
2562 *producer.lock().unwrap() = Some((mine, buffer));
2563
2564 if write_buffer {
2565 write(token, id, producer.clone(), kind).await?;
2566 }
2567
2568 Ok(if dropped {
2569 if producer.lock().unwrap().as_ref().unwrap().1.remaining().is_empty()
2570 {
2571 StreamResult::Dropped
2572 } else {
2573 StreamResult::Completed
2574 }
2575 } else {
2576 result
2577 })
2578 }
2579 .boxed()
2580 }
2581 });
2582 let try_into = Box::new(move |ty| {
2583 let (mine, buffer) = producer.lock().unwrap().take().unwrap();
2584 match P::try_into(mine, ty) {
2585 Ok(value) => Some(value),
2586 Err(mine) => {
2587 *producer.lock().unwrap() = Some((mine, buffer));
2588 None
2589 }
2590 }
2591 });
2592 state.get_mut(id).unwrap().write = WriteState::HostReady {
2593 produce,
2594 try_into,
2595 guest_offset: 0,
2596 cancel: false,
2597 cancel_waker: None,
2598 };
2599 read
2600 }
2601
2602 fn set_consumer<C: StreamConsumer<T>>(
2603 mut self,
2604 id: TableId<TransmitHandle>,
2605 kind: TransmitKind,
2606 consumer: C,
2607 ) {
2608 let token = StoreToken::new(self.as_context_mut());
2609 let state = self.0.concurrent_state_mut();
2610 let id = state.get_mut(id).unwrap().state;
2611 let transmit = state.get_mut(id).unwrap();
2612 let consumer = Arc::new(Mutex::new(Some(Box::pin(consumer))));
2613 let consume_with_buffer = {
2614 let consumer = consumer.clone();
2615 async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
2616 let mut mine = consumer.lock().unwrap().take().unwrap();
2617
2618 let host_buffer_remaining_before =
2619 host_buffer.as_deref_mut().map(|v| v.remaining().len());
2620
2621 let (result, cancelled) = future::poll_fn(|cx| {
2622 tls::get(|store| {
2623 let cancel = match &store.concurrent_state_mut().get_mut(id).unwrap().read {
2624 &ReadState::HostReady { cancel, .. } => cancel,
2625 ReadState::Open => false,
2626 _ => unreachable!(),
2627 };
2628
2629 let poll = mine.as_mut().poll_consume(
2630 cx,
2631 token.as_context_mut(store),
2632 Source {
2633 id,
2634 host_buffer: host_buffer.as_deref_mut(),
2635 },
2636 cancel,
2637 );
2638
2639 if let ReadState::HostReady {
2640 cancel_waker,
2641 cancel,
2642 ..
2643 } = &mut store.concurrent_state_mut().get_mut(id).unwrap().read
2644 {
2645 if poll.is_pending() {
2646 *cancel_waker = Some(cx.waker().clone());
2647 } else {
2648 *cancel_waker = None;
2649 *cancel = false;
2650 }
2651 }
2652
2653 poll.map(|v| v.map(|result| (result, cancel)))
2654 })
2655 })
2656 .await?;
2657
2658 let (guest_offset, count) = tls::get(|store| {
2659 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2660 (
2661 match &transmit.read {
2662 &ReadState::HostReady { guest_offset, .. } => guest_offset,
2663 ReadState::Open => 0,
2664 _ => unreachable!(),
2665 },
2666 match &transmit.write {
2667 &WriteState::GuestReady { count, .. } => count,
2668 WriteState::HostReady { .. } => host_buffer_remaining_before.unwrap(),
2669 _ => unreachable!(),
2670 },
2671 )
2672 });
2673
2674 match result {
2675 StreamResult::Completed => {
2676 if count > 0
2677 && guest_offset == 0
2678 && host_buffer_remaining_before
2679 .zip(host_buffer.map(|v| v.remaining().len()))
2680 .map(|(before, after)| before == after)
2681 .unwrap_or(false)
2682 {
2683 bail!(
2684 "StreamConsumer::poll_consume returned StreamResult::Completed \
2685 without consuming any items"
2686 );
2687 }
2688
2689 if let TransmitKind::Future = kind {
2690 tls::get(|store| {
2691 store.concurrent_state_mut().get_mut(id).unwrap().done = true;
2692 });
2693 }
2694 }
2695 StreamResult::Cancelled => {
2696 if !cancelled {
2697 bail!(
2698 "StreamConsumer::poll_consume returned StreamResult::Cancelled \
2699 without being given a `finish` parameter value of true"
2700 );
2701 }
2702 }
2703 StreamResult::Dropped => {}
2704 }
2705
2706 *consumer.lock().unwrap() = Some(mine);
2707
2708 Ok(result)
2709 }
2710 };
2711 let consume = {
2712 let consume = consume_with_buffer.clone();
2713 Box::new(move || {
2714 let consume = consume.clone();
2715 async move { consume(None).await }.boxed()
2716 })
2717 };
2718
2719 match &transmit.write {
2720 WriteState::Open => {
2721 transmit.read = ReadState::HostReady {
2722 consume,
2723 guest_offset: 0,
2724 cancel: false,
2725 cancel_waker: None,
2726 };
2727 }
2728 &WriteState::GuestReady { .. } => {
2729 let future = consume();
2730 transmit.read = ReadState::HostReady {
2731 consume,
2732 guest_offset: 0,
2733 cancel: false,
2734 cancel_waker: None,
2735 };
2736 self.0.pipe_from_guest(kind, id, future);
2737 }
2738 WriteState::HostReady { .. } => {
2739 let WriteState::HostReady { produce, .. } = mem::replace(
2740 &mut transmit.write,
2741 WriteState::HostReady {
2742 produce: Box::new(|| unreachable!()),
2743 try_into: Box::new(|_| unreachable!()),
2744 guest_offset: 0,
2745 cancel: false,
2746 cancel_waker: None,
2747 },
2748 ) else {
2749 unreachable!();
2750 };
2751
2752 transmit.read = ReadState::HostToHost {
2753 accept: Box::new(move |input| {
2754 let consume = consume_with_buffer.clone();
2755 async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
2756 }),
2757 buffer: Vec::new(),
2758 limit: 0,
2759 };
2760
2761 let future = async move {
2762 loop {
2763 if tls::get(|store| {
2764 crate::error::Ok(matches!(
2765 store.concurrent_state_mut().get_mut(id)?.read,
2766 ReadState::Dropped
2767 ))
2768 })? {
2769 break Ok(());
2770 }
2771
2772 match produce().await? {
2773 StreamResult::Completed | StreamResult::Cancelled => {}
2774 StreamResult::Dropped => break Ok(()),
2775 }
2776
2777 if let TransmitKind::Future = kind {
2778 break Ok(());
2779 }
2780 }
2781 }
2782 .map(move |result| {
2783 tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
2784 result
2785 });
2786
2787 state.push_future(Box::pin(future));
2788 }
2789 WriteState::Dropped => {
2790 let reader = transmit.read_handle;
2791 self.0.host_drop_reader(reader, kind).unwrap();
2792 }
2793 }
2794 }
2795}
2796
2797async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
2798 token: StoreToken<D>,
2799 id: TableId<TransmitState>,
2800 pair: Arc<Mutex<Option<(P, B)>>>,
2801 kind: TransmitKind,
2802) -> Result<()> {
2803 let (read, guest_offset) = tls::get(|store| {
2804 let transmit = store.concurrent_state_mut().get_mut(id)?;
2805
2806 let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
2807 Some(guest_offset)
2808 } else {
2809 None
2810 };
2811
2812 crate::error::Ok((
2813 mem::replace(&mut transmit.read, ReadState::Open),
2814 guest_offset,
2815 ))
2816 })?;
2817
2818 match read {
2819 ReadState::GuestReady {
2820 ty,
2821 flat_abi,
2822 options,
2823 address,
2824 count,
2825 handle,
2826 instance,
2827 caller,
2828 } => {
2829 let guest_offset = guest_offset.unwrap();
2830
2831 if let TransmitKind::Future = kind {
2832 tls::get(|store| {
2833 store.concurrent_state_mut().get_mut(id)?.done = true;
2834 crate::error::Ok(())
2835 })?;
2836 }
2837
2838 let old_remaining = pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2839 let accept = {
2840 let pair = pair.clone();
2841 move |mut store: StoreContextMut<D>| {
2842 lower::<T, B, D>(
2843 store.as_context_mut(),
2844 instance,
2845 options,
2846 ty,
2847 address + (T::SIZE32 * guest_offset),
2848 count - guest_offset,
2849 &mut pair.lock().unwrap().as_mut().unwrap().1,
2850 )?;
2851 crate::error::Ok(())
2852 }
2853 };
2854
2855 if guest_offset < count {
2856 if T::MAY_REQUIRE_REALLOC {
2857 let (tx, rx) = oneshot::channel();
2862 tls::get(move |store| {
2863 store
2864 .concurrent_state_mut()
2865 .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2866 move |store| {
2867 _ = tx.send(accept(token.as_context_mut(store))?);
2868 Ok(())
2869 },
2870 ))))
2871 });
2872 rx.await?
2873 } else {
2874 tls::get(|store| accept(token.as_context_mut(store)))?
2879 };
2880 }
2881
2882 tls::get(|store| {
2883 let count =
2884 old_remaining - pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2885
2886 let transmit = store.concurrent_state_mut().get_mut(id)?;
2887
2888 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2889 unreachable!();
2890 };
2891
2892 *guest_offset += count;
2893
2894 transmit.read = ReadState::GuestReady {
2895 ty,
2896 flat_abi,
2897 options,
2898 address,
2899 count,
2900 handle,
2901 instance,
2902 caller,
2903 };
2904
2905 crate::error::Ok(())
2906 })?;
2907
2908 Ok(())
2909 }
2910
2911 ReadState::HostToHost {
2912 accept,
2913 mut buffer,
2914 limit,
2915 } => {
2916 let mut state = StreamResult::Completed;
2917 let mut position = 0;
2918
2919 while !matches!(state, StreamResult::Dropped) && position < limit {
2920 let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
2921 state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
2922 (buffer, position, _) = slice_buffer.into_parts();
2923 }
2924
2925 {
2926 let (mine, mut buffer) = pair.lock().unwrap().take().unwrap();
2927
2928 while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
2929 state = accept(&mut UntypedWriteBuffer::new(&mut buffer)).await?;
2930 }
2931
2932 *pair.lock().unwrap() = Some((mine, buffer));
2933 }
2934
2935 tls::get(|store| {
2936 store.concurrent_state_mut().get_mut(id)?.read = match state {
2937 StreamResult::Dropped => ReadState::Dropped,
2938 StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
2939 accept,
2940 buffer,
2941 limit: 0,
2942 },
2943 };
2944
2945 crate::error::Ok(())
2946 })?;
2947 Ok(())
2948 }
2949
2950 _ => unreachable!(),
2951 }
2952}
2953
2954impl Instance {
2955 fn consume(
2958 self,
2959 store: &mut dyn VMStore,
2960 kind: TransmitKind,
2961 transmit_id: TableId<TransmitState>,
2962 consume: PollStream,
2963 guest_offset: usize,
2964 cancel: bool,
2965 ) -> Result<ReturnCode> {
2966 let mut future = consume();
2967 store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
2968 consume,
2969 guest_offset,
2970 cancel,
2971 cancel_waker: None,
2972 };
2973 let poll = tls::set(store, || {
2974 future
2975 .as_mut()
2976 .poll(&mut Context::from_waker(&Waker::noop()))
2977 });
2978
2979 Ok(match poll {
2980 Poll::Ready(state) => {
2981 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
2982 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
2983 unreachable!();
2984 };
2985 let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2986 transmit.write = WriteState::Open;
2987 code
2988 }
2989 Poll::Pending => {
2990 store.pipe_from_guest(kind, transmit_id, future);
2991 ReturnCode::Blocked
2992 }
2993 })
2994 }
2995
2996 fn produce(
2999 self,
3000 store: &mut dyn VMStore,
3001 kind: TransmitKind,
3002 transmit_id: TableId<TransmitState>,
3003 produce: PollStream,
3004 try_into: TryInto,
3005 guest_offset: usize,
3006 cancel: bool,
3007 ) -> Result<ReturnCode> {
3008 let mut future = produce();
3009 store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
3010 produce,
3011 try_into,
3012 guest_offset,
3013 cancel,
3014 cancel_waker: None,
3015 };
3016 let poll = tls::set(store, || {
3017 future
3018 .as_mut()
3019 .poll(&mut Context::from_waker(&Waker::noop()))
3020 });
3021
3022 Ok(match poll {
3023 Poll::Ready(state) => {
3024 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3025 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3026 unreachable!();
3027 };
3028 let code = return_code(kind, state?, mem::replace(guest_offset, 0));
3029 transmit.read = ReadState::Open;
3030 code
3031 }
3032 Poll::Pending => {
3033 store.pipe_to_guest(kind, transmit_id, future);
3034 ReturnCode::Blocked
3035 }
3036 })
3037 }
3038
3039 pub(super) fn guest_drop_writable(
3041 self,
3042 store: &mut StoreOpaque,
3043 ty: TransmitIndex,
3044 writer: u32,
3045 ) -> Result<()> {
3046 let table = self.id().get_mut(store).table_for_transmit(ty);
3047 let transmit_rep = match ty {
3048 TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
3049 TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
3050 };
3051
3052 let id = TableId::<TransmitHandle>::new(transmit_rep);
3053 log::trace!("guest_drop_writable: drop writer {id:?}");
3054 match ty {
3055 TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
3056 TransmitIndex::Future(_) => store.host_drop_writer(
3057 id,
3058 Some(|| {
3059 Err(format_err!(
3060 "cannot drop future write end without first writing a value"
3061 ))
3062 }),
3063 ),
3064 }
3065 }
3066
3067 fn copy<T: 'static>(
3070 self,
3071 mut store: StoreContextMut<T>,
3072 flat_abi: Option<FlatAbi>,
3073 write_caller: RuntimeComponentInstanceIndex,
3074 write_ty: TransmitIndex,
3075 write_options: OptionsIndex,
3076 write_address: usize,
3077 read_caller: RuntimeComponentInstanceIndex,
3078 read_ty: TransmitIndex,
3079 read_options: OptionsIndex,
3080 read_address: usize,
3081 count: usize,
3082 rep: u32,
3083 ) -> Result<()> {
3084 let types = self.id().get(store.0).component().types();
3085 match (write_ty, read_ty) {
3086 (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
3087 assert_eq!(count, 1);
3088
3089 let payload = types[types[write_ty].ty].payload;
3090
3091 if write_caller == read_caller && !allow_intra_component_read_write(payload) {
3092 bail!(
3093 "cannot read from and write to intra-component future with non-numeric payload"
3094 )
3095 }
3096
3097 let val = payload
3098 .map(|ty| {
3099 let lift =
3100 &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);
3101
3102 let abi = lift.types.canonical_abi(&ty);
3103 if write_address % usize::try_from(abi.align32)? != 0 {
3105 bail!("write pointer not aligned");
3106 }
3107
3108 let bytes = lift
3109 .memory()
3110 .get(write_address..)
3111 .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
3112 .ok_or_else(|| {
3113 crate::format_err!("write pointer out of bounds of memory")
3114 })?;
3115
3116 Val::load(lift, ty, bytes)
3117 })
3118 .transpose()?;
3119
3120 if let Some(val) = val {
3121 let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
3122 let types = lower.types;
3123 let ty = types[types[read_ty].ty].payload.unwrap();
3124 let ptr = func::validate_inbounds_dynamic(
3125 types.canonical_abi(&ty),
3126 lower.as_slice_mut(),
3127 &ValRaw::u32(read_address.try_into().unwrap()),
3128 )?;
3129 val.store(lower, ty, ptr)?;
3130 }
3131 }
3132 (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
3133 if write_caller == read_caller
3134 && !allow_intra_component_read_write(types[types[write_ty].ty].payload)
3135 {
3136 bail!(
3137 "cannot read from and write to intra-component stream with non-numeric payload"
3138 )
3139 }
3140
3141 if let Some(flat_abi) = flat_abi {
3142 let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
3144 if length_in_bytes > 0 {
3145 if write_address % usize::try_from(flat_abi.align)? != 0 {
3146 bail!("write pointer not aligned");
3147 }
3148 if read_address % usize::try_from(flat_abi.align)? != 0 {
3149 bail!("read pointer not aligned");
3150 }
3151
3152 let store_opaque = store.0.store_opaque_mut();
3153
3154 {
3155 let src = self
3156 .options_memory(store_opaque, write_options)
3157 .get(write_address..)
3158 .and_then(|b| b.get(..length_in_bytes))
3159 .ok_or_else(|| {
3160 crate::format_err!("write pointer out of bounds of memory")
3161 })?
3162 .as_ptr();
3163 let dst = self
3164 .options_memory_mut(store_opaque, read_options)
3165 .get_mut(read_address..)
3166 .and_then(|b| b.get_mut(..length_in_bytes))
3167 .ok_or_else(|| {
3168 crate::format_err!("read pointer out of bounds of memory")
3169 })?
3170 .as_mut_ptr();
3171 unsafe {
3174 if write_caller == read_caller {
3175 src.copy_to(dst, length_in_bytes)
3179 } else {
3180 src.copy_to_nonoverlapping(dst, length_in_bytes)
3185 }
3186 }
3187 }
3188 }
3189 } else {
3190 let store_opaque = store.0.store_opaque_mut();
3191 let lift = &mut LiftContext::new(store_opaque, write_options, self);
3192 let ty = lift.types[lift.types[write_ty].ty].payload.unwrap();
3193 let abi = lift.types.canonical_abi(&ty);
3194 let size = usize::try_from(abi.size32).unwrap();
3195 if write_address % usize::try_from(abi.align32)? != 0 {
3196 bail!("write pointer not aligned");
3197 }
3198 let bytes = lift
3199 .memory()
3200 .get(write_address..)
3201 .and_then(|b| b.get(..size * count))
3202 .ok_or_else(|| {
3203 crate::format_err!("write pointer out of bounds of memory")
3204 })?;
3205
3206 let values = (0..count)
3207 .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
3208 .collect::<Result<Vec<_>>>()?;
3209
3210 let id = TableId::<TransmitHandle>::new(rep);
3211 log::trace!("copy values {values:?} for {id:?}");
3212
3213 let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
3214 let ty = lower.types[lower.types[read_ty].ty].payload.unwrap();
3215 let abi = lower.types.canonical_abi(&ty);
3216 if read_address % usize::try_from(abi.align32)? != 0 {
3217 bail!("read pointer not aligned");
3218 }
3219 let size = usize::try_from(abi.size32).unwrap();
3220 lower
3221 .as_slice_mut()
3222 .get_mut(read_address..)
3223 .and_then(|b| b.get_mut(..size * count))
3224 .ok_or_else(|| {
3225 crate::format_err!("read pointer out of bounds of memory")
3226 })?;
3227 let mut ptr = read_address;
3228 for value in values {
3229 value.store(lower, ty, ptr)?;
3230 ptr += size
3231 }
3232 }
3233 }
3234 _ => unreachable!(),
3235 }
3236
3237 Ok(())
3238 }
3239
3240 fn check_bounds(
3241 self,
3242 store: &StoreOpaque,
3243 options: OptionsIndex,
3244 ty: TransmitIndex,
3245 address: usize,
3246 count: usize,
3247 ) -> Result<()> {
3248 let types = self.id().get(store).component().types();
3249 let size = usize::try_from(
3250 match ty {
3251 TransmitIndex::Future(ty) => types[types[ty].ty]
3252 .payload
3253 .map(|ty| types.canonical_abi(&ty).size32),
3254 TransmitIndex::Stream(ty) => types[types[ty].ty]
3255 .payload
3256 .map(|ty| types.canonical_abi(&ty).size32),
3257 }
3258 .unwrap_or(0),
3259 )
3260 .unwrap();
3261
3262 if count > 0 && size > 0 {
3263 self.options_memory(store, options)
3264 .get(address..)
3265 .and_then(|b| b.get(..(size * count)))
3266 .map(drop)
3267 .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))
3268 } else {
3269 Ok(())
3270 }
3271 }
3272
3273 pub(super) fn guest_write<T: 'static>(
3275 self,
3276 mut store: StoreContextMut<T>,
3277 caller: RuntimeComponentInstanceIndex,
3278 ty: TransmitIndex,
3279 options: OptionsIndex,
3280 flat_abi: Option<FlatAbi>,
3281 handle: u32,
3282 address: u32,
3283 count: u32,
3284 ) -> Result<ReturnCode> {
3285 if !self.options(store.0, options).async_ {
3286 store.0.check_blocking()?;
3290 }
3291
3292 let address = usize::try_from(address).unwrap();
3293 let count = usize::try_from(count).unwrap();
3294 self.check_bounds(store.0, options, ty, address, count)?;
3295 let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3296 let TransmitLocalState::Write { done } = *state else {
3297 bail!(
3298 "invalid handle {handle}; expected `Write`; got {:?}",
3299 *state
3300 );
3301 };
3302
3303 if done {
3304 bail!("cannot write to stream after being notified that the readable end dropped");
3305 }
3306
3307 *state = TransmitLocalState::Busy;
3308 let transmit_handle = TableId::<TransmitHandle>::new(rep);
3309 let concurrent_state = store.0.concurrent_state_mut();
3310 let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3311 let transmit = concurrent_state.get_mut(transmit_id)?;
3312 log::trace!(
3313 "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3314 transmit.read
3315 );
3316
3317 if transmit.done {
3318 bail!("cannot write to future after previous write succeeded or readable end dropped");
3319 }
3320
3321 let new_state = if let ReadState::Dropped = &transmit.read {
3322 ReadState::Dropped
3323 } else {
3324 ReadState::Open
3325 };
3326
3327 let set_guest_ready = |me: &mut ConcurrentState| {
3328 let transmit = me.get_mut(transmit_id)?;
3329 assert!(
3330 matches!(&transmit.write, WriteState::Open),
3331 "expected `WriteState::Open`; got `{:?}`",
3332 transmit.write
3333 );
3334 transmit.write = WriteState::GuestReady {
3335 instance: self,
3336 caller,
3337 ty,
3338 flat_abi,
3339 options,
3340 address,
3341 count,
3342 handle,
3343 };
3344 Ok::<_, crate::Error>(())
3345 };
3346
3347 let mut result = match mem::replace(&mut transmit.read, new_state) {
3348 ReadState::GuestReady {
3349 ty: read_ty,
3350 flat_abi: read_flat_abi,
3351 options: read_options,
3352 address: read_address,
3353 count: read_count,
3354 handle: read_handle,
3355 instance: read_instance,
3356 caller: read_caller,
3357 } => {
3358 assert_eq!(flat_abi, read_flat_abi);
3359
3360 if let TransmitIndex::Future(_) = ty {
3361 transmit.done = true;
3362 }
3363
3364 let write_complete = count == 0 || read_count > 0;
3386 let read_complete = count > 0;
3387 let read_buffer_remaining = count < read_count;
3388
3389 let read_handle_rep = transmit.read_handle.rep();
3390
3391 let count = count.min(read_count);
3392
3393 self.copy(
3394 store.as_context_mut(),
3395 flat_abi,
3396 caller,
3397 ty,
3398 options,
3399 address,
3400 read_caller,
3401 read_ty,
3402 read_options,
3403 read_address,
3404 count,
3405 rep,
3406 )?;
3407
3408 let instance = self.id().get_mut(store.0);
3409 let types = instance.component().types();
3410 let item_size = payload(ty, types)
3411 .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3412 .unwrap_or(0);
3413 let concurrent_state = store.0.concurrent_state_mut();
3414 if read_complete {
3415 let count = u32::try_from(count).unwrap();
3416 let total = if let Some(Event::StreamRead {
3417 code: ReturnCode::Completed(old_total),
3418 ..
3419 }) = concurrent_state.take_event(read_handle_rep)?
3420 {
3421 count + old_total
3422 } else {
3423 count
3424 };
3425
3426 let code = ReturnCode::completed(ty.kind(), total);
3427
3428 concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
3429 }
3430
3431 if read_buffer_remaining {
3432 let transmit = concurrent_state.get_mut(transmit_id)?;
3433 transmit.read = ReadState::GuestReady {
3434 ty: read_ty,
3435 flat_abi: read_flat_abi,
3436 options: read_options,
3437 address: read_address + (count * item_size),
3438 count: read_count - count,
3439 handle: read_handle,
3440 instance: read_instance,
3441 caller: read_caller,
3442 };
3443 }
3444
3445 if write_complete {
3446 ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3447 } else {
3448 set_guest_ready(concurrent_state)?;
3449 ReturnCode::Blocked
3450 }
3451 }
3452
3453 ReadState::HostReady {
3454 consume,
3455 guest_offset,
3456 cancel,
3457 cancel_waker,
3458 } => {
3459 assert!(cancel_waker.is_none());
3460 assert!(!cancel);
3461 assert_eq!(0, guest_offset);
3462
3463 if let TransmitIndex::Future(_) = ty {
3464 transmit.done = true;
3465 }
3466
3467 set_guest_ready(concurrent_state)?;
3468 self.consume(store.0, ty.kind(), transmit_id, consume, 0, false)?
3469 }
3470
3471 ReadState::HostToHost { .. } => unreachable!(),
3472
3473 ReadState::Open => {
3474 set_guest_ready(concurrent_state)?;
3475 ReturnCode::Blocked
3476 }
3477
3478 ReadState::Dropped => {
3479 if let TransmitIndex::Future(_) = ty {
3480 transmit.done = true;
3481 }
3482
3483 ReturnCode::Dropped(0)
3484 }
3485 };
3486
3487 if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3488 result = self.wait_for_write(store.0, transmit_handle)?;
3489 }
3490
3491 if result != ReturnCode::Blocked {
3492 *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3493 TransmitLocalState::Write {
3494 done: matches!(
3495 (result, ty),
3496 (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3497 ),
3498 };
3499 }
3500
3501 log::trace!(
3502 "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3503 );
3504
3505 Ok(result)
3506 }
3507
3508 pub(super) fn guest_read<T: 'static>(
3510 self,
3511 mut store: StoreContextMut<T>,
3512 caller: RuntimeComponentInstanceIndex,
3513 ty: TransmitIndex,
3514 options: OptionsIndex,
3515 flat_abi: Option<FlatAbi>,
3516 handle: u32,
3517 address: u32,
3518 count: u32,
3519 ) -> Result<ReturnCode> {
3520 if !self.options(store.0, options).async_ {
3521 store.0.check_blocking()?;
3525 }
3526
3527 let address = usize::try_from(address).unwrap();
3528 let count = usize::try_from(count).unwrap();
3529 self.check_bounds(store.0, options, ty, address, count)?;
3530 let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3531 let TransmitLocalState::Read { done } = *state else {
3532 bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
3533 };
3534
3535 if done {
3536 bail!("cannot read from stream after being notified that the writable end dropped");
3537 }
3538
3539 *state = TransmitLocalState::Busy;
3540 let transmit_handle = TableId::<TransmitHandle>::new(rep);
3541 let concurrent_state = store.0.concurrent_state_mut();
3542 let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3543 let transmit = concurrent_state.get_mut(transmit_id)?;
3544 log::trace!(
3545 "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3546 transmit.write
3547 );
3548
3549 if transmit.done {
3550 bail!("cannot read from future after previous read succeeded");
3551 }
3552
3553 let new_state = if let WriteState::Dropped = &transmit.write {
3554 WriteState::Dropped
3555 } else {
3556 WriteState::Open
3557 };
3558
3559 let set_guest_ready = |me: &mut ConcurrentState| {
3560 let transmit = me.get_mut(transmit_id)?;
3561 assert!(
3562 matches!(&transmit.read, ReadState::Open),
3563 "expected `ReadState::Open`; got `{:?}`",
3564 transmit.read
3565 );
3566 transmit.read = ReadState::GuestReady {
3567 ty,
3568 flat_abi,
3569 options,
3570 address,
3571 count,
3572 handle,
3573 instance: self,
3574 caller,
3575 };
3576 Ok::<_, crate::Error>(())
3577 };
3578
3579 let mut result = match mem::replace(&mut transmit.write, new_state) {
3580 WriteState::GuestReady {
3581 instance: _,
3582 ty: write_ty,
3583 flat_abi: write_flat_abi,
3584 options: write_options,
3585 address: write_address,
3586 count: write_count,
3587 handle: write_handle,
3588 caller: write_caller,
3589 } => {
3590 assert_eq!(flat_abi, write_flat_abi);
3591
3592 if let TransmitIndex::Future(_) = ty {
3593 transmit.done = true;
3594 }
3595
3596 let write_handle_rep = transmit.write_handle.rep();
3597
3598 let write_complete = write_count == 0 || count > 0;
3603 let read_complete = write_count > 0;
3604 let write_buffer_remaining = count < write_count;
3605
3606 let count = count.min(write_count);
3607
3608 self.copy(
3609 store.as_context_mut(),
3610 flat_abi,
3611 write_caller,
3612 write_ty,
3613 write_options,
3614 write_address,
3615 caller,
3616 ty,
3617 options,
3618 address,
3619 count,
3620 rep,
3621 )?;
3622
3623 let instance = self.id().get_mut(store.0);
3624 let types = instance.component().types();
3625 let item_size = payload(ty, types)
3626 .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3627 .unwrap_or(0);
3628 let concurrent_state = store.0.concurrent_state_mut();
3629
3630 if write_complete {
3631 let count = u32::try_from(count).unwrap();
3632 let total = if let Some(Event::StreamWrite {
3633 code: ReturnCode::Completed(old_total),
3634 ..
3635 }) = concurrent_state.take_event(write_handle_rep)?
3636 {
3637 count + old_total
3638 } else {
3639 count
3640 };
3641
3642 let code = ReturnCode::completed(ty.kind(), total);
3643
3644 concurrent_state.send_write_result(
3645 write_ty,
3646 transmit_id,
3647 write_handle,
3648 code,
3649 )?;
3650 }
3651
3652 if write_buffer_remaining {
3653 let transmit = concurrent_state.get_mut(transmit_id)?;
3654 transmit.write = WriteState::GuestReady {
3655 instance: self,
3656 caller: write_caller,
3657 ty: write_ty,
3658 flat_abi: write_flat_abi,
3659 options: write_options,
3660 address: write_address + (count * item_size),
3661 count: write_count - count,
3662 handle: write_handle,
3663 };
3664 }
3665
3666 if read_complete {
3667 ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3668 } else {
3669 set_guest_ready(concurrent_state)?;
3670 ReturnCode::Blocked
3671 }
3672 }
3673
3674 WriteState::HostReady {
3675 produce,
3676 try_into,
3677 guest_offset,
3678 cancel,
3679 cancel_waker,
3680 } => {
3681 assert!(cancel_waker.is_none());
3682 assert!(!cancel);
3683 assert_eq!(0, guest_offset);
3684
3685 set_guest_ready(concurrent_state)?;
3686
3687 let code =
3688 self.produce(store.0, ty.kind(), transmit_id, produce, try_into, 0, false)?;
3689
3690 if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) {
3691 store.0.concurrent_state_mut().get_mut(transmit_id)?.done = true;
3692 }
3693
3694 code
3695 }
3696
3697 WriteState::Open => {
3698 set_guest_ready(concurrent_state)?;
3699 ReturnCode::Blocked
3700 }
3701
3702 WriteState::Dropped => ReturnCode::Dropped(0),
3703 };
3704
3705 if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3706 result = self.wait_for_read(store.0, transmit_handle)?;
3707 }
3708
3709 if result != ReturnCode::Blocked {
3710 *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3711 TransmitLocalState::Read {
3712 done: matches!(
3713 (result, ty),
3714 (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3715 ),
3716 };
3717 }
3718
3719 log::trace!(
3720 "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3721 );
3722
3723 Ok(result)
3724 }
3725
3726 fn wait_for_write(
3727 self,
3728 store: &mut StoreOpaque,
3729 handle: TableId<TransmitHandle>,
3730 ) -> Result<ReturnCode> {
3731 let waitable = Waitable::Transmit(handle);
3732 store.wait_for_event(waitable)?;
3733 let event = waitable.take_event(store.concurrent_state_mut())?;
3734 if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3735 event
3736 {
3737 waitable.on_delivery(store, self, event);
3738 Ok(code)
3739 } else {
3740 unreachable!()
3741 }
3742 }
3743
3744 fn cancel_write(
3746 self,
3747 store: &mut StoreOpaque,
3748 transmit_id: TableId<TransmitState>,
3749 async_: bool,
3750 ) -> Result<ReturnCode> {
3751 let state = store.concurrent_state_mut();
3752 let transmit = state.get_mut(transmit_id)?;
3753 log::trace!(
3754 "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3755 transmit.read,
3756 transmit.write
3757 );
3758
3759 let code = if let Some(event) =
3760 Waitable::Transmit(transmit.write_handle).take_event(state)?
3761 {
3762 let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3763 unreachable!();
3764 };
3765 match (code, event) {
3766 (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3767 ReturnCode::Cancelled(count)
3768 }
3769 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3770 _ => unreachable!(),
3771 }
3772 } else if let ReadState::HostReady {
3773 cancel,
3774 cancel_waker,
3775 ..
3776 } = &mut state.get_mut(transmit_id)?.read
3777 {
3778 *cancel = true;
3779 if let Some(waker) = cancel_waker.take() {
3780 waker.wake();
3781 }
3782
3783 if async_ {
3784 ReturnCode::Blocked
3785 } else {
3786 let handle = store
3787 .concurrent_state_mut()
3788 .get_mut(transmit_id)?
3789 .write_handle;
3790 self.wait_for_write(store, handle)?
3791 }
3792 } else {
3793 ReturnCode::Cancelled(0)
3794 };
3795
3796 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3797
3798 match &transmit.write {
3799 WriteState::GuestReady { .. } => {
3800 transmit.write = WriteState::Open;
3801 }
3802 WriteState::HostReady { .. } => todo!("support host write cancellation"),
3803 WriteState::Open | WriteState::Dropped => {}
3804 }
3805
3806 log::trace!("cancelled write {transmit_id:?}: {code:?}");
3807
3808 Ok(code)
3809 }
3810
3811 fn wait_for_read(
3812 self,
3813 store: &mut StoreOpaque,
3814 handle: TableId<TransmitHandle>,
3815 ) -> Result<ReturnCode> {
3816 let waitable = Waitable::Transmit(handle);
3817 store.wait_for_event(waitable)?;
3818 let event = waitable.take_event(store.concurrent_state_mut())?;
3819 if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
3820 event
3821 {
3822 waitable.on_delivery(store, self, event);
3823 Ok(code)
3824 } else {
3825 unreachable!()
3826 }
3827 }
3828
3829 fn cancel_read(
3831 self,
3832 store: &mut StoreOpaque,
3833 transmit_id: TableId<TransmitState>,
3834 async_: bool,
3835 ) -> Result<ReturnCode> {
3836 let state = store.concurrent_state_mut();
3837 let transmit = state.get_mut(transmit_id)?;
3838 log::trace!(
3839 "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3840 transmit.read,
3841 transmit.write
3842 );
3843
3844 let code = if let Some(event) =
3845 Waitable::Transmit(transmit.read_handle).take_event(state)?
3846 {
3847 let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3848 unreachable!();
3849 };
3850 match (code, event) {
3851 (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3852 ReturnCode::Cancelled(count)
3853 }
3854 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3855 _ => unreachable!(),
3856 }
3857 } else if let WriteState::HostReady {
3858 cancel,
3859 cancel_waker,
3860 ..
3861 } = &mut state.get_mut(transmit_id)?.write
3862 {
3863 *cancel = true;
3864 if let Some(waker) = cancel_waker.take() {
3865 waker.wake();
3866 }
3867
3868 if async_ {
3869 ReturnCode::Blocked
3870 } else {
3871 let handle = store
3872 .concurrent_state_mut()
3873 .get_mut(transmit_id)?
3874 .read_handle;
3875 self.wait_for_read(store, handle)?
3876 }
3877 } else {
3878 ReturnCode::Cancelled(0)
3879 };
3880
3881 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3882
3883 match &transmit.read {
3884 ReadState::GuestReady { .. } => {
3885 transmit.read = ReadState::Open;
3886 }
3887 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
3888 todo!("support host read cancellation")
3889 }
3890 ReadState::Open | ReadState::Dropped => {}
3891 }
3892
3893 log::trace!("cancelled read {transmit_id:?}: {code:?}");
3894
3895 Ok(code)
3896 }
3897
3898 fn guest_cancel_write(
3900 self,
3901 store: &mut StoreOpaque,
3902 ty: TransmitIndex,
3903 async_: bool,
3904 writer: u32,
3905 ) -> Result<ReturnCode> {
3906 if !async_ {
3907 store.check_blocking()?;
3911 }
3912
3913 let (rep, state) =
3914 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
3915 let id = TableId::<TransmitHandle>::new(rep);
3916 log::trace!("guest cancel write {id:?} (handle {writer})");
3917 match state {
3918 TransmitLocalState::Write { .. } => {
3919 bail!("stream or future write cancelled when no write is pending")
3920 }
3921 TransmitLocalState::Read { .. } => {
3922 bail!("passed read end to `{{stream|future}}.cancel-write`")
3923 }
3924 TransmitLocalState::Busy => {}
3925 }
3926 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3927 let code = self.cancel_write(store, transmit_id, async_)?;
3928 if !matches!(code, ReturnCode::Blocked) {
3929 let state =
3930 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
3931 .1;
3932 if let TransmitLocalState::Busy = state {
3933 *state = TransmitLocalState::Write { done: false };
3934 }
3935 }
3936 Ok(code)
3937 }
3938
3939 fn guest_cancel_read(
3941 self,
3942 store: &mut StoreOpaque,
3943 ty: TransmitIndex,
3944 async_: bool,
3945 reader: u32,
3946 ) -> Result<ReturnCode> {
3947 if !async_ {
3948 store.check_blocking()?;
3952 }
3953
3954 let (rep, state) =
3955 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
3956 let id = TableId::<TransmitHandle>::new(rep);
3957 log::trace!("guest cancel read {id:?} (handle {reader})");
3958 match state {
3959 TransmitLocalState::Read { .. } => {
3960 bail!("stream or future read cancelled when no read is pending")
3961 }
3962 TransmitLocalState::Write { .. } => {
3963 bail!("passed write end to `{{stream|future}}.cancel-read`")
3964 }
3965 TransmitLocalState::Busy => {}
3966 }
3967 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3968 let code = self.cancel_read(store, transmit_id, async_)?;
3969 if !matches!(code, ReturnCode::Blocked) {
3970 let state =
3971 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
3972 .1;
3973 if let TransmitLocalState::Busy = state {
3974 *state = TransmitLocalState::Read { done: false };
3975 }
3976 }
3977 Ok(code)
3978 }
3979
3980 fn guest_drop_readable(
3982 self,
3983 store: &mut StoreOpaque,
3984 ty: TransmitIndex,
3985 reader: u32,
3986 ) -> Result<()> {
3987 let table = self.id().get_mut(store).table_for_transmit(ty);
3988 let (rep, _is_done) = match ty {
3989 TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
3990 TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
3991 };
3992 let kind = match ty {
3993 TransmitIndex::Stream(_) => TransmitKind::Stream,
3994 TransmitIndex::Future(_) => TransmitKind::Future,
3995 };
3996 let id = TableId::<TransmitHandle>::new(rep);
3997 log::trace!("guest_drop_readable: drop reader {id:?}");
3998 store.host_drop_reader(id, kind)
3999 }
4000
4001 pub(crate) fn error_context_new(
4003 self,
4004 store: &mut StoreOpaque,
4005 caller: RuntimeComponentInstanceIndex,
4006 ty: TypeComponentLocalErrorContextTableIndex,
4007 options: OptionsIndex,
4008 debug_msg_address: u32,
4009 debug_msg_len: u32,
4010 ) -> Result<u32> {
4011 self.id().get(store).check_may_leave(caller)?;
4012 let lift_ctx = &mut LiftContext::new(store, options, self);
4013 let debug_msg = String::linear_lift_from_flat(
4014 lift_ctx,
4015 InterfaceType::String,
4016 &[ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)],
4017 )?;
4018
4019 let err_ctx = ErrorContextState { debug_msg };
4021 let state = store.concurrent_state_mut();
4022 let table_id = state.push(err_ctx)?;
4023 let global_ref_count_idx =
4024 TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
4025
4026 let _ = state
4028 .global_error_context_ref_counts
4029 .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
4030
4031 let local_idx = self
4038 .id()
4039 .get_mut(store)
4040 .table_for_error_context(ty)
4041 .error_context_insert(table_id.rep())?;
4042
4043 Ok(local_idx)
4044 }
4045
4046 pub(super) fn error_context_debug_message<T>(
4048 self,
4049 store: StoreContextMut<T>,
4050 ty: TypeComponentLocalErrorContextTableIndex,
4051 options: OptionsIndex,
4052 err_ctx_handle: u32,
4053 debug_msg_address: u32,
4054 ) -> Result<()> {
4055 let handle_table_id_rep = self
4057 .id()
4058 .get_mut(store.0)
4059 .table_for_error_context(ty)
4060 .error_context_rep(err_ctx_handle)?;
4061
4062 let state = store.0.concurrent_state_mut();
4063 let ErrorContextState { debug_msg } =
4065 state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
4066 let debug_msg = debug_msg.clone();
4067
4068 let lower_cx = &mut LowerContext::new(store, options, self);
4069 let debug_msg_address = usize::try_from(debug_msg_address)?;
4070 let offset = lower_cx
4072 .as_slice_mut()
4073 .get(debug_msg_address..)
4074 .and_then(|b| b.get(..debug_msg.bytes().len()))
4075 .map(|_| debug_msg_address)
4076 .ok_or_else(|| crate::format_err!("invalid debug message pointer: out of bounds"))?;
4077 debug_msg
4078 .as_str()
4079 .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
4080
4081 Ok(())
4082 }
4083
4084 pub(crate) fn future_cancel_read(
4086 self,
4087 store: &mut StoreOpaque,
4088 caller: RuntimeComponentInstanceIndex,
4089 ty: TypeFutureTableIndex,
4090 async_: bool,
4091 reader: u32,
4092 ) -> Result<u32> {
4093 self.id().get(store).check_may_leave(caller)?;
4094 self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
4095 .map(|v| v.encode())
4096 }
4097
4098 pub(crate) fn future_cancel_write(
4100 self,
4101 store: &mut StoreOpaque,
4102 caller: RuntimeComponentInstanceIndex,
4103 ty: TypeFutureTableIndex,
4104 async_: bool,
4105 writer: u32,
4106 ) -> Result<u32> {
4107 self.id().get(store).check_may_leave(caller)?;
4108 self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
4109 .map(|v| v.encode())
4110 }
4111
4112 pub(crate) fn stream_cancel_read(
4114 self,
4115 store: &mut StoreOpaque,
4116 caller: RuntimeComponentInstanceIndex,
4117 ty: TypeStreamTableIndex,
4118 async_: bool,
4119 reader: u32,
4120 ) -> Result<u32> {
4121 self.id().get(store).check_may_leave(caller)?;
4122 self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
4123 .map(|v| v.encode())
4124 }
4125
4126 pub(crate) fn stream_cancel_write(
4128 self,
4129 store: &mut StoreOpaque,
4130 caller: RuntimeComponentInstanceIndex,
4131 ty: TypeStreamTableIndex,
4132 async_: bool,
4133 writer: u32,
4134 ) -> Result<u32> {
4135 self.id().get(store).check_may_leave(caller)?;
4136 self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
4137 .map(|v| v.encode())
4138 }
4139
4140 pub(crate) fn future_drop_readable(
4142 self,
4143 store: &mut StoreOpaque,
4144 caller: RuntimeComponentInstanceIndex,
4145 ty: TypeFutureTableIndex,
4146 reader: u32,
4147 ) -> Result<()> {
4148 self.id().get(store).check_may_leave(caller)?;
4149 self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
4150 }
4151
4152 pub(crate) fn stream_drop_readable(
4154 self,
4155 store: &mut StoreOpaque,
4156 caller: RuntimeComponentInstanceIndex,
4157 ty: TypeStreamTableIndex,
4158 reader: u32,
4159 ) -> Result<()> {
4160 self.id().get(store).check_may_leave(caller)?;
4161 self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
4162 }
4163
4164 fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
4168 let (write, read) = store
4169 .concurrent_state_mut()
4170 .new_transmit(TransmitOrigin::guest(self.id().instance(), ty))?;
4171
4172 let table = self.id().get_mut(store).table_for_transmit(ty);
4173 let (read_handle, write_handle) = match ty {
4174 TransmitIndex::Future(ty) => (
4175 table.future_insert_read(ty, read.rep())?,
4176 table.future_insert_write(ty, write.rep())?,
4177 ),
4178 TransmitIndex::Stream(ty) => (
4179 table.stream_insert_read(ty, read.rep())?,
4180 table.stream_insert_write(ty, write.rep())?,
4181 ),
4182 };
4183
4184 let state = store.concurrent_state_mut();
4185 state.get_mut(read)?.common.handle = Some(read_handle);
4186 state.get_mut(write)?.common.handle = Some(write_handle);
4187
4188 Ok(ResourcePair {
4189 write: write_handle,
4190 read: read_handle,
4191 })
4192 }
4193
4194 pub(crate) fn error_context_drop(
4196 self,
4197 store: &mut StoreOpaque,
4198 caller: RuntimeComponentInstanceIndex,
4199 ty: TypeComponentLocalErrorContextTableIndex,
4200 error_context: u32,
4201 ) -> Result<()> {
4202 let instance = self.id().get_mut(store);
4203 instance.check_may_leave(caller)?;
4204
4205 let local_handle_table = instance.table_for_error_context(ty);
4206
4207 let rep = local_handle_table.error_context_drop(error_context)?;
4208
4209 let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
4210
4211 let state = store.concurrent_state_mut();
4212 let GlobalErrorContextRefCount(global_ref_count) = state
4213 .global_error_context_ref_counts
4214 .get_mut(&global_ref_count_idx)
4215 .expect("retrieve concurrent state for error context during drop");
4216
4217 assert!(*global_ref_count >= 1);
4219 *global_ref_count -= 1;
4220 if *global_ref_count == 0 {
4221 state
4222 .global_error_context_ref_counts
4223 .remove(&global_ref_count_idx);
4224
4225 state
4226 .delete(TableId::<ErrorContextState>::new(rep))
4227 .context("deleting component-global error context data")?;
4228 }
4229
4230 Ok(())
4231 }
4232
4233 fn guest_transfer(
4236 self,
4237 store: &mut StoreOpaque,
4238 src_idx: u32,
4239 src: TransmitIndex,
4240 dst: TransmitIndex,
4241 ) -> Result<u32> {
4242 let mut instance = self.id().get_mut(store);
4243 let src_table = instance.as_mut().table_for_transmit(src);
4244 let (rep, is_done) = match src {
4245 TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
4246 TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
4247 };
4248 if is_done {
4249 bail!("cannot lift after being notified that the writable end dropped");
4250 }
4251 let dst_table = instance.table_for_transmit(dst);
4252 let handle = match dst {
4253 TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
4254 TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
4255 }?;
4256 store
4257 .concurrent_state_mut()
4258 .get_mut(TableId::<TransmitHandle>::new(rep))?
4259 .common
4260 .handle = Some(handle);
4261 Ok(handle)
4262 }
4263
4264 pub(crate) fn future_new(
4266 self,
4267 store: &mut StoreOpaque,
4268 caller: RuntimeComponentInstanceIndex,
4269 ty: TypeFutureTableIndex,
4270 ) -> Result<ResourcePair> {
4271 self.id().get(store).check_may_leave(caller)?;
4272 self.guest_new(store, TransmitIndex::Future(ty))
4273 }
4274
4275 pub(crate) fn stream_new(
4277 self,
4278 store: &mut StoreOpaque,
4279 caller: RuntimeComponentInstanceIndex,
4280 ty: TypeStreamTableIndex,
4281 ) -> Result<ResourcePair> {
4282 self.id().get(store).check_may_leave(caller)?;
4283 self.guest_new(store, TransmitIndex::Stream(ty))
4284 }
4285
4286 pub(crate) fn future_transfer(
4289 self,
4290 store: &mut StoreOpaque,
4291 src_idx: u32,
4292 src: TypeFutureTableIndex,
4293 dst: TypeFutureTableIndex,
4294 ) -> Result<u32> {
4295 self.guest_transfer(
4296 store,
4297 src_idx,
4298 TransmitIndex::Future(src),
4299 TransmitIndex::Future(dst),
4300 )
4301 }
4302
4303 pub(crate) fn stream_transfer(
4306 self,
4307 store: &mut StoreOpaque,
4308 src_idx: u32,
4309 src: TypeStreamTableIndex,
4310 dst: TypeStreamTableIndex,
4311 ) -> Result<u32> {
4312 self.guest_transfer(
4313 store,
4314 src_idx,
4315 TransmitIndex::Stream(src),
4316 TransmitIndex::Stream(dst),
4317 )
4318 }
4319
4320 pub(crate) fn error_context_transfer(
4322 self,
4323 store: &mut StoreOpaque,
4324 src_idx: u32,
4325 src: TypeComponentLocalErrorContextTableIndex,
4326 dst: TypeComponentLocalErrorContextTableIndex,
4327 ) -> Result<u32> {
4328 let mut instance = self.id().get_mut(store);
4329 let rep = instance
4330 .as_mut()
4331 .table_for_error_context(src)
4332 .error_context_rep(src_idx)?;
4333 let dst_idx = instance
4334 .table_for_error_context(dst)
4335 .error_context_insert(rep)?;
4336
4337 let global_ref_count = store
4341 .concurrent_state_mut()
4342 .global_error_context_ref_counts
4343 .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4344 .context("global ref count present for existing (sub)component error context")?;
4345 global_ref_count.0 += 1;
4346
4347 Ok(dst_idx)
4348 }
4349}
4350
4351impl ComponentInstance {
4352 fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4353 let (states, types) = self.instance_states();
4354 let runtime_instance = match ty {
4355 TransmitIndex::Stream(ty) => types[ty].instance,
4356 TransmitIndex::Future(ty) => types[ty].instance,
4357 };
4358 states[runtime_instance].handle_table()
4359 }
4360
4361 fn table_for_error_context(
4362 self: Pin<&mut Self>,
4363 ty: TypeComponentLocalErrorContextTableIndex,
4364 ) -> &mut HandleTable {
4365 let (states, types) = self.instance_states();
4366 let runtime_instance = types[ty].instance;
4367 states[runtime_instance].handle_table()
4368 }
4369
4370 fn get_mut_by_index(
4371 self: Pin<&mut Self>,
4372 ty: TransmitIndex,
4373 index: u32,
4374 ) -> Result<(u32, &mut TransmitLocalState)> {
4375 get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4376 }
4377}
4378
4379impl ConcurrentState {
4380 fn send_write_result(
4381 &mut self,
4382 ty: TransmitIndex,
4383 id: TableId<TransmitState>,
4384 handle: u32,
4385 code: ReturnCode,
4386 ) -> Result<()> {
4387 let write_handle = self.get_mut(id)?.write_handle.rep();
4388 self.set_event(
4389 write_handle,
4390 match ty {
4391 TransmitIndex::Future(ty) => Event::FutureWrite {
4392 code,
4393 pending: Some((ty, handle)),
4394 },
4395 TransmitIndex::Stream(ty) => Event::StreamWrite {
4396 code,
4397 pending: Some((ty, handle)),
4398 },
4399 },
4400 )
4401 }
4402
4403 fn send_read_result(
4404 &mut self,
4405 ty: TransmitIndex,
4406 id: TableId<TransmitState>,
4407 handle: u32,
4408 code: ReturnCode,
4409 ) -> Result<()> {
4410 let read_handle = self.get_mut(id)?.read_handle.rep();
4411 self.set_event(
4412 read_handle,
4413 match ty {
4414 TransmitIndex::Future(ty) => Event::FutureRead {
4415 code,
4416 pending: Some((ty, handle)),
4417 },
4418 TransmitIndex::Stream(ty) => Event::StreamRead {
4419 code,
4420 pending: Some((ty, handle)),
4421 },
4422 },
4423 )
4424 }
4425
4426 fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4427 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4428 }
4429
4430 fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4431 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4432 }
4433
4434 fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4445 let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4446
4447 fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
4448 let (ReturnCode::Completed(count)
4449 | ReturnCode::Dropped(count)
4450 | ReturnCode::Cancelled(count)) = old
4451 else {
4452 unreachable!()
4453 };
4454
4455 match new {
4456 ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
4457 ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
4458 _ => unreachable!(),
4459 }
4460 }
4461
4462 let event = match (waitable.take_event(self)?, event) {
4463 (None, _) => event,
4464 (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4465 (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4466 (
4467 Some(Event::StreamWrite {
4468 code: old_code,
4469 pending: old_pending,
4470 }),
4471 Event::StreamWrite { code, pending },
4472 ) => Event::StreamWrite {
4473 code: update_code(old_code, code),
4474 pending: old_pending.or(pending),
4475 },
4476 (
4477 Some(Event::StreamRead {
4478 code: old_code,
4479 pending: old_pending,
4480 }),
4481 Event::StreamRead { code, pending },
4482 ) => Event::StreamRead {
4483 code: update_code(old_code, code),
4484 pending: old_pending.or(pending),
4485 },
4486 _ => unreachable!(),
4487 };
4488
4489 waitable.set_event(self, Some(event))
4490 }
4491
4492 fn new_transmit(
4495 &mut self,
4496 origin: TransmitOrigin,
4497 ) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4498 let state_id = self.push(TransmitState::new(origin))?;
4499
4500 let write = self.push(TransmitHandle::new(state_id))?;
4501 let read = self.push(TransmitHandle::new(state_id))?;
4502
4503 let state = self.get_mut(state_id)?;
4504 state.write_handle = write;
4505 state.read_handle = read;
4506
4507 log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4508
4509 Ok((write, read))
4510 }
4511
4512 fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4514 let state = self.delete(state_id)?;
4515 self.delete(state.write_handle)?;
4516 self.delete(state.read_handle)?;
4517
4518 log::trace!(
4519 "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4520 state.write_handle,
4521 state.read_handle,
4522 );
4523
4524 Ok(())
4525 }
4526}
4527
4528pub(crate) struct ResourcePair {
4529 pub(crate) write: u32,
4530 pub(crate) read: u32,
4531}
4532
4533impl Waitable {
4534 pub(super) fn on_delivery(&self, store: &mut StoreOpaque, instance: Instance, event: Event) {
4537 match event {
4538 Event::FutureRead {
4539 pending: Some((ty, handle)),
4540 ..
4541 }
4542 | Event::FutureWrite {
4543 pending: Some((ty, handle)),
4544 ..
4545 } => {
4546 let instance = instance.id().get_mut(store);
4547 let runtime_instance = instance.component().types()[ty].instance;
4548 let (rep, state) = instance.instance_states().0[runtime_instance]
4549 .handle_table()
4550 .future_rep(ty, handle)
4551 .unwrap();
4552 assert_eq!(rep, self.rep());
4553 assert_eq!(*state, TransmitLocalState::Busy);
4554 *state = match event {
4555 Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
4556 Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
4557 _ => unreachable!(),
4558 };
4559 }
4560 Event::StreamRead {
4561 pending: Some((ty, handle)),
4562 code,
4563 }
4564 | Event::StreamWrite {
4565 pending: Some((ty, handle)),
4566 code,
4567 } => {
4568 let instance = instance.id().get_mut(store);
4569 let runtime_instance = instance.component().types()[ty].instance;
4570 let (rep, state) = instance.instance_states().0[runtime_instance]
4571 .handle_table()
4572 .stream_rep(ty, handle)
4573 .unwrap();
4574 assert_eq!(rep, self.rep());
4575 assert_eq!(*state, TransmitLocalState::Busy);
4576 let done = matches!(code, ReturnCode::Dropped(_));
4577 *state = match event {
4578 Event::StreamRead { .. } => TransmitLocalState::Read { done },
4579 Event::StreamWrite { .. } => TransmitLocalState::Write { done },
4580 _ => unreachable!(),
4581 };
4582
4583 let transmit_handle = TableId::<TransmitHandle>::new(rep);
4584 let state = store.concurrent_state_mut();
4585 let transmit_id = state.get_mut(transmit_handle).unwrap().state;
4586 let transmit = state.get_mut(transmit_id).unwrap();
4587
4588 match event {
4589 Event::StreamRead { .. } => {
4590 transmit.read = ReadState::Open;
4591 }
4592 Event::StreamWrite { .. } => transmit.write = WriteState::Open,
4593 _ => unreachable!(),
4594 };
4595 }
4596 _ => {}
4597 }
4598 }
4599}
4600
4601fn allow_intra_component_read_write(ty: Option<InterfaceType>) -> bool {
4605 matches!(
4606 ty,
4607 None | Some(
4608 InterfaceType::S8
4609 | InterfaceType::U8
4610 | InterfaceType::S16
4611 | InterfaceType::U16
4612 | InterfaceType::S32
4613 | InterfaceType::U32
4614 | InterfaceType::S64
4615 | InterfaceType::U64
4616 | InterfaceType::Float32
4617 | InterfaceType::Float64
4618 )
4619 )
4620}
4621
4622#[cfg(test)]
4623mod tests {
4624 use super::*;
4625 use crate::{Engine, Store};
4626 use core::future::pending;
4627 use core::pin::pin;
4628 use std::sync::LazyLock;
4629
4630 static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);
4631
4632 fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
4633 where
4634 T: FutureProducer<()>,
4635 {
4636 rx.poll_produce(
4637 &mut Context::from_waker(Waker::noop()),
4638 Store::new(&ENGINE, ()).as_context_mut(),
4639 finish,
4640 )
4641 }
4642
4643 #[test]
4644 fn future_producer() {
4645 let mut fut = pin!(async { crate::error::Ok(()) });
4646 assert!(matches!(
4647 poll_future_producer(fut.as_mut(), false),
4648 Poll::Ready(Ok(Some(()))),
4649 ));
4650
4651 let mut fut = pin!(async { crate::error::Ok(()) });
4652 assert!(matches!(
4653 poll_future_producer(fut.as_mut(), true),
4654 Poll::Ready(Ok(Some(()))),
4655 ));
4656
4657 let mut fut = pin!(pending::<Result<()>>());
4658 assert!(matches!(
4659 poll_future_producer(fut.as_mut(), false),
4660 Poll::Pending,
4661 ));
4662 assert!(matches!(
4663 poll_future_producer(fut.as_mut(), true),
4664 Poll::Ready(Ok(None)),
4665 ));
4666
4667 let (tx, rx) = oneshot::channel();
4668 let mut rx = pin!(rx);
4669 assert!(matches!(
4670 poll_future_producer(rx.as_mut(), false),
4671 Poll::Pending,
4672 ));
4673 assert!(matches!(
4674 poll_future_producer(rx.as_mut(), true),
4675 Poll::Ready(Ok(None)),
4676 ));
4677 tx.send(()).unwrap();
4678 assert!(matches!(
4679 poll_future_producer(rx.as_mut(), true),
4680 Poll::Ready(Ok(Some(()))),
4681 ));
4682
4683 let (tx, rx) = oneshot::channel();
4684 let mut rx = pin!(rx);
4685 tx.send(()).unwrap();
4686 assert!(matches!(
4687 poll_future_producer(rx.as_mut(), false),
4688 Poll::Ready(Ok(Some(()))),
4689 ));
4690
4691 let (tx, rx) = oneshot::channel::<()>();
4692 let mut rx = pin!(rx);
4693 drop(tx);
4694 assert!(matches!(
4695 poll_future_producer(rx.as_mut(), false),
4696 Poll::Ready(Err(..)),
4697 ));
4698
4699 let (tx, rx) = oneshot::channel::<()>();
4700 let mut rx = pin!(rx);
4701 drop(tx);
4702 assert!(matches!(
4703 poll_future_producer(rx.as_mut(), true),
4704 Poll::Ready(Err(..)),
4705 ));
4706 }
4707}