1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext};
5use crate::component::matching::InstanceType;
6use crate::component::types;
7use crate::component::values::ErrorContextAny;
8use crate::component::{
9 AsAccessor, ComponentInstanceId, ComponentType, FutureAny, Instance, Lift, Lower, StreamAny,
10 Val, WasmList,
11};
12use crate::store::{StoreOpaque, StoreToken};
13use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
14use crate::vm::{AlwaysMut, VMStore};
15use crate::{AsContextMut, StoreContextMut, ValRaw};
16use crate::{
17 Error, Result, bail,
18 error::{Context as _, format_err},
19};
20use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
21use core::fmt;
22use core::future;
23use core::iter;
24use core::marker::PhantomData;
25use core::mem::{self, MaybeUninit};
26use core::pin::Pin;
27use core::task::{Context, Poll, Waker, ready};
28use futures::channel::oneshot;
29use futures::{FutureExt as _, stream};
30use std::any::{Any, TypeId};
31use std::boxed::Box;
32use std::io::Cursor;
33use std::string::String;
34use std::sync::{Arc, Mutex};
35use std::vec::Vec;
36use wasmtime_environ::component::{
37 CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
38 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
39 TypeFutureTableIndex, TypeStreamTableIndex,
40};
41
42pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
43
44mod buffers;
45
/// Distinguishes the two flavors of transmit handled by this module.
#[derive(Copy, Clone, Debug)]
pub enum TransmitKind {
    /// The transmit is a `stream`.
    Stream,
    /// The transmit is a `future`.
    Future,
}
53
54#[derive(Copy, Clone, Debug, PartialEq)]
56pub enum ReturnCode {
57 Blocked,
58 Completed(u32),
59 Dropped(u32),
60 Cancelled(u32),
61}
62
63impl ReturnCode {
64 pub fn encode(&self) -> u32 {
69 const BLOCKED: u32 = 0xffff_ffff;
70 const COMPLETED: u32 = 0x0;
71 const DROPPED: u32 = 0x1;
72 const CANCELLED: u32 = 0x2;
73 match self {
74 ReturnCode::Blocked => BLOCKED,
75 ReturnCode::Completed(n) => {
76 debug_assert!(*n < (1 << 28));
77 (n << 4) | COMPLETED
78 }
79 ReturnCode::Dropped(n) => {
80 debug_assert!(*n < (1 << 28));
81 (n << 4) | DROPPED
82 }
83 ReturnCode::Cancelled(n) => {
84 debug_assert!(*n < (1 << 28));
85 (n << 4) | CANCELLED
86 }
87 }
88 }
89
90 fn completed(kind: TransmitKind, count: u32) -> Self {
93 Self::Completed(if let TransmitKind::Future = kind {
94 0
95 } else {
96 count
97 })
98 }
99}
100
101#[derive(Copy, Clone, Debug)]
106pub enum TransmitIndex {
107 Stream(TypeStreamTableIndex),
108 Future(TypeFutureTableIndex),
109}
110
111impl TransmitIndex {
112 pub fn kind(&self) -> TransmitKind {
113 match self {
114 TransmitIndex::Stream(_) => TransmitKind::Stream,
115 TransmitIndex::Future(_) => TransmitKind::Future,
116 }
117 }
118}
119
120fn payload(ty: TransmitIndex, types: &ComponentTypes) -> Option<InterfaceType> {
123 match ty {
124 TransmitIndex::Future(ty) => types[types[ty].ty].payload,
125 TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
126 }
127}
128
129fn get_mut_by_index_from(
132 handle_table: &mut HandleTable,
133 ty: TransmitIndex,
134 index: u32,
135) -> Result<(u32, &mut TransmitLocalState)> {
136 match ty {
137 TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
138 TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
139 }
140}
141
142fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
143 mut store: StoreContextMut<U>,
144 instance: Instance,
145 options: OptionsIndex,
146 ty: TransmitIndex,
147 address: usize,
148 count: usize,
149 buffer: &mut B,
150) -> Result<()> {
151 let count = buffer.remaining().len().min(count);
152
153 let lower = &mut if T::MAY_REQUIRE_REALLOC {
154 LowerContext::new
155 } else {
156 LowerContext::new_without_realloc
157 }(store.as_context_mut(), options, instance);
158
159 if address % usize::try_from(T::ALIGN32)? != 0 {
160 bail!("read pointer not aligned");
161 }
162 lower
163 .as_slice_mut()
164 .get_mut(address..)
165 .and_then(|b| b.get_mut(..T::SIZE32 * count))
166 .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))?;
167
168 if let Some(ty) = payload(ty, lower.types) {
169 T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
170 }
171
172 buffer.skip(count);
173
174 Ok(())
175}
176
177fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
178 lift: &mut LiftContext<'_>,
179 ty: Option<InterfaceType>,
180 buffer: &mut B,
181 address: usize,
182 count: usize,
183) -> Result<()> {
184 let count = count.min(buffer.remaining_capacity());
185 if T::IS_RUST_UNIT_TYPE {
186 buffer.extend(
190 iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
191 )
192 } else {
193 let ty = ty.unwrap();
194 if address % usize::try_from(T::ALIGN32)? != 0 {
195 bail!("write pointer not aligned");
196 }
197 lift.memory()
198 .get(address..)
199 .and_then(|b| b.get(..T::SIZE32 * count))
200 .ok_or_else(|| crate::format_err!("write pointer out of bounds of memory"))?;
201
202 let list = &WasmList::new(address, count, lift, ty)?;
203 T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
204 }
205 Ok(())
206}
207
/// State stored for an error context.
#[derive(Debug, PartialEq, Eq, PartialOrd)]
pub(super) struct ErrorContextState {
    /// Debug message associated with the error context.
    pub(crate) debug_msg: String,
}
214
/// A size/alignment pair.
///
/// NOTE(review): presumably describes the in-memory layout of a payload type —
/// confirm at the use sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct FlatAbi {
    /// Size component of the pair.
    pub(super) size: u32,
    /// Alignment component of the pair.
    pub(super) align: u32,
}
222
223pub struct Destination<'a, T, B> {
225 id: TableId<TransmitState>,
226 buffer: &'a mut B,
227 host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
228 _phantom: PhantomData<fn() -> T>,
229}
230
231impl<'a, T, B> Destination<'a, T, B> {
232 pub fn reborrow(&mut self) -> Destination<'_, T, B> {
234 Destination {
235 id: self.id,
236 buffer: &mut *self.buffer,
237 host_buffer: self.host_buffer.as_deref_mut(),
238 _phantom: PhantomData,
239 }
240 }
241
242 pub fn take_buffer(&mut self) -> B
248 where
249 B: Default,
250 {
251 mem::take(self.buffer)
252 }
253
254 pub fn set_buffer(&mut self, buffer: B) {
264 *self.buffer = buffer;
265 }
266
267 pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
284 let transmit = store
285 .as_context_mut()
286 .0
287 .concurrent_state_mut()
288 .get_mut(self.id)
289 .unwrap();
290
291 if let &ReadState::GuestReady { count, .. } = &transmit.read {
292 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
293 unreachable!()
294 };
295
296 Some(count - guest_offset)
297 } else {
298 None
299 }
300 }
301}
302
303impl<'a, B> Destination<'a, u8, B> {
304 pub fn as_direct<D>(
315 mut self,
316 store: StoreContextMut<'a, D>,
317 capacity: usize,
318 ) -> DirectDestination<'a, D> {
319 if let Some(buffer) = self.host_buffer.as_deref_mut() {
320 buffer.set_position(0);
321 if buffer.get_mut().is_empty() {
322 buffer.get_mut().resize(capacity, 0);
323 }
324 }
325
326 DirectDestination {
327 id: self.id,
328 host_buffer: self.host_buffer,
329 store,
330 }
331 }
332}
333
/// Byte-oriented view of a [`Destination`] that lets a producer write directly
/// into the target bytes (either a host buffer or guest memory).
pub struct DirectDestination<'a, D: 'static> {
    /// Table id of the transmit state this destination belongs to.
    id: TableId<TransmitState>,
    /// Host-side byte buffer; when `None`, writes go to guest memory.
    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
    /// Store used to reach the transmit state and guest memory.
    store: StoreContextMut<'a, D>,
}
341
342impl<D: 'static> std::io::Write for DirectDestination<'_, D> {
343 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
344 let rem = self.remaining();
345 let n = rem.len().min(buf.len());
346 rem[..n].copy_from_slice(&buf[..n]);
347 self.mark_written(n);
348 Ok(n)
349 }
350
351 fn flush(&mut self) -> std::io::Result<()> {
352 Ok(())
353 }
354}
355
356impl<D: 'static> DirectDestination<'_, D> {
357 pub fn remaining(&mut self) -> &mut [u8] {
359 if let Some(buffer) = self.host_buffer.as_deref_mut() {
360 buffer.get_mut()
361 } else {
362 let transmit = self
363 .store
364 .as_context_mut()
365 .0
366 .concurrent_state_mut()
367 .get_mut(self.id)
368 .unwrap();
369
370 let &ReadState::GuestReady {
371 address,
372 count,
373 options,
374 instance,
375 ..
376 } = &transmit.read
377 else {
378 unreachable!();
379 };
380
381 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
382 unreachable!()
383 };
384
385 instance
386 .options_memory_mut(self.store.0, options)
387 .get_mut((address + guest_offset)..)
388 .and_then(|b| b.get_mut(..(count - guest_offset)))
389 .unwrap()
390 }
391 }
392
393 pub fn mark_written(&mut self, count: usize) {
398 if let Some(buffer) = self.host_buffer.as_deref_mut() {
399 buffer.set_position(
400 buffer
401 .position()
402 .checked_add(u64::try_from(count).unwrap())
403 .unwrap(),
404 );
405 } else {
406 let transmit = self
407 .store
408 .as_context_mut()
409 .0
410 .concurrent_state_mut()
411 .get_mut(self.id)
412 .unwrap();
413
414 let ReadState::GuestReady {
415 count: read_count, ..
416 } = &transmit.read
417 else {
418 unreachable!();
419 };
420
421 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
422 unreachable!()
423 };
424
425 if *guest_offset + count > *read_count {
426 panic!(
427 "write count ({count}) must be less than or equal to read count ({read_count})"
428 )
429 } else {
430 *guest_offset += count;
431 }
432 }
433 }
434}
435
/// Outcome reported by [`StreamProducer::poll_produce`] and
/// [`StreamConsumer::poll_consume`].
#[derive(Copy, Clone, Debug)]
pub enum StreamResult {
    /// The operation completed.
    Completed,
    /// The operation was cancelled before completing.
    Cancelled,
    /// The producer or consumer is finished; the implementations in this file
    /// for `Vec`, `Box<[T]>` and the empty iterator/stream return this.
    Dropped,
}
451
/// A host-side source of items for a stream.
pub trait StreamProducer<D>: Send + 'static {
    /// Type of each item produced.
    type Item;

    /// Buffer type used to stage produced items; must be a [`WriteBuffer`] of
    /// `Self::Item` and implement `Default`.
    type Buffer: WriteBuffer<Self::Item> + Default;

    /// Polls the producer for more items, delivering them via `destination`.
    ///
    /// Resolves to a [`StreamResult`] describing the outcome, or to an error.
    ///
    /// NOTE(review): `finish` presumably asks the implementation to wrap up
    /// promptly instead of returning `Poll::Pending` — confirm against the
    /// caller.
    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<'a, D>,
        destination: Destination<'a, Self::Item, Self::Buffer>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;

    /// Attempts to convert the boxed producer into a value of the type
    /// identified by `_ty`.
    ///
    /// The default implementation always declines, handing the producer back
    /// unchanged in `Err`.
    fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
        Err(me)
    }
}
612
613impl<T, D> StreamProducer<D> for iter::Empty<T>
614where
615 T: Send + Sync + 'static,
616{
617 type Item = T;
618 type Buffer = Option<Self::Item>;
619
620 fn poll_produce<'a>(
621 self: Pin<&mut Self>,
622 _: &mut Context<'_>,
623 _: StoreContextMut<'a, D>,
624 _: Destination<'a, Self::Item, Self::Buffer>,
625 _: bool,
626 ) -> Poll<Result<StreamResult>> {
627 Poll::Ready(Ok(StreamResult::Dropped))
628 }
629}
630
631impl<T, D> StreamProducer<D> for stream::Empty<T>
632where
633 T: Send + Sync + 'static,
634{
635 type Item = T;
636 type Buffer = Option<Self::Item>;
637
638 fn poll_produce<'a>(
639 self: Pin<&mut Self>,
640 _: &mut Context<'_>,
641 _: StoreContextMut<'a, D>,
642 _: Destination<'a, Self::Item, Self::Buffer>,
643 _: bool,
644 ) -> Poll<Result<StreamResult>> {
645 Poll::Ready(Ok(StreamResult::Dropped))
646 }
647}
648
649impl<T, D> StreamProducer<D> for Vec<T>
650where
651 T: Unpin + Send + Sync + 'static,
652{
653 type Item = T;
654 type Buffer = VecBuffer<T>;
655
656 fn poll_produce<'a>(
657 self: Pin<&mut Self>,
658 _: &mut Context<'_>,
659 _: StoreContextMut<'a, D>,
660 mut dst: Destination<'a, Self::Item, Self::Buffer>,
661 _: bool,
662 ) -> Poll<Result<StreamResult>> {
663 dst.set_buffer(mem::take(self.get_mut()).into());
664 Poll::Ready(Ok(StreamResult::Dropped))
665 }
666}
667
668impl<T, D> StreamProducer<D> for Box<[T]>
669where
670 T: Unpin + Send + Sync + 'static,
671{
672 type Item = T;
673 type Buffer = VecBuffer<T>;
674
675 fn poll_produce<'a>(
676 self: Pin<&mut Self>,
677 _: &mut Context<'_>,
678 _: StoreContextMut<'a, D>,
679 mut dst: Destination<'a, Self::Item, Self::Buffer>,
680 _: bool,
681 ) -> Poll<Result<StreamResult>> {
682 dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
683 Poll::Ready(Ok(StreamResult::Dropped))
684 }
685}
686
/// A `Bytes` value is a producer that yields its contents once.
#[cfg(feature = "component-model-async-bytes")]
impl<D> StreamProducer<D> for bytes::Bytes {
    type Item = u8;
    type Buffer = Cursor<Self>;

    /// Writes as many bytes as the reader can take directly into the
    /// destination, stages whatever is left over in the destination's buffer,
    /// and reports [`StreamResult::Dropped`].
    ///
    /// When the destination reports no capacity (`None` or zero) nothing is
    /// written directly and all bytes are staged in the buffer instead.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        mut store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _: bool,
    ) -> Poll<Result<StreamResult>> {
        let cap = dst.remaining(&mut store);
        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
            // No direct capacity available: stage everything in the buffer.
            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
            return Poll::Ready(Ok(StreamResult::Dropped));
        };
        let cap: usize = cap.into();

        // Never transfer more than we actually hold. `Bytes::split_off`
        // panics when asked to split past the end, and `copy_from_slice`
        // panics on a length mismatch, so a reader offering more room than
        // there is data must not be treated as an exact fit.
        let len = cap.min(self.len());

        // Stage the bytes that do not fit (possibly none) and write the rest
        // straight into the destination.
        dst.set_buffer(Cursor::new(self.split_off(len)));
        let mut dst = dst.as_direct(store, len);
        dst.remaining()[..len].copy_from_slice(&self);
        dst.mark_written(len);
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
714
/// A `BytesMut` value is a producer that yields its contents once.
#[cfg(feature = "component-model-async-bytes")]
impl<D> StreamProducer<D> for bytes::BytesMut {
    type Item = u8;
    type Buffer = Cursor<Self>;

    /// Writes as many bytes as the reader can take directly into the
    /// destination, stages whatever is left over in the destination's buffer,
    /// and reports [`StreamResult::Dropped`].
    ///
    /// When the destination reports no capacity (`None` or zero) nothing is
    /// written directly and all bytes are staged in the buffer instead.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        mut store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _: bool,
    ) -> Poll<Result<StreamResult>> {
        let cap = dst.remaining(&mut store);
        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
            // No direct capacity available: stage everything in the buffer.
            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
            return Poll::Ready(Ok(StreamResult::Dropped));
        };
        let cap: usize = cap.into();

        // Never transfer more than we actually hold. Splitting at the
        // reader's capacity when fewer bytes are available would either panic
        // in `split_off` or leave `self` shorter than the destination slice,
        // making `copy_from_slice` panic on the length mismatch.
        let len = cap.min(self.len());

        // Stage the bytes that do not fit (possibly none) and write the rest
        // straight into the destination.
        dst.set_buffer(Cursor::new(self.split_off(len)));
        let mut dst = dst.as_direct(store, len);
        dst.remaining()[..len].copy_from_slice(&self);
        dst.mark_written(len);
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
742
743pub struct Source<'a, T> {
745 id: TableId<TransmitState>,
746 host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
747}
748
749impl<'a, T> Source<'a, T> {
750 pub fn reborrow(&mut self) -> Source<'_, T> {
752 Source {
753 id: self.id,
754 host_buffer: self.host_buffer.as_deref_mut(),
755 }
756 }
757
758 pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
760 where
761 T: func::Lift + 'static,
762 B: ReadBuffer<T>,
763 {
764 if let Some(input) = &mut self.host_buffer {
765 let count = input.remaining().len().min(buffer.remaining_capacity());
766 buffer.move_from(*input, count);
767 } else {
768 let store = store.as_context_mut();
769 let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
770
771 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
772 unreachable!();
773 };
774
775 let &WriteState::GuestReady {
776 ty,
777 address,
778 count,
779 options,
780 instance,
781 ..
782 } = &transmit.write
783 else {
784 unreachable!()
785 };
786
787 let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
788 let ty = payload(ty, cx.types);
789 let old_remaining = buffer.remaining_capacity();
790 lift::<T, B>(
791 cx,
792 ty,
793 buffer,
794 address + (T::SIZE32 * guest_offset),
795 count - guest_offset,
796 )?;
797
798 let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
799
800 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
801 unreachable!();
802 };
803
804 *guest_offset += old_remaining - buffer.remaining_capacity();
805 }
806
807 Ok(())
808 }
809
810 pub fn remaining(&self, mut store: impl AsContextMut) -> usize
813 where
814 T: 'static,
815 {
816 let transmit = store
817 .as_context_mut()
818 .0
819 .concurrent_state_mut()
820 .get_mut(self.id)
821 .unwrap();
822
823 if let &WriteState::GuestReady { count, .. } = &transmit.write {
824 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
825 unreachable!()
826 };
827
828 count - guest_offset
829 } else if let Some(host_buffer) = &self.host_buffer {
830 host_buffer.remaining().len()
831 } else {
832 unreachable!()
833 }
834 }
835}
836
837impl<'a> Source<'a, u8> {
838 pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
840 DirectSource {
841 id: self.id,
842 host_buffer: self.host_buffer,
843 store,
844 }
845 }
846}
847
/// Byte-oriented view of a [`Source`] that lets a consumer read the pending
/// bytes in place (from either a host buffer or guest memory).
pub struct DirectSource<'a, D: 'static> {
    /// Table id of the transmit state this source belongs to.
    id: TableId<TransmitState>,
    /// Host-side buffer of pending bytes; when `None`, reads come from guest
    /// memory.
    host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
    /// Store used to reach the transmit state and guest memory.
    store: StoreContextMut<'a, D>,
}
855
856impl<D: 'static> std::io::Read for DirectSource<'_, D> {
857 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
858 let rem = self.remaining();
859 let n = rem.len().min(buf.len());
860 buf[..n].copy_from_slice(&rem[..n]);
861 self.mark_read(n);
862 Ok(n)
863 }
864}
865
866impl<D: 'static> DirectSource<'_, D> {
867 pub fn remaining(&mut self) -> &[u8] {
869 if let Some(buffer) = self.host_buffer.as_deref_mut() {
870 buffer.remaining()
871 } else {
872 let transmit = self
873 .store
874 .as_context_mut()
875 .0
876 .concurrent_state_mut()
877 .get_mut(self.id)
878 .unwrap();
879
880 let &WriteState::GuestReady {
881 address,
882 count,
883 options,
884 instance,
885 ..
886 } = &transmit.write
887 else {
888 unreachable!()
889 };
890
891 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
892 unreachable!()
893 };
894
895 instance
896 .options_memory(self.store.0, options)
897 .get((address + guest_offset)..)
898 .and_then(|b| b.get(..(count - guest_offset)))
899 .unwrap()
900 }
901 }
902
903 pub fn mark_read(&mut self, count: usize) {
908 if let Some(buffer) = self.host_buffer.as_deref_mut() {
909 buffer.skip(count);
910 } else {
911 let transmit = self
912 .store
913 .as_context_mut()
914 .0
915 .concurrent_state_mut()
916 .get_mut(self.id)
917 .unwrap();
918
919 let WriteState::GuestReady {
920 count: write_count, ..
921 } = &transmit.write
922 else {
923 unreachable!()
924 };
925
926 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
927 unreachable!()
928 };
929
930 if *guest_offset + count > *write_count {
931 panic!(
932 "read count ({count}) must be less than or equal to write count ({write_count})"
933 )
934 } else {
935 *guest_offset += count;
936 }
937 }
938 }
939}
940
/// A host-side sink for the items of a stream.
pub trait StreamConsumer<D>: Send + 'static {
    /// Type of each item consumed.
    type Item;

    /// Polls the consumer to accept items made available through `source`.
    ///
    /// Resolves to a [`StreamResult`] describing the outcome, or to an error.
    ///
    /// NOTE(review): `finish` presumably asks the implementation to wrap up
    /// promptly instead of returning `Poll::Pending` — confirm against the
    /// caller.
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;
}
1036
/// A host-side source of the single value of a future.
pub trait FutureProducer<D>: Send + 'static {
    /// Type of the value produced.
    type Item;

    /// Polls the producer for its value.
    ///
    /// Resolves to `Ok(Some(value))` once the value is available, or to
    /// `Ok(None)` when no value is produced. The blanket implementation for
    /// futures in this file returns `Ok(None)` when the underlying future is
    /// still pending and `finish` is `true`.
    fn poll_produce(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        finish: bool,
    ) -> Poll<Result<Option<Self::Item>>>;
}
1058
1059impl<T, E, D, Fut> FutureProducer<D> for Fut
1060where
1061 E: Into<Error>,
1062 Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
1063{
1064 type Item = T;
1065
1066 fn poll_produce<'a>(
1067 self: Pin<&mut Self>,
1068 cx: &mut Context<'_>,
1069 _: StoreContextMut<'a, D>,
1070 finish: bool,
1071 ) -> Poll<Result<Option<T>>> {
1072 match self.poll(cx) {
1073 Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
1074 Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
1075 Poll::Pending if finish => Poll::Ready(Ok(None)),
1076 Poll::Pending => Poll::Pending,
1077 }
1078 }
1079}
1080
/// A host-side sink for the single value of a future.
pub trait FutureConsumer<D>: Send + 'static {
    /// Type of the value consumed.
    type Item;

    /// Polls the consumer to accept the value made available through
    /// `source`.
    ///
    /// NOTE(review): `finish` presumably asks the implementation to wrap up
    /// promptly instead of returning `Poll::Pending` — confirm against the
    /// caller.
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<()>>;
}
1105
1106pub struct FutureReader<T> {
1113 id: TableId<TransmitHandle>,
1114 _phantom: PhantomData<T>,
1115}
1116
1117impl<T> FutureReader<T> {
1118 pub fn new<S: AsContextMut>(
1120 mut store: S,
1121 producer: impl FutureProducer<S::Data, Item = T>,
1122 ) -> Self
1123 where
1124 T: func::Lower + func::Lift + Send + Sync + 'static,
1125 {
1126 struct Producer<P>(P);
1127
1128 impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
1129 for Producer<P>
1130 {
1131 type Item = P::Item;
1132 type Buffer = Option<P::Item>;
1133
1134 fn poll_produce<'a>(
1135 self: Pin<&mut Self>,
1136 cx: &mut Context<'_>,
1137 store: StoreContextMut<D>,
1138 mut destination: Destination<'a, Self::Item, Self::Buffer>,
1139 finish: bool,
1140 ) -> Poll<Result<StreamResult>> {
1141 let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1144
1145 Poll::Ready(Ok(
1146 if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
1147 destination.set_buffer(Some(value));
1148
1149 StreamResult::Completed
1156 } else {
1157 StreamResult::Cancelled
1158 },
1159 ))
1160 }
1161 }
1162
1163 Self::new_(
1164 store
1165 .as_context_mut()
1166 .new_transmit(TransmitKind::Future, Producer(producer)),
1167 )
1168 }
1169
1170 pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1171 Self {
1172 id,
1173 _phantom: PhantomData,
1174 }
1175 }
1176
1177 pub(super) fn id(&self) -> TableId<TransmitHandle> {
1178 self.id
1179 }
1180
1181 pub fn pipe<S: AsContextMut>(
1183 self,
1184 mut store: S,
1185 consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
1186 ) where
1187 T: func::Lift + 'static,
1188 {
1189 struct Consumer<C>(C);
1190
1191 impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
1192 for Consumer<C>
1193 {
1194 type Item = T;
1195
1196 fn poll_consume(
1197 self: Pin<&mut Self>,
1198 cx: &mut Context<'_>,
1199 mut store: StoreContextMut<D>,
1200 mut source: Source<Self::Item>,
1201 finish: bool,
1202 ) -> Poll<Result<StreamResult>> {
1203 let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1206
1207 ready!(consumer.poll_consume(
1208 cx,
1209 store.as_context_mut(),
1210 source.reborrow(),
1211 finish
1212 ))?;
1213
1214 Poll::Ready(Ok(if source.remaining(store) == 0 {
1215 StreamResult::Completed
1221 } else {
1222 StreamResult::Cancelled
1223 }))
1224 }
1225 }
1226
1227 store
1228 .as_context_mut()
1229 .set_consumer(self.id, TransmitKind::Future, Consumer(consumer));
1230 }
1231
1232 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1234 let id = lift_index_to_future(cx, ty, index)?;
1235 Ok(Self::new_(id))
1236 }
1237
1238 pub fn close(&mut self, mut store: impl AsContextMut) {
1251 future_close(store.as_context_mut().0, &mut self.id)
1252 }
1253
1254 pub fn close_with(&mut self, accessor: impl AsAccessor) {
1256 accessor.as_accessor().with(|access| self.close(access))
1257 }
1258
1259 pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1265 where
1266 A: AsAccessor,
1267 {
1268 GuardedFutureReader::new(accessor, self)
1269 }
1270
1271 pub fn try_into_future_any(self, store: impl AsContextMut) -> Result<FutureAny>
1278 where
1279 T: ComponentType + 'static,
1280 {
1281 FutureAny::try_from_future_reader(store, self)
1282 }
1283
1284 pub fn try_from_future_any(future: FutureAny) -> Result<Self>
1291 where
1292 T: ComponentType + 'static,
1293 {
1294 future.try_into_future_reader()
1295 }
1296}
1297
1298impl<T> fmt::Debug for FutureReader<T> {
1299 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1300 f.debug_struct("FutureReader")
1301 .field("id", &self.id)
1302 .finish()
1303 }
1304}
1305
1306pub(super) fn future_close(store: &mut StoreOpaque, id: &mut TableId<TransmitHandle>) {
1307 let id = mem::replace(id, TableId::new(u32::MAX));
1308 store.host_drop_reader(id, TransmitKind::Future).unwrap();
1309}
1310
1311pub(super) fn lift_index_to_future(
1313 cx: &mut LiftContext<'_>,
1314 ty: InterfaceType,
1315 index: u32,
1316) -> Result<TableId<TransmitHandle>> {
1317 match ty {
1318 InterfaceType::Future(src) => {
1319 let handle_table = cx
1320 .instance_mut()
1321 .table_for_transmit(TransmitIndex::Future(src));
1322 let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
1323 if is_done {
1324 bail!("cannot lift future after being notified that the writable end dropped");
1325 }
1326 let id = TableId::<TransmitHandle>::new(rep);
1327 let concurrent_state = cx.concurrent_state_mut();
1328 let future = concurrent_state.get_mut(id)?;
1329 future.common.handle = None;
1330 let state = future.state;
1331
1332 if concurrent_state.get_mut(state)?.done {
1333 bail!("cannot lift future after previous read succeeded");
1334 }
1335
1336 Ok(id)
1337 }
1338 _ => func::bad_type_info(),
1339 }
1340}
1341
1342pub(super) fn lower_future_to_index<U>(
1344 id: TableId<TransmitHandle>,
1345 cx: &mut LowerContext<'_, U>,
1346 ty: InterfaceType,
1347) -> Result<u32> {
1348 match ty {
1349 InterfaceType::Future(dst) => {
1350 let concurrent_state = cx.store.0.concurrent_state_mut();
1351 let state = concurrent_state.get_mut(id)?.state;
1352 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1353
1354 let handle = cx
1355 .instance_mut()
1356 .table_for_transmit(TransmitIndex::Future(dst))
1357 .future_insert_read(dst, rep)?;
1358
1359 cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1360
1361 Ok(handle)
1362 }
1363 _ => func::bad_type_info(),
1364 }
1365}
1366
1367unsafe impl<T: ComponentType> ComponentType for FutureReader<T> {
1370 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1371
1372 type Lower = <u32 as func::ComponentType>::Lower;
1373
1374 fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1375 match ty {
1376 InterfaceType::Future(ty) => {
1377 let ty = types.types[*ty].ty;
1378 types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1379 }
1380 other => bail!("expected `future`, found `{}`", func::desc(other)),
1381 }
1382 }
1383}
1384
1385unsafe impl<T: ComponentType> func::Lower for FutureReader<T> {
1387 fn linear_lower_to_flat<U>(
1388 &self,
1389 cx: &mut LowerContext<'_, U>,
1390 ty: InterfaceType,
1391 dst: &mut MaybeUninit<Self::Lower>,
1392 ) -> Result<()> {
1393 lower_future_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1394 }
1395
1396 fn linear_lower_to_memory<U>(
1397 &self,
1398 cx: &mut LowerContext<'_, U>,
1399 ty: InterfaceType,
1400 offset: usize,
1401 ) -> Result<()> {
1402 lower_future_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1403 cx,
1404 InterfaceType::U32,
1405 offset,
1406 )
1407 }
1408}
1409
1410unsafe impl<T: ComponentType> func::Lift for FutureReader<T> {
1412 fn linear_lift_from_flat(
1413 cx: &mut LiftContext<'_>,
1414 ty: InterfaceType,
1415 src: &Self::Lower,
1416 ) -> Result<Self> {
1417 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1418 Self::lift_from_index(cx, ty, index)
1419 }
1420
1421 fn linear_lift_from_memory(
1422 cx: &mut LiftContext<'_>,
1423 ty: InterfaceType,
1424 bytes: &[u8],
1425 ) -> Result<Self> {
1426 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1427 Self::lift_from_index(cx, ty, index)
1428 }
1429}
1430
/// A [`FutureReader`] paired with an accessor so that the reader is closed
/// automatically when the guard is dropped.
pub struct GuardedFutureReader<T, A>
where
    A: AsAccessor,
{
    /// The guarded reader; `None` once it has been taken back out of the
    /// guard.
    reader: Option<FutureReader<T>>,
    /// Accessor used to reach the store when closing the reader on drop.
    accessor: A,
}
1446
1447impl<T, A> GuardedFutureReader<T, A>
1448where
1449 A: AsAccessor,
1450{
1451 pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1453 Self {
1454 reader: Some(reader),
1455 accessor,
1456 }
1457 }
1458
1459 pub fn into_future(self) -> FutureReader<T> {
1462 self.into()
1463 }
1464}
1465
1466impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1467where
1468 A: AsAccessor,
1469{
1470 fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1471 guard.reader.take().unwrap()
1472 }
1473}
1474
1475impl<T, A> Drop for GuardedFutureReader<T, A>
1476where
1477 A: AsAccessor,
1478{
1479 fn drop(&mut self) {
1480 if let Some(reader) = &mut self.reader {
1481 reader.close_with(&self.accessor)
1482 }
1483 }
1484}
1485
1486pub struct StreamReader<T> {
1493 id: TableId<TransmitHandle>,
1494 _phantom: PhantomData<T>,
1495}
1496
1497impl<T> StreamReader<T> {
1498 pub fn new<S: AsContextMut>(
1500 mut store: S,
1501 producer: impl StreamProducer<S::Data, Item = T>,
1502 ) -> Self
1503 where
1504 T: func::Lower + func::Lift + Send + Sync + 'static,
1505 {
1506 Self::new_(
1507 store
1508 .as_context_mut()
1509 .new_transmit(TransmitKind::Stream, producer),
1510 )
1511 }
1512
1513 pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1514 Self {
1515 id,
1516 _phantom: PhantomData,
1517 }
1518 }
1519
1520 pub(super) fn id(&self) -> TableId<TransmitHandle> {
1521 self.id
1522 }
1523
1524 pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
1541 let store = store.as_context_mut();
1542 let state = store.0.concurrent_state_mut();
1543 let id = state.get_mut(self.id).unwrap().state;
1544 if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
1545 match try_into(TypeId::of::<V>()) {
1546 Some(result) => {
1547 self.close(store);
1548 Ok(*result.downcast::<V>().unwrap())
1549 }
1550 None => Err(self),
1551 }
1552 } else {
1553 Err(self)
1554 }
1555 }
1556
1557 pub fn pipe<S: AsContextMut>(
1559 self,
1560 mut store: S,
1561 consumer: impl StreamConsumer<S::Data, Item = T>,
1562 ) where
1563 T: 'static,
1564 {
1565 store
1566 .as_context_mut()
1567 .set_consumer(self.id, TransmitKind::Stream, consumer);
1568 }
1569
1570 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1572 let id = lift_index_to_stream(cx, ty, index)?;
1573 Ok(Self::new_(id))
1574 }
1575
1576 pub fn close(&mut self, mut store: impl AsContextMut) {
1586 stream_close(store.as_context_mut().0, &mut self.id);
1587 }
1588
1589 pub fn close_with(&mut self, accessor: impl AsAccessor) {
1591 accessor.as_accessor().with(|access| self.close(access))
1592 }
1593
1594 pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1600 where
1601 A: AsAccessor,
1602 {
1603 GuardedStreamReader::new(accessor, self)
1604 }
1605
1606 pub fn try_into_stream_any(self, store: impl AsContextMut) -> Result<StreamAny>
1613 where
1614 T: ComponentType + 'static,
1615 {
1616 StreamAny::try_from_stream_reader(store, self)
1617 }
1618
1619 pub fn try_from_stream_any(stream: StreamAny) -> Result<Self>
1626 where
1627 T: ComponentType + 'static,
1628 {
1629 stream.try_into_stream_reader()
1630 }
1631}
1632
1633impl<T> fmt::Debug for StreamReader<T> {
1634 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1635 f.debug_struct("StreamReader")
1636 .field("id", &self.id)
1637 .finish()
1638 }
1639}
1640
1641pub(super) fn stream_close(store: &mut StoreOpaque, id: &mut TableId<TransmitHandle>) {
1642 let id = mem::replace(id, TableId::new(u32::MAX));
1643 store.host_drop_reader(id, TransmitKind::Stream).unwrap();
1644}
1645
1646pub(super) fn lift_index_to_stream(
1648 cx: &mut LiftContext<'_>,
1649 ty: InterfaceType,
1650 index: u32,
1651) -> Result<TableId<TransmitHandle>> {
1652 match ty {
1653 InterfaceType::Stream(src) => {
1654 let handle_table = cx
1655 .instance_mut()
1656 .table_for_transmit(TransmitIndex::Stream(src));
1657 let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1658 if is_done {
1659 bail!("cannot lift stream after being notified that the writable end dropped");
1660 }
1661 let id = TableId::<TransmitHandle>::new(rep);
1662 cx.concurrent_state_mut().get_mut(id)?.common.handle = None;
1663 Ok(id)
1664 }
1665 _ => func::bad_type_info(),
1666 }
1667}
1668
1669pub(super) fn lower_stream_to_index<U>(
1671 id: TableId<TransmitHandle>,
1672 cx: &mut LowerContext<'_, U>,
1673 ty: InterfaceType,
1674) -> Result<u32> {
1675 match ty {
1676 InterfaceType::Stream(dst) => {
1677 let concurrent_state = cx.store.0.concurrent_state_mut();
1678 let state = concurrent_state.get_mut(id)?.state;
1679 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1680
1681 let handle = cx
1682 .instance_mut()
1683 .table_for_transmit(TransmitIndex::Stream(dst))
1684 .stream_insert_read(dst, rep)?;
1685
1686 cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1687
1688 Ok(handle)
1689 }
1690 _ => func::bad_type_info(),
1691 }
1692}
1693
1694unsafe impl<T: ComponentType> ComponentType for StreamReader<T> {
1697 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1698
1699 type Lower = <u32 as func::ComponentType>::Lower;
1700
1701 fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1702 match ty {
1703 InterfaceType::Stream(ty) => {
1704 let ty = types.types[*ty].ty;
1705 types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1706 }
1707 other => bail!("expected `stream`, found `{}`", func::desc(other)),
1708 }
1709 }
1710}
1711
1712unsafe impl<T: ComponentType> func::Lower for StreamReader<T> {
1714 fn linear_lower_to_flat<U>(
1715 &self,
1716 cx: &mut LowerContext<'_, U>,
1717 ty: InterfaceType,
1718 dst: &mut MaybeUninit<Self::Lower>,
1719 ) -> Result<()> {
1720 lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1721 }
1722
1723 fn linear_lower_to_memory<U>(
1724 &self,
1725 cx: &mut LowerContext<'_, U>,
1726 ty: InterfaceType,
1727 offset: usize,
1728 ) -> Result<()> {
1729 lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1730 cx,
1731 InterfaceType::U32,
1732 offset,
1733 )
1734 }
1735}
1736
1737unsafe impl<T: ComponentType> func::Lift for StreamReader<T> {
1739 fn linear_lift_from_flat(
1740 cx: &mut LiftContext<'_>,
1741 ty: InterfaceType,
1742 src: &Self::Lower,
1743 ) -> Result<Self> {
1744 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1745 Self::lift_from_index(cx, ty, index)
1746 }
1747
1748 fn linear_lift_from_memory(
1749 cx: &mut LiftContext<'_>,
1750 ty: InterfaceType,
1751 bytes: &[u8],
1752 ) -> Result<Self> {
1753 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1754 Self::lift_from_index(cx, ty, index)
1755 }
1756}
1757
/// A `StreamReader` paired with an accessor so that the reader is closed when
/// the guard is dropped.
///
/// The reader can be taken back out, without closing it, by converting the
/// guard into a `StreamReader`.
pub struct GuardedStreamReader<T, A>
where
    A: AsAccessor,
{
    // `None` only after the reader has been moved out by the `From`
    // conversion, in which case `Drop` does nothing.
    reader: Option<StreamReader<T>>,
    // Passed to `StreamReader::close_with` when the guard is dropped.
    accessor: A,
}
1773
1774impl<T, A> GuardedStreamReader<T, A>
1775where
1776 A: AsAccessor,
1777{
1778 pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1781 Self {
1782 reader: Some(reader),
1783 accessor,
1784 }
1785 }
1786
1787 pub fn into_stream(self) -> StreamReader<T> {
1790 self.into()
1791 }
1792}
1793
1794impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1795where
1796 A: AsAccessor,
1797{
1798 fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1799 guard.reader.take().unwrap()
1800 }
1801}
1802
1803impl<T, A> Drop for GuardedStreamReader<T, A>
1804where
1805 A: AsAccessor,
1806{
1807 fn drop(&mut self) {
1808 if let Some(reader) = &mut self.reader {
1809 reader.close_with(&self.accessor)
1810 }
1811 }
1812}
1813
/// Host-side representation of a component model `error-context` value.
pub struct ErrorContext {
    // Representation value: wrapped in `ErrorContextAny` by `into_val`, and
    // exchanged with the instance's error-context tables when lifting/lowering.
    rep: u32,
}
1818
1819impl ErrorContext {
1820 pub(crate) fn new(rep: u32) -> Self {
1821 Self { rep }
1822 }
1823
1824 pub fn into_val(self) -> Val {
1826 Val::ErrorContext(ErrorContextAny(self.rep))
1827 }
1828
1829 pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1831 let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1832 bail!("expected `error-context`; got `{}`", value.desc());
1833 };
1834 Ok(Self::new(*rep))
1835 }
1836
1837 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1838 match ty {
1839 InterfaceType::ErrorContext(src) => {
1840 let rep = cx
1841 .instance_mut()
1842 .table_for_error_context(src)
1843 .error_context_rep(index)?;
1844
1845 Ok(Self { rep })
1846 }
1847 _ => func::bad_type_info(),
1848 }
1849 }
1850}
1851
1852pub(crate) fn lower_error_context_to_index<U>(
1853 rep: u32,
1854 cx: &mut LowerContext<'_, U>,
1855 ty: InterfaceType,
1856) -> Result<u32> {
1857 match ty {
1858 InterfaceType::ErrorContext(dst) => {
1859 let tbl = cx.instance_mut().table_for_error_context(dst);
1860 tbl.error_context_insert(rep)
1861 }
1862 _ => func::bad_type_info(),
1863 }
1864}
1865unsafe impl func::ComponentType for ErrorContext {
1868 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1869
1870 type Lower = <u32 as func::ComponentType>::Lower;
1871
1872 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1873 match ty {
1874 InterfaceType::ErrorContext(_) => Ok(()),
1875 other => bail!("expected `error`, found `{}`", func::desc(other)),
1876 }
1877 }
1878}
1879
1880unsafe impl func::Lower for ErrorContext {
1882 fn linear_lower_to_flat<T>(
1883 &self,
1884 cx: &mut LowerContext<'_, T>,
1885 ty: InterfaceType,
1886 dst: &mut MaybeUninit<Self::Lower>,
1887 ) -> Result<()> {
1888 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1889 cx,
1890 InterfaceType::U32,
1891 dst,
1892 )
1893 }
1894
1895 fn linear_lower_to_memory<T>(
1896 &self,
1897 cx: &mut LowerContext<'_, T>,
1898 ty: InterfaceType,
1899 offset: usize,
1900 ) -> Result<()> {
1901 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1902 cx,
1903 InterfaceType::U32,
1904 offset,
1905 )
1906 }
1907}
1908
1909unsafe impl func::Lift for ErrorContext {
1911 fn linear_lift_from_flat(
1912 cx: &mut LiftContext<'_>,
1913 ty: InterfaceType,
1914 src: &Self::Lower,
1915 ) -> Result<Self> {
1916 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1917 Self::lift_from_index(cx, ty, index)
1918 }
1919
1920 fn linear_lift_from_memory(
1921 cx: &mut LiftContext<'_>,
1922 ty: InterfaceType,
1923 bytes: &[u8],
1924 ) -> Result<Self> {
1925 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1926 Self::lift_from_index(cx, ty, index)
1927 }
1928}
1929
/// One end (read or write) of a stream or future, as stored in the concurrent
/// state table.
pub(super) struct TransmitHandle {
    /// Waitable bookkeeping; its `handle` field tracks the guest handle index
    /// currently associated with this end, if any.
    pub(super) common: WaitableCommon,
    // The `TransmitState` shared by both ends of the stream or future.
    state: TableId<TransmitState>,
}
1936
1937impl TransmitHandle {
1938 fn new(state: TableId<TransmitState>) -> Self {
1939 Self {
1940 common: WaitableCommon::default(),
1941 state,
1942 }
1943 }
1944}
1945
impl TableDebug for TransmitHandle {
    /// Name reported for this entry type.
    fn type_name() -> &'static str {
        "TransmitHandle"
    }
}
1951
/// State shared between the read and write ends of a stream or future.
struct TransmitState {
    // Table id of the write end.
    write_handle: TableId<TransmitHandle>,
    // Table id of the read end.
    read_handle: TableId<TransmitHandle>,
    // What the write end is currently doing (see `WriteState`).
    write: WriteState,
    // What the read end is currently doing (see `ReadState`).
    read: ReadState,
    // Set for futures once a value has been delivered or consumed; consulted
    // by `host_drop_writer` when the write end is dropped while still open.
    done: bool,
    /// Where this transmit was created: by the host or by a guest instance.
    pub(super) origin: TransmitOrigin,
}
1968
/// Records who created a stream or future.
#[derive(Copy, Clone)]
pub(super) enum TransmitOrigin {
    /// Created by the host.
    Host,
    /// Created by the given guest instance as a future of the given table type.
    GuestFuture(ComponentInstanceId, TypeFutureTableIndex),
    /// Created by the given guest instance as a stream of the given table type.
    GuestStream(ComponentInstanceId, TypeStreamTableIndex),
}
1975
1976impl TransmitState {
1977 fn new(origin: TransmitOrigin) -> Self {
1978 Self {
1979 write_handle: TableId::new(u32::MAX),
1980 read_handle: TableId::new(u32::MAX),
1981 read: ReadState::Open,
1982 write: WriteState::Open,
1983 done: false,
1984 origin,
1985 }
1986 }
1987}
1988
impl TableDebug for TransmitState {
    /// Name reported for this entry type.
    fn type_name() -> &'static str {
        "TransmitState"
    }
}
1994
1995impl TransmitOrigin {
1996 fn guest(id: ComponentInstanceId, index: TransmitIndex) -> Self {
1997 match index {
1998 TransmitIndex::Future(ty) => TransmitOrigin::GuestFuture(id, ty),
1999 TransmitIndex::Stream(ty) => TransmitOrigin::GuestStream(id, ty),
2000 }
2001 }
2002}
2003
/// Boxed factory which, each time it is called, returns a fresh boxed future
/// resolving to a `Result<StreamResult>`.
///
/// This is the type of the `produce` and `consume` callbacks stored in
/// `WriteState::HostReady` and `ReadState::HostReady`.
type PollStream = Box<
    dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
>;
2007
/// Boxed callback which, given a `TypeId`, optionally returns a type-erased
/// value.
///
/// `new_transmit` builds one of these by forwarding to the producer's
/// `try_into`; on failure the producer is put back and `None` is returned.
type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
2009
/// State of the write end of a stream or future.
enum WriteState {
    /// No write is pending; this is the initial state.
    Open,
    /// A guest has a write pending, described by the fields below.
    GuestReady {
        // Instance and options/type information for the writing guest.
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TransmitIndex,
        flat_abi: Option<FlatAbi>,
        options: OptionsIndex,
        // Location and item count of the guest's buffer.
        address: usize,
        count: usize,
        // Guest handle index reported back in write events.
        handle: u32,
    },
    /// The write end is owned by the host and backed by a producer.
    HostReady {
        // Starts one production step; see `PollStream`.
        produce: PollStream,
        // Attempts to recover the underlying producer as a given type.
        try_into: TryInto,
        // Number of items already delivered to the guest reader during the
        // current operation; reset to zero when a result is reported.
        guest_offset: usize,
        // Forwarded as the final argument of `poll_produce`; cleared once a
        // poll completes.
        cancel: bool,
        // Waker captured while a poll is pending, cleared once it completes.
        cancel_waker: Option<Waker>,
    },
    /// The write end has been dropped.
    Dropped,
}
2036
2037impl fmt::Debug for WriteState {
2038 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2039 match self {
2040 Self::Open => f.debug_tuple("Open").finish(),
2041 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2042 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2043 Self::Dropped => f.debug_tuple("Dropped").finish(),
2044 }
2045 }
2046}
2047
/// State of the read end of a stream or future.
enum ReadState {
    /// No read is pending; this is the initial state.
    Open,
    /// A guest has a read pending, described by the fields below.
    GuestReady {
        // Instance and options/type information for the reading guest.
        ty: TransmitIndex,
        caller: RuntimeComponentInstanceIndex,
        flat_abi: Option<FlatAbi>,
        instance: Instance,
        options: OptionsIndex,
        // Location and item capacity of the guest's buffer.
        address: usize,
        count: usize,
        // Guest handle index reported back in read events.
        handle: u32,
    },
    /// The read end is owned by the host and backed by a consumer.
    HostReady {
        // Starts one consumption step; see `PollStream`.
        consume: PollStream,
        // Item offset into the guest writer's buffer for the current
        // operation; reset to zero when a result is reported.
        guest_offset: usize,
        // Forwarded as the final argument of `poll_consume`; cleared once a
        // poll completes.
        cancel: bool,
        // Waker captured while a poll is pending, cleared once it completes.
        cancel_waker: Option<Waker>,
    },
    /// Both ends are owned by the host: a host consumer fed by a host producer.
    HostToHost {
        // Hands a type-erased write buffer to the consumer.
        accept: Box<
            dyn for<'a> Fn(
                    &'a mut UntypedWriteBuffer<'a>,
                )
                    -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
                + Send
                + Sync,
        >,
        // Scratch bytes lent to the producer while it is polled.
        buffer: Vec<u8>,
        // Cursor position in `buffer` after the last producer poll; `write`
        // feeds the range up to this limit to `accept`.
        limit: usize,
    },
    /// The read end has been dropped.
    Dropped,
}
2086
2087impl fmt::Debug for ReadState {
2088 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2089 match self {
2090 Self::Open => f.debug_tuple("Open").finish(),
2091 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2092 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2093 Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
2094 Self::Dropped => f.debug_tuple("Dropped").finish(),
2095 }
2096 }
2097}
2098
2099fn return_code(kind: TransmitKind, state: StreamResult, guest_offset: usize) -> ReturnCode {
2100 let count = guest_offset.try_into().unwrap();
2101 match state {
2102 StreamResult::Dropped => ReturnCode::Dropped(count),
2103 StreamResult::Completed => ReturnCode::completed(kind, count),
2104 StreamResult::Cancelled => ReturnCode::Cancelled(count),
2105 }
2106}
2107
2108impl StoreOpaque {
2109 fn pipe_from_guest(
2110 &mut self,
2111 kind: TransmitKind,
2112 id: TableId<TransmitState>,
2113 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2114 ) {
2115 let future = async move {
2116 let stream_state = future.await?;
2117 tls::get(|store| {
2118 let state = store.concurrent_state_mut();
2119 let transmit = state.get_mut(id)?;
2120 let ReadState::HostReady {
2121 consume,
2122 guest_offset,
2123 ..
2124 } = mem::replace(&mut transmit.read, ReadState::Open)
2125 else {
2126 unreachable!();
2127 };
2128 let code = return_code(kind, stream_state, guest_offset);
2129 transmit.read = match stream_state {
2130 StreamResult::Dropped => ReadState::Dropped,
2131 StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2132 consume,
2133 guest_offset: 0,
2134 cancel: false,
2135 cancel_waker: None,
2136 },
2137 };
2138 let WriteState::GuestReady { ty, handle, .. } =
2139 mem::replace(&mut transmit.write, WriteState::Open)
2140 else {
2141 unreachable!();
2142 };
2143 state.send_write_result(ty, id, handle, code)?;
2144 Ok(())
2145 })
2146 };
2147
2148 self.concurrent_state_mut().push_future(future.boxed());
2149 }
2150
2151 fn pipe_to_guest(
2152 &mut self,
2153 kind: TransmitKind,
2154 id: TableId<TransmitState>,
2155 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2156 ) {
2157 let future = async move {
2158 let stream_state = future.await?;
2159 tls::get(|store| {
2160 let state = store.concurrent_state_mut();
2161 let transmit = state.get_mut(id)?;
2162 let WriteState::HostReady {
2163 produce,
2164 try_into,
2165 guest_offset,
2166 ..
2167 } = mem::replace(&mut transmit.write, WriteState::Open)
2168 else {
2169 unreachable!();
2170 };
2171 let code = return_code(kind, stream_state, guest_offset);
2172 transmit.write = match stream_state {
2173 StreamResult::Dropped => WriteState::Dropped,
2174 StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2175 produce,
2176 try_into,
2177 guest_offset: 0,
2178 cancel: false,
2179 cancel_waker: None,
2180 },
2181 };
2182 let ReadState::GuestReady { ty, handle, .. } =
2183 mem::replace(&mut transmit.read, ReadState::Open)
2184 else {
2185 unreachable!();
2186 };
2187 state.send_read_result(ty, id, handle, code)?;
2188 Ok(())
2189 })
2190 };
2191
2192 self.concurrent_state_mut().push_future(future.boxed());
2193 }
2194
2195 fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
2197 let state = self.concurrent_state_mut();
2198 let transmit_id = state.get_mut(id)?.state;
2199 let transmit = state
2200 .get_mut(transmit_id)
2201 .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2202 log::trace!(
2203 "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2204 transmit.read,
2205 transmit.write
2206 );
2207
2208 transmit.read = ReadState::Dropped;
2209
2210 let new_state = if let WriteState::Dropped = &transmit.write {
2213 WriteState::Dropped
2214 } else {
2215 WriteState::Open
2216 };
2217
2218 let write_handle = transmit.write_handle;
2219
2220 match mem::replace(&mut transmit.write, new_state) {
2221 WriteState::GuestReady { ty, handle, .. } => {
2224 state.update_event(
2225 write_handle.rep(),
2226 match ty {
2227 TransmitIndex::Future(ty) => Event::FutureWrite {
2228 code: ReturnCode::Dropped(0),
2229 pending: Some((ty, handle)),
2230 },
2231 TransmitIndex::Stream(ty) => Event::StreamWrite {
2232 code: ReturnCode::Dropped(0),
2233 pending: Some((ty, handle)),
2234 },
2235 },
2236 )?;
2237 }
2238
2239 WriteState::HostReady { .. } => {}
2240
2241 WriteState::Open => {
2242 state.update_event(
2243 write_handle.rep(),
2244 match kind {
2245 TransmitKind::Future => Event::FutureWrite {
2246 code: ReturnCode::Dropped(0),
2247 pending: None,
2248 },
2249 TransmitKind::Stream => Event::StreamWrite {
2250 code: ReturnCode::Dropped(0),
2251 pending: None,
2252 },
2253 },
2254 )?;
2255 }
2256
2257 WriteState::Dropped => {
2258 log::trace!("host_drop_reader delete {transmit_id:?}");
2259 state.delete_transmit(transmit_id)?;
2260 }
2261 }
2262 Ok(())
2263 }
2264
2265 fn host_drop_writer(
2267 &mut self,
2268 id: TableId<TransmitHandle>,
2269 on_drop_open: Option<fn() -> Result<()>>,
2270 ) -> Result<()> {
2271 let state = self.concurrent_state_mut();
2272 let transmit_id = state.get_mut(id)?.state;
2273 let transmit = state
2274 .get_mut(transmit_id)
2275 .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2276 log::trace!(
2277 "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2278 transmit.read,
2279 transmit.write
2280 );
2281
2282 match &mut transmit.write {
2284 WriteState::GuestReady { .. } => {
2285 unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2286 }
2287 WriteState::HostReady { .. } => {}
2288 v @ WriteState::Open => {
2289 if let (Some(on_drop_open), false) = (
2290 on_drop_open,
2291 transmit.done || matches!(transmit.read, ReadState::Dropped),
2292 ) {
2293 on_drop_open()?;
2294 } else {
2295 *v = WriteState::Dropped;
2296 }
2297 }
2298 WriteState::Dropped => unreachable!("write state is already dropped"),
2299 }
2300
2301 let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
2302
2303 let new_state = if let ReadState::Dropped = &transmit.read {
2309 ReadState::Dropped
2310 } else {
2311 ReadState::Open
2312 };
2313
2314 let read_handle = transmit.read_handle;
2315
2316 match mem::replace(&mut transmit.read, new_state) {
2318 ReadState::GuestReady { ty, handle, .. } => {
2322 self.concurrent_state_mut().update_event(
2324 read_handle.rep(),
2325 match ty {
2326 TransmitIndex::Future(ty) => Event::FutureRead {
2327 code: ReturnCode::Dropped(0),
2328 pending: Some((ty, handle)),
2329 },
2330 TransmitIndex::Stream(ty) => Event::StreamRead {
2331 code: ReturnCode::Dropped(0),
2332 pending: Some((ty, handle)),
2333 },
2334 },
2335 )?;
2336 }
2337
2338 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2339
2340 ReadState::Open => {
2342 self.concurrent_state_mut().update_event(
2343 read_handle.rep(),
2344 match on_drop_open {
2345 Some(_) => Event::FutureRead {
2346 code: ReturnCode::Dropped(0),
2347 pending: None,
2348 },
2349 None => Event::StreamRead {
2350 code: ReturnCode::Dropped(0),
2351 pending: None,
2352 },
2353 },
2354 )?;
2355 }
2356
2357 ReadState::Dropped => {
2360 log::trace!("host_drop_writer delete {transmit_id:?}");
2361 self.concurrent_state_mut().delete_transmit(transmit_id)?;
2362 }
2363 }
2364 Ok(())
2365 }
2366
2367 pub(super) fn transmit_origin(
2368 &mut self,
2369 id: TableId<TransmitHandle>,
2370 ) -> Result<TransmitOrigin> {
2371 let state = self.concurrent_state_mut();
2372 let state_id = state.get_mut(id)?.state;
2373 Ok(state.get_mut(state_id)?.origin)
2374 }
2375}
2376
2377impl<T> StoreContextMut<'_, T> {
2378 fn new_transmit<P: StreamProducer<T>>(
2379 mut self,
2380 kind: TransmitKind,
2381 producer: P,
2382 ) -> TableId<TransmitHandle>
2383 where
2384 P::Item: func::Lower,
2385 {
2386 let token = StoreToken::new(self.as_context_mut());
2387 let state = self.0.concurrent_state_mut();
2388 let (_, read) = state.new_transmit(TransmitOrigin::Host).unwrap();
2389 let producer = Arc::new(Mutex::new(Some((Box::pin(producer), P::Buffer::default()))));
2390 let id = state.get_mut(read).unwrap().state;
2391 let mut dropped = false;
2392 let produce = Box::new({
2393 let producer = producer.clone();
2394 move || {
2395 let producer = producer.clone();
2396 async move {
2397 let (mut mine, mut buffer) = producer.lock().unwrap().take().unwrap();
2398
2399 let (result, cancelled) = if buffer.remaining().is_empty() {
2400 future::poll_fn(|cx| {
2401 tls::get(|store| {
2402 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2403
2404 let &WriteState::HostReady { cancel, .. } = &transmit.write else {
2405 unreachable!();
2406 };
2407
2408 let mut host_buffer =
2409 if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
2410 Some(Cursor::new(mem::take(buffer)))
2411 } else {
2412 None
2413 };
2414
2415 let poll = mine.as_mut().poll_produce(
2416 cx,
2417 token.as_context_mut(store),
2418 Destination {
2419 id,
2420 buffer: &mut buffer,
2421 host_buffer: host_buffer.as_mut(),
2422 _phantom: PhantomData,
2423 },
2424 cancel,
2425 );
2426
2427 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2428
2429 let host_offset = if let (
2430 Some(host_buffer),
2431 ReadState::HostToHost { buffer, limit, .. },
2432 ) = (host_buffer, &mut transmit.read)
2433 {
2434 *limit = usize::try_from(host_buffer.position()).unwrap();
2435 *buffer = host_buffer.into_inner();
2436 *limit
2437 } else {
2438 0
2439 };
2440
2441 {
2442 let WriteState::HostReady {
2443 guest_offset,
2444 cancel,
2445 cancel_waker,
2446 ..
2447 } = &mut transmit.write
2448 else {
2449 unreachable!();
2450 };
2451
2452 if poll.is_pending() {
2453 if !buffer.remaining().is_empty()
2454 || *guest_offset > 0
2455 || host_offset > 0
2456 {
2457 return Poll::Ready(Err(format_err!(
2458 "StreamProducer::poll_produce returned Poll::Pending \
2459 after producing at least one item"
2460 )));
2461 }
2462 *cancel_waker = Some(cx.waker().clone());
2463 } else {
2464 *cancel_waker = None;
2465 *cancel = false;
2466 }
2467 }
2468
2469 poll.map(|v| v.map(|result| (result, cancel)))
2470 })
2471 })
2472 .await?
2473 } else {
2474 (StreamResult::Completed, false)
2475 };
2476
2477 let (guest_offset, host_offset, count) = tls::get(|store| {
2478 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2479 let (count, host_offset) = match &transmit.read {
2480 &ReadState::GuestReady { count, .. } => (count, 0),
2481 &ReadState::HostToHost { limit, .. } => (1, limit),
2482 _ => unreachable!(),
2483 };
2484 let guest_offset = match &transmit.write {
2485 &WriteState::HostReady { guest_offset, .. } => guest_offset,
2486 _ => unreachable!(),
2487 };
2488 (guest_offset, host_offset, count)
2489 });
2490
2491 match result {
2492 StreamResult::Completed => {
2493 if count > 1
2494 && buffer.remaining().is_empty()
2495 && guest_offset == 0
2496 && host_offset == 0
2497 {
2498 bail!(
2499 "StreamProducer::poll_produce returned StreamResult::Completed \
2500 without producing any items"
2501 );
2502 }
2503 }
2504 StreamResult::Cancelled => {
2505 if !cancelled {
2506 bail!(
2507 "StreamProducer::poll_produce returned StreamResult::Cancelled \
2508 without being given a `finish` parameter value of true"
2509 );
2510 }
2511 }
2512 StreamResult::Dropped => {
2513 dropped = true;
2514 }
2515 }
2516
2517 let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;
2518
2519 *producer.lock().unwrap() = Some((mine, buffer));
2520
2521 if write_buffer {
2522 write(token, id, producer.clone(), kind).await?;
2523 }
2524
2525 Ok(if dropped {
2526 if producer.lock().unwrap().as_ref().unwrap().1.remaining().is_empty()
2527 {
2528 StreamResult::Dropped
2529 } else {
2530 StreamResult::Completed
2531 }
2532 } else {
2533 result
2534 })
2535 }
2536 .boxed()
2537 }
2538 });
2539 let try_into = Box::new(move |ty| {
2540 let (mine, buffer) = producer.lock().unwrap().take().unwrap();
2541 match P::try_into(mine, ty) {
2542 Ok(value) => Some(value),
2543 Err(mine) => {
2544 *producer.lock().unwrap() = Some((mine, buffer));
2545 None
2546 }
2547 }
2548 });
2549 state.get_mut(id).unwrap().write = WriteState::HostReady {
2550 produce,
2551 try_into,
2552 guest_offset: 0,
2553 cancel: false,
2554 cancel_waker: None,
2555 };
2556 read
2557 }
2558
2559 fn set_consumer<C: StreamConsumer<T>>(
2560 mut self,
2561 id: TableId<TransmitHandle>,
2562 kind: TransmitKind,
2563 consumer: C,
2564 ) {
2565 let token = StoreToken::new(self.as_context_mut());
2566 let state = self.0.concurrent_state_mut();
2567 let id = state.get_mut(id).unwrap().state;
2568 let transmit = state.get_mut(id).unwrap();
2569 let consumer = Arc::new(Mutex::new(Some(Box::pin(consumer))));
2570 let consume_with_buffer = {
2571 let consumer = consumer.clone();
2572 async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
2573 let mut mine = consumer.lock().unwrap().take().unwrap();
2574
2575 let host_buffer_remaining_before =
2576 host_buffer.as_deref_mut().map(|v| v.remaining().len());
2577
2578 let (result, cancelled) = future::poll_fn(|cx| {
2579 tls::get(|store| {
2580 let cancel = match &store.concurrent_state_mut().get_mut(id).unwrap().read {
2581 &ReadState::HostReady { cancel, .. } => cancel,
2582 ReadState::Open => false,
2583 _ => unreachable!(),
2584 };
2585
2586 let poll = mine.as_mut().poll_consume(
2587 cx,
2588 token.as_context_mut(store),
2589 Source {
2590 id,
2591 host_buffer: host_buffer.as_deref_mut(),
2592 },
2593 cancel,
2594 );
2595
2596 if let ReadState::HostReady {
2597 cancel_waker,
2598 cancel,
2599 ..
2600 } = &mut store.concurrent_state_mut().get_mut(id).unwrap().read
2601 {
2602 if poll.is_pending() {
2603 *cancel_waker = Some(cx.waker().clone());
2604 } else {
2605 *cancel_waker = None;
2606 *cancel = false;
2607 }
2608 }
2609
2610 poll.map(|v| v.map(|result| (result, cancel)))
2611 })
2612 })
2613 .await?;
2614
2615 let (guest_offset, count) = tls::get(|store| {
2616 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2617 (
2618 match &transmit.read {
2619 &ReadState::HostReady { guest_offset, .. } => guest_offset,
2620 ReadState::Open => 0,
2621 _ => unreachable!(),
2622 },
2623 match &transmit.write {
2624 &WriteState::GuestReady { count, .. } => count,
2625 WriteState::HostReady { .. } => host_buffer_remaining_before.unwrap(),
2626 _ => unreachable!(),
2627 },
2628 )
2629 });
2630
2631 match result {
2632 StreamResult::Completed => {
2633 if count > 0
2634 && guest_offset == 0
2635 && host_buffer_remaining_before
2636 .zip(host_buffer.map(|v| v.remaining().len()))
2637 .map(|(before, after)| before == after)
2638 .unwrap_or(false)
2639 {
2640 bail!(
2641 "StreamConsumer::poll_consume returned StreamResult::Completed \
2642 without consuming any items"
2643 );
2644 }
2645
2646 if let TransmitKind::Future = kind {
2647 tls::get(|store| {
2648 store.concurrent_state_mut().get_mut(id).unwrap().done = true;
2649 });
2650 }
2651 }
2652 StreamResult::Cancelled => {
2653 if !cancelled {
2654 bail!(
2655 "StreamConsumer::poll_consume returned StreamResult::Cancelled \
2656 without being given a `finish` parameter value of true"
2657 );
2658 }
2659 }
2660 StreamResult::Dropped => {}
2661 }
2662
2663 *consumer.lock().unwrap() = Some(mine);
2664
2665 Ok(result)
2666 }
2667 };
2668 let consume = {
2669 let consume = consume_with_buffer.clone();
2670 Box::new(move || {
2671 let consume = consume.clone();
2672 async move { consume(None).await }.boxed()
2673 })
2674 };
2675
2676 match &transmit.write {
2677 WriteState::Open => {
2678 transmit.read = ReadState::HostReady {
2679 consume,
2680 guest_offset: 0,
2681 cancel: false,
2682 cancel_waker: None,
2683 };
2684 }
2685 &WriteState::GuestReady { .. } => {
2686 let future = consume();
2687 transmit.read = ReadState::HostReady {
2688 consume,
2689 guest_offset: 0,
2690 cancel: false,
2691 cancel_waker: None,
2692 };
2693 self.0.pipe_from_guest(kind, id, future);
2694 }
2695 WriteState::HostReady { .. } => {
2696 let WriteState::HostReady { produce, .. } = mem::replace(
2697 &mut transmit.write,
2698 WriteState::HostReady {
2699 produce: Box::new(|| unreachable!()),
2700 try_into: Box::new(|_| unreachable!()),
2701 guest_offset: 0,
2702 cancel: false,
2703 cancel_waker: None,
2704 },
2705 ) else {
2706 unreachable!();
2707 };
2708
2709 transmit.read = ReadState::HostToHost {
2710 accept: Box::new(move |input| {
2711 let consume = consume_with_buffer.clone();
2712 async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
2713 }),
2714 buffer: Vec::new(),
2715 limit: 0,
2716 };
2717
2718 let future = async move {
2719 loop {
2720 if tls::get(|store| {
2721 crate::error::Ok(matches!(
2722 store.concurrent_state_mut().get_mut(id)?.read,
2723 ReadState::Dropped
2724 ))
2725 })? {
2726 break Ok(());
2727 }
2728
2729 match produce().await? {
2730 StreamResult::Completed | StreamResult::Cancelled => {}
2731 StreamResult::Dropped => break Ok(()),
2732 }
2733
2734 if let TransmitKind::Future = kind {
2735 break Ok(());
2736 }
2737 }
2738 }
2739 .map(move |result| {
2740 tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
2741 result
2742 });
2743
2744 state.push_future(Box::pin(future));
2745 }
2746 WriteState::Dropped => {
2747 let reader = transmit.read_handle;
2748 self.0.host_drop_reader(reader, kind).unwrap();
2749 }
2750 }
2751 }
2752}
2753
/// Delivers the items buffered in `pair` to the read end of transmit `id`.
///
/// The read state is temporarily replaced with `ReadState::Open` and restored
/// (or set to `Dropped`) before returning:
/// - For a guest reader, items are lowered into the guest's buffer starting
///   at the current guest offset, and the write state's `guest_offset` is
///   advanced by the number of items taken from the buffer.
/// - For a host reader, first the bytes recorded in the host scratch buffer
///   (up to `limit`) and then the remaining buffered items are fed to the
///   consumer's `accept` callback until it reports `Dropped` or everything
///   has been offered.
///
/// # Panics
///
/// Panics if the read state is neither `GuestReady` nor `HostToHost`, if the
/// write state is not `HostReady` when a guest reader is present, or if the
/// shared `pair` slot is empty.
async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
    token: StoreToken<D>,
    id: TableId<TransmitState>,
    pair: Arc<Mutex<Option<(P, B)>>>,
    kind: TransmitKind,
) -> Result<()> {
    // Take the read state out of the table for the duration of this call.
    let (read, guest_offset) = tls::get(|store| {
        let transmit = store.concurrent_state_mut().get_mut(id)?;

        let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
            Some(guest_offset)
        } else {
            None
        };

        crate::error::Ok((
            mem::replace(&mut transmit.read, ReadState::Open),
            guest_offset,
        ))
    })?;

    match read {
        ReadState::GuestReady {
            ty,
            flat_abi,
            options,
            address,
            count,
            handle,
            instance,
            caller,
        } => {
            let guest_offset = guest_offset.unwrap();

            // A future is marked done as soon as its value is being delivered.
            if let TransmitKind::Future = kind {
                tls::get(|store| {
                    store.concurrent_state_mut().get_mut(id)?.done = true;
                    crate::error::Ok(())
                })?;
            }

            let old_remaining = pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
            // Lowers buffered items into the part of the guest buffer that has
            // not been filled yet.
            let accept = {
                let pair = pair.clone();
                move |mut store: StoreContextMut<D>| {
                    lower::<T, B, D>(
                        store.as_context_mut(),
                        instance,
                        options,
                        ty,
                        address + (T::SIZE32 * guest_offset),
                        count - guest_offset,
                        &mut pair.lock().unwrap().as_mut().unwrap().1,
                    )?;
                    crate::error::Ok(())
                }
            };

            if guest_offset < count {
                if T::MAY_REQUIRE_REALLOC {
                    // Run the lowering as a high-priority work item instead of
                    // inline, and wait for its result over a oneshot channel.
                    let (tx, rx) = oneshot::channel();
                    tls::get(move |store| {
                        store
                            .concurrent_state_mut()
                            .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
                                move |store| {
                                    _ = tx.send(accept(token.as_context_mut(store))?);
                                    Ok(())
                                },
                            ))))
                    });
                    rx.await?
                } else {
                    // No realloc can be needed, so lower inline.
                    tls::get(|store| accept(token.as_context_mut(store)))?
                };
            }

            tls::get(|store| {
                // Number of items the lowering took out of the buffer.
                let count =
                    old_remaining - pair.lock().unwrap().as_mut().unwrap().1.remaining().len();

                let transmit = store.concurrent_state_mut().get_mut(id)?;

                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
                    unreachable!();
                };

                *guest_offset += count;

                // Restore the read state that was taken out above.
                transmit.read = ReadState::GuestReady {
                    ty,
                    flat_abi,
                    options,
                    address,
                    count,
                    handle,
                    instance,
                    caller,
                };

                crate::error::Ok(())
            })?;

            Ok(())
        }

        ReadState::HostToHost {
            accept,
            mut buffer,
            limit,
        } => {
            let mut state = StreamResult::Completed;
            let mut position = 0;

            // First offer the bytes recorded in the scratch buffer.
            while !matches!(state, StreamResult::Dropped) && position < limit {
                let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
                state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
                (buffer, position, _) = slice_buffer.into_parts();
            }

            // Then offer whatever remains in the shared buffer.
            {
                let (mine, mut buffer) = pair.lock().unwrap().take().unwrap();

                while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
                    state = accept(&mut UntypedWriteBuffer::new(&mut buffer)).await?;
                }

                *pair.lock().unwrap() = Some((mine, buffer));
            }

            // Restore the read state, or record that the consumer dropped.
            tls::get(|store| {
                store.concurrent_state_mut().get_mut(id)?.read = match state {
                    StreamResult::Dropped => ReadState::Dropped,
                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
                        accept,
                        buffer,
                        limit: 0,
                    },
                };

                crate::error::Ok(())
            })?;
            Ok(())
        }

        _ => unreachable!(),
    }
}
2910
2911impl Instance {
    /// Starts a host consume step for a guest write on `transmit_id`.
    ///
    /// The read state is set to `HostReady` *before* the consumer future is
    /// polled, since the future reads that state. The future is polled once
    /// with a no-op waker:
    /// - If it is ready, the accumulated guest offset is turned into a return
    ///   code and reset to zero, and the write state goes back to `Open`.
    /// - If it is pending, it is handed to `pipe_from_guest` and
    ///   `ReturnCode::Blocked` is returned.
    fn consume(
        self,
        store: &mut dyn VMStore,
        kind: TransmitKind,
        transmit_id: TableId<TransmitState>,
        consume: PollStream,
        guest_offset: usize,
        cancel: bool,
    ) -> Result<ReturnCode> {
        let mut future = consume();
        store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
            consume,
            guest_offset,
            cancel,
            cancel_waker: None,
        };
        let poll = tls::set(store, || {
            future
                .as_mut()
                .poll(&mut Context::from_waker(&Waker::noop()))
        });

        Ok(match poll {
            Poll::Ready(state) => {
                let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
                let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
                    unreachable!();
                };
                let code = return_code(kind, state?, mem::replace(guest_offset, 0));
                transmit.write = WriteState::Open;
                code
            }
            Poll::Pending => {
                store.pipe_from_guest(kind, transmit_id, future);
                ReturnCode::Blocked
            }
        })
    }
2952
    /// Starts a host produce step for a guest read on `transmit_id`.
    ///
    /// The write state is set to `HostReady` *before* the producer future is
    /// polled, since the future reads that state. The future is polled once
    /// with a no-op waker:
    /// - If it is ready, the accumulated guest offset is turned into a return
    ///   code and reset to zero, and the read state goes back to `Open`.
    /// - If it is pending, it is handed to `pipe_to_guest` and
    ///   `ReturnCode::Blocked` is returned.
    fn produce(
        self,
        store: &mut dyn VMStore,
        kind: TransmitKind,
        transmit_id: TableId<TransmitState>,
        produce: PollStream,
        try_into: TryInto,
        guest_offset: usize,
        cancel: bool,
    ) -> Result<ReturnCode> {
        let mut future = produce();
        store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
            produce,
            try_into,
            guest_offset,
            cancel,
            cancel_waker: None,
        };
        let poll = tls::set(store, || {
            future
                .as_mut()
                .poll(&mut Context::from_waker(&Waker::noop()))
        });

        Ok(match poll {
            Poll::Ready(state) => {
                let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
                    unreachable!();
                };
                let code = return_code(kind, state?, mem::replace(guest_offset, 0));
                transmit.read = ReadState::Open;
                code
            }
            Poll::Pending => {
                store.pipe_to_guest(kind, transmit_id, future);
                ReturnCode::Blocked
            }
        })
    }
2995
2996 pub(super) fn guest_drop_writable(
2998 self,
2999 store: &mut StoreOpaque,
3000 ty: TransmitIndex,
3001 writer: u32,
3002 ) -> Result<()> {
3003 let table = self.id().get_mut(store).table_for_transmit(ty);
3004 let transmit_rep = match ty {
3005 TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
3006 TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
3007 };
3008
3009 let id = TableId::<TransmitHandle>::new(transmit_rep);
3010 log::trace!("guest_drop_writable: drop writer {id:?}");
3011 match ty {
3012 TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
3013 TransmitIndex::Future(_) => store.host_drop_writer(
3014 id,
3015 Some(|| {
3016 Err(format_err!(
3017 "cannot drop future write end without first writing a value"
3018 ))
3019 }),
3020 ),
3021 }
3022 }
3023
/// Copy `count` payload items from a writer's linear memory to a reader's.
///
/// `write_*` describe the source side (calling instance, stream/future type,
/// canonical options, and byte address); `read_*` describe the destination
/// side. `flat_abi`, when present, selects a raw byte copy for streams; `rep`
/// is only used for trace logging.
///
/// `write_ty` and `read_ty` must both be futures or both be streams; any other
/// combination hits `unreachable!()`.
///
/// # Errors
///
/// Fails when writer and reader are the same instance and
/// `allow_intra_component_read_write` rejects the payload, when a pointer is
/// misaligned or out of bounds, or when lifting/lowering a value fails.
///
/// # Panics
///
/// Panics if a future copy is requested with `count != 1`.
fn copy<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    flat_abi: Option<FlatAbi>,
    write_caller: RuntimeComponentInstanceIndex,
    write_ty: TransmitIndex,
    write_options: OptionsIndex,
    write_address: usize,
    read_caller: RuntimeComponentInstanceIndex,
    read_ty: TransmitIndex,
    read_options: OptionsIndex,
    read_address: usize,
    count: usize,
    rep: u32,
) -> Result<()> {
    let types = self.id().get(store.0).component().types();
    match (write_ty, read_ty) {
        (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
            // A future carries exactly one value.
            assert_eq!(count, 1);

            let payload = types[types[write_ty].ty].payload;

            if write_caller == read_caller && !allow_intra_component_read_write(payload) {
                bail!(
                    "cannot read from and write to intra-component future with non-numeric payload"
                )
            }

            // Lift the value out of the writer's memory (if the future has a
            // payload at all), checking alignment and bounds first.
            let val = payload
                .map(|ty| {
                    let lift =
                        &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);

                    let abi = lift.types.canonical_abi(&ty);
                    if write_address % usize::try_from(abi.align32)? != 0 {
                        bail!("write pointer not aligned");
                    }

                    let bytes = lift
                        .memory()
                        .get(write_address..)
                        .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
                        .ok_or_else(|| {
                            crate::format_err!("write pointer out of bounds of memory")
                        })?;

                    Val::load(lift, ty, bytes)
                })
                .transpose()?;

            // Lower the lifted value into the reader's memory; the destination
            // pointer is validated by `validate_inbounds_dynamic`.
            if let Some(val) = val {
                let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
                let types = lower.types;
                let ty = types[types[read_ty].ty].payload.unwrap();
                let ptr = func::validate_inbounds_dynamic(
                    types.canonical_abi(&ty),
                    lower.as_slice_mut(),
                    &ValRaw::u32(read_address.try_into().unwrap()),
                )?;
                val.store(lower, ty, ptr)?;
            }
        }
        (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
            if write_caller == read_caller
                && !allow_intra_component_read_write(types[types[write_ty].ty].payload)
            {
                bail!(
                    "cannot read from and write to intra-component stream with non-numeric payload"
                )
            }

            if let Some(flat_abi) = flat_abi {
                // Fast path: the payload can be moved as raw bytes.
                let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
                // Nothing to validate or copy for a zero-length transfer.
                if length_in_bytes > 0 {
                    if write_address % usize::try_from(flat_abi.align)? != 0 {
                        bail!("write pointer not aligned");
                    }
                    if read_address % usize::try_from(flat_abi.align)? != 0 {
                        bail!("read pointer not aligned");
                    }

                    let store_opaque = store.0.store_opaque_mut();

                    {
                        // Both ranges are bounds-checked while they are still
                        // slices; only then are raw pointers taken.
                        let src = self
                            .options_memory(store_opaque, write_options)
                            .get(write_address..)
                            .and_then(|b| b.get(..length_in_bytes))
                            .ok_or_else(|| {
                                crate::format_err!("write pointer out of bounds of memory")
                            })?
                            .as_ptr();
                        let dst = self
                            .options_memory_mut(store_opaque, read_options)
                            .get_mut(read_address..)
                            .and_then(|b| b.get_mut(..length_in_bytes))
                            .ok_or_else(|| {
                                crate::format_err!("read pointer out of bounds of memory")
                            })?
                            .as_mut_ptr();
                        // SAFETY: `src` and `dst` each point at
                        // `length_in_bytes` bytes that were bounds-checked
                        // above. When writer and reader are the same instance
                        // the ranges may overlap, so the overlap-tolerant
                        // `copy_to` is used. Otherwise the non-overlapping
                        // variant is used — presumably because distinct
                        // instances never share these ranges; TODO confirm.
                        unsafe {
                            if write_caller == read_caller {
                                src.copy_to(dst, length_in_bytes)
                            } else {
                                src.copy_to_nonoverlapping(dst, length_in_bytes)
                            }
                        }
                    }
                }
            } else {
                // Slow path: lift every element to a `Val`, then lower each one
                // into the reader's memory.
                let store_opaque = store.0.store_opaque_mut();
                let lift = &mut LiftContext::new(store_opaque, write_options, self);
                let ty = lift.types[lift.types[write_ty].ty].payload.unwrap();
                let abi = lift.types.canonical_abi(&ty);
                let size = usize::try_from(abi.size32).unwrap();
                if write_address % usize::try_from(abi.align32)? != 0 {
                    bail!("write pointer not aligned");
                }
                let bytes = lift
                    .memory()
                    .get(write_address..)
                    .and_then(|b| b.get(..size * count))
                    .ok_or_else(|| {
                        crate::format_err!("write pointer out of bounds of memory")
                    })?;

                let values = (0..count)
                    .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
                    .collect::<Result<Vec<_>>>()?;

                let id = TableId::<TransmitHandle>::new(rep);
                log::trace!("copy values {values:?} for {id:?}");

                let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
                let ty = lower.types[lower.types[read_ty].ty].payload.unwrap();
                let abi = lower.types.canonical_abi(&ty);
                if read_address % usize::try_from(abi.align32)? != 0 {
                    bail!("read pointer not aligned");
                }
                let size = usize::try_from(abi.size32).unwrap();
                // Bounds-check the whole destination range once; the slice
                // itself is not needed afterwards.
                lower
                    .as_slice_mut()
                    .get_mut(read_address..)
                    .and_then(|b| b.get_mut(..size * count))
                    .ok_or_else(|| {
                        crate::format_err!("read pointer out of bounds of memory")
                    })?;
                let mut ptr = read_address;
                for value in values {
                    value.store(lower, ty, ptr)?;
                    ptr += size
                }
            }
        }
        // A future end is never paired with a stream end.
        _ => unreachable!(),
    }

    Ok(())
}
3196
3197 fn check_bounds(
3198 self,
3199 store: &StoreOpaque,
3200 options: OptionsIndex,
3201 ty: TransmitIndex,
3202 address: usize,
3203 count: usize,
3204 ) -> Result<()> {
3205 let types = self.id().get(store).component().types();
3206 let size = usize::try_from(
3207 match ty {
3208 TransmitIndex::Future(ty) => types[types[ty].ty]
3209 .payload
3210 .map(|ty| types.canonical_abi(&ty).size32),
3211 TransmitIndex::Stream(ty) => types[types[ty].ty]
3212 .payload
3213 .map(|ty| types.canonical_abi(&ty).size32),
3214 }
3215 .unwrap_or(0),
3216 )
3217 .unwrap();
3218
3219 if count > 0 && size > 0 {
3220 self.options_memory(store, options)
3221 .get(address..)
3222 .and_then(|b| b.get(..(size * count)))
3223 .map(drop)
3224 .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))
3225 } else {
3226 Ok(())
3227 }
3228 }
3229
/// Write `count` items located at `address` in the guest's memory to the
/// stream or future whose writable end is guest handle `handle`.
///
/// What happens depends on the current read state of the transmit:
///
/// * a guest reader is already waiting: the data is copied directly into the
///   reader's buffer via `copy`;
/// * a host reader is registered: the write is parked and the host consumer is
///   invoked via `consume`;
/// * nobody is reading yet: the write is parked and the result is `Blocked`;
/// * the read end was dropped: the result is `Dropped(0)`.
///
/// For callers whose options are not `async_`, a `Blocked` result is turned
/// into a wait on the write event via `wait_for_write`.
///
/// # Errors
///
/// Fails if the buffer is out of bounds, if `handle` is not an idle write end,
/// if the write end was already notified that the reader dropped, if the
/// transmit is already marked done, or if copying/consuming fails.
pub(super) fn guest_write<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    caller: RuntimeComponentInstanceIndex,
    ty: TransmitIndex,
    options: OptionsIndex,
    flat_abi: Option<FlatAbi>,
    handle: u32,
    address: u32,
    count: u32,
) -> Result<ReturnCode> {
    if !self.options(store.0, options).async_ {
        // A synchronous write may end up waiting (see `wait_for_write` below),
        // so check up front that blocking is permitted.
        store.0.check_blocking()?;
    }

    let address = usize::try_from(address).unwrap();
    let count = usize::try_from(count).unwrap();
    self.check_bounds(store.0, options, ty, address, count)?;
    let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
    let TransmitLocalState::Write { done } = *state else {
        bail!(
            "invalid handle {handle}; expected `Write`; got {:?}",
            *state
        );
    };

    if done {
        bail!("cannot write to stream after being notified that the readable end dropped");
    }

    // Mark the handle busy for the duration of the operation; it is restored
    // to `Write` at the end unless the result is `Blocked`.
    *state = TransmitLocalState::Busy;
    let transmit_handle = TableId::<TransmitHandle>::new(rep);
    let concurrent_state = store.0.concurrent_state_mut();
    let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
    let transmit = concurrent_state.get_mut(transmit_id)?;
    log::trace!(
        "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
        transmit.read
    );

    if transmit.done {
        bail!("cannot write to future after previous write succeeded or readable end dropped");
    }

    // The read state is taken out below with `mem::replace`; leave `Dropped`
    // in place if that is what it was, otherwise reset it to `Open`.
    let new_state = if let ReadState::Dropped = &transmit.read {
        ReadState::Dropped
    } else {
        ReadState::Open
    };

    // Parks this write: records the caller's buffer in the write state so a
    // later reader can pick it up. The write state must currently be `Open`.
    let set_guest_ready = |me: &mut ConcurrentState| {
        let transmit = me.get_mut(transmit_id)?;
        assert!(
            matches!(&transmit.write, WriteState::Open),
            "expected `WriteState::Open`; got `{:?}`",
            transmit.write
        );
        transmit.write = WriteState::GuestReady {
            instance: self,
            caller,
            ty,
            flat_abi,
            options,
            address,
            count,
            handle,
        };
        Ok::<_, crate::Error>(())
    };

    let mut result = match mem::replace(&mut transmit.read, new_state) {
        // A guest reader is already parked with a buffer.
        ReadState::GuestReady {
            ty: read_ty,
            flat_abi: read_flat_abi,
            options: read_options,
            address: read_address,
            count: read_count,
            handle: read_handle,
            instance: read_instance,
            caller: read_caller,
        } => {
            assert_eq!(flat_abi, read_flat_abi);

            if let TransmitIndex::Future(_) = ty {
                transmit.done = true;
            }

            // The write finishes immediately if it is zero-length or if the
            // reader offered any buffer space; a non-empty write against a
            // zero-length read stays parked.
            let write_complete = count == 0 || read_count > 0;
            // The reader is only notified when this write carries data.
            let read_complete = count > 0;
            // The reader's buffer is not filled by this write.
            let read_buffer_remaining = count < read_count;

            let read_handle_rep = transmit.read_handle.rep();

            // Transfer as much as both sides can handle.
            let count = count.min(read_count);

            self.copy(
                store.as_context_mut(),
                flat_abi,
                caller,
                ty,
                options,
                address,
                read_caller,
                read_ty,
                read_options,
                read_address,
                count,
                rep,
            )?;

            let instance = self.id().get_mut(store.0);
            let types = instance.component().types();
            let item_size = payload(ty, types)
                .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
                .unwrap_or(0);
            let concurrent_state = store.0.concurrent_state_mut();
            if read_complete {
                let count = u32::try_from(count).unwrap();
                // If an undelivered `StreamRead` completion is still queued for
                // the reader, fold its item count into the new total.
                let total = if let Some(Event::StreamRead {
                    code: ReturnCode::Completed(old_total),
                    ..
                }) = concurrent_state.take_event(read_handle_rep)?
                {
                    count + old_total
                } else {
                    count
                };

                let code = ReturnCode::completed(ty.kind(), total);

                concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
            }

            if read_buffer_remaining {
                // Re-park the reader with the unfilled tail of its buffer.
                let transmit = concurrent_state.get_mut(transmit_id)?;
                transmit.read = ReadState::GuestReady {
                    ty: read_ty,
                    flat_abi: read_flat_abi,
                    options: read_options,
                    address: read_address + (count * item_size),
                    count: read_count - count,
                    handle: read_handle,
                    instance: read_instance,
                    caller: read_caller,
                };
            }

            if write_complete {
                ReturnCode::completed(ty.kind(), count.try_into().unwrap())
            } else {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }
        }

        // A host consumer is registered for the read end.
        ReadState::HostReady {
            consume,
            guest_offset,
            cancel,
            cancel_waker,
        } => {
            assert!(cancel_waker.is_none());
            assert!(!cancel);
            assert_eq!(0, guest_offset);

            if let TransmitIndex::Future(_) = ty {
                transmit.done = true;
            }

            // Park the write first, then hand control to the host consumer.
            set_guest_ready(concurrent_state)?;
            self.consume(store.0, ty.kind(), transmit_id, consume, 0, false)?
        }

        ReadState::HostToHost { .. } => unreachable!(),

        // No reader yet: park the write until one shows up.
        ReadState::Open => {
            set_guest_ready(concurrent_state)?;
            ReturnCode::Blocked
        }

        // The read end is gone; nothing can be transferred.
        ReadState::Dropped => {
            if let TransmitIndex::Future(_) = ty {
                transmit.done = true;
            }

            ReturnCode::Dropped(0)
        }
    };

    // Synchronous callers wait for the write event instead of seeing `Blocked`.
    if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
        result = self.wait_for_write(store.0, transmit_handle)?;
    }

    if result != ReturnCode::Blocked {
        // The operation is finished: the handle is idle again. Only a stream
        // that reported `Dropped` is flagged as done.
        *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
            TransmitLocalState::Write {
                done: matches!(
                    (result, ty),
                    (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
                ),
            };
    }

    log::trace!(
        "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
    );

    Ok(result)
}
3464
/// Read up to `count` items into the guest buffer at `address` from the stream
/// or future whose readable end is guest handle `handle`.
///
/// What happens depends on the current write state of the transmit:
///
/// * a guest writer is already waiting: the data is copied directly out of the
///   writer's buffer via `copy`;
/// * a host writer is registered: the read is parked and the host producer is
///   invoked via `produce`;
/// * nobody is writing yet: the read is parked and the result is `Blocked`;
/// * the write end was dropped: the result is `Dropped(0)`.
///
/// For callers whose options are not `async_`, a `Blocked` result is turned
/// into a wait on the read event via `wait_for_read`.
///
/// # Errors
///
/// Fails if the buffer is out of bounds, if `handle` is not an idle read end,
/// if the read end was already notified that the writer dropped, if the
/// transmit is already marked done, or if copying/producing fails.
pub(super) fn guest_read<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    caller: RuntimeComponentInstanceIndex,
    ty: TransmitIndex,
    options: OptionsIndex,
    flat_abi: Option<FlatAbi>,
    handle: u32,
    address: u32,
    count: u32,
) -> Result<ReturnCode> {
    if !self.options(store.0, options).async_ {
        // A synchronous read may end up waiting (see `wait_for_read` below),
        // so check up front that blocking is permitted.
        store.0.check_blocking()?;
    }

    let address = usize::try_from(address).unwrap();
    let count = usize::try_from(count).unwrap();
    self.check_bounds(store.0, options, ty, address, count)?;
    let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
    let TransmitLocalState::Read { done } = *state else {
        bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
    };

    if done {
        bail!("cannot read from stream after being notified that the writable end dropped");
    }

    // Mark the handle busy for the duration of the operation; it is restored
    // to `Read` at the end unless the result is `Blocked`.
    *state = TransmitLocalState::Busy;
    let transmit_handle = TableId::<TransmitHandle>::new(rep);
    let concurrent_state = store.0.concurrent_state_mut();
    let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
    let transmit = concurrent_state.get_mut(transmit_id)?;
    log::trace!(
        "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
        transmit.write
    );

    if transmit.done {
        bail!("cannot read from future after previous read succeeded");
    }

    // The write state is taken out below with `mem::replace`; leave `Dropped`
    // in place if that is what it was, otherwise reset it to `Open`.
    let new_state = if let WriteState::Dropped = &transmit.write {
        WriteState::Dropped
    } else {
        WriteState::Open
    };

    // Parks this read: records the caller's buffer in the read state so a
    // later writer can fill it. The read state must currently be `Open`.
    let set_guest_ready = |me: &mut ConcurrentState| {
        let transmit = me.get_mut(transmit_id)?;
        assert!(
            matches!(&transmit.read, ReadState::Open),
            "expected `ReadState::Open`; got `{:?}`",
            transmit.read
        );
        transmit.read = ReadState::GuestReady {
            ty,
            flat_abi,
            options,
            address,
            count,
            handle,
            instance: self,
            caller,
        };
        Ok::<_, crate::Error>(())
    };

    let mut result = match mem::replace(&mut transmit.write, new_state) {
        // A guest writer is already parked with data.
        WriteState::GuestReady {
            instance: _,
            ty: write_ty,
            flat_abi: write_flat_abi,
            options: write_options,
            address: write_address,
            count: write_count,
            handle: write_handle,
            caller: write_caller,
        } => {
            assert_eq!(flat_abi, write_flat_abi);

            if let TransmitIndex::Future(_) = ty {
                transmit.done = true;
            }

            let write_handle_rep = transmit.write_handle.rep();

            // The parked write finishes if it was zero-length or if this read
            // offers any buffer space.
            let write_complete = write_count == 0 || count > 0;
            // This read only completes when the writer actually has data.
            let read_complete = write_count > 0;
            // The writer has more data than this read can take.
            let write_buffer_remaining = count < write_count;

            // Transfer as much as both sides can handle.
            let count = count.min(write_count);

            self.copy(
                store.as_context_mut(),
                flat_abi,
                write_caller,
                write_ty,
                write_options,
                write_address,
                caller,
                ty,
                options,
                address,
                count,
                rep,
            )?;

            let instance = self.id().get_mut(store.0);
            let types = instance.component().types();
            let item_size = payload(ty, types)
                .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
                .unwrap_or(0);
            let concurrent_state = store.0.concurrent_state_mut();

            if write_complete {
                let count = u32::try_from(count).unwrap();
                // If an undelivered `StreamWrite` completion is still queued
                // for the writer, fold its item count into the new total.
                let total = if let Some(Event::StreamWrite {
                    code: ReturnCode::Completed(old_total),
                    ..
                }) = concurrent_state.take_event(write_handle_rep)?
                {
                    count + old_total
                } else {
                    count
                };

                let code = ReturnCode::completed(ty.kind(), total);

                concurrent_state.send_write_result(
                    write_ty,
                    transmit_id,
                    write_handle,
                    code,
                )?;
            }

            if write_buffer_remaining {
                // Re-park the writer with the unread tail of its buffer.
                let transmit = concurrent_state.get_mut(transmit_id)?;
                transmit.write = WriteState::GuestReady {
                    instance: self,
                    caller: write_caller,
                    ty: write_ty,
                    flat_abi: write_flat_abi,
                    options: write_options,
                    address: write_address + (count * item_size),
                    count: write_count - count,
                    handle: write_handle,
                };
            }

            if read_complete {
                ReturnCode::completed(ty.kind(), count.try_into().unwrap())
            } else {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }
        }

        // A host producer is registered for the write end.
        WriteState::HostReady {
            produce,
            try_into,
            guest_offset,
            cancel,
            cancel_waker,
        } => {
            assert!(cancel_waker.is_none());
            assert!(!cancel);
            assert_eq!(0, guest_offset);

            // Park the read first, then hand control to the host producer.
            set_guest_ready(concurrent_state)?;

            let code =
                self.produce(store.0, ty.kind(), transmit_id, produce, try_into, 0, false)?;

            // A future is only finished once the producer actually completed.
            if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) {
                store.0.concurrent_state_mut().get_mut(transmit_id)?.done = true;
            }

            code
        }

        // No writer yet: park the read until one shows up.
        WriteState::Open => {
            set_guest_ready(concurrent_state)?;
            ReturnCode::Blocked
        }

        // The write end is gone; nothing more will arrive.
        WriteState::Dropped => ReturnCode::Dropped(0),
    };

    // Synchronous callers wait for the read event instead of seeing `Blocked`.
    if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
        result = self.wait_for_read(store.0, transmit_handle)?;
    }

    if result != ReturnCode::Blocked {
        // The operation is finished: the handle is idle again. Only a stream
        // that reported `Dropped` is flagged as done.
        *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
            TransmitLocalState::Read {
                done: matches!(
                    (result, ty),
                    (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
                ),
            };
    }

    log::trace!(
        "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
    );

    Ok(result)
}
3682
3683 fn wait_for_write(
3684 self,
3685 store: &mut StoreOpaque,
3686 handle: TableId<TransmitHandle>,
3687 ) -> Result<ReturnCode> {
3688 let waitable = Waitable::Transmit(handle);
3689 store.wait_for_event(waitable)?;
3690 let event = waitable.take_event(store.concurrent_state_mut())?;
3691 if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3692 event
3693 {
3694 waitable.on_delivery(store, self, event);
3695 Ok(code)
3696 } else {
3697 unreachable!()
3698 }
3699 }
3700
3701 fn cancel_write(
3703 self,
3704 store: &mut StoreOpaque,
3705 transmit_id: TableId<TransmitState>,
3706 async_: bool,
3707 ) -> Result<ReturnCode> {
3708 let state = store.concurrent_state_mut();
3709 let transmit = state.get_mut(transmit_id)?;
3710 log::trace!(
3711 "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3712 transmit.read,
3713 transmit.write
3714 );
3715
3716 let code = if let Some(event) =
3717 Waitable::Transmit(transmit.write_handle).take_event(state)?
3718 {
3719 let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3720 unreachable!();
3721 };
3722 match (code, event) {
3723 (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3724 ReturnCode::Cancelled(count)
3725 }
3726 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3727 _ => unreachable!(),
3728 }
3729 } else if let ReadState::HostReady {
3730 cancel,
3731 cancel_waker,
3732 ..
3733 } = &mut state.get_mut(transmit_id)?.read
3734 {
3735 *cancel = true;
3736 if let Some(waker) = cancel_waker.take() {
3737 waker.wake();
3738 }
3739
3740 if async_ {
3741 ReturnCode::Blocked
3742 } else {
3743 let handle = store
3744 .concurrent_state_mut()
3745 .get_mut(transmit_id)?
3746 .write_handle;
3747 self.wait_for_write(store, handle)?
3748 }
3749 } else {
3750 ReturnCode::Cancelled(0)
3751 };
3752
3753 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3754
3755 match &transmit.write {
3756 WriteState::GuestReady { .. } => {
3757 transmit.write = WriteState::Open;
3758 }
3759 WriteState::HostReady { .. } => todo!("support host write cancellation"),
3760 WriteState::Open | WriteState::Dropped => {}
3761 }
3762
3763 log::trace!("cancelled write {transmit_id:?}: {code:?}");
3764
3765 Ok(code)
3766 }
3767
3768 fn wait_for_read(
3769 self,
3770 store: &mut StoreOpaque,
3771 handle: TableId<TransmitHandle>,
3772 ) -> Result<ReturnCode> {
3773 let waitable = Waitable::Transmit(handle);
3774 store.wait_for_event(waitable)?;
3775 let event = waitable.take_event(store.concurrent_state_mut())?;
3776 if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
3777 event
3778 {
3779 waitable.on_delivery(store, self, event);
3780 Ok(code)
3781 } else {
3782 unreachable!()
3783 }
3784 }
3785
3786 fn cancel_read(
3788 self,
3789 store: &mut StoreOpaque,
3790 transmit_id: TableId<TransmitState>,
3791 async_: bool,
3792 ) -> Result<ReturnCode> {
3793 let state = store.concurrent_state_mut();
3794 let transmit = state.get_mut(transmit_id)?;
3795 log::trace!(
3796 "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3797 transmit.read,
3798 transmit.write
3799 );
3800
3801 let code = if let Some(event) =
3802 Waitable::Transmit(transmit.read_handle).take_event(state)?
3803 {
3804 let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3805 unreachable!();
3806 };
3807 match (code, event) {
3808 (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3809 ReturnCode::Cancelled(count)
3810 }
3811 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3812 _ => unreachable!(),
3813 }
3814 } else if let WriteState::HostReady {
3815 cancel,
3816 cancel_waker,
3817 ..
3818 } = &mut state.get_mut(transmit_id)?.write
3819 {
3820 *cancel = true;
3821 if let Some(waker) = cancel_waker.take() {
3822 waker.wake();
3823 }
3824
3825 if async_ {
3826 ReturnCode::Blocked
3827 } else {
3828 let handle = store
3829 .concurrent_state_mut()
3830 .get_mut(transmit_id)?
3831 .read_handle;
3832 self.wait_for_read(store, handle)?
3833 }
3834 } else {
3835 ReturnCode::Cancelled(0)
3836 };
3837
3838 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3839
3840 match &transmit.read {
3841 ReadState::GuestReady { .. } => {
3842 transmit.read = ReadState::Open;
3843 }
3844 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
3845 todo!("support host read cancellation")
3846 }
3847 ReadState::Open | ReadState::Dropped => {}
3848 }
3849
3850 log::trace!("cancelled read {transmit_id:?}: {code:?}");
3851
3852 Ok(code)
3853 }
3854
3855 fn guest_cancel_write(
3857 self,
3858 store: &mut StoreOpaque,
3859 ty: TransmitIndex,
3860 async_: bool,
3861 writer: u32,
3862 ) -> Result<ReturnCode> {
3863 if !async_ {
3864 store.check_blocking()?;
3868 }
3869
3870 let (rep, state) =
3871 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
3872 let id = TableId::<TransmitHandle>::new(rep);
3873 log::trace!("guest cancel write {id:?} (handle {writer})");
3874 match state {
3875 TransmitLocalState::Write { .. } => {
3876 bail!("stream or future write cancelled when no write is pending")
3877 }
3878 TransmitLocalState::Read { .. } => {
3879 bail!("passed read end to `{{stream|future}}.cancel-write`")
3880 }
3881 TransmitLocalState::Busy => {}
3882 }
3883 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3884 let code = self.cancel_write(store, transmit_id, async_)?;
3885 if !matches!(code, ReturnCode::Blocked) {
3886 let state =
3887 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
3888 .1;
3889 if let TransmitLocalState::Busy = state {
3890 *state = TransmitLocalState::Write { done: false };
3891 }
3892 }
3893 Ok(code)
3894 }
3895
3896 fn guest_cancel_read(
3898 self,
3899 store: &mut StoreOpaque,
3900 ty: TransmitIndex,
3901 async_: bool,
3902 reader: u32,
3903 ) -> Result<ReturnCode> {
3904 if !async_ {
3905 store.check_blocking()?;
3909 }
3910
3911 let (rep, state) =
3912 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
3913 let id = TableId::<TransmitHandle>::new(rep);
3914 log::trace!("guest cancel read {id:?} (handle {reader})");
3915 match state {
3916 TransmitLocalState::Read { .. } => {
3917 bail!("stream or future read cancelled when no read is pending")
3918 }
3919 TransmitLocalState::Write { .. } => {
3920 bail!("passed write end to `{{stream|future}}.cancel-read`")
3921 }
3922 TransmitLocalState::Busy => {}
3923 }
3924 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3925 let code = self.cancel_read(store, transmit_id, async_)?;
3926 if !matches!(code, ReturnCode::Blocked) {
3927 let state =
3928 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
3929 .1;
3930 if let TransmitLocalState::Busy = state {
3931 *state = TransmitLocalState::Read { done: false };
3932 }
3933 }
3934 Ok(code)
3935 }
3936
3937 fn guest_drop_readable(
3939 self,
3940 store: &mut StoreOpaque,
3941 ty: TransmitIndex,
3942 reader: u32,
3943 ) -> Result<()> {
3944 let table = self.id().get_mut(store).table_for_transmit(ty);
3945 let (rep, _is_done) = match ty {
3946 TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
3947 TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
3948 };
3949 let kind = match ty {
3950 TransmitIndex::Stream(_) => TransmitKind::Stream,
3951 TransmitIndex::Future(_) => TransmitKind::Future,
3952 };
3953 let id = TableId::<TransmitHandle>::new(rep);
3954 log::trace!("guest_drop_readable: drop reader {id:?}");
3955 store.host_drop_reader(id, kind)
3956 }
3957
3958 pub(crate) fn error_context_new(
3960 self,
3961 store: &mut StoreOpaque,
3962 caller: RuntimeComponentInstanceIndex,
3963 ty: TypeComponentLocalErrorContextTableIndex,
3964 options: OptionsIndex,
3965 debug_msg_address: u32,
3966 debug_msg_len: u32,
3967 ) -> Result<u32> {
3968 self.id().get(store).check_may_leave(caller)?;
3969 let lift_ctx = &mut LiftContext::new(store, options, self);
3970 let debug_msg = String::linear_lift_from_flat(
3971 lift_ctx,
3972 InterfaceType::String,
3973 &[ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)],
3974 )?;
3975
3976 let err_ctx = ErrorContextState { debug_msg };
3978 let state = store.concurrent_state_mut();
3979 let table_id = state.push(err_ctx)?;
3980 let global_ref_count_idx =
3981 TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
3982
3983 let _ = state
3985 .global_error_context_ref_counts
3986 .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
3987
3988 let local_idx = self
3995 .id()
3996 .get_mut(store)
3997 .table_for_error_context(ty)
3998 .error_context_insert(table_id.rep())?;
3999
4000 Ok(local_idx)
4001 }
4002
4003 pub(super) fn error_context_debug_message<T>(
4005 self,
4006 store: StoreContextMut<T>,
4007 ty: TypeComponentLocalErrorContextTableIndex,
4008 options: OptionsIndex,
4009 err_ctx_handle: u32,
4010 debug_msg_address: u32,
4011 ) -> Result<()> {
4012 let handle_table_id_rep = self
4014 .id()
4015 .get_mut(store.0)
4016 .table_for_error_context(ty)
4017 .error_context_rep(err_ctx_handle)?;
4018
4019 let state = store.0.concurrent_state_mut();
4020 let ErrorContextState { debug_msg } =
4022 state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
4023 let debug_msg = debug_msg.clone();
4024
4025 let lower_cx = &mut LowerContext::new(store, options, self);
4026 let debug_msg_address = usize::try_from(debug_msg_address)?;
4027 let offset = lower_cx
4029 .as_slice_mut()
4030 .get(debug_msg_address..)
4031 .and_then(|b| b.get(..debug_msg.bytes().len()))
4032 .map(|_| debug_msg_address)
4033 .ok_or_else(|| crate::format_err!("invalid debug message pointer: out of bounds"))?;
4034 debug_msg
4035 .as_str()
4036 .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
4037
4038 Ok(())
4039 }
4040
4041 pub(crate) fn future_cancel_read(
4043 self,
4044 store: &mut StoreOpaque,
4045 caller: RuntimeComponentInstanceIndex,
4046 ty: TypeFutureTableIndex,
4047 async_: bool,
4048 reader: u32,
4049 ) -> Result<u32> {
4050 self.id().get(store).check_may_leave(caller)?;
4051 self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
4052 .map(|v| v.encode())
4053 }
4054
4055 pub(crate) fn future_cancel_write(
4057 self,
4058 store: &mut StoreOpaque,
4059 caller: RuntimeComponentInstanceIndex,
4060 ty: TypeFutureTableIndex,
4061 async_: bool,
4062 writer: u32,
4063 ) -> Result<u32> {
4064 self.id().get(store).check_may_leave(caller)?;
4065 self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
4066 .map(|v| v.encode())
4067 }
4068
4069 pub(crate) fn stream_cancel_read(
4071 self,
4072 store: &mut StoreOpaque,
4073 caller: RuntimeComponentInstanceIndex,
4074 ty: TypeStreamTableIndex,
4075 async_: bool,
4076 reader: u32,
4077 ) -> Result<u32> {
4078 self.id().get(store).check_may_leave(caller)?;
4079 self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
4080 .map(|v| v.encode())
4081 }
4082
4083 pub(crate) fn stream_cancel_write(
4085 self,
4086 store: &mut StoreOpaque,
4087 caller: RuntimeComponentInstanceIndex,
4088 ty: TypeStreamTableIndex,
4089 async_: bool,
4090 writer: u32,
4091 ) -> Result<u32> {
4092 self.id().get(store).check_may_leave(caller)?;
4093 self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
4094 .map(|v| v.encode())
4095 }
4096
4097 pub(crate) fn future_drop_readable(
4099 self,
4100 store: &mut StoreOpaque,
4101 caller: RuntimeComponentInstanceIndex,
4102 ty: TypeFutureTableIndex,
4103 reader: u32,
4104 ) -> Result<()> {
4105 self.id().get(store).check_may_leave(caller)?;
4106 self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
4107 }
4108
4109 pub(crate) fn stream_drop_readable(
4111 self,
4112 store: &mut StoreOpaque,
4113 caller: RuntimeComponentInstanceIndex,
4114 ty: TypeStreamTableIndex,
4115 reader: u32,
4116 ) -> Result<()> {
4117 self.id().get(store).check_may_leave(caller)?;
4118 self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
4119 }
4120
4121 fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
4125 let (write, read) = store
4126 .concurrent_state_mut()
4127 .new_transmit(TransmitOrigin::guest(self.id().instance(), ty))?;
4128
4129 let table = self.id().get_mut(store).table_for_transmit(ty);
4130 let (read_handle, write_handle) = match ty {
4131 TransmitIndex::Future(ty) => (
4132 table.future_insert_read(ty, read.rep())?,
4133 table.future_insert_write(ty, write.rep())?,
4134 ),
4135 TransmitIndex::Stream(ty) => (
4136 table.stream_insert_read(ty, read.rep())?,
4137 table.stream_insert_write(ty, write.rep())?,
4138 ),
4139 };
4140
4141 let state = store.concurrent_state_mut();
4142 state.get_mut(read)?.common.handle = Some(read_handle);
4143 state.get_mut(write)?.common.handle = Some(write_handle);
4144
4145 Ok(ResourcePair {
4146 write: write_handle,
4147 read: read_handle,
4148 })
4149 }
4150
4151 pub(crate) fn error_context_drop(
4153 self,
4154 store: &mut StoreOpaque,
4155 caller: RuntimeComponentInstanceIndex,
4156 ty: TypeComponentLocalErrorContextTableIndex,
4157 error_context: u32,
4158 ) -> Result<()> {
4159 let instance = self.id().get_mut(store);
4160 instance.check_may_leave(caller)?;
4161
4162 let local_handle_table = instance.table_for_error_context(ty);
4163
4164 let rep = local_handle_table.error_context_drop(error_context)?;
4165
4166 let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
4167
4168 let state = store.concurrent_state_mut();
4169 let GlobalErrorContextRefCount(global_ref_count) = state
4170 .global_error_context_ref_counts
4171 .get_mut(&global_ref_count_idx)
4172 .expect("retrieve concurrent state for error context during drop");
4173
4174 assert!(*global_ref_count >= 1);
4176 *global_ref_count -= 1;
4177 if *global_ref_count == 0 {
4178 state
4179 .global_error_context_ref_counts
4180 .remove(&global_ref_count_idx);
4181
4182 state
4183 .delete(TableId::<ErrorContextState>::new(rep))
4184 .context("deleting component-global error context data")?;
4185 }
4186
4187 Ok(())
4188 }
4189
4190 fn guest_transfer(
4193 self,
4194 store: &mut StoreOpaque,
4195 src_idx: u32,
4196 src: TransmitIndex,
4197 dst: TransmitIndex,
4198 ) -> Result<u32> {
4199 let mut instance = self.id().get_mut(store);
4200 let src_table = instance.as_mut().table_for_transmit(src);
4201 let (rep, is_done) = match src {
4202 TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
4203 TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
4204 };
4205 if is_done {
4206 bail!("cannot lift after being notified that the writable end dropped");
4207 }
4208 let dst_table = instance.table_for_transmit(dst);
4209 let handle = match dst {
4210 TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
4211 TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
4212 }?;
4213 store
4214 .concurrent_state_mut()
4215 .get_mut(TableId::<TransmitHandle>::new(rep))?
4216 .common
4217 .handle = Some(handle);
4218 Ok(handle)
4219 }
4220
4221 pub(crate) fn future_new(
4223 self,
4224 store: &mut StoreOpaque,
4225 caller: RuntimeComponentInstanceIndex,
4226 ty: TypeFutureTableIndex,
4227 ) -> Result<ResourcePair> {
4228 self.id().get(store).check_may_leave(caller)?;
4229 self.guest_new(store, TransmitIndex::Future(ty))
4230 }
4231
4232 pub(crate) fn stream_new(
4234 self,
4235 store: &mut StoreOpaque,
4236 caller: RuntimeComponentInstanceIndex,
4237 ty: TypeStreamTableIndex,
4238 ) -> Result<ResourcePair> {
4239 self.id().get(store).check_may_leave(caller)?;
4240 self.guest_new(store, TransmitIndex::Stream(ty))
4241 }
4242
4243 pub(crate) fn future_transfer(
4246 self,
4247 store: &mut StoreOpaque,
4248 src_idx: u32,
4249 src: TypeFutureTableIndex,
4250 dst: TypeFutureTableIndex,
4251 ) -> Result<u32> {
4252 self.guest_transfer(
4253 store,
4254 src_idx,
4255 TransmitIndex::Future(src),
4256 TransmitIndex::Future(dst),
4257 )
4258 }
4259
4260 pub(crate) fn stream_transfer(
4263 self,
4264 store: &mut StoreOpaque,
4265 src_idx: u32,
4266 src: TypeStreamTableIndex,
4267 dst: TypeStreamTableIndex,
4268 ) -> Result<u32> {
4269 self.guest_transfer(
4270 store,
4271 src_idx,
4272 TransmitIndex::Stream(src),
4273 TransmitIndex::Stream(dst),
4274 )
4275 }
4276
4277 pub(crate) fn error_context_transfer(
4279 self,
4280 store: &mut StoreOpaque,
4281 src_idx: u32,
4282 src: TypeComponentLocalErrorContextTableIndex,
4283 dst: TypeComponentLocalErrorContextTableIndex,
4284 ) -> Result<u32> {
4285 let mut instance = self.id().get_mut(store);
4286 let rep = instance
4287 .as_mut()
4288 .table_for_error_context(src)
4289 .error_context_rep(src_idx)?;
4290 let dst_idx = instance
4291 .table_for_error_context(dst)
4292 .error_context_insert(rep)?;
4293
4294 let global_ref_count = store
4298 .concurrent_state_mut()
4299 .global_error_context_ref_counts
4300 .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4301 .context("global ref count present for existing (sub)component error context")?;
4302 global_ref_count.0 += 1;
4303
4304 Ok(dst_idx)
4305 }
4306}
4307
4308impl ComponentInstance {
4309 fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4310 let (states, types) = self.instance_states();
4311 let runtime_instance = match ty {
4312 TransmitIndex::Stream(ty) => types[ty].instance,
4313 TransmitIndex::Future(ty) => types[ty].instance,
4314 };
4315 states[runtime_instance].handle_table()
4316 }
4317
4318 fn table_for_error_context(
4319 self: Pin<&mut Self>,
4320 ty: TypeComponentLocalErrorContextTableIndex,
4321 ) -> &mut HandleTable {
4322 let (states, types) = self.instance_states();
4323 let runtime_instance = types[ty].instance;
4324 states[runtime_instance].handle_table()
4325 }
4326
4327 fn get_mut_by_index(
4328 self: Pin<&mut Self>,
4329 ty: TransmitIndex,
4330 index: u32,
4331 ) -> Result<(u32, &mut TransmitLocalState)> {
4332 get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4333 }
4334}
4335
4336impl ConcurrentState {
4337 fn send_write_result(
4338 &mut self,
4339 ty: TransmitIndex,
4340 id: TableId<TransmitState>,
4341 handle: u32,
4342 code: ReturnCode,
4343 ) -> Result<()> {
4344 let write_handle = self.get_mut(id)?.write_handle.rep();
4345 self.set_event(
4346 write_handle,
4347 match ty {
4348 TransmitIndex::Future(ty) => Event::FutureWrite {
4349 code,
4350 pending: Some((ty, handle)),
4351 },
4352 TransmitIndex::Stream(ty) => Event::StreamWrite {
4353 code,
4354 pending: Some((ty, handle)),
4355 },
4356 },
4357 )
4358 }
4359
4360 fn send_read_result(
4361 &mut self,
4362 ty: TransmitIndex,
4363 id: TableId<TransmitState>,
4364 handle: u32,
4365 code: ReturnCode,
4366 ) -> Result<()> {
4367 let read_handle = self.get_mut(id)?.read_handle.rep();
4368 self.set_event(
4369 read_handle,
4370 match ty {
4371 TransmitIndex::Future(ty) => Event::FutureRead {
4372 code,
4373 pending: Some((ty, handle)),
4374 },
4375 TransmitIndex::Stream(ty) => Event::StreamRead {
4376 code,
4377 pending: Some((ty, handle)),
4378 },
4379 },
4380 )
4381 }
4382
4383 fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4384 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4385 }
4386
4387 fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4388 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4389 }
4390
4391 fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4402 let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4403
4404 fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
4405 let (ReturnCode::Completed(count)
4406 | ReturnCode::Dropped(count)
4407 | ReturnCode::Cancelled(count)) = old
4408 else {
4409 unreachable!()
4410 };
4411
4412 match new {
4413 ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
4414 ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
4415 _ => unreachable!(),
4416 }
4417 }
4418
4419 let event = match (waitable.take_event(self)?, event) {
4420 (None, _) => event,
4421 (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4422 (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4423 (
4424 Some(Event::StreamWrite {
4425 code: old_code,
4426 pending: old_pending,
4427 }),
4428 Event::StreamWrite { code, pending },
4429 ) => Event::StreamWrite {
4430 code: update_code(old_code, code),
4431 pending: old_pending.or(pending),
4432 },
4433 (
4434 Some(Event::StreamRead {
4435 code: old_code,
4436 pending: old_pending,
4437 }),
4438 Event::StreamRead { code, pending },
4439 ) => Event::StreamRead {
4440 code: update_code(old_code, code),
4441 pending: old_pending.or(pending),
4442 },
4443 _ => unreachable!(),
4444 };
4445
4446 waitable.set_event(self, Some(event))
4447 }
4448
4449 fn new_transmit(
4452 &mut self,
4453 origin: TransmitOrigin,
4454 ) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4455 let state_id = self.push(TransmitState::new(origin))?;
4456
4457 let write = self.push(TransmitHandle::new(state_id))?;
4458 let read = self.push(TransmitHandle::new(state_id))?;
4459
4460 let state = self.get_mut(state_id)?;
4461 state.write_handle = write;
4462 state.read_handle = read;
4463
4464 log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4465
4466 Ok((write, read))
4467 }
4468
4469 fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4471 let state = self.delete(state_id)?;
4472 self.delete(state.write_handle)?;
4473 self.delete(state.read_handle)?;
4474
4475 log::trace!(
4476 "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4477 state.write_handle,
4478 state.read_handle,
4479 );
4480
4481 Ok(())
4482 }
4483}
4484
/// The pair of guest handle-table indices produced when a new stream or
/// future is created for a guest.
pub(crate) struct ResourcePair {
    /// Handle of the writable end.
    pub(crate) write: u32,
    /// Handle of the readable end.
    pub(crate) read: u32,
}
4489
4490impl Waitable {
4491 pub(super) fn on_delivery(&self, store: &mut StoreOpaque, instance: Instance, event: Event) {
4494 match event {
4495 Event::FutureRead {
4496 pending: Some((ty, handle)),
4497 ..
4498 }
4499 | Event::FutureWrite {
4500 pending: Some((ty, handle)),
4501 ..
4502 } => {
4503 let instance = instance.id().get_mut(store);
4504 let runtime_instance = instance.component().types()[ty].instance;
4505 let (rep, state) = instance.instance_states().0[runtime_instance]
4506 .handle_table()
4507 .future_rep(ty, handle)
4508 .unwrap();
4509 assert_eq!(rep, self.rep());
4510 assert_eq!(*state, TransmitLocalState::Busy);
4511 *state = match event {
4512 Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
4513 Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
4514 _ => unreachable!(),
4515 };
4516 }
4517 Event::StreamRead {
4518 pending: Some((ty, handle)),
4519 code,
4520 }
4521 | Event::StreamWrite {
4522 pending: Some((ty, handle)),
4523 code,
4524 } => {
4525 let instance = instance.id().get_mut(store);
4526 let runtime_instance = instance.component().types()[ty].instance;
4527 let (rep, state) = instance.instance_states().0[runtime_instance]
4528 .handle_table()
4529 .stream_rep(ty, handle)
4530 .unwrap();
4531 assert_eq!(rep, self.rep());
4532 assert_eq!(*state, TransmitLocalState::Busy);
4533 let done = matches!(code, ReturnCode::Dropped(_));
4534 *state = match event {
4535 Event::StreamRead { .. } => TransmitLocalState::Read { done },
4536 Event::StreamWrite { .. } => TransmitLocalState::Write { done },
4537 _ => unreachable!(),
4538 };
4539
4540 let transmit_handle = TableId::<TransmitHandle>::new(rep);
4541 let state = store.concurrent_state_mut();
4542 let transmit_id = state.get_mut(transmit_handle).unwrap().state;
4543 let transmit = state.get_mut(transmit_id).unwrap();
4544
4545 match event {
4546 Event::StreamRead { .. } => {
4547 transmit.read = ReadState::Open;
4548 }
4549 Event::StreamWrite { .. } => transmit.write = WriteState::Open,
4550 _ => unreachable!(),
4551 };
4552 }
4553 _ => {}
4554 }
4555 }
4556}
4557
4558fn allow_intra_component_read_write(ty: Option<InterfaceType>) -> bool {
4562 matches!(
4563 ty,
4564 None | Some(
4565 InterfaceType::S8
4566 | InterfaceType::U8
4567 | InterfaceType::S16
4568 | InterfaceType::U16
4569 | InterfaceType::S32
4570 | InterfaceType::U32
4571 | InterfaceType::S64
4572 | InterfaceType::U64
4573 | InterfaceType::Float32
4574 | InterfaceType::Float64
4575 )
4576 )
4577}
4578
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Engine, Store};
    use core::future::pending;
    use core::pin::pin;
    use std::sync::LazyLock;

    /// Engine shared by the throwaway stores created in these tests.
    static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);

    /// Polls `rx` as a `FutureProducer` exactly once, using a no-op waker and
    /// a freshly created store, and returns what `poll_produce` reports.
    fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
    where
        T: FutureProducer<()>,
    {
        let mut cx = Context::from_waker(Waker::noop());
        let mut store = Store::new(&ENGINE, ());
        rx.poll_produce(&mut cx, store.as_context_mut(), finish)
    }

    /// Exercises the `FutureProducer` behaviour of plain futures and of
    /// `oneshot` receivers, with and without `finish` set.
    #[test]
    fn future_producer() {
        // An immediately-ready future yields its value when not finishing...
        {
            let mut fut = pin!(async { crate::error::Ok(()) });
            assert!(matches!(
                poll_future_producer(fut.as_mut(), false),
                Poll::Ready(Ok(Some(()))),
            ));
        }

        // ...and also when finishing.
        {
            let mut fut = pin!(async { crate::error::Ok(()) });
            assert!(matches!(
                poll_future_producer(fut.as_mut(), true),
                Poll::Ready(Ok(Some(()))),
            ));
        }

        // A never-ready future is pending, and reports `None` on finish.
        {
            let mut fut = pin!(pending::<Result<()>>());
            assert!(matches!(
                poll_future_producer(fut.as_mut(), false),
                Poll::Pending,
            ));
            assert!(matches!(
                poll_future_producer(fut.as_mut(), true),
                Poll::Ready(Ok(None)),
            ));
        }

        // A oneshot receiver: pending, `None` on finish, then the value once
        // the sender has fired.
        {
            let (tx, rx) = oneshot::channel();
            let mut rx = pin!(rx);
            assert!(matches!(
                poll_future_producer(rx.as_mut(), false),
                Poll::Pending,
            ));
            assert!(matches!(
                poll_future_producer(rx.as_mut(), true),
                Poll::Ready(Ok(None)),
            ));
            tx.send(()).unwrap();
            assert!(matches!(
                poll_future_producer(rx.as_mut(), true),
                Poll::Ready(Ok(Some(()))),
            ));
        }

        // A oneshot receiver whose value was sent before the first poll.
        {
            let (tx, rx) = oneshot::channel();
            let mut rx = pin!(rx);
            tx.send(()).unwrap();
            assert!(matches!(
                poll_future_producer(rx.as_mut(), false),
                Poll::Ready(Ok(Some(()))),
            ));
        }

        // A dropped sender surfaces as an error when not finishing...
        {
            let (tx, rx) = oneshot::channel::<()>();
            let mut rx = pin!(rx);
            drop(tx);
            assert!(matches!(
                poll_future_producer(rx.as_mut(), false),
                Poll::Ready(Err(..)),
            ));
        }

        // ...and when finishing.
        {
            let (tx, rx) = oneshot::channel::<()>();
            let mut rx = pin!(rx);
            drop(tx);
            assert!(matches!(
                poll_future_producer(rx.as_mut(), true),
                Poll::Ready(Err(..)),
            ));
        }
    }
}