1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, QualifiedThreadId, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext};
5use crate::component::matching::InstanceType;
6use crate::component::types;
7use crate::component::values::ErrorContextAny;
8use crate::component::{
9 AsAccessor, ComponentInstanceId, ComponentType, FutureAny, Instance, Lift, Lower, StreamAny,
10 Val, WasmList,
11};
12use crate::store::{StoreOpaque, StoreToken};
13use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
14use crate::vm::{AlwaysMut, VMStore};
15use crate::{AsContext, AsContextMut, StoreContextMut, ValRaw};
16use crate::{
17 Error, Result, bail, bail_bug, ensure,
18 error::{Context as _, format_err},
19};
20use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
21use core::fmt;
22use core::future;
23use core::iter;
24use core::marker::PhantomData;
25use core::mem::{self, ManuallyDrop, MaybeUninit};
26use core::ops::{Deref, DerefMut};
27use core::pin::Pin;
28use core::task::{Context, Poll, Waker, ready};
29use futures::channel::oneshot;
30use futures::{FutureExt as _, stream};
31use std::any::{Any, TypeId};
32use std::boxed::Box;
33use std::io::Cursor;
34use std::string::String;
35use std::sync::{Arc, Mutex, MutexGuard};
36use std::vec::Vec;
37use wasmtime_environ::component::{
38 CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
39 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
40 TypeFutureTableIndex, TypeStreamTableIndex,
41};
42
43pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
44
45mod buffers;
46
/// Distinguishes the two kinds of transmit handled by this module.
#[derive(Copy, Clone, Debug)]
pub enum TransmitKind {
    /// The transmit is a stream.
    Stream,
    /// The transmit is a future.
    Future,
}
54
/// Outcome of a transmit operation, convertible to a single `u32` with
/// [`ReturnCode::encode`].
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ReturnCode {
    /// Encoded as `0xffff_ffff`; carries no payload.
    Blocked,
    /// Encoded with status `0x0` in the low four bits and the payload above.
    Completed(u32),
    /// Encoded with status `0x1` in the low four bits and the payload above.
    Dropped(u32),
    /// Encoded with status `0x2` in the low four bits and the payload above.
    Cancelled(u32),
}
63
64impl ReturnCode {
65 pub fn encode(&self) -> u32 {
70 const BLOCKED: u32 = 0xffff_ffff;
71 const COMPLETED: u32 = 0x0;
72 const DROPPED: u32 = 0x1;
73 const CANCELLED: u32 = 0x2;
74 match self {
75 ReturnCode::Blocked => BLOCKED,
76 ReturnCode::Completed(n) => {
77 debug_assert!(*n < (1 << 28));
78 (n << 4) | COMPLETED
79 }
80 ReturnCode::Dropped(n) => {
81 debug_assert!(*n < (1 << 28));
82 (n << 4) | DROPPED
83 }
84 ReturnCode::Cancelled(n) => {
85 debug_assert!(*n < (1 << 28));
86 (n << 4) | CANCELLED
87 }
88 }
89 }
90
91 fn completed(kind: TransmitKind, count: u32) -> Self {
94 Self::Completed(if let TransmitKind::Future = kind {
95 0
96 } else {
97 count
98 })
99 }
100}
101
/// A type-table index for either a stream or a future, so that functions
/// handling both can take a single argument.
#[derive(Copy, Clone, Debug)]
pub enum TransmitIndex {
    /// Index of a stream table type.
    Stream(TypeStreamTableIndex),
    /// Index of a future table type.
    Future(TypeFutureTableIndex),
}
111
112impl TransmitIndex {
113 pub fn kind(&self) -> TransmitKind {
114 match self {
115 TransmitIndex::Stream(_) => TransmitKind::Stream,
116 TransmitIndex::Future(_) => TransmitKind::Future,
117 }
118 }
119}
120
121fn payload(ty: TransmitIndex, types: &ComponentTypes) -> Option<InterfaceType> {
124 match ty {
125 TransmitIndex::Future(ty) => types[types[ty].ty].payload,
126 TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
127 }
128}
129
/// Resolves guest handle `index` in `handle_table` for the stream or future
/// table type `ty`.
///
/// Dispatches to `HandleTable::stream_rep` or `HandleTable::future_rep` and
/// forwards the result unchanged.
// NOTE(review): the `u32` in the returned pair is presumably the handle's
// "rep" (going by the callee names) — confirm against `HandleTable`.
fn get_mut_by_index_from(
    handle_table: &mut HandleTable,
    ty: TransmitIndex,
    index: u32,
) -> Result<(u32, &mut TransmitLocalState)> {
    match ty {
        TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
        TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
    }
}
142
/// Copies up to `count` items from the front of `buffer` into guest memory
/// at `address`, then marks them consumed with `buffer.skip`.
///
/// The number of items written is the smaller of `count` and
/// `buffer.remaining().len()`.
///
/// # Errors
///
/// Fails if `address` is not a multiple of `T::ALIGN32`, if the range
/// `address .. address + T::SIZE32 * count` is outside the memory exposed by
/// the lowering context, or if thread switching or the store operation fails.
fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
    mut store: StoreContextMut<U>,
    instance: Instance,
    caller_thread: QualifiedThreadId,
    options: OptionsIndex,
    ty: TransmitIndex,
    address: usize,
    count: usize,
    buffer: &mut B,
) -> Result<()> {
    // Never lower more items than the buffer actually holds.
    let count = buffer.remaining().len().min(count);

    // Only when lowering `T` may require realloc is the store switched to
    // `caller_thread`; the previous thread is kept so it can be restored at
    // the end of the function.
    // NOTE(review): an early return via `?`/`bail!` below skips that
    // restoration — confirm this is acceptable to callers.
    let (lower, old_thread) = if T::MAY_REQUIRE_REALLOC {
        let old_thread = store.0.set_thread(caller_thread)?;
        (
            &mut LowerContext::new(store.as_context_mut(), options, instance),
            Some(old_thread),
        )
    } else {
        (
            &mut LowerContext::new_without_realloc(store.as_context_mut(), options, instance),
            None,
        )
    };

    if address % usize::try_from(T::ALIGN32)? != 0 {
        bail!("read pointer not aligned");
    }
    // Bounds-check the whole destination range up front; the resulting slice
    // itself is not used.
    lower
        .as_slice_mut()
        .get_mut(address..)
        .and_then(|b| b.get_mut(..T::SIZE32 * count))
        .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))?;

    // Nothing is stored when the stream/future has no payload type.
    if let Some(ty) = payload(ty, lower.types) {
        T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
    }

    if let Some(old_thread) = old_thread {
        store.0.set_thread(old_thread)?;
    }

    buffer.skip(count);

    Ok(())
}
192
/// Reads up to `count` items from guest memory at `address` and appends them
/// to `buffer`.
///
/// The number of items read is the smaller of `count` and
/// `buffer.remaining_capacity()`.
///
/// # Errors
///
/// For non-unit `T`, fails if `ty` is `None`, if `address` is not a multiple
/// of `T::ALIGN32`, if the source range is outside guest memory, or if
/// lifting fails.
fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
    lift: &mut LiftContext<'_>,
    ty: Option<InterfaceType>,
    buffer: &mut B,
    address: usize,
    count: usize,
) -> Result<()> {
    let count = count.min(buffer.remaining_capacity());
    if T::IS_RUST_UNIT_TYPE {
        buffer.extend(
            // SAFETY (NOTE(review)): this relies on `IS_RUST_UNIT_TYPE` being
            // true only for zero-sized types such as `()`, for which an
            // uninitialized value is valid — confirm against the definition
            // of that constant.
            iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
        )
    } else {
        let ty = match ty {
            Some(ty) => ty,
            None => bail_bug!("type required for non-unit lift"),
        };
        if address % usize::try_from(T::ALIGN32)? != 0 {
            bail!("write pointer not aligned");
        }
        // Bounds-check the whole source range up front; the resulting slice
        // itself is not used.
        lift.memory()
            .get(address..)
            .and_then(|b| b.get(..T::SIZE32 * count))
            .ok_or_else(|| crate::format_err!("write pointer out of bounds of memory"))?;

        let list = &WasmList::new(address, count, lift, ty)?;
        T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
    }
    Ok(())
}
226
/// State stored for an error context.
#[derive(Debug, PartialEq, Eq, PartialOrd)]
pub(super) struct ErrorContextState {
    /// Debug message associated with the error context.
    pub(crate) debug_msg: String,
}
233
/// Size and alignment of a value.
// NOTE(review): units are presumably bytes — confirm at the use sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct FlatAbi {
    /// Size of the value.
    pub(super) size: u32,
    /// Alignment of the value.
    pub(super) align: u32,
}
241
/// Write target handed to [`StreamProducer::poll_produce`].
///
/// Items can be staged in the producer's buffer (`take_buffer` /
/// `set_buffer`); for `u8` items, `as_direct` gives access to the raw
/// destination bytes.
pub struct Destination<'a, T, B> {
    /// Transmit this destination belongs to.
    id: TableId<TransmitState>,
    /// The producer's staging buffer.
    buffer: &'a mut B,
    /// Host-side byte buffer, when there is one; `as_direct` resets and
    /// sizes it.
    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
    /// Ties the destination to its item type without storing a `T`.
    _phantom: PhantomData<fn() -> T>,
}
249
250impl<'a, T, B> Destination<'a, T, B> {
251 pub fn reborrow(&mut self) -> Destination<'_, T, B> {
253 Destination {
254 id: self.id,
255 buffer: &mut *self.buffer,
256 host_buffer: self.host_buffer.as_deref_mut(),
257 _phantom: PhantomData,
258 }
259 }
260
261 pub fn take_buffer(&mut self) -> B
267 where
268 B: Default,
269 {
270 mem::take(self.buffer)
271 }
272
273 pub fn set_buffer(&mut self, buffer: B) {
283 *self.buffer = buffer;
284 }
285
286 pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
303 self.remaining_(store.as_context_mut().0).unwrap()
307 }
308
309 fn remaining_(&self, store: &mut StoreOpaque) -> Result<Option<usize>> {
310 let transmit = store.concurrent_state_mut().get_mut(self.id)?;
311
312 if let &ReadState::GuestReady { count, .. } = &transmit.read {
313 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
314 bail_bug!("expected WriteState::HostReady")
315 };
316
317 Ok(Some(count - guest_offset))
318 } else {
319 Ok(None)
320 }
321 }
322}
323
324impl<'a, B> Destination<'a, u8, B> {
325 pub fn as_direct<D>(
336 mut self,
337 store: StoreContextMut<'a, D>,
338 capacity: usize,
339 ) -> DirectDestination<'a, D> {
340 if let Some(buffer) = self.host_buffer.as_deref_mut() {
341 buffer.set_position(0);
342 if buffer.get_mut().is_empty() {
343 buffer.get_mut().resize(capacity, 0);
344 }
345 }
346
347 DirectDestination {
348 id: self.id,
349 host_buffer: self.host_buffer,
350 store,
351 }
352 }
353}
354
/// Byte-oriented view of a [`Destination`], created by
/// [`Destination::as_direct`].
pub struct DirectDestination<'a, D: 'static> {
    /// Transmit this destination belongs to.
    id: TableId<TransmitState>,
    /// Host-side byte buffer; when `None`, bytes go to guest memory.
    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
    /// Store used to reach the transmit state and guest memory.
    store: StoreContextMut<'a, D>,
}
362
363impl<D: 'static> std::io::Write for DirectDestination<'_, D> {
364 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
365 let rem = self.remaining();
366 let n = rem.len().min(buf.len());
367 rem[..n].copy_from_slice(&buf[..n]);
368 self.mark_written(n);
369 Ok(n)
370 }
371
372 fn flush(&mut self) -> std::io::Result<()> {
373 Ok(())
374 }
375}
376
377impl<D: 'static> DirectDestination<'_, D> {
378 pub fn remaining(&mut self) -> &mut [u8] {
380 self.remaining_().unwrap()
384 }
385
386 fn remaining_(&mut self) -> Result<&mut [u8]> {
387 if let Some(buffer) = self.host_buffer.as_deref_mut() {
388 return Ok(buffer.get_mut());
389 }
390 let transmit = self
391 .store
392 .as_context_mut()
393 .0
394 .concurrent_state_mut()
395 .get_mut(self.id)?;
396
397 let &ReadState::GuestReady {
398 address,
399 count,
400 options,
401 instance,
402 ..
403 } = &transmit.read
404 else {
405 bail_bug!("expected ReadState::GuestReady")
406 };
407
408 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
409 bail_bug!("expected WriteState::HostReady")
410 };
411
412 let memory = instance
413 .options_memory_mut(self.store.0, options)
414 .get_mut((address + guest_offset)..)
415 .and_then(|b| b.get_mut(..(count - guest_offset)));
416 match memory {
417 Some(memory) => Ok(memory),
418 None => bail_bug!("guest buffer unexpectedly out of bounds"),
419 }
420 }
421
422 pub fn mark_written(&mut self, count: usize) {
429 self.mark_written_(count).unwrap()
433 }
434
435 fn mark_written_(&mut self, count: usize) -> Result<()> {
436 if let Some(buffer) = self.host_buffer.as_deref_mut() {
437 buffer.set_position(
438 buffer
441 .position()
442 .checked_add(u64::try_from(count).unwrap())
443 .unwrap(),
444 );
445 } else {
446 let transmit = self
447 .store
448 .as_context_mut()
449 .0
450 .concurrent_state_mut()
451 .get_mut(self.id)?;
452
453 let ReadState::GuestReady {
454 count: read_count, ..
455 } = &transmit.read
456 else {
457 bail_bug!("expected ReadState::GuestReady")
458 };
459
460 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
461 bail_bug!("expected WriteState::HostReady");
462 };
463
464 if *guest_offset + count > *read_count {
465 panic!(
468 "write count ({count}) must be less than or equal to read count ({read_count})"
469 )
470 } else {
471 *guest_offset += count;
472 }
473 }
474 Ok(())
475 }
476}
477
/// Outcome reported by [`StreamProducer::poll_produce`] and
/// [`StreamConsumer::poll_consume`].
// NOTE(review): the variant descriptions below are inferred from the names
// and the in-file uses; confirm against the code that interprets them.
#[derive(Copy, Clone, Debug)]
pub enum StreamResult {
    /// The operation completed.
    Completed,
    /// The operation was cancelled.
    Cancelled,
    /// This end of the stream is finished; the built-in producers in this
    /// file return this after handing over all of their items.
    Dropped,
}
493
/// A host-side source of items for a stream.
// NOTE(review): the full polling contract (when `poll_produce` is invoked
// and how `finish` must be honored) is not visible in this part of the
// file — confirm against the code that drives producers.
pub trait StreamProducer<D>: Send + 'static {
    /// Type of the items produced.
    type Item;

    /// Buffer type in which produced items can be staged via
    /// [`Destination::set_buffer`].
    type Buffer: WriteBuffer<Self::Item> + Default;

    /// Attempts to produce items into `destination`.
    ///
    /// Returns `Poll::Pending` when no result is available yet, otherwise
    /// the [`StreamResult`] describing the outcome, or an error.
    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<'a, D>,
        destination: Destination<'a, Self::Item, Self::Buffer>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;

    /// Attempts to convert this producer into the type identified by `_ty`.
    ///
    /// The default implementation always declines and hands the producer
    /// back as `Err(me)`.
    fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
        Err(me)
    }
}
654
/// An empty iterator is a producer that yields nothing.
impl<T, D> StreamProducer<D> for iter::Empty<T>
where
    T: Send + Sync + 'static,
{
    type Item = T;
    type Buffer = Option<Self::Item>;

    /// Immediately reports `StreamResult::Dropped` without writing anything.
    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        _: &mut Context<'_>,
        _: StoreContextMut<'a, D>,
        _: Destination<'a, Self::Item, Self::Buffer>,
        _: bool,
    ) -> Poll<Result<StreamResult>> {
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
672
/// An empty `futures` stream is a producer that yields nothing.
impl<T, D> StreamProducer<D> for stream::Empty<T>
where
    T: Send + Sync + 'static,
{
    type Item = T;
    type Buffer = Option<Self::Item>;

    /// Immediately reports `StreamResult::Dropped` without writing anything.
    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        _: &mut Context<'_>,
        _: StoreContextMut<'a, D>,
        _: Destination<'a, Self::Item, Self::Buffer>,
        _: bool,
    ) -> Poll<Result<StreamResult>> {
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
690
691impl<T, D> StreamProducer<D> for Vec<T>
692where
693 T: Unpin + Send + Sync + 'static,
694{
695 type Item = T;
696 type Buffer = VecBuffer<T>;
697
698 fn poll_produce<'a>(
699 self: Pin<&mut Self>,
700 _: &mut Context<'_>,
701 _: StoreContextMut<'a, D>,
702 mut dst: Destination<'a, Self::Item, Self::Buffer>,
703 _: bool,
704 ) -> Poll<Result<StreamResult>> {
705 dst.set_buffer(mem::take(self.get_mut()).into());
706 Poll::Ready(Ok(StreamResult::Dropped))
707 }
708}
709
710impl<T, D> StreamProducer<D> for Box<[T]>
711where
712 T: Unpin + Send + Sync + 'static,
713{
714 type Item = T;
715 type Buffer = VecBuffer<T>;
716
717 fn poll_produce<'a>(
718 self: Pin<&mut Self>,
719 _: &mut Context<'_>,
720 _: StoreContextMut<'a, D>,
721 mut dst: Destination<'a, Self::Item, Self::Buffer>,
722 _: bool,
723 ) -> Poll<Result<StreamResult>> {
724 dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
725 Poll::Ready(Ok(StreamResult::Dropped))
726 }
727}
728
/// `Bytes` produces its contents in a single poll, writing directly into
/// the destination where possible and buffering whatever does not fit.
#[cfg(feature = "component-model-async-bytes")]
impl<D> StreamProducer<D> for bytes::Bytes {
    type Item = u8;
    type Buffer = Cursor<Self>;

    /// Writes as many bytes as the destination can take right now, stages
    /// the rest in the destination's buffer, and reports
    /// `StreamResult::Dropped`.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        mut store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _: bool,
    ) -> Poll<Result<StreamResult>> {
        let cap = dst.remaining(&mut store);
        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
            // No known (or zero) capacity: stage all the bytes in the buffer.
            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
            return Poll::Ready(Ok(StreamResult::Dropped));
        };
        // Clamp to what we actually hold: `split_off` and `copy_from_slice`
        // both panic when the payload is shorter than the capacity.
        let len = cap.get().min(self.len());
        // Stage whatever does not fit; `self` keeps the first `len` bytes.
        dst.set_buffer(Cursor::new(self.split_off(len)));
        let mut dst = dst.as_direct(store, len);
        dst.remaining()[..len].copy_from_slice(&self);
        dst.mark_written(len);
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
756
/// `BytesMut` produces its contents in a single poll, writing directly into
/// the destination where possible and buffering whatever does not fit.
#[cfg(feature = "component-model-async-bytes")]
impl<D> StreamProducer<D> for bytes::BytesMut {
    type Item = u8;
    type Buffer = Cursor<Self>;

    /// Writes as many bytes as the destination can take right now, stages
    /// the rest in the destination's buffer, and reports
    /// `StreamResult::Dropped`.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        mut store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _: bool,
    ) -> Poll<Result<StreamResult>> {
        let cap = dst.remaining(&mut store);
        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
            // No known (or zero) capacity: stage all the bytes in the buffer.
            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
            return Poll::Ready(Ok(StreamResult::Dropped));
        };
        // Clamp to what we actually hold: a payload shorter than the
        // capacity would otherwise make `split_off`/`copy_from_slice` panic.
        let len = cap.get().min(self.len());
        // Stage whatever does not fit; `self` keeps the first `len` bytes.
        dst.set_buffer(Cursor::new(self.split_off(len)));
        let mut dst = dst.as_direct(store, len);
        dst.remaining()[..len].copy_from_slice(&self);
        dst.mark_written(len);
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
784
/// Read source handed to [`StreamConsumer::poll_consume`] and
/// [`FutureConsumer::poll_consume`].
pub struct Source<'a, T> {
    /// Transmit this source belongs to.
    id: TableId<TransmitState>,
    /// Host-side buffer of pending items; when `None`, items are lifted from
    /// guest memory.
    host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
}
790
791impl<'a, T> Source<'a, T> {
792 pub fn reborrow(&mut self) -> Source<'_, T> {
794 Source {
795 id: self.id,
796 host_buffer: self.host_buffer.as_deref_mut(),
797 }
798 }
799
800 pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
802 where
803 T: func::Lift + 'static,
804 B: ReadBuffer<T>,
805 {
806 if let Some(input) = &mut self.host_buffer {
807 let count = input.remaining().len().min(buffer.remaining_capacity());
808 buffer.move_from(*input, count);
809 } else {
810 let store = store.as_context_mut();
811 let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
812
813 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
814 bail_bug!("expected ReadState::HostReady");
815 };
816
817 let &WriteState::GuestReady {
818 ty,
819 address,
820 count,
821 options,
822 instance,
823 ..
824 } = &transmit.write
825 else {
826 bail_bug!("expected WriteState::HostReady");
827 };
828
829 let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
830 let ty = payload(ty, cx.types);
831 let old_remaining = buffer.remaining_capacity();
832 lift::<T, B>(
833 cx,
834 ty,
835 buffer,
836 address + (T::SIZE32 * guest_offset),
837 count - guest_offset,
838 )?;
839
840 let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
841
842 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
843 bail_bug!("expected ReadState::HostReady");
844 };
845
846 *guest_offset += old_remaining - buffer.remaining_capacity();
847 }
848
849 Ok(())
850 }
851
852 pub fn remaining(&self, mut store: impl AsContextMut) -> usize
855 where
856 T: 'static,
857 {
858 self.remaining_(store.as_context_mut().0).unwrap()
862 }
863
864 fn remaining_(&self, store: &mut StoreOpaque) -> Result<usize>
865 where
866 T: 'static,
867 {
868 let transmit = store.concurrent_state_mut().get_mut(self.id)?;
869
870 if let &WriteState::GuestReady { count, .. } = &transmit.write {
871 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
872 bail_bug!("expected ReadState::HostReady")
873 };
874
875 Ok(count - guest_offset)
876 } else if let Some(host_buffer) = &self.host_buffer {
877 Ok(host_buffer.remaining().len())
878 } else {
879 bail_bug!("expected either WriteState::GuestReady or host buffer")
880 }
881 }
882}
883
884impl<'a> Source<'a, u8> {
885 pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
887 DirectSource {
888 id: self.id,
889 host_buffer: self.host_buffer,
890 store,
891 }
892 }
893}
894
/// Byte-oriented view of a [`Source`], created by [`Source::as_direct`].
pub struct DirectSource<'a, D: 'static> {
    /// Transmit this source belongs to.
    id: TableId<TransmitState>,
    /// Host-side buffer of pending bytes; when `None`, bytes are read from
    /// guest memory.
    host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
    /// Store used to reach the transmit state and guest memory.
    store: StoreContextMut<'a, D>,
}
902
903impl<D: 'static> std::io::Read for DirectSource<'_, D> {
904 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
905 let rem = self.remaining();
906 let n = rem.len().min(buf.len());
907 buf[..n].copy_from_slice(&rem[..n]);
908 self.mark_read(n);
909 Ok(n)
910 }
911}
912
913impl<D: 'static> DirectSource<'_, D> {
914 pub fn remaining(&mut self) -> &[u8] {
916 self.remaining_().unwrap()
920 }
921
922 fn remaining_(&mut self) -> Result<&[u8]> {
923 if let Some(buffer) = self.host_buffer.as_deref_mut() {
924 return Ok(buffer.remaining());
925 }
926 let transmit = self
927 .store
928 .as_context_mut()
929 .0
930 .concurrent_state_mut()
931 .get_mut(self.id)?;
932
933 let &WriteState::GuestReady {
934 address,
935 count,
936 options,
937 instance,
938 ..
939 } = &transmit.write
940 else {
941 bail_bug!("expected WriteState::GuestReady")
942 };
943
944 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
945 bail_bug!("expected ReadState::HostReady")
946 };
947
948 let memory = instance
949 .options_memory(self.store.0, options)
950 .get((address + guest_offset)..)
951 .and_then(|b| b.get(..(count - guest_offset)));
952 match memory {
953 Some(memory) => Ok(memory),
954 None => bail_bug!("guest buffer unexpectedly out of bounds"),
955 }
956 }
957
958 pub fn mark_read(&mut self, count: usize) {
965 self.mark_read_(count).unwrap()
969 }
970
971 fn mark_read_(&mut self, count: usize) -> Result<()> {
972 if let Some(buffer) = self.host_buffer.as_deref_mut() {
973 buffer.skip(count);
974 return Ok(());
975 }
976
977 let transmit = self
978 .store
979 .as_context_mut()
980 .0
981 .concurrent_state_mut()
982 .get_mut(self.id)?;
983
984 let WriteState::GuestReady {
985 count: write_count, ..
986 } = &transmit.write
987 else {
988 bail_bug!("expected WriteState::GuestReady");
989 };
990
991 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
992 bail_bug!("expected ReadState::HostReady");
993 };
994
995 if *guest_offset + count > *write_count {
996 panic!("read count ({count}) must be less than or equal to write count ({write_count})")
998 } else {
999 *guest_offset += count;
1000 }
1001 Ok(())
1002 }
1003}
1004
/// A host-side sink for the items of a stream.
// NOTE(review): the full polling contract (when `poll_consume` is invoked
// and how `finish` must be honored) is not visible in this part of the
// file — confirm against the code that drives consumers.
pub trait StreamConsumer<D>: Send + 'static {
    /// Type of the items consumed.
    type Item;

    /// Attempts to consume items from `source`.
    ///
    /// Returns `Poll::Pending` when no result is available yet, otherwise
    /// the [`StreamResult`] describing the outcome, or an error.
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;
}
1100
/// A host-side source of the single value of a future.
pub trait FutureProducer<D>: Send + 'static {
    /// Type of the value produced.
    type Item;

    /// Attempts to produce the value.
    ///
    /// Returns `Ok(Some(value))` once the value is ready. `Ok(None)` means
    /// no value was produced; the blanket implementation for Rust futures
    /// returns it when `finish` is set and the future is still pending.
    fn poll_produce(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        finish: bool,
    ) -> Poll<Result<Option<Self::Item>>>;
}
1122
1123impl<T, E, D, Fut> FutureProducer<D> for Fut
1124where
1125 E: Into<Error>,
1126 Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
1127{
1128 type Item = T;
1129
1130 fn poll_produce<'a>(
1131 self: Pin<&mut Self>,
1132 cx: &mut Context<'_>,
1133 _: StoreContextMut<'a, D>,
1134 finish: bool,
1135 ) -> Poll<Result<Option<T>>> {
1136 match self.poll(cx) {
1137 Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
1138 Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
1139 Poll::Pending if finish => Poll::Ready(Ok(None)),
1140 Poll::Pending => Poll::Pending,
1141 }
1142 }
1143}
1144
/// A host-side sink for the single value of a future.
// NOTE(review): the full polling contract is not visible in this part of
// the file — confirm against the code that drives consumers.
pub trait FutureConsumer<D>: Send + 'static {
    /// Type of the value consumed.
    type Item;

    /// Attempts to consume the value from `source`.
    ///
    /// Returns `Poll::Pending` when the consumer is not finished yet.
    /// [`FutureReader::pipe`] reports `StreamResult::Completed` if `source`
    /// has nothing remaining once this returns ready, and
    /// `StreamResult::Cancelled` otherwise.
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<()>>;
}
1169
/// Host-side handle to the readable end of a future carrying a `T`.
pub struct FutureReader<T> {
    /// Table id of the underlying transmit handle; replaced with a
    /// `u32::MAX` sentinel id by `close`.
    id: TableId<TransmitHandle>,
    /// Records the payload type without storing a `T`.
    _phantom: PhantomData<T>,
}
1180
impl<T> FutureReader<T> {
    /// Creates a new future whose value is supplied by `producer`.
    ///
    /// The producer is wrapped in an adapter that exposes it as a
    /// [`StreamProducer`], then registered with `new_transmit` as a
    /// `TransmitKind::Future`.
    ///
    /// # Errors
    ///
    /// Fails if concurrency support is not enabled on the store, or if
    /// `new_transmit` fails.
    pub fn new<S: AsContextMut>(
        mut store: S,
        producer: impl FutureProducer<S::Data, Item = T>,
    ) -> Result<Self>
    where
        T: func::Lower + func::Lift + Send + Sync + 'static,
    {
        ensure!(
            store.as_context().0.concurrency_support(),
            "concurrency support is not enabled"
        );

        // Adapter presenting a `FutureProducer` as a `StreamProducer`.
        struct Producer<P>(P);

        impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
            for Producer<P>
        {
            type Item = P::Item;
            type Buffer = Option<P::Item>;

            fn poll_produce<'a>(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                store: StoreContextMut<D>,
                mut destination: Destination<'a, Self::Item, Self::Buffer>,
                finish: bool,
            ) -> Poll<Result<StreamResult>> {
                // SAFETY (NOTE(review)): pin projection to the only field;
                // soundness relies on `Producer` never moving `.0` out of a
                // pinned `self`, which nothing in this adapter does.
                let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };

                Poll::Ready(Ok(
                    if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
                        // Stage the value in the buffer for delivery.
                        destination.set_buffer(Some(value));

                        StreamResult::Completed
                    } else {
                        // The producer finished without a value.
                        StreamResult::Cancelled
                    },
                ))
            }
        }

        Ok(Self::new_(
            store
                .as_context_mut()
                .new_transmit(TransmitKind::Future, Producer(producer))?,
        ))
    }

    /// Wraps an existing transmit handle id.
    pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
        Self {
            id,
            _phantom: PhantomData,
        }
    }

    /// Returns the underlying transmit handle id.
    pub(super) fn id(&self) -> TableId<TransmitHandle> {
        self.id
    }

    /// Installs `consumer` as the consumer of this future, consuming the
    /// reader.
    ///
    /// The consumer is wrapped in an adapter that exposes it as a
    /// [`StreamConsumer`], then registered with `set_consumer` as a
    /// `TransmitKind::Future`.
    pub fn pipe<S: AsContextMut>(
        self,
        mut store: S,
        consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
    ) -> Result<()>
    where
        T: func::Lift + 'static,
    {
        // Adapter presenting a `FutureConsumer` as a `StreamConsumer`.
        struct Consumer<C>(C);

        impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
            for Consumer<C>
        {
            type Item = T;

            fn poll_consume(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                mut store: StoreContextMut<D>,
                mut source: Source<Self::Item>,
                finish: bool,
            ) -> Poll<Result<StreamResult>> {
                // SAFETY (NOTE(review)): pin projection to the only field;
                // soundness relies on `Consumer` never moving `.0` out of a
                // pinned `self`, which nothing in this adapter does.
                let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };

                ready!(consumer.poll_consume(
                    cx,
                    store.as_context_mut(),
                    source.reborrow(),
                    finish
                ))?;

                // Nothing left in the source means the value was taken.
                Poll::Ready(Ok(if source.remaining(store) == 0 {
                    StreamResult::Completed
                } else {
                    StreamResult::Cancelled
                }))
            }
        }

        store
            .as_context_mut()
            .set_consumer(self.id, TransmitKind::Future, Consumer(consumer))
    }

    /// Lifts a reader from guest handle `index` of future type `ty`.
    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
        let id = lift_index_to_future(cx, ty, index)?;
        Ok(Self::new_(id))
    }

    /// Closes this reader via `future_close`, which drops the host's
    /// readable end and replaces this reader's id with a sentinel.
    pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
        future_close(store.as_context_mut().0, &mut self.id)
    }

    /// Like [`FutureReader::close`], but reaches the store through
    /// `accessor`.
    pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
        accessor.as_accessor().with(|access| self.close(access))
    }

    /// Wraps this reader in a [`GuardedFutureReader`], which closes it on
    /// drop using `accessor`.
    pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
    where
        A: AsAccessor,
    {
        GuardedFutureReader::new(accessor, self)
    }

    /// Attempts to convert this reader into a [`FutureAny`] by delegating to
    /// `FutureAny::try_from_future_reader`.
    pub fn try_into_future_any(self, store: impl AsContextMut) -> Result<FutureAny>
    where
        T: ComponentType + 'static,
    {
        FutureAny::try_from_future_reader(store, self)
    }

    /// Attempts to convert a [`FutureAny`] into a typed reader by delegating
    /// to `FutureAny::try_into_future_reader`.
    pub fn try_from_future_any(future: FutureAny) -> Result<Self>
    where
        T: ComponentType + 'static,
    {
        future.try_into_future_reader()
    }
}
1387
1388impl<T> fmt::Debug for FutureReader<T> {
1389 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1390 f.debug_struct("FutureReader")
1391 .field("id", &self.id)
1392 .finish()
1393 }
1394}
1395
1396pub(super) fn future_close(
1397 store: &mut StoreOpaque,
1398 id: &mut TableId<TransmitHandle>,
1399) -> Result<()> {
1400 let id = mem::replace(id, TableId::new(u32::MAX));
1401 store.host_drop_reader(id, TransmitKind::Future)
1402}
1403
/// Transfers the readable end of a future from a guest handle table to the
/// host, returning its transmit handle id.
///
/// # Errors
///
/// Fails if the handle cannot be removed from the guest's table, if the
/// guest was already notified that the writable end dropped, or if the
/// transmit state is already marked `done`. Any `ty` other than
/// `InterfaceType::Future` is reported through `func::bad_type_info`.
pub(super) fn lift_index_to_future(
    cx: &mut LiftContext<'_>,
    ty: InterfaceType,
    index: u32,
) -> Result<TableId<TransmitHandle>> {
    match ty {
        InterfaceType::Future(src) => {
            let handle_table = cx
                .instance_mut()
                .table_for_transmit(TransmitIndex::Future(src));
            let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
            if is_done {
                bail!("cannot lift future after being notified that the writable end dropped");
            }
            let id = TableId::<TransmitHandle>::new(rep);
            let concurrent_state = cx.concurrent_state_mut();
            let future = concurrent_state.get_mut(id)?;
            // The handle has left the guest's table, so forget its index.
            future.common.handle = None;
            let state = future.state;

            if concurrent_state.get_mut(state)?.done {
                bail!("cannot lift future after previous read succeeded");
            }

            Ok(id)
        }
        _ => func::bad_type_info(),
    }
}
1434
/// Transfers the readable end of the future `id` from the host into a guest
/// handle table, returning the new guest handle index.
///
/// The handle is inserted under the rep of the transmit state's
/// `read_handle` and the resulting index is recorded in `common.handle`.
/// Any `ty` other than `InterfaceType::Future` is reported through
/// `func::bad_type_info`.
pub(super) fn lower_future_to_index<U>(
    id: TableId<TransmitHandle>,
    cx: &mut LowerContext<'_, U>,
    ty: InterfaceType,
) -> Result<u32> {
    match ty {
        InterfaceType::Future(dst) => {
            let concurrent_state = cx.store.0.concurrent_state_mut();
            let state = concurrent_state.get_mut(id)?.state;
            let rep = concurrent_state.get_mut(state)?.read_handle.rep();

            let handle = cx
                .instance_mut()
                .table_for_transmit(TransmitIndex::Future(dst))
                .future_insert_read(dst, rep)?;

            // Remember which guest handle now refers to this reader.
            cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);

            Ok(handle)
        }
        _ => func::bad_type_info(),
    }
}
1459
// SAFETY (NOTE(review)): a `FutureReader` is lowered and lifted as a single
// `u32` handle index (see the `Lower`/`Lift` impls for this type), which is
// what `ABI` and `Lower` below describe — confirm against the trait's
// documented safety contract.
unsafe impl<T: ComponentType> ComponentType for FutureReader<T> {
    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;

    type Lower = <u32 as func::ComponentType>::Lower;

    /// Accepts only `future` types, delegating the payload check to
    /// `types::typecheck_payload::<T>`.
    fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
        match ty {
            InterfaceType::Future(ty) => {
                let ty = types.types[*ty].ty;
                types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
            }
            other => bail!("expected `future`, found `{}`", func::desc(other)),
        }
    }
}
1477
// SAFETY (NOTE(review)): both methods lower exactly one `u32` (the guest
// handle index), matching the `Lower` type declared in the `ComponentType`
// impl — confirm against the trait's documented safety contract.
unsafe impl<T: ComponentType> func::Lower for FutureReader<T> {
    /// Transfers the reader into the guest's handle table and lowers the
    /// resulting index as a flat `u32`.
    fn linear_lower_to_flat<U>(
        &self,
        cx: &mut LowerContext<'_, U>,
        ty: InterfaceType,
        dst: &mut MaybeUninit<Self::Lower>,
    ) -> Result<()> {
        lower_future_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
    }

    /// Transfers the reader into the guest's handle table and stores the
    /// resulting index as a `u32` at `offset`.
    fn linear_lower_to_memory<U>(
        &self,
        cx: &mut LowerContext<'_, U>,
        ty: InterfaceType,
        offset: usize,
    ) -> Result<()> {
        lower_future_to_index(self.id, cx, ty)?.linear_lower_to_memory(
            cx,
            InterfaceType::U32,
            offset,
        )
    }
}
1502
// SAFETY (NOTE(review)): both methods lift exactly one `u32` (the guest
// handle index), matching the `Lower` type declared in the `ComponentType`
// impl — confirm against the trait's documented safety contract.
unsafe impl<T: ComponentType> func::Lift for FutureReader<T> {
    /// Lifts a flat `u32` handle index and takes ownership of the future it
    /// names.
    fn linear_lift_from_flat(
        cx: &mut LiftContext<'_>,
        ty: InterfaceType,
        src: &Self::Lower,
    ) -> Result<Self> {
        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
        Self::lift_from_index(cx, ty, index)
    }

    /// Lifts a `u32` handle index from `bytes` and takes ownership of the
    /// future it names.
    fn linear_lift_from_memory(
        cx: &mut LiftContext<'_>,
        ty: InterfaceType,
        bytes: &[u8],
    ) -> Result<Self> {
        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
        Self::lift_from_index(cx, ty, index)
    }
}
1523
/// A [`FutureReader`] paired with an accessor, closed automatically when the
/// guard is dropped.
pub struct GuardedFutureReader<T, A>
where
    A: AsAccessor,
{
    /// The guarded reader; `None` once it has been taken back out of the
    /// guard, in which case drop does nothing.
    reader: Option<FutureReader<T>>,
    /// Accessor used to reach the store when closing the reader on drop.
    accessor: A,
}
1541
impl<T, A> GuardedFutureReader<T, A>
where
    A: AsAccessor,
{
    /// Wraps `reader` so that it is closed through `accessor` on drop.
    ///
    /// # Panics
    ///
    /// Panics if the store reached through `accessor` does not have
    /// concurrency support enabled.
    pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
        assert!(
            accessor
                .as_accessor()
                .with(|a| a.as_context().0.concurrency_support())
        );
        Self {
            reader: Some(reader),
            accessor,
        }
    }

    /// Takes the reader back out of the guard without closing it.
    pub fn into_future(self) -> FutureReader<T> {
        self.into()
    }
}
1571
impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
where
    A: AsAccessor,
{
    /// Takes the reader out of the guard. The guard's `Drop` then finds
    /// `None` and does not close it.
    fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
        // `reader` is set to `Some` in `GuardedFutureReader::new`; this is
        // the only place in the visible code that takes it.
        guard.reader.take().unwrap()
    }
}
1580
1581impl<T, A> Drop for GuardedFutureReader<T, A>
1582where
1583 A: AsAccessor,
1584{
1585 fn drop(&mut self) {
1586 if let Some(reader) = &mut self.reader {
1587 let result = reader.close_with(&self.accessor);
1590 debug_assert!(result.is_ok());
1591 }
1592 }
1593}
1594
/// Host-side handle to the readable end of a stream of `T` items.
pub struct StreamReader<T> {
    /// Table id of the underlying transmit handle; replaced with a
    /// `u32::MAX` sentinel id by `close`.
    id: TableId<TransmitHandle>,
    /// Records the item type without storing a `T`.
    _phantom: PhantomData<T>,
}
1605
impl<T> StreamReader<T> {
    /// Creates a new stream whose items are supplied by `producer`,
    /// registering it with `new_transmit` as a `TransmitKind::Stream`.
    ///
    /// # Errors
    ///
    /// Fails if concurrency support is not enabled on the store, or if
    /// `new_transmit` fails.
    pub fn new<S: AsContextMut>(
        mut store: S,
        producer: impl StreamProducer<S::Data, Item = T>,
    ) -> Result<Self>
    where
        T: func::Lower + func::Lift + Send + Sync + 'static,
    {
        ensure!(
            store.as_context().0.concurrency_support(),
            "concurrency support is not enabled",
        );
        Ok(Self::new_(
            store
                .as_context_mut()
                .new_transmit(TransmitKind::Stream, producer)?,
        ))
    }

    /// Wraps an existing transmit handle id.
    pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
        Self {
            id,
            _phantom: PhantomData,
        }
    }

    /// Returns the underlying transmit handle id.
    pub(super) fn id(&self) -> TableId<TransmitHandle> {
        self.id
    }

    /// Attempts to convert this reader into a `V`.
    ///
    /// This only has a chance of succeeding while the write end is in the
    /// `HostReady` state, whose `try_into` callback is asked for a value of
    /// type `V`. On success the reader is closed and the value returned;
    /// otherwise the reader is handed back as `Err(self)`.
    ///
    /// # Panics
    ///
    /// Panics if the transmit state cannot be looked up, if closing the
    /// reader fails, or if the callback returns a value that is not a `V`.
    pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
        let store = store.as_context_mut();
        let state = store.0.concurrent_state_mut();
        let id = state.get_mut(self.id).unwrap().state;
        if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
            match try_into(TypeId::of::<V>()) {
                Some(result) => {
                    self.close(store).unwrap();
                    Ok(*result.downcast::<V>().unwrap())
                }
                None => Err(self),
            }
        } else {
            Err(self)
        }
    }

    /// Installs `consumer` as the consumer of this stream, consuming the
    /// reader. Registered with `set_consumer` as a `TransmitKind::Stream`.
    pub fn pipe<S: AsContextMut>(
        self,
        mut store: S,
        consumer: impl StreamConsumer<S::Data, Item = T>,
    ) -> Result<()>
    where
        T: 'static,
    {
        store
            .as_context_mut()
            .set_consumer(self.id, TransmitKind::Stream, consumer)
    }

    /// Lifts a reader from guest handle `index` of stream type `ty`.
    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
        let id = lift_index_to_stream(cx, ty, index)?;
        Ok(Self::new_(id))
    }

    /// Closes this reader via `stream_close`, which drops the host's
    /// readable end and replaces this reader's id with a sentinel.
    pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
        stream_close(store.as_context_mut().0, &mut self.id)
    }

    /// Like [`StreamReader::close`], but reaches the store through
    /// `accessor`.
    pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
        accessor.as_accessor().with(|access| self.close(access))
    }

    /// Wraps this reader in a `GuardedStreamReader` built from `accessor`.
    pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
    where
        A: AsAccessor,
    {
        GuardedStreamReader::new(accessor, self)
    }

    /// Attempts to convert this reader into a [`StreamAny`] by delegating to
    /// `StreamAny::try_from_stream_reader`.
    pub fn try_into_stream_any(self, store: impl AsContextMut) -> Result<StreamAny>
    where
        T: ComponentType + 'static,
    {
        StreamAny::try_from_stream_reader(store, self)
    }

    /// Attempts to convert a [`StreamAny`] into a typed reader by delegating
    /// to `StreamAny::try_into_stream_reader`.
    pub fn try_from_stream_any(stream: StreamAny) -> Result<Self>
    where
        T: ComponentType + 'static,
    {
        stream.try_into_stream_reader()
    }
}
1772
1773impl<T> fmt::Debug for StreamReader<T> {
1774 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1775 f.debug_struct("StreamReader")
1776 .field("id", &self.id)
1777 .finish()
1778 }
1779}
1780
1781pub(super) fn stream_close(
1782 store: &mut StoreOpaque,
1783 id: &mut TableId<TransmitHandle>,
1784) -> Result<()> {
1785 let id = mem::replace(id, TableId::new(u32::MAX));
1786 store.host_drop_reader(id, TransmitKind::Stream)
1787}
1788
1789pub(super) fn lift_index_to_stream(
1791 cx: &mut LiftContext<'_>,
1792 ty: InterfaceType,
1793 index: u32,
1794) -> Result<TableId<TransmitHandle>> {
1795 match ty {
1796 InterfaceType::Stream(src) => {
1797 let handle_table = cx
1798 .instance_mut()
1799 .table_for_transmit(TransmitIndex::Stream(src));
1800 let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1801 if is_done {
1802 bail!("cannot lift stream after being notified that the writable end dropped");
1803 }
1804 let id = TableId::<TransmitHandle>::new(rep);
1805 cx.concurrent_state_mut().get_mut(id)?.common.handle = None;
1806 Ok(id)
1807 }
1808 _ => func::bad_type_info(),
1809 }
1810}
1811
1812pub(super) fn lower_stream_to_index<U>(
1814 id: TableId<TransmitHandle>,
1815 cx: &mut LowerContext<'_, U>,
1816 ty: InterfaceType,
1817) -> Result<u32> {
1818 match ty {
1819 InterfaceType::Stream(dst) => {
1820 let concurrent_state = cx.store.0.concurrent_state_mut();
1821 let state = concurrent_state.get_mut(id)?.state;
1822 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1823
1824 let handle = cx
1825 .instance_mut()
1826 .table_for_transmit(TransmitIndex::Stream(dst))
1827 .stream_insert_read(dst, rep)?;
1828
1829 cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1830
1831 Ok(handle)
1832 }
1833 _ => func::bad_type_info(),
1834 }
1835}
1836
1837unsafe impl<T: ComponentType> ComponentType for StreamReader<T> {
1840 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1841
1842 type Lower = <u32 as func::ComponentType>::Lower;
1843
1844 fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1845 match ty {
1846 InterfaceType::Stream(ty) => {
1847 let ty = types.types[*ty].ty;
1848 types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1849 }
1850 other => bail!("expected `stream`, found `{}`", func::desc(other)),
1851 }
1852 }
1853}
1854
1855unsafe impl<T: ComponentType> func::Lower for StreamReader<T> {
1857 fn linear_lower_to_flat<U>(
1858 &self,
1859 cx: &mut LowerContext<'_, U>,
1860 ty: InterfaceType,
1861 dst: &mut MaybeUninit<Self::Lower>,
1862 ) -> Result<()> {
1863 lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1864 }
1865
1866 fn linear_lower_to_memory<U>(
1867 &self,
1868 cx: &mut LowerContext<'_, U>,
1869 ty: InterfaceType,
1870 offset: usize,
1871 ) -> Result<()> {
1872 lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1873 cx,
1874 InterfaceType::U32,
1875 offset,
1876 )
1877 }
1878}
1879
1880unsafe impl<T: ComponentType> func::Lift for StreamReader<T> {
1882 fn linear_lift_from_flat(
1883 cx: &mut LiftContext<'_>,
1884 ty: InterfaceType,
1885 src: &Self::Lower,
1886 ) -> Result<Self> {
1887 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1888 Self::lift_from_index(cx, ty, index)
1889 }
1890
1891 fn linear_lift_from_memory(
1892 cx: &mut LiftContext<'_>,
1893 ty: InterfaceType,
1894 bytes: &[u8],
1895 ) -> Result<Self> {
1896 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1897 Self::lift_from_index(cx, ty, index)
1898 }
1899}
1900
/// A `StreamReader` paired with an accessor so that the reader is closed
/// automatically when the guard is dropped.
///
/// Use `into_stream` (or `From`) to take the reader back out without
/// closing it.
pub struct GuardedStreamReader<T, A>
where
    A: AsAccessor,
{
    /// The guarded reader; `None` once it has been taken back out, in which
    /// case dropping the guard does nothing.
    reader: Option<StreamReader<T>>,
    /// Accessor used to reach the store when closing the reader on drop.
    accessor: A,
}
1918
1919impl<T, A> GuardedStreamReader<T, A>
1920where
1921 A: AsAccessor,
1922{
1923 pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1932 assert!(
1933 accessor
1934 .as_accessor()
1935 .with(|a| a.as_context().0.concurrency_support())
1936 );
1937 Self {
1938 reader: Some(reader),
1939 accessor,
1940 }
1941 }
1942
1943 pub fn into_stream(self) -> StreamReader<T> {
1946 self.into()
1947 }
1948}
1949
1950impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1951where
1952 A: AsAccessor,
1953{
1954 fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1955 guard.reader.take().unwrap()
1956 }
1957}
1958
1959impl<T, A> Drop for GuardedStreamReader<T, A>
1960where
1961 A: AsAccessor,
1962{
1963 fn drop(&mut self) {
1964 if let Some(reader) = &mut self.reader {
1965 let result = reader.close_with(&self.accessor);
1968 debug_assert!(result.is_ok());
1969 }
1970 }
1971}
1972
/// Host-side representation of a component-model `error-context` value.
pub struct ErrorContext {
    /// Representation of the error context. This is the value stored in
    /// `ErrorContextAny` and inserted into / read from the per-instance
    /// error-context handle tables when lowering / lifting.
    rep: u32,
}
1977
1978impl ErrorContext {
1979 pub(crate) fn new(rep: u32) -> Self {
1980 Self { rep }
1981 }
1982
1983 pub fn into_val(self) -> Val {
1985 Val::ErrorContext(ErrorContextAny(self.rep))
1986 }
1987
1988 pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1990 let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1991 bail!("expected `error-context`; got `{}`", value.desc());
1992 };
1993 Ok(Self::new(*rep))
1994 }
1995
1996 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1997 match ty {
1998 InterfaceType::ErrorContext(src) => {
1999 let rep = cx
2000 .instance_mut()
2001 .table_for_error_context(src)
2002 .error_context_rep(index)?;
2003
2004 Ok(Self { rep })
2005 }
2006 _ => func::bad_type_info(),
2007 }
2008 }
2009}
2010
2011pub(crate) fn lower_error_context_to_index<U>(
2012 rep: u32,
2013 cx: &mut LowerContext<'_, U>,
2014 ty: InterfaceType,
2015) -> Result<u32> {
2016 match ty {
2017 InterfaceType::ErrorContext(dst) => {
2018 let tbl = cx.instance_mut().table_for_error_context(dst);
2019 tbl.error_context_insert(rep)
2020 }
2021 _ => func::bad_type_info(),
2022 }
2023}
2024unsafe impl func::ComponentType for ErrorContext {
2027 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
2028
2029 type Lower = <u32 as func::ComponentType>::Lower;
2030
2031 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
2032 match ty {
2033 InterfaceType::ErrorContext(_) => Ok(()),
2034 other => bail!("expected `error`, found `{}`", func::desc(other)),
2035 }
2036 }
2037}
2038
2039unsafe impl func::Lower for ErrorContext {
2041 fn linear_lower_to_flat<T>(
2042 &self,
2043 cx: &mut LowerContext<'_, T>,
2044 ty: InterfaceType,
2045 dst: &mut MaybeUninit<Self::Lower>,
2046 ) -> Result<()> {
2047 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
2048 cx,
2049 InterfaceType::U32,
2050 dst,
2051 )
2052 }
2053
2054 fn linear_lower_to_memory<T>(
2055 &self,
2056 cx: &mut LowerContext<'_, T>,
2057 ty: InterfaceType,
2058 offset: usize,
2059 ) -> Result<()> {
2060 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
2061 cx,
2062 InterfaceType::U32,
2063 offset,
2064 )
2065 }
2066}
2067
2068unsafe impl func::Lift for ErrorContext {
2070 fn linear_lift_from_flat(
2071 cx: &mut LiftContext<'_>,
2072 ty: InterfaceType,
2073 src: &Self::Lower,
2074 ) -> Result<Self> {
2075 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
2076 Self::lift_from_index(cx, ty, index)
2077 }
2078
2079 fn linear_lift_from_memory(
2080 cx: &mut LiftContext<'_>,
2081 ty: InterfaceType,
2082 bytes: &[u8],
2083 ) -> Result<Self> {
2084 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
2085 Self::lift_from_index(cx, ty, index)
2086 }
2087}
2088
/// Table entry for one end (read or write) of a stream or future.
pub(super) struct TransmitHandle {
    /// Waitable bookkeeping for this end. Its `handle` field holds the
    /// guest-side handle index while the end is lowered into a guest and is
    /// cleared when the end is lifted back out.
    pub(super) common: WaitableCommon,
    /// Id of the `TransmitState` this end belongs to.
    state: TableId<TransmitState>,
}
2095
2096impl TransmitHandle {
2097 fn new(state: TableId<TransmitState>) -> Self {
2098 Self {
2099 common: WaitableCommon::default(),
2100 state,
2101 }
2102 }
2103}
2104
2105impl TableDebug for TransmitHandle {
2106 fn type_name() -> &'static str {
2107 "TransmitHandle"
2108 }
2109}
2110
/// State shared by the two ends of a stream or future.
struct TransmitState {
    /// Table id of the write end's handle (`u32::MAX` placeholder until
    /// assigned by code outside `TransmitState::new`).
    write_handle: TableId<TransmitHandle>,
    /// Table id of the read end's handle (`u32::MAX` placeholder until
    /// assigned by code outside `TransmitState::new`).
    read_handle: TableId<TransmitHandle>,
    /// Current state of the write end.
    write: WriteState,
    /// Current state of the read end.
    read: ReadState,
    /// Set once the value of a future has been transferred; consulted when
    /// the host drops a writer to decide whether `on_drop_open` applies.
    done: bool,
    /// Whether the transmit was created by the host or by a guest instance.
    pub(super) origin: TransmitOrigin,
}
2127
/// Records who created a stream or future.
#[derive(Copy, Clone)]
pub(super) enum TransmitOrigin {
    /// Created by the host.
    Host,
    /// Created by the given guest component instance as a future of the
    /// given table type.
    GuestFuture(ComponentInstanceId, TypeFutureTableIndex),
    /// Created by the given guest component instance as a stream of the
    /// given table type.
    GuestStream(ComponentInstanceId, TypeStreamTableIndex),
}
2134
2135impl TransmitState {
2136 fn new(origin: TransmitOrigin) -> Self {
2137 Self {
2138 write_handle: TableId::new(u32::MAX),
2139 read_handle: TableId::new(u32::MAX),
2140 read: ReadState::Open,
2141 write: WriteState::Open,
2142 done: false,
2143 origin,
2144 }
2145 }
2146}
2147
2148impl TableDebug for TransmitState {
2149 fn type_name() -> &'static str {
2150 "TransmitState"
2151 }
2152}
2153
2154impl TransmitOrigin {
2155 fn guest(id: ComponentInstanceId, index: TransmitIndex) -> Self {
2156 match index {
2157 TransmitIndex::Future(ty) => TransmitOrigin::GuestFuture(id, ty),
2158 TransmitIndex::Stream(ty) => TransmitOrigin::GuestStream(id, ty),
2159 }
2160 }
2161}
2162
/// Factory for one round of host-side production or consumption: each call
/// returns a boxed future which resolves to the `StreamResult` of that round.
type PollStream = Box<
    dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
>;
2166
/// Callback that tries to convert a host producer into a value of the type
/// identified by the given `TypeId`, returning `None` if it cannot.
type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
2168
/// State of the write end of a stream or future.
enum WriteState {
    /// The write end exists but no write is pending.
    Open,
    /// A guest write is pending.
    // NOTE(review): the fields are populated outside this excerpt; they look
    // like the parameters of the guest's write call (buffer `address`, item
    // `count`, guest `handle`, ABI `options`) — confirm.
    GuestReady {
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TransmitIndex,
        flat_abi: Option<FlatAbi>,
        options: OptionsIndex,
        address: usize,
        count: usize,
        handle: u32,
    },
    /// The write end is backed by a host producer.
    HostReady {
        /// Starts one round of production.
        produce: PollStream,
        /// Tries to convert the producer back into a concrete host value.
        try_into: TryInto,
        /// Number of items already lowered into the guest's buffer for the
        /// read currently in progress; reset to 0 when a result is reported.
        guest_offset: usize,
        /// Cancellation flag passed to `poll_produce`; cleared once the
        /// producer returns `Poll::Ready`.
        cancel: bool,
        /// Waker saved while `poll_produce` is pending.
        cancel_waker: Option<Waker>,
    },
    /// The write end has been dropped.
    Dropped,
}
2195
2196impl fmt::Debug for WriteState {
2197 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2198 match self {
2199 Self::Open => f.debug_tuple("Open").finish(),
2200 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2201 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2202 Self::Dropped => f.debug_tuple("Dropped").finish(),
2203 }
2204 }
2205}
2206
/// State of the read end of a stream or future.
enum ReadState {
    /// The read end exists but no read is pending.
    Open,
    /// A guest read is pending.
    GuestReady {
        ty: TransmitIndex,
        caller_instance: RuntimeComponentInstanceIndex,
        caller_thread: QualifiedThreadId,
        flat_abi: Option<FlatAbi>,
        instance: Instance,
        options: OptionsIndex,
        /// Base address of the guest buffer items are lowered into.
        address: usize,
        /// Capacity of the guest buffer, in items.
        count: usize,
        /// Guest handle reported back with the read result.
        handle: u32,
    },
    /// The read end is backed by a host consumer.
    HostReady {
        /// Starts one round of consumption.
        consume: PollStream,
        /// Item count reported to the guest writer when a result is
        /// delivered; reset to 0 afterwards.
        guest_offset: usize,
        /// Cancellation flag passed to `poll_consume`; cleared once the
        /// consumer returns `Poll::Ready`.
        cancel: bool,
        /// Waker saved while `poll_consume` is pending.
        cancel_waker: Option<Waker>,
    },
    /// Both ends are host-backed: a host producer feeds a host consumer.
    HostToHost {
        /// Offers the contents of a write buffer to the consumer.
        accept: Box<
            dyn for<'a> Fn(
                    &'a mut UntypedWriteBuffer<'a>,
                )
                    -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
                + Send
                + Sync,
        >,
        /// Scratch byte buffer lent to the producer (wrapped in a `Cursor`)
        /// during `poll_produce`.
        buffer: Vec<u8>,
        /// Cursor position after the last production, i.e. how much of
        /// `buffer` is pending delivery to the consumer.
        limit: usize,
    },
    /// The read end has been dropped.
    Dropped,
}
2246
2247impl fmt::Debug for ReadState {
2248 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2249 match self {
2250 Self::Open => f.debug_tuple("Open").finish(),
2251 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2252 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2253 Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
2254 Self::Dropped => f.debug_tuple("Dropped").finish(),
2255 }
2256 }
2257}
2258
2259fn return_code(kind: TransmitKind, state: StreamResult, guest_offset: usize) -> Result<ReturnCode> {
2260 let count = guest_offset.try_into()?;
2261 Ok(match state {
2262 StreamResult::Dropped => ReturnCode::Dropped(count),
2263 StreamResult::Completed => ReturnCode::completed(kind, count),
2264 StreamResult::Cancelled => ReturnCode::Cancelled(count),
2265 })
2266}
2267
2268impl StoreOpaque {
2269 fn pipe_from_guest(
2270 &mut self,
2271 kind: TransmitKind,
2272 id: TableId<TransmitState>,
2273 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2274 ) {
2275 let future = async move {
2276 let stream_state = future.await?;
2277 tls::get(|store| {
2278 let state = store.concurrent_state_mut();
2279 let transmit = state.get_mut(id)?;
2280 let ReadState::HostReady {
2281 consume,
2282 guest_offset,
2283 ..
2284 } = mem::replace(&mut transmit.read, ReadState::Open)
2285 else {
2286 bail_bug!("expected ReadState::HostReady")
2287 };
2288 let code = return_code(kind, stream_state, guest_offset)?;
2289 transmit.read = match stream_state {
2290 StreamResult::Dropped => ReadState::Dropped,
2291 StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2292 consume,
2293 guest_offset: 0,
2294 cancel: false,
2295 cancel_waker: None,
2296 },
2297 };
2298 let WriteState::GuestReady { ty, handle, .. } =
2299 mem::replace(&mut transmit.write, WriteState::Open)
2300 else {
2301 bail_bug!("expected WriteState::HostReady")
2302 };
2303 state.send_write_result(ty, id, handle, code)?;
2304 Ok(())
2305 })
2306 };
2307
2308 self.concurrent_state_mut().push_future(future.boxed());
2309 }
2310
2311 fn pipe_to_guest(
2312 &mut self,
2313 kind: TransmitKind,
2314 id: TableId<TransmitState>,
2315 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2316 ) {
2317 let future = async move {
2318 let stream_state = future.await?;
2319 tls::get(|store| {
2320 let state = store.concurrent_state_mut();
2321 let transmit = state.get_mut(id)?;
2322 let WriteState::HostReady {
2323 produce,
2324 try_into,
2325 guest_offset,
2326 ..
2327 } = mem::replace(&mut transmit.write, WriteState::Open)
2328 else {
2329 bail_bug!("expected WriteState::HostReady")
2330 };
2331 let code = return_code(kind, stream_state, guest_offset)?;
2332 transmit.write = match stream_state {
2333 StreamResult::Dropped => WriteState::Dropped,
2334 StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2335 produce,
2336 try_into,
2337 guest_offset: 0,
2338 cancel: false,
2339 cancel_waker: None,
2340 },
2341 };
2342 let ReadState::GuestReady { ty, handle, .. } =
2343 mem::replace(&mut transmit.read, ReadState::Open)
2344 else {
2345 bail_bug!("expected ReadState::HostReady")
2346 };
2347 state.send_read_result(ty, id, handle, code)?;
2348 Ok(())
2349 })
2350 };
2351
2352 self.concurrent_state_mut().push_future(future.boxed());
2353 }
2354
    /// Drops the host-owned read end identified by `id`.
    ///
    /// The read state becomes `Dropped`. What happens next depends on the
    /// write end's previous state:
    /// - `GuestReady`: the pending guest write is completed with a
    ///   `Dropped(0)` event carrying the write's type and handle;
    /// - `Open`: a `Dropped(0)` event with no pending write is recorded, its
    ///   flavor (future/stream) chosen by `kind`;
    /// - `HostReady`: nothing further happens here;
    /// - `Dropped`: both ends are gone, so the transmit is deleted.
    fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
        let state = self.concurrent_state_mut();
        let transmit_id = state.get_mut(id)?.state;
        let transmit = state
            .get_mut(transmit_id)
            .with_context(|| format!("error closing reader {transmit_id:?}"))?;
        log::trace!(
            "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
            transmit.read,
            transmit.write
        );

        transmit.read = ReadState::Dropped;

        // The write end is reset to `Open` unless it was already dropped;
        // the previous state is inspected below.
        let new_state = if let WriteState::Dropped = &transmit.write {
            WriteState::Dropped
        } else {
            WriteState::Open
        };

        let write_handle = transmit.write_handle;

        match mem::replace(&mut transmit.write, new_state) {
            // A guest write was pending: tell the writer the reader is gone.
            WriteState::GuestReady { ty, handle, .. } => {
                state.update_event(
                    write_handle.rep(),
                    match ty {
                        TransmitIndex::Future(ty) => Event::FutureWrite {
                            code: ReturnCode::Dropped(0),
                            pending: Some((ty, handle)),
                        },
                        TransmitIndex::Stream(ty) => Event::StreamWrite {
                            code: ReturnCode::Dropped(0),
                            pending: Some((ty, handle)),
                        },
                    },
                )?;
            }

            WriteState::HostReady { .. } => {}

            // No write pending: record the drop on the write handle.
            WriteState::Open => {
                state.update_event(
                    write_handle.rep(),
                    match kind {
                        TransmitKind::Future => Event::FutureWrite {
                            code: ReturnCode::Dropped(0),
                            pending: None,
                        },
                        TransmitKind::Stream => Event::StreamWrite {
                            code: ReturnCode::Dropped(0),
                            pending: None,
                        },
                    },
                )?;
            }

            // Both ends are now dropped; release the shared state.
            WriteState::Dropped => {
                log::trace!("host_drop_reader delete {transmit_id:?}");
                state.delete_transmit(transmit_id)?;
            }
        }
        Ok(())
    }
2424
2425 fn host_drop_writer(
2427 &mut self,
2428 id: TableId<TransmitHandle>,
2429 on_drop_open: Option<fn() -> Result<()>>,
2430 ) -> Result<()> {
2431 let state = self.concurrent_state_mut();
2432 let transmit_id = state.get_mut(id)?.state;
2433 let transmit = state
2434 .get_mut(transmit_id)
2435 .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2436 log::trace!(
2437 "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2438 transmit.read,
2439 transmit.write
2440 );
2441
2442 match &mut transmit.write {
2444 WriteState::GuestReady { .. } => {
2445 bail_bug!("can't call `host_drop_writer` on a guest-owned writer");
2446 }
2447 WriteState::HostReady { .. } => {}
2448 v @ WriteState::Open => {
2449 if let (Some(on_drop_open), false) = (
2450 on_drop_open,
2451 transmit.done || matches!(transmit.read, ReadState::Dropped),
2452 ) {
2453 on_drop_open()?;
2454 } else {
2455 *v = WriteState::Dropped;
2456 }
2457 }
2458 WriteState::Dropped => bail_bug!("write state is already dropped"),
2459 }
2460
2461 let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
2462
2463 let new_state = if let ReadState::Dropped = &transmit.read {
2469 ReadState::Dropped
2470 } else {
2471 ReadState::Open
2472 };
2473
2474 let read_handle = transmit.read_handle;
2475
2476 match mem::replace(&mut transmit.read, new_state) {
2478 ReadState::GuestReady { ty, handle, .. } => {
2482 self.concurrent_state_mut().update_event(
2484 read_handle.rep(),
2485 match ty {
2486 TransmitIndex::Future(ty) => Event::FutureRead {
2487 code: ReturnCode::Dropped(0),
2488 pending: Some((ty, handle)),
2489 },
2490 TransmitIndex::Stream(ty) => Event::StreamRead {
2491 code: ReturnCode::Dropped(0),
2492 pending: Some((ty, handle)),
2493 },
2494 },
2495 )?;
2496 }
2497
2498 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2499
2500 ReadState::Open => {
2502 self.concurrent_state_mut().update_event(
2503 read_handle.rep(),
2504 match on_drop_open {
2505 Some(_) => Event::FutureRead {
2506 code: ReturnCode::Dropped(0),
2507 pending: None,
2508 },
2509 None => Event::StreamRead {
2510 code: ReturnCode::Dropped(0),
2511 pending: None,
2512 },
2513 },
2514 )?;
2515 }
2516
2517 ReadState::Dropped => {
2520 log::trace!("host_drop_writer delete {transmit_id:?}");
2521 self.concurrent_state_mut().delete_transmit(transmit_id)?;
2522 }
2523 }
2524 Ok(())
2525 }
2526
2527 pub(super) fn transmit_origin(
2528 &mut self,
2529 id: TableId<TransmitHandle>,
2530 ) -> Result<TransmitOrigin> {
2531 let state = self.concurrent_state_mut();
2532 let state_id = state.get_mut(id)?.state;
2533 Ok(state.get_mut(state_id)?.origin)
2534 }
2535}
2536
2537impl<T> StoreContextMut<'_, T> {
2538 fn new_transmit<P: StreamProducer<T>>(
2539 mut self,
2540 kind: TransmitKind,
2541 producer: P,
2542 ) -> Result<TableId<TransmitHandle>>
2543 where
2544 P::Item: func::Lower,
2545 {
2546 let token = StoreToken::new(self.as_context_mut());
2547 let state = self.0.concurrent_state_mut();
2548 let (_, read) = state.new_transmit(TransmitOrigin::Host)?;
2549 let producer = Arc::new(LockedState::new((Box::pin(producer), P::Buffer::default())));
2550 let id = state.get_mut(read)?.state;
2551 let mut dropped = false;
2552 let produce = Box::new({
2553 let producer = producer.clone();
2554 move || {
2555 let producer = producer.clone();
2556 async move {
2557 let mut state = producer.take()?;
2558 let (mine, buffer) = &mut *state;
2559
2560 let (result, cancelled) = if buffer.remaining().is_empty() {
2561 future::poll_fn(|cx| {
2562 tls::get(|store| {
2563 let transmit = store.concurrent_state_mut().get_mut(id)?;
2564
2565 let &WriteState::HostReady { cancel, .. } = &transmit.write else {
2566 bail_bug!("expected WriteState::HostReady")
2567 };
2568
2569 let mut host_buffer =
2570 if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
2571 Some(Cursor::new(mem::take(buffer)))
2572 } else {
2573 None
2574 };
2575
2576 let poll = mine.as_mut().poll_produce(
2577 cx,
2578 token.as_context_mut(store),
2579 Destination {
2580 id,
2581 buffer,
2582 host_buffer: host_buffer.as_mut(),
2583 _phantom: PhantomData,
2584 },
2585 cancel,
2586 );
2587
2588 let transmit = store.concurrent_state_mut().get_mut(id)?;
2589
2590 let host_offset = if let (
2591 Some(host_buffer),
2592 ReadState::HostToHost { buffer, limit, .. },
2593 ) = (host_buffer, &mut transmit.read)
2594 {
2595 *limit = usize::try_from(host_buffer.position())?;
2596 *buffer = host_buffer.into_inner();
2597 *limit
2598 } else {
2599 0
2600 };
2601
2602 {
2603 let WriteState::HostReady {
2604 guest_offset,
2605 cancel,
2606 cancel_waker,
2607 ..
2608 } = &mut transmit.write
2609 else {
2610 bail_bug!("expected WriteState::HostReady")
2611 };
2612
2613 if poll.is_pending() {
2614 if !buffer.remaining().is_empty()
2615 || *guest_offset > 0
2616 || host_offset > 0
2617 {
2618 bail!(
2619 "StreamProducer::poll_produce returned Poll::Pending \
2620 after producing at least one item"
2621 )
2622 }
2623 *cancel_waker = Some(cx.waker().clone());
2624 } else {
2625 *cancel_waker = None;
2626 *cancel = false;
2627 }
2628 }
2629
2630 Ok(poll.map(|v| v.map(|result| (result, cancel))))
2631 })?
2632 })
2633 .await?
2634 } else {
2635 (StreamResult::Completed, false)
2636 };
2637
2638 let (guest_offset, host_offset, count) = tls::get(|store| {
2639 let transmit = store.concurrent_state_mut().get_mut(id)?;
2640 let (count, host_offset) = match &transmit.read {
2641 &ReadState::GuestReady { count, .. } => (count, 0),
2642 &ReadState::HostToHost { limit, .. } => (1, limit),
2643 _ => bail_bug!("invalid read state"),
2644 };
2645 let guest_offset = match &transmit.write {
2646 &WriteState::HostReady { guest_offset, .. } => guest_offset,
2647 _ => bail_bug!("invalid write state"),
2648 };
2649 Ok((guest_offset, host_offset, count))
2650 })?;
2651
2652 match result {
2653 StreamResult::Completed => {
2654 if count > 1
2655 && buffer.remaining().is_empty()
2656 && guest_offset == 0
2657 && host_offset == 0
2658 {
2659 bail!(
2660 "StreamProducer::poll_produce returned StreamResult::Completed \
2661 without producing any items"
2662 );
2663 }
2664 }
2665 StreamResult::Cancelled => {
2666 if !cancelled {
2667 bail!(
2668 "StreamProducer::poll_produce returned StreamResult::Cancelled \
2669 without being given a `finish` parameter value of true"
2670 );
2671 }
2672 }
2673 StreamResult::Dropped => {
2674 dropped = true;
2675 }
2676 }
2677
2678 let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;
2679
2680 drop(state);
2681
2682 if write_buffer {
2683 write(token, id, producer.clone(), kind).await?;
2684 }
2685
2686 Ok(if dropped {
2687 if producer.with(|p| p.1.remaining().is_empty())? {
2688 StreamResult::Dropped
2689 } else {
2690 StreamResult::Completed
2691 }
2692 } else {
2693 result
2694 })
2695 }
2696 .boxed()
2697 }
2698 });
2699 let try_into = Box::new(move |ty| {
2700 let (mine, buffer) = producer.try_lock().ok()?.take()?;
2701 match P::try_into(mine, ty) {
2702 Ok(value) => Some(value),
2703 Err(mine) => {
2704 *producer.try_lock().ok()? = Some((mine, buffer));
2705 None
2706 }
2707 }
2708 });
2709 state.get_mut(id)?.write = WriteState::HostReady {
2710 produce,
2711 try_into,
2712 guest_offset: 0,
2713 cancel: false,
2714 cancel_waker: None,
2715 };
2716 Ok(read)
2717 }
2718
2719 fn set_consumer<C: StreamConsumer<T>>(
2720 mut self,
2721 id: TableId<TransmitHandle>,
2722 kind: TransmitKind,
2723 consumer: C,
2724 ) -> Result<()> {
2725 let token = StoreToken::new(self.as_context_mut());
2726 let state = self.0.concurrent_state_mut();
2727 let id = state.get_mut(id)?.state;
2728 let transmit = state.get_mut(id)?;
2729 let consumer = Arc::new(LockedState::new(Box::pin(consumer)));
2730 let consume_with_buffer = {
2731 let consumer = consumer.clone();
2732 async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
2733 let mut mine = consumer.take()?;
2734
2735 let host_buffer_remaining_before =
2736 host_buffer.as_deref_mut().map(|v| v.remaining().len());
2737
2738 let (result, cancelled) = future::poll_fn(|cx| {
2739 tls::get(|store| {
2740 let cancel = match &store.concurrent_state_mut().get_mut(id)?.read {
2741 &ReadState::HostReady { cancel, .. } => cancel,
2742 ReadState::Open => false,
2743 _ => bail_bug!("unexpected read state"),
2744 };
2745
2746 let poll = mine.as_mut().poll_consume(
2747 cx,
2748 token.as_context_mut(store),
2749 Source {
2750 id,
2751 host_buffer: host_buffer.as_deref_mut(),
2752 },
2753 cancel,
2754 );
2755
2756 if let ReadState::HostReady {
2757 cancel_waker,
2758 cancel,
2759 ..
2760 } = &mut store.concurrent_state_mut().get_mut(id)?.read
2761 {
2762 if poll.is_pending() {
2763 *cancel_waker = Some(cx.waker().clone());
2764 } else {
2765 *cancel_waker = None;
2766 *cancel = false;
2767 }
2768 }
2769
2770 Ok(poll.map(|v| v.map(|result| (result, cancel))))
2771 })?
2772 })
2773 .await?;
2774
2775 let (guest_offset, count) = tls::get(|store| {
2776 let transmit = store.concurrent_state_mut().get_mut(id)?;
2777 Ok((
2778 match &transmit.read {
2779 &ReadState::HostReady { guest_offset, .. } => guest_offset,
2780 ReadState::Open => 0,
2781 _ => bail_bug!("invalid read state"),
2782 },
2783 match &transmit.write {
2784 &WriteState::GuestReady { count, .. } => count,
2785 WriteState::HostReady { .. } => match host_buffer_remaining_before {
2786 Some(n) => n,
2787 None => bail_bug!("host_buffer_remaining_before should be set"),
2788 },
2789 _ => bail_bug!("invalid write state"),
2790 },
2791 ))
2792 })?;
2793
2794 match result {
2795 StreamResult::Completed => {
2796 if count > 0
2797 && guest_offset == 0
2798 && host_buffer_remaining_before
2799 .zip(host_buffer.map(|v| v.remaining().len()))
2800 .map(|(before, after)| before == after)
2801 .unwrap_or(false)
2802 {
2803 bail!(
2804 "StreamConsumer::poll_consume returned StreamResult::Completed \
2805 without consuming any items"
2806 );
2807 }
2808
2809 if let TransmitKind::Future = kind {
2810 tls::get(|store| {
2811 store.concurrent_state_mut().get_mut(id)?.done = true;
2812 crate::error::Ok(())
2813 })?;
2814 }
2815 }
2816 StreamResult::Cancelled => {
2817 if !cancelled {
2818 bail!(
2819 "StreamConsumer::poll_consume returned StreamResult::Cancelled \
2820 without being given a `finish` parameter value of true"
2821 );
2822 }
2823 }
2824 StreamResult::Dropped => {}
2825 }
2826
2827 Ok(result)
2828 }
2829 };
2830 let consume = {
2831 let consume = consume_with_buffer.clone();
2832 Box::new(move || {
2833 let consume = consume.clone();
2834 async move { consume(None).await }.boxed()
2835 })
2836 };
2837
2838 match &transmit.write {
2839 WriteState::Open => {
2840 transmit.read = ReadState::HostReady {
2841 consume,
2842 guest_offset: 0,
2843 cancel: false,
2844 cancel_waker: None,
2845 };
2846 }
2847 &WriteState::GuestReady { .. } => {
2848 let future = consume();
2849 transmit.read = ReadState::HostReady {
2850 consume,
2851 guest_offset: 0,
2852 cancel: false,
2853 cancel_waker: None,
2854 };
2855 self.0.pipe_from_guest(kind, id, future);
2856 }
2857 WriteState::HostReady { .. } => {
2858 let WriteState::HostReady { produce, .. } = mem::replace(
2859 &mut transmit.write,
2860 WriteState::HostReady {
2861 produce: Box::new(|| {
2862 Box::pin(async { bail_bug!("unexpected invocation of `produce`") })
2863 }),
2864 try_into: Box::new(|_| None),
2865 guest_offset: 0,
2866 cancel: false,
2867 cancel_waker: None,
2868 },
2869 ) else {
2870 bail_bug!("expected WriteState::HostReady")
2871 };
2872
2873 transmit.read = ReadState::HostToHost {
2874 accept: Box::new(move |input| {
2875 let consume = consume_with_buffer.clone();
2876 async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
2877 }),
2878 buffer: Vec::new(),
2879 limit: 0,
2880 };
2881
2882 let future = async move {
2883 loop {
2884 if tls::get(|store| {
2885 crate::error::Ok(matches!(
2886 store.concurrent_state_mut().get_mut(id)?.read,
2887 ReadState::Dropped
2888 ))
2889 })? {
2890 break Ok(());
2891 }
2892
2893 match produce().await? {
2894 StreamResult::Completed | StreamResult::Cancelled => {}
2895 StreamResult::Dropped => break Ok(()),
2896 }
2897
2898 if let TransmitKind::Future = kind {
2899 break Ok(());
2900 }
2901 }
2902 }
2903 .map(move |result| {
2904 tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
2905 result
2906 });
2907
2908 state.push_future(Box::pin(future));
2909 }
2910 WriteState::Dropped => {
2911 let reader = transmit.read_handle;
2912 self.0.host_drop_reader(reader, kind)?;
2913 }
2914 }
2915 Ok(())
2916 }
2917}
2918
/// Delivers the items buffered by a host producer to the reader of transmit
/// `id`.
///
/// `pair` holds the producer together with its write buffer; only the buffer
/// (`.1`) is used here. The read state is temporarily taken out of the
/// transmit (leaving `Open`) and handled according to its variant:
/// - `GuestReady`: items are lowered into the guest's buffer starting at
///   item offset `guest_offset`, the write state's `guest_offset` is
///   advanced by the number of items taken from the buffer, and the read
///   state is restored. For futures the transmit is also marked done.
/// - `HostToHost`: first the bytes `buffer[..limit]` produced directly into
///   the scratch buffer, then the producer's own buffer, are offered to the
///   consumer via `accept` until everything is consumed or the consumer
///   drops. The read state then becomes `Dropped` or `HostToHost` with
///   `limit` reset to 0.
///
/// Any other read state is reported as an internal bug.
async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
    token: StoreToken<D>,
    id: TableId<TransmitState>,
    pair: Arc<LockedState<(P, B)>>,
    kind: TransmitKind,
) -> Result<()> {
    // Take the read state out so it can be used across `.await` points; it
    // is put back (possibly changed) before returning successfully.
    let (read, guest_offset) = tls::get(|store| {
        let transmit = store.concurrent_state_mut().get_mut(id)?;

        let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
            Some(guest_offset)
        } else {
            None
        };

        crate::error::Ok((
            mem::replace(&mut transmit.read, ReadState::Open),
            guest_offset,
        ))
    })?;

    match read {
        ReadState::GuestReady {
            ty,
            flat_abi,
            options,
            address,
            count,
            handle,
            instance,
            caller_instance,
            caller_thread,
        } => {
            let guest_offset = match guest_offset {
                Some(i) => i,
                None => bail_bug!("guest_offset should be present if ready"),
            };

            if let TransmitKind::Future = kind {
                tls::get(|store| {
                    store.concurrent_state_mut().get_mut(id)?.done = true;
                    crate::error::Ok(())
                })?;
            }

            let old_remaining = pair.with(|p| p.1.remaining().len())?;
            // Lowers as many buffered items as fit into the part of the
            // guest buffer that has not been filled yet.
            let accept = {
                let pair = pair.clone();
                move |mut store: StoreContextMut<D>| {
                    let mut state = pair.take()?;
                    lower::<T, B, D>(
                        store.as_context_mut(),
                        instance,
                        caller_thread,
                        options,
                        ty,
                        address + (T::SIZE32 * guest_offset),
                        count - guest_offset,
                        &mut state.1,
                    )?;
                    crate::error::Ok(())
                }
            };

            if guest_offset < count {
                if T::MAY_REQUIRE_REALLOC {
                    // NOTE(review): lowering is deferred to a high-priority
                    // worker item and awaited through a oneshot channel,
                    // presumably because it may need to call the guest's
                    // `realloc` — confirm.
                    let (tx, rx) = oneshot::channel();
                    tls::get(move |store| {
                        store
                            .concurrent_state_mut()
                            .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
                                move |store| {
                                    _ = tx.send(accept(token.as_context_mut(store))?);
                                    Ok(())
                                },
                            ))))
                    });
                    rx.await?
                } else {
                    tls::get(|store| accept(token.as_context_mut(store)))?
                };
            }

            tls::get(|store| {
                // Number of items that left the buffer during lowering.
                let count = old_remaining - pair.with(|p| p.1.remaining().len())?;

                let transmit = store.concurrent_state_mut().get_mut(id)?;

                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
                    bail_bug!("expected WriteState::HostReady")
                };

                *guest_offset += count;

                // Restore the read state taken at the top of the function.
                transmit.read = ReadState::GuestReady {
                    ty,
                    flat_abi,
                    options,
                    address,
                    count,
                    handle,
                    instance,
                    caller_instance,
                    caller_thread,
                };

                crate::error::Ok(())
            })?;

            Ok(())
        }

        ReadState::HostToHost {
            accept,
            mut buffer,
            limit,
        } => {
            let mut state = StreamResult::Completed;
            let mut position = 0;

            // First drain what the producer wrote directly into the scratch
            // buffer.
            while !matches!(state, StreamResult::Dropped) && position < limit {
                let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
                state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
                (buffer, position, _) = slice_buffer.into_parts();
            }

            // Then drain the producer's own write buffer.
            {
                let mut pair = pair.take()?;
                let (_, buffer) = &mut *pair;

                while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
                    state = accept(&mut UntypedWriteBuffer::new(buffer)).await?;
                }
            }

            tls::get(|store| {
                store.concurrent_state_mut().get_mut(id)?.read = match state {
                    StreamResult::Dropped => ReadState::Dropped,
                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
                        accept,
                        buffer,
                        limit: 0,
                    },
                };

                crate::error::Ok(())
            })?;
            Ok(())
        }

        _ => bail_bug!("unexpected read state"),
    }
}
3080
3081impl Instance {
3082 fn consume(
3085 self,
3086 store: &mut dyn VMStore,
3087 kind: TransmitKind,
3088 transmit_id: TableId<TransmitState>,
3089 consume: PollStream,
3090 guest_offset: usize,
3091 cancel: bool,
3092 ) -> Result<ReturnCode> {
3093 let mut future = consume();
3094 store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
3095 consume,
3096 guest_offset,
3097 cancel,
3098 cancel_waker: None,
3099 };
3100 let poll = tls::set(store, || {
3101 future
3102 .as_mut()
3103 .poll(&mut Context::from_waker(&Waker::noop()))
3104 });
3105
3106 Ok(match poll {
3107 Poll::Ready(state) => {
3108 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3109 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
3110 bail_bug!("expected ReadState::HostReady")
3111 };
3112 let code = return_code(kind, state?, mem::replace(guest_offset, 0))?;
3113 transmit.write = WriteState::Open;
3114 code
3115 }
3116 Poll::Pending => {
3117 store.pipe_from_guest(kind, transmit_id, future);
3118 ReturnCode::Blocked
3119 }
3120 })
3121 }
3122
3123 fn produce(
3126 self,
3127 store: &mut dyn VMStore,
3128 kind: TransmitKind,
3129 transmit_id: TableId<TransmitState>,
3130 produce: PollStream,
3131 try_into: TryInto,
3132 guest_offset: usize,
3133 cancel: bool,
3134 ) -> Result<ReturnCode> {
3135 let mut future = produce();
3136 store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
3137 produce,
3138 try_into,
3139 guest_offset,
3140 cancel,
3141 cancel_waker: None,
3142 };
3143 let poll = tls::set(store, || {
3144 future
3145 .as_mut()
3146 .poll(&mut Context::from_waker(&Waker::noop()))
3147 });
3148
3149 Ok(match poll {
3150 Poll::Ready(state) => {
3151 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3152 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3153 bail_bug!("expected WriteState::HostReady")
3154 };
3155 let code = return_code(kind, state?, mem::replace(guest_offset, 0))?;
3156 transmit.read = ReadState::Open;
3157 code
3158 }
3159 Poll::Pending => {
3160 store.pipe_to_guest(kind, transmit_id, future);
3161 ReturnCode::Blocked
3162 }
3163 })
3164 }
3165
3166 pub(super) fn guest_drop_writable(
3168 self,
3169 store: &mut StoreOpaque,
3170 ty: TransmitIndex,
3171 writer: u32,
3172 ) -> Result<()> {
3173 let table = self.id().get_mut(store).table_for_transmit(ty);
3174 let transmit_rep = match ty {
3175 TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
3176 TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
3177 };
3178
3179 let id = TableId::<TransmitHandle>::new(transmit_rep);
3180 log::trace!("guest_drop_writable: drop writer {id:?}");
3181 match ty {
3182 TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
3183 TransmitIndex::Future(_) => store.host_drop_writer(
3184 id,
3185 Some(|| {
3186 Err(format_err!(
3187 "cannot drop future write end without first writing a value"
3188 ))
3189 }),
3190 ),
3191 }
3192 }
3193
    /// Copy `count` items from a writer's linear memory to a reader's linear
    /// memory.
    ///
    /// * `flat_abi` - when `Some`, stream payloads are copied as raw bytes
    ///   using the given size/alignment instead of being lifted and lowered
    ///   value by value.
    /// * `write_*` - caller instance, transmit type, canonical options and
    ///   source address of the writing side.
    /// * `read_*` - caller instance, calling thread, transmit type, canonical
    ///   options and destination address of the reading side.
    /// * `count` - number of items to copy; must be exactly 1 for futures.
    /// * `rep` - transmit handle representation, used here only for trace
    ///   logging.
    ///
    /// # Errors
    ///
    /// Fails if writer and reader are the same component instance and
    /// `allow_intra_component_read_write` rejects the payload type, if either
    /// pointer is misaligned or out of bounds, if lifting/lowering fails, or
    /// (as an internal bug) if the two transmit kinds differ.
    fn copy<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        flat_abi: Option<FlatAbi>,
        write_caller_instance: RuntimeComponentInstanceIndex,
        write_ty: TransmitIndex,
        write_options: OptionsIndex,
        write_address: usize,
        read_caller_instance: RuntimeComponentInstanceIndex,
        read_caller_thread: QualifiedThreadId,
        read_ty: TransmitIndex,
        read_options: OptionsIndex,
        read_address: usize,
        count: usize,
        rep: u32,
    ) -> Result<()> {
        let types = self.id().get(store.0).component().types();
        match (write_ty, read_ty) {
            (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
                if count != 1 {
                    bail_bug!("futures can only send 1 item");
                }

                let payload = types[types[write_ty].ty].payload;

                if write_caller_instance == read_caller_instance
                    && !allow_intra_component_read_write(payload)
                {
                    bail!(
                        "cannot read from and write to intra-component future with non-numeric payload"
                    )
                }

                // Lift the single value (if the future has a payload at all)
                // out of the writer's memory, validating alignment and bounds
                // first.
                let val = payload
                    .map(|ty| {
                        let lift =
                            &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);

                        let abi = lift.types.canonical_abi(&ty);
                        if write_address % usize::try_from(abi.align32)? != 0 {
                            bail!("write pointer not aligned");
                        }

                        let bytes = lift
                            .memory()
                            .get(write_address..)
                            .and_then(|b| b.get(..usize::try_from(abi.size32).ok()?))
                            .ok_or_else(|| {
                                crate::format_err!("write pointer out of bounds of memory")
                            })?;

                        Val::load(lift, ty, bytes)
                    })
                    .transpose()?;

                if let Some(val) = val {
                    // Lower into the reader's memory with the reader's thread
                    // set as current; the previous thread is restored
                    // afterwards (only on the success path).
                    let old_thread = store.0.set_thread(read_caller_thread)?;
                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
                    let types = lower.types;
                    let ty = match types[types[read_ty].ty].payload {
                        Some(ty) => ty,
                        None => bail_bug!("expected payload type to be present"),
                    };
                    let ptr = func::validate_inbounds_dynamic(
                        types.canonical_abi(&ty),
                        lower.as_slice_mut(),
                        &ValRaw::u32(read_address.try_into()?),
                    )?;
                    val.store(lower, ty, ptr)?;
                    store.0.set_thread(old_thread)?;
                }
            }
            (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
                if write_caller_instance == read_caller_instance
                    && !allow_intra_component_read_write(types[types[write_ty].ty].payload)
                {
                    bail!(
                        "cannot read from and write to intra-component stream with non-numeric payload"
                    )
                }

                if let Some(flat_abi) = flat_abi {
                    // Fast path: the payload can be moved as raw bytes.
                    let length_in_bytes = usize::try_from(flat_abi.size)? * count;
                    if length_in_bytes > 0 {
                        if write_address % usize::try_from(flat_abi.align)? != 0 {
                            bail!("write pointer not aligned");
                        }
                        if read_address % usize::try_from(flat_abi.align)? != 0 {
                            bail!("read pointer not aligned");
                        }

                        let store_opaque = store.0.store_opaque_mut();

                        {
                            // Both ranges are bounds-checked via slice `get`
                            // before being turned into raw pointers.
                            let src = self
                                .options_memory(store_opaque, write_options)
                                .get(write_address..)
                                .and_then(|b| b.get(..length_in_bytes))
                                .ok_or_else(|| {
                                    crate::format_err!("write pointer out of bounds of memory")
                                })?
                                .as_ptr();
                            let dst = self
                                .options_memory_mut(store_opaque, read_options)
                                .get_mut(read_address..)
                                .and_then(|b| b.get_mut(..length_in_bytes))
                                .ok_or_else(|| {
                                    crate::format_err!("read pointer out of bounds of memory")
                                })?
                                .as_mut_ptr();
                            // SAFETY: `src` and `dst` each point at
                            // `length_in_bytes` bytes that were bounds-checked
                            // just above. NOTE(review): this presumes that
                            // obtaining `dst` via `options_memory_mut` does not
                            // invalidate `src` — confirm.
                            unsafe {
                                if write_caller_instance == read_caller_instance {
                                    // Same instance: the ranges may overlap, so
                                    // use the overlap-tolerant copy.
                                    src.copy_to(dst, length_in_bytes)
                                } else {
                                    // Presumably distinct instances never share
                                    // a linear memory, making the ranges
                                    // disjoint — TODO confirm.
                                    src.copy_to_nonoverlapping(dst, length_in_bytes)
                                }
                            }
                        }
                    }
                } else {
                    // Slow path: lift every item to a `Val`, then lower each
                    // one into the reader's memory.
                    let store_opaque = store.0.store_opaque_mut();
                    let lift = &mut LiftContext::new(store_opaque, write_options, self);
                    let ty = match lift.types[lift.types[write_ty].ty].payload {
                        Some(ty) => ty,
                        None => bail_bug!("expected payload type to be present"),
                    };
                    let abi = lift.types.canonical_abi(&ty);
                    let size = usize::try_from(abi.size32)?;
                    if write_address % usize::try_from(abi.align32)? != 0 {
                        bail!("write pointer not aligned");
                    }
                    let bytes = lift
                        .memory()
                        .get(write_address..)
                        .and_then(|b| b.get(..size * count))
                        .ok_or_else(|| {
                            crate::format_err!("write pointer out of bounds of memory")
                        })?;
                    // Charge fuel for the temporary `Vec<Val>` built below.
                    lift.consume_fuel_array(count, size_of::<Val>())?;

                    let values = (0..count)
                        .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
                        .collect::<Result<Vec<_>>>()?;

                    let id = TableId::<TransmitHandle>::new(rep);
                    log::trace!("copy values {values:?} for {id:?}");

                    // Lower with the reader's thread set as current; restored
                    // after the last store (only on the success path).
                    let old_thread = store.0.set_thread(read_caller_thread)?;
                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
                    let ty = match lower.types[lower.types[read_ty].ty].payload {
                        Some(ty) => ty,
                        None => bail_bug!("expected payload type to be present"),
                    };
                    let abi = lower.types.canonical_abi(&ty);
                    if read_address % usize::try_from(abi.align32)? != 0 {
                        bail!("read pointer not aligned");
                    }
                    let size = usize::try_from(abi.size32)?;
                    // Bounds-check the whole destination range up front; the
                    // resulting slice itself is not used.
                    lower
                        .as_slice_mut()
                        .get_mut(read_address..)
                        .and_then(|b| b.get_mut(..size * count))
                        .ok_or_else(|| {
                            crate::format_err!("read pointer out of bounds of memory")
                        })?;
                    let mut ptr = read_address;
                    for value in values {
                        value.store(lower, ty, ptr)?;
                        ptr += size
                    }
                    store.0.set_thread(old_thread)?;
                }
            }
            _ => bail_bug!("mismatched transmit types in copy"),
        }

        Ok(())
    }
3391
3392 fn check_bounds(
3393 self,
3394 store: &StoreOpaque,
3395 options: OptionsIndex,
3396 ty: TransmitIndex,
3397 address: usize,
3398 count: usize,
3399 ) -> Result<()> {
3400 let types = self.id().get(store).component().types();
3401 let size = usize::try_from(
3402 match ty {
3403 TransmitIndex::Future(ty) => types[types[ty].ty]
3404 .payload
3405 .map(|ty| types.canonical_abi(&ty).size32),
3406 TransmitIndex::Stream(ty) => types[types[ty].ty]
3407 .payload
3408 .map(|ty| types.canonical_abi(&ty).size32),
3409 }
3410 .unwrap_or(0),
3411 )?;
3412
3413 if count > 0 && size > 0 {
3414 self.options_memory(store, options)
3415 .get(address..)
3416 .and_then(|b| b.get(..(size * count)))
3417 .map(drop)
3418 .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))
3419 } else {
3420 Ok(())
3421 }
3422 }
3423
    /// Write `count` items located at `address` in the guest's memory to the
    /// stream or future identified by `handle`.
    ///
    /// * `caller` - component instance index recorded as the writer; passed to
    ///   `copy` as the write-side caller instance.
    /// * `ty` / `options` / `flat_abi` - transmit type, canonical options and
    ///   optional flat ABI description for the payload.
    /// * `handle` - guest handle; must currently be in the `Write` state.
    ///
    /// Returns the resulting `ReturnCode`. When the operation would block and
    /// the options are not async, this waits for the write event via
    /// `wait_for_write` before returning.
    ///
    /// # Errors
    ///
    /// Fails if blocking is not permitted for a non-async call, the buffer is
    /// out of bounds, the handle is not an idle write end, the stream's
    /// readable end was already reported as dropped, or the future was already
    /// completed.
    pub(super) fn guest_write<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        caller: RuntimeComponentInstanceIndex,
        ty: TransmitIndex,
        options: OptionsIndex,
        flat_abi: Option<FlatAbi>,
        handle: u32,
        address: u32,
        count: u32,
    ) -> Result<ReturnCode> {
        // A synchronous write may block, so check that blocking is allowed.
        if !self.options(store.0, options).async_ {
            store.0.check_blocking()?;
        }

        let address = usize::try_from(address)?;
        let count = usize::try_from(count)?;
        self.check_bounds(store.0, options, ty, address, count)?;
        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
        let TransmitLocalState::Write { done } = *state else {
            bail!(
                "invalid handle {handle}; expected `Write`; got {:?}",
                *state
            );
        };

        if done {
            bail!("cannot write to stream after being notified that the readable end dropped");
        }

        // Mark the handle busy until the write resolves (see the end of this
        // function, where it is switched back to `Write`).
        *state = TransmitLocalState::Busy;
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let concurrent_state = store.0.concurrent_state_mut();
        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
        let transmit = concurrent_state.get_mut(transmit_id)?;
        log::trace!(
            "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
            transmit.read
        );

        if transmit.done {
            bail!("cannot write to future after previous write succeeded or readable end dropped");
        }

        // The read state is taken out below; leave `Open` behind unless the
        // reader is already gone, in which case `Dropped` must be preserved.
        let new_state = if let ReadState::Dropped = &transmit.read {
            ReadState::Dropped
        } else {
            ReadState::Open
        };

        // Record this write as pending so that a later read can complete it.
        let set_guest_ready = |me: &mut ConcurrentState| {
            let transmit = me.get_mut(transmit_id)?;
            if !matches!(&transmit.write, WriteState::Open) {
                bail_bug!("expected `WriteState::Open`; got `{:?}`", transmit.write);
            }
            transmit.write = WriteState::GuestReady {
                instance: self,
                caller,
                ty,
                flat_abi,
                options,
                address,
                count,
                handle,
            };
            Ok::<_, crate::Error>(())
        };

        let mut result = match mem::replace(&mut transmit.read, new_state) {
            // A guest read is already pending: copy directly between the two
            // guest buffers.
            ReadState::GuestReady {
                ty: read_ty,
                flat_abi: read_flat_abi,
                options: read_options,
                address: read_address,
                count: read_count,
                handle: read_handle,
                instance: read_instance,
                caller_instance: read_caller_instance,
                caller_thread: read_caller_thread,
            } => {
                if flat_abi != read_flat_abi {
                    bail_bug!("expected flat ABI calculations to be the same");
                }

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                // The write completes now if it is zero-length or the reader
                // offered a non-empty buffer; otherwise it stays pending. The
                // read is only completed (and the reader notified) when this
                // write carries data.
                let write_complete = count == 0 || read_count > 0;
                let read_complete = count > 0;
                let read_buffer_remaining = count < read_count;

                let read_handle_rep = transmit.read_handle.rep();

                // Transfer only as many items as both sides can handle.
                let count = count.min(read_count);

                self.copy(
                    store.as_context_mut(),
                    flat_abi,
                    caller,
                    ty,
                    options,
                    address,
                    read_caller_instance,
                    read_caller_thread,
                    read_ty,
                    read_options,
                    read_address,
                    count,
                    rep,
                )?;

                let instance = self.id().get_mut(store.0);
                let types = instance.component().types();
                let item_size = match payload(ty, types) {
                    Some(ty) => usize::try_from(types.canonical_abi(&ty).size32)?,
                    None => 0,
                };
                let concurrent_state = store.0.concurrent_state_mut();
                if read_complete {
                    let count = u32::try_from(count)?;
                    // If a `Completed` stream-read event is already queued for
                    // the reader, fold its item count into the new total.
                    let total = if let Some(Event::StreamRead {
                        code: ReturnCode::Completed(old_total),
                        ..
                    }) = concurrent_state.take_event(read_handle_rep)?
                    {
                        count + old_total
                    } else {
                        count
                    };

                    let code = ReturnCode::completed(ty.kind(), total);

                    concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
                }

                if read_buffer_remaining {
                    // Re-arm the read with the unfilled tail of its buffer.
                    let transmit = concurrent_state.get_mut(transmit_id)?;
                    transmit.read = ReadState::GuestReady {
                        ty: read_ty,
                        flat_abi: read_flat_abi,
                        options: read_options,
                        address: read_address + (count * item_size),
                        count: read_count - count,
                        handle: read_handle,
                        instance: read_instance,
                        caller_instance: read_caller_instance,
                        caller_thread: read_caller_thread,
                    };
                }

                if write_complete {
                    ReturnCode::completed(ty.kind(), count.try_into()?)
                } else {
                    set_guest_ready(concurrent_state)?;
                    ReturnCode::Blocked
                }
            }

            // A host consumer is waiting: register the write and run the
            // consumer against it.
            ReadState::HostReady {
                consume,
                guest_offset,
                cancel,
                cancel_waker,
            } => {
                if cancel_waker.is_some() {
                    bail_bug!("expected cancel_waker to be none");
                }
                if cancel {
                    bail_bug!("expected cancel to be false");
                }
                if guest_offset != 0 {
                    bail_bug!("expected guest_offset to be 0");
                }

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                set_guest_ready(concurrent_state)?;
                self.consume(store.0, ty.kind(), transmit_id, consume, 0, false)?
            }

            ReadState::HostToHost { .. } => bail_bug!("unexpected HostToHost"),

            // Nobody is reading yet: park the write.
            ReadState::Open => {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }

            ReadState::Dropped => {
                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                ReturnCode::Dropped(0)
            }
        };

        // Synchronous callers wait here until the write resolves.
        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
            result = self.wait_for_write(store.0, transmit_handle)?;
        }

        if result != ReturnCode::Blocked {
            // The write is finished: release the handle from `Busy`. `done` is
            // set only for a stream whose result was `Dropped`.
            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
                TransmitLocalState::Write {
                    done: matches!(
                        (result, ty),
                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
                    ),
                };
        }

        log::trace!(
            "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
        );

        Ok(result)
    }
3668
3669 pub(super) fn guest_read<T: 'static>(
3671 self,
3672 mut store: StoreContextMut<T>,
3673 caller_instance: RuntimeComponentInstanceIndex,
3674 ty: TransmitIndex,
3675 options: OptionsIndex,
3676 flat_abi: Option<FlatAbi>,
3677 handle: u32,
3678 address: u32,
3679 count: u32,
3680 ) -> Result<ReturnCode> {
3681 if !self.options(store.0, options).async_ {
3682 store.0.check_blocking()?;
3686 }
3687
3688 let address = usize::try_from(address)?;
3689 let count = usize::try_from(count)?;
3690 self.check_bounds(store.0, options, ty, address, count)?;
3691 let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3692 let TransmitLocalState::Read { done } = *state else {
3693 bail_bug!("invalid handle {handle}; expected `Read`; got {:?}", *state);
3694 };
3695
3696 if done {
3697 bail!("cannot read from stream after being notified that the writable end dropped");
3698 }
3699
3700 *state = TransmitLocalState::Busy;
3701 let transmit_handle = TableId::<TransmitHandle>::new(rep);
3702 let concurrent_state = store.0.concurrent_state_mut();
3703 let caller_thread = concurrent_state.current_guest_thread()?;
3704 let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3705 let transmit = concurrent_state.get_mut(transmit_id)?;
3706 log::trace!(
3707 "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3708 transmit.write
3709 );
3710
3711 if transmit.done {
3712 bail!("cannot read from future after previous read succeeded");
3713 }
3714
3715 let new_state = if let WriteState::Dropped = &transmit.write {
3716 WriteState::Dropped
3717 } else {
3718 WriteState::Open
3719 };
3720
3721 let set_guest_ready = |me: &mut ConcurrentState| {
3722 let transmit = me.get_mut(transmit_id)?;
3723 if !matches!(&transmit.read, ReadState::Open) {
3724 bail_bug!("expected `ReadState::Open`; got `{:?}`", transmit.read);
3725 }
3726 transmit.read = ReadState::GuestReady {
3727 ty,
3728 flat_abi,
3729 options,
3730 address,
3731 count,
3732 handle,
3733 instance: self,
3734 caller_instance,
3735 caller_thread,
3736 };
3737 Ok::<_, crate::Error>(())
3738 };
3739
3740 let mut result = match mem::replace(&mut transmit.write, new_state) {
3741 WriteState::GuestReady {
3742 instance: _,
3743 ty: write_ty,
3744 flat_abi: write_flat_abi,
3745 options: write_options,
3746 address: write_address,
3747 count: write_count,
3748 handle: write_handle,
3749 caller: write_caller,
3750 } => {
3751 if flat_abi != write_flat_abi {
3752 bail_bug!("expected flat ABI calculations to be the same");
3753 }
3754
3755 if let TransmitIndex::Future(_) = ty {
3756 transmit.done = true;
3757 }
3758
3759 let write_handle_rep = transmit.write_handle.rep();
3760
3761 let write_complete = write_count == 0 || count > 0;
3766 let read_complete = write_count > 0;
3767 let write_buffer_remaining = count < write_count;
3768
3769 let count = count.min(write_count);
3770
3771 self.copy(
3772 store.as_context_mut(),
3773 flat_abi,
3774 write_caller,
3775 write_ty,
3776 write_options,
3777 write_address,
3778 caller_instance,
3779 caller_thread,
3780 ty,
3781 options,
3782 address,
3783 count,
3784 rep,
3785 )?;
3786
3787 let instance = self.id().get_mut(store.0);
3788 let types = instance.component().types();
3789 let item_size = match payload(ty, types) {
3790 Some(ty) => usize::try_from(types.canonical_abi(&ty).size32)?,
3791 None => 0,
3792 };
3793 let concurrent_state = store.0.concurrent_state_mut();
3794
3795 if write_complete {
3796 let count = u32::try_from(count)?;
3797 let total = if let Some(Event::StreamWrite {
3798 code: ReturnCode::Completed(old_total),
3799 ..
3800 }) = concurrent_state.take_event(write_handle_rep)?
3801 {
3802 count + old_total
3803 } else {
3804 count
3805 };
3806
3807 let code = ReturnCode::completed(ty.kind(), total);
3808
3809 concurrent_state.send_write_result(
3810 write_ty,
3811 transmit_id,
3812 write_handle,
3813 code,
3814 )?;
3815 }
3816
3817 if write_buffer_remaining {
3818 let transmit = concurrent_state.get_mut(transmit_id)?;
3819 transmit.write = WriteState::GuestReady {
3820 instance: self,
3821 caller: write_caller,
3822 ty: write_ty,
3823 flat_abi: write_flat_abi,
3824 options: write_options,
3825 address: write_address + (count * item_size),
3826 count: write_count - count,
3827 handle: write_handle,
3828 };
3829 }
3830
3831 if read_complete {
3832 ReturnCode::completed(ty.kind(), count.try_into()?)
3833 } else {
3834 set_guest_ready(concurrent_state)?;
3835 ReturnCode::Blocked
3836 }
3837 }
3838
3839 WriteState::HostReady {
3840 produce,
3841 try_into,
3842 guest_offset,
3843 cancel,
3844 cancel_waker,
3845 } => {
3846 if cancel_waker.is_some() {
3847 bail_bug!("expected cancel_waker to be none");
3848 }
3849 if cancel {
3850 bail_bug!("expected cancel to be false");
3851 }
3852 if guest_offset != 0 {
3853 bail_bug!("expected guest_offset to be 0");
3854 }
3855
3856 set_guest_ready(concurrent_state)?;
3857
3858 let code =
3859 self.produce(store.0, ty.kind(), transmit_id, produce, try_into, 0, false)?;
3860
3861 if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) {
3862 store.0.concurrent_state_mut().get_mut(transmit_id)?.done = true;
3863 }
3864
3865 code
3866 }
3867
3868 WriteState::Open => {
3869 set_guest_ready(concurrent_state)?;
3870 ReturnCode::Blocked
3871 }
3872
3873 WriteState::Dropped => ReturnCode::Dropped(0),
3874 };
3875
3876 if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3877 result = self.wait_for_read(store.0, transmit_handle)?;
3878 }
3879
3880 if result != ReturnCode::Blocked {
3881 *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3882 TransmitLocalState::Read {
3883 done: matches!(
3884 (result, ty),
3885 (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3886 ),
3887 };
3888 }
3889
3890 log::trace!(
3891 "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3892 );
3893
3894 Ok(result)
3895 }
3896
3897 fn wait_for_write(
3898 self,
3899 store: &mut StoreOpaque,
3900 handle: TableId<TransmitHandle>,
3901 ) -> Result<ReturnCode> {
3902 let waitable = Waitable::Transmit(handle);
3903 store.wait_for_event(waitable)?;
3904 let event = waitable.take_event(store.concurrent_state_mut())?;
3905 if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3906 event
3907 {
3908 waitable.on_delivery(store, self, event)?;
3909 Ok(code)
3910 } else {
3911 bail_bug!("expected either a stream or future write event")
3912 }
3913 }
3914
3915 fn cancel_write(
3917 self,
3918 store: &mut StoreOpaque,
3919 transmit_id: TableId<TransmitState>,
3920 async_: bool,
3921 ) -> Result<ReturnCode> {
3922 let state = store.concurrent_state_mut();
3923 let transmit = state.get_mut(transmit_id)?;
3924 log::trace!(
3925 "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3926 transmit.read,
3927 transmit.write
3928 );
3929
3930 let code = if let Some(event) =
3931 Waitable::Transmit(transmit.write_handle).take_event(state)?
3932 {
3933 let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3934 bail_bug!("expected either a stream or future write event")
3935 };
3936 match (code, event) {
3937 (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3938 ReturnCode::Cancelled(count)
3939 }
3940 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3941 _ => bail_bug!("unexpected code/event combo"),
3942 }
3943 } else if let ReadState::HostReady {
3944 cancel,
3945 cancel_waker,
3946 ..
3947 } = &mut state.get_mut(transmit_id)?.read
3948 {
3949 *cancel = true;
3950 if let Some(waker) = cancel_waker.take() {
3951 waker.wake();
3952 }
3953
3954 if async_ {
3955 ReturnCode::Blocked
3956 } else {
3957 let handle = store
3958 .concurrent_state_mut()
3959 .get_mut(transmit_id)?
3960 .write_handle;
3961 self.wait_for_write(store, handle)?
3962 }
3963 } else {
3964 ReturnCode::Cancelled(0)
3965 };
3966
3967 if !matches!(code, ReturnCode::Blocked) {
3968 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3969
3970 match &transmit.write {
3971 WriteState::GuestReady { .. } => {
3972 transmit.write = WriteState::Open;
3973 }
3974 WriteState::HostReady { .. } => bail_bug!("support host write cancellation"),
3975 WriteState::Open | WriteState::Dropped => {}
3976 }
3977 }
3978
3979 log::trace!("cancelled write {transmit_id:?}: {code:?}");
3980
3981 Ok(code)
3982 }
3983
3984 fn wait_for_read(
3985 self,
3986 store: &mut StoreOpaque,
3987 handle: TableId<TransmitHandle>,
3988 ) -> Result<ReturnCode> {
3989 let waitable = Waitable::Transmit(handle);
3990 store.wait_for_event(waitable)?;
3991 let event = waitable.take_event(store.concurrent_state_mut())?;
3992 if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
3993 event
3994 {
3995 waitable.on_delivery(store, self, event)?;
3996 Ok(code)
3997 } else {
3998 bail_bug!("expected either a stream or future read event")
3999 }
4000 }
4001
4002 fn cancel_read(
4004 self,
4005 store: &mut StoreOpaque,
4006 transmit_id: TableId<TransmitState>,
4007 async_: bool,
4008 ) -> Result<ReturnCode> {
4009 let state = store.concurrent_state_mut();
4010 let transmit = state.get_mut(transmit_id)?;
4011 log::trace!(
4012 "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
4013 transmit.read,
4014 transmit.write
4015 );
4016
4017 let code = if let Some(event) =
4018 Waitable::Transmit(transmit.read_handle).take_event(state)?
4019 {
4020 let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
4021 bail_bug!("expected either a stream or future read event")
4022 };
4023 match (code, event) {
4024 (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
4025 ReturnCode::Cancelled(count)
4026 }
4027 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
4028 _ => bail_bug!("unexpected code/event combo"),
4029 }
4030 } else if let WriteState::HostReady {
4031 cancel,
4032 cancel_waker,
4033 ..
4034 } = &mut state.get_mut(transmit_id)?.write
4035 {
4036 *cancel = true;
4037 if let Some(waker) = cancel_waker.take() {
4038 waker.wake();
4039 }
4040
4041 if async_ {
4042 ReturnCode::Blocked
4043 } else {
4044 let handle = store
4045 .concurrent_state_mut()
4046 .get_mut(transmit_id)?
4047 .read_handle;
4048 self.wait_for_read(store, handle)?
4049 }
4050 } else {
4051 ReturnCode::Cancelled(0)
4052 };
4053
4054 if !matches!(code, ReturnCode::Blocked) {
4055 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
4056
4057 match &transmit.read {
4058 ReadState::GuestReady { .. } => {
4059 transmit.read = ReadState::Open;
4060 }
4061 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
4062 bail_bug!("support host read cancellation")
4063 }
4064 ReadState::Open | ReadState::Dropped => {}
4065 }
4066 }
4067
4068 log::trace!("cancelled read {transmit_id:?}: {code:?}");
4069
4070 Ok(code)
4071 }
4072
4073 fn guest_cancel_write(
4075 self,
4076 store: &mut StoreOpaque,
4077 ty: TransmitIndex,
4078 async_: bool,
4079 writer: u32,
4080 ) -> Result<ReturnCode> {
4081 if !async_ {
4082 store.check_blocking()?;
4086 }
4087
4088 let (rep, state) =
4089 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
4090 let id = TableId::<TransmitHandle>::new(rep);
4091 log::trace!("guest cancel write {id:?} (handle {writer})");
4092 match state {
4093 TransmitLocalState::Write { .. } => {
4094 bail!("stream or future write cancelled when no write is pending")
4095 }
4096 TransmitLocalState::Read { .. } => {
4097 bail!("passed read end to `{{stream|future}}.cancel-write`")
4098 }
4099 TransmitLocalState::Busy => {}
4100 }
4101 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
4102 let code = self.cancel_write(store, transmit_id, async_)?;
4103 if !matches!(code, ReturnCode::Blocked) {
4104 let state =
4105 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
4106 .1;
4107 if let TransmitLocalState::Busy = state {
4108 *state = TransmitLocalState::Write { done: false };
4109 }
4110 }
4111 Ok(code)
4112 }
4113
4114 fn guest_cancel_read(
4116 self,
4117 store: &mut StoreOpaque,
4118 ty: TransmitIndex,
4119 async_: bool,
4120 reader: u32,
4121 ) -> Result<ReturnCode> {
4122 if !async_ {
4123 store.check_blocking()?;
4127 }
4128
4129 let (rep, state) =
4130 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
4131 let id = TableId::<TransmitHandle>::new(rep);
4132 log::trace!("guest cancel read {id:?} (handle {reader})");
4133 match state {
4134 TransmitLocalState::Read { .. } => {
4135 bail!("stream or future read cancelled when no read is pending")
4136 }
4137 TransmitLocalState::Write { .. } => {
4138 bail!("passed write end to `{{stream|future}}.cancel-read`")
4139 }
4140 TransmitLocalState::Busy => {}
4141 }
4142 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
4143 let code = self.cancel_read(store, transmit_id, async_)?;
4144 if !matches!(code, ReturnCode::Blocked) {
4145 let state =
4146 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
4147 .1;
4148 if let TransmitLocalState::Busy = state {
4149 *state = TransmitLocalState::Read { done: false };
4150 }
4151 }
4152 Ok(code)
4153 }
4154
4155 fn guest_drop_readable(
4157 self,
4158 store: &mut StoreOpaque,
4159 ty: TransmitIndex,
4160 reader: u32,
4161 ) -> Result<()> {
4162 let table = self.id().get_mut(store).table_for_transmit(ty);
4163 let (rep, _is_done) = match ty {
4164 TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
4165 TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
4166 };
4167 let kind = match ty {
4168 TransmitIndex::Stream(_) => TransmitKind::Stream,
4169 TransmitIndex::Future(_) => TransmitKind::Future,
4170 };
4171 let id = TableId::<TransmitHandle>::new(rep);
4172 log::trace!("guest_drop_readable: drop reader {id:?}");
4173 store.host_drop_reader(id, kind)
4174 }
4175
4176 pub(crate) fn error_context_new(
4178 self,
4179 store: &mut StoreOpaque,
4180 ty: TypeComponentLocalErrorContextTableIndex,
4181 options: OptionsIndex,
4182 debug_msg_address: u32,
4183 debug_msg_len: u32,
4184 ) -> Result<u32> {
4185 let lift_ctx = &mut LiftContext::new(store, options, self);
4186 let debug_msg = String::linear_lift_from_flat(
4187 lift_ctx,
4188 InterfaceType::String,
4189 &[ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)],
4190 )?;
4191
4192 let err_ctx = ErrorContextState { debug_msg };
4194 let state = store.concurrent_state_mut();
4195 let table_id = state.push(err_ctx)?;
4196 let global_ref_count_idx =
4197 TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
4198
4199 let _ = state
4201 .global_error_context_ref_counts
4202 .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
4203
4204 let local_idx = self
4211 .id()
4212 .get_mut(store)
4213 .table_for_error_context(ty)
4214 .error_context_insert(table_id.rep())?;
4215
4216 Ok(local_idx)
4217 }
4218
4219 pub(super) fn error_context_debug_message<T>(
4221 self,
4222 store: StoreContextMut<T>,
4223 ty: TypeComponentLocalErrorContextTableIndex,
4224 options: OptionsIndex,
4225 err_ctx_handle: u32,
4226 debug_msg_address: u32,
4227 ) -> Result<()> {
4228 let handle_table_id_rep = self
4230 .id()
4231 .get_mut(store.0)
4232 .table_for_error_context(ty)
4233 .error_context_rep(err_ctx_handle)?;
4234
4235 let state = store.0.concurrent_state_mut();
4236 let ErrorContextState { debug_msg } =
4238 state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
4239 let debug_msg = debug_msg.clone();
4240
4241 let lower_cx = &mut LowerContext::new(store, options, self);
4242 let debug_msg_address = usize::try_from(debug_msg_address)?;
4243 let offset = lower_cx
4245 .as_slice_mut()
4246 .get(debug_msg_address..)
4247 .and_then(|b| b.get(..debug_msg.bytes().len()))
4248 .map(|_| debug_msg_address)
4249 .ok_or_else(|| crate::format_err!("invalid debug message pointer: out of bounds"))?;
4250 debug_msg
4251 .as_str()
4252 .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
4253
4254 Ok(())
4255 }
4256
4257 pub(crate) fn future_cancel_read(
4259 self,
4260 store: &mut StoreOpaque,
4261 ty: TypeFutureTableIndex,
4262 async_: bool,
4263 reader: u32,
4264 ) -> Result<u32> {
4265 self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
4266 .map(|v| v.encode())
4267 }
4268
4269 pub(crate) fn future_cancel_write(
4271 self,
4272 store: &mut StoreOpaque,
4273 ty: TypeFutureTableIndex,
4274 async_: bool,
4275 writer: u32,
4276 ) -> Result<u32> {
4277 self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
4278 .map(|v| v.encode())
4279 }
4280
4281 pub(crate) fn stream_cancel_read(
4283 self,
4284 store: &mut StoreOpaque,
4285 ty: TypeStreamTableIndex,
4286 async_: bool,
4287 reader: u32,
4288 ) -> Result<u32> {
4289 self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
4290 .map(|v| v.encode())
4291 }
4292
4293 pub(crate) fn stream_cancel_write(
4295 self,
4296 store: &mut StoreOpaque,
4297 ty: TypeStreamTableIndex,
4298 async_: bool,
4299 writer: u32,
4300 ) -> Result<u32> {
4301 self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
4302 .map(|v| v.encode())
4303 }
4304
4305 pub(crate) fn future_drop_readable(
4307 self,
4308 store: &mut StoreOpaque,
4309 ty: TypeFutureTableIndex,
4310 reader: u32,
4311 ) -> Result<()> {
4312 self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
4313 }
4314
4315 pub(crate) fn stream_drop_readable(
4317 self,
4318 store: &mut StoreOpaque,
4319 ty: TypeStreamTableIndex,
4320 reader: u32,
4321 ) -> Result<()> {
4322 self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
4323 }
4324
4325 fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
4329 let (write, read) = store
4330 .concurrent_state_mut()
4331 .new_transmit(TransmitOrigin::guest(self.id().instance(), ty))?;
4332
4333 let table = self.id().get_mut(store).table_for_transmit(ty);
4334 let (read_handle, write_handle) = match ty {
4335 TransmitIndex::Future(ty) => (
4336 table.future_insert_read(ty, read.rep())?,
4337 table.future_insert_write(ty, write.rep())?,
4338 ),
4339 TransmitIndex::Stream(ty) => (
4340 table.stream_insert_read(ty, read.rep())?,
4341 table.stream_insert_write(ty, write.rep())?,
4342 ),
4343 };
4344
4345 let state = store.concurrent_state_mut();
4346 state.get_mut(read)?.common.handle = Some(read_handle);
4347 state.get_mut(write)?.common.handle = Some(write_handle);
4348
4349 Ok(ResourcePair {
4350 write: write_handle,
4351 read: read_handle,
4352 })
4353 }
4354
4355 pub(crate) fn error_context_drop(
4357 self,
4358 store: &mut StoreOpaque,
4359 ty: TypeComponentLocalErrorContextTableIndex,
4360 error_context: u32,
4361 ) -> Result<()> {
4362 let instance = self.id().get_mut(store);
4363
4364 let local_handle_table = instance.table_for_error_context(ty);
4365
4366 let rep = local_handle_table.error_context_drop(error_context)?;
4367
4368 let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
4369
4370 let state = store.concurrent_state_mut();
4371 let Some(GlobalErrorContextRefCount(global_ref_count)) = state
4372 .global_error_context_ref_counts
4373 .get_mut(&global_ref_count_idx)
4374 else {
4375 bail_bug!("retrieve concurrent state for error context during drop")
4376 };
4377
4378 if *global_ref_count < 1 {
4380 bail_bug!("ref count unexpectedly zero");
4381 }
4382 *global_ref_count -= 1;
4383 if *global_ref_count == 0 {
4384 state
4385 .global_error_context_ref_counts
4386 .remove(&global_ref_count_idx);
4387
4388 state
4389 .delete(TableId::<ErrorContextState>::new(rep))
4390 .context("deleting component-global error context data")?;
4391 }
4392
4393 Ok(())
4394 }
4395
4396 fn guest_transfer(
4399 self,
4400 store: &mut StoreOpaque,
4401 src_idx: u32,
4402 src: TransmitIndex,
4403 dst: TransmitIndex,
4404 ) -> Result<u32> {
4405 let mut instance = self.id().get_mut(store);
4406 let src_table = instance.as_mut().table_for_transmit(src);
4407 let (rep, is_done) = match src {
4408 TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
4409 TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
4410 };
4411 if is_done {
4412 bail!("cannot lift after being notified that the writable end dropped");
4413 }
4414 let dst_table = instance.table_for_transmit(dst);
4415 let handle = match dst {
4416 TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
4417 TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
4418 }?;
4419 store
4420 .concurrent_state_mut()
4421 .get_mut(TableId::<TransmitHandle>::new(rep))?
4422 .common
4423 .handle = Some(handle);
4424 Ok(handle)
4425 }
4426
4427 pub(crate) fn future_new(
4429 self,
4430 store: &mut StoreOpaque,
4431 ty: TypeFutureTableIndex,
4432 ) -> Result<ResourcePair> {
4433 self.guest_new(store, TransmitIndex::Future(ty))
4434 }
4435
4436 pub(crate) fn stream_new(
4438 self,
4439 store: &mut StoreOpaque,
4440 ty: TypeStreamTableIndex,
4441 ) -> Result<ResourcePair> {
4442 self.guest_new(store, TransmitIndex::Stream(ty))
4443 }
4444
4445 pub(crate) fn future_transfer(
4448 self,
4449 store: &mut StoreOpaque,
4450 src_idx: u32,
4451 src: TypeFutureTableIndex,
4452 dst: TypeFutureTableIndex,
4453 ) -> Result<u32> {
4454 self.guest_transfer(
4455 store,
4456 src_idx,
4457 TransmitIndex::Future(src),
4458 TransmitIndex::Future(dst),
4459 )
4460 }
4461
4462 pub(crate) fn stream_transfer(
4465 self,
4466 store: &mut StoreOpaque,
4467 src_idx: u32,
4468 src: TypeStreamTableIndex,
4469 dst: TypeStreamTableIndex,
4470 ) -> Result<u32> {
4471 self.guest_transfer(
4472 store,
4473 src_idx,
4474 TransmitIndex::Stream(src),
4475 TransmitIndex::Stream(dst),
4476 )
4477 }
4478
4479 pub(crate) fn error_context_transfer(
4481 self,
4482 store: &mut StoreOpaque,
4483 src_idx: u32,
4484 src: TypeComponentLocalErrorContextTableIndex,
4485 dst: TypeComponentLocalErrorContextTableIndex,
4486 ) -> Result<u32> {
4487 let mut instance = self.id().get_mut(store);
4488 let rep = instance
4489 .as_mut()
4490 .table_for_error_context(src)
4491 .error_context_rep(src_idx)?;
4492 let dst_idx = instance
4493 .table_for_error_context(dst)
4494 .error_context_insert(rep)?;
4495
4496 let global_ref_count = store
4500 .concurrent_state_mut()
4501 .global_error_context_ref_counts
4502 .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4503 .context("global ref count present for existing (sub)component error context")?;
4504 global_ref_count.0 += 1;
4505
4506 Ok(dst_idx)
4507 }
4508}
4509
4510impl ComponentInstance {
4511 fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4512 let (states, types) = self.instance_states();
4513 let runtime_instance = match ty {
4514 TransmitIndex::Stream(ty) => types[ty].instance,
4515 TransmitIndex::Future(ty) => types[ty].instance,
4516 };
4517 states[runtime_instance].handle_table()
4518 }
4519
4520 fn table_for_error_context(
4521 self: Pin<&mut Self>,
4522 ty: TypeComponentLocalErrorContextTableIndex,
4523 ) -> &mut HandleTable {
4524 let (states, types) = self.instance_states();
4525 let runtime_instance = types[ty].instance;
4526 states[runtime_instance].handle_table()
4527 }
4528
4529 fn get_mut_by_index(
4530 self: Pin<&mut Self>,
4531 ty: TransmitIndex,
4532 index: u32,
4533 ) -> Result<(u32, &mut TransmitLocalState)> {
4534 get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4535 }
4536}
4537
4538impl ConcurrentState {
4539 fn send_write_result(
4540 &mut self,
4541 ty: TransmitIndex,
4542 id: TableId<TransmitState>,
4543 handle: u32,
4544 code: ReturnCode,
4545 ) -> Result<()> {
4546 let write_handle = self.get_mut(id)?.write_handle.rep();
4547 self.set_event(
4548 write_handle,
4549 match ty {
4550 TransmitIndex::Future(ty) => Event::FutureWrite {
4551 code,
4552 pending: Some((ty, handle)),
4553 },
4554 TransmitIndex::Stream(ty) => Event::StreamWrite {
4555 code,
4556 pending: Some((ty, handle)),
4557 },
4558 },
4559 )
4560 }
4561
4562 fn send_read_result(
4563 &mut self,
4564 ty: TransmitIndex,
4565 id: TableId<TransmitState>,
4566 handle: u32,
4567 code: ReturnCode,
4568 ) -> Result<()> {
4569 let read_handle = self.get_mut(id)?.read_handle.rep();
4570 self.set_event(
4571 read_handle,
4572 match ty {
4573 TransmitIndex::Future(ty) => Event::FutureRead {
4574 code,
4575 pending: Some((ty, handle)),
4576 },
4577 TransmitIndex::Stream(ty) => Event::StreamRead {
4578 code,
4579 pending: Some((ty, handle)),
4580 },
4581 },
4582 )
4583 }
4584
4585 fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4586 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4587 }
4588
4589 fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4590 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4591 }
4592
4593 fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4604 let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4605
4606 fn update_code(old: ReturnCode, new: ReturnCode) -> Result<ReturnCode> {
4607 let (ReturnCode::Completed(count)
4608 | ReturnCode::Dropped(count)
4609 | ReturnCode::Cancelled(count)) = old
4610 else {
4611 bail_bug!("unexpected old return code")
4612 };
4613
4614 Ok(match new {
4615 ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
4616 ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
4617 _ => bail_bug!("unexpected new return code"),
4618 })
4619 }
4620
4621 let event = match (waitable.take_event(self)?, event) {
4622 (None, _) => event,
4623 (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4624 (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4625 (
4626 Some(Event::StreamWrite {
4627 code: old_code,
4628 pending: old_pending,
4629 }),
4630 Event::StreamWrite { code, pending },
4631 ) => Event::StreamWrite {
4632 code: update_code(old_code, code)?,
4633 pending: old_pending.or(pending),
4634 },
4635 (
4636 Some(Event::StreamRead {
4637 code: old_code,
4638 pending: old_pending,
4639 }),
4640 Event::StreamRead { code, pending },
4641 ) => Event::StreamRead {
4642 code: update_code(old_code, code)?,
4643 pending: old_pending.or(pending),
4644 },
4645 _ => bail_bug!("unexpected event combination"),
4646 };
4647
4648 waitable.set_event(self, Some(event))
4649 }
4650
4651 fn new_transmit(
4654 &mut self,
4655 origin: TransmitOrigin,
4656 ) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4657 let state_id = self.push(TransmitState::new(origin))?;
4658
4659 let write = self.push(TransmitHandle::new(state_id))?;
4660 let read = self.push(TransmitHandle::new(state_id))?;
4661
4662 let state = self.get_mut(state_id)?;
4663 state.write_handle = write;
4664 state.read_handle = read;
4665
4666 log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4667
4668 Ok((write, read))
4669 }
4670
4671 fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4673 let state = self.delete(state_id)?;
4674 self.delete(state.write_handle)?;
4675 self.delete(state.read_handle)?;
4676
4677 log::trace!(
4678 "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4679 state.write_handle,
4680 state.read_handle,
4681 );
4682
4683 Ok(())
4684 }
4685}
4686
/// Pair of handle-table indices for the two ends of a newly created future
/// or stream, as returned by `guest_new`.
pub(crate) struct ResourcePair {
    /// Handle-table index of the writable end.
    pub(crate) write: u32,
    /// Handle-table index of the readable end.
    pub(crate) read: u32,
}
4691
impl Waitable {
    /// Updates handle and transmit state when `event` is delivered for this
    /// waitable.
    ///
    /// Only future/stream read/write events with `pending: Some((ty, handle))`
    /// have any effect; every other event is ignored. For those events the
    /// handle-table entry for `(ty, handle)` must refer to this waitable and
    /// be in the `Busy` state, otherwise a bug is reported. The entry is then
    /// switched to `Read`/`Write`. For stream events the shared transmit
    /// state's `read`/`write` field is additionally reset to `Open`.
    pub(super) fn on_delivery(
        &self,
        store: &mut StoreOpaque,
        instance: Instance,
        event: Event,
    ) -> Result<()> {
        match event {
            Event::FutureRead {
                pending: Some((ty, handle)),
                ..
            }
            | Event::FutureWrite {
                pending: Some((ty, handle)),
                ..
            } => {
                // Find the handle-table entry for the future this event
                // refers to.
                let instance = instance.id().get_mut(store);
                let runtime_instance = instance.component().types()[ty].instance;
                let (rep, state) = instance.instance_states().0[runtime_instance]
                    .handle_table()
                    .future_rep(ty, handle)?;
                if rep != self.rep() {
                    bail_bug!("unexpected rep mismatch");
                }
                if *state != TransmitLocalState::Busy {
                    bail_bug!("expected state to be busy");
                }
                // The handle leaves `Busy`; futures always use `done: false`
                // here.
                *state = match event {
                    Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
                    Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
                    _ => bail_bug!("unexpected event for future"),
                };
            }
            Event::StreamRead {
                pending: Some((ty, handle)),
                code,
            }
            | Event::StreamWrite {
                pending: Some((ty, handle)),
                code,
            } => {
                // Find the handle-table entry for the stream this event
                // refers to.
                let instance = instance.id().get_mut(store);
                let runtime_instance = instance.component().types()[ty].instance;
                let (rep, state) = instance.instance_states().0[runtime_instance]
                    .handle_table()
                    .stream_rep(ty, handle)?;
                if rep != self.rep() {
                    bail_bug!("unexpected rep mismatch");
                }
                if *state != TransmitLocalState::Busy {
                    bail_bug!("expected state to be busy");
                }
                // A stream handle is marked done only when the delivered code
                // is `Dropped`.
                let done = matches!(code, ReturnCode::Dropped(_));
                *state = match event {
                    Event::StreamRead { .. } => TransmitLocalState::Read { done },
                    Event::StreamWrite { .. } => TransmitLocalState::Write { done },
                    _ => bail_bug!("unexpected event for stream"),
                };

                // Resolve the shared transmit state through this end's
                // `TransmitHandle`.
                let transmit_handle = TableId::<TransmitHandle>::new(rep);
                let state = store.concurrent_state_mut();
                let transmit_id = state.get_mut(transmit_handle)?.state;
                let transmit = state.get_mut(transmit_id)?;

                // Reset the side matching the event back to `Open`.
                match event {
                    Event::StreamRead { .. } => {
                        transmit.read = ReadState::Open;
                    }
                    Event::StreamWrite { .. } => transmit.write = WriteState::Open,
                    _ => bail_bug!("unexpected event for stream"),
                };
            }
            // Events without a pending `(ty, handle)` and all other event
            // kinds need no bookkeeping here.
            _ => {}
        }
        Ok(())
    }
}
4771
4772fn allow_intra_component_read_write(ty: Option<InterfaceType>) -> bool {
4776 matches!(
4777 ty,
4778 None | Some(
4779 InterfaceType::S8
4780 | InterfaceType::U8
4781 | InterfaceType::S16
4782 | InterfaceType::U16
4783 | InterfaceType::S32
4784 | InterfaceType::U32
4785 | InterfaceType::S64
4786 | InterfaceType::U64
4787 | InterfaceType::Float32
4788 | InterfaceType::Float64
4789 )
4790 )
4791}
4792
/// A value of type `T` stored behind a mutex in an `Option` slot.
///
/// The slot is `None` while the value has been taken out via `take` and not
/// yet returned.
struct LockedState<T> {
    /// The guarded slot; starts out as `Some(value)`.
    inner: Mutex<Option<T>>,
}
4799
4800impl<T> LockedState<T> {
4801 fn new(value: T) -> Self {
4803 Self {
4804 inner: Mutex::new(Some(value)),
4805 }
4806 }
4807
4808 fn try_lock(&self) -> Result<MutexGuard<'_, Option<T>>> {
4817 match self.inner.try_lock() {
4818 Ok(lock) => Ok(lock),
4819 Err(_) => bail_bug!("should not have contention on state lock"),
4820 }
4821 }
4822
4823 fn take(&self) -> Result<LockedStateGuard<'_, T>> {
4830 let result = self.try_lock()?.take();
4831 match result {
4832 Some(result) => Ok(LockedStateGuard {
4833 value: ManuallyDrop::new(result),
4834 state: self,
4835 }),
4836 None => bail_bug!("lock value unexpectedly missing"),
4837 }
4838 }
4839
4840 fn with<R>(&self, f: impl FnOnce(&mut T) -> R) -> Result<R> {
4849 let mut inner = self.try_lock()?;
4850 match &mut *inner {
4851 Some(state) => Ok(f(state)),
4852 None => bail_bug!("lock value unexpectedly missing"),
4853 }
4854 }
4855}
4856
/// Guard produced by `LockedState::take` that owns the taken value together
/// with a reference to the `LockedState` it came from.
struct LockedStateGuard<'a, T> {
    /// The value moved out of the `LockedState`; wrapped in `ManuallyDrop`
    /// so it is not dropped automatically along with the guard.
    value: ManuallyDrop<T>,
    /// The `LockedState` the value was taken from.
    state: &'a LockedState<T>,
}
4863
4864impl<T> Deref for LockedStateGuard<'_, T> {
4865 type Target = T;
4866
4867 fn deref(&self) -> &T {
4868 &self.value
4869 }
4870}
4871
4872impl<T> DerefMut for LockedStateGuard<'_, T> {
4873 fn deref_mut(&mut self) -> &mut T {
4874 &mut self.value
4875 }
4876}
4877
impl<T> Drop for LockedStateGuard<'_, T> {
    /// Puts the held value back into the `LockedState` it was taken from.
    fn drop(&mut self) {
        // SAFETY: `self.value` is not accessed again after this call; the
        // rest of this function only touches `self.state`, and because the
        // field is `ManuallyDrop` it is not dropped a second time when the
        // guard itself goes away.
        let value = unsafe { ManuallyDrop::take(&mut self.value) };

        // A failure to acquire the lock is deliberately ignored here since
        // `drop` cannot return an error; in that case `value` is simply
        // dropped at the end of this function instead of being restored.
        if let Ok(mut lock) = self.state.try_lock() {
            *lock = Some(value);
        }
    }
}
4894
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Engine, Store};
    use core::future::pending;
    use core::pin::pin;
    use std::sync::LazyLock;

    // Single engine shared by every store created in these tests.
    static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);

    /// Polls `rx` once through `FutureProducer::poll_produce` using a no-op
    /// waker and a freshly created store, forwarding the `finish` flag.
    fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
    where
        T: FutureProducer<()>,
    {
        rx.poll_produce(
            &mut Context::from_waker(Waker::noop()),
            Store::new(&ENGINE, ()).as_context_mut(),
            finish,
        )
    }

    /// Checks `poll_produce` for `async` blocks, `pending()`, and oneshot
    /// receivers, both with and without `finish` set.
    #[test]
    fn future_producer() {
        // An immediately-ready future yields its value when `finish` is
        // false...
        let mut fut = pin!(async { crate::error::Ok(()) });
        assert!(matches!(
            poll_future_producer(fut.as_mut(), false),
            Poll::Ready(Ok(Some(()))),
        ));

        // ...and also when `finish` is true.
        let mut fut = pin!(async { crate::error::Ok(()) });
        assert!(matches!(
            poll_future_producer(fut.as_mut(), true),
            Poll::Ready(Ok(Some(()))),
        ));

        // A future that never resolves is `Pending` without `finish` and
        // resolves to `Ok(None)` with it.
        let mut fut = pin!(pending::<Result<()>>());
        assert!(matches!(
            poll_future_producer(fut.as_mut(), false),
            Poll::Pending,
        ));
        assert!(matches!(
            poll_future_producer(fut.as_mut(), true),
            Poll::Ready(Ok(None)),
        ));

        // A oneshot receiver with nothing sent yet behaves like a pending
        // future, and still yields the value once it is sent afterwards.
        let (tx, rx) = oneshot::channel();
        let mut rx = pin!(rx);
        assert!(matches!(
            poll_future_producer(rx.as_mut(), false),
            Poll::Pending,
        ));
        assert!(matches!(
            poll_future_producer(rx.as_mut(), true),
            Poll::Ready(Ok(None)),
        ));
        tx.send(()).unwrap();
        assert!(matches!(
            poll_future_producer(rx.as_mut(), true),
            Poll::Ready(Ok(Some(()))),
        ));

        // A value sent before the first poll is produced right away.
        let (tx, rx) = oneshot::channel();
        let mut rx = pin!(rx);
        tx.send(()).unwrap();
        assert!(matches!(
            poll_future_producer(rx.as_mut(), false),
            Poll::Ready(Ok(Some(()))),
        ));

        // Dropping the sender without sending produces an error when
        // `finish` is false...
        let (tx, rx) = oneshot::channel::<()>();
        let mut rx = pin!(rx);
        drop(tx);
        assert!(matches!(
            poll_future_producer(rx.as_mut(), false),
            Poll::Ready(Err(..)),
        ));

        // ...and likewise when `finish` is true.
        let (tx, rx) = oneshot::channel::<()>();
        let mut rx = pin!(rx);
        drop(tx);
        assert!(matches!(
            poll_future_producer(rx.as_mut(), true),
            Poll::Ready(Err(..)),
        ));
    }
}