1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext, Options};
5use crate::component::matching::InstanceType;
6use crate::component::values::{ErrorContextAny, FutureAny, StreamAny};
7use crate::component::{AsAccessor, Instance, Lower, Val, WasmList, WasmStr};
8use crate::store::{StoreOpaque, StoreToken};
9use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
10use crate::vm::{AlwaysMut, VMStore};
11use crate::{AsContextMut, StoreContextMut, ValRaw};
12use anyhow::{Context as _, Result, anyhow, bail};
13use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
14use futures::FutureExt;
15use futures::channel::oneshot;
16use std::boxed::Box;
17use std::fmt;
18use std::future;
19use std::io::Cursor;
20use std::iter;
21use std::marker::PhantomData;
22use std::mem::{self, MaybeUninit};
23use std::pin::Pin;
24use std::string::{String, ToString};
25use std::sync::{Arc, Mutex};
26use std::task::{self, Context, Poll, Waker};
27use std::vec::Vec;
28use wasmtime_environ::component::{
29 CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
30 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
31 TypeFutureTableIndex, TypeStreamTableIndex,
32};
33
34pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
35
36mod buffers;
37
38#[derive(Copy, Clone, Debug)]
41pub enum TransmitKind {
42 Stream,
43 Future,
44}
45
46#[derive(Copy, Clone, Debug, PartialEq)]
48pub enum ReturnCode {
49 Blocked,
50 Completed(u32),
51 Dropped(u32),
52 Cancelled(u32),
53}
54
55impl ReturnCode {
56 pub fn encode(&self) -> u32 {
61 const BLOCKED: u32 = 0xffff_ffff;
62 const COMPLETED: u32 = 0x0;
63 const DROPPED: u32 = 0x1;
64 const CANCELLED: u32 = 0x2;
65 match self {
66 ReturnCode::Blocked => BLOCKED,
67 ReturnCode::Completed(n) => {
68 debug_assert!(*n < (1 << 28));
69 (n << 4) | COMPLETED
70 }
71 ReturnCode::Dropped(n) => {
72 debug_assert!(*n < (1 << 28));
73 (n << 4) | DROPPED
74 }
75 ReturnCode::Cancelled(n) => {
76 debug_assert!(*n < (1 << 28));
77 (n << 4) | CANCELLED
78 }
79 }
80 }
81
82 fn completed(kind: TransmitKind, count: u32) -> Self {
85 Self::Completed(if let TransmitKind::Future = kind {
86 0
87 } else {
88 count
89 })
90 }
91}
92
93#[derive(Copy, Clone, Debug)]
98pub enum TransmitIndex {
99 Stream(TypeStreamTableIndex),
100 Future(TypeFutureTableIndex),
101}
102
103impl TransmitIndex {
104 pub fn kind(&self) -> TransmitKind {
105 match self {
106 TransmitIndex::Stream(_) => TransmitKind::Stream,
107 TransmitIndex::Future(_) => TransmitKind::Future,
108 }
109 }
110}
111
112fn payload(ty: TransmitIndex, types: &Arc<ComponentTypes>) -> Option<InterfaceType> {
115 match ty {
116 TransmitIndex::Future(ty) => types[types[ty].ty].payload,
117 TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
118 }
119}
120
121fn get_mut_by_index_from(
124 handle_table: &mut HandleTable,
125 ty: TransmitIndex,
126 index: u32,
127) -> Result<(u32, &mut TransmitLocalState)> {
128 match ty {
129 TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
130 TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
131 }
132}
133
134fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
135 mut store: StoreContextMut<U>,
136 instance: Instance,
137 options: &Options,
138 ty: TransmitIndex,
139 address: usize,
140 count: usize,
141 buffer: &mut B,
142) -> Result<()> {
143 let types = instance.id().get(store.0).component().types().clone();
144 let count = buffer.remaining().len().min(count);
145
146 let lower = &mut if T::MAY_REQUIRE_REALLOC {
147 LowerContext::new
148 } else {
149 LowerContext::new_without_realloc
150 }(store.as_context_mut(), options, &types, instance);
151
152 if address % usize::try_from(T::ALIGN32)? != 0 {
153 bail!("read pointer not aligned");
154 }
155 lower
156 .as_slice_mut()
157 .get_mut(address..)
158 .and_then(|b| b.get_mut(..T::SIZE32 * count))
159 .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
160
161 if let Some(ty) = payload(ty, &types) {
162 T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
163 }
164
165 buffer.skip(count);
166
167 Ok(())
168}
169
170fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
171 lift: &mut LiftContext<'_>,
172 ty: Option<InterfaceType>,
173 buffer: &mut B,
174 address: usize,
175 count: usize,
176) -> Result<()> {
177 let count = count.min(buffer.remaining_capacity());
178 if T::IS_RUST_UNIT_TYPE {
179 buffer.extend(
183 iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
184 )
185 } else {
186 let ty = ty.unwrap();
187 if address % usize::try_from(T::ALIGN32)? != 0 {
188 bail!("write pointer not aligned");
189 }
190 lift.memory()
191 .get(address..)
192 .and_then(|b| b.get(..T::SIZE32 * count))
193 .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
194
195 let list = &WasmList::new(address, count, lift, ty)?;
196 T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
197 }
198 Ok(())
199}
200
201#[derive(Debug, PartialEq, Eq, PartialOrd)]
203pub(super) struct ErrorContextState {
204 pub(crate) debug_msg: String,
206}
207
208#[derive(Debug, Clone, Copy, PartialEq, Eq)]
211pub(super) struct FlatAbi {
212 pub(super) size: u32,
213 pub(super) align: u32,
214}
215
216pub struct Destination<'a, T, B> {
218 instance: Instance,
219 id: TableId<TransmitState>,
220 buffer: &'a mut B,
221 host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
222 _phantom: PhantomData<fn() -> T>,
223}
224
225impl<'a, T, B> Destination<'a, T, B> {
226 pub fn reborrow(&mut self) -> Destination<'_, T, B> {
228 Destination {
229 instance: self.instance,
230 id: self.id,
231 buffer: &mut *self.buffer,
232 host_buffer: self.host_buffer.as_deref_mut(),
233 _phantom: PhantomData,
234 }
235 }
236
237 pub fn take_buffer(&mut self) -> B
243 where
244 B: Default,
245 {
246 mem::take(self.buffer)
247 }
248
249 pub fn set_buffer(&mut self, buffer: B) {
259 *self.buffer = buffer;
260 }
261
262 pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
275 let transmit = self
276 .instance
277 .concurrent_state_mut(store.as_context_mut().0)
278 .get_mut(self.id)
279 .unwrap();
280
281 if let &ReadState::GuestReady { count, .. } = &transmit.read {
282 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
283 unreachable!()
284 };
285
286 Some(count - guest_offset)
287 } else {
288 None
289 }
290 }
291}
292
293impl<'a, B> Destination<'a, u8, B> {
294 pub fn as_direct<D>(
305 mut self,
306 store: StoreContextMut<'a, D>,
307 capacity: usize,
308 ) -> DirectDestination<'a, D> {
309 if let Some(buffer) = self.host_buffer.as_deref_mut() {
310 buffer.set_position(0);
311 if buffer.get_mut().is_empty() {
312 buffer.get_mut().resize(capacity, 0);
313 }
314 }
315
316 DirectDestination {
317 instance: self.instance,
318 id: self.id,
319 host_buffer: self.host_buffer,
320 store,
321 }
322 }
323}
324
325pub struct DirectDestination<'a, D: 'static> {
328 instance: Instance,
329 id: TableId<TransmitState>,
330 host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
331 store: StoreContextMut<'a, D>,
332}
333
334impl<D: 'static> DirectDestination<'_, D> {
335 pub fn remaining(&mut self) -> &mut [u8] {
337 if let Some(buffer) = self.host_buffer.as_deref_mut() {
338 buffer.get_mut()
339 } else {
340 let transmit = self
341 .instance
342 .concurrent_state_mut(self.store.as_context_mut().0)
343 .get_mut(self.id)
344 .unwrap();
345
346 let &ReadState::GuestReady {
347 address,
348 count,
349 options,
350 ..
351 } = &transmit.read
352 else {
353 unreachable!();
354 };
355
356 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
357 unreachable!()
358 };
359
360 options
361 .memory_mut(self.store.0)
362 .get_mut((address + guest_offset)..)
363 .and_then(|b| b.get_mut(..(count - guest_offset)))
364 .unwrap()
365 }
366 }
367
368 pub fn mark_written(&mut self, count: usize) {
373 if let Some(buffer) = self.host_buffer.as_deref_mut() {
374 buffer.set_position(
375 buffer
376 .position()
377 .checked_add(u64::try_from(count).unwrap())
378 .unwrap(),
379 );
380 } else {
381 let transmit = self
382 .instance
383 .concurrent_state_mut(self.store.as_context_mut().0)
384 .get_mut(self.id)
385 .unwrap();
386
387 let ReadState::GuestReady {
388 count: read_count, ..
389 } = &transmit.read
390 else {
391 unreachable!();
392 };
393
394 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
395 unreachable!()
396 };
397
398 if *guest_offset + count > *read_count {
399 panic!(
400 "write count ({count}) must be less than or equal to read count ({read_count})"
401 )
402 } else {
403 *guest_offset += count;
404 }
405 }
406 }
407}
408
409#[derive(Copy, Clone, Debug)]
411pub enum StreamResult {
412 Completed,
415 Cancelled,
420 Dropped,
423}
424
425pub trait StreamProducer<D>: Send + 'static {
427 type Item;
429
430 type Buffer: WriteBuffer<Self::Item> + Default;
432
433 fn poll_produce<'a>(
504 self: Pin<&mut Self>,
505 cx: &mut Context<'_>,
506 store: StoreContextMut<'a, D>,
507 destination: Destination<'a, Self::Item, Self::Buffer>,
508 finish: bool,
509 ) -> Poll<Result<StreamResult>>;
510}
511
512pub struct Source<'a, T> {
514 instance: Instance,
515 id: TableId<TransmitState>,
516 host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
517}
518
519impl<'a, T> Source<'a, T> {
520 pub fn reborrow(&mut self) -> Source<'_, T> {
522 Source {
523 instance: self.instance,
524 id: self.id,
525 host_buffer: self.host_buffer.as_deref_mut(),
526 }
527 }
528
529 pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
531 where
532 T: func::Lift + 'static,
533 B: ReadBuffer<T>,
534 {
535 if let Some(input) = &mut self.host_buffer {
536 let count = input.remaining().len().min(buffer.remaining_capacity());
537 buffer.move_from(*input, count);
538 } else {
539 let store = store.as_context_mut();
540 let transmit = self
541 .instance
542 .concurrent_state_mut(store.0)
543 .get_mut(self.id)?;
544
545 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
546 unreachable!();
547 };
548
549 let &WriteState::GuestReady {
550 ty,
551 address,
552 count,
553 options,
554 ..
555 } = &transmit.write
556 else {
557 unreachable!()
558 };
559
560 let cx = &mut LiftContext::new(store.0.store_opaque_mut(), &options, self.instance);
561 let ty = payload(ty, cx.types);
562 let old_remaining = buffer.remaining_capacity();
563 lift::<T, B, S::Data>(
564 cx,
565 ty,
566 buffer,
567 address + (T::SIZE32 * guest_offset),
568 count - guest_offset,
569 )?;
570
571 let transmit = self
572 .instance
573 .concurrent_state_mut(store.0)
574 .get_mut(self.id)?;
575
576 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
577 unreachable!();
578 };
579
580 *guest_offset += old_remaining - buffer.remaining_capacity();
581 }
582
583 Ok(())
584 }
585
586 pub fn remaining(&self, mut store: impl AsContextMut) -> usize
589 where
590 T: 'static,
591 {
592 let transmit = self
593 .instance
594 .concurrent_state_mut(store.as_context_mut().0)
595 .get_mut(self.id)
596 .unwrap();
597
598 if let &WriteState::GuestReady { count, .. } = &transmit.write {
599 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
600 unreachable!()
601 };
602
603 count - guest_offset
604 } else if let Some(host_buffer) = &self.host_buffer {
605 host_buffer.remaining().len()
606 } else {
607 unreachable!()
608 }
609 }
610}
611
612impl<'a> Source<'a, u8> {
613 pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
615 DirectSource {
616 instance: self.instance,
617 id: self.id,
618 host_buffer: self.host_buffer,
619 store,
620 }
621 }
622}
623
624pub struct DirectSource<'a, D: 'static> {
627 instance: Instance,
628 id: TableId<TransmitState>,
629 host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
630 store: StoreContextMut<'a, D>,
631}
632
633impl<D: 'static> DirectSource<'_, D> {
634 pub fn remaining(&mut self) -> &[u8] {
636 if let Some(buffer) = self.host_buffer.as_deref_mut() {
637 buffer.remaining()
638 } else {
639 let transmit = self
640 .instance
641 .concurrent_state_mut(self.store.as_context_mut().0)
642 .get_mut(self.id)
643 .unwrap();
644
645 let &WriteState::GuestReady {
646 address,
647 count,
648 options,
649 ..
650 } = &transmit.write
651 else {
652 unreachable!()
653 };
654
655 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
656 unreachable!()
657 };
658
659 options
660 .memory(self.store.0)
661 .get((address + guest_offset)..)
662 .and_then(|b| b.get(..(count - guest_offset)))
663 .unwrap()
664 }
665 }
666
667 pub fn mark_read(&mut self, count: usize) {
672 if let Some(buffer) = self.host_buffer.as_deref_mut() {
673 buffer.skip(count);
674 } else {
675 let transmit = self
676 .instance
677 .concurrent_state_mut(self.store.as_context_mut().0)
678 .get_mut(self.id)
679 .unwrap();
680
681 let WriteState::GuestReady {
682 count: write_count, ..
683 } = &transmit.write
684 else {
685 unreachable!()
686 };
687
688 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
689 unreachable!()
690 };
691
692 if *guest_offset + count > *write_count {
693 panic!(
694 "read count ({count}) must be less than or equal to write count ({write_count})"
695 )
696 } else {
697 *guest_offset += count;
698 }
699 }
700 }
701}
702
703pub trait StreamConsumer<D>: Send + 'static {
705 type Item;
707
708 fn poll_consume(
791 self: Pin<&mut Self>,
792 cx: &mut Context<'_>,
793 store: StoreContextMut<D>,
794 source: Source<'_, Self::Item>,
795 finish: bool,
796 ) -> Poll<Result<StreamResult>>;
797}
798
799pub trait FutureProducer<D>: Send + 'static {
801 type Item;
803
804 fn poll_produce(
814 self: Pin<&mut Self>,
815 cx: &mut Context<'_>,
816 store: StoreContextMut<D>,
817 finish: bool,
818 ) -> Poll<Result<Option<Self::Item>>>;
819}
820
821pub trait FutureConsumer<D>: Send + 'static {
823 type Item;
825
826 fn poll_consume(
838 self: Pin<&mut Self>,
839 cx: &mut Context<'_>,
840 store: StoreContextMut<D>,
841 source: Source<'_, Self::Item>,
842 finish: bool,
843 ) -> Poll<Result<()>>;
844}
845
846pub struct FutureReader<T> {
853 instance: Instance,
854 id: TableId<TransmitHandle>,
855 _phantom: PhantomData<T>,
856}
857
858impl<T> FutureReader<T> {
859 pub fn new<S: AsContextMut>(
861 instance: Instance,
862 mut store: S,
863 producer: impl FutureProducer<S::Data, Item = T>,
864 ) -> Self
865 where
866 T: func::Lower + func::Lift + Send + Sync + 'static,
867 {
868 struct Producer<P>(P);
869
870 impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
871 for Producer<P>
872 {
873 type Item = P::Item;
874 type Buffer = Option<P::Item>;
875
876 fn poll_produce<'a>(
877 self: Pin<&mut Self>,
878 cx: &mut Context<'_>,
879 store: StoreContextMut<D>,
880 mut destination: Destination<'a, Self::Item, Self::Buffer>,
881 finish: bool,
882 ) -> Poll<Result<StreamResult>> {
883 let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
886
887 Poll::Ready(Ok(
888 if let Some(value) = task::ready!(producer.poll_produce(cx, store, finish))? {
889 destination.set_buffer(Some(value));
890
891 StreamResult::Completed
898 } else {
899 StreamResult::Cancelled
900 },
901 ))
902 }
903 }
904
905 Self::new_(
906 instance.new_transmit(
907 store.as_context_mut(),
908 TransmitKind::Future,
909 Producer(producer),
910 ),
911 instance,
912 )
913 }
914
915 fn new_(id: TableId<TransmitHandle>, instance: Instance) -> Self {
916 Self {
917 instance,
918 id,
919 _phantom: PhantomData,
920 }
921 }
922
923 pub fn pipe<S: AsContextMut>(
925 self,
926 store: S,
927 consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
928 ) where
929 T: func::Lift + 'static,
930 {
931 struct Consumer<C>(C);
932
933 impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
934 for Consumer<C>
935 {
936 type Item = T;
937
938 fn poll_consume(
939 self: Pin<&mut Self>,
940 cx: &mut Context<'_>,
941 mut store: StoreContextMut<D>,
942 mut source: Source<Self::Item>,
943 finish: bool,
944 ) -> Poll<Result<StreamResult>> {
945 let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
948
949 task::ready!(consumer.poll_consume(
950 cx,
951 store.as_context_mut(),
952 source.reborrow(),
953 finish
954 ))?;
955
956 Poll::Ready(Ok(if source.remaining(store) == 0 {
957 StreamResult::Completed
963 } else {
964 StreamResult::Cancelled
965 }))
966 }
967 }
968
969 self.instance
970 .set_consumer(store, self.id, TransmitKind::Future, Consumer(consumer));
971 }
972
973 pub fn into_val(self) -> Val {
976 Val::Future(FutureAny(self.id.rep()))
977 }
978
979 pub fn from_val(
981 mut store: impl AsContextMut<Data: Send>,
982 instance: Instance,
983 value: &Val,
984 ) -> Result<Self> {
985 let Val::Future(FutureAny(rep)) = value else {
986 bail!("expected `future`; got `{}`", value.desc());
987 };
988 let store = store.as_context_mut();
989 let id = TableId::<TransmitHandle>::new(*rep);
990 instance.concurrent_state_mut(store.0).get_mut(id)?; Ok(Self::new_(id, instance))
992 }
993
994 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
996 match ty {
997 InterfaceType::Future(src) => {
998 let handle_table = cx
999 .instance_mut()
1000 .table_for_transmit(TransmitIndex::Future(src));
1001 let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
1002 if is_done {
1003 bail!("cannot lift future after being notified that the writable end dropped");
1004 }
1005 let id = TableId::<TransmitHandle>::new(rep);
1006 let concurrent_state = cx.instance_mut().concurrent_state_mut();
1007 let future = concurrent_state.get_mut(id)?;
1008 future.common.handle = None;
1009 let state = future.state;
1010
1011 if concurrent_state.get_mut(state)?.done {
1012 bail!("cannot lift future after previous read succeeded");
1013 }
1014
1015 Ok(Self::new_(id, cx.instance_handle()))
1016 }
1017 _ => func::bad_type_info(),
1018 }
1019 }
1020
1021 pub fn close(&mut self, mut store: impl AsContextMut) {
1029 let id = mem::replace(&mut self.id, TableId::new(u32::MAX));
1031 self.instance
1032 .host_drop_reader(store.as_context_mut().0, id, TransmitKind::Future)
1033 .unwrap();
1034 }
1035
1036 pub fn close_with(&mut self, accessor: impl AsAccessor) {
1038 accessor.as_accessor().with(|access| self.close(access))
1039 }
1040
1041 pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1047 where
1048 A: AsAccessor,
1049 {
1050 GuardedFutureReader::new(accessor, self)
1051 }
1052}
1053
1054impl<T> fmt::Debug for FutureReader<T> {
1055 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1056 f.debug_struct("FutureReader")
1057 .field("id", &self.id)
1058 .field("instance", &self.instance)
1059 .finish()
1060 }
1061}
1062
1063pub(crate) fn lower_future_to_index<U>(
1065 rep: u32,
1066 cx: &mut LowerContext<'_, U>,
1067 ty: InterfaceType,
1068) -> Result<u32> {
1069 match ty {
1070 InterfaceType::Future(dst) => {
1071 let concurrent_state = cx.instance_mut().concurrent_state_mut();
1072 let id = TableId::<TransmitHandle>::new(rep);
1073 let state = concurrent_state.get_mut(id)?.state;
1074 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1075
1076 let handle = cx
1077 .instance_mut()
1078 .table_for_transmit(TransmitIndex::Future(dst))
1079 .future_insert_read(dst, rep)?;
1080
1081 cx.instance_mut()
1082 .concurrent_state_mut()
1083 .get_mut(id)?
1084 .common
1085 .handle = Some(handle);
1086
1087 Ok(handle)
1088 }
1089 _ => func::bad_type_info(),
1090 }
1091}
1092
1093unsafe impl<T: Send + Sync> func::ComponentType for FutureReader<T> {
1096 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1097
1098 type Lower = <u32 as func::ComponentType>::Lower;
1099
1100 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1101 match ty {
1102 InterfaceType::Future(_) => Ok(()),
1103 other => bail!("expected `future`, found `{}`", func::desc(other)),
1104 }
1105 }
1106}
1107
1108unsafe impl<T: Send + Sync> func::Lower for FutureReader<T> {
1110 fn linear_lower_to_flat<U>(
1111 &self,
1112 cx: &mut LowerContext<'_, U>,
1113 ty: InterfaceType,
1114 dst: &mut MaybeUninit<Self::Lower>,
1115 ) -> Result<()> {
1116 lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1117 cx,
1118 InterfaceType::U32,
1119 dst,
1120 )
1121 }
1122
1123 fn linear_lower_to_memory<U>(
1124 &self,
1125 cx: &mut LowerContext<'_, U>,
1126 ty: InterfaceType,
1127 offset: usize,
1128 ) -> Result<()> {
1129 lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1130 cx,
1131 InterfaceType::U32,
1132 offset,
1133 )
1134 }
1135}
1136
1137unsafe impl<T: Send + Sync> func::Lift for FutureReader<T> {
1139 fn linear_lift_from_flat(
1140 cx: &mut LiftContext<'_>,
1141 ty: InterfaceType,
1142 src: &Self::Lower,
1143 ) -> Result<Self> {
1144 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1145 Self::lift_from_index(cx, ty, index)
1146 }
1147
1148 fn linear_lift_from_memory(
1149 cx: &mut LiftContext<'_>,
1150 ty: InterfaceType,
1151 bytes: &[u8],
1152 ) -> Result<Self> {
1153 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1154 Self::lift_from_index(cx, ty, index)
1155 }
1156}
1157
1158pub struct GuardedFutureReader<T, A>
1164where
1165 A: AsAccessor,
1166{
1167 reader: Option<FutureReader<T>>,
1171 accessor: A,
1172}
1173
1174impl<T, A> GuardedFutureReader<T, A>
1175where
1176 A: AsAccessor,
1177{
1178 pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1180 Self {
1181 reader: Some(reader),
1182 accessor,
1183 }
1184 }
1185
1186 pub fn into_future(self) -> FutureReader<T> {
1189 self.into()
1190 }
1191}
1192
1193impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1194where
1195 A: AsAccessor,
1196{
1197 fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1198 guard.reader.take().unwrap()
1199 }
1200}
1201
1202impl<T, A> Drop for GuardedFutureReader<T, A>
1203where
1204 A: AsAccessor,
1205{
1206 fn drop(&mut self) {
1207 if let Some(reader) = &mut self.reader {
1208 reader.close_with(&self.accessor)
1209 }
1210 }
1211}
1212
1213pub struct StreamReader<T> {
1220 instance: Instance,
1221 id: TableId<TransmitHandle>,
1222 _phantom: PhantomData<T>,
1223}
1224
1225impl<T> StreamReader<T> {
1226 pub fn new<S: AsContextMut>(
1228 instance: Instance,
1229 store: S,
1230 producer: impl StreamProducer<S::Data, Item = T>,
1231 ) -> Self
1232 where
1233 T: func::Lower + func::Lift + Send + Sync + 'static,
1234 {
1235 Self::new_(
1236 instance.new_transmit(store, TransmitKind::Stream, producer),
1237 instance,
1238 )
1239 }
1240
1241 fn new_(id: TableId<TransmitHandle>, instance: Instance) -> Self {
1242 Self {
1243 instance,
1244 id,
1245 _phantom: PhantomData,
1246 }
1247 }
1248
1249 pub fn pipe<S: AsContextMut>(self, store: S, consumer: impl StreamConsumer<S::Data, Item = T>)
1251 where
1252 T: 'static,
1253 {
1254 self.instance
1255 .set_consumer(store, self.id, TransmitKind::Stream, consumer);
1256 }
1257
1258 pub fn into_val(self) -> Val {
1261 Val::Stream(StreamAny(self.id.rep()))
1262 }
1263
1264 pub fn from_val(
1266 mut store: impl AsContextMut<Data: Send>,
1267 instance: Instance,
1268 value: &Val,
1269 ) -> Result<Self> {
1270 let Val::Stream(StreamAny(rep)) = value else {
1271 bail!("expected `stream`; got `{}`", value.desc());
1272 };
1273 let store = store.as_context_mut();
1274 let id = TableId::<TransmitHandle>::new(*rep);
1275 instance.concurrent_state_mut(store.0).get_mut(id)?; Ok(Self::new_(id, instance))
1277 }
1278
1279 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1281 match ty {
1282 InterfaceType::Stream(src) => {
1283 let handle_table = cx
1284 .instance_mut()
1285 .table_for_transmit(TransmitIndex::Stream(src));
1286 let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1287 if is_done {
1288 bail!("cannot lift stream after being notified that the writable end dropped");
1289 }
1290 let id = TableId::<TransmitHandle>::new(rep);
1291 cx.instance_mut()
1292 .concurrent_state_mut()
1293 .get_mut(id)?
1294 .common
1295 .handle = None;
1296 Ok(Self::new_(id, cx.instance_handle()))
1297 }
1298 _ => func::bad_type_info(),
1299 }
1300 }
1301
1302 pub fn close(&mut self, mut store: impl AsContextMut) {
1310 let id = mem::replace(&mut self.id, TableId::new(u32::MAX));
1312 self.instance
1313 .host_drop_reader(store.as_context_mut().0, id, TransmitKind::Stream)
1314 .unwrap()
1315 }
1316
1317 pub fn close_with(&mut self, accessor: impl AsAccessor) {
1319 accessor.as_accessor().with(|access| self.close(access))
1320 }
1321
1322 pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1328 where
1329 A: AsAccessor,
1330 {
1331 GuardedStreamReader::new(accessor, self)
1332 }
1333}
1334
1335impl<T> fmt::Debug for StreamReader<T> {
1336 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1337 f.debug_struct("StreamReader")
1338 .field("id", &self.id)
1339 .field("instance", &self.instance)
1340 .finish()
1341 }
1342}
1343
1344pub(crate) fn lower_stream_to_index<U>(
1346 rep: u32,
1347 cx: &mut LowerContext<'_, U>,
1348 ty: InterfaceType,
1349) -> Result<u32> {
1350 match ty {
1351 InterfaceType::Stream(dst) => {
1352 let concurrent_state = cx.instance_mut().concurrent_state_mut();
1353 let id = TableId::<TransmitHandle>::new(rep);
1354 let state = concurrent_state.get_mut(id)?.state;
1355 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1356
1357 let handle = cx
1358 .instance_mut()
1359 .table_for_transmit(TransmitIndex::Stream(dst))
1360 .stream_insert_read(dst, rep)?;
1361
1362 cx.instance_mut()
1363 .concurrent_state_mut()
1364 .get_mut(id)?
1365 .common
1366 .handle = Some(handle);
1367
1368 Ok(handle)
1369 }
1370 _ => func::bad_type_info(),
1371 }
1372}
1373
1374unsafe impl<T: Send + Sync> func::ComponentType for StreamReader<T> {
1377 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1378
1379 type Lower = <u32 as func::ComponentType>::Lower;
1380
1381 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1382 match ty {
1383 InterfaceType::Stream(_) => Ok(()),
1384 other => bail!("expected `stream`, found `{}`", func::desc(other)),
1385 }
1386 }
1387}
1388
1389unsafe impl<T: Send + Sync> func::Lower for StreamReader<T> {
1391 fn linear_lower_to_flat<U>(
1392 &self,
1393 cx: &mut LowerContext<'_, U>,
1394 ty: InterfaceType,
1395 dst: &mut MaybeUninit<Self::Lower>,
1396 ) -> Result<()> {
1397 lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1398 cx,
1399 InterfaceType::U32,
1400 dst,
1401 )
1402 }
1403
1404 fn linear_lower_to_memory<U>(
1405 &self,
1406 cx: &mut LowerContext<'_, U>,
1407 ty: InterfaceType,
1408 offset: usize,
1409 ) -> Result<()> {
1410 lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1411 cx,
1412 InterfaceType::U32,
1413 offset,
1414 )
1415 }
1416}
1417
1418unsafe impl<T: Send + Sync> func::Lift for StreamReader<T> {
1420 fn linear_lift_from_flat(
1421 cx: &mut LiftContext<'_>,
1422 ty: InterfaceType,
1423 src: &Self::Lower,
1424 ) -> Result<Self> {
1425 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1426 Self::lift_from_index(cx, ty, index)
1427 }
1428
1429 fn linear_lift_from_memory(
1430 cx: &mut LiftContext<'_>,
1431 ty: InterfaceType,
1432 bytes: &[u8],
1433 ) -> Result<Self> {
1434 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1435 Self::lift_from_index(cx, ty, index)
1436 }
1437}
1438
1439pub struct GuardedStreamReader<T, A>
1445where
1446 A: AsAccessor,
1447{
1448 reader: Option<StreamReader<T>>,
1452 accessor: A,
1453}
1454
1455impl<T, A> GuardedStreamReader<T, A>
1456where
1457 A: AsAccessor,
1458{
1459 pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1462 Self {
1463 reader: Some(reader),
1464 accessor,
1465 }
1466 }
1467
1468 pub fn into_stream(self) -> StreamReader<T> {
1471 self.into()
1472 }
1473}
1474
1475impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1476where
1477 A: AsAccessor,
1478{
1479 fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1480 guard.reader.take().unwrap()
1481 }
1482}
1483
1484impl<T, A> Drop for GuardedStreamReader<T, A>
1485where
1486 A: AsAccessor,
1487{
1488 fn drop(&mut self) {
1489 if let Some(reader) = &mut self.reader {
1490 reader.close_with(&self.accessor)
1491 }
1492 }
1493}
1494
1495pub struct ErrorContext {
1497 rep: u32,
1498}
1499
1500impl ErrorContext {
1501 pub(crate) fn new(rep: u32) -> Self {
1502 Self { rep }
1503 }
1504
1505 pub fn into_val(self) -> Val {
1507 Val::ErrorContext(ErrorContextAny(self.rep))
1508 }
1509
1510 pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1512 let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1513 bail!("expected `error-context`; got `{}`", value.desc());
1514 };
1515 Ok(Self::new(*rep))
1516 }
1517
1518 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1519 match ty {
1520 InterfaceType::ErrorContext(src) => {
1521 let rep = cx
1522 .instance_mut()
1523 .table_for_error_context(src)
1524 .error_context_rep(index)?;
1525
1526 Ok(Self { rep })
1527 }
1528 _ => func::bad_type_info(),
1529 }
1530 }
1531}
1532
1533pub(crate) fn lower_error_context_to_index<U>(
1534 rep: u32,
1535 cx: &mut LowerContext<'_, U>,
1536 ty: InterfaceType,
1537) -> Result<u32> {
1538 match ty {
1539 InterfaceType::ErrorContext(dst) => {
1540 let tbl = cx.instance_mut().table_for_error_context(dst);
1541 tbl.error_context_insert(rep)
1542 }
1543 _ => func::bad_type_info(),
1544 }
1545}
1546unsafe impl func::ComponentType for ErrorContext {
1549 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1550
1551 type Lower = <u32 as func::ComponentType>::Lower;
1552
1553 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1554 match ty {
1555 InterfaceType::ErrorContext(_) => Ok(()),
1556 other => bail!("expected `error`, found `{}`", func::desc(other)),
1557 }
1558 }
1559}
1560
1561unsafe impl func::Lower for ErrorContext {
1563 fn linear_lower_to_flat<T>(
1564 &self,
1565 cx: &mut LowerContext<'_, T>,
1566 ty: InterfaceType,
1567 dst: &mut MaybeUninit<Self::Lower>,
1568 ) -> Result<()> {
1569 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1570 cx,
1571 InterfaceType::U32,
1572 dst,
1573 )
1574 }
1575
1576 fn linear_lower_to_memory<T>(
1577 &self,
1578 cx: &mut LowerContext<'_, T>,
1579 ty: InterfaceType,
1580 offset: usize,
1581 ) -> Result<()> {
1582 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1583 cx,
1584 InterfaceType::U32,
1585 offset,
1586 )
1587 }
1588}
1589
1590unsafe impl func::Lift for ErrorContext {
1592 fn linear_lift_from_flat(
1593 cx: &mut LiftContext<'_>,
1594 ty: InterfaceType,
1595 src: &Self::Lower,
1596 ) -> Result<Self> {
1597 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1598 Self::lift_from_index(cx, ty, index)
1599 }
1600
1601 fn linear_lift_from_memory(
1602 cx: &mut LiftContext<'_>,
1603 ty: InterfaceType,
1604 bytes: &[u8],
1605 ) -> Result<Self> {
1606 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1607 Self::lift_from_index(cx, ty, index)
1608 }
1609}
1610
1611pub(super) struct TransmitHandle {
1613 pub(super) common: WaitableCommon,
1614 state: TableId<TransmitState>,
1616}
1617
1618impl TransmitHandle {
1619 fn new(state: TableId<TransmitState>) -> Self {
1620 Self {
1621 common: WaitableCommon::default(),
1622 state,
1623 }
1624 }
1625}
1626
1627impl TableDebug for TransmitHandle {
1628 fn type_name() -> &'static str {
1629 "TransmitHandle"
1630 }
1631}
1632
1633struct TransmitState {
1635 write_handle: TableId<TransmitHandle>,
1637 read_handle: TableId<TransmitHandle>,
1639 write: WriteState,
1641 read: ReadState,
1643 done: bool,
1645}
1646
1647impl Default for TransmitState {
1648 fn default() -> Self {
1649 Self {
1650 write_handle: TableId::new(u32::MAX),
1651 read_handle: TableId::new(u32::MAX),
1652 read: ReadState::Open,
1653 write: WriteState::Open,
1654 done: false,
1655 }
1656 }
1657}
1658
1659impl TableDebug for TransmitState {
1660 fn type_name() -> &'static str {
1661 "TransmitState"
1662 }
1663}
1664
1665type PollStream = Box<
1666 dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
1667>;
1668
1669enum WriteState {
1671 Open,
1673 GuestReady {
1675 ty: TransmitIndex,
1676 flat_abi: Option<FlatAbi>,
1677 options: Options,
1678 address: usize,
1679 count: usize,
1680 handle: u32,
1681 },
1682 HostReady {
1684 produce: PollStream,
1685 guest_offset: usize,
1686 cancel: bool,
1687 cancel_waker: Option<Waker>,
1688 },
1689 Dropped,
1691}
1692
1693impl fmt::Debug for WriteState {
1694 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1695 match self {
1696 Self::Open => f.debug_tuple("Open").finish(),
1697 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1698 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1699 Self::Dropped => f.debug_tuple("Dropped").finish(),
1700 }
1701 }
1702}
1703
1704enum ReadState {
1706 Open,
1708 GuestReady {
1710 ty: TransmitIndex,
1711 flat_abi: Option<FlatAbi>,
1712 options: Options,
1713 address: usize,
1714 count: usize,
1715 handle: u32,
1716 },
1717 HostReady {
1719 consume: PollStream,
1720 guest_offset: usize,
1721 cancel: bool,
1722 cancel_waker: Option<Waker>,
1723 },
1724 HostToHost {
1726 accept: Box<
1727 dyn for<'a> Fn(
1728 &'a mut UntypedWriteBuffer<'a>,
1729 )
1730 -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
1731 + Send
1732 + Sync,
1733 >,
1734 buffer: Vec<u8>,
1735 limit: usize,
1736 },
1737 Dropped,
1739}
1740
1741impl fmt::Debug for ReadState {
1742 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1743 match self {
1744 Self::Open => f.debug_tuple("Open").finish(),
1745 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1746 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1747 Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
1748 Self::Dropped => f.debug_tuple("Dropped").finish(),
1749 }
1750 }
1751}
1752
1753fn return_code(kind: TransmitKind, state: StreamResult, guest_offset: usize) -> ReturnCode {
1754 let count = guest_offset.try_into().unwrap();
1755 match state {
1756 StreamResult::Dropped => ReturnCode::Dropped(count),
1757 StreamResult::Completed => ReturnCode::completed(kind, count),
1758 StreamResult::Cancelled => ReturnCode::Cancelled(count),
1759 }
1760}
1761
1762impl Instance {
1763 fn new_transmit<S: AsContextMut, P: StreamProducer<S::Data>>(
1764 self,
1765 mut store: S,
1766 kind: TransmitKind,
1767 producer: P,
1768 ) -> TableId<TransmitHandle>
1769 where
1770 P::Item: func::Lower,
1771 {
1772 let mut store = store.as_context_mut();
1773 let token = StoreToken::new(store.as_context_mut());
1774 let state = self.concurrent_state_mut(store.0);
1775 let (_, read) = state.new_transmit().unwrap();
1776 let producer = Arc::new(Mutex::new(Some((Box::pin(producer), P::Buffer::default()))));
1777 let id = state.get_mut(read).unwrap().state;
1778 let produce = Box::new(move || {
1779 let producer = producer.clone();
1780 async move {
1781 let (mut mine, mut buffer) = producer.lock().unwrap().take().unwrap();
1782
1783 let (result, cancelled) = if buffer.remaining().is_empty() {
1784 future::poll_fn(|cx| {
1785 tls::get(|store| {
1786 let transmit = self.concurrent_state_mut(store).get_mut(id).unwrap();
1787
1788 let &WriteState::HostReady { cancel, .. } = &transmit.write else {
1789 unreachable!();
1790 };
1791
1792 let mut host_buffer =
1793 if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
1794 Some(Cursor::new(mem::take(buffer)))
1795 } else {
1796 None
1797 };
1798
1799 let poll = mine.as_mut().poll_produce(
1800 cx,
1801 token.as_context_mut(store),
1802 Destination {
1803 instance: self,
1804 id,
1805 buffer: &mut buffer,
1806 host_buffer: host_buffer.as_mut(),
1807 _phantom: PhantomData,
1808 },
1809 cancel,
1810 );
1811
1812 let transmit = self.concurrent_state_mut(store).get_mut(id).unwrap();
1813
1814 let host_offset = if let (
1815 Some(host_buffer),
1816 ReadState::HostToHost { buffer, limit, .. },
1817 ) = (host_buffer, &mut transmit.read)
1818 {
1819 *limit = usize::try_from(host_buffer.position()).unwrap();
1820 *buffer = host_buffer.into_inner();
1821 *limit
1822 } else {
1823 0
1824 };
1825
1826 {
1827 let WriteState::HostReady {
1828 guest_offset,
1829 cancel,
1830 cancel_waker,
1831 ..
1832 } = &mut transmit.write
1833 else {
1834 unreachable!();
1835 };
1836
1837 if poll.is_pending() {
1838 if !buffer.remaining().is_empty()
1839 || *guest_offset > 0
1840 || host_offset > 0
1841 {
1842 return Poll::Ready(Err(anyhow!(
1843 "StreamProducer::poll_produce returned Poll::Pending \
1844 after producing at least one item"
1845 )));
1846 }
1847 *cancel_waker = Some(cx.waker().clone());
1848 } else {
1849 *cancel_waker = None;
1850 *cancel = false;
1851 }
1852 }
1853
1854 poll.map(|v| v.map(|result| (result, cancel)))
1855 })
1856 })
1857 .await?
1858 } else {
1859 (StreamResult::Completed, false)
1860 };
1861
1862 let (guest_offset, host_offset, count) = tls::get(|store| {
1863 let transmit = self.concurrent_state_mut(store).get_mut(id).unwrap();
1864 let (count, host_offset) = match &transmit.read {
1865 &ReadState::GuestReady { count, .. } => (count, 0),
1866 &ReadState::HostToHost { limit, .. } => (1, limit),
1867 _ => unreachable!(),
1868 };
1869 let guest_offset = match &transmit.write {
1870 &WriteState::HostReady { guest_offset, .. } => guest_offset,
1871 _ => unreachable!(),
1872 };
1873 (guest_offset, host_offset, count)
1874 });
1875
1876 match result {
1877 StreamResult::Completed => {
1878 if count > 1
1879 && buffer.remaining().is_empty()
1880 && guest_offset == 0
1881 && host_offset == 0
1882 {
1883 bail!(
1884 "StreamProducer::poll_produce returned StreamResult::Completed \
1885 without producing any items"
1886 );
1887 }
1888 }
1889 StreamResult::Cancelled => {
1890 if !cancelled {
1891 bail!(
1892 "StreamProducer::poll_produce returned StreamResult::Cancelled \
1893 without being given a `finish` parameter value of true"
1894 );
1895 }
1896 }
1897 StreamResult::Dropped => {}
1898 }
1899
1900 let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;
1901
1902 *producer.lock().unwrap() = Some((mine, buffer));
1903
1904 if write_buffer {
1905 self.write(token, id, producer, kind).await?;
1906 }
1907
1908 Ok(result)
1909 }
1910 .boxed()
1911 });
1912 state.get_mut(id).unwrap().write = WriteState::HostReady {
1913 produce,
1914 guest_offset: 0,
1915 cancel: false,
1916 cancel_waker: None,
1917 };
1918 read
1919 }
1920
1921 fn set_consumer<S: AsContextMut, C: StreamConsumer<S::Data>>(
1922 self,
1923 mut store: S,
1924 id: TableId<TransmitHandle>,
1925 kind: TransmitKind,
1926 consumer: C,
1927 ) {
1928 let mut store = store.as_context_mut();
1929 let token = StoreToken::new(store.as_context_mut());
1930 let state = self.concurrent_state_mut(store.0);
1931 let id = state.get_mut(id).unwrap().state;
1932 let transmit = state.get_mut(id).unwrap();
1933 let consumer = Arc::new(Mutex::new(Some(Box::pin(consumer))));
1934 let consume_with_buffer = {
1935 let consumer = consumer.clone();
1936 async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
1937 let mut mine = consumer.lock().unwrap().take().unwrap();
1938
1939 let host_buffer_remaining_before =
1940 host_buffer.as_deref_mut().map(|v| v.remaining().len());
1941
1942 let (result, cancelled) = future::poll_fn(|cx| {
1943 tls::get(|store| {
1944 let cancel =
1945 match &self.concurrent_state_mut(store).get_mut(id).unwrap().read {
1946 &ReadState::HostReady { cancel, .. } => cancel,
1947 ReadState::Open => false,
1948 _ => unreachable!(),
1949 };
1950
1951 let poll = mine.as_mut().poll_consume(
1952 cx,
1953 token.as_context_mut(store),
1954 Source {
1955 instance: self,
1956 id,
1957 host_buffer: host_buffer.as_deref_mut(),
1958 },
1959 cancel,
1960 );
1961
1962 if let ReadState::HostReady {
1963 cancel_waker,
1964 cancel,
1965 ..
1966 } = &mut self.concurrent_state_mut(store).get_mut(id).unwrap().read
1967 {
1968 if poll.is_pending() {
1969 *cancel_waker = Some(cx.waker().clone());
1970 } else {
1971 *cancel_waker = None;
1972 *cancel = false;
1973 }
1974 }
1975
1976 poll.map(|v| v.map(|result| (result, cancel)))
1977 })
1978 })
1979 .await?;
1980
1981 let (guest_offset, count) = tls::get(|store| {
1982 let transmit = self.concurrent_state_mut(store).get_mut(id).unwrap();
1983 (
1984 match &transmit.read {
1985 &ReadState::HostReady { guest_offset, .. } => guest_offset,
1986 ReadState::Open => 0,
1987 _ => unreachable!(),
1988 },
1989 match &transmit.write {
1990 &WriteState::GuestReady { count, .. } => count,
1991 WriteState::HostReady { .. } => host_buffer_remaining_before.unwrap(),
1992 _ => unreachable!(),
1993 },
1994 )
1995 });
1996
1997 match result {
1998 StreamResult::Completed => {
1999 if count > 0
2000 && guest_offset == 0
2001 && host_buffer_remaining_before
2002 .zip(host_buffer.map(|v| v.remaining().len()))
2003 .map(|(before, after)| before == after)
2004 .unwrap_or(false)
2005 {
2006 bail!(
2007 "StreamConsumer::poll_consume returned StreamResult::Completed \
2008 without consuming any items"
2009 );
2010 }
2011
2012 if let TransmitKind::Future = kind {
2013 tls::get(|store| {
2014 self.concurrent_state_mut(store).get_mut(id).unwrap().done = true;
2015 });
2016 }
2017 }
2018 StreamResult::Cancelled => {
2019 if !cancelled {
2020 bail!(
2021 "StreamConsumer::poll_consume returned StreamResult::Cancelled \
2022 without being given a `finish` parameter value of true"
2023 );
2024 }
2025 }
2026 StreamResult::Dropped => {}
2027 }
2028
2029 *consumer.lock().unwrap() = Some(mine);
2030
2031 Ok(result)
2032 }
2033 };
2034 let consume = {
2035 let consume = consume_with_buffer.clone();
2036 Box::new(move || {
2037 let consume = consume.clone();
2038 async move { consume(None).await }.boxed()
2039 })
2040 };
2041
2042 match &transmit.write {
2043 WriteState::Open => {
2044 transmit.read = ReadState::HostReady {
2045 consume,
2046 guest_offset: 0,
2047 cancel: false,
2048 cancel_waker: None,
2049 };
2050 }
2051 WriteState::GuestReady { .. } => {
2052 let future = consume();
2053 transmit.read = ReadState::HostReady {
2054 consume,
2055 guest_offset: 0,
2056 cancel: false,
2057 cancel_waker: None,
2058 };
2059 self.pipe_from_guest(store.0, kind, id, future);
2060 }
2061 WriteState::HostReady { .. } => {
2062 let WriteState::HostReady { produce, .. } = mem::replace(
2063 &mut transmit.write,
2064 WriteState::HostReady {
2065 produce: Box::new(|| unreachable!()),
2066 guest_offset: 0,
2067 cancel: false,
2068 cancel_waker: None,
2069 },
2070 ) else {
2071 unreachable!();
2072 };
2073
2074 transmit.read = ReadState::HostToHost {
2075 accept: Box::new(move |input| {
2076 let consume = consume_with_buffer.clone();
2077 async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
2078 }),
2079 buffer: Vec::new(),
2080 limit: 0,
2081 };
2082
2083 let future = async move {
2084 loop {
2085 if tls::get(|store| {
2086 anyhow::Ok(matches!(
2087 self.concurrent_state_mut(store).get_mut(id)?.read,
2088 ReadState::Dropped
2089 ))
2090 })? {
2091 break Ok(());
2092 }
2093
2094 match produce().await? {
2095 StreamResult::Completed | StreamResult::Cancelled => {}
2096 StreamResult::Dropped => break Ok(()),
2097 }
2098
2099 if let TransmitKind::Future = kind {
2100 break Ok(());
2101 }
2102 }
2103 }
2104 .map(move |result| {
2105 tls::get(|store| self.concurrent_state_mut(store).delete_transmit(id))?;
2106 result
2107 });
2108
2109 state.push_future(Box::pin(future));
2110 }
2111 WriteState::Dropped => {
2112 let reader = transmit.read_handle;
2113 self.host_drop_reader(store.0, reader, kind).unwrap();
2114 }
2115 }
2116 }
2117
2118 async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
2119 self,
2120 token: StoreToken<D>,
2121 id: TableId<TransmitState>,
2122 pair: Arc<Mutex<Option<(P, B)>>>,
2123 kind: TransmitKind,
2124 ) -> Result<()> {
2125 let (read, guest_offset) = tls::get(|store| {
2126 let transmit = self.concurrent_state_mut(store).get_mut(id)?;
2127
2128 let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write
2129 {
2130 Some(guest_offset)
2131 } else {
2132 None
2133 };
2134
2135 anyhow::Ok((
2136 mem::replace(&mut transmit.read, ReadState::Open),
2137 guest_offset,
2138 ))
2139 })?;
2140
2141 match read {
2142 ReadState::GuestReady {
2143 ty,
2144 flat_abi,
2145 options,
2146 address,
2147 count,
2148 handle,
2149 } => {
2150 let guest_offset = guest_offset.unwrap();
2151
2152 if let TransmitKind::Future = kind {
2153 tls::get(|store| {
2154 self.concurrent_state_mut(store).get_mut(id)?.done = true;
2155 anyhow::Ok(())
2156 })?;
2157 }
2158
2159 let old_remaining = pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2160 let accept = {
2161 let pair = pair.clone();
2162 move |mut store: StoreContextMut<D>| {
2163 lower::<T, B, D>(
2164 store.as_context_mut(),
2165 self,
2166 &options,
2167 ty,
2168 address + (T::SIZE32 * guest_offset),
2169 count - guest_offset,
2170 &mut pair.lock().unwrap().as_mut().unwrap().1,
2171 )?;
2172 anyhow::Ok(())
2173 }
2174 };
2175
2176 if guest_offset < count {
2177 if T::MAY_REQUIRE_REALLOC {
2178 let (tx, rx) = oneshot::channel();
2183 tls::get(move |store| {
2184 self.concurrent_state_mut(store).push_high_priority(
2185 WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2186 move |store, _| {
2187 _ = tx.send(accept(token.as_context_mut(store))?);
2188 Ok(())
2189 },
2190 ))),
2191 )
2192 });
2193 rx.await?
2194 } else {
2195 tls::get(|store| accept(token.as_context_mut(store)))?
2200 };
2201 }
2202
2203 tls::get(|store| {
2204 let count =
2205 old_remaining - pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2206
2207 let transmit = self.concurrent_state_mut(store).get_mut(id)?;
2208
2209 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2210 unreachable!();
2211 };
2212
2213 *guest_offset += count;
2214
2215 transmit.read = ReadState::GuestReady {
2216 ty,
2217 flat_abi,
2218 options,
2219 address,
2220 count,
2221 handle,
2222 };
2223
2224 anyhow::Ok(())
2225 })?;
2226
2227 Ok(())
2228 }
2229
2230 ReadState::HostToHost {
2231 accept,
2232 mut buffer,
2233 limit,
2234 } => {
2235 let mut state = StreamResult::Completed;
2236 let mut position = 0;
2237
2238 while !matches!(state, StreamResult::Dropped) && position < limit {
2239 let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
2240 state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
2241 (buffer, position, _) = slice_buffer.into_parts();
2242 }
2243
2244 {
2245 let (mine, mut buffer) = pair.lock().unwrap().take().unwrap();
2246
2247 while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty())
2248 {
2249 state = accept(&mut UntypedWriteBuffer::new(&mut buffer)).await?;
2250 }
2251
2252 *pair.lock().unwrap() = Some((mine, buffer));
2253 }
2254
2255 tls::get(|store| {
2256 self.concurrent_state_mut(store).get_mut(id)?.read = match state {
2257 StreamResult::Dropped => ReadState::Dropped,
2258 StreamResult::Completed | StreamResult::Cancelled => {
2259 ReadState::HostToHost {
2260 accept,
2261 buffer,
2262 limit: 0,
2263 }
2264 }
2265 };
2266
2267 anyhow::Ok(())
2268 })?;
2269 Ok(())
2270 }
2271
2272 _ => unreachable!(),
2273 }
2274 }
2275
2276 fn pipe_from_guest(
2277 self,
2278 store: &mut dyn VMStore,
2279 kind: TransmitKind,
2280 id: TableId<TransmitState>,
2281 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2282 ) {
2283 let future = async move {
2284 let stream_state = future.await?;
2285 tls::get(|store| {
2286 let state = self.concurrent_state_mut(store);
2287 let transmit = state.get_mut(id)?;
2288 let ReadState::HostReady {
2289 consume,
2290 guest_offset,
2291 ..
2292 } = mem::replace(&mut transmit.read, ReadState::Open)
2293 else {
2294 unreachable!();
2295 };
2296 let code = return_code(kind, stream_state, guest_offset);
2297 transmit.read = match stream_state {
2298 StreamResult::Dropped => ReadState::Dropped,
2299 StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2300 consume,
2301 guest_offset: 0,
2302 cancel: false,
2303 cancel_waker: None,
2304 },
2305 };
2306 let WriteState::GuestReady { ty, handle, .. } =
2307 mem::replace(&mut transmit.write, WriteState::Open)
2308 else {
2309 unreachable!();
2310 };
2311 state.send_write_result(ty, id, handle, code)?;
2312 Ok(())
2313 })
2314 };
2315
2316 self.concurrent_state_mut(store).push_future(future.boxed());
2317 }
2318
2319 fn pipe_to_guest(
2320 self,
2321 store: &mut dyn VMStore,
2322 kind: TransmitKind,
2323 id: TableId<TransmitState>,
2324 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2325 ) {
2326 let future = async move {
2327 let stream_state = future.await?;
2328 tls::get(|store| {
2329 let state = self.concurrent_state_mut(store);
2330 let transmit = state.get_mut(id)?;
2331 let WriteState::HostReady {
2332 produce,
2333 guest_offset,
2334 ..
2335 } = mem::replace(&mut transmit.write, WriteState::Open)
2336 else {
2337 unreachable!();
2338 };
2339 let code = return_code(kind, stream_state, guest_offset);
2340 transmit.write = match stream_state {
2341 StreamResult::Dropped => WriteState::Dropped,
2342 StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2343 produce,
2344 guest_offset: 0,
2345 cancel: false,
2346 cancel_waker: None,
2347 },
2348 };
2349 let ReadState::GuestReady { ty, handle, .. } =
2350 mem::replace(&mut transmit.read, ReadState::Open)
2351 else {
2352 unreachable!();
2353 };
2354 state.send_read_result(ty, id, handle, code)?;
2355 Ok(())
2356 })
2357 };
2358
2359 self.concurrent_state_mut(store).push_future(future.boxed());
2360 }
2361
2362 fn host_drop_reader(
2364 self,
2365 store: &mut dyn VMStore,
2366 id: TableId<TransmitHandle>,
2367 kind: TransmitKind,
2368 ) -> Result<()> {
2369 let transmit_id = self.concurrent_state_mut(store).get_mut(id)?.state;
2370 let state = self.concurrent_state_mut(store);
2371 let transmit = state
2372 .get_mut(transmit_id)
2373 .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2374 log::trace!(
2375 "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2376 transmit.read,
2377 transmit.write
2378 );
2379
2380 transmit.read = ReadState::Dropped;
2381
2382 let new_state = if let WriteState::Dropped = &transmit.write {
2385 WriteState::Dropped
2386 } else {
2387 WriteState::Open
2388 };
2389
2390 let write_handle = transmit.write_handle;
2391
2392 match mem::replace(&mut transmit.write, new_state) {
2393 WriteState::GuestReady { ty, handle, .. } => {
2396 state.update_event(
2397 write_handle.rep(),
2398 match ty {
2399 TransmitIndex::Future(ty) => Event::FutureWrite {
2400 code: ReturnCode::Dropped(0),
2401 pending: Some((ty, handle)),
2402 },
2403 TransmitIndex::Stream(ty) => Event::StreamWrite {
2404 code: ReturnCode::Dropped(0),
2405 pending: Some((ty, handle)),
2406 },
2407 },
2408 )?;
2409 }
2410
2411 WriteState::HostReady { .. } => {}
2412
2413 WriteState::Open => {
2414 state.update_event(
2415 write_handle.rep(),
2416 match kind {
2417 TransmitKind::Future => Event::FutureWrite {
2418 code: ReturnCode::Dropped(0),
2419 pending: None,
2420 },
2421 TransmitKind::Stream => Event::StreamWrite {
2422 code: ReturnCode::Dropped(0),
2423 pending: None,
2424 },
2425 },
2426 )?;
2427 }
2428
2429 WriteState::Dropped => {
2430 log::trace!("host_drop_reader delete {transmit_id:?}");
2431 state.delete_transmit(transmit_id)?;
2432 }
2433 }
2434 Ok(())
2435 }
2436
2437 fn host_drop_writer<U>(
2439 self,
2440 store: StoreContextMut<U>,
2441 id: TableId<TransmitHandle>,
2442 on_drop_open: Option<fn() -> Result<()>>,
2443 ) -> Result<()> {
2444 let transmit_id = self.concurrent_state_mut(store.0).get_mut(id)?.state;
2445 let transmit = self
2446 .concurrent_state_mut(store.0)
2447 .get_mut(transmit_id)
2448 .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2449 log::trace!(
2450 "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2451 transmit.read,
2452 transmit.write
2453 );
2454
2455 match &mut transmit.write {
2457 WriteState::GuestReady { .. } => {
2458 unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2459 }
2460 WriteState::HostReady { .. } => {}
2461 v @ WriteState::Open => {
2462 if let (Some(on_drop_open), false) = (
2463 on_drop_open,
2464 transmit.done || matches!(transmit.read, ReadState::Dropped),
2465 ) {
2466 on_drop_open()?;
2467 } else {
2468 *v = WriteState::Dropped;
2469 }
2470 }
2471 WriteState::Dropped => unreachable!("write state is already dropped"),
2472 }
2473
2474 let transmit = self.concurrent_state_mut(store.0).get_mut(transmit_id)?;
2475
2476 let new_state = if let ReadState::Dropped = &transmit.read {
2482 ReadState::Dropped
2483 } else {
2484 ReadState::Open
2485 };
2486
2487 let read_handle = transmit.read_handle;
2488
2489 match mem::replace(&mut transmit.read, new_state) {
2491 ReadState::GuestReady { ty, handle, .. } => {
2495 self.concurrent_state_mut(store.0).update_event(
2497 read_handle.rep(),
2498 match ty {
2499 TransmitIndex::Future(ty) => Event::FutureRead {
2500 code: ReturnCode::Dropped(0),
2501 pending: Some((ty, handle)),
2502 },
2503 TransmitIndex::Stream(ty) => Event::StreamRead {
2504 code: ReturnCode::Dropped(0),
2505 pending: Some((ty, handle)),
2506 },
2507 },
2508 )?;
2509 }
2510
2511 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2512
2513 ReadState::Open => {
2515 self.concurrent_state_mut(store.0).update_event(
2516 read_handle.rep(),
2517 match on_drop_open {
2518 Some(_) => Event::FutureRead {
2519 code: ReturnCode::Dropped(0),
2520 pending: None,
2521 },
2522 None => Event::StreamRead {
2523 code: ReturnCode::Dropped(0),
2524 pending: None,
2525 },
2526 },
2527 )?;
2528 }
2529
2530 ReadState::Dropped => {
2533 log::trace!("host_drop_writer delete {transmit_id:?}");
2534 self.concurrent_state_mut(store.0)
2535 .delete_transmit(transmit_id)?;
2536 }
2537 }
2538 Ok(())
2539 }
2540
2541 pub(super) fn guest_drop_writable<T>(
2543 self,
2544 store: StoreContextMut<T>,
2545 ty: TransmitIndex,
2546 writer: u32,
2547 ) -> Result<()> {
2548 let table = self.id().get_mut(store.0).table_for_transmit(ty);
2549 let transmit_rep = match ty {
2550 TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
2551 TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
2552 };
2553
2554 let id = TableId::<TransmitHandle>::new(transmit_rep);
2555 log::trace!("guest_drop_writable: drop writer {id:?}");
2556 match ty {
2557 TransmitIndex::Stream(_) => self.host_drop_writer(store, id, None),
2558 TransmitIndex::Future(_) => self.host_drop_writer(
2559 store,
2560 id,
2561 Some(|| {
2562 Err(anyhow!(
2563 "cannot drop future write end without first writing a value"
2564 ))
2565 }),
2566 ),
2567 }
2568 }
2569
2570 fn copy<T: 'static>(
2573 self,
2574 mut store: StoreContextMut<T>,
2575 flat_abi: Option<FlatAbi>,
2576 write_ty: TransmitIndex,
2577 write_options: &Options,
2578 write_address: usize,
2579 read_ty: TransmitIndex,
2580 read_options: &Options,
2581 read_address: usize,
2582 count: usize,
2583 rep: u32,
2584 ) -> Result<()> {
2585 let types = self.id().get(store.0).component().types().clone();
2586 match (write_ty, read_ty) {
2587 (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
2588 assert_eq!(count, 1);
2589
2590 let val = types[types[write_ty].ty]
2591 .payload
2592 .map(|ty| {
2593 let abi = types.canonical_abi(&ty);
2594 if write_address % usize::try_from(abi.align32)? != 0 {
2596 bail!("write pointer not aligned");
2597 }
2598
2599 let lift =
2600 &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);
2601 let bytes = lift
2602 .memory()
2603 .get(write_address..)
2604 .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
2605 .ok_or_else(|| {
2606 anyhow::anyhow!("write pointer out of bounds of memory")
2607 })?;
2608
2609 Val::load(lift, ty, bytes)
2610 })
2611 .transpose()?;
2612
2613 if let Some(val) = val {
2614 let lower =
2615 &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
2616 let ty = types[types[read_ty].ty].payload.unwrap();
2617 let ptr = func::validate_inbounds_dynamic(
2618 types.canonical_abi(&ty),
2619 lower.as_slice_mut(),
2620 &ValRaw::u32(read_address.try_into().unwrap()),
2621 )?;
2622 val.store(lower, ty, ptr)?;
2623 }
2624 }
2625 (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
2626 if let Some(flat_abi) = flat_abi {
2627 let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
2629 if length_in_bytes > 0 {
2630 if write_address % usize::try_from(flat_abi.align)? != 0 {
2631 bail!("write pointer not aligned");
2632 }
2633 if read_address % usize::try_from(flat_abi.align)? != 0 {
2634 bail!("read pointer not aligned");
2635 }
2636
2637 let store_opaque = store.0.store_opaque_mut();
2638
2639 {
2640 let src = write_options
2641 .memory(store_opaque)
2642 .get(write_address..)
2643 .and_then(|b| b.get(..length_in_bytes))
2644 .ok_or_else(|| {
2645 anyhow::anyhow!("write pointer out of bounds of memory")
2646 })?
2647 .as_ptr();
2648 let dst = read_options
2649 .memory_mut(store_opaque)
2650 .get_mut(read_address..)
2651 .and_then(|b| b.get_mut(..length_in_bytes))
2652 .ok_or_else(|| {
2653 anyhow::anyhow!("read pointer out of bounds of memory")
2654 })?
2655 .as_mut_ptr();
2656 unsafe { src.copy_to(dst, length_in_bytes) };
2659 }
2660 }
2661 } else {
2662 let store_opaque = store.0.store_opaque_mut();
2663 let lift = &mut LiftContext::new(store_opaque, write_options, self);
2664 let ty = types[types[write_ty].ty].payload.unwrap();
2665 let abi = lift.types.canonical_abi(&ty);
2666 let size = usize::try_from(abi.size32).unwrap();
2667 if write_address % usize::try_from(abi.align32)? != 0 {
2668 bail!("write pointer not aligned");
2669 }
2670 let bytes = lift
2671 .memory()
2672 .get(write_address..)
2673 .and_then(|b| b.get(..size * count))
2674 .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
2675
2676 let values = (0..count)
2677 .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
2678 .collect::<Result<Vec<_>>>()?;
2679
2680 let id = TableId::<TransmitHandle>::new(rep);
2681 log::trace!("copy values {values:?} for {id:?}");
2682
2683 let lower =
2684 &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
2685 let ty = types[types[read_ty].ty].payload.unwrap();
2686 let abi = lower.types.canonical_abi(&ty);
2687 if read_address % usize::try_from(abi.align32)? != 0 {
2688 bail!("read pointer not aligned");
2689 }
2690 let size = usize::try_from(abi.size32).unwrap();
2691 lower
2692 .as_slice_mut()
2693 .get_mut(read_address..)
2694 .and_then(|b| b.get_mut(..size * count))
2695 .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
2696 let mut ptr = read_address;
2697 for value in values {
2698 value.store(lower, ty, ptr)?;
2699 ptr += size
2700 }
2701 }
2702 }
2703 _ => unreachable!(),
2704 }
2705
2706 Ok(())
2707 }
2708
2709 fn check_bounds(
2710 self,
2711 store: &StoreOpaque,
2712 options: &Options,
2713 ty: TransmitIndex,
2714 address: usize,
2715 count: usize,
2716 ) -> Result<()> {
2717 let types = self.id().get(store).component().types().clone();
2718 let size = usize::try_from(
2719 match ty {
2720 TransmitIndex::Future(ty) => types[types[ty].ty]
2721 .payload
2722 .map(|ty| types.canonical_abi(&ty).size32),
2723 TransmitIndex::Stream(ty) => types[types[ty].ty]
2724 .payload
2725 .map(|ty| types.canonical_abi(&ty).size32),
2726 }
2727 .unwrap_or(0),
2728 )
2729 .unwrap();
2730
2731 if count > 0 && size > 0 {
2732 options
2733 .memory(store)
2734 .get(address..)
2735 .and_then(|b| b.get(..(size * count)))
2736 .map(drop)
2737 .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))
2738 } else {
2739 Ok(())
2740 }
2741 }
2742
2743 pub(super) fn guest_write<T: 'static>(
2745 self,
2746 mut store: StoreContextMut<T>,
2747 ty: TransmitIndex,
2748 options: OptionsIndex,
2749 flat_abi: Option<FlatAbi>,
2750 handle: u32,
2751 address: u32,
2752 count: u32,
2753 ) -> Result<ReturnCode> {
2754 let address = usize::try_from(address).unwrap();
2755 let count = usize::try_from(count).unwrap();
2756 let options = Options::new_index(store.0, self, options);
2757 self.check_bounds(store.0, &options, ty, address, count)?;
2758 let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
2759 let TransmitLocalState::Write { done } = *state else {
2760 bail!(
2761 "invalid handle {handle}; expected `Write`; got {:?}",
2762 *state
2763 );
2764 };
2765
2766 if done {
2767 bail!("cannot write to stream after being notified that the readable end dropped");
2768 }
2769
2770 *state = TransmitLocalState::Busy;
2771 let transmit_handle = TableId::<TransmitHandle>::new(rep);
2772 let concurrent_state = self.concurrent_state_mut(store.0);
2773 let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
2774 let transmit = concurrent_state.get_mut(transmit_id)?;
2775 log::trace!(
2776 "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
2777 transmit.read
2778 );
2779
2780 if transmit.done {
2781 bail!("cannot write to future after previous write succeeded or readable end dropped");
2782 }
2783
2784 let new_state = if let ReadState::Dropped = &transmit.read {
2785 ReadState::Dropped
2786 } else {
2787 ReadState::Open
2788 };
2789
2790 let set_guest_ready = |me: &mut ConcurrentState| {
2791 let transmit = me.get_mut(transmit_id)?;
2792 assert!(matches!(&transmit.write, WriteState::Open));
2793 transmit.write = WriteState::GuestReady {
2794 ty,
2795 flat_abi,
2796 options,
2797 address,
2798 count,
2799 handle,
2800 };
2801 Ok::<_, crate::Error>(())
2802 };
2803
2804 let mut result = match mem::replace(&mut transmit.read, new_state) {
2805 ReadState::GuestReady {
2806 ty: read_ty,
2807 flat_abi: read_flat_abi,
2808 options: read_options,
2809 address: read_address,
2810 count: read_count,
2811 handle: read_handle,
2812 } => {
2813 assert_eq!(flat_abi, read_flat_abi);
2814
2815 if let TransmitIndex::Future(_) = ty {
2816 transmit.done = true;
2817 }
2818
2819 let write_complete = count == 0 || read_count > 0;
2841 let read_complete = count > 0;
2842 let read_buffer_remaining = count < read_count;
2843
2844 let read_handle_rep = transmit.read_handle.rep();
2845
2846 let count = count.min(read_count);
2847
2848 self.copy(
2849 store.as_context_mut(),
2850 flat_abi,
2851 ty,
2852 &options,
2853 address,
2854 read_ty,
2855 &read_options,
2856 read_address,
2857 count,
2858 rep,
2859 )?;
2860
2861 let instance = self.id().get_mut(store.0);
2862 let types = instance.component().types();
2863 let item_size = payload(ty, types)
2864 .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
2865 .unwrap_or(0);
2866 let concurrent_state = instance.concurrent_state_mut();
2867 if read_complete {
2868 let count = u32::try_from(count).unwrap();
2869 let total = if let Some(Event::StreamRead {
2870 code: ReturnCode::Completed(old_total),
2871 ..
2872 }) = concurrent_state.take_event(read_handle_rep)?
2873 {
2874 count + old_total
2875 } else {
2876 count
2877 };
2878
2879 let code = ReturnCode::completed(ty.kind(), total);
2880
2881 concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
2882 }
2883
2884 if read_buffer_remaining {
2885 let transmit = concurrent_state.get_mut(transmit_id)?;
2886 transmit.read = ReadState::GuestReady {
2887 ty: read_ty,
2888 flat_abi: read_flat_abi,
2889 options: read_options,
2890 address: read_address + (count * item_size),
2891 count: read_count - count,
2892 handle: read_handle,
2893 };
2894 }
2895
2896 if write_complete {
2897 ReturnCode::completed(ty.kind(), count.try_into().unwrap())
2898 } else {
2899 set_guest_ready(concurrent_state)?;
2900 ReturnCode::Blocked
2901 }
2902 }
2903
2904 ReadState::HostReady {
2905 consume,
2906 guest_offset,
2907 cancel,
2908 cancel_waker,
2909 } => {
2910 assert!(cancel_waker.is_none());
2911 assert!(!cancel);
2912 assert_eq!(0, guest_offset);
2913
2914 if let TransmitIndex::Future(_) = ty {
2915 transmit.done = true;
2916 }
2917
2918 set_guest_ready(concurrent_state)?;
2919 self.consume(store.0, ty.kind(), transmit_id, consume, 0, false)?
2920 }
2921
2922 ReadState::HostToHost { .. } => unreachable!(),
2923
2924 ReadState::Open => {
2925 set_guest_ready(concurrent_state)?;
2926 ReturnCode::Blocked
2927 }
2928
2929 ReadState::Dropped => {
2930 if let TransmitIndex::Future(_) = ty {
2931 transmit.done = true;
2932 }
2933
2934 ReturnCode::Dropped(0)
2935 }
2936 };
2937
2938 if result == ReturnCode::Blocked && !options.async_() {
2939 result = self.wait_for_write(store.0, transmit_handle)?;
2940 }
2941
2942 if result != ReturnCode::Blocked {
2943 *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
2944 TransmitLocalState::Write {
2945 done: matches!(
2946 (result, ty),
2947 (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
2948 ),
2949 };
2950 }
2951
2952 log::trace!(
2953 "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
2954 );
2955
2956 Ok(result)
2957 }
2958
2959 fn consume(
2962 self,
2963 store: &mut dyn VMStore,
2964 kind: TransmitKind,
2965 transmit_id: TableId<TransmitState>,
2966 consume: PollStream,
2967 guest_offset: usize,
2968 cancel: bool,
2969 ) -> Result<ReturnCode> {
2970 let mut future = consume();
2971 self.concurrent_state_mut(store).get_mut(transmit_id)?.read = ReadState::HostReady {
2972 consume,
2973 guest_offset,
2974 cancel,
2975 cancel_waker: None,
2976 };
2977 let poll = self.set_tls(store, || {
2978 future
2979 .as_mut()
2980 .poll(&mut Context::from_waker(&Waker::noop()))
2981 });
2982
2983 Ok(match poll {
2984 Poll::Ready(state) => {
2985 let transmit = self.concurrent_state_mut(store).get_mut(transmit_id)?;
2986 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
2987 unreachable!();
2988 };
2989 let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2990 transmit.write = WriteState::Open;
2991 code
2992 }
2993 Poll::Pending => {
2994 self.pipe_from_guest(store, kind, transmit_id, future);
2995 ReturnCode::Blocked
2996 }
2997 })
2998 }
2999
3000 pub(super) fn guest_read<T: 'static>(
3002 self,
3003 mut store: StoreContextMut<T>,
3004 ty: TransmitIndex,
3005 options: OptionsIndex,
3006 flat_abi: Option<FlatAbi>,
3007 handle: u32,
3008 address: u32,
3009 count: u32,
3010 ) -> Result<ReturnCode> {
3011 let address = usize::try_from(address).unwrap();
3012 let count = usize::try_from(count).unwrap();
3013 let options = Options::new_index(store.0, self, options);
3014 self.check_bounds(store.0, &options, ty, address, count)?;
3015 let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3016 let TransmitLocalState::Read { done } = *state else {
3017 bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
3018 };
3019
3020 if done {
3021 bail!("cannot read from stream after being notified that the writable end dropped");
3022 }
3023
3024 *state = TransmitLocalState::Busy;
3025 let transmit_handle = TableId::<TransmitHandle>::new(rep);
3026 let concurrent_state = self.concurrent_state_mut(store.0);
3027 let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3028 let transmit = concurrent_state.get_mut(transmit_id)?;
3029 log::trace!(
3030 "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3031 transmit.write
3032 );
3033
3034 if transmit.done {
3035 bail!("cannot read from future after previous read succeeded");
3036 }
3037
3038 let new_state = if let WriteState::Dropped = &transmit.write {
3039 WriteState::Dropped
3040 } else {
3041 WriteState::Open
3042 };
3043
3044 let set_guest_ready = |me: &mut ConcurrentState| {
3045 let transmit = me.get_mut(transmit_id)?;
3046 assert!(matches!(&transmit.read, ReadState::Open));
3047 transmit.read = ReadState::GuestReady {
3048 ty,
3049 flat_abi,
3050 options,
3051 address,
3052 count,
3053 handle,
3054 };
3055 Ok::<_, crate::Error>(())
3056 };
3057
3058 let mut result = match mem::replace(&mut transmit.write, new_state) {
3059 WriteState::GuestReady {
3060 ty: write_ty,
3061 flat_abi: write_flat_abi,
3062 options: write_options,
3063 address: write_address,
3064 count: write_count,
3065 handle: write_handle,
3066 } => {
3067 assert_eq!(flat_abi, write_flat_abi);
3068
3069 if let TransmitIndex::Future(_) = ty {
3070 transmit.done = true;
3071 }
3072
3073 let write_handle_rep = transmit.write_handle.rep();
3074
3075 let write_complete = write_count == 0 || count > 0;
3080 let read_complete = write_count > 0;
3081 let write_buffer_remaining = count < write_count;
3082
3083 let count = count.min(write_count);
3084
3085 self.copy(
3086 store.as_context_mut(),
3087 flat_abi,
3088 write_ty,
3089 &write_options,
3090 write_address,
3091 ty,
3092 &options,
3093 address,
3094 count,
3095 rep,
3096 )?;
3097
3098 let instance = self.id().get_mut(store.0);
3099 let types = instance.component().types();
3100 let item_size = payload(ty, types)
3101 .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3102 .unwrap_or(0);
3103 let concurrent_state = instance.concurrent_state_mut();
3104
3105 if write_complete {
3106 let count = u32::try_from(count).unwrap();
3107 let total = if let Some(Event::StreamWrite {
3108 code: ReturnCode::Completed(old_total),
3109 ..
3110 }) = concurrent_state.take_event(write_handle_rep)?
3111 {
3112 count + old_total
3113 } else {
3114 count
3115 };
3116
3117 let code = ReturnCode::completed(ty.kind(), total);
3118
3119 concurrent_state.send_write_result(
3120 write_ty,
3121 transmit_id,
3122 write_handle,
3123 code,
3124 )?;
3125 }
3126
3127 if write_buffer_remaining {
3128 let transmit = concurrent_state.get_mut(transmit_id)?;
3129 transmit.write = WriteState::GuestReady {
3130 ty: write_ty,
3131 flat_abi: write_flat_abi,
3132 options: write_options,
3133 address: write_address + (count * item_size),
3134 count: write_count - count,
3135 handle: write_handle,
3136 };
3137 }
3138
3139 if read_complete {
3140 ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3141 } else {
3142 set_guest_ready(concurrent_state)?;
3143 ReturnCode::Blocked
3144 }
3145 }
3146
3147 WriteState::HostReady {
3148 produce,
3149 guest_offset,
3150 cancel,
3151 cancel_waker,
3152 } => {
3153 assert!(cancel_waker.is_none());
3154 assert!(!cancel);
3155 assert_eq!(0, guest_offset);
3156
3157 if let TransmitIndex::Future(_) = ty {
3158 transmit.done = true;
3159 }
3160
3161 set_guest_ready(concurrent_state)?;
3162
3163 self.produce(store.0, ty.kind(), transmit_id, produce, 0, false)?
3164 }
3165
3166 WriteState::Open => {
3167 set_guest_ready(concurrent_state)?;
3168 ReturnCode::Blocked
3169 }
3170
3171 WriteState::Dropped => ReturnCode::Dropped(0),
3172 };
3173
3174 if result == ReturnCode::Blocked && !options.async_() {
3175 result = self.wait_for_read(store.0, transmit_handle)?;
3176 }
3177
3178 if result != ReturnCode::Blocked {
3179 *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3180 TransmitLocalState::Read {
3181 done: matches!(
3182 (result, ty),
3183 (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3184 ),
3185 };
3186 }
3187
3188 log::trace!(
3189 "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3190 );
3191
3192 Ok(result)
3193 }
3194
3195 fn produce(
3198 self,
3199 store: &mut dyn VMStore,
3200 kind: TransmitKind,
3201 transmit_id: TableId<TransmitState>,
3202 produce: PollStream,
3203 guest_offset: usize,
3204 cancel: bool,
3205 ) -> Result<ReturnCode> {
3206 let mut future = produce();
3207 self.concurrent_state_mut(store).get_mut(transmit_id)?.write = WriteState::HostReady {
3208 produce,
3209 guest_offset,
3210 cancel,
3211 cancel_waker: None,
3212 };
3213 let poll = self.set_tls(store, || {
3214 future
3215 .as_mut()
3216 .poll(&mut Context::from_waker(&Waker::noop()))
3217 });
3218
3219 Ok(match poll {
3220 Poll::Ready(state) => {
3221 let transmit = self.concurrent_state_mut(store).get_mut(transmit_id)?;
3222 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3223 unreachable!();
3224 };
3225 let code = return_code(kind, state?, mem::replace(guest_offset, 0));
3226 transmit.read = ReadState::Open;
3227 code
3228 }
3229 Poll::Pending => {
3230 self.pipe_to_guest(store, kind, transmit_id, future);
3231 ReturnCode::Blocked
3232 }
3233 })
3234 }
3235
3236 fn wait_for_write(
3237 self,
3238 store: &mut dyn VMStore,
3239 handle: TableId<TransmitHandle>,
3240 ) -> Result<ReturnCode> {
3241 let waitable = Waitable::Transmit(handle);
3242 self.wait_for_event(store, waitable)?;
3243 let event = waitable.take_event(self.concurrent_state_mut(store))?;
3244 if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3245 event
3246 {
3247 waitable.on_delivery(self.id().get_mut(store), event);
3248 Ok(code)
3249 } else {
3250 unreachable!()
3251 }
3252 }
3253
3254 fn cancel_write(
3256 self,
3257 store: &mut dyn VMStore,
3258 transmit_id: TableId<TransmitState>,
3259 async_: bool,
3260 ) -> Result<ReturnCode> {
3261 let state = self.concurrent_state_mut(store);
3262 let transmit = state.get_mut(transmit_id)?;
3263 log::trace!(
3264 "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3265 transmit.read,
3266 transmit.write
3267 );
3268
3269 let code = if let Some(event) =
3270 Waitable::Transmit(transmit.write_handle).take_event(state)?
3271 {
3272 let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3273 unreachable!();
3274 };
3275 match (code, event) {
3276 (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3277 ReturnCode::Cancelled(count)
3278 }
3279 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3280 _ => unreachable!(),
3281 }
3282 } else if let ReadState::HostReady {
3283 cancel,
3284 cancel_waker,
3285 ..
3286 } = &mut state.get_mut(transmit_id)?.read
3287 {
3288 *cancel = true;
3289 if let Some(waker) = cancel_waker.take() {
3290 waker.wake();
3291 }
3292
3293 if async_ {
3294 ReturnCode::Blocked
3295 } else {
3296 let handle = self
3297 .concurrent_state_mut(store)
3298 .get_mut(transmit_id)?
3299 .write_handle;
3300 self.wait_for_write(store, handle)?
3301 }
3302 } else {
3303 ReturnCode::Cancelled(0)
3304 };
3305
3306 let transmit = self.concurrent_state_mut(store).get_mut(transmit_id)?;
3307
3308 match &transmit.write {
3309 WriteState::GuestReady { .. } => {
3310 transmit.write = WriteState::Open;
3311 }
3312 WriteState::HostReady { .. } => todo!("support host write cancellation"),
3313 WriteState::Open | WriteState::Dropped => {}
3314 }
3315
3316 log::trace!("cancelled write {transmit_id:?}: {code:?}");
3317
3318 Ok(code)
3319 }
3320
3321 fn wait_for_read(
3322 self,
3323 store: &mut dyn VMStore,
3324 handle: TableId<TransmitHandle>,
3325 ) -> Result<ReturnCode> {
3326 let waitable = Waitable::Transmit(handle);
3327 self.wait_for_event(store, waitable)?;
3328 let event = waitable.take_event(self.concurrent_state_mut(store))?;
3329 if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
3330 event
3331 {
3332 waitable.on_delivery(self.id().get_mut(store), event);
3333 Ok(code)
3334 } else {
3335 unreachable!()
3336 }
3337 }
3338
3339 fn cancel_read(
3341 self,
3342 store: &mut dyn VMStore,
3343 transmit_id: TableId<TransmitState>,
3344 async_: bool,
3345 ) -> Result<ReturnCode> {
3346 let state = self.concurrent_state_mut(store);
3347 let transmit = state.get_mut(transmit_id)?;
3348 log::trace!(
3349 "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3350 transmit.read,
3351 transmit.write
3352 );
3353
3354 let code = if let Some(event) =
3355 Waitable::Transmit(transmit.read_handle).take_event(state)?
3356 {
3357 let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3358 unreachable!();
3359 };
3360 match (code, event) {
3361 (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3362 ReturnCode::Cancelled(count)
3363 }
3364 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3365 _ => unreachable!(),
3366 }
3367 } else if let WriteState::HostReady {
3368 cancel,
3369 cancel_waker,
3370 ..
3371 } = &mut state.get_mut(transmit_id)?.write
3372 {
3373 *cancel = true;
3374 if let Some(waker) = cancel_waker.take() {
3375 waker.wake();
3376 }
3377
3378 if async_ {
3379 ReturnCode::Blocked
3380 } else {
3381 let handle = self
3382 .concurrent_state_mut(store)
3383 .get_mut(transmit_id)?
3384 .read_handle;
3385 self.wait_for_read(store, handle)?
3386 }
3387 } else {
3388 ReturnCode::Cancelled(0)
3389 };
3390
3391 let transmit = self.concurrent_state_mut(store).get_mut(transmit_id)?;
3392
3393 match &transmit.read {
3394 ReadState::GuestReady { .. } => {
3395 transmit.read = ReadState::Open;
3396 }
3397 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
3398 todo!("support host read cancellation")
3399 }
3400 ReadState::Open | ReadState::Dropped => {}
3401 }
3402
3403 log::trace!("cancelled read {transmit_id:?}: {code:?}");
3404
3405 Ok(code)
3406 }
3407
3408 fn guest_cancel_write(
3410 self,
3411 store: &mut dyn VMStore,
3412 ty: TransmitIndex,
3413 async_: bool,
3414 writer: u32,
3415 ) -> Result<ReturnCode> {
3416 let (rep, state) =
3417 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
3418 let id = TableId::<TransmitHandle>::new(rep);
3419 log::trace!("guest cancel write {id:?} (handle {writer})");
3420 match state {
3421 TransmitLocalState::Write { .. } => {
3422 bail!("stream or future write cancelled when no write is pending")
3423 }
3424 TransmitLocalState::Read { .. } => {
3425 bail!("passed read end to `{{stream|future}}.cancel-write`")
3426 }
3427 TransmitLocalState::Busy => {}
3428 }
3429 let transmit_id = self.concurrent_state_mut(store).get_mut(id)?.state;
3430 let code = self.cancel_write(store, transmit_id, async_)?;
3431 if !matches!(code, ReturnCode::Blocked) {
3432 let state =
3433 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
3434 .1;
3435 if let TransmitLocalState::Busy = state {
3436 *state = TransmitLocalState::Write { done: false };
3437 }
3438 }
3439 Ok(code)
3440 }
3441
3442 fn guest_cancel_read(
3444 self,
3445 store: &mut dyn VMStore,
3446 ty: TransmitIndex,
3447 async_: bool,
3448 reader: u32,
3449 ) -> Result<ReturnCode> {
3450 let (rep, state) =
3451 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
3452 let id = TableId::<TransmitHandle>::new(rep);
3453 log::trace!("guest cancel read {id:?} (handle {reader})");
3454 match state {
3455 TransmitLocalState::Read { .. } => {
3456 bail!("stream or future read cancelled when no read is pending")
3457 }
3458 TransmitLocalState::Write { .. } => {
3459 bail!("passed write end to `{{stream|future}}.cancel-read`")
3460 }
3461 TransmitLocalState::Busy => {}
3462 }
3463 let transmit_id = self.concurrent_state_mut(store).get_mut(id)?.state;
3464 let code = self.cancel_read(store, transmit_id, async_)?;
3465 if !matches!(code, ReturnCode::Blocked) {
3466 let state =
3467 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
3468 .1;
3469 if let TransmitLocalState::Busy = state {
3470 *state = TransmitLocalState::Read { done: false };
3471 }
3472 }
3473 Ok(code)
3474 }
3475
3476 fn guest_drop_readable(
3478 self,
3479 store: &mut dyn VMStore,
3480 ty: TransmitIndex,
3481 reader: u32,
3482 ) -> Result<()> {
3483 let table = self.id().get_mut(store).table_for_transmit(ty);
3484 let (rep, _is_done) = match ty {
3485 TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
3486 TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
3487 };
3488 let kind = match ty {
3489 TransmitIndex::Stream(_) => TransmitKind::Stream,
3490 TransmitIndex::Future(_) => TransmitKind::Future,
3491 };
3492 let id = TableId::<TransmitHandle>::new(rep);
3493 log::trace!("guest_drop_readable: drop reader {id:?}");
3494 self.host_drop_reader(store, id, kind)
3495 }
3496
3497 pub(crate) fn error_context_new(
3499 self,
3500 store: &mut StoreOpaque,
3501 caller: RuntimeComponentInstanceIndex,
3502 ty: TypeComponentLocalErrorContextTableIndex,
3503 options: OptionsIndex,
3504 debug_msg_address: u32,
3505 debug_msg_len: u32,
3506 ) -> Result<u32> {
3507 self.id().get(store).check_may_leave(caller)?;
3508 let options = Options::new_index(store, self, options);
3509 let lift_ctx = &mut LiftContext::new(store, &options, self);
3510 let address = usize::try_from(debug_msg_address)?;
3512 let len = usize::try_from(debug_msg_len)?;
3513 lift_ctx
3514 .memory()
3515 .get(address..)
3516 .and_then(|b| b.get(..len))
3517 .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3518 let message = WasmStr::new(address, len, lift_ctx)?;
3519
3520 let err_ctx = ErrorContextState {
3522 debug_msg: message
3523 .to_str_from_memory(options.memory(store))?
3524 .to_string(),
3525 };
3526 let state = self.concurrent_state_mut(store);
3527 let table_id = state.push(err_ctx)?;
3528 let global_ref_count_idx =
3529 TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
3530
3531 let _ = state
3533 .global_error_context_ref_counts
3534 .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
3535
3536 let local_idx = self
3543 .id()
3544 .get_mut(store)
3545 .table_for_error_context(ty)
3546 .error_context_insert(table_id.rep())?;
3547
3548 Ok(local_idx)
3549 }
3550
3551 pub(super) fn error_context_debug_message<T>(
3553 self,
3554 store: StoreContextMut<T>,
3555 ty: TypeComponentLocalErrorContextTableIndex,
3556 options: OptionsIndex,
3557 err_ctx_handle: u32,
3558 debug_msg_address: u32,
3559 ) -> Result<()> {
3560 let handle_table_id_rep = self
3562 .id()
3563 .get_mut(store.0)
3564 .table_for_error_context(ty)
3565 .error_context_rep(err_ctx_handle)?;
3566
3567 let state = self.concurrent_state_mut(store.0);
3568 let ErrorContextState { debug_msg } =
3570 state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
3571 let debug_msg = debug_msg.clone();
3572
3573 let options = Options::new_index(store.0, self, options);
3574 let types = self.id().get(store.0).component().types().clone();
3575 let lower_cx = &mut LowerContext::new(store, &options, &types, self);
3576 let debug_msg_address = usize::try_from(debug_msg_address)?;
3577 let offset = lower_cx
3579 .as_slice_mut()
3580 .get(debug_msg_address..)
3581 .and_then(|b| b.get(..debug_msg.bytes().len()))
3582 .map(|_| debug_msg_address)
3583 .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3584 debug_msg
3585 .as_str()
3586 .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
3587
3588 Ok(())
3589 }
3590
3591 pub(crate) fn future_cancel_read(
3593 self,
3594 store: &mut dyn VMStore,
3595 caller: RuntimeComponentInstanceIndex,
3596 ty: TypeFutureTableIndex,
3597 async_: bool,
3598 reader: u32,
3599 ) -> Result<u32> {
3600 self.id().get(store).check_may_leave(caller)?;
3601 self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
3602 .map(|v| v.encode())
3603 }
3604
3605 pub(crate) fn future_cancel_write(
3607 self,
3608 store: &mut dyn VMStore,
3609 caller: RuntimeComponentInstanceIndex,
3610 ty: TypeFutureTableIndex,
3611 async_: bool,
3612 writer: u32,
3613 ) -> Result<u32> {
3614 self.id().get(store).check_may_leave(caller)?;
3615 self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
3616 .map(|v| v.encode())
3617 }
3618
3619 pub(crate) fn stream_cancel_read(
3621 self,
3622 store: &mut dyn VMStore,
3623 caller: RuntimeComponentInstanceIndex,
3624 ty: TypeStreamTableIndex,
3625 async_: bool,
3626 reader: u32,
3627 ) -> Result<u32> {
3628 self.id().get(store).check_may_leave(caller)?;
3629 self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
3630 .map(|v| v.encode())
3631 }
3632
3633 pub(crate) fn stream_cancel_write(
3635 self,
3636 store: &mut dyn VMStore,
3637 caller: RuntimeComponentInstanceIndex,
3638 ty: TypeStreamTableIndex,
3639 async_: bool,
3640 writer: u32,
3641 ) -> Result<u32> {
3642 self.id().get(store).check_may_leave(caller)?;
3643 self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
3644 .map(|v| v.encode())
3645 }
3646
3647 pub(crate) fn future_drop_readable(
3649 self,
3650 store: &mut dyn VMStore,
3651 caller: RuntimeComponentInstanceIndex,
3652 ty: TypeFutureTableIndex,
3653 reader: u32,
3654 ) -> Result<()> {
3655 self.id().get(store).check_may_leave(caller)?;
3656 self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
3657 }
3658
3659 pub(crate) fn stream_drop_readable(
3661 self,
3662 store: &mut dyn VMStore,
3663 caller: RuntimeComponentInstanceIndex,
3664 ty: TypeStreamTableIndex,
3665 reader: u32,
3666 ) -> Result<()> {
3667 self.id().get(store).check_may_leave(caller)?;
3668 self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
3669 }
3670}
3671
3672impl ComponentInstance {
3673 fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
3674 let (tables, types) = self.guest_tables();
3675 let runtime_instance = match ty {
3676 TransmitIndex::Stream(ty) => types[ty].instance,
3677 TransmitIndex::Future(ty) => types[ty].instance,
3678 };
3679 &mut tables[runtime_instance]
3680 }
3681
3682 fn table_for_error_context(
3683 self: Pin<&mut Self>,
3684 ty: TypeComponentLocalErrorContextTableIndex,
3685 ) -> &mut HandleTable {
3686 let (tables, types) = self.guest_tables();
3687 let runtime_instance = types[ty].instance;
3688 &mut tables[runtime_instance]
3689 }
3690
3691 fn get_mut_by_index(
3692 self: Pin<&mut Self>,
3693 ty: TransmitIndex,
3694 index: u32,
3695 ) -> Result<(u32, &mut TransmitLocalState)> {
3696 get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
3697 }
3698
3699 fn guest_new(mut self: Pin<&mut Self>, ty: TransmitIndex) -> Result<ResourcePair> {
3703 let (write, read) = self.as_mut().concurrent_state_mut().new_transmit()?;
3704
3705 let table = self.as_mut().table_for_transmit(ty);
3706 let (read_handle, write_handle) = match ty {
3707 TransmitIndex::Future(ty) => (
3708 table.future_insert_read(ty, read.rep())?,
3709 table.future_insert_write(ty, write.rep())?,
3710 ),
3711 TransmitIndex::Stream(ty) => (
3712 table.stream_insert_read(ty, read.rep())?,
3713 table.stream_insert_write(ty, write.rep())?,
3714 ),
3715 };
3716
3717 let state = self.as_mut().concurrent_state_mut();
3718 state.get_mut(read)?.common.handle = Some(read_handle);
3719 state.get_mut(write)?.common.handle = Some(write_handle);
3720
3721 Ok(ResourcePair {
3722 write: write_handle,
3723 read: read_handle,
3724 })
3725 }
3726
3727 pub(crate) fn error_context_drop(
3729 mut self: Pin<&mut Self>,
3730 caller: RuntimeComponentInstanceIndex,
3731 ty: TypeComponentLocalErrorContextTableIndex,
3732 error_context: u32,
3733 ) -> Result<()> {
3734 self.check_may_leave(caller)?;
3735
3736 let local_handle_table = self.as_mut().table_for_error_context(ty);
3737
3738 let rep = local_handle_table.error_context_drop(error_context)?;
3739
3740 let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
3741
3742 let state = self.concurrent_state_mut();
3743 let GlobalErrorContextRefCount(global_ref_count) = state
3744 .global_error_context_ref_counts
3745 .get_mut(&global_ref_count_idx)
3746 .expect("retrieve concurrent state for error context during drop");
3747
3748 assert!(*global_ref_count >= 1);
3750 *global_ref_count -= 1;
3751 if *global_ref_count == 0 {
3752 state
3753 .global_error_context_ref_counts
3754 .remove(&global_ref_count_idx);
3755
3756 state
3757 .delete(TableId::<ErrorContextState>::new(rep))
3758 .context("deleting component-global error context data")?;
3759 }
3760
3761 Ok(())
3762 }
3763
3764 fn guest_transfer(
3767 mut self: Pin<&mut Self>,
3768 src_idx: u32,
3769 src: TransmitIndex,
3770 dst: TransmitIndex,
3771 ) -> Result<u32> {
3772 let src_table = self.as_mut().table_for_transmit(src);
3773 let (rep, is_done) = match src {
3774 TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
3775 TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
3776 };
3777 if is_done {
3778 bail!("cannot lift after being notified that the writable end dropped");
3779 }
3780 let dst_table = self.as_mut().table_for_transmit(dst);
3781 let handle = match dst {
3782 TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
3783 TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
3784 }?;
3785 self.concurrent_state_mut()
3786 .get_mut(TableId::<TransmitHandle>::new(rep))?
3787 .common
3788 .handle = Some(handle);
3789 Ok(handle)
3790 }
3791
3792 pub(crate) fn future_new(
3794 self: Pin<&mut Self>,
3795 caller: RuntimeComponentInstanceIndex,
3796 ty: TypeFutureTableIndex,
3797 ) -> Result<ResourcePair> {
3798 self.check_may_leave(caller)?;
3799 self.guest_new(TransmitIndex::Future(ty))
3800 }
3801
3802 pub(crate) fn stream_new(
3804 self: Pin<&mut Self>,
3805 caller: RuntimeComponentInstanceIndex,
3806 ty: TypeStreamTableIndex,
3807 ) -> Result<ResourcePair> {
3808 self.check_may_leave(caller)?;
3809 self.guest_new(TransmitIndex::Stream(ty))
3810 }
3811
3812 pub(crate) fn future_transfer(
3815 self: Pin<&mut Self>,
3816 src_idx: u32,
3817 src: TypeFutureTableIndex,
3818 dst: TypeFutureTableIndex,
3819 ) -> Result<u32> {
3820 self.guest_transfer(
3821 src_idx,
3822 TransmitIndex::Future(src),
3823 TransmitIndex::Future(dst),
3824 )
3825 }
3826
3827 pub(crate) fn stream_transfer(
3830 self: Pin<&mut Self>,
3831 src_idx: u32,
3832 src: TypeStreamTableIndex,
3833 dst: TypeStreamTableIndex,
3834 ) -> Result<u32> {
3835 self.guest_transfer(
3836 src_idx,
3837 TransmitIndex::Stream(src),
3838 TransmitIndex::Stream(dst),
3839 )
3840 }
3841
3842 pub(crate) fn error_context_transfer(
3844 mut self: Pin<&mut Self>,
3845 src_idx: u32,
3846 src: TypeComponentLocalErrorContextTableIndex,
3847 dst: TypeComponentLocalErrorContextTableIndex,
3848 ) -> Result<u32> {
3849 let rep = self
3850 .as_mut()
3851 .table_for_error_context(src)
3852 .error_context_rep(src_idx)?;
3853 let dst_idx = self
3854 .as_mut()
3855 .table_for_error_context(dst)
3856 .error_context_insert(rep)?;
3857
3858 let global_ref_count = self
3862 .concurrent_state_mut()
3863 .global_error_context_ref_counts
3864 .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
3865 .context("global ref count present for existing (sub)component error context")?;
3866 global_ref_count.0 += 1;
3867
3868 Ok(dst_idx)
3869 }
3870}
3871
3872impl ConcurrentState {
3873 fn send_write_result(
3874 &mut self,
3875 ty: TransmitIndex,
3876 id: TableId<TransmitState>,
3877 handle: u32,
3878 code: ReturnCode,
3879 ) -> Result<()> {
3880 let write_handle = self.get_mut(id)?.write_handle.rep();
3881 self.set_event(
3882 write_handle,
3883 match ty {
3884 TransmitIndex::Future(ty) => Event::FutureWrite {
3885 code,
3886 pending: Some((ty, handle)),
3887 },
3888 TransmitIndex::Stream(ty) => Event::StreamWrite {
3889 code,
3890 pending: Some((ty, handle)),
3891 },
3892 },
3893 )
3894 }
3895
3896 fn send_read_result(
3897 &mut self,
3898 ty: TransmitIndex,
3899 id: TableId<TransmitState>,
3900 handle: u32,
3901 code: ReturnCode,
3902 ) -> Result<()> {
3903 let read_handle = self.get_mut(id)?.read_handle.rep();
3904 self.set_event(
3905 read_handle,
3906 match ty {
3907 TransmitIndex::Future(ty) => Event::FutureRead {
3908 code,
3909 pending: Some((ty, handle)),
3910 },
3911 TransmitIndex::Stream(ty) => Event::StreamRead {
3912 code,
3913 pending: Some((ty, handle)),
3914 },
3915 },
3916 )
3917 }
3918
3919 fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
3920 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
3921 }
3922
3923 fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
3924 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
3925 }
3926
3927 fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
3938 let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
3939
3940 fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
3941 let (ReturnCode::Completed(count)
3942 | ReturnCode::Dropped(count)
3943 | ReturnCode::Cancelled(count)) = old
3944 else {
3945 unreachable!()
3946 };
3947
3948 match new {
3949 ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
3950 ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
3951 _ => unreachable!(),
3952 }
3953 }
3954
3955 let event = match (waitable.take_event(self)?, event) {
3956 (None, _) => event,
3957 (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
3958 (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
3959 (
3960 Some(Event::StreamWrite {
3961 code: old_code,
3962 pending: old_pending,
3963 }),
3964 Event::StreamWrite { code, pending },
3965 ) => Event::StreamWrite {
3966 code: update_code(old_code, code),
3967 pending: old_pending.or(pending),
3968 },
3969 (
3970 Some(Event::StreamRead {
3971 code: old_code,
3972 pending: old_pending,
3973 }),
3974 Event::StreamRead { code, pending },
3975 ) => Event::StreamRead {
3976 code: update_code(old_code, code),
3977 pending: old_pending.or(pending),
3978 },
3979 _ => unreachable!(),
3980 };
3981
3982 waitable.set_event(self, Some(event))
3983 }
3984
3985 fn new_transmit(&mut self) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
3988 let state_id = self.push(TransmitState::default())?;
3989
3990 let write = self.push(TransmitHandle::new(state_id))?;
3991 let read = self.push(TransmitHandle::new(state_id))?;
3992
3993 let state = self.get_mut(state_id)?;
3994 state.write_handle = write;
3995 state.read_handle = read;
3996
3997 log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
3998
3999 Ok((write, read))
4000 }
4001
4002 fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4004 let state = self.delete(state_id)?;
4005 self.delete(state.write_handle)?;
4006 self.delete(state.read_handle)?;
4007
4008 log::trace!(
4009 "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4010 state.write_handle,
4011 state.read_handle,
4012 );
4013
4014 Ok(())
4015 }
4016}
4017
4018pub(crate) struct ResourcePair {
4019 pub(crate) write: u32,
4020 pub(crate) read: u32,
4021}