1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext, Options};
5use crate::component::matching::InstanceType;
6use crate::component::values::{ErrorContextAny, FutureAny, StreamAny};
7use crate::component::{AsAccessor, Instance, Lower, Val, WasmList, WasmStr};
8use crate::store::{StoreOpaque, StoreToken};
9use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
10use crate::vm::{AlwaysMut, VMStore};
11use crate::{AsContextMut, StoreContextMut, ValRaw};
12use anyhow::{Context as _, Error, Result, anyhow, bail};
13use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
14use core::fmt;
15use core::future;
16use core::iter;
17use core::marker::PhantomData;
18use core::mem::{self, MaybeUninit};
19use core::pin::Pin;
20use core::task::{Context, Poll, Waker, ready};
21use futures::channel::oneshot;
22use futures::{FutureExt as _, stream};
23use std::boxed::Box;
24use std::io::Cursor;
25use std::string::{String, ToString};
26use std::sync::{Arc, Mutex};
27use std::vec::Vec;
28use wasmtime_environ::component::{
29 CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
30 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
31 TypeFutureTableIndex, TypeStreamTableIndex,
32};
33
34pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
35
36mod buffers;
37
/// Distinguishes the two kinds of transmit (a stream or a future) in code
/// that handles both.
#[derive(Copy, Clone, Debug)]
pub enum TransmitKind {
    Stream,
    Future,
}

/// Outcome of a read or write operation, convertible to a single `u32` via
/// [`ReturnCode::encode`].
///
/// Every variant except `Blocked` carries an item count.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ReturnCode {
    Blocked,
    Completed(u32),
    Dropped(u32),
    Cancelled(u32),
}

impl ReturnCode {
    /// Packs this code into a `u32`.
    ///
    /// `Blocked` maps to `0xffff_ffff`.  Every other variant is encoded as
    /// its count shifted left by four bits, with the low bits holding the
    /// status tag (`0` completed, `1` dropped, `2` cancelled).  Debug builds
    /// assert that the count fits in 28 bits.
    pub fn encode(&self) -> u32 {
        const BLOCKED: u32 = 0xffff_ffff;
        const COMPLETED: u32 = 0x0;
        const DROPPED: u32 = 0x1;
        const CANCELLED: u32 = 0x2;

        // Split into (count, tag) first so the packing logic is written once.
        let (count, status) = match *self {
            Self::Blocked => return BLOCKED,
            Self::Completed(count) => (count, COMPLETED),
            Self::Dropped(count) => (count, DROPPED),
            Self::Cancelled(count) => (count, CANCELLED),
        };
        debug_assert!(count < (1 << 28));
        (count << 4) | status
    }

    /// Builds a `Completed` code for the given transmit kind.
    ///
    /// Futures always report a count of zero; streams report `count`.
    fn completed(kind: TransmitKind, count: u32) -> Self {
        let reported = match kind {
            TransmitKind::Future => 0,
            TransmitKind::Stream => count,
        };
        Self::Completed(reported)
    }
}
92
/// A type-table index for either a stream type or a future type; the variant
/// records which of the two kinds the index refers to.
#[derive(Copy, Clone, Debug)]
pub enum TransmitIndex {
    /// Index of a stream type.
    Stream(TypeStreamTableIndex),
    /// Index of a future type.
    Future(TypeFutureTableIndex),
}
102
103impl TransmitIndex {
104 pub fn kind(&self) -> TransmitKind {
105 match self {
106 TransmitIndex::Stream(_) => TransmitKind::Stream,
107 TransmitIndex::Future(_) => TransmitKind::Future,
108 }
109 }
110}
111
112fn payload(ty: TransmitIndex, types: &Arc<ComponentTypes>) -> Option<InterfaceType> {
115 match ty {
116 TransmitIndex::Future(ty) => types[types[ty].ty].payload,
117 TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
118 }
119}
120
121fn get_mut_by_index_from(
124 handle_table: &mut HandleTable,
125 ty: TransmitIndex,
126 index: u32,
127) -> Result<(u32, &mut TransmitLocalState)> {
128 match ty {
129 TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
130 TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
131 }
132}
133
/// Lowers up to `count` items from `buffer` into linear memory starting at
/// `address`, then marks those items as consumed from `buffer`.
///
/// The number of items actually transferred is the smaller of `count` and
/// the number of items remaining in `buffer`.
///
/// # Errors
///
/// Fails if `address` is not a multiple of `T::ALIGN32`, if the destination
/// range does not lie within the memory exposed by the lowering context, or
/// if storing the items fails.
fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
    mut store: StoreContextMut<U>,
    instance: Instance,
    options: &Options,
    ty: TransmitIndex,
    address: usize,
    count: usize,
    buffer: &mut B,
) -> Result<()> {
    let types = instance.id().get(store.0).component().types().clone();
    // Never transfer more items than the buffer currently holds.
    let count = buffer.remaining().len().min(count);

    // Choose the context constructor according to `T::MAY_REQUIRE_REALLOC`.
    let lower = &mut if T::MAY_REQUIRE_REALLOC {
        LowerContext::new
    } else {
        LowerContext::new_without_realloc
    }(store.as_context_mut(), options, &types, instance);

    if address % usize::try_from(T::ALIGN32)? != 0 {
        bail!("read pointer not aligned");
    }
    // Bounds check only: the resulting slice is discarded, and a failure is
    // reported as an error rather than a panic.
    lower
        .as_slice_mut()
        .get_mut(address..)
        .and_then(|b| b.get_mut(..T::SIZE32 * count))
        .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;

    // With no payload type there is nothing to store in memory, but the items
    // are still consumed from the buffer below.
    if let Some(ty) = payload(ty, &types) {
        T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
    }

    buffer.skip(count);

    Ok(())
}
169
/// Lifts up to `count` items from linear memory at `address` and appends them
/// to `buffer`.
///
/// The number of items lifted is the smaller of `count` and
/// `buffer.remaining_capacity()`.
///
/// # Errors
///
/// For non-unit `T`, fails if `address` is not a multiple of `T::ALIGN32`, if
/// the source range lies outside the memory exposed by `lift`, or if lifting
/// the list fails.
///
/// # Panics
///
/// Panics if `T` is not a unit type and `ty` is `None`.
fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
    lift: &mut LiftContext<'_>,
    ty: Option<InterfaceType>,
    buffer: &mut B,
    address: usize,
    count: usize,
) -> Result<()> {
    // Never lift more items than the buffer can accept.
    let count = count.min(buffer.remaining_capacity());
    if T::IS_RUST_UNIT_TYPE {
        // No memory is read on this path; `count` placeholder values are
        // materialized directly.
        //
        // SAFETY(review): this relies on `T::IS_RUST_UNIT_TYPE` only being
        // true for a zero-sized type with no invalid values (presumably
        // `()`), for which an uninitialized value is valid — confirm against
        // the definition of `IS_RUST_UNIT_TYPE`.
        buffer.extend(
            iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
        )
    } else {
        let ty = ty.unwrap();
        if address % usize::try_from(T::ALIGN32)? != 0 {
            bail!("write pointer not aligned");
        }
        // Bounds check only: the resulting slice is discarded.
        lift.memory()
            .get(address..)
            .and_then(|b| b.get(..T::SIZE32 * count))
            .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;

        let list = &WasmList::new(address, count, lift, ty)?;
        T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
    }
    Ok(())
}
200
/// State stored for an error context.
#[derive(Debug, PartialEq, Eq, PartialOrd)]
pub(super) struct ErrorContextState {
    /// Debug message associated with the error context.
    pub(crate) debug_msg: String,
}
207
/// A size/alignment pair.
// NOTE(review): presumably describes the payload layout in the canonical ABI
// and is measured in bytes — confirm at the use sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct FlatAbi {
    /// Size component of the layout.
    pub(super) size: u32,
    /// Alignment component of the layout.
    pub(super) align: u32,
}
215
/// The write side handed to a [`StreamProducer`]: where produced items of
/// type `T` go, together with the producer's buffer of type `B`.
pub struct Destination<'a, T, B> {
    /// Instance owning the transmit state.
    instance: Instance,
    /// Identifier of the transmit state this destination refers to.
    id: TableId<TransmitState>,
    /// The producer's buffer; see `take_buffer` and `set_buffer`.
    buffer: &'a mut B,
    /// Optional host-side byte buffer, used by `as_direct` in place of guest
    /// memory when present.
    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
    /// Ties the destination to the item type without storing a `T`.
    _phantom: PhantomData<fn() -> T>,
}
224
225impl<'a, T, B> Destination<'a, T, B> {
226 pub fn reborrow(&mut self) -> Destination<'_, T, B> {
228 Destination {
229 instance: self.instance,
230 id: self.id,
231 buffer: &mut *self.buffer,
232 host_buffer: self.host_buffer.as_deref_mut(),
233 _phantom: PhantomData,
234 }
235 }
236
237 pub fn take_buffer(&mut self) -> B
243 where
244 B: Default,
245 {
246 mem::take(self.buffer)
247 }
248
249 pub fn set_buffer(&mut self, buffer: B) {
259 *self.buffer = buffer;
260 }
261
262 pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
275 let transmit = self
276 .instance
277 .concurrent_state_mut(store.as_context_mut().0)
278 .get_mut(self.id)
279 .unwrap();
280
281 if let &ReadState::GuestReady { count, .. } = &transmit.read {
282 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
283 unreachable!()
284 };
285
286 Some(count - guest_offset)
287 } else {
288 None
289 }
290 }
291}
292
293impl<'a, B> Destination<'a, u8, B> {
294 pub fn as_direct<D>(
305 mut self,
306 store: StoreContextMut<'a, D>,
307 capacity: usize,
308 ) -> DirectDestination<'a, D> {
309 if let Some(buffer) = self.host_buffer.as_deref_mut() {
310 buffer.set_position(0);
311 if buffer.get_mut().is_empty() {
312 buffer.get_mut().resize(capacity, 0);
313 }
314 }
315
316 DirectDestination {
317 instance: self.instance,
318 id: self.id,
319 host_buffer: self.host_buffer,
320 store,
321 }
322 }
323}
324
/// A byte-oriented view of a [`Destination`], exposing the target as a
/// mutable slice (see `remaining`) plus a way to record progress (see
/// `mark_written`).
pub struct DirectDestination<'a, D: 'static> {
    /// Instance owning the transmit state.
    instance: Instance,
    /// Identifier of the transmit state being written to.
    id: TableId<TransmitState>,
    /// Host-side buffer written to instead of guest memory when present.
    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
    /// Store used to reach the transmit state and guest memory.
    store: StoreContextMut<'a, D>,
}
333
impl<D: 'static> DirectDestination<'_, D> {
    /// Returns the writable byte region for this destination.
    ///
    /// With a host buffer present this is the buffer's entire backing
    /// vector.  Otherwise it is the slice of guest memory beginning at
    /// `address + guest_offset` and spanning `count - guest_offset` bytes,
    /// where `address` and `count` come from the pending guest read.
    ///
    /// # Panics
    ///
    /// Panics if the transmit state cannot be found, if the read/write sides
    /// are not in the `GuestReady`/`HostReady` states respectively, or if the
    /// computed range falls outside guest memory.
    pub fn remaining(&mut self) -> &mut [u8] {
        if let Some(buffer) = self.host_buffer.as_deref_mut() {
            buffer.get_mut()
        } else {
            let transmit = self
                .instance
                .concurrent_state_mut(self.store.as_context_mut().0)
                .get_mut(self.id)
                .unwrap();

            let &ReadState::GuestReady {
                address,
                count,
                options,
                ..
            } = &transmit.read
            else {
                unreachable!();
            };

            let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
                unreachable!()
            };

            // Skip the part of the guest buffer that has already been
            // written (tracked by `guest_offset`).
            options
                .memory_mut(self.store.0)
                .get_mut((address + guest_offset)..)
                .and_then(|b| b.get_mut(..(count - guest_offset)))
                .unwrap()
        }
    }

    /// Records that `count` bytes have been written to the slice returned by
    /// `remaining`.
    ///
    /// With a host buffer this advances the cursor position; otherwise it
    /// advances the `guest_offset` stored in the write state.
    ///
    /// # Panics
    ///
    /// Panics if advancing would move `guest_offset` past the guest read's
    /// `count`, if the host cursor position overflows, or if the transmit is
    /// not in the expected `GuestReady`/`HostReady` states.
    pub fn mark_written(&mut self, count: usize) {
        if let Some(buffer) = self.host_buffer.as_deref_mut() {
            buffer.set_position(
                buffer
                    .position()
                    .checked_add(u64::try_from(count).unwrap())
                    .unwrap(),
            );
        } else {
            let transmit = self
                .instance
                .concurrent_state_mut(self.store.as_context_mut().0)
                .get_mut(self.id)
                .unwrap();

            let ReadState::GuestReady {
                count: read_count, ..
            } = &transmit.read
            else {
                unreachable!();
            };

            let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
                unreachable!()
            };

            // Refuse to claim more bytes than the guest asked to read.
            if *guest_offset + count > *read_count {
                panic!(
                    "write count ({count}) must be less than or equal to read count ({read_count})"
                )
            } else {
                *guest_offset += count;
            }
        }
    }
}
408
/// Result reported by `StreamProducer::poll_produce` and
/// `StreamConsumer::poll_consume`.
// NOTE(review): the precise contract of each variant is defined by the code
// that drives producers and consumers, which is not visible here; the notes
// below describe only how this file's adapters use them.
#[derive(Copy, Clone, Debug)]
pub enum StreamResult {
    /// Returned by the future adapters in this file when a value was produced
    /// or the source was fully consumed.
    Completed,
    /// Returned by the future adapters in this file when the wrapped
    /// producer yielded no value, or the consumer left data unread.
    Cancelled,
    /// Returned by the one-shot producers in this file (`Vec`, `Box<[T]>`,
    /// empty iterators/streams), which hand over everything in one call.
    Dropped,
}
424
/// A source of items for the write end of a stream, polled with access to
/// the store (whose data type is `D`).
pub trait StreamProducer<D>: Send + 'static {
    /// Type of the items produced.
    type Item;

    /// Buffer type through which produced items are handed over; see
    /// `Destination::set_buffer`.
    type Buffer: WriteBuffer<Self::Item> + Default;

    /// Attempts to produce items into `destination`.
    ///
    /// * `cx` - task context, for registering a wakeup when returning
    ///   `Poll::Pending`.
    /// * `store` - the store the stream belongs to.
    /// * `destination` - where produced items are delivered.
    /// * `finish` - NOTE(review): presumably asks the producer to wrap up
    ///   without waiting; in this file's `FutureProducer` adapter a producer
    ///   that is still pending when `finish` is set reports
    ///   `StreamResult::Cancelled`.  Confirm against the caller.
    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<'a, D>,
        destination: Destination<'a, Self::Item, Self::Buffer>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;
}
511
512impl<T, D> StreamProducer<D> for iter::Empty<T>
513where
514 T: Send + Sync + 'static,
515{
516 type Item = T;
517 type Buffer = Option<Self::Item>;
518
519 fn poll_produce<'a>(
520 self: Pin<&mut Self>,
521 _: &mut Context<'_>,
522 _: StoreContextMut<'a, D>,
523 _: Destination<'a, Self::Item, Self::Buffer>,
524 _: bool,
525 ) -> Poll<Result<StreamResult>> {
526 Poll::Ready(Ok(StreamResult::Dropped))
527 }
528}
529
530impl<T, D> StreamProducer<D> for stream::Empty<T>
531where
532 T: Send + Sync + 'static,
533{
534 type Item = T;
535 type Buffer = Option<Self::Item>;
536
537 fn poll_produce<'a>(
538 self: Pin<&mut Self>,
539 _: &mut Context<'_>,
540 _: StoreContextMut<'a, D>,
541 _: Destination<'a, Self::Item, Self::Buffer>,
542 _: bool,
543 ) -> Poll<Result<StreamResult>> {
544 Poll::Ready(Ok(StreamResult::Dropped))
545 }
546}
547
548impl<T, D> StreamProducer<D> for Vec<T>
549where
550 T: Unpin + Send + Sync + 'static,
551{
552 type Item = T;
553 type Buffer = VecBuffer<T>;
554
555 fn poll_produce<'a>(
556 self: Pin<&mut Self>,
557 _: &mut Context<'_>,
558 _: StoreContextMut<'a, D>,
559 mut dst: Destination<'a, Self::Item, Self::Buffer>,
560 _: bool,
561 ) -> Poll<Result<StreamResult>> {
562 dst.set_buffer(mem::take(self.get_mut()).into());
563 Poll::Ready(Ok(StreamResult::Dropped))
564 }
565}
566
567impl<T, D> StreamProducer<D> for Box<[T]>
568where
569 T: Unpin + Send + Sync + 'static,
570{
571 type Item = T;
572 type Buffer = VecBuffer<T>;
573
574 fn poll_produce<'a>(
575 self: Pin<&mut Self>,
576 _: &mut Context<'_>,
577 _: StoreContextMut<'a, D>,
578 mut dst: Destination<'a, Self::Item, Self::Buffer>,
579 _: bool,
580 ) -> Poll<Result<StreamResult>> {
581 dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
582 Poll::Ready(Ok(StreamResult::Dropped))
583 }
584}
585
#[cfg(feature = "component-model-async-bytes")]
impl<D> StreamProducer<D> for bytes::Bytes {
    type Item = u8;
    type Buffer = Cursor<Self>;

    /// Delivers the bytes in a single call and reports
    /// `StreamResult::Dropped`.
    ///
    /// When the destination reports no remaining capacity (`None` or zero),
    /// all bytes are handed over as the destination's buffer.  Otherwise as
    /// many bytes as fit are copied directly into the destination and any
    /// excess is buffered.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        mut store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _: bool,
    ) -> Poll<Result<StreamResult>> {
        let cap = dst.remaining(&mut store);
        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
            // Nothing can be written directly, so buffer everything.
            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
            return Poll::Ready(Ok(StreamResult::Dropped));
        };
        let cap: usize = cap.into();
        // Only `len` bytes can be written directly.  Clamping to the payload
        // length matters when the payload is shorter than the destination:
        // `split_off` panics when asked to split past the end, and
        // `copy_from_slice` panics when the two lengths differ.
        let len = cap.min(self.len());
        // Buffer whatever does not fit (empty when everything fits).
        dst.set_buffer(Cursor::new(self.split_off(len)));
        let mut dst = dst.as_direct(store, cap);
        dst.remaining()[..len].copy_from_slice(&self);
        dst.mark_written(len);
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
613
#[cfg(feature = "component-model-async-bytes")]
impl<D> StreamProducer<D> for bytes::BytesMut {
    type Item = u8;
    type Buffer = Cursor<Self>;

    /// Delivers the bytes in a single call and reports
    /// `StreamResult::Dropped`.
    ///
    /// When the destination reports no remaining capacity (`None` or zero),
    /// all bytes are handed over as the destination's buffer.  Otherwise as
    /// many bytes as fit are copied directly into the destination and any
    /// excess is buffered.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        mut store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _: bool,
    ) -> Poll<Result<StreamResult>> {
        let cap = dst.remaining(&mut store);
        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
            // Nothing can be written directly, so buffer everything.
            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
            return Poll::Ready(Ok(StreamResult::Dropped));
        };
        let cap: usize = cap.into();
        // Only `len` bytes can be written directly.  Clamping to the payload
        // length matters when the payload is shorter than the destination:
        // `split_off` panics when asked to split beyond the buffer's
        // capacity, and `copy_from_slice` panics when the two lengths differ.
        let len = cap.min(self.len());
        // Buffer whatever does not fit (empty when everything fits).
        dst.set_buffer(Cursor::new(self.split_off(len)));
        let mut dst = dst.as_direct(store, cap);
        dst.remaining()[..len].copy_from_slice(&self);
        dst.mark_written(len);
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
641
/// The read side handed to a [`StreamConsumer`] or [`FutureConsumer`]: where
/// items of type `T` are read from.
pub struct Source<'a, T> {
    /// Instance owning the transmit state.
    instance: Instance,
    /// Identifier of the transmit state this source refers to.
    id: TableId<TransmitState>,
    /// Host-side buffer to read from when present; otherwise items are lifted
    /// from the guest's pending write.
    host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
}
648
impl<'a, T> Source<'a, T> {
    /// Produces a shorter-lived `Source` that borrows from this one, so that
    /// `self` can be used again once the reborrow is dropped.
    pub fn reborrow(&mut self) -> Source<'_, T> {
        Source {
            instance: self.instance,
            id: self.id,
            host_buffer: self.host_buffer.as_deref_mut(),
        }
    }

    /// Reads as many items as are available, and as `buffer` has capacity
    /// for, into `buffer`.
    ///
    /// Items come from the host buffer when one is present; otherwise they
    /// are lifted from the guest's pending write and the read state's
    /// `guest_offset` is advanced by the number of items lifted.
    ///
    /// # Errors
    ///
    /// Fails if the transmit state cannot be found or if lifting fails.
    ///
    /// # Panics
    ///
    /// Without a host buffer, panics unless the read side is `HostReady` and
    /// the write side is `GuestReady`.
    pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
    where
        T: func::Lift + 'static,
        B: ReadBuffer<T>,
    {
        if let Some(input) = &mut self.host_buffer {
            let count = input.remaining().len().min(buffer.remaining_capacity());
            buffer.move_from(*input, count);
        } else {
            let store = store.as_context_mut();
            let transmit = self
                .instance
                .concurrent_state_mut(store.0)
                .get_mut(self.id)?;

            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
                unreachable!();
            };

            let &WriteState::GuestReady {
                ty,
                address,
                count,
                options,
                ..
            } = &transmit.write
            else {
                unreachable!()
            };

            let cx = &mut LiftContext::new(store.0.store_opaque_mut(), &options, self.instance);
            let ty = payload(ty, cx.types);
            // Capacity before lifting; the difference afterwards is the
            // number of items actually read.
            let old_remaining = buffer.remaining_capacity();
            // Resume after the `guest_offset` items already consumed:
            // advance the address by whole items and shrink the count.
            lift::<T, B, S::Data>(
                cx,
                ty,
                buffer,
                address + (T::SIZE32 * guest_offset),
                count - guest_offset,
            )?;

            // Look the state up again to record progress.
            let transmit = self
                .instance
                .concurrent_state_mut(store.0)
                .get_mut(self.id)?;

            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
                unreachable!();
            };

            *guest_offset += old_remaining - buffer.remaining_capacity();
        }

        Ok(())
    }

    /// Returns the number of items still available to read.
    ///
    /// When the write side is `GuestReady` this is `count - guest_offset`;
    /// otherwise it is the number of items remaining in the host buffer.
    ///
    /// # Panics
    ///
    /// Panics if the transmit state cannot be found, if the write side is
    /// `GuestReady` while the read side is not `HostReady`, or if the write
    /// side is not `GuestReady` and there is no host buffer.
    pub fn remaining(&self, mut store: impl AsContextMut) -> usize
    where
        T: 'static,
    {
        let transmit = self
            .instance
            .concurrent_state_mut(store.as_context_mut().0)
            .get_mut(self.id)
            .unwrap();

        if let &WriteState::GuestReady { count, .. } = &transmit.write {
            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
                unreachable!()
            };

            count - guest_offset
        } else if let Some(host_buffer) = &self.host_buffer {
            host_buffer.remaining().len()
        } else {
            unreachable!()
        }
    }
}
741
742impl<'a> Source<'a, u8> {
743 pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
745 DirectSource {
746 instance: self.instance,
747 id: self.id,
748 host_buffer: self.host_buffer,
749 store,
750 }
751 }
752}
753
/// A byte-oriented view of a [`Source`], exposing the readable data as a
/// slice (see `remaining`) plus a way to record progress (see `mark_read`).
pub struct DirectSource<'a, D: 'static> {
    /// Instance owning the transmit state.
    instance: Instance,
    /// Identifier of the transmit state being read from.
    id: TableId<TransmitState>,
    /// Host-side buffer read from instead of guest memory when present.
    host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
    /// Store used to reach the transmit state and guest memory.
    store: StoreContextMut<'a, D>,
}
762
impl<D: 'static> DirectSource<'_, D> {
    /// Returns the bytes that are available to read.
    ///
    /// With a host buffer present this is the buffer's remaining contents.
    /// Otherwise it is the slice of guest memory beginning at
    /// `address + guest_offset` and spanning `count - guest_offset` bytes,
    /// where `address` and `count` come from the pending guest write.
    ///
    /// # Panics
    ///
    /// Panics if the transmit state cannot be found, if the write/read sides
    /// are not in the `GuestReady`/`HostReady` states respectively, or if the
    /// computed range falls outside guest memory.
    pub fn remaining(&mut self) -> &[u8] {
        if let Some(buffer) = self.host_buffer.as_deref_mut() {
            buffer.remaining()
        } else {
            let transmit = self
                .instance
                .concurrent_state_mut(self.store.as_context_mut().0)
                .get_mut(self.id)
                .unwrap();

            let &WriteState::GuestReady {
                address,
                count,
                options,
                ..
            } = &transmit.write
            else {
                unreachable!()
            };

            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
                unreachable!()
            };

            // Skip the part of the guest buffer that has already been read
            // (tracked by `guest_offset`).
            options
                .memory(self.store.0)
                .get((address + guest_offset)..)
                .and_then(|b| b.get(..(count - guest_offset)))
                .unwrap()
        }
    }

    /// Records that `count` bytes have been read from the slice returned by
    /// `remaining`.
    ///
    /// With a host buffer this skips `count` bytes of the buffer; otherwise
    /// it advances the `guest_offset` stored in the read state.
    ///
    /// # Panics
    ///
    /// Panics if advancing would move `guest_offset` past the guest write's
    /// `count`, or if the transmit is not in the expected
    /// `GuestReady`/`HostReady` states.
    pub fn mark_read(&mut self, count: usize) {
        if let Some(buffer) = self.host_buffer.as_deref_mut() {
            buffer.skip(count);
        } else {
            let transmit = self
                .instance
                .concurrent_state_mut(self.store.as_context_mut().0)
                .get_mut(self.id)
                .unwrap();

            let WriteState::GuestReady {
                count: write_count, ..
            } = &transmit.write
            else {
                unreachable!()
            };

            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
                unreachable!()
            };

            // Refuse to claim more bytes than the guest offered to write.
            if *guest_offset + count > *write_count {
                panic!(
                    "read count ({count}) must be less than or equal to write count ({write_count})"
                )
            } else {
                *guest_offset += count;
            }
        }
    }
}
832
/// A sink for items arriving on the read end of a stream, polled with access
/// to the store (whose data type is `D`).
pub trait StreamConsumer<D>: Send + 'static {
    /// Type of the items consumed.
    type Item;

    /// Attempts to consume items from `source`.
    ///
    /// * `cx` - task context, for registering a wakeup when returning
    ///   `Poll::Pending`.
    /// * `store` - the store the stream belongs to.
    /// * `source` - where items are read from.
    /// * `finish` - NOTE(review): presumably asks the consumer to wrap up
    ///   without waiting; the exact contract is defined by the caller, which
    ///   is not visible here.
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;
}
928
/// A source of the single value delivered through a future, polled with
/// access to the store (whose data type is `D`).
pub trait FutureProducer<D>: Send + 'static {
    /// Type of the value produced.
    type Item;

    /// Attempts to produce the value.
    ///
    /// Returns `Ok(Some(value))` once the value is available and `Ok(None)`
    /// to give up without a value.  The blanket implementation for Rust
    /// futures in this file returns `Ok(None)` only when `finish` is `true`
    /// and the underlying future is still pending.
    fn poll_produce(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        finish: bool,
    ) -> Poll<Result<Option<Self::Item>>>;
}
950
951impl<T, E, D, Fut> FutureProducer<D> for Fut
952where
953 E: Into<Error>,
954 Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
955{
956 type Item = T;
957
958 fn poll_produce<'a>(
959 self: Pin<&mut Self>,
960 cx: &mut Context<'_>,
961 _: StoreContextMut<'a, D>,
962 finish: bool,
963 ) -> Poll<Result<Option<T>>> {
964 match self.poll(cx) {
965 Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
966 Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
967 Poll::Pending if finish => Poll::Ready(Ok(None)),
968 Poll::Pending => Poll::Pending,
969 }
970 }
971}
972
/// A sink for the single value delivered through a future, polled with
/// access to the store (whose data type is `D`).
pub trait FutureConsumer<D>: Send + 'static {
    /// Type of the value consumed.
    type Item;

    /// Attempts to consume the value from `source`.
    ///
    /// The adapter used by `FutureReader::pipe` in this file treats the
    /// future as consumed when `source` has nothing remaining after this
    /// returns `Poll::Ready(Ok(()))`.
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<()>>;
}
997
/// The readable end of a future carrying a value of type `T`.
pub struct FutureReader<T> {
    /// Instance the future belongs to.
    instance: Instance,
    /// Identifier of the underlying transmit handle.
    id: TableId<TransmitHandle>,
    /// Ties the reader to its payload type without storing a `T`.
    _phantom: PhantomData<T>,
}
1009
impl<T> FutureReader<T> {
    /// Creates a new future whose value will be supplied by `producer`.
    ///
    /// The producer is wrapped in an adapter implementing [`StreamProducer`]
    /// and registered with `instance` as a transmit of kind
    /// `TransmitKind::Future`.
    pub fn new<S: AsContextMut>(
        instance: Instance,
        mut store: S,
        producer: impl FutureProducer<S::Data, Item = T>,
    ) -> Self
    where
        T: func::Lower + func::Lift + Send + Sync + 'static,
    {
        // Adapter presenting a `FutureProducer` as a `StreamProducer`.
        struct Producer<P>(P);

        impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
            for Producer<P>
        {
            type Item = P::Item;
            type Buffer = Option<P::Item>;

            fn poll_produce<'a>(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                store: StoreContextMut<D>,
                mut destination: Destination<'a, Self::Item, Self::Buffer>,
                finish: bool,
            ) -> Poll<Result<StreamResult>> {
                // SAFETY: pin projection onto the wrapper's only field; this
                // impl never moves the inner producer out of `self`.
                let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };

                Poll::Ready(Ok(
                    if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
                        // Hand the value over through the destination buffer.
                        destination.set_buffer(Some(value));

                        StreamResult::Completed
                    } else {
                        // The producer gave up without a value.
                        StreamResult::Cancelled
                    },
                ))
            }
        }

        Self::new_(
            instance.new_transmit(
                store.as_context_mut(),
                TransmitKind::Future,
                Producer(producer),
            ),
            instance,
        )
    }

    /// Wraps an existing transmit handle id without any validation.
    fn new_(id: TableId<TransmitHandle>, instance: Instance) -> Self {
        Self {
            instance,
            id,
            _phantom: PhantomData,
        }
    }

    /// Attaches `consumer` to this future, consuming the reader.
    ///
    /// The consumer is wrapped in an adapter implementing [`StreamConsumer`]
    /// and registered via `Instance::set_consumer`.
    pub fn pipe<S: AsContextMut>(
        self,
        store: S,
        consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
    ) where
        T: func::Lift + 'static,
    {
        // Adapter presenting a `FutureConsumer` as a `StreamConsumer`.
        struct Consumer<C>(C);

        impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
            for Consumer<C>
        {
            type Item = T;

            fn poll_consume(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                mut store: StoreContextMut<D>,
                mut source: Source<Self::Item>,
                finish: bool,
            ) -> Poll<Result<StreamResult>> {
                // SAFETY: pin projection onto the wrapper's only field; this
                // impl never moves the inner consumer out of `self`.
                let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };

                ready!(consumer.poll_consume(
                    cx,
                    store.as_context_mut(),
                    source.reborrow(),
                    finish
                ))?;

                // An empty source means the value was taken; anything left
                // over means the consumer stopped without reading it.
                Poll::Ready(Ok(if source.remaining(store) == 0 {
                    StreamResult::Completed
                } else {
                    StreamResult::Cancelled
                }))
            }
        }

        self.instance
            .set_consumer(store, self.id, TransmitKind::Future, Consumer(consumer));
    }

    /// Converts this reader into a dynamically-typed [`Val::Future`] holding
    /// the handle's representation.
    pub fn into_val(self) -> Val {
        Val::Future(FutureAny(self.id.rep()))
    }

    /// Attempts to build a reader from a [`Val`].
    ///
    /// # Errors
    ///
    /// Fails if `value` is not a `Val::Future`, or if the handle it names is
    /// not found in the instance's concurrent state.
    pub fn from_val(
        mut store: impl AsContextMut<Data: Send>,
        instance: Instance,
        value: &Val,
    ) -> Result<Self> {
        let Val::Future(FutureAny(rep)) = value else {
            bail!("expected `future`; got `{}`", value.desc());
        };
        let store = store.as_context_mut();
        let id = TableId::<TransmitHandle>::new(*rep);
        // The lookup result is discarded; this only checks the handle exists.
        instance.concurrent_state_mut(store.0).get_mut(id)?;
        Ok(Self::new_(id, instance))
    }

    /// Takes the readable end at guest handle `index` out of the guest's
    /// handle table and wraps it as a host-side reader.
    ///
    /// # Errors
    ///
    /// Fails if the handle cannot be removed, if the handle table reports it
    /// as done, or if the underlying transmit state is already marked done.
    ///
    /// # Panics
    ///
    /// Calls `func::bad_type_info` if `ty` is not a future type.
    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
        match ty {
            InterfaceType::Future(src) => {
                let handle_table = cx
                    .instance_mut()
                    .table_for_transmit(TransmitIndex::Future(src));
                let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
                if is_done {
                    bail!("cannot lift future after being notified that the writable end dropped");
                }
                let id = TableId::<TransmitHandle>::new(rep);
                let concurrent_state = cx.instance_mut().concurrent_state_mut();
                let future = concurrent_state.get_mut(id)?;
                // The guest no longer holds a handle to this end.
                future.common.handle = None;
                let state = future.state;

                if concurrent_state.get_mut(state)?.done {
                    bail!("cannot lift future after previous read succeeded");
                }

                Ok(Self::new_(id, cx.instance_handle()))
            }
            _ => func::bad_type_info(),
        }
    }

    /// Closes this reader by dropping the readable end via
    /// `Instance::host_drop_reader`.
    ///
    /// The stored id is replaced with `u32::MAX` beforehand, so the reader no
    /// longer refers to the dropped handle afterwards.
    ///
    /// # Panics
    ///
    /// Panics if `host_drop_reader` returns an error.
    pub fn close(&mut self, mut store: impl AsContextMut) {
        let id = mem::replace(&mut self.id, TableId::new(u32::MAX));
        self.instance
            .host_drop_reader(store.as_context_mut().0, id, TransmitKind::Future)
            .unwrap();
    }

    /// Like `close`, but obtains store access through `accessor`.
    pub fn close_with(&mut self, accessor: impl AsAccessor) {
        accessor.as_accessor().with(|access| self.close(access))
    }

    /// Wraps this reader in a [`GuardedFutureReader`], which closes it on
    /// drop using `accessor`.
    pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
    where
        A: AsAccessor,
    {
        GuardedFutureReader::new(accessor, self)
    }
}
1205
1206impl<T> fmt::Debug for FutureReader<T> {
1207 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1208 f.debug_struct("FutureReader")
1209 .field("id", &self.id)
1210 .field("instance", &self.instance)
1211 .finish()
1212 }
1213}
1214
1215pub(crate) fn lower_future_to_index<U>(
1217 rep: u32,
1218 cx: &mut LowerContext<'_, U>,
1219 ty: InterfaceType,
1220) -> Result<u32> {
1221 match ty {
1222 InterfaceType::Future(dst) => {
1223 let concurrent_state = cx.instance_mut().concurrent_state_mut();
1224 let id = TableId::<TransmitHandle>::new(rep);
1225 let state = concurrent_state.get_mut(id)?.state;
1226 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1227
1228 let handle = cx
1229 .instance_mut()
1230 .table_for_transmit(TransmitIndex::Future(dst))
1231 .future_insert_read(dst, rep)?;
1232
1233 cx.instance_mut()
1234 .concurrent_state_mut()
1235 .get_mut(id)?
1236 .common
1237 .handle = Some(handle);
1238
1239 Ok(handle)
1240 }
1241 _ => func::bad_type_info(),
1242 }
1243}
1244
1245unsafe impl<T: Send + Sync> func::ComponentType for FutureReader<T> {
1248 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1249
1250 type Lower = <u32 as func::ComponentType>::Lower;
1251
1252 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1253 match ty {
1254 InterfaceType::Future(_) => Ok(()),
1255 other => bail!("expected `future`, found `{}`", func::desc(other)),
1256 }
1257 }
1258}
1259
1260unsafe impl<T: Send + Sync> func::Lower for FutureReader<T> {
1262 fn linear_lower_to_flat<U>(
1263 &self,
1264 cx: &mut LowerContext<'_, U>,
1265 ty: InterfaceType,
1266 dst: &mut MaybeUninit<Self::Lower>,
1267 ) -> Result<()> {
1268 lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1269 cx,
1270 InterfaceType::U32,
1271 dst,
1272 )
1273 }
1274
1275 fn linear_lower_to_memory<U>(
1276 &self,
1277 cx: &mut LowerContext<'_, U>,
1278 ty: InterfaceType,
1279 offset: usize,
1280 ) -> Result<()> {
1281 lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1282 cx,
1283 InterfaceType::U32,
1284 offset,
1285 )
1286 }
1287}
1288
1289unsafe impl<T: Send + Sync> func::Lift for FutureReader<T> {
1291 fn linear_lift_from_flat(
1292 cx: &mut LiftContext<'_>,
1293 ty: InterfaceType,
1294 src: &Self::Lower,
1295 ) -> Result<Self> {
1296 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1297 Self::lift_from_index(cx, ty, index)
1298 }
1299
1300 fn linear_lift_from_memory(
1301 cx: &mut LiftContext<'_>,
1302 ty: InterfaceType,
1303 bytes: &[u8],
1304 ) -> Result<Self> {
1305 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1306 Self::lift_from_index(cx, ty, index)
1307 }
1308}
1309
/// A [`FutureReader`] paired with an accessor, which closes the reader when
/// dropped unless the reader has been taken back out.
pub struct GuardedFutureReader<T, A>
where
    A: AsAccessor,
{
    /// The guarded reader; `None` once it has been taken back out, in which
    /// case dropping the guard does nothing.
    reader: Option<FutureReader<T>>,
    /// Accessor used to close the reader on drop.
    accessor: A,
}
1325
1326impl<T, A> GuardedFutureReader<T, A>
1327where
1328 A: AsAccessor,
1329{
1330 pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1332 Self {
1333 reader: Some(reader),
1334 accessor,
1335 }
1336 }
1337
1338 pub fn into_future(self) -> FutureReader<T> {
1341 self.into()
1342 }
1343}
1344
1345impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1346where
1347 A: AsAccessor,
1348{
1349 fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1350 guard.reader.take().unwrap()
1351 }
1352}
1353
1354impl<T, A> Drop for GuardedFutureReader<T, A>
1355where
1356 A: AsAccessor,
1357{
1358 fn drop(&mut self) {
1359 if let Some(reader) = &mut self.reader {
1360 reader.close_with(&self.accessor)
1361 }
1362 }
1363}
1364
/// The readable end of a stream carrying items of type `T`.
pub struct StreamReader<T> {
    /// Instance the stream belongs to.
    instance: Instance,
    /// Identifier of the underlying transmit handle.
    id: TableId<TransmitHandle>,
    /// Ties the reader to its item type without storing a `T`.
    _phantom: PhantomData<T>,
}
1376
impl<T> StreamReader<T> {
    /// Creates a new stream whose items will be supplied by `producer`.
    ///
    /// The producer is registered with `instance` as a transmit of kind
    /// `TransmitKind::Stream`.
    pub fn new<S: AsContextMut>(
        instance: Instance,
        store: S,
        producer: impl StreamProducer<S::Data, Item = T>,
    ) -> Self
    where
        T: func::Lower + func::Lift + Send + Sync + 'static,
    {
        Self::new_(
            instance.new_transmit(store, TransmitKind::Stream, producer),
            instance,
        )
    }

    /// Wraps an existing transmit handle id without any validation.
    fn new_(id: TableId<TransmitHandle>, instance: Instance) -> Self {
        Self {
            instance,
            id,
            _phantom: PhantomData,
        }
    }

    /// Attaches `consumer` to this stream via `Instance::set_consumer`,
    /// consuming the reader.
    pub fn pipe<S: AsContextMut>(self, store: S, consumer: impl StreamConsumer<S::Data, Item = T>)
    where
        T: 'static,
    {
        self.instance
            .set_consumer(store, self.id, TransmitKind::Stream, consumer);
    }

    /// Converts this reader into a dynamically-typed [`Val::Stream`] holding
    /// the handle's representation.
    pub fn into_val(self) -> Val {
        Val::Stream(StreamAny(self.id.rep()))
    }

    /// Attempts to build a reader from a [`Val`].
    ///
    /// # Errors
    ///
    /// Fails if `value` is not a `Val::Stream`, or if the handle it names is
    /// not found in the instance's concurrent state.
    pub fn from_val(
        mut store: impl AsContextMut<Data: Send>,
        instance: Instance,
        value: &Val,
    ) -> Result<Self> {
        let Val::Stream(StreamAny(rep)) = value else {
            bail!("expected `stream`; got `{}`", value.desc());
        };
        let store = store.as_context_mut();
        let id = TableId::<TransmitHandle>::new(*rep);
        // The lookup result is discarded; this only checks the handle exists.
        instance.concurrent_state_mut(store.0).get_mut(id)?;
        Ok(Self::new_(id, instance))
    }

    /// Takes the readable end at guest handle `index` out of the guest's
    /// handle table and wraps it as a host-side reader.
    ///
    /// # Errors
    ///
    /// Fails if the handle cannot be removed, if the handle table reports it
    /// as done, or if the transmit handle cannot be found.
    ///
    /// # Panics
    ///
    /// Calls `func::bad_type_info` if `ty` is not a stream type.
    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
        match ty {
            InterfaceType::Stream(src) => {
                let handle_table = cx
                    .instance_mut()
                    .table_for_transmit(TransmitIndex::Stream(src));
                let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
                if is_done {
                    bail!("cannot lift stream after being notified that the writable end dropped");
                }
                let id = TableId::<TransmitHandle>::new(rep);
                // The guest no longer holds a handle to this end.
                cx.instance_mut()
                    .concurrent_state_mut()
                    .get_mut(id)?
                    .common
                    .handle = None;
                Ok(Self::new_(id, cx.instance_handle()))
            }
            _ => func::bad_type_info(),
        }
    }

    /// Closes this reader by dropping the readable end via
    /// `Instance::host_drop_reader`.
    ///
    /// The stored id is replaced with `u32::MAX` beforehand, so the reader no
    /// longer refers to the dropped handle afterwards.
    ///
    /// # Panics
    ///
    /// Panics if `host_drop_reader` returns an error.
    pub fn close(&mut self, mut store: impl AsContextMut) {
        let id = mem::replace(&mut self.id, TableId::new(u32::MAX));
        self.instance
            .host_drop_reader(store.as_context_mut().0, id, TransmitKind::Stream)
            .unwrap()
    }

    /// Like `close`, but obtains store access through `accessor`.
    pub fn close_with(&mut self, accessor: impl AsAccessor) {
        accessor.as_accessor().with(|access| self.close(access))
    }

    /// Wraps this reader in a [`GuardedStreamReader`], which closes it on
    /// drop using `accessor`.
    pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
    where
        A: AsAccessor,
    {
        GuardedStreamReader::new(accessor, self)
    }
}
1486
1487impl<T> fmt::Debug for StreamReader<T> {
1488 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1489 f.debug_struct("StreamReader")
1490 .field("id", &self.id)
1491 .field("instance", &self.instance)
1492 .finish()
1493 }
1494}
1495
1496pub(crate) fn lower_stream_to_index<U>(
1498 rep: u32,
1499 cx: &mut LowerContext<'_, U>,
1500 ty: InterfaceType,
1501) -> Result<u32> {
1502 match ty {
1503 InterfaceType::Stream(dst) => {
1504 let concurrent_state = cx.instance_mut().concurrent_state_mut();
1505 let id = TableId::<TransmitHandle>::new(rep);
1506 let state = concurrent_state.get_mut(id)?.state;
1507 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1508
1509 let handle = cx
1510 .instance_mut()
1511 .table_for_transmit(TransmitIndex::Stream(dst))
1512 .stream_insert_read(dst, rep)?;
1513
1514 cx.instance_mut()
1515 .concurrent_state_mut()
1516 .get_mut(id)?
1517 .common
1518 .handle = Some(handle);
1519
1520 Ok(handle)
1521 }
1522 _ => func::bad_type_info(),
1523 }
1524}
1525
1526unsafe impl<T: Send + Sync> func::ComponentType for StreamReader<T> {
1529 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1530
1531 type Lower = <u32 as func::ComponentType>::Lower;
1532
1533 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1534 match ty {
1535 InterfaceType::Stream(_) => Ok(()),
1536 other => bail!("expected `stream`, found `{}`", func::desc(other)),
1537 }
1538 }
1539}
1540
1541unsafe impl<T: Send + Sync> func::Lower for StreamReader<T> {
1543 fn linear_lower_to_flat<U>(
1544 &self,
1545 cx: &mut LowerContext<'_, U>,
1546 ty: InterfaceType,
1547 dst: &mut MaybeUninit<Self::Lower>,
1548 ) -> Result<()> {
1549 lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1550 cx,
1551 InterfaceType::U32,
1552 dst,
1553 )
1554 }
1555
1556 fn linear_lower_to_memory<U>(
1557 &self,
1558 cx: &mut LowerContext<'_, U>,
1559 ty: InterfaceType,
1560 offset: usize,
1561 ) -> Result<()> {
1562 lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1563 cx,
1564 InterfaceType::U32,
1565 offset,
1566 )
1567 }
1568}
1569
1570unsafe impl<T: Send + Sync> func::Lift for StreamReader<T> {
1572 fn linear_lift_from_flat(
1573 cx: &mut LiftContext<'_>,
1574 ty: InterfaceType,
1575 src: &Self::Lower,
1576 ) -> Result<Self> {
1577 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1578 Self::lift_from_index(cx, ty, index)
1579 }
1580
1581 fn linear_lift_from_memory(
1582 cx: &mut LiftContext<'_>,
1583 ty: InterfaceType,
1584 bytes: &[u8],
1585 ) -> Result<Self> {
1586 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1587 Self::lift_from_index(cx, ty, index)
1588 }
1589}
1590
/// Pairs a [`StreamReader`] with an accessor so the reader can be closed
/// automatically.
///
/// The `Drop` implementation for this type calls `close_with` on the wrapped
/// reader, passing the stored accessor, unless the reader has already been
/// taken back out (see the `From<GuardedStreamReader<T, A>>` conversion).
pub struct GuardedStreamReader<T, A>
where
    A: AsAccessor,
{
    // `None` once the reader has been extracted; in that case dropping the
    // guard does nothing.
    reader: Option<StreamReader<T>>,
    // Accessor handed to `close_with` when the guard is dropped.
    accessor: A,
}
1606
1607impl<T, A> GuardedStreamReader<T, A>
1608where
1609 A: AsAccessor,
1610{
1611 pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1614 Self {
1615 reader: Some(reader),
1616 accessor,
1617 }
1618 }
1619
1620 pub fn into_stream(self) -> StreamReader<T> {
1623 self.into()
1624 }
1625}
1626
1627impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1628where
1629 A: AsAccessor,
1630{
1631 fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1632 guard.reader.take().unwrap()
1633 }
1634}
1635
1636impl<T, A> Drop for GuardedStreamReader<T, A>
1637where
1638 A: AsAccessor,
1639{
1640 fn drop(&mut self) {
1641 if let Some(reader) = &mut self.reader {
1642 reader.close_with(&self.accessor)
1643 }
1644 }
1645}
1646
/// Host-side handle to a component-model `error-context` value.
pub struct ErrorContext {
    // Representation shared between the `Val::ErrorContext` form and the
    // per-instance error-context tables (see `lift_from_index` and
    // `lower_error_context_to_index`).
    rep: u32,
}
1651
1652impl ErrorContext {
1653 pub(crate) fn new(rep: u32) -> Self {
1654 Self { rep }
1655 }
1656
1657 pub fn into_val(self) -> Val {
1659 Val::ErrorContext(ErrorContextAny(self.rep))
1660 }
1661
1662 pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1664 let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1665 bail!("expected `error-context`; got `{}`", value.desc());
1666 };
1667 Ok(Self::new(*rep))
1668 }
1669
1670 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1671 match ty {
1672 InterfaceType::ErrorContext(src) => {
1673 let rep = cx
1674 .instance_mut()
1675 .table_for_error_context(src)
1676 .error_context_rep(index)?;
1677
1678 Ok(Self { rep })
1679 }
1680 _ => func::bad_type_info(),
1681 }
1682 }
1683}
1684
1685pub(crate) fn lower_error_context_to_index<U>(
1686 rep: u32,
1687 cx: &mut LowerContext<'_, U>,
1688 ty: InterfaceType,
1689) -> Result<u32> {
1690 match ty {
1691 InterfaceType::ErrorContext(dst) => {
1692 let tbl = cx.instance_mut().table_for_error_context(dst);
1693 tbl.error_context_insert(rep)
1694 }
1695 _ => func::bad_type_info(),
1696 }
1697}
1698unsafe impl func::ComponentType for ErrorContext {
1701 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1702
1703 type Lower = <u32 as func::ComponentType>::Lower;
1704
1705 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1706 match ty {
1707 InterfaceType::ErrorContext(_) => Ok(()),
1708 other => bail!("expected `error`, found `{}`", func::desc(other)),
1709 }
1710 }
1711}
1712
1713unsafe impl func::Lower for ErrorContext {
1715 fn linear_lower_to_flat<T>(
1716 &self,
1717 cx: &mut LowerContext<'_, T>,
1718 ty: InterfaceType,
1719 dst: &mut MaybeUninit<Self::Lower>,
1720 ) -> Result<()> {
1721 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1722 cx,
1723 InterfaceType::U32,
1724 dst,
1725 )
1726 }
1727
1728 fn linear_lower_to_memory<T>(
1729 &self,
1730 cx: &mut LowerContext<'_, T>,
1731 ty: InterfaceType,
1732 offset: usize,
1733 ) -> Result<()> {
1734 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1735 cx,
1736 InterfaceType::U32,
1737 offset,
1738 )
1739 }
1740}
1741
1742unsafe impl func::Lift for ErrorContext {
1744 fn linear_lift_from_flat(
1745 cx: &mut LiftContext<'_>,
1746 ty: InterfaceType,
1747 src: &Self::Lower,
1748 ) -> Result<Self> {
1749 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1750 Self::lift_from_index(cx, ty, index)
1751 }
1752
1753 fn linear_lift_from_memory(
1754 cx: &mut LiftContext<'_>,
1755 ty: InterfaceType,
1756 bytes: &[u8],
1757 ) -> Result<Self> {
1758 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1759 Self::lift_from_index(cx, ty, index)
1760 }
1761}
1762
/// Table entry for one end (read or write) of a stream or future.
///
/// Each handle points at the `TransmitState` shared by both ends; the state in
/// turn records the ids of its two handles (`write_handle` / `read_handle`).
pub(super) struct TransmitHandle {
    /// Waitable bookkeeping for this end (e.g. the guest-visible `handle`,
    /// which `lower_stream_to_index` stores here).
    pub(super) common: WaitableCommon,
    /// Id of the shared state backing this end.
    state: TableId<TransmitState>,
}
1769
1770impl TransmitHandle {
1771 fn new(state: TableId<TransmitState>) -> Self {
1772 Self {
1773 common: WaitableCommon::default(),
1774 state,
1775 }
1776 }
1777}
1778
1779impl TableDebug for TransmitHandle {
1780 fn type_name() -> &'static str {
1781 "TransmitHandle"
1782 }
1783}
1784
/// State shared by the read and write ends of a single stream or future.
struct TransmitState {
    /// Id of the `TransmitHandle` for the write end.
    write_handle: TableId<TransmitHandle>,
    /// Id of the `TransmitHandle` for the read end.
    read_handle: TableId<TransmitHandle>,
    /// Current state of the write end.
    write: WriteState,
    /// Current state of the read end.
    read: ReadState,
    /// Set to `true` once a `TransmitKind::Future` transmit has delivered its
    /// value (see `Instance::set_consumer` and `Instance::write`); consulted
    /// by `Instance::host_drop_writer` when deciding whether dropping an open
    /// writer is an error.
    done: bool,
}
1798
1799impl Default for TransmitState {
1800 fn default() -> Self {
1801 Self {
1802 write_handle: TableId::new(u32::MAX),
1803 read_handle: TableId::new(u32::MAX),
1804 read: ReadState::Open,
1805 write: WriteState::Open,
1806 done: false,
1807 }
1808 }
1809}
1810
1811impl TableDebug for TransmitState {
1812 fn type_name() -> &'static str {
1813 "TransmitState"
1814 }
1815}
1816
/// Boxed factory that, each time it is called, returns a fresh boxed future
/// resolving to the `StreamResult` of one produce/consume step.
///
/// Used for the `produce` field of `WriteState::HostReady` and the `consume`
/// field of `ReadState::HostReady`.
type PollStream = Box<
    dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
>;
1820
/// State of the write end of a stream or future.
enum WriteState {
    /// No write is currently pending and the end has not been dropped.
    Open,
    /// A guest write is pending; the fields describe the guest's buffer.
    GuestReady {
        /// Stream or future type index of the write end.
        ty: TransmitIndex,
        /// Optional flat ABI description of the payload (when present,
        /// `Instance::copy` uses a raw byte copy).
        flat_abi: Option<FlatAbi>,
        /// Canonical options (memory etc.) of the writing guest.
        options: Options,
        /// Address of the guest buffer in linear memory.
        address: usize,
        /// Number of items in the guest buffer.
        count: usize,
        /// Guest-visible handle; reported back in the write-result event.
        handle: u32,
    },
    /// The write end is owned by a host producer.
    HostReady {
        /// Factory for futures that run one produce step.
        produce: PollStream,
        /// Number of items delivered to the guest reader so far in the
        /// current read (advanced by `Instance::write`, reset by
        /// `Instance::pipe_to_guest`).
        guest_offset: usize,
        /// Cancellation flag forwarded to `poll_produce`; cleared once the
        /// producer returns `Poll::Ready`.
        cancel: bool,
        /// Waker of the pending `poll_produce` call; presumably woken when
        /// `cancel` is set — confirm where cancellation is requested.
        cancel_waker: Option<Waker>,
    },
    /// The write end has been dropped.
    Dropped,
}
1844
1845impl fmt::Debug for WriteState {
1846 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1847 match self {
1848 Self::Open => f.debug_tuple("Open").finish(),
1849 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1850 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1851 Self::Dropped => f.debug_tuple("Dropped").finish(),
1852 }
1853 }
1854}
1855
/// State of the read end of a stream or future.
enum ReadState {
    /// No read is currently pending and the end has not been dropped.
    Open,
    /// A guest read is pending; the fields describe the guest's buffer.
    GuestReady {
        /// Stream or future type index of the read end.
        ty: TransmitIndex,
        /// Optional flat ABI description of the payload (when present,
        /// `Instance::copy` uses a raw byte copy).
        flat_abi: Option<FlatAbi>,
        /// Canonical options (memory etc.) of the reading guest.
        options: Options,
        /// Address of the guest buffer in linear memory.
        address: usize,
        /// Capacity of the guest buffer, in items.
        count: usize,
        /// Guest-visible handle; reported back in the read-result event.
        handle: u32,
    },
    /// The read end is owned by a host consumer and the writer is a guest (or
    /// not yet known).
    HostReady {
        /// Factory for futures that run one consume step.
        consume: PollStream,
        /// Number of items taken from the guest writer so far in the current
        /// write (reset by `Instance::pipe_from_guest`).
        guest_offset: usize,
        /// Cancellation flag forwarded to `poll_consume`; cleared once the
        /// consumer returns `Poll::Ready`.
        cancel: bool,
        /// Waker of the pending `poll_consume` call; presumably woken when
        /// `cancel` is set — confirm where cancellation is requested.
        cancel_waker: Option<Waker>,
    },
    /// Both ends are owned by the host (installed by `Instance::set_consumer`
    /// when the write end is already `WriteState::HostReady`).
    HostToHost {
        /// Runs one consume step against the given untyped write buffer.
        accept: Box<
            dyn for<'a> Fn(
                    &'a mut UntypedWriteBuffer<'a>,
                )
                    -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
                + Send
                + Sync,
        >,
        /// Intermediate byte buffer the producer may fill directly (it is lent
        /// to `poll_produce` as a `Cursor` in `Instance::new_transmit`).
        buffer: Vec<u8>,
        /// Number of valid bytes in `buffer` (the cursor position after the
        /// last `poll_produce`); reset to 0 after `Instance::write` drains it.
        limit: usize,
    },
    /// The read end has been dropped.
    Dropped,
}
1892
1893impl fmt::Debug for ReadState {
1894 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1895 match self {
1896 Self::Open => f.debug_tuple("Open").finish(),
1897 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1898 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1899 Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
1900 Self::Dropped => f.debug_tuple("Dropped").finish(),
1901 }
1902 }
1903}
1904
1905fn return_code(kind: TransmitKind, state: StreamResult, guest_offset: usize) -> ReturnCode {
1906 let count = guest_offset.try_into().unwrap();
1907 match state {
1908 StreamResult::Dropped => ReturnCode::Dropped(count),
1909 StreamResult::Completed => ReturnCode::completed(kind, count),
1910 StreamResult::Cancelled => ReturnCode::Cancelled(count),
1911 }
1912}
1913
1914impl Instance {
    /// Creates a new transmit whose write end is driven by the host
    /// `producer`, and returns the id of its read handle.
    ///
    /// The write state is set to `WriteState::HostReady` with a `produce`
    /// closure. Each call to that closure yields a future which:
    ///
    /// 1. takes the producer and its buffer out of the shared slot,
    /// 2. polls `StreamProducer::poll_produce` (only if the buffer is empty),
    /// 3. validates the producer's result against what was actually produced,
    /// 4. puts the producer back, and
    /// 5. forwards any buffered items to the reader via `Instance::write`.
    ///
    /// # Panics
    ///
    /// Panics if the transmit cannot be created or looked up, and the
    /// `produce` future panics if the read/write states are not the ones it
    /// expects (`unreachable!`) or if it is invoked re-entrantly while the
    /// producer has been taken out of its slot.
    fn new_transmit<S: AsContextMut, P: StreamProducer<S::Data>>(
        self,
        mut store: S,
        kind: TransmitKind,
        producer: P,
    ) -> TableId<TransmitHandle>
    where
        P::Item: func::Lower,
    {
        let mut store = store.as_context_mut();
        let token = StoreToken::new(store.as_context_mut());
        let state = self.concurrent_state_mut(store.0);
        let (_, read) = state.new_transmit().unwrap();
        // The producer and its buffer live in a shared slot so that every
        // future created by `produce` can temporarily take ownership of them.
        let producer = Arc::new(Mutex::new(Some((Box::pin(producer), P::Buffer::default()))));
        let id = state.get_mut(read).unwrap().state;
        let produce = Box::new(move || {
            let producer = producer.clone();
            async move {
                let (mut mine, mut buffer) = producer.lock().unwrap().take().unwrap();

                // Only ask the producer for more if nothing is left over from
                // a previous step; otherwise just flush what is buffered.
                let (result, cancelled) = if buffer.remaining().is_empty() {
                    future::poll_fn(|cx| {
                        tls::get(|store| {
                            let transmit = self.concurrent_state_mut(store).get_mut(id).unwrap();

                            let &WriteState::HostReady { cancel, .. } = &transmit.write else {
                                unreachable!();
                            };

                            // For a host-to-host pipe, lend the intermediate
                            // byte buffer to the producer as a cursor.
                            let mut host_buffer =
                                if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
                                    Some(Cursor::new(mem::take(buffer)))
                                } else {
                                    None
                                };

                            let poll = mine.as_mut().poll_produce(
                                cx,
                                token.as_context_mut(store),
                                Destination {
                                    instance: self,
                                    id,
                                    buffer: &mut buffer,
                                    host_buffer: host_buffer.as_mut(),
                                    _phantom: PhantomData,
                                },
                                cancel,
                            );

                            // Re-fetch the state: the earlier borrow ended when
                            // the store was handed to `poll_produce`.
                            let transmit = self.concurrent_state_mut(store).get_mut(id).unwrap();

                            // Return the lent buffer and record how far the
                            // producer advanced the cursor.
                            let host_offset = if let (
                                Some(host_buffer),
                                ReadState::HostToHost { buffer, limit, .. },
                            ) = (host_buffer, &mut transmit.read)
                            {
                                *limit = usize::try_from(host_buffer.position()).unwrap();
                                *buffer = host_buffer.into_inner();
                                *limit
                            } else {
                                0
                            };

                            {
                                let WriteState::HostReady {
                                    guest_offset,
                                    cancel,
                                    cancel_waker,
                                    ..
                                } = &mut transmit.write
                                else {
                                    unreachable!();
                                };

                                if poll.is_pending() {
                                    // A pending producer must not have produced
                                    // anything through any of the three paths.
                                    if !buffer.remaining().is_empty()
                                        || *guest_offset > 0
                                        || host_offset > 0
                                    {
                                        return Poll::Ready(Err(anyhow!(
                                            "StreamProducer::poll_produce returned Poll::Pending \
                                             after producing at least one item"
                                        )));
                                    }
                                    // Remember the waker while the poll is
                                    // outstanding.
                                    *cancel_waker = Some(cx.waker().clone());
                                } else {
                                    *cancel_waker = None;
                                    *cancel = false;
                                }
                            }

                            // `cancel` here is the value read before the poll,
                            // i.e. what was actually passed to the producer.
                            poll.map(|v| v.map(|result| (result, cancel)))
                        })
                    })
                    .await?
                } else {
                    (StreamResult::Completed, false)
                };

                let (guest_offset, host_offset, count) = tls::get(|store| {
                    let transmit = self.concurrent_state_mut(store).get_mut(id).unwrap();
                    let (count, host_offset) = match &transmit.read {
                        &ReadState::GuestReady { count, .. } => (count, 0),
                        &ReadState::HostToHost { limit, .. } => (1, limit),
                        _ => unreachable!(),
                    };
                    let guest_offset = match &transmit.write {
                        &WriteState::HostReady { guest_offset, .. } => guest_offset,
                        _ => unreachable!(),
                    };
                    (guest_offset, host_offset, count)
                });

                match result {
                    StreamResult::Completed => {
                        // NOTE(review): the threshold here is `count > 1`,
                        // whereas the analogous consumer check in
                        // `set_consumer` uses `count > 0` — confirm this
                        // asymmetry is intentional.
                        if count > 1
                            && buffer.remaining().is_empty()
                            && guest_offset == 0
                            && host_offset == 0
                        {
                            bail!(
                                "StreamProducer::poll_produce returned StreamResult::Completed \
                                 without producing any items"
                            );
                        }
                    }
                    StreamResult::Cancelled => {
                        // `Cancelled` is only valid if cancellation was
                        // actually requested for this poll.
                        if !cancelled {
                            bail!(
                                "StreamProducer::poll_produce returned StreamResult::Cancelled \
                                 without being given a `finish` parameter value of true"
                            );
                        }
                    }
                    StreamResult::Dropped => {}
                }

                let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;

                // Put the producer back before `write` runs: `write` locks the
                // same slot to reach the buffer.
                *producer.lock().unwrap() = Some((mine, buffer));

                if write_buffer {
                    self.write(token, id, producer, kind).await?;
                }

                Ok(result)
            }
            .boxed()
        });
        state.get_mut(id).unwrap().write = WriteState::HostReady {
            produce,
            guest_offset: 0,
            cancel: false,
            cancel_waker: None,
        };
        read
    }
2072
    /// Attaches the host `consumer` to the read end identified by the handle
    /// `id`.
    ///
    /// What happens next depends on the current write state:
    ///
    /// * `Open`: the read state becomes `HostReady` and nothing else happens
    ///   yet.
    /// * `GuestReady`: a consume step is started immediately and piped back
    ///   to the waiting guest writer via `pipe_from_guest`.
    /// * `HostReady`: the read state becomes `HostToHost` and a background
    ///   future is pushed that repeatedly runs the producer until either side
    ///   is dropped (or, for a future, after one step); the transmit is
    ///   deleted when that loop finishes.
    /// * `Dropped`: the reader is dropped right away via `host_drop_reader`.
    ///
    /// # Panics
    ///
    /// Panics if `id` or its state cannot be found, or if dropping the reader
    /// fails in the `Dropped` case.
    fn set_consumer<S: AsContextMut, C: StreamConsumer<S::Data>>(
        self,
        mut store: S,
        id: TableId<TransmitHandle>,
        kind: TransmitKind,
        consumer: C,
    ) {
        let mut store = store.as_context_mut();
        let token = StoreToken::new(store.as_context_mut());
        let state = self.concurrent_state_mut(store.0);
        // From here on `id` names the shared `TransmitState`, not the handle.
        let id = state.get_mut(id).unwrap().state;
        let transmit = state.get_mut(id).unwrap();
        // The consumer lives in a shared slot so each consume step can
        // temporarily take ownership of it.
        let consumer = Arc::new(Mutex::new(Some(Box::pin(consumer))));
        // One consume step. `host_buffer` is `Some` only for host-to-host
        // pipes, where the items come from a host-side buffer.
        let consume_with_buffer = {
            let consumer = consumer.clone();
            async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
                let mut mine = consumer.lock().unwrap().take().unwrap();

                let host_buffer_remaining_before =
                    host_buffer.as_deref_mut().map(|v| v.remaining().len());

                let (result, cancelled) = future::poll_fn(|cx| {
                    tls::get(|store| {
                        let cancel =
                            match &self.concurrent_state_mut(store).get_mut(id).unwrap().read {
                                &ReadState::HostReady { cancel, .. } => cancel,
                                ReadState::Open => false,
                                _ => unreachable!(),
                            };

                        let poll = mine.as_mut().poll_consume(
                            cx,
                            token.as_context_mut(store),
                            Source {
                                instance: self,
                                id,
                                host_buffer: host_buffer.as_deref_mut(),
                            },
                            cancel,
                        );

                        // Track the waker while the poll is outstanding and
                        // clear the cancel request once it resolves.
                        if let ReadState::HostReady {
                            cancel_waker,
                            cancel,
                            ..
                        } = &mut self.concurrent_state_mut(store).get_mut(id).unwrap().read
                        {
                            if poll.is_pending() {
                                *cancel_waker = Some(cx.waker().clone());
                            } else {
                                *cancel_waker = None;
                                *cancel = false;
                            }
                        }

                        // `cancel` here is the value read before the poll,
                        // i.e. what was actually passed to the consumer.
                        poll.map(|v| v.map(|result| (result, cancel)))
                    })
                })
                .await?;

                let (guest_offset, count) = tls::get(|store| {
                    let transmit = self.concurrent_state_mut(store).get_mut(id).unwrap();
                    (
                        match &transmit.read {
                            &ReadState::HostReady { guest_offset, .. } => guest_offset,
                            ReadState::Open => 0,
                            _ => unreachable!(),
                        },
                        match &transmit.write {
                            &WriteState::GuestReady { count, .. } => count,
                            WriteState::HostReady { .. } => host_buffer_remaining_before.unwrap(),
                            _ => unreachable!(),
                        },
                    )
                });

                match result {
                    StreamResult::Completed => {
                        // NOTE(review): when there is no host buffer the
                        // `unwrap_or(false)` makes this check never fire, so
                        // it only guards the host-to-host path — confirm that
                        // is intended.
                        if count > 0
                            && guest_offset == 0
                            && host_buffer_remaining_before
                                .zip(host_buffer.map(|v| v.remaining().len()))
                                .map(|(before, after)| before == after)
                                .unwrap_or(false)
                        {
                            bail!(
                                "StreamConsumer::poll_consume returned StreamResult::Completed \
                                 without consuming any items"
                            );
                        }

                        // A future carries a single value, so one completed
                        // consume step finishes it.
                        if let TransmitKind::Future = kind {
                            tls::get(|store| {
                                self.concurrent_state_mut(store).get_mut(id).unwrap().done = true;
                            });
                        }
                    }
                    StreamResult::Cancelled => {
                        // `Cancelled` is only valid if cancellation was
                        // actually requested for this poll.
                        if !cancelled {
                            bail!(
                                "StreamConsumer::poll_consume returned StreamResult::Cancelled \
                                 without being given a `finish` parameter value of true"
                            );
                        }
                    }
                    StreamResult::Dropped => {}
                }

                *consumer.lock().unwrap() = Some(mine);

                Ok(result)
            }
        };
        // Buffer-less variant used when the writer is a guest.
        let consume = {
            let consume = consume_with_buffer.clone();
            Box::new(move || {
                let consume = consume.clone();
                async move { consume(None).await }.boxed()
            })
        };

        match &transmit.write {
            WriteState::Open => {
                transmit.read = ReadState::HostReady {
                    consume,
                    guest_offset: 0,
                    cancel: false,
                    cancel_waker: None,
                };
            }
            WriteState::GuestReady { .. } => {
                // A guest write is already waiting: start consuming now.
                let future = consume();
                transmit.read = ReadState::HostReady {
                    consume,
                    guest_offset: 0,
                    cancel: false,
                    cancel_waker: None,
                };
                self.pipe_from_guest(store.0, kind, id, future);
            }
            WriteState::HostReady { .. } => {
                // Move the real `produce` closure out, leaving a `HostReady`
                // placeholder (the futures made by `produce` require the write
                // state to be `HostReady`). The placeholder closure itself
                // must never be called.
                let WriteState::HostReady { produce, .. } = mem::replace(
                    &mut transmit.write,
                    WriteState::HostReady {
                        produce: Box::new(|| unreachable!()),
                        guest_offset: 0,
                        cancel: false,
                        cancel_waker: None,
                    },
                ) else {
                    unreachable!();
                };

                transmit.read = ReadState::HostToHost {
                    accept: Box::new(move |input| {
                        let consume = consume_with_buffer.clone();
                        async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
                    }),
                    buffer: Vec::new(),
                    limit: 0,
                };

                // Background pump: run the producer until the reader or the
                // producer is dropped, or after one step for a future.
                let future = async move {
                    loop {
                        if tls::get(|store| {
                            anyhow::Ok(matches!(
                                self.concurrent_state_mut(store).get_mut(id)?.read,
                                ReadState::Dropped
                            ))
                        })? {
                            break Ok(());
                        }

                        match produce().await? {
                            StreamResult::Completed | StreamResult::Cancelled => {}
                            StreamResult::Dropped => break Ok(()),
                        }

                        if let TransmitKind::Future = kind {
                            break Ok(());
                        }
                    }
                }
                // Whatever the outcome, delete the transmit once the pump
                // stops; a deletion error takes precedence over `result`.
                .map(move |result| {
                    tls::get(|store| self.concurrent_state_mut(store).delete_transmit(id))?;
                    result
                });

                state.push_future(Box::pin(future));
            }
            WriteState::Dropped => {
                let reader = transmit.read_handle;
                self.host_drop_reader(store.0, reader, kind).unwrap();
            }
        }
    }
2269
    /// Delivers the items buffered in `pair` (a producer together with its
    /// write buffer) to the read end of the transmit `id`.
    ///
    /// The read state is temporarily swapped out for `ReadState::Open` and
    /// restored before returning:
    ///
    /// * `GuestReady`: items are lowered into the guest's buffer starting at
    ///   the current `guest_offset`, and `guest_offset` is advanced by the
    ///   number of items taken from the buffer.
    /// * `HostToHost`: first the bytes of the intermediate buffer up to
    ///   `limit`, then the producer's own buffer, are fed to the consumer's
    ///   `accept` closure until drained or the consumer reports `Dropped`.
    ///
    /// # Errors
    ///
    /// Propagates lookup, lowering, and consumer errors.
    ///
    /// # Panics
    ///
    /// Panics if the read state is neither `GuestReady` nor `HostToHost`, or
    /// (in the `GuestReady` case) if the write state is not `HostReady`.
    async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
        self,
        token: StoreToken<D>,
        id: TableId<TransmitState>,
        pair: Arc<Mutex<Option<(P, B)>>>,
        kind: TransmitKind,
    ) -> Result<()> {
        // Take the read state out (leaving `Open`) so its fields can be owned
        // across the awaits below.
        let (read, guest_offset) = tls::get(|store| {
            let transmit = self.concurrent_state_mut(store).get_mut(id)?;

            let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write
            {
                Some(guest_offset)
            } else {
                None
            };

            anyhow::Ok((
                mem::replace(&mut transmit.read, ReadState::Open),
                guest_offset,
            ))
        })?;

        match read {
            ReadState::GuestReady {
                ty,
                flat_abi,
                options,
                address,
                count,
                handle,
            } => {
                let guest_offset = guest_offset.unwrap();

                // A future carries a single value; delivering it completes it.
                if let TransmitKind::Future = kind {
                    tls::get(|store| {
                        self.concurrent_state_mut(store).get_mut(id)?.done = true;
                        anyhow::Ok(())
                    })?;
                }

                let old_remaining = pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
                // Lowers as many buffered items as fit into the unfilled tail
                // of the guest buffer.
                let accept = {
                    let pair = pair.clone();
                    move |mut store: StoreContextMut<D>| {
                        lower::<T, B, D>(
                            store.as_context_mut(),
                            self,
                            &options,
                            ty,
                            address + (T::SIZE32 * guest_offset),
                            count - guest_offset,
                            &mut pair.lock().unwrap().as_mut().unwrap().1,
                        )?;
                        anyhow::Ok(())
                    }
                };

                if guest_offset < count {
                    if T::MAY_REQUIRE_REALLOC {
                        // Lowering may need to call the guest's realloc, so
                        // run it as a high-priority work item and wait for
                        // its result (presumably because guest code cannot be
                        // entered from this context — confirm).
                        let (tx, rx) = oneshot::channel();
                        tls::get(move |store| {
                            self.concurrent_state_mut(store).push_high_priority(
                                WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
                                    move |store, _| {
                                        _ = tx.send(accept(token.as_context_mut(store))?);
                                        Ok(())
                                    },
                                ))),
                            )
                        });
                        rx.await?
                    } else {
                        // No realloc needed: lower synchronously right here.
                        tls::get(|store| accept(token.as_context_mut(store)))?
                    };
                }

                tls::get(|store| {
                    // Number of items that lowering removed from the buffer.
                    let count =
                        old_remaining - pair.lock().unwrap().as_mut().unwrap().1.remaining().len();

                    let transmit = self.concurrent_state_mut(store).get_mut(id)?;

                    let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
                        unreachable!();
                    };

                    *guest_offset += count;

                    // Restore the read state taken at the top; `count` here is
                    // the guest buffer capacity from the outer scope (the
                    // closure-local `count` above shadows it only up to this
                    // struct literal's field shorthand — NOTE(review): the
                    // shorthand below actually picks up the closure-local
                    // `count`; confirm which value is intended).
                    transmit.read = ReadState::GuestReady {
                        ty,
                        flat_abi,
                        options,
                        address,
                        count,
                        handle,
                    };

                    anyhow::Ok(())
                })?;

                Ok(())
            }

            ReadState::HostToHost {
                accept,
                mut buffer,
                limit,
            } => {
                let mut state = StreamResult::Completed;
                let mut position = 0;

                // First drain the intermediate byte buffer, range
                // `position..limit`, handing ownership of `buffer` to a
                // `SliceBuffer` for each step and taking it back afterwards.
                while !matches!(state, StreamResult::Dropped) && position < limit {
                    let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
                    state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
                    (buffer, position, _) = slice_buffer.into_parts();
                }

                {
                    // Then drain the producer's own buffer. The pair is taken
                    // out of the slot so no lock is held across the awaits.
                    let (mine, mut buffer) = pair.lock().unwrap().take().unwrap();

                    while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty())
                    {
                        state = accept(&mut UntypedWriteBuffer::new(&mut buffer)).await?;
                    }

                    *pair.lock().unwrap() = Some((mine, buffer));
                }

                // Restore the read state with an emptied intermediate buffer,
                // or mark the reader dropped if the consumer said so.
                tls::get(|store| {
                    self.concurrent_state_mut(store).get_mut(id)?.read = match state {
                        StreamResult::Dropped => ReadState::Dropped,
                        StreamResult::Completed | StreamResult::Cancelled => {
                            ReadState::HostToHost {
                                accept,
                                buffer,
                                limit: 0,
                            }
                        }
                    };

                    anyhow::Ok(())
                })?;
                Ok(())
            }

            _ => unreachable!(),
        }
    }
2427
2428 fn pipe_from_guest(
2429 self,
2430 store: &mut dyn VMStore,
2431 kind: TransmitKind,
2432 id: TableId<TransmitState>,
2433 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2434 ) {
2435 let future = async move {
2436 let stream_state = future.await?;
2437 tls::get(|store| {
2438 let state = self.concurrent_state_mut(store);
2439 let transmit = state.get_mut(id)?;
2440 let ReadState::HostReady {
2441 consume,
2442 guest_offset,
2443 ..
2444 } = mem::replace(&mut transmit.read, ReadState::Open)
2445 else {
2446 unreachable!();
2447 };
2448 let code = return_code(kind, stream_state, guest_offset);
2449 transmit.read = match stream_state {
2450 StreamResult::Dropped => ReadState::Dropped,
2451 StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2452 consume,
2453 guest_offset: 0,
2454 cancel: false,
2455 cancel_waker: None,
2456 },
2457 };
2458 let WriteState::GuestReady { ty, handle, .. } =
2459 mem::replace(&mut transmit.write, WriteState::Open)
2460 else {
2461 unreachable!();
2462 };
2463 state.send_write_result(ty, id, handle, code)?;
2464 Ok(())
2465 })
2466 };
2467
2468 self.concurrent_state_mut(store).push_future(future.boxed());
2469 }
2470
2471 fn pipe_to_guest(
2472 self,
2473 store: &mut dyn VMStore,
2474 kind: TransmitKind,
2475 id: TableId<TransmitState>,
2476 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2477 ) {
2478 let future = async move {
2479 let stream_state = future.await?;
2480 tls::get(|store| {
2481 let state = self.concurrent_state_mut(store);
2482 let transmit = state.get_mut(id)?;
2483 let WriteState::HostReady {
2484 produce,
2485 guest_offset,
2486 ..
2487 } = mem::replace(&mut transmit.write, WriteState::Open)
2488 else {
2489 unreachable!();
2490 };
2491 let code = return_code(kind, stream_state, guest_offset);
2492 transmit.write = match stream_state {
2493 StreamResult::Dropped => WriteState::Dropped,
2494 StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2495 produce,
2496 guest_offset: 0,
2497 cancel: false,
2498 cancel_waker: None,
2499 },
2500 };
2501 let ReadState::GuestReady { ty, handle, .. } =
2502 mem::replace(&mut transmit.read, ReadState::Open)
2503 else {
2504 unreachable!();
2505 };
2506 state.send_read_result(ty, id, handle, code)?;
2507 Ok(())
2508 })
2509 };
2510
2511 self.concurrent_state_mut(store).push_future(future.boxed());
2512 }
2513
2514 fn host_drop_reader(
2516 self,
2517 store: &mut dyn VMStore,
2518 id: TableId<TransmitHandle>,
2519 kind: TransmitKind,
2520 ) -> Result<()> {
2521 let transmit_id = self.concurrent_state_mut(store).get_mut(id)?.state;
2522 let state = self.concurrent_state_mut(store);
2523 let transmit = state
2524 .get_mut(transmit_id)
2525 .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2526 log::trace!(
2527 "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2528 transmit.read,
2529 transmit.write
2530 );
2531
2532 transmit.read = ReadState::Dropped;
2533
2534 let new_state = if let WriteState::Dropped = &transmit.write {
2537 WriteState::Dropped
2538 } else {
2539 WriteState::Open
2540 };
2541
2542 let write_handle = transmit.write_handle;
2543
2544 match mem::replace(&mut transmit.write, new_state) {
2545 WriteState::GuestReady { ty, handle, .. } => {
2548 state.update_event(
2549 write_handle.rep(),
2550 match ty {
2551 TransmitIndex::Future(ty) => Event::FutureWrite {
2552 code: ReturnCode::Dropped(0),
2553 pending: Some((ty, handle)),
2554 },
2555 TransmitIndex::Stream(ty) => Event::StreamWrite {
2556 code: ReturnCode::Dropped(0),
2557 pending: Some((ty, handle)),
2558 },
2559 },
2560 )?;
2561 }
2562
2563 WriteState::HostReady { .. } => {}
2564
2565 WriteState::Open => {
2566 state.update_event(
2567 write_handle.rep(),
2568 match kind {
2569 TransmitKind::Future => Event::FutureWrite {
2570 code: ReturnCode::Dropped(0),
2571 pending: None,
2572 },
2573 TransmitKind::Stream => Event::StreamWrite {
2574 code: ReturnCode::Dropped(0),
2575 pending: None,
2576 },
2577 },
2578 )?;
2579 }
2580
2581 WriteState::Dropped => {
2582 log::trace!("host_drop_reader delete {transmit_id:?}");
2583 state.delete_transmit(transmit_id)?;
2584 }
2585 }
2586 Ok(())
2587 }
2588
2589 fn host_drop_writer<U>(
2591 self,
2592 store: StoreContextMut<U>,
2593 id: TableId<TransmitHandle>,
2594 on_drop_open: Option<fn() -> Result<()>>,
2595 ) -> Result<()> {
2596 let transmit_id = self.concurrent_state_mut(store.0).get_mut(id)?.state;
2597 let transmit = self
2598 .concurrent_state_mut(store.0)
2599 .get_mut(transmit_id)
2600 .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2601 log::trace!(
2602 "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2603 transmit.read,
2604 transmit.write
2605 );
2606
2607 match &mut transmit.write {
2609 WriteState::GuestReady { .. } => {
2610 unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2611 }
2612 WriteState::HostReady { .. } => {}
2613 v @ WriteState::Open => {
2614 if let (Some(on_drop_open), false) = (
2615 on_drop_open,
2616 transmit.done || matches!(transmit.read, ReadState::Dropped),
2617 ) {
2618 on_drop_open()?;
2619 } else {
2620 *v = WriteState::Dropped;
2621 }
2622 }
2623 WriteState::Dropped => unreachable!("write state is already dropped"),
2624 }
2625
2626 let transmit = self.concurrent_state_mut(store.0).get_mut(transmit_id)?;
2627
2628 let new_state = if let ReadState::Dropped = &transmit.read {
2634 ReadState::Dropped
2635 } else {
2636 ReadState::Open
2637 };
2638
2639 let read_handle = transmit.read_handle;
2640
2641 match mem::replace(&mut transmit.read, new_state) {
2643 ReadState::GuestReady { ty, handle, .. } => {
2647 self.concurrent_state_mut(store.0).update_event(
2649 read_handle.rep(),
2650 match ty {
2651 TransmitIndex::Future(ty) => Event::FutureRead {
2652 code: ReturnCode::Dropped(0),
2653 pending: Some((ty, handle)),
2654 },
2655 TransmitIndex::Stream(ty) => Event::StreamRead {
2656 code: ReturnCode::Dropped(0),
2657 pending: Some((ty, handle)),
2658 },
2659 },
2660 )?;
2661 }
2662
2663 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2664
2665 ReadState::Open => {
2667 self.concurrent_state_mut(store.0).update_event(
2668 read_handle.rep(),
2669 match on_drop_open {
2670 Some(_) => Event::FutureRead {
2671 code: ReturnCode::Dropped(0),
2672 pending: None,
2673 },
2674 None => Event::StreamRead {
2675 code: ReturnCode::Dropped(0),
2676 pending: None,
2677 },
2678 },
2679 )?;
2680 }
2681
2682 ReadState::Dropped => {
2685 log::trace!("host_drop_writer delete {transmit_id:?}");
2686 self.concurrent_state_mut(store.0)
2687 .delete_transmit(transmit_id)?;
2688 }
2689 }
2690 Ok(())
2691 }
2692
2693 pub(super) fn guest_drop_writable<T>(
2695 self,
2696 store: StoreContextMut<T>,
2697 ty: TransmitIndex,
2698 writer: u32,
2699 ) -> Result<()> {
2700 let table = self.id().get_mut(store.0).table_for_transmit(ty);
2701 let transmit_rep = match ty {
2702 TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
2703 TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
2704 };
2705
2706 let id = TableId::<TransmitHandle>::new(transmit_rep);
2707 log::trace!("guest_drop_writable: drop writer {id:?}");
2708 match ty {
2709 TransmitIndex::Stream(_) => self.host_drop_writer(store, id, None),
2710 TransmitIndex::Future(_) => self.host_drop_writer(
2711 store,
2712 id,
2713 Some(|| {
2714 Err(anyhow!(
2715 "cannot drop future write end without first writing a value"
2716 ))
2717 }),
2718 ),
2719 }
2720 }
2721
    /// Copies `count` items from a guest writer's buffer to a guest reader's
    /// buffer.
    ///
    /// * Future → future: `count` must be 1. The payload (if the future type
    ///   has one) is lifted from `write_address` and lowered to
    ///   `read_address`.
    /// * Stream → stream with `flat_abi`: the payload is copied as raw bytes
    ///   after alignment and bounds checks.
    /// * Stream → stream without `flat_abi`: every item is lifted into a
    ///   `Val` and then lowered individually.
    ///
    /// `rep` is only used to label the trace output in the non-flat stream
    /// path.
    ///
    /// # Errors
    ///
    /// Fails if either pointer is misaligned or out of bounds, or if lifting
    /// or lowering a value fails.
    ///
    /// # Panics
    ///
    /// Panics if `write_ty` and `read_ty` are not both futures or both
    /// streams, if a future copy is requested with `count != 1`, or if a
    /// payload-less stream type reaches the non-flat path.
    fn copy<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        flat_abi: Option<FlatAbi>,
        write_ty: TransmitIndex,
        write_options: &Options,
        write_address: usize,
        read_ty: TransmitIndex,
        read_options: &Options,
        read_address: usize,
        count: usize,
        rep: u32,
    ) -> Result<()> {
        let types = self.id().get(store.0).component().types().clone();
        match (write_ty, read_ty) {
            (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
                assert_eq!(count, 1);

                // Lift the value from the writer's memory, if the future has
                // a payload at all.
                let val = types[types[write_ty].ty]
                    .payload
                    .map(|ty| {
                        let abi = types.canonical_abi(&ty);
                        if write_address % usize::try_from(abi.align32)? != 0 {
                            bail!("write pointer not aligned");
                        }

                        let lift =
                            &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);
                        let bytes = lift
                            .memory()
                            .get(write_address..)
                            .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
                            .ok_or_else(|| {
                                anyhow::anyhow!("write pointer out of bounds of memory")
                            })?;

                        Val::load(lift, ty, bytes)
                    })
                    .transpose()?;

                // Lower it into the reader's memory; alignment and bounds are
                // validated by `validate_inbounds_dynamic`.
                if let Some(val) = val {
                    let lower =
                        &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
                    let ty = types[types[read_ty].ty].payload.unwrap();
                    let ptr = func::validate_inbounds_dynamic(
                        types.canonical_abi(&ty),
                        lower.as_slice_mut(),
                        &ValRaw::u32(read_address.try_into().unwrap()),
                    )?;
                    val.store(lower, ty, ptr)?;
                }
            }
            (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
                if let Some(flat_abi) = flat_abi {
                    // Fast path: the payload can be copied as plain bytes.
                    // NOTE(review): this multiplication is unchecked; it can
                    // only overflow where `usize` is 32 bits wide.
                    let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
                    if length_in_bytes > 0 {
                        if write_address % usize::try_from(flat_abi.align)? != 0 {
                            bail!("write pointer not aligned");
                        }
                        if read_address % usize::try_from(flat_abi.align)? != 0 {
                            bail!("read pointer not aligned");
                        }

                        let store_opaque = store.0.store_opaque_mut();

                        {
                            // Raw pointers are used because a shared borrow of
                            // the source memory and a mutable borrow of the
                            // destination memory cannot coexist on the store.
                            let src = write_options
                                .memory(store_opaque)
                                .get(write_address..)
                                .and_then(|b| b.get(..length_in_bytes))
                                .ok_or_else(|| {
                                    anyhow::anyhow!("write pointer out of bounds of memory")
                                })?
                                .as_ptr();
                            let dst = read_options
                                .memory_mut(store_opaque)
                                .get_mut(read_address..)
                                .and_then(|b| b.get_mut(..length_in_bytes))
                                .ok_or_else(|| {
                                    anyhow::anyhow!("read pointer out of bounds of memory")
                                })?
                                .as_mut_ptr();
                            // SAFETY: both pointers come from slices that were
                            // just bounds-checked for `length_in_bytes` bytes,
                            // and `copy_to` permits overlapping ranges. This
                            // assumes neither memory moves or shrinks between
                            // the checks and the copy — no guest code runs in
                            // between in this block.
                            unsafe { src.copy_to(dst, length_in_bytes) };
                        }
                    }
                } else {
                    // Slow path: lift every item into a `Val`, then lower each
                    // one into the reader's memory.
                    let store_opaque = store.0.store_opaque_mut();
                    let lift = &mut LiftContext::new(store_opaque, write_options, self);
                    let ty = types[types[write_ty].ty].payload.unwrap();
                    let abi = lift.types.canonical_abi(&ty);
                    let size = usize::try_from(abi.size32).unwrap();
                    if write_address % usize::try_from(abi.align32)? != 0 {
                        bail!("write pointer not aligned");
                    }
                    let bytes = lift
                        .memory()
                        .get(write_address..)
                        .and_then(|b| b.get(..size * count))
                        .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;

                    let values = (0..count)
                        .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
                        .collect::<Result<Vec<_>>>()?;

                    let id = TableId::<TransmitHandle>::new(rep);
                    log::trace!("copy values {values:?} for {id:?}");

                    let lower =
                        &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
                    let ty = types[types[read_ty].ty].payload.unwrap();
                    let abi = lower.types.canonical_abi(&ty);
                    if read_address % usize::try_from(abi.align32)? != 0 {
                        bail!("read pointer not aligned");
                    }
                    let size = usize::try_from(abi.size32).unwrap();
                    // Bounds check only; the resulting slice is discarded and
                    // the values are stored one by one below.
                    lower
                        .as_slice_mut()
                        .get_mut(read_address..)
                        .and_then(|b| b.get_mut(..size * count))
                        .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
                    let mut ptr = read_address;
                    for value in values {
                        value.store(lower, ty, ptr)?;
                        ptr += size
                    }
                }
            }
            _ => unreachable!(),
        }

        Ok(())
    }
2860
2861 fn check_bounds(
2862 self,
2863 store: &StoreOpaque,
2864 options: &Options,
2865 ty: TransmitIndex,
2866 address: usize,
2867 count: usize,
2868 ) -> Result<()> {
2869 let types = self.id().get(store).component().types().clone();
2870 let size = usize::try_from(
2871 match ty {
2872 TransmitIndex::Future(ty) => types[types[ty].ty]
2873 .payload
2874 .map(|ty| types.canonical_abi(&ty).size32),
2875 TransmitIndex::Stream(ty) => types[types[ty].ty]
2876 .payload
2877 .map(|ty| types.canonical_abi(&ty).size32),
2878 }
2879 .unwrap_or(0),
2880 )
2881 .unwrap();
2882
2883 if count > 0 && size > 0 {
2884 options
2885 .memory(store)
2886 .get(address..)
2887 .and_then(|b| b.get(..(size * count)))
2888 .map(drop)
2889 .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))
2890 } else {
2891 Ok(())
2892 }
2893 }
2894
    /// Write `count` items located at `address` in the caller's linear memory to
    /// the stream or future whose writable end is `handle`.
    ///
    /// The source buffer is bounds-checked and the handle is switched to
    /// `TransmitLocalState::Busy` before the state of the read end is inspected.
    /// What happens next depends on that state:
    ///
    /// * `ReadState::GuestReady`: the items are copied straight into the waiting
    ///   guest reader's buffer via `copy`.
    /// * `ReadState::HostReady`: the write is recorded as pending and handed to
    ///   the host consumer via `consume`.
    /// * `ReadState::Open`: the write is recorded as pending
    ///   (`WriteState::GuestReady`) and `ReturnCode::Blocked` is produced.
    /// * `ReadState::Dropped`: `ReturnCode::Dropped(0)` is produced.
    ///
    /// When the result is `Blocked` and `options.async_()` is false, the result
    /// is replaced by whatever `wait_for_write` returns.
    ///
    /// # Errors
    ///
    /// Fails if the buffer is out of bounds, if `handle` is not in the `Write`
    /// state, if its `done` flag is set, or if the transmit is already marked
    /// `done`. Errors from `copy`, `consume` and the state tables are propagated.
    pub(super) fn guest_write<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        ty: TransmitIndex,
        options: OptionsIndex,
        flat_abi: Option<FlatAbi>,
        handle: u32,
        address: u32,
        count: u32,
    ) -> Result<ReturnCode> {
        let address = usize::try_from(address).unwrap();
        let count = usize::try_from(count).unwrap();
        let options = Options::new_index(store.0, self, options);
        self.check_bounds(store.0, &options, ty, address, count)?;
        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
        let TransmitLocalState::Write { done } = *state else {
            bail!(
                "invalid handle {handle}; expected `Write`; got {:?}",
                *state
            );
        };

        if done {
            bail!("cannot write to stream after being notified that the readable end dropped");
        }

        // The handle stays `Busy` until a non-`Blocked` result is available (see
        // the end of this function).
        *state = TransmitLocalState::Busy;
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let concurrent_state = self.concurrent_state_mut(store.0);
        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
        let transmit = concurrent_state.get_mut(transmit_id)?;
        log::trace!(
            "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
            transmit.read
        );

        if transmit.done {
            bail!("cannot write to future after previous write succeeded or readable end dropped");
        }

        // The read state is taken out of the transmit below; this is what is left
        // in its place. A dropped reader stays dropped, anything else becomes
        // `Open`.
        let new_state = if let ReadState::Dropped = &transmit.read {
            ReadState::Dropped
        } else {
            ReadState::Open
        };

        // Records this write as pending so that a later read can pick it up. The
        // write end must currently be `Open`.
        let set_guest_ready = |me: &mut ConcurrentState| {
            let transmit = me.get_mut(transmit_id)?;
            assert!(matches!(&transmit.write, WriteState::Open));
            transmit.write = WriteState::GuestReady {
                ty,
                flat_abi,
                options,
                address,
                count,
                handle,
            };
            Ok::<_, crate::Error>(())
        };

        let mut result = match mem::replace(&mut transmit.read, new_state) {
            // A guest read is already waiting: copy directly into its buffer.
            ReadState::GuestReady {
                ty: read_ty,
                flat_abi: read_flat_abi,
                options: read_options,
                address: read_address,
                count: read_count,
                handle: read_handle,
            } => {
                assert_eq!(flat_abi, read_flat_abi);

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                // The write finishes now unless it carries items and the waiting
                // read is zero-length.
                let write_complete = count == 0 || read_count > 0;
                // The waiting read is only completed by a write that carries at
                // least one item.
                let read_complete = count > 0;
                // True when the reader's buffer is larger than this write.
                let read_buffer_remaining = count < read_count;

                let read_handle_rep = transmit.read_handle.rep();

                // Number of items actually transferred.
                let count = count.min(read_count);

                self.copy(
                    store.as_context_mut(),
                    flat_abi,
                    ty,
                    &options,
                    address,
                    read_ty,
                    &read_options,
                    read_address,
                    count,
                    rep,
                )?;

                let instance = self.id().get_mut(store.0);
                let types = instance.component().types();
                let item_size = payload(ty, types)
                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
                    .unwrap_or(0);
                let concurrent_state = instance.concurrent_state_mut();
                if read_complete {
                    let count = u32::try_from(count).unwrap();
                    // If a `Completed` stream-read event is already queued for the
                    // reader, fold its count into the new total.
                    let total = if let Some(Event::StreamRead {
                        code: ReturnCode::Completed(old_total),
                        ..
                    }) = concurrent_state.take_event(read_handle_rep)?
                    {
                        count + old_total
                    } else {
                        count
                    };

                    let code = ReturnCode::completed(ty.kind(), total);

                    concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
                }

                if read_buffer_remaining {
                    // Re-arm the reader with the part of its buffer that was not
                    // filled by this write.
                    let transmit = concurrent_state.get_mut(transmit_id)?;
                    transmit.read = ReadState::GuestReady {
                        ty: read_ty,
                        flat_abi: read_flat_abi,
                        options: read_options,
                        address: read_address + (count * item_size),
                        count: read_count - count,
                        handle: read_handle,
                    };
                }

                if write_complete {
                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
                } else {
                    set_guest_ready(concurrent_state)?;
                    ReturnCode::Blocked
                }
            }

            // A host consumer is waiting: record the pending write, then let
            // `consume` run the consumer.
            ReadState::HostReady {
                consume,
                guest_offset,
                cancel,
                cancel_waker,
            } => {
                assert!(cancel_waker.is_none());
                assert!(!cancel);
                assert_eq!(0, guest_offset);

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                set_guest_ready(concurrent_state)?;
                self.consume(store.0, ty.kind(), transmit_id, consume, 0, false)?
            }

            ReadState::HostToHost { .. } => unreachable!(),

            // Nobody is reading yet: leave the write pending.
            ReadState::Open => {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }

            ReadState::Dropped => {
                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                ReturnCode::Dropped(0)
            }
        };

        // Synchronous callers wait here for the write event instead of seeing
        // `Blocked`.
        if result == ReturnCode::Blocked && !options.async_() {
            result = self.wait_for_write(store.0, transmit_handle)?;
        }

        // Once the operation is no longer pending, the handle leaves the `Busy`
        // state. `done` is set only for a stream whose write reported `Dropped`.
        if result != ReturnCode::Blocked {
            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
                TransmitLocalState::Write {
                    done: matches!(
                        (result, ty),
                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
                    ),
                };
        }

        log::trace!(
            "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
        );

        Ok(result)
    }
3110
3111 fn consume(
3114 self,
3115 store: &mut dyn VMStore,
3116 kind: TransmitKind,
3117 transmit_id: TableId<TransmitState>,
3118 consume: PollStream,
3119 guest_offset: usize,
3120 cancel: bool,
3121 ) -> Result<ReturnCode> {
3122 let mut future = consume();
3123 self.concurrent_state_mut(store).get_mut(transmit_id)?.read = ReadState::HostReady {
3124 consume,
3125 guest_offset,
3126 cancel,
3127 cancel_waker: None,
3128 };
3129 let poll = self.set_tls(store, || {
3130 future
3131 .as_mut()
3132 .poll(&mut Context::from_waker(&Waker::noop()))
3133 });
3134
3135 Ok(match poll {
3136 Poll::Ready(state) => {
3137 let transmit = self.concurrent_state_mut(store).get_mut(transmit_id)?;
3138 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
3139 unreachable!();
3140 };
3141 let code = return_code(kind, state?, mem::replace(guest_offset, 0));
3142 transmit.write = WriteState::Open;
3143 code
3144 }
3145 Poll::Pending => {
3146 self.pipe_from_guest(store, kind, transmit_id, future);
3147 ReturnCode::Blocked
3148 }
3149 })
3150 }
3151
    /// Read up to `count` items from the stream or future whose readable end is
    /// `handle` into the caller's linear memory at `address`.
    ///
    /// The destination buffer is bounds-checked and the handle is switched to
    /// `TransmitLocalState::Busy` before the state of the write end is
    /// inspected. What happens next depends on that state:
    ///
    /// * `WriteState::GuestReady`: the items are copied straight out of the
    ///   waiting guest writer's buffer via `copy`.
    /// * `WriteState::HostReady`: the read is recorded as pending and handed to
    ///   the host producer via `produce`.
    /// * `WriteState::Open`: the read is recorded as pending
    ///   (`ReadState::GuestReady`) and `ReturnCode::Blocked` is produced.
    /// * `WriteState::Dropped`: `ReturnCode::Dropped(0)` is produced.
    ///
    /// When the result is `Blocked` and `options.async_()` is false, the result
    /// is replaced by whatever `wait_for_read` returns.
    ///
    /// # Errors
    ///
    /// Fails if the buffer is out of bounds, if `handle` is not in the `Read`
    /// state, if its `done` flag is set, or if the transmit is already marked
    /// `done`. Errors from `copy`, `produce` and the state tables are propagated.
    pub(super) fn guest_read<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        ty: TransmitIndex,
        options: OptionsIndex,
        flat_abi: Option<FlatAbi>,
        handle: u32,
        address: u32,
        count: u32,
    ) -> Result<ReturnCode> {
        let address = usize::try_from(address).unwrap();
        let count = usize::try_from(count).unwrap();
        let options = Options::new_index(store.0, self, options);
        self.check_bounds(store.0, &options, ty, address, count)?;
        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
        let TransmitLocalState::Read { done } = *state else {
            bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
        };

        if done {
            bail!("cannot read from stream after being notified that the writable end dropped");
        }

        // The handle stays `Busy` until a non-`Blocked` result is available (see
        // the end of this function).
        *state = TransmitLocalState::Busy;
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let concurrent_state = self.concurrent_state_mut(store.0);
        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
        let transmit = concurrent_state.get_mut(transmit_id)?;
        log::trace!(
            "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
            transmit.write
        );

        if transmit.done {
            bail!("cannot read from future after previous read succeeded");
        }

        // The write state is taken out of the transmit below; this is what is
        // left in its place. A dropped writer stays dropped, anything else
        // becomes `Open`.
        let new_state = if let WriteState::Dropped = &transmit.write {
            WriteState::Dropped
        } else {
            WriteState::Open
        };

        // Records this read as pending so that a later write can satisfy it. The
        // read end must currently be `Open`.
        let set_guest_ready = |me: &mut ConcurrentState| {
            let transmit = me.get_mut(transmit_id)?;
            assert!(matches!(&transmit.read, ReadState::Open));
            transmit.read = ReadState::GuestReady {
                ty,
                flat_abi,
                options,
                address,
                count,
                handle,
            };
            Ok::<_, crate::Error>(())
        };

        let mut result = match mem::replace(&mut transmit.write, new_state) {
            // A guest write is already waiting: copy directly out of its buffer.
            WriteState::GuestReady {
                ty: write_ty,
                flat_abi: write_flat_abi,
                options: write_options,
                address: write_address,
                count: write_count,
                handle: write_handle,
            } => {
                assert_eq!(flat_abi, write_flat_abi);

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                let write_handle_rep = transmit.write_handle.rep();

                // The waiting write is completed unless it carries items and this
                // read is zero-length.
                let write_complete = write_count == 0 || count > 0;
                // This read only finishes now if the waiting write carries at
                // least one item.
                let read_complete = write_count > 0;
                // True when the writer has more items than this read can take.
                let write_buffer_remaining = count < write_count;

                // Number of items actually transferred.
                let count = count.min(write_count);

                self.copy(
                    store.as_context_mut(),
                    flat_abi,
                    write_ty,
                    &write_options,
                    write_address,
                    ty,
                    &options,
                    address,
                    count,
                    rep,
                )?;

                let instance = self.id().get_mut(store.0);
                let types = instance.component().types();
                let item_size = payload(ty, types)
                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
                    .unwrap_or(0);
                let concurrent_state = instance.concurrent_state_mut();

                if write_complete {
                    let count = u32::try_from(count).unwrap();
                    // If a `Completed` stream-write event is already queued for
                    // the writer, fold its count into the new total.
                    let total = if let Some(Event::StreamWrite {
                        code: ReturnCode::Completed(old_total),
                        ..
                    }) = concurrent_state.take_event(write_handle_rep)?
                    {
                        count + old_total
                    } else {
                        count
                    };

                    let code = ReturnCode::completed(ty.kind(), total);

                    concurrent_state.send_write_result(
                        write_ty,
                        transmit_id,
                        write_handle,
                        code,
                    )?;
                }

                if write_buffer_remaining {
                    // Re-arm the writer with the items this read did not take.
                    let transmit = concurrent_state.get_mut(transmit_id)?;
                    transmit.write = WriteState::GuestReady {
                        ty: write_ty,
                        flat_abi: write_flat_abi,
                        options: write_options,
                        address: write_address + (count * item_size),
                        count: write_count - count,
                        handle: write_handle,
                    };
                }

                if read_complete {
                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
                } else {
                    set_guest_ready(concurrent_state)?;
                    ReturnCode::Blocked
                }
            }

            // A host producer is waiting: record the pending read, then let
            // `produce` run the producer.
            WriteState::HostReady {
                produce,
                guest_offset,
                cancel,
                cancel_waker,
            } => {
                assert!(cancel_waker.is_none());
                assert!(!cancel);
                assert_eq!(0, guest_offset);

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                set_guest_ready(concurrent_state)?;

                self.produce(store.0, ty.kind(), transmit_id, produce, 0, false)?
            }

            // Nobody is writing yet: leave the read pending.
            WriteState::Open => {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }

            WriteState::Dropped => ReturnCode::Dropped(0),
        };

        // Synchronous callers wait here for the read event instead of seeing
        // `Blocked`.
        if result == ReturnCode::Blocked && !options.async_() {
            result = self.wait_for_read(store.0, transmit_handle)?;
        }

        // Once the operation is no longer pending, the handle leaves the `Busy`
        // state. `done` is set only for a stream whose read reported `Dropped`.
        if result != ReturnCode::Blocked {
            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
                TransmitLocalState::Read {
                    done: matches!(
                        (result, ty),
                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
                    ),
                };
        }

        log::trace!(
            "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
        );

        Ok(result)
    }
3346
3347 fn produce(
3350 self,
3351 store: &mut dyn VMStore,
3352 kind: TransmitKind,
3353 transmit_id: TableId<TransmitState>,
3354 produce: PollStream,
3355 guest_offset: usize,
3356 cancel: bool,
3357 ) -> Result<ReturnCode> {
3358 let mut future = produce();
3359 self.concurrent_state_mut(store).get_mut(transmit_id)?.write = WriteState::HostReady {
3360 produce,
3361 guest_offset,
3362 cancel,
3363 cancel_waker: None,
3364 };
3365 let poll = self.set_tls(store, || {
3366 future
3367 .as_mut()
3368 .poll(&mut Context::from_waker(&Waker::noop()))
3369 });
3370
3371 Ok(match poll {
3372 Poll::Ready(state) => {
3373 let transmit = self.concurrent_state_mut(store).get_mut(transmit_id)?;
3374 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3375 unreachable!();
3376 };
3377 let code = return_code(kind, state?, mem::replace(guest_offset, 0));
3378 transmit.read = ReadState::Open;
3379 code
3380 }
3381 Poll::Pending => {
3382 self.pipe_to_guest(store, kind, transmit_id, future);
3383 ReturnCode::Blocked
3384 }
3385 })
3386 }
3387
3388 fn wait_for_write(
3389 self,
3390 store: &mut dyn VMStore,
3391 handle: TableId<TransmitHandle>,
3392 ) -> Result<ReturnCode> {
3393 let waitable = Waitable::Transmit(handle);
3394 self.wait_for_event(store, waitable)?;
3395 let event = waitable.take_event(self.concurrent_state_mut(store))?;
3396 if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3397 event
3398 {
3399 waitable.on_delivery(self.id().get_mut(store), event);
3400 Ok(code)
3401 } else {
3402 unreachable!()
3403 }
3404 }
3405
3406 fn cancel_write(
3408 self,
3409 store: &mut dyn VMStore,
3410 transmit_id: TableId<TransmitState>,
3411 async_: bool,
3412 ) -> Result<ReturnCode> {
3413 let state = self.concurrent_state_mut(store);
3414 let transmit = state.get_mut(transmit_id)?;
3415 log::trace!(
3416 "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3417 transmit.read,
3418 transmit.write
3419 );
3420
3421 let code = if let Some(event) =
3422 Waitable::Transmit(transmit.write_handle).take_event(state)?
3423 {
3424 let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3425 unreachable!();
3426 };
3427 match (code, event) {
3428 (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3429 ReturnCode::Cancelled(count)
3430 }
3431 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3432 _ => unreachable!(),
3433 }
3434 } else if let ReadState::HostReady {
3435 cancel,
3436 cancel_waker,
3437 ..
3438 } = &mut state.get_mut(transmit_id)?.read
3439 {
3440 *cancel = true;
3441 if let Some(waker) = cancel_waker.take() {
3442 waker.wake();
3443 }
3444
3445 if async_ {
3446 ReturnCode::Blocked
3447 } else {
3448 let handle = self
3449 .concurrent_state_mut(store)
3450 .get_mut(transmit_id)?
3451 .write_handle;
3452 self.wait_for_write(store, handle)?
3453 }
3454 } else {
3455 ReturnCode::Cancelled(0)
3456 };
3457
3458 let transmit = self.concurrent_state_mut(store).get_mut(transmit_id)?;
3459
3460 match &transmit.write {
3461 WriteState::GuestReady { .. } => {
3462 transmit.write = WriteState::Open;
3463 }
3464 WriteState::HostReady { .. } => todo!("support host write cancellation"),
3465 WriteState::Open | WriteState::Dropped => {}
3466 }
3467
3468 log::trace!("cancelled write {transmit_id:?}: {code:?}");
3469
3470 Ok(code)
3471 }
3472
3473 fn wait_for_read(
3474 self,
3475 store: &mut dyn VMStore,
3476 handle: TableId<TransmitHandle>,
3477 ) -> Result<ReturnCode> {
3478 let waitable = Waitable::Transmit(handle);
3479 self.wait_for_event(store, waitable)?;
3480 let event = waitable.take_event(self.concurrent_state_mut(store))?;
3481 if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
3482 event
3483 {
3484 waitable.on_delivery(self.id().get_mut(store), event);
3485 Ok(code)
3486 } else {
3487 unreachable!()
3488 }
3489 }
3490
3491 fn cancel_read(
3493 self,
3494 store: &mut dyn VMStore,
3495 transmit_id: TableId<TransmitState>,
3496 async_: bool,
3497 ) -> Result<ReturnCode> {
3498 let state = self.concurrent_state_mut(store);
3499 let transmit = state.get_mut(transmit_id)?;
3500 log::trace!(
3501 "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3502 transmit.read,
3503 transmit.write
3504 );
3505
3506 let code = if let Some(event) =
3507 Waitable::Transmit(transmit.read_handle).take_event(state)?
3508 {
3509 let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3510 unreachable!();
3511 };
3512 match (code, event) {
3513 (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3514 ReturnCode::Cancelled(count)
3515 }
3516 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3517 _ => unreachable!(),
3518 }
3519 } else if let WriteState::HostReady {
3520 cancel,
3521 cancel_waker,
3522 ..
3523 } = &mut state.get_mut(transmit_id)?.write
3524 {
3525 *cancel = true;
3526 if let Some(waker) = cancel_waker.take() {
3527 waker.wake();
3528 }
3529
3530 if async_ {
3531 ReturnCode::Blocked
3532 } else {
3533 let handle = self
3534 .concurrent_state_mut(store)
3535 .get_mut(transmit_id)?
3536 .read_handle;
3537 self.wait_for_read(store, handle)?
3538 }
3539 } else {
3540 ReturnCode::Cancelled(0)
3541 };
3542
3543 let transmit = self.concurrent_state_mut(store).get_mut(transmit_id)?;
3544
3545 match &transmit.read {
3546 ReadState::GuestReady { .. } => {
3547 transmit.read = ReadState::Open;
3548 }
3549 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
3550 todo!("support host read cancellation")
3551 }
3552 ReadState::Open | ReadState::Dropped => {}
3553 }
3554
3555 log::trace!("cancelled read {transmit_id:?}: {code:?}");
3556
3557 Ok(code)
3558 }
3559
3560 fn guest_cancel_write(
3562 self,
3563 store: &mut dyn VMStore,
3564 ty: TransmitIndex,
3565 async_: bool,
3566 writer: u32,
3567 ) -> Result<ReturnCode> {
3568 let (rep, state) =
3569 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
3570 let id = TableId::<TransmitHandle>::new(rep);
3571 log::trace!("guest cancel write {id:?} (handle {writer})");
3572 match state {
3573 TransmitLocalState::Write { .. } => {
3574 bail!("stream or future write cancelled when no write is pending")
3575 }
3576 TransmitLocalState::Read { .. } => {
3577 bail!("passed read end to `{{stream|future}}.cancel-write`")
3578 }
3579 TransmitLocalState::Busy => {}
3580 }
3581 let transmit_id = self.concurrent_state_mut(store).get_mut(id)?.state;
3582 let code = self.cancel_write(store, transmit_id, async_)?;
3583 if !matches!(code, ReturnCode::Blocked) {
3584 let state =
3585 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
3586 .1;
3587 if let TransmitLocalState::Busy = state {
3588 *state = TransmitLocalState::Write { done: false };
3589 }
3590 }
3591 Ok(code)
3592 }
3593
3594 fn guest_cancel_read(
3596 self,
3597 store: &mut dyn VMStore,
3598 ty: TransmitIndex,
3599 async_: bool,
3600 reader: u32,
3601 ) -> Result<ReturnCode> {
3602 let (rep, state) =
3603 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
3604 let id = TableId::<TransmitHandle>::new(rep);
3605 log::trace!("guest cancel read {id:?} (handle {reader})");
3606 match state {
3607 TransmitLocalState::Read { .. } => {
3608 bail!("stream or future read cancelled when no read is pending")
3609 }
3610 TransmitLocalState::Write { .. } => {
3611 bail!("passed write end to `{{stream|future}}.cancel-read`")
3612 }
3613 TransmitLocalState::Busy => {}
3614 }
3615 let transmit_id = self.concurrent_state_mut(store).get_mut(id)?.state;
3616 let code = self.cancel_read(store, transmit_id, async_)?;
3617 if !matches!(code, ReturnCode::Blocked) {
3618 let state =
3619 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
3620 .1;
3621 if let TransmitLocalState::Busy = state {
3622 *state = TransmitLocalState::Read { done: false };
3623 }
3624 }
3625 Ok(code)
3626 }
3627
3628 fn guest_drop_readable(
3630 self,
3631 store: &mut dyn VMStore,
3632 ty: TransmitIndex,
3633 reader: u32,
3634 ) -> Result<()> {
3635 let table = self.id().get_mut(store).table_for_transmit(ty);
3636 let (rep, _is_done) = match ty {
3637 TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
3638 TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
3639 };
3640 let kind = match ty {
3641 TransmitIndex::Stream(_) => TransmitKind::Stream,
3642 TransmitIndex::Future(_) => TransmitKind::Future,
3643 };
3644 let id = TableId::<TransmitHandle>::new(rep);
3645 log::trace!("guest_drop_readable: drop reader {id:?}");
3646 self.host_drop_reader(store, id, kind)
3647 }
3648
3649 pub(crate) fn error_context_new(
3651 self,
3652 store: &mut StoreOpaque,
3653 caller: RuntimeComponentInstanceIndex,
3654 ty: TypeComponentLocalErrorContextTableIndex,
3655 options: OptionsIndex,
3656 debug_msg_address: u32,
3657 debug_msg_len: u32,
3658 ) -> Result<u32> {
3659 self.id().get(store).check_may_leave(caller)?;
3660 let options = Options::new_index(store, self, options);
3661 let lift_ctx = &mut LiftContext::new(store, &options, self);
3662 let address = usize::try_from(debug_msg_address)?;
3664 let len = usize::try_from(debug_msg_len)?;
3665 lift_ctx
3666 .memory()
3667 .get(address..)
3668 .and_then(|b| b.get(..len))
3669 .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3670 let message = WasmStr::new(address, len, lift_ctx)?;
3671
3672 let err_ctx = ErrorContextState {
3674 debug_msg: message
3675 .to_str_from_memory(options.memory(store))?
3676 .to_string(),
3677 };
3678 let state = self.concurrent_state_mut(store);
3679 let table_id = state.push(err_ctx)?;
3680 let global_ref_count_idx =
3681 TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
3682
3683 let _ = state
3685 .global_error_context_ref_counts
3686 .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
3687
3688 let local_idx = self
3695 .id()
3696 .get_mut(store)
3697 .table_for_error_context(ty)
3698 .error_context_insert(table_id.rep())?;
3699
3700 Ok(local_idx)
3701 }
3702
3703 pub(super) fn error_context_debug_message<T>(
3705 self,
3706 store: StoreContextMut<T>,
3707 ty: TypeComponentLocalErrorContextTableIndex,
3708 options: OptionsIndex,
3709 err_ctx_handle: u32,
3710 debug_msg_address: u32,
3711 ) -> Result<()> {
3712 let handle_table_id_rep = self
3714 .id()
3715 .get_mut(store.0)
3716 .table_for_error_context(ty)
3717 .error_context_rep(err_ctx_handle)?;
3718
3719 let state = self.concurrent_state_mut(store.0);
3720 let ErrorContextState { debug_msg } =
3722 state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
3723 let debug_msg = debug_msg.clone();
3724
3725 let options = Options::new_index(store.0, self, options);
3726 let types = self.id().get(store.0).component().types().clone();
3727 let lower_cx = &mut LowerContext::new(store, &options, &types, self);
3728 let debug_msg_address = usize::try_from(debug_msg_address)?;
3729 let offset = lower_cx
3731 .as_slice_mut()
3732 .get(debug_msg_address..)
3733 .and_then(|b| b.get(..debug_msg.bytes().len()))
3734 .map(|_| debug_msg_address)
3735 .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3736 debug_msg
3737 .as_str()
3738 .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
3739
3740 Ok(())
3741 }
3742
3743 pub(crate) fn future_cancel_read(
3745 self,
3746 store: &mut dyn VMStore,
3747 caller: RuntimeComponentInstanceIndex,
3748 ty: TypeFutureTableIndex,
3749 async_: bool,
3750 reader: u32,
3751 ) -> Result<u32> {
3752 self.id().get(store).check_may_leave(caller)?;
3753 self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
3754 .map(|v| v.encode())
3755 }
3756
3757 pub(crate) fn future_cancel_write(
3759 self,
3760 store: &mut dyn VMStore,
3761 caller: RuntimeComponentInstanceIndex,
3762 ty: TypeFutureTableIndex,
3763 async_: bool,
3764 writer: u32,
3765 ) -> Result<u32> {
3766 self.id().get(store).check_may_leave(caller)?;
3767 self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
3768 .map(|v| v.encode())
3769 }
3770
3771 pub(crate) fn stream_cancel_read(
3773 self,
3774 store: &mut dyn VMStore,
3775 caller: RuntimeComponentInstanceIndex,
3776 ty: TypeStreamTableIndex,
3777 async_: bool,
3778 reader: u32,
3779 ) -> Result<u32> {
3780 self.id().get(store).check_may_leave(caller)?;
3781 self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
3782 .map(|v| v.encode())
3783 }
3784
3785 pub(crate) fn stream_cancel_write(
3787 self,
3788 store: &mut dyn VMStore,
3789 caller: RuntimeComponentInstanceIndex,
3790 ty: TypeStreamTableIndex,
3791 async_: bool,
3792 writer: u32,
3793 ) -> Result<u32> {
3794 self.id().get(store).check_may_leave(caller)?;
3795 self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
3796 .map(|v| v.encode())
3797 }
3798
3799 pub(crate) fn future_drop_readable(
3801 self,
3802 store: &mut dyn VMStore,
3803 caller: RuntimeComponentInstanceIndex,
3804 ty: TypeFutureTableIndex,
3805 reader: u32,
3806 ) -> Result<()> {
3807 self.id().get(store).check_may_leave(caller)?;
3808 self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
3809 }
3810
3811 pub(crate) fn stream_drop_readable(
3813 self,
3814 store: &mut dyn VMStore,
3815 caller: RuntimeComponentInstanceIndex,
3816 ty: TypeStreamTableIndex,
3817 reader: u32,
3818 ) -> Result<()> {
3819 self.id().get(store).check_may_leave(caller)?;
3820 self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
3821 }
3822}
3823
3824impl ComponentInstance {
3825 fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
3826 let (tables, types) = self.guest_tables();
3827 let runtime_instance = match ty {
3828 TransmitIndex::Stream(ty) => types[ty].instance,
3829 TransmitIndex::Future(ty) => types[ty].instance,
3830 };
3831 &mut tables[runtime_instance]
3832 }
3833
3834 fn table_for_error_context(
3835 self: Pin<&mut Self>,
3836 ty: TypeComponentLocalErrorContextTableIndex,
3837 ) -> &mut HandleTable {
3838 let (tables, types) = self.guest_tables();
3839 let runtime_instance = types[ty].instance;
3840 &mut tables[runtime_instance]
3841 }
3842
3843 fn get_mut_by_index(
3844 self: Pin<&mut Self>,
3845 ty: TransmitIndex,
3846 index: u32,
3847 ) -> Result<(u32, &mut TransmitLocalState)> {
3848 get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
3849 }
3850
3851 fn guest_new(mut self: Pin<&mut Self>, ty: TransmitIndex) -> Result<ResourcePair> {
3855 let (write, read) = self.as_mut().concurrent_state_mut().new_transmit()?;
3856
3857 let table = self.as_mut().table_for_transmit(ty);
3858 let (read_handle, write_handle) = match ty {
3859 TransmitIndex::Future(ty) => (
3860 table.future_insert_read(ty, read.rep())?,
3861 table.future_insert_write(ty, write.rep())?,
3862 ),
3863 TransmitIndex::Stream(ty) => (
3864 table.stream_insert_read(ty, read.rep())?,
3865 table.stream_insert_write(ty, write.rep())?,
3866 ),
3867 };
3868
3869 let state = self.as_mut().concurrent_state_mut();
3870 state.get_mut(read)?.common.handle = Some(read_handle);
3871 state.get_mut(write)?.common.handle = Some(write_handle);
3872
3873 Ok(ResourcePair {
3874 write: write_handle,
3875 read: read_handle,
3876 })
3877 }
3878
3879 pub(crate) fn error_context_drop(
3881 mut self: Pin<&mut Self>,
3882 caller: RuntimeComponentInstanceIndex,
3883 ty: TypeComponentLocalErrorContextTableIndex,
3884 error_context: u32,
3885 ) -> Result<()> {
3886 self.check_may_leave(caller)?;
3887
3888 let local_handle_table = self.as_mut().table_for_error_context(ty);
3889
3890 let rep = local_handle_table.error_context_drop(error_context)?;
3891
3892 let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
3893
3894 let state = self.concurrent_state_mut();
3895 let GlobalErrorContextRefCount(global_ref_count) = state
3896 .global_error_context_ref_counts
3897 .get_mut(&global_ref_count_idx)
3898 .expect("retrieve concurrent state for error context during drop");
3899
3900 assert!(*global_ref_count >= 1);
3902 *global_ref_count -= 1;
3903 if *global_ref_count == 0 {
3904 state
3905 .global_error_context_ref_counts
3906 .remove(&global_ref_count_idx);
3907
3908 state
3909 .delete(TableId::<ErrorContextState>::new(rep))
3910 .context("deleting component-global error context data")?;
3911 }
3912
3913 Ok(())
3914 }
3915
3916 fn guest_transfer(
3919 mut self: Pin<&mut Self>,
3920 src_idx: u32,
3921 src: TransmitIndex,
3922 dst: TransmitIndex,
3923 ) -> Result<u32> {
3924 let src_table = self.as_mut().table_for_transmit(src);
3925 let (rep, is_done) = match src {
3926 TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
3927 TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
3928 };
3929 if is_done {
3930 bail!("cannot lift after being notified that the writable end dropped");
3931 }
3932 let dst_table = self.as_mut().table_for_transmit(dst);
3933 let handle = match dst {
3934 TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
3935 TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
3936 }?;
3937 self.concurrent_state_mut()
3938 .get_mut(TableId::<TransmitHandle>::new(rep))?
3939 .common
3940 .handle = Some(handle);
3941 Ok(handle)
3942 }
3943
3944 pub(crate) fn future_new(
3946 self: Pin<&mut Self>,
3947 caller: RuntimeComponentInstanceIndex,
3948 ty: TypeFutureTableIndex,
3949 ) -> Result<ResourcePair> {
3950 self.check_may_leave(caller)?;
3951 self.guest_new(TransmitIndex::Future(ty))
3952 }
3953
3954 pub(crate) fn stream_new(
3956 self: Pin<&mut Self>,
3957 caller: RuntimeComponentInstanceIndex,
3958 ty: TypeStreamTableIndex,
3959 ) -> Result<ResourcePair> {
3960 self.check_may_leave(caller)?;
3961 self.guest_new(TransmitIndex::Stream(ty))
3962 }
3963
3964 pub(crate) fn future_transfer(
3967 self: Pin<&mut Self>,
3968 src_idx: u32,
3969 src: TypeFutureTableIndex,
3970 dst: TypeFutureTableIndex,
3971 ) -> Result<u32> {
3972 self.guest_transfer(
3973 src_idx,
3974 TransmitIndex::Future(src),
3975 TransmitIndex::Future(dst),
3976 )
3977 }
3978
3979 pub(crate) fn stream_transfer(
3982 self: Pin<&mut Self>,
3983 src_idx: u32,
3984 src: TypeStreamTableIndex,
3985 dst: TypeStreamTableIndex,
3986 ) -> Result<u32> {
3987 self.guest_transfer(
3988 src_idx,
3989 TransmitIndex::Stream(src),
3990 TransmitIndex::Stream(dst),
3991 )
3992 }
3993
3994 pub(crate) fn error_context_transfer(
3996 mut self: Pin<&mut Self>,
3997 src_idx: u32,
3998 src: TypeComponentLocalErrorContextTableIndex,
3999 dst: TypeComponentLocalErrorContextTableIndex,
4000 ) -> Result<u32> {
4001 let rep = self
4002 .as_mut()
4003 .table_for_error_context(src)
4004 .error_context_rep(src_idx)?;
4005 let dst_idx = self
4006 .as_mut()
4007 .table_for_error_context(dst)
4008 .error_context_insert(rep)?;
4009
4010 let global_ref_count = self
4014 .concurrent_state_mut()
4015 .global_error_context_ref_counts
4016 .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4017 .context("global ref count present for existing (sub)component error context")?;
4018 global_ref_count.0 += 1;
4019
4020 Ok(dst_idx)
4021 }
4022}
4023
4024impl ConcurrentState {
4025 fn send_write_result(
4026 &mut self,
4027 ty: TransmitIndex,
4028 id: TableId<TransmitState>,
4029 handle: u32,
4030 code: ReturnCode,
4031 ) -> Result<()> {
4032 let write_handle = self.get_mut(id)?.write_handle.rep();
4033 self.set_event(
4034 write_handle,
4035 match ty {
4036 TransmitIndex::Future(ty) => Event::FutureWrite {
4037 code,
4038 pending: Some((ty, handle)),
4039 },
4040 TransmitIndex::Stream(ty) => Event::StreamWrite {
4041 code,
4042 pending: Some((ty, handle)),
4043 },
4044 },
4045 )
4046 }
4047
4048 fn send_read_result(
4049 &mut self,
4050 ty: TransmitIndex,
4051 id: TableId<TransmitState>,
4052 handle: u32,
4053 code: ReturnCode,
4054 ) -> Result<()> {
4055 let read_handle = self.get_mut(id)?.read_handle.rep();
4056 self.set_event(
4057 read_handle,
4058 match ty {
4059 TransmitIndex::Future(ty) => Event::FutureRead {
4060 code,
4061 pending: Some((ty, handle)),
4062 },
4063 TransmitIndex::Stream(ty) => Event::StreamRead {
4064 code,
4065 pending: Some((ty, handle)),
4066 },
4067 },
4068 )
4069 }
4070
4071 fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4072 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4073 }
4074
4075 fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4076 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4077 }
4078
4079 fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4090 let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4091
4092 fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
4093 let (ReturnCode::Completed(count)
4094 | ReturnCode::Dropped(count)
4095 | ReturnCode::Cancelled(count)) = old
4096 else {
4097 unreachable!()
4098 };
4099
4100 match new {
4101 ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
4102 ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
4103 _ => unreachable!(),
4104 }
4105 }
4106
4107 let event = match (waitable.take_event(self)?, event) {
4108 (None, _) => event,
4109 (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4110 (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4111 (
4112 Some(Event::StreamWrite {
4113 code: old_code,
4114 pending: old_pending,
4115 }),
4116 Event::StreamWrite { code, pending },
4117 ) => Event::StreamWrite {
4118 code: update_code(old_code, code),
4119 pending: old_pending.or(pending),
4120 },
4121 (
4122 Some(Event::StreamRead {
4123 code: old_code,
4124 pending: old_pending,
4125 }),
4126 Event::StreamRead { code, pending },
4127 ) => Event::StreamRead {
4128 code: update_code(old_code, code),
4129 pending: old_pending.or(pending),
4130 },
4131 _ => unreachable!(),
4132 };
4133
4134 waitable.set_event(self, Some(event))
4135 }
4136
4137 fn new_transmit(&mut self) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4140 let state_id = self.push(TransmitState::default())?;
4141
4142 let write = self.push(TransmitHandle::new(state_id))?;
4143 let read = self.push(TransmitHandle::new(state_id))?;
4144
4145 let state = self.get_mut(state_id)?;
4146 state.write_handle = write;
4147 state.read_handle = read;
4148
4149 log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4150
4151 Ok((write, read))
4152 }
4153
4154 fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4156 let state = self.delete(state_id)?;
4157 self.delete(state.write_handle)?;
4158 self.delete(state.read_handle)?;
4159
4160 log::trace!(
4161 "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4162 state.write_handle,
4163 state.read_handle,
4164 );
4165
4166 Ok(())
4167 }
4168}
4169
/// A write/read pair of `u32` values returned together, e.g. by `stream_new`.
///
/// NOTE(review): the values are presumably the handle indices of the two ends
/// of a newly created future or stream; confirm against `guest_new`.
pub(crate) struct ResourcePair {
    /// Value identifying the write side of the pair.
    pub(crate) write: u32,
    /// Value identifying the read side of the pair.
    pub(crate) read: u32,
}
4174
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Engine, Store};
    use core::future::pending;
    use core::pin::pin;
    use std::sync::LazyLock;

    /// Engine shared by the throwaway stores created for each poll.
    static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);

    /// Polls `rx` once as a `FutureProducer`, using a no-op waker and a fresh
    /// store, and returns whatever `poll_produce` reports.
    fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
    where
        T: FutureProducer<()>,
    {
        let mut cx = Context::from_waker(Waker::noop());
        let mut store = Store::new(&ENGINE, ());
        rx.poll_produce(&mut cx, store.as_context_mut(), finish)
    }

    #[test]
    fn future_producer() {
        // An immediately-ready future yields its value regardless of `finish`.
        for finish in [false, true] {
            let mut ready = pin!(async { anyhow::Ok(()) });
            let polled = poll_future_producer(ready.as_mut(), finish);
            assert!(matches!(polled, Poll::Ready(Ok(Some(())))));
        }

        // A future that never resolves is pending until `finish` is requested,
        // at which point it reports that no value was produced.
        {
            let mut never = pin!(pending::<Result<()>>());
            let polled = poll_future_producer(never.as_mut(), false);
            assert!(matches!(polled, Poll::Pending));
            let polled = poll_future_producer(never.as_mut(), true);
            assert!(matches!(polled, Poll::Ready(Ok(None))));
        }

        // A oneshot receiver behaves the same while empty, and still yields
        // the value if it is sent after a `finish` poll returned `None`.
        {
            let (tx, rx) = oneshot::channel();
            let mut rx = pin!(rx);
            let polled = poll_future_producer(rx.as_mut(), false);
            assert!(matches!(polled, Poll::Pending));
            let polled = poll_future_producer(rx.as_mut(), true);
            assert!(matches!(polled, Poll::Ready(Ok(None))));
            tx.send(()).unwrap();
            let polled = poll_future_producer(rx.as_mut(), true);
            assert!(matches!(polled, Poll::Ready(Ok(Some(())))));
        }

        // A value sent before the first poll is produced right away.
        {
            let (tx, rx) = oneshot::channel();
            let mut rx = pin!(rx);
            tx.send(()).unwrap();
            let polled = poll_future_producer(rx.as_mut(), false);
            assert!(matches!(polled, Poll::Ready(Ok(Some(())))));
        }

        // Dropping the sender surfaces an error regardless of `finish`.
        for finish in [false, true] {
            let (tx, rx) = oneshot::channel::<()>();
            let mut rx = pin!(rx);
            drop(tx);
            let polled = poll_future_producer(rx.as_mut(), finish);
            assert!(matches!(polled, Poll::Ready(Err(..))));
        }
    }
}