1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext, Options};
5use crate::component::matching::InstanceType;
6use crate::component::values::{ErrorContextAny, FutureAny, StreamAny};
7use crate::component::{AsAccessor, Instance, Lower, Val, WasmList, WasmStr};
8use crate::store::{StoreOpaque, StoreToken};
9use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
10use crate::vm::{AlwaysMut, VMStore};
11use crate::{AsContextMut, StoreContextMut, ValRaw};
12use anyhow::{Context as _, Error, Result, anyhow, bail};
13use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
14use core::fmt;
15use core::future;
16use core::iter;
17use core::marker::PhantomData;
18use core::mem::{self, MaybeUninit};
19use core::pin::Pin;
20use core::task::{Context, Poll, Waker, ready};
21use futures::channel::oneshot;
22use futures::{FutureExt as _, stream};
23use std::any::{Any, TypeId};
24use std::boxed::Box;
25use std::io::Cursor;
26use std::string::{String, ToString};
27use std::sync::{Arc, Mutex};
28use std::vec::Vec;
29use wasmtime_environ::component::{
30 CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
31 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
32 TypeFutureTableIndex, TypeStreamTableIndex,
33};
34
35pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
36
37mod buffers;
38
39#[derive(Copy, Clone, Debug)]
42pub enum TransmitKind {
43 Stream,
44 Future,
45}
46
47#[derive(Copy, Clone, Debug, PartialEq)]
49pub enum ReturnCode {
50 Blocked,
51 Completed(u32),
52 Dropped(u32),
53 Cancelled(u32),
54}
55
56impl ReturnCode {
57 pub fn encode(&self) -> u32 {
62 const BLOCKED: u32 = 0xffff_ffff;
63 const COMPLETED: u32 = 0x0;
64 const DROPPED: u32 = 0x1;
65 const CANCELLED: u32 = 0x2;
66 match self {
67 ReturnCode::Blocked => BLOCKED,
68 ReturnCode::Completed(n) => {
69 debug_assert!(*n < (1 << 28));
70 (n << 4) | COMPLETED
71 }
72 ReturnCode::Dropped(n) => {
73 debug_assert!(*n < (1 << 28));
74 (n << 4) | DROPPED
75 }
76 ReturnCode::Cancelled(n) => {
77 debug_assert!(*n < (1 << 28));
78 (n << 4) | CANCELLED
79 }
80 }
81 }
82
83 fn completed(kind: TransmitKind, count: u32) -> Self {
86 Self::Completed(if let TransmitKind::Future = kind {
87 0
88 } else {
89 count
90 })
91 }
92}
93
94#[derive(Copy, Clone, Debug)]
99pub enum TransmitIndex {
100 Stream(TypeStreamTableIndex),
101 Future(TypeFutureTableIndex),
102}
103
104impl TransmitIndex {
105 pub fn kind(&self) -> TransmitKind {
106 match self {
107 TransmitIndex::Stream(_) => TransmitKind::Stream,
108 TransmitIndex::Future(_) => TransmitKind::Future,
109 }
110 }
111}
112
113fn payload(ty: TransmitIndex, types: &Arc<ComponentTypes>) -> Option<InterfaceType> {
116 match ty {
117 TransmitIndex::Future(ty) => types[types[ty].ty].payload,
118 TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
119 }
120}
121
122fn get_mut_by_index_from(
125 handle_table: &mut HandleTable,
126 ty: TransmitIndex,
127 index: u32,
128) -> Result<(u32, &mut TransmitLocalState)> {
129 match ty {
130 TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
131 TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
132 }
133}
134
135fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
136 mut store: StoreContextMut<U>,
137 instance: Instance,
138 options: &Options,
139 ty: TransmitIndex,
140 address: usize,
141 count: usize,
142 buffer: &mut B,
143) -> Result<()> {
144 let types = instance.id().get(store.0).component().types().clone();
145 let count = buffer.remaining().len().min(count);
146
147 let lower = &mut if T::MAY_REQUIRE_REALLOC {
148 LowerContext::new
149 } else {
150 LowerContext::new_without_realloc
151 }(store.as_context_mut(), options, &types, instance);
152
153 if address % usize::try_from(T::ALIGN32)? != 0 {
154 bail!("read pointer not aligned");
155 }
156 lower
157 .as_slice_mut()
158 .get_mut(address..)
159 .and_then(|b| b.get_mut(..T::SIZE32 * count))
160 .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
161
162 if let Some(ty) = payload(ty, &types) {
163 T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
164 }
165
166 buffer.skip(count);
167
168 Ok(())
169}
170
171fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
172 lift: &mut LiftContext<'_>,
173 ty: Option<InterfaceType>,
174 buffer: &mut B,
175 address: usize,
176 count: usize,
177) -> Result<()> {
178 let count = count.min(buffer.remaining_capacity());
179 if T::IS_RUST_UNIT_TYPE {
180 buffer.extend(
184 iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
185 )
186 } else {
187 let ty = ty.unwrap();
188 if address % usize::try_from(T::ALIGN32)? != 0 {
189 bail!("write pointer not aligned");
190 }
191 lift.memory()
192 .get(address..)
193 .and_then(|b| b.get(..T::SIZE32 * count))
194 .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
195
196 let list = &WasmList::new(address, count, lift, ty)?;
197 T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
198 }
199 Ok(())
200}
201
202#[derive(Debug, PartialEq, Eq, PartialOrd)]
204pub(super) struct ErrorContextState {
205 pub(crate) debug_msg: String,
207}
208
209#[derive(Debug, Clone, Copy, PartialEq, Eq)]
212pub(super) struct FlatAbi {
213 pub(super) size: u32,
214 pub(super) align: u32,
215}
216
217pub struct Destination<'a, T, B> {
219 id: TableId<TransmitState>,
220 buffer: &'a mut B,
221 host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
222 _phantom: PhantomData<fn() -> T>,
223}
224
225impl<'a, T, B> Destination<'a, T, B> {
226 pub fn reborrow(&mut self) -> Destination<'_, T, B> {
228 Destination {
229 id: self.id,
230 buffer: &mut *self.buffer,
231 host_buffer: self.host_buffer.as_deref_mut(),
232 _phantom: PhantomData,
233 }
234 }
235
236 pub fn take_buffer(&mut self) -> B
242 where
243 B: Default,
244 {
245 mem::take(self.buffer)
246 }
247
248 pub fn set_buffer(&mut self, buffer: B) {
258 *self.buffer = buffer;
259 }
260
261 pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
274 let transmit = store
275 .as_context_mut()
276 .0
277 .concurrent_state_mut()
278 .get_mut(self.id)
279 .unwrap();
280
281 if let &ReadState::GuestReady { count, .. } = &transmit.read {
282 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
283 unreachable!()
284 };
285
286 Some(count - guest_offset)
287 } else {
288 None
289 }
290 }
291}
292
293impl<'a, B> Destination<'a, u8, B> {
294 pub fn as_direct<D>(
305 mut self,
306 store: StoreContextMut<'a, D>,
307 capacity: usize,
308 ) -> DirectDestination<'a, D> {
309 if let Some(buffer) = self.host_buffer.as_deref_mut() {
310 buffer.set_position(0);
311 if buffer.get_mut().is_empty() {
312 buffer.get_mut().resize(capacity, 0);
313 }
314 }
315
316 DirectDestination {
317 id: self.id,
318 host_buffer: self.host_buffer,
319 store,
320 }
321 }
322}
323
324pub struct DirectDestination<'a, D: 'static> {
327 id: TableId<TransmitState>,
328 host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
329 store: StoreContextMut<'a, D>,
330}
331
332impl<D: 'static> DirectDestination<'_, D> {
333 pub fn remaining(&mut self) -> &mut [u8] {
335 if let Some(buffer) = self.host_buffer.as_deref_mut() {
336 buffer.get_mut()
337 } else {
338 let transmit = self
339 .store
340 .as_context_mut()
341 .0
342 .concurrent_state_mut()
343 .get_mut(self.id)
344 .unwrap();
345
346 let &ReadState::GuestReady {
347 address,
348 count,
349 options,
350 ..
351 } = &transmit.read
352 else {
353 unreachable!();
354 };
355
356 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
357 unreachable!()
358 };
359
360 options
361 .memory_mut(self.store.0)
362 .get_mut((address + guest_offset)..)
363 .and_then(|b| b.get_mut(..(count - guest_offset)))
364 .unwrap()
365 }
366 }
367
368 pub fn mark_written(&mut self, count: usize) {
373 if let Some(buffer) = self.host_buffer.as_deref_mut() {
374 buffer.set_position(
375 buffer
376 .position()
377 .checked_add(u64::try_from(count).unwrap())
378 .unwrap(),
379 );
380 } else {
381 let transmit = self
382 .store
383 .as_context_mut()
384 .0
385 .concurrent_state_mut()
386 .get_mut(self.id)
387 .unwrap();
388
389 let ReadState::GuestReady {
390 count: read_count, ..
391 } = &transmit.read
392 else {
393 unreachable!();
394 };
395
396 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
397 unreachable!()
398 };
399
400 if *guest_offset + count > *read_count {
401 panic!(
402 "write count ({count}) must be less than or equal to read count ({read_count})"
403 )
404 } else {
405 *guest_offset += count;
406 }
407 }
408 }
409}
410
411#[derive(Copy, Clone, Debug)]
413pub enum StreamResult {
414 Completed,
417 Cancelled,
422 Dropped,
425}
426
427pub trait StreamProducer<D>: Send + 'static {
429 type Item;
431
432 type Buffer: WriteBuffer<Self::Item> + Default;
434
435 fn poll_produce<'a>(
506 self: Pin<&mut Self>,
507 cx: &mut Context<'_>,
508 store: StoreContextMut<'a, D>,
509 destination: Destination<'a, Self::Item, Self::Buffer>,
510 finish: bool,
511 ) -> Poll<Result<StreamResult>>;
512
513 fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
519 Err(me)
520 }
521}
522
523impl<T, D> StreamProducer<D> for iter::Empty<T>
524where
525 T: Send + Sync + 'static,
526{
527 type Item = T;
528 type Buffer = Option<Self::Item>;
529
530 fn poll_produce<'a>(
531 self: Pin<&mut Self>,
532 _: &mut Context<'_>,
533 _: StoreContextMut<'a, D>,
534 _: Destination<'a, Self::Item, Self::Buffer>,
535 _: bool,
536 ) -> Poll<Result<StreamResult>> {
537 Poll::Ready(Ok(StreamResult::Dropped))
538 }
539}
540
541impl<T, D> StreamProducer<D> for stream::Empty<T>
542where
543 T: Send + Sync + 'static,
544{
545 type Item = T;
546 type Buffer = Option<Self::Item>;
547
548 fn poll_produce<'a>(
549 self: Pin<&mut Self>,
550 _: &mut Context<'_>,
551 _: StoreContextMut<'a, D>,
552 _: Destination<'a, Self::Item, Self::Buffer>,
553 _: bool,
554 ) -> Poll<Result<StreamResult>> {
555 Poll::Ready(Ok(StreamResult::Dropped))
556 }
557}
558
559impl<T, D> StreamProducer<D> for Vec<T>
560where
561 T: Unpin + Send + Sync + 'static,
562{
563 type Item = T;
564 type Buffer = VecBuffer<T>;
565
566 fn poll_produce<'a>(
567 self: Pin<&mut Self>,
568 _: &mut Context<'_>,
569 _: StoreContextMut<'a, D>,
570 mut dst: Destination<'a, Self::Item, Self::Buffer>,
571 _: bool,
572 ) -> Poll<Result<StreamResult>> {
573 dst.set_buffer(mem::take(self.get_mut()).into());
574 Poll::Ready(Ok(StreamResult::Dropped))
575 }
576}
577
578impl<T, D> StreamProducer<D> for Box<[T]>
579where
580 T: Unpin + Send + Sync + 'static,
581{
582 type Item = T;
583 type Buffer = VecBuffer<T>;
584
585 fn poll_produce<'a>(
586 self: Pin<&mut Self>,
587 _: &mut Context<'_>,
588 _: StoreContextMut<'a, D>,
589 mut dst: Destination<'a, Self::Item, Self::Buffer>,
590 _: bool,
591 ) -> Poll<Result<StreamResult>> {
592 dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
593 Poll::Ready(Ok(StreamResult::Dropped))
594 }
595}
596
597#[cfg(feature = "component-model-async-bytes")]
598impl<D> StreamProducer<D> for bytes::Bytes {
599 type Item = u8;
600 type Buffer = Cursor<Self>;
601
602 fn poll_produce<'a>(
603 mut self: Pin<&mut Self>,
604 _: &mut Context<'_>,
605 mut store: StoreContextMut<'a, D>,
606 mut dst: Destination<'a, Self::Item, Self::Buffer>,
607 _: bool,
608 ) -> Poll<Result<StreamResult>> {
609 let cap = dst.remaining(&mut store);
610 let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
611 dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
613 return Poll::Ready(Ok(StreamResult::Dropped));
614 };
615 let cap = cap.into();
616 dst.set_buffer(Cursor::new(self.split_off(cap)));
618 let mut dst = dst.as_direct(store, cap);
619 dst.remaining().copy_from_slice(&self);
620 dst.mark_written(cap);
621 Poll::Ready(Ok(StreamResult::Dropped))
622 }
623}
624
625#[cfg(feature = "component-model-async-bytes")]
626impl<D> StreamProducer<D> for bytes::BytesMut {
627 type Item = u8;
628 type Buffer = Cursor<Self>;
629
630 fn poll_produce<'a>(
631 mut self: Pin<&mut Self>,
632 _: &mut Context<'_>,
633 mut store: StoreContextMut<'a, D>,
634 mut dst: Destination<'a, Self::Item, Self::Buffer>,
635 _: bool,
636 ) -> Poll<Result<StreamResult>> {
637 let cap = dst.remaining(&mut store);
638 let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
639 dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
641 return Poll::Ready(Ok(StreamResult::Dropped));
642 };
643 let cap = cap.into();
644 dst.set_buffer(Cursor::new(self.split_off(cap)));
646 let mut dst = dst.as_direct(store, cap);
647 dst.remaining().copy_from_slice(&self);
648 dst.mark_written(cap);
649 Poll::Ready(Ok(StreamResult::Dropped))
650 }
651}
652
653pub struct Source<'a, T> {
655 instance: Option<Instance>,
656 id: TableId<TransmitState>,
657 host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
658}
659
660impl<'a, T> Source<'a, T> {
661 pub fn reborrow(&mut self) -> Source<'_, T> {
663 Source {
664 instance: self.instance,
665 id: self.id,
666 host_buffer: self.host_buffer.as_deref_mut(),
667 }
668 }
669
670 pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
672 where
673 T: func::Lift + 'static,
674 B: ReadBuffer<T>,
675 {
676 if let Some(input) = &mut self.host_buffer {
677 let count = input.remaining().len().min(buffer.remaining_capacity());
678 buffer.move_from(*input, count);
679 } else {
680 let store = store.as_context_mut();
681 let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
682
683 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
684 unreachable!();
685 };
686
687 let &WriteState::GuestReady {
688 ty,
689 address,
690 count,
691 options,
692 ..
693 } = &transmit.write
694 else {
695 unreachable!()
696 };
697
698 let cx =
699 &mut LiftContext::new(store.0.store_opaque_mut(), &options, self.instance.unwrap());
700 let ty = payload(ty, cx.types);
701 let old_remaining = buffer.remaining_capacity();
702 lift::<T, B, S::Data>(
703 cx,
704 ty,
705 buffer,
706 address + (T::SIZE32 * guest_offset),
707 count - guest_offset,
708 )?;
709
710 let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
711
712 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
713 unreachable!();
714 };
715
716 *guest_offset += old_remaining - buffer.remaining_capacity();
717 }
718
719 Ok(())
720 }
721
722 pub fn remaining(&self, mut store: impl AsContextMut) -> usize
725 where
726 T: 'static,
727 {
728 let transmit = store
729 .as_context_mut()
730 .0
731 .concurrent_state_mut()
732 .get_mut(self.id)
733 .unwrap();
734
735 if let &WriteState::GuestReady { count, .. } = &transmit.write {
736 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
737 unreachable!()
738 };
739
740 count - guest_offset
741 } else if let Some(host_buffer) = &self.host_buffer {
742 host_buffer.remaining().len()
743 } else {
744 unreachable!()
745 }
746 }
747}
748
749impl<'a> Source<'a, u8> {
750 pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
752 DirectSource {
753 id: self.id,
754 host_buffer: self.host_buffer,
755 store,
756 }
757 }
758}
759
760pub struct DirectSource<'a, D: 'static> {
763 id: TableId<TransmitState>,
764 host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
765 store: StoreContextMut<'a, D>,
766}
767
768impl<D: 'static> DirectSource<'_, D> {
769 pub fn remaining(&mut self) -> &[u8] {
771 if let Some(buffer) = self.host_buffer.as_deref_mut() {
772 buffer.remaining()
773 } else {
774 let transmit = self
775 .store
776 .as_context_mut()
777 .0
778 .concurrent_state_mut()
779 .get_mut(self.id)
780 .unwrap();
781
782 let &WriteState::GuestReady {
783 address,
784 count,
785 options,
786 ..
787 } = &transmit.write
788 else {
789 unreachable!()
790 };
791
792 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
793 unreachable!()
794 };
795
796 options
797 .memory(self.store.0)
798 .get((address + guest_offset)..)
799 .and_then(|b| b.get(..(count - guest_offset)))
800 .unwrap()
801 }
802 }
803
804 pub fn mark_read(&mut self, count: usize) {
809 if let Some(buffer) = self.host_buffer.as_deref_mut() {
810 buffer.skip(count);
811 } else {
812 let transmit = self
813 .store
814 .as_context_mut()
815 .0
816 .concurrent_state_mut()
817 .get_mut(self.id)
818 .unwrap();
819
820 let WriteState::GuestReady {
821 count: write_count, ..
822 } = &transmit.write
823 else {
824 unreachable!()
825 };
826
827 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
828 unreachable!()
829 };
830
831 if *guest_offset + count > *write_count {
832 panic!(
833 "read count ({count}) must be less than or equal to write count ({write_count})"
834 )
835 } else {
836 *guest_offset += count;
837 }
838 }
839 }
840}
841
842pub trait StreamConsumer<D>: Send + 'static {
844 type Item;
846
847 fn poll_consume(
930 self: Pin<&mut Self>,
931 cx: &mut Context<'_>,
932 store: StoreContextMut<D>,
933 source: Source<'_, Self::Item>,
934 finish: bool,
935 ) -> Poll<Result<StreamResult>>;
936}
937
938pub trait FutureProducer<D>: Send + 'static {
940 type Item;
942
943 fn poll_produce(
953 self: Pin<&mut Self>,
954 cx: &mut Context<'_>,
955 store: StoreContextMut<D>,
956 finish: bool,
957 ) -> Poll<Result<Option<Self::Item>>>;
958}
959
960impl<T, E, D, Fut> FutureProducer<D> for Fut
961where
962 E: Into<Error>,
963 Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
964{
965 type Item = T;
966
967 fn poll_produce<'a>(
968 self: Pin<&mut Self>,
969 cx: &mut Context<'_>,
970 _: StoreContextMut<'a, D>,
971 finish: bool,
972 ) -> Poll<Result<Option<T>>> {
973 match self.poll(cx) {
974 Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
975 Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
976 Poll::Pending if finish => Poll::Ready(Ok(None)),
977 Poll::Pending => Poll::Pending,
978 }
979 }
980}
981
982pub trait FutureConsumer<D>: Send + 'static {
984 type Item;
986
987 fn poll_consume(
999 self: Pin<&mut Self>,
1000 cx: &mut Context<'_>,
1001 store: StoreContextMut<D>,
1002 source: Source<'_, Self::Item>,
1003 finish: bool,
1004 ) -> Poll<Result<()>>;
1005}
1006
1007pub struct FutureReader<T> {
1014 id: TableId<TransmitHandle>,
1015 _phantom: PhantomData<T>,
1016}
1017
1018impl<T> FutureReader<T> {
1019 pub fn new<S: AsContextMut>(
1021 mut store: S,
1022 producer: impl FutureProducer<S::Data, Item = T>,
1023 ) -> Self
1024 where
1025 T: func::Lower + func::Lift + Send + Sync + 'static,
1026 {
1027 struct Producer<P>(P);
1028
1029 impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
1030 for Producer<P>
1031 {
1032 type Item = P::Item;
1033 type Buffer = Option<P::Item>;
1034
1035 fn poll_produce<'a>(
1036 self: Pin<&mut Self>,
1037 cx: &mut Context<'_>,
1038 store: StoreContextMut<D>,
1039 mut destination: Destination<'a, Self::Item, Self::Buffer>,
1040 finish: bool,
1041 ) -> Poll<Result<StreamResult>> {
1042 let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1045
1046 Poll::Ready(Ok(
1047 if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
1048 destination.set_buffer(Some(value));
1049
1050 StreamResult::Completed
1057 } else {
1058 StreamResult::Cancelled
1059 },
1060 ))
1061 }
1062 }
1063
1064 Self::new_(
1065 store
1066 .as_context_mut()
1067 .new_transmit(TransmitKind::Future, Producer(producer)),
1068 )
1069 }
1070
1071 fn new_(id: TableId<TransmitHandle>) -> Self {
1072 Self {
1073 id,
1074 _phantom: PhantomData,
1075 }
1076 }
1077
1078 pub fn pipe<S: AsContextMut>(
1080 self,
1081 mut store: S,
1082 consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
1083 ) where
1084 T: func::Lift + 'static,
1085 {
1086 struct Consumer<C>(C);
1087
1088 impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
1089 for Consumer<C>
1090 {
1091 type Item = T;
1092
1093 fn poll_consume(
1094 self: Pin<&mut Self>,
1095 cx: &mut Context<'_>,
1096 mut store: StoreContextMut<D>,
1097 mut source: Source<Self::Item>,
1098 finish: bool,
1099 ) -> Poll<Result<StreamResult>> {
1100 let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1103
1104 ready!(consumer.poll_consume(
1105 cx,
1106 store.as_context_mut(),
1107 source.reborrow(),
1108 finish
1109 ))?;
1110
1111 Poll::Ready(Ok(if source.remaining(store) == 0 {
1112 StreamResult::Completed
1118 } else {
1119 StreamResult::Cancelled
1120 }))
1121 }
1122 }
1123
1124 store
1125 .as_context_mut()
1126 .set_consumer(self.id, TransmitKind::Future, Consumer(consumer));
1127 }
1128
1129 pub fn into_val(self) -> Val {
1132 Val::Future(FutureAny(self.id.rep()))
1133 }
1134
1135 pub fn from_val(mut store: impl AsContextMut<Data: Send>, value: &Val) -> Result<Self> {
1137 let Val::Future(FutureAny(rep)) = value else {
1138 bail!("expected `future`; got `{}`", value.desc());
1139 };
1140 let store = store.as_context_mut();
1141 let id = TableId::<TransmitHandle>::new(*rep);
1142 store.0.concurrent_state_mut().get_mut(id)?; Ok(Self::new_(id))
1144 }
1145
1146 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1148 match ty {
1149 InterfaceType::Future(src) => {
1150 let handle_table = cx
1151 .instance_mut()
1152 .table_for_transmit(TransmitIndex::Future(src));
1153 let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
1154 if is_done {
1155 bail!("cannot lift future after being notified that the writable end dropped");
1156 }
1157 let id = TableId::<TransmitHandle>::new(rep);
1158 let concurrent_state = cx.concurrent_state_mut();
1159 let future = concurrent_state.get_mut(id)?;
1160 future.common.handle = None;
1161 let state = future.state;
1162
1163 if concurrent_state.get_mut(state)?.done {
1164 bail!("cannot lift future after previous read succeeded");
1165 }
1166
1167 Ok(Self::new_(id))
1168 }
1169 _ => func::bad_type_info(),
1170 }
1171 }
1172
1173 pub fn close(&mut self, mut store: impl AsContextMut) {
1181 let id = mem::replace(&mut self.id, TableId::new(u32::MAX));
1183 store
1184 .as_context_mut()
1185 .0
1186 .host_drop_reader(id, TransmitKind::Future)
1187 .unwrap();
1188 }
1189
1190 pub fn close_with(&mut self, accessor: impl AsAccessor) {
1192 accessor.as_accessor().with(|access| self.close(access))
1193 }
1194
1195 pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1201 where
1202 A: AsAccessor,
1203 {
1204 GuardedFutureReader::new(accessor, self)
1205 }
1206}
1207
1208impl<T> fmt::Debug for FutureReader<T> {
1209 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1210 f.debug_struct("FutureReader")
1211 .field("id", &self.id)
1212 .finish()
1213 }
1214}
1215
1216pub(crate) fn lower_future_to_index<U>(
1218 rep: u32,
1219 cx: &mut LowerContext<'_, U>,
1220 ty: InterfaceType,
1221) -> Result<u32> {
1222 match ty {
1223 InterfaceType::Future(dst) => {
1224 let concurrent_state = cx.store.0.concurrent_state_mut();
1225 let id = TableId::<TransmitHandle>::new(rep);
1226 let state = concurrent_state.get_mut(id)?.state;
1227 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1228
1229 let handle = cx
1230 .instance_mut()
1231 .table_for_transmit(TransmitIndex::Future(dst))
1232 .future_insert_read(dst, rep)?;
1233
1234 cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1235
1236 Ok(handle)
1237 }
1238 _ => func::bad_type_info(),
1239 }
1240}
1241
1242unsafe impl<T: Send + Sync> func::ComponentType for FutureReader<T> {
1245 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1246
1247 type Lower = <u32 as func::ComponentType>::Lower;
1248
1249 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1250 match ty {
1251 InterfaceType::Future(_) => Ok(()),
1252 other => bail!("expected `future`, found `{}`", func::desc(other)),
1253 }
1254 }
1255}
1256
1257unsafe impl<T: Send + Sync> func::Lower for FutureReader<T> {
1259 fn linear_lower_to_flat<U>(
1260 &self,
1261 cx: &mut LowerContext<'_, U>,
1262 ty: InterfaceType,
1263 dst: &mut MaybeUninit<Self::Lower>,
1264 ) -> Result<()> {
1265 lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1266 cx,
1267 InterfaceType::U32,
1268 dst,
1269 )
1270 }
1271
1272 fn linear_lower_to_memory<U>(
1273 &self,
1274 cx: &mut LowerContext<'_, U>,
1275 ty: InterfaceType,
1276 offset: usize,
1277 ) -> Result<()> {
1278 lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1279 cx,
1280 InterfaceType::U32,
1281 offset,
1282 )
1283 }
1284}
1285
1286unsafe impl<T: Send + Sync> func::Lift for FutureReader<T> {
1288 fn linear_lift_from_flat(
1289 cx: &mut LiftContext<'_>,
1290 ty: InterfaceType,
1291 src: &Self::Lower,
1292 ) -> Result<Self> {
1293 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1294 Self::lift_from_index(cx, ty, index)
1295 }
1296
1297 fn linear_lift_from_memory(
1298 cx: &mut LiftContext<'_>,
1299 ty: InterfaceType,
1300 bytes: &[u8],
1301 ) -> Result<Self> {
1302 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1303 Self::lift_from_index(cx, ty, index)
1304 }
1305}
1306
1307pub struct GuardedFutureReader<T, A>
1313where
1314 A: AsAccessor,
1315{
1316 reader: Option<FutureReader<T>>,
1320 accessor: A,
1321}
1322
1323impl<T, A> GuardedFutureReader<T, A>
1324where
1325 A: AsAccessor,
1326{
1327 pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1329 Self {
1330 reader: Some(reader),
1331 accessor,
1332 }
1333 }
1334
1335 pub fn into_future(self) -> FutureReader<T> {
1338 self.into()
1339 }
1340}
1341
1342impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1343where
1344 A: AsAccessor,
1345{
1346 fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1347 guard.reader.take().unwrap()
1348 }
1349}
1350
1351impl<T, A> Drop for GuardedFutureReader<T, A>
1352where
1353 A: AsAccessor,
1354{
1355 fn drop(&mut self) {
1356 if let Some(reader) = &mut self.reader {
1357 reader.close_with(&self.accessor)
1358 }
1359 }
1360}
1361
1362pub struct StreamReader<T> {
1369 id: TableId<TransmitHandle>,
1370 _phantom: PhantomData<T>,
1371}
1372
1373impl<T> StreamReader<T> {
1374 pub fn new<S: AsContextMut>(
1376 mut store: S,
1377 producer: impl StreamProducer<S::Data, Item = T>,
1378 ) -> Self
1379 where
1380 T: func::Lower + func::Lift + Send + Sync + 'static,
1381 {
1382 Self::new_(
1383 store
1384 .as_context_mut()
1385 .new_transmit(TransmitKind::Stream, producer),
1386 )
1387 }
1388
1389 fn new_(id: TableId<TransmitHandle>) -> Self {
1390 Self {
1391 id,
1392 _phantom: PhantomData,
1393 }
1394 }
1395
1396 pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
1413 let store = store.as_context_mut();
1414 let state = store.0.concurrent_state_mut();
1415 let id = state.get_mut(self.id).unwrap().state;
1416 if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
1417 match try_into(TypeId::of::<V>()) {
1418 Some(result) => {
1419 self.close(store);
1420 Ok(*result.downcast::<V>().unwrap())
1421 }
1422 None => Err(self),
1423 }
1424 } else {
1425 Err(self)
1426 }
1427 }
1428
1429 pub fn pipe<S: AsContextMut>(
1431 self,
1432 mut store: S,
1433 consumer: impl StreamConsumer<S::Data, Item = T>,
1434 ) where
1435 T: 'static,
1436 {
1437 store
1438 .as_context_mut()
1439 .set_consumer(self.id, TransmitKind::Stream, consumer);
1440 }
1441
1442 pub fn into_val(self) -> Val {
1445 Val::Stream(StreamAny(self.id.rep()))
1446 }
1447
1448 pub fn from_val(mut store: impl AsContextMut<Data: Send>, value: &Val) -> Result<Self> {
1450 let Val::Stream(StreamAny(rep)) = value else {
1451 bail!("expected `stream`; got `{}`", value.desc());
1452 };
1453 let store = store.as_context_mut();
1454 let id = TableId::<TransmitHandle>::new(*rep);
1455 store.0.concurrent_state_mut().get_mut(id)?; Ok(Self::new_(id))
1457 }
1458
1459 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1461 match ty {
1462 InterfaceType::Stream(src) => {
1463 let handle_table = cx
1464 .instance_mut()
1465 .table_for_transmit(TransmitIndex::Stream(src));
1466 let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1467 if is_done {
1468 bail!("cannot lift stream after being notified that the writable end dropped");
1469 }
1470 let id = TableId::<TransmitHandle>::new(rep);
1471 cx.concurrent_state_mut().get_mut(id)?.common.handle = None;
1472 Ok(Self::new_(id))
1473 }
1474 _ => func::bad_type_info(),
1475 }
1476 }
1477
1478 pub fn close(&mut self, mut store: impl AsContextMut) {
1486 let id = mem::replace(&mut self.id, TableId::new(u32::MAX));
1488 store
1489 .as_context_mut()
1490 .0
1491 .host_drop_reader(id, TransmitKind::Stream)
1492 .unwrap()
1493 }
1494
1495 pub fn close_with(&mut self, accessor: impl AsAccessor) {
1497 accessor.as_accessor().with(|access| self.close(access))
1498 }
1499
1500 pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1506 where
1507 A: AsAccessor,
1508 {
1509 GuardedStreamReader::new(accessor, self)
1510 }
1511}
1512
1513impl<T> fmt::Debug for StreamReader<T> {
1514 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1515 f.debug_struct("StreamReader")
1516 .field("id", &self.id)
1517 .finish()
1518 }
1519}
1520
1521pub(crate) fn lower_stream_to_index<U>(
1523 rep: u32,
1524 cx: &mut LowerContext<'_, U>,
1525 ty: InterfaceType,
1526) -> Result<u32> {
1527 match ty {
1528 InterfaceType::Stream(dst) => {
1529 let concurrent_state = cx.store.0.concurrent_state_mut();
1530 let id = TableId::<TransmitHandle>::new(rep);
1531 let state = concurrent_state.get_mut(id)?.state;
1532 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1533
1534 let handle = cx
1535 .instance_mut()
1536 .table_for_transmit(TransmitIndex::Stream(dst))
1537 .stream_insert_read(dst, rep)?;
1538
1539 cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1540
1541 Ok(handle)
1542 }
1543 _ => func::bad_type_info(),
1544 }
1545}
1546
1547unsafe impl<T: Send + Sync> func::ComponentType for StreamReader<T> {
1550 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1551
1552 type Lower = <u32 as func::ComponentType>::Lower;
1553
1554 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1555 match ty {
1556 InterfaceType::Stream(_) => Ok(()),
1557 other => bail!("expected `stream`, found `{}`", func::desc(other)),
1558 }
1559 }
1560}
1561
1562unsafe impl<T: Send + Sync> func::Lower for StreamReader<T> {
1564 fn linear_lower_to_flat<U>(
1565 &self,
1566 cx: &mut LowerContext<'_, U>,
1567 ty: InterfaceType,
1568 dst: &mut MaybeUninit<Self::Lower>,
1569 ) -> Result<()> {
1570 lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1571 cx,
1572 InterfaceType::U32,
1573 dst,
1574 )
1575 }
1576
1577 fn linear_lower_to_memory<U>(
1578 &self,
1579 cx: &mut LowerContext<'_, U>,
1580 ty: InterfaceType,
1581 offset: usize,
1582 ) -> Result<()> {
1583 lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1584 cx,
1585 InterfaceType::U32,
1586 offset,
1587 )
1588 }
1589}
1590
1591unsafe impl<T: Send + Sync> func::Lift for StreamReader<T> {
1593 fn linear_lift_from_flat(
1594 cx: &mut LiftContext<'_>,
1595 ty: InterfaceType,
1596 src: &Self::Lower,
1597 ) -> Result<Self> {
1598 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1599 Self::lift_from_index(cx, ty, index)
1600 }
1601
1602 fn linear_lift_from_memory(
1603 cx: &mut LiftContext<'_>,
1604 ty: InterfaceType,
1605 bytes: &[u8],
1606 ) -> Result<Self> {
1607 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1608 Self::lift_from_index(cx, ty, index)
1609 }
1610}
1611
1612pub struct GuardedStreamReader<T, A>
1618where
1619 A: AsAccessor,
1620{
1621 reader: Option<StreamReader<T>>,
1625 accessor: A,
1626}
1627
1628impl<T, A> GuardedStreamReader<T, A>
1629where
1630 A: AsAccessor,
1631{
1632 pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1635 Self {
1636 reader: Some(reader),
1637 accessor,
1638 }
1639 }
1640
1641 pub fn into_stream(self) -> StreamReader<T> {
1644 self.into()
1645 }
1646}
1647
1648impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1649where
1650 A: AsAccessor,
1651{
1652 fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1653 guard.reader.take().unwrap()
1654 }
1655}
1656
1657impl<T, A> Drop for GuardedStreamReader<T, A>
1658where
1659 A: AsAccessor,
1660{
1661 fn drop(&mut self) {
1662 if let Some(reader) = &mut self.reader {
1663 reader.close_with(&self.accessor)
1664 }
1665 }
1666}
1667
1668pub struct ErrorContext {
1670 rep: u32,
1671}
1672
1673impl ErrorContext {
1674 pub(crate) fn new(rep: u32) -> Self {
1675 Self { rep }
1676 }
1677
1678 pub fn into_val(self) -> Val {
1680 Val::ErrorContext(ErrorContextAny(self.rep))
1681 }
1682
1683 pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1685 let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1686 bail!("expected `error-context`; got `{}`", value.desc());
1687 };
1688 Ok(Self::new(*rep))
1689 }
1690
1691 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1692 match ty {
1693 InterfaceType::ErrorContext(src) => {
1694 let rep = cx
1695 .instance_mut()
1696 .table_for_error_context(src)
1697 .error_context_rep(index)?;
1698
1699 Ok(Self { rep })
1700 }
1701 _ => func::bad_type_info(),
1702 }
1703 }
1704}
1705
1706pub(crate) fn lower_error_context_to_index<U>(
1707 rep: u32,
1708 cx: &mut LowerContext<'_, U>,
1709 ty: InterfaceType,
1710) -> Result<u32> {
1711 match ty {
1712 InterfaceType::ErrorContext(dst) => {
1713 let tbl = cx.instance_mut().table_for_error_context(dst);
1714 tbl.error_context_insert(rep)
1715 }
1716 _ => func::bad_type_info(),
1717 }
1718}
1719unsafe impl func::ComponentType for ErrorContext {
1722 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1723
1724 type Lower = <u32 as func::ComponentType>::Lower;
1725
1726 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1727 match ty {
1728 InterfaceType::ErrorContext(_) => Ok(()),
1729 other => bail!("expected `error`, found `{}`", func::desc(other)),
1730 }
1731 }
1732}
1733
1734unsafe impl func::Lower for ErrorContext {
1736 fn linear_lower_to_flat<T>(
1737 &self,
1738 cx: &mut LowerContext<'_, T>,
1739 ty: InterfaceType,
1740 dst: &mut MaybeUninit<Self::Lower>,
1741 ) -> Result<()> {
1742 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1743 cx,
1744 InterfaceType::U32,
1745 dst,
1746 )
1747 }
1748
1749 fn linear_lower_to_memory<T>(
1750 &self,
1751 cx: &mut LowerContext<'_, T>,
1752 ty: InterfaceType,
1753 offset: usize,
1754 ) -> Result<()> {
1755 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1756 cx,
1757 InterfaceType::U32,
1758 offset,
1759 )
1760 }
1761}
1762
1763unsafe impl func::Lift for ErrorContext {
1765 fn linear_lift_from_flat(
1766 cx: &mut LiftContext<'_>,
1767 ty: InterfaceType,
1768 src: &Self::Lower,
1769 ) -> Result<Self> {
1770 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1771 Self::lift_from_index(cx, ty, index)
1772 }
1773
1774 fn linear_lift_from_memory(
1775 cx: &mut LiftContext<'_>,
1776 ty: InterfaceType,
1777 bytes: &[u8],
1778 ) -> Result<Self> {
1779 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1780 Self::lift_from_index(cx, ty, index)
1781 }
1782}
1783
1784pub(super) struct TransmitHandle {
1786 pub(super) common: WaitableCommon,
1787 state: TableId<TransmitState>,
1789}
1790
1791impl TransmitHandle {
1792 fn new(state: TableId<TransmitState>) -> Self {
1793 Self {
1794 common: WaitableCommon::default(),
1795 state,
1796 }
1797 }
1798}
1799
1800impl TableDebug for TransmitHandle {
1801 fn type_name() -> &'static str {
1802 "TransmitHandle"
1803 }
1804}
1805
1806struct TransmitState {
1808 write_handle: TableId<TransmitHandle>,
1810 read_handle: TableId<TransmitHandle>,
1812 write: WriteState,
1814 read: ReadState,
1816 done: bool,
1818}
1819
1820impl Default for TransmitState {
1821 fn default() -> Self {
1822 Self {
1823 write_handle: TableId::new(u32::MAX),
1824 read_handle: TableId::new(u32::MAX),
1825 read: ReadState::Open,
1826 write: WriteState::Open,
1827 done: false,
1828 }
1829 }
1830}
1831
1832impl TableDebug for TransmitState {
1833 fn type_name() -> &'static str {
1834 "TransmitState"
1835 }
1836}
1837
1838type PollStream = Box<
1839 dyn Fn(Option<Instance>) -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>
1840 + Send
1841 + Sync,
1842>;
1843
1844type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
1845
1846enum WriteState {
1848 Open,
1850 GuestReady {
1852 instance: Instance,
1853 ty: TransmitIndex,
1854 flat_abi: Option<FlatAbi>,
1855 options: Options,
1856 address: usize,
1857 count: usize,
1858 handle: u32,
1859 },
1860 HostReady {
1862 produce: PollStream,
1863 try_into: TryInto,
1864 guest_offset: usize,
1865 cancel: bool,
1866 cancel_waker: Option<Waker>,
1867 },
1868 Dropped,
1870}
1871
1872impl fmt::Debug for WriteState {
1873 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1874 match self {
1875 Self::Open => f.debug_tuple("Open").finish(),
1876 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1877 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1878 Self::Dropped => f.debug_tuple("Dropped").finish(),
1879 }
1880 }
1881}
1882
1883enum ReadState {
1885 Open,
1887 GuestReady {
1889 ty: TransmitIndex,
1890 flat_abi: Option<FlatAbi>,
1891 options: Options,
1892 address: usize,
1893 count: usize,
1894 handle: u32,
1895 },
1896 HostReady {
1898 consume: PollStream,
1899 guest_offset: usize,
1900 cancel: bool,
1901 cancel_waker: Option<Waker>,
1902 },
1903 HostToHost {
1905 accept: Box<
1906 dyn for<'a> Fn(
1907 &'a mut UntypedWriteBuffer<'a>,
1908 )
1909 -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
1910 + Send
1911 + Sync,
1912 >,
1913 buffer: Vec<u8>,
1914 limit: usize,
1915 },
1916 Dropped,
1918}
1919
1920impl fmt::Debug for ReadState {
1921 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1922 match self {
1923 Self::Open => f.debug_tuple("Open").finish(),
1924 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1925 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1926 Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
1927 Self::Dropped => f.debug_tuple("Dropped").finish(),
1928 }
1929 }
1930}
1931
1932fn return_code(kind: TransmitKind, state: StreamResult, guest_offset: usize) -> ReturnCode {
1933 let count = guest_offset.try_into().unwrap();
1934 match state {
1935 StreamResult::Dropped => ReturnCode::Dropped(count),
1936 StreamResult::Completed => ReturnCode::completed(kind, count),
1937 StreamResult::Cancelled => ReturnCode::Cancelled(count),
1938 }
1939}
1940
1941impl StoreOpaque {
1942 fn pipe_from_guest(
1943 &mut self,
1944 kind: TransmitKind,
1945 id: TableId<TransmitState>,
1946 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
1947 ) {
1948 let future = async move {
1949 let stream_state = future.await?;
1950 tls::get(|store| {
1951 let state = store.concurrent_state_mut();
1952 let transmit = state.get_mut(id)?;
1953 let ReadState::HostReady {
1954 consume,
1955 guest_offset,
1956 ..
1957 } = mem::replace(&mut transmit.read, ReadState::Open)
1958 else {
1959 unreachable!();
1960 };
1961 let code = return_code(kind, stream_state, guest_offset);
1962 transmit.read = match stream_state {
1963 StreamResult::Dropped => ReadState::Dropped,
1964 StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
1965 consume,
1966 guest_offset: 0,
1967 cancel: false,
1968 cancel_waker: None,
1969 },
1970 };
1971 let WriteState::GuestReady { ty, handle, .. } =
1972 mem::replace(&mut transmit.write, WriteState::Open)
1973 else {
1974 unreachable!();
1975 };
1976 state.send_write_result(ty, id, handle, code)?;
1977 Ok(())
1978 })
1979 };
1980
1981 self.concurrent_state_mut().push_future(future.boxed());
1982 }
1983
1984 fn pipe_to_guest(
1985 &mut self,
1986 kind: TransmitKind,
1987 id: TableId<TransmitState>,
1988 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
1989 ) {
1990 let future = async move {
1991 let stream_state = future.await?;
1992 tls::get(|store| {
1993 let state = store.concurrent_state_mut();
1994 let transmit = state.get_mut(id)?;
1995 let WriteState::HostReady {
1996 produce,
1997 try_into,
1998 guest_offset,
1999 ..
2000 } = mem::replace(&mut transmit.write, WriteState::Open)
2001 else {
2002 unreachable!();
2003 };
2004 let code = return_code(kind, stream_state, guest_offset);
2005 transmit.write = match stream_state {
2006 StreamResult::Dropped => WriteState::Dropped,
2007 StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2008 produce,
2009 try_into,
2010 guest_offset: 0,
2011 cancel: false,
2012 cancel_waker: None,
2013 },
2014 };
2015 let ReadState::GuestReady { ty, handle, .. } =
2016 mem::replace(&mut transmit.read, ReadState::Open)
2017 else {
2018 unreachable!();
2019 };
2020 state.send_read_result(ty, id, handle, code)?;
2021 Ok(())
2022 })
2023 };
2024
2025 self.concurrent_state_mut().push_future(future.boxed());
2026 }
2027
2028 fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
2030 let state = self.concurrent_state_mut();
2031 let transmit_id = state.get_mut(id)?.state;
2032 let transmit = state
2033 .get_mut(transmit_id)
2034 .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2035 log::trace!(
2036 "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2037 transmit.read,
2038 transmit.write
2039 );
2040
2041 transmit.read = ReadState::Dropped;
2042
2043 let new_state = if let WriteState::Dropped = &transmit.write {
2046 WriteState::Dropped
2047 } else {
2048 WriteState::Open
2049 };
2050
2051 let write_handle = transmit.write_handle;
2052
2053 match mem::replace(&mut transmit.write, new_state) {
2054 WriteState::GuestReady { ty, handle, .. } => {
2057 state.update_event(
2058 write_handle.rep(),
2059 match ty {
2060 TransmitIndex::Future(ty) => Event::FutureWrite {
2061 code: ReturnCode::Dropped(0),
2062 pending: Some((ty, handle)),
2063 },
2064 TransmitIndex::Stream(ty) => Event::StreamWrite {
2065 code: ReturnCode::Dropped(0),
2066 pending: Some((ty, handle)),
2067 },
2068 },
2069 )?;
2070 }
2071
2072 WriteState::HostReady { .. } => {}
2073
2074 WriteState::Open => {
2075 state.update_event(
2076 write_handle.rep(),
2077 match kind {
2078 TransmitKind::Future => Event::FutureWrite {
2079 code: ReturnCode::Dropped(0),
2080 pending: None,
2081 },
2082 TransmitKind::Stream => Event::StreamWrite {
2083 code: ReturnCode::Dropped(0),
2084 pending: None,
2085 },
2086 },
2087 )?;
2088 }
2089
2090 WriteState::Dropped => {
2091 log::trace!("host_drop_reader delete {transmit_id:?}");
2092 state.delete_transmit(transmit_id)?;
2093 }
2094 }
2095 Ok(())
2096 }
2097
2098 fn host_drop_writer(
2100 &mut self,
2101 id: TableId<TransmitHandle>,
2102 on_drop_open: Option<fn() -> Result<()>>,
2103 ) -> Result<()> {
2104 let state = self.concurrent_state_mut();
2105 let transmit_id = state.get_mut(id)?.state;
2106 let transmit = state
2107 .get_mut(transmit_id)
2108 .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2109 log::trace!(
2110 "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2111 transmit.read,
2112 transmit.write
2113 );
2114
2115 match &mut transmit.write {
2117 WriteState::GuestReady { .. } => {
2118 unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2119 }
2120 WriteState::HostReady { .. } => {}
2121 v @ WriteState::Open => {
2122 if let (Some(on_drop_open), false) = (
2123 on_drop_open,
2124 transmit.done || matches!(transmit.read, ReadState::Dropped),
2125 ) {
2126 on_drop_open()?;
2127 } else {
2128 *v = WriteState::Dropped;
2129 }
2130 }
2131 WriteState::Dropped => unreachable!("write state is already dropped"),
2132 }
2133
2134 let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
2135
2136 let new_state = if let ReadState::Dropped = &transmit.read {
2142 ReadState::Dropped
2143 } else {
2144 ReadState::Open
2145 };
2146
2147 let read_handle = transmit.read_handle;
2148
2149 match mem::replace(&mut transmit.read, new_state) {
2151 ReadState::GuestReady { ty, handle, .. } => {
2155 self.concurrent_state_mut().update_event(
2157 read_handle.rep(),
2158 match ty {
2159 TransmitIndex::Future(ty) => Event::FutureRead {
2160 code: ReturnCode::Dropped(0),
2161 pending: Some((ty, handle)),
2162 },
2163 TransmitIndex::Stream(ty) => Event::StreamRead {
2164 code: ReturnCode::Dropped(0),
2165 pending: Some((ty, handle)),
2166 },
2167 },
2168 )?;
2169 }
2170
2171 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2172
2173 ReadState::Open => {
2175 self.concurrent_state_mut().update_event(
2176 read_handle.rep(),
2177 match on_drop_open {
2178 Some(_) => Event::FutureRead {
2179 code: ReturnCode::Dropped(0),
2180 pending: None,
2181 },
2182 None => Event::StreamRead {
2183 code: ReturnCode::Dropped(0),
2184 pending: None,
2185 },
2186 },
2187 )?;
2188 }
2189
2190 ReadState::Dropped => {
2193 log::trace!("host_drop_writer delete {transmit_id:?}");
2194 self.concurrent_state_mut().delete_transmit(transmit_id)?;
2195 }
2196 }
2197 Ok(())
2198 }
2199}
2200
2201impl<T> StoreContextMut<'_, T> {
2202 fn new_transmit<P: StreamProducer<T>>(
2203 mut self,
2204 kind: TransmitKind,
2205 producer: P,
2206 ) -> TableId<TransmitHandle>
2207 where
2208 P::Item: func::Lower,
2209 {
2210 let token = StoreToken::new(self.as_context_mut());
2211 let state = self.0.concurrent_state_mut();
2212 let (_, read) = state.new_transmit().unwrap();
2213 let producer = Arc::new(Mutex::new(Some((Box::pin(producer), P::Buffer::default()))));
2214 let id = state.get_mut(read).unwrap().state;
2215 let produce = Box::new({
2216 let producer = producer.clone();
2217 move |instance| {
2218 let producer = producer.clone();
2219 async move {
2220 let (mut mine, mut buffer) = producer.lock().unwrap().take().unwrap();
2221
2222 let (result, cancelled) = if buffer.remaining().is_empty() {
2223 future::poll_fn(|cx| {
2224 tls::get(|store| {
2225 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2226
2227 let &WriteState::HostReady { cancel, .. } = &transmit.write else {
2228 unreachable!();
2229 };
2230
2231 let mut host_buffer =
2232 if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
2233 Some(Cursor::new(mem::take(buffer)))
2234 } else {
2235 None
2236 };
2237
2238 let poll = mine.as_mut().poll_produce(
2239 cx,
2240 token.as_context_mut(store),
2241 Destination {
2242 id,
2243 buffer: &mut buffer,
2244 host_buffer: host_buffer.as_mut(),
2245 _phantom: PhantomData,
2246 },
2247 cancel,
2248 );
2249
2250 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2251
2252 let host_offset = if let (
2253 Some(host_buffer),
2254 ReadState::HostToHost { buffer, limit, .. },
2255 ) = (host_buffer, &mut transmit.read)
2256 {
2257 *limit = usize::try_from(host_buffer.position()).unwrap();
2258 *buffer = host_buffer.into_inner();
2259 *limit
2260 } else {
2261 0
2262 };
2263
2264 {
2265 let WriteState::HostReady {
2266 guest_offset,
2267 cancel,
2268 cancel_waker,
2269 ..
2270 } = &mut transmit.write
2271 else {
2272 unreachable!();
2273 };
2274
2275 if poll.is_pending() {
2276 if !buffer.remaining().is_empty()
2277 || *guest_offset > 0
2278 || host_offset > 0
2279 {
2280 return Poll::Ready(Err(anyhow!(
2281 "StreamProducer::poll_produce returned Poll::Pending \
2282 after producing at least one item"
2283 )));
2284 }
2285 *cancel_waker = Some(cx.waker().clone());
2286 } else {
2287 *cancel_waker = None;
2288 *cancel = false;
2289 }
2290 }
2291
2292 poll.map(|v| v.map(|result| (result, cancel)))
2293 })
2294 })
2295 .await?
2296 } else {
2297 (StreamResult::Completed, false)
2298 };
2299
2300 let (guest_offset, host_offset, count) = tls::get(|store| {
2301 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2302 let (count, host_offset) = match &transmit.read {
2303 &ReadState::GuestReady { count, .. } => (count, 0),
2304 &ReadState::HostToHost { limit, .. } => (1, limit),
2305 _ => unreachable!(),
2306 };
2307 let guest_offset = match &transmit.write {
2308 &WriteState::HostReady { guest_offset, .. } => guest_offset,
2309 _ => unreachable!(),
2310 };
2311 (guest_offset, host_offset, count)
2312 });
2313
2314 match result {
2315 StreamResult::Completed => {
2316 if count > 1
2317 && buffer.remaining().is_empty()
2318 && guest_offset == 0
2319 && host_offset == 0
2320 {
2321 bail!(
2322 "StreamProducer::poll_produce returned StreamResult::Completed \
2323 without producing any items"
2324 );
2325 }
2326 }
2327 StreamResult::Cancelled => {
2328 if !cancelled {
2329 bail!(
2330 "StreamProducer::poll_produce returned StreamResult::Cancelled \
2331 without being given a `finish` parameter value of true"
2332 );
2333 }
2334 }
2335 StreamResult::Dropped => {}
2336 }
2337
2338 let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;
2339
2340 *producer.lock().unwrap() = Some((mine, buffer));
2341
2342 if write_buffer {
2343 write(instance, token, id, producer, kind).await?;
2344 }
2345
2346 Ok(result)
2347 }
2348 .boxed()
2349 }
2350 });
2351 let try_into = Box::new(move |ty| {
2352 let (mine, buffer) = producer.lock().unwrap().take().unwrap();
2353 match P::try_into(mine, ty) {
2354 Ok(value) => Some(value),
2355 Err(mine) => {
2356 *producer.lock().unwrap() = Some((mine, buffer));
2357 None
2358 }
2359 }
2360 });
2361 state.get_mut(id).unwrap().write = WriteState::HostReady {
2362 produce,
2363 try_into,
2364 guest_offset: 0,
2365 cancel: false,
2366 cancel_waker: None,
2367 };
2368 read
2369 }
2370
2371 fn set_consumer<C: StreamConsumer<T>>(
2372 mut self,
2373 id: TableId<TransmitHandle>,
2374 kind: TransmitKind,
2375 consumer: C,
2376 ) {
2377 let token = StoreToken::new(self.as_context_mut());
2378 let state = self.0.concurrent_state_mut();
2379 let id = state.get_mut(id).unwrap().state;
2380 let transmit = state.get_mut(id).unwrap();
2381 let consumer = Arc::new(Mutex::new(Some(Box::pin(consumer))));
2382 let consume_with_buffer = {
2383 let consumer = consumer.clone();
2384 async move |instance, mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
2385 let mut mine = consumer.lock().unwrap().take().unwrap();
2386
2387 let host_buffer_remaining_before =
2388 host_buffer.as_deref_mut().map(|v| v.remaining().len());
2389
2390 let (result, cancelled) = future::poll_fn(|cx| {
2391 tls::get(|store| {
2392 let cancel = match &store.concurrent_state_mut().get_mut(id).unwrap().read {
2393 &ReadState::HostReady { cancel, .. } => cancel,
2394 ReadState::Open => false,
2395 _ => unreachable!(),
2396 };
2397
2398 let poll = mine.as_mut().poll_consume(
2399 cx,
2400 token.as_context_mut(store),
2401 Source {
2402 instance,
2403 id,
2404 host_buffer: host_buffer.as_deref_mut(),
2405 },
2406 cancel,
2407 );
2408
2409 if let ReadState::HostReady {
2410 cancel_waker,
2411 cancel,
2412 ..
2413 } = &mut store.concurrent_state_mut().get_mut(id).unwrap().read
2414 {
2415 if poll.is_pending() {
2416 *cancel_waker = Some(cx.waker().clone());
2417 } else {
2418 *cancel_waker = None;
2419 *cancel = false;
2420 }
2421 }
2422
2423 poll.map(|v| v.map(|result| (result, cancel)))
2424 })
2425 })
2426 .await?;
2427
2428 let (guest_offset, count) = tls::get(|store| {
2429 let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2430 (
2431 match &transmit.read {
2432 &ReadState::HostReady { guest_offset, .. } => guest_offset,
2433 ReadState::Open => 0,
2434 _ => unreachable!(),
2435 },
2436 match &transmit.write {
2437 &WriteState::GuestReady { count, .. } => count,
2438 WriteState::HostReady { .. } => host_buffer_remaining_before.unwrap(),
2439 _ => unreachable!(),
2440 },
2441 )
2442 });
2443
2444 match result {
2445 StreamResult::Completed => {
2446 if count > 0
2447 && guest_offset == 0
2448 && host_buffer_remaining_before
2449 .zip(host_buffer.map(|v| v.remaining().len()))
2450 .map(|(before, after)| before == after)
2451 .unwrap_or(false)
2452 {
2453 bail!(
2454 "StreamConsumer::poll_consume returned StreamResult::Completed \
2455 without consuming any items"
2456 );
2457 }
2458
2459 if let TransmitKind::Future = kind {
2460 tls::get(|store| {
2461 store.concurrent_state_mut().get_mut(id).unwrap().done = true;
2462 });
2463 }
2464 }
2465 StreamResult::Cancelled => {
2466 if !cancelled {
2467 bail!(
2468 "StreamConsumer::poll_consume returned StreamResult::Cancelled \
2469 without being given a `finish` parameter value of true"
2470 );
2471 }
2472 }
2473 StreamResult::Dropped => {}
2474 }
2475
2476 *consumer.lock().unwrap() = Some(mine);
2477
2478 Ok(result)
2479 }
2480 };
2481 let consume = {
2482 let consume = consume_with_buffer.clone();
2483 Box::new(move |instance| {
2484 let consume = consume.clone();
2485 async move { consume(instance, None).await }.boxed()
2486 })
2487 };
2488
2489 match &transmit.write {
2490 WriteState::Open => {
2491 transmit.read = ReadState::HostReady {
2492 consume,
2493 guest_offset: 0,
2494 cancel: false,
2495 cancel_waker: None,
2496 };
2497 }
2498 &WriteState::GuestReady { instance, .. } => {
2499 let future = consume(Some(instance));
2500 transmit.read = ReadState::HostReady {
2501 consume,
2502 guest_offset: 0,
2503 cancel: false,
2504 cancel_waker: None,
2505 };
2506 self.0.pipe_from_guest(kind, id, future);
2507 }
2508 WriteState::HostReady { .. } => {
2509 let WriteState::HostReady { produce, .. } = mem::replace(
2510 &mut transmit.write,
2511 WriteState::HostReady {
2512 produce: Box::new(|_| unreachable!()),
2513 try_into: Box::new(|_| unreachable!()),
2514 guest_offset: 0,
2515 cancel: false,
2516 cancel_waker: None,
2517 },
2518 ) else {
2519 unreachable!();
2520 };
2521
2522 transmit.read = ReadState::HostToHost {
2523 accept: Box::new(move |input| {
2524 let consume = consume_with_buffer.clone();
2525 async move { consume(None, Some(input.get_mut::<C::Item>())).await }.boxed()
2526 }),
2527 buffer: Vec::new(),
2528 limit: 0,
2529 };
2530
2531 let future = async move {
2532 loop {
2533 if tls::get(|store| {
2534 anyhow::Ok(matches!(
2535 store.concurrent_state_mut().get_mut(id)?.read,
2536 ReadState::Dropped
2537 ))
2538 })? {
2539 break Ok(());
2540 }
2541
2542 match produce(None).await? {
2543 StreamResult::Completed | StreamResult::Cancelled => {}
2544 StreamResult::Dropped => break Ok(()),
2545 }
2546
2547 if let TransmitKind::Future = kind {
2548 break Ok(());
2549 }
2550 }
2551 }
2552 .map(move |result| {
2553 tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
2554 result
2555 });
2556
2557 state.push_future(Box::pin(future));
2558 }
2559 WriteState::Dropped => {
2560 let reader = transmit.read_handle;
2561 self.0.host_drop_reader(reader, kind).unwrap();
2562 }
2563 }
2564 }
2565}
2566
2567async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
2568 instance: Option<Instance>,
2569 token: StoreToken<D>,
2570 id: TableId<TransmitState>,
2571 pair: Arc<Mutex<Option<(P, B)>>>,
2572 kind: TransmitKind,
2573) -> Result<()> {
2574 let (read, guest_offset) = tls::get(|store| {
2575 let transmit = store.concurrent_state_mut().get_mut(id)?;
2576
2577 let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
2578 Some(guest_offset)
2579 } else {
2580 None
2581 };
2582
2583 anyhow::Ok((
2584 mem::replace(&mut transmit.read, ReadState::Open),
2585 guest_offset,
2586 ))
2587 })?;
2588
2589 match read {
2590 ReadState::GuestReady {
2591 ty,
2592 flat_abi,
2593 options,
2594 address,
2595 count,
2596 handle,
2597 } => {
2598 let guest_offset = guest_offset.unwrap();
2599
2600 if let TransmitKind::Future = kind {
2601 tls::get(|store| {
2602 store.concurrent_state_mut().get_mut(id)?.done = true;
2603 anyhow::Ok(())
2604 })?;
2605 }
2606
2607 let old_remaining = pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2608 let accept = {
2609 let pair = pair.clone();
2610 move |mut store: StoreContextMut<D>| {
2611 lower::<T, B, D>(
2612 store.as_context_mut(),
2613 instance.unwrap(),
2614 &options,
2615 ty,
2616 address + (T::SIZE32 * guest_offset),
2617 count - guest_offset,
2618 &mut pair.lock().unwrap().as_mut().unwrap().1,
2619 )?;
2620 anyhow::Ok(())
2621 }
2622 };
2623
2624 if guest_offset < count {
2625 if T::MAY_REQUIRE_REALLOC {
2626 let (tx, rx) = oneshot::channel();
2631 tls::get(move |store| {
2632 store
2633 .concurrent_state_mut()
2634 .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2635 move |store| {
2636 _ = tx.send(accept(token.as_context_mut(store))?);
2637 Ok(())
2638 },
2639 ))))
2640 });
2641 rx.await?
2642 } else {
2643 tls::get(|store| accept(token.as_context_mut(store)))?
2648 };
2649 }
2650
2651 tls::get(|store| {
2652 let count =
2653 old_remaining - pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2654
2655 let transmit = store.concurrent_state_mut().get_mut(id)?;
2656
2657 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2658 unreachable!();
2659 };
2660
2661 *guest_offset += count;
2662
2663 transmit.read = ReadState::GuestReady {
2664 ty,
2665 flat_abi,
2666 options,
2667 address,
2668 count,
2669 handle,
2670 };
2671
2672 anyhow::Ok(())
2673 })?;
2674
2675 Ok(())
2676 }
2677
2678 ReadState::HostToHost {
2679 accept,
2680 mut buffer,
2681 limit,
2682 } => {
2683 let mut state = StreamResult::Completed;
2684 let mut position = 0;
2685
2686 while !matches!(state, StreamResult::Dropped) && position < limit {
2687 let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
2688 state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
2689 (buffer, position, _) = slice_buffer.into_parts();
2690 }
2691
2692 {
2693 let (mine, mut buffer) = pair.lock().unwrap().take().unwrap();
2694
2695 while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
2696 state = accept(&mut UntypedWriteBuffer::new(&mut buffer)).await?;
2697 }
2698
2699 *pair.lock().unwrap() = Some((mine, buffer));
2700 }
2701
2702 tls::get(|store| {
2703 store.concurrent_state_mut().get_mut(id)?.read = match state {
2704 StreamResult::Dropped => ReadState::Dropped,
2705 StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
2706 accept,
2707 buffer,
2708 limit: 0,
2709 },
2710 };
2711
2712 anyhow::Ok(())
2713 })?;
2714 Ok(())
2715 }
2716
2717 _ => unreachable!(),
2718 }
2719}
2720
2721impl Instance {
2722 fn consume(
2725 self,
2726 store: &mut dyn VMStore,
2727 kind: TransmitKind,
2728 transmit_id: TableId<TransmitState>,
2729 consume: PollStream,
2730 guest_offset: usize,
2731 cancel: bool,
2732 ) -> Result<ReturnCode> {
2733 let mut future = consume(Some(self));
2734 store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
2735 consume,
2736 guest_offset,
2737 cancel,
2738 cancel_waker: None,
2739 };
2740 let poll = tls::set(store, || {
2741 future
2742 .as_mut()
2743 .poll(&mut Context::from_waker(&Waker::noop()))
2744 });
2745
2746 Ok(match poll {
2747 Poll::Ready(state) => {
2748 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
2749 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
2750 unreachable!();
2751 };
2752 let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2753 transmit.write = WriteState::Open;
2754 code
2755 }
2756 Poll::Pending => {
2757 store.pipe_from_guest(kind, transmit_id, future);
2758 ReturnCode::Blocked
2759 }
2760 })
2761 }
2762
2763 fn produce(
2766 self,
2767 store: &mut dyn VMStore,
2768 kind: TransmitKind,
2769 transmit_id: TableId<TransmitState>,
2770 produce: PollStream,
2771 try_into: TryInto,
2772 guest_offset: usize,
2773 cancel: bool,
2774 ) -> Result<ReturnCode> {
2775 let mut future = produce(Some(self));
2776 store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
2777 produce,
2778 try_into,
2779 guest_offset,
2780 cancel,
2781 cancel_waker: None,
2782 };
2783 let poll = tls::set(store, || {
2784 future
2785 .as_mut()
2786 .poll(&mut Context::from_waker(&Waker::noop()))
2787 });
2788
2789 Ok(match poll {
2790 Poll::Ready(state) => {
2791 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
2792 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2793 unreachable!();
2794 };
2795 let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2796 transmit.read = ReadState::Open;
2797 code
2798 }
2799 Poll::Pending => {
2800 store.pipe_to_guest(kind, transmit_id, future);
2801 ReturnCode::Blocked
2802 }
2803 })
2804 }
2805
2806 pub(super) fn guest_drop_writable(
2808 self,
2809 store: &mut StoreOpaque,
2810 ty: TransmitIndex,
2811 writer: u32,
2812 ) -> Result<()> {
2813 let table = self.id().get_mut(store).table_for_transmit(ty);
2814 let transmit_rep = match ty {
2815 TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
2816 TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
2817 };
2818
2819 let id = TableId::<TransmitHandle>::new(transmit_rep);
2820 log::trace!("guest_drop_writable: drop writer {id:?}");
2821 match ty {
2822 TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
2823 TransmitIndex::Future(_) => store.host_drop_writer(
2824 id,
2825 Some(|| {
2826 Err(anyhow!(
2827 "cannot drop future write end without first writing a value"
2828 ))
2829 }),
2830 ),
2831 }
2832 }
2833
2834 fn copy<T: 'static>(
2837 self,
2838 mut store: StoreContextMut<T>,
2839 flat_abi: Option<FlatAbi>,
2840 write_ty: TransmitIndex,
2841 write_options: &Options,
2842 write_address: usize,
2843 read_ty: TransmitIndex,
2844 read_options: &Options,
2845 read_address: usize,
2846 count: usize,
2847 rep: u32,
2848 ) -> Result<()> {
2849 let types = self.id().get(store.0).component().types().clone();
2850 match (write_ty, read_ty) {
2851 (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
2852 assert_eq!(count, 1);
2853
2854 let val = types[types[write_ty].ty]
2855 .payload
2856 .map(|ty| {
2857 let abi = types.canonical_abi(&ty);
2858 if write_address % usize::try_from(abi.align32)? != 0 {
2860 bail!("write pointer not aligned");
2861 }
2862
2863 let lift =
2864 &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);
2865 let bytes = lift
2866 .memory()
2867 .get(write_address..)
2868 .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
2869 .ok_or_else(|| {
2870 anyhow::anyhow!("write pointer out of bounds of memory")
2871 })?;
2872
2873 Val::load(lift, ty, bytes)
2874 })
2875 .transpose()?;
2876
2877 if let Some(val) = val {
2878 let lower =
2879 &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
2880 let ty = types[types[read_ty].ty].payload.unwrap();
2881 let ptr = func::validate_inbounds_dynamic(
2882 types.canonical_abi(&ty),
2883 lower.as_slice_mut(),
2884 &ValRaw::u32(read_address.try_into().unwrap()),
2885 )?;
2886 val.store(lower, ty, ptr)?;
2887 }
2888 }
2889 (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
2890 if let Some(flat_abi) = flat_abi {
2891 let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
2893 if length_in_bytes > 0 {
2894 if write_address % usize::try_from(flat_abi.align)? != 0 {
2895 bail!("write pointer not aligned");
2896 }
2897 if read_address % usize::try_from(flat_abi.align)? != 0 {
2898 bail!("read pointer not aligned");
2899 }
2900
2901 let store_opaque = store.0.store_opaque_mut();
2902
2903 {
2904 let src = write_options
2905 .memory(store_opaque)
2906 .get(write_address..)
2907 .and_then(|b| b.get(..length_in_bytes))
2908 .ok_or_else(|| {
2909 anyhow::anyhow!("write pointer out of bounds of memory")
2910 })?
2911 .as_ptr();
2912 let dst = read_options
2913 .memory_mut(store_opaque)
2914 .get_mut(read_address..)
2915 .and_then(|b| b.get_mut(..length_in_bytes))
2916 .ok_or_else(|| {
2917 anyhow::anyhow!("read pointer out of bounds of memory")
2918 })?
2919 .as_mut_ptr();
2920 unsafe { src.copy_to(dst, length_in_bytes) };
2923 }
2924 }
2925 } else {
2926 let store_opaque = store.0.store_opaque_mut();
2927 let lift = &mut LiftContext::new(store_opaque, write_options, self);
2928 let ty = types[types[write_ty].ty].payload.unwrap();
2929 let abi = lift.types.canonical_abi(&ty);
2930 let size = usize::try_from(abi.size32).unwrap();
2931 if write_address % usize::try_from(abi.align32)? != 0 {
2932 bail!("write pointer not aligned");
2933 }
2934 let bytes = lift
2935 .memory()
2936 .get(write_address..)
2937 .and_then(|b| b.get(..size * count))
2938 .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
2939
2940 let values = (0..count)
2941 .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
2942 .collect::<Result<Vec<_>>>()?;
2943
2944 let id = TableId::<TransmitHandle>::new(rep);
2945 log::trace!("copy values {values:?} for {id:?}");
2946
2947 let lower =
2948 &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
2949 let ty = types[types[read_ty].ty].payload.unwrap();
2950 let abi = lower.types.canonical_abi(&ty);
2951 if read_address % usize::try_from(abi.align32)? != 0 {
2952 bail!("read pointer not aligned");
2953 }
2954 let size = usize::try_from(abi.size32).unwrap();
2955 lower
2956 .as_slice_mut()
2957 .get_mut(read_address..)
2958 .and_then(|b| b.get_mut(..size * count))
2959 .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
2960 let mut ptr = read_address;
2961 for value in values {
2962 value.store(lower, ty, ptr)?;
2963 ptr += size
2964 }
2965 }
2966 }
2967 _ => unreachable!(),
2968 }
2969
2970 Ok(())
2971 }
2972
2973 fn check_bounds(
2974 self,
2975 store: &StoreOpaque,
2976 options: &Options,
2977 ty: TransmitIndex,
2978 address: usize,
2979 count: usize,
2980 ) -> Result<()> {
2981 let types = self.id().get(store).component().types().clone();
2982 let size = usize::try_from(
2983 match ty {
2984 TransmitIndex::Future(ty) => types[types[ty].ty]
2985 .payload
2986 .map(|ty| types.canonical_abi(&ty).size32),
2987 TransmitIndex::Stream(ty) => types[types[ty].ty]
2988 .payload
2989 .map(|ty| types.canonical_abi(&ty).size32),
2990 }
2991 .unwrap_or(0),
2992 )
2993 .unwrap();
2994
2995 if count > 0 && size > 0 {
2996 options
2997 .memory(store)
2998 .get(address..)
2999 .and_then(|b| b.get(..(size * count)))
3000 .map(drop)
3001 .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))
3002 } else {
3003 Ok(())
3004 }
3005 }
3006
3007 pub(super) fn guest_write<T: 'static>(
3009 self,
3010 mut store: StoreContextMut<T>,
3011 ty: TransmitIndex,
3012 options: OptionsIndex,
3013 flat_abi: Option<FlatAbi>,
3014 handle: u32,
3015 address: u32,
3016 count: u32,
3017 ) -> Result<ReturnCode> {
3018 let address = usize::try_from(address).unwrap();
3019 let count = usize::try_from(count).unwrap();
3020 let options = Options::new_index(store.0, self, options);
3021 self.check_bounds(store.0, &options, ty, address, count)?;
3022 let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3023 let TransmitLocalState::Write { done } = *state else {
3024 bail!(
3025 "invalid handle {handle}; expected `Write`; got {:?}",
3026 *state
3027 );
3028 };
3029
3030 if done {
3031 bail!("cannot write to stream after being notified that the readable end dropped");
3032 }
3033
3034 *state = TransmitLocalState::Busy;
3035 let transmit_handle = TableId::<TransmitHandle>::new(rep);
3036 let concurrent_state = store.0.concurrent_state_mut();
3037 let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3038 let transmit = concurrent_state.get_mut(transmit_id)?;
3039 log::trace!(
3040 "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3041 transmit.read
3042 );
3043
3044 if transmit.done {
3045 bail!("cannot write to future after previous write succeeded or readable end dropped");
3046 }
3047
3048 let new_state = if let ReadState::Dropped = &transmit.read {
3049 ReadState::Dropped
3050 } else {
3051 ReadState::Open
3052 };
3053
3054 let set_guest_ready = |me: &mut ConcurrentState| {
3055 let transmit = me.get_mut(transmit_id)?;
3056 assert!(matches!(&transmit.write, WriteState::Open));
3057 transmit.write = WriteState::GuestReady {
3058 instance: self,
3059 ty,
3060 flat_abi,
3061 options,
3062 address,
3063 count,
3064 handle,
3065 };
3066 Ok::<_, crate::Error>(())
3067 };
3068
3069 let mut result = match mem::replace(&mut transmit.read, new_state) {
3070 ReadState::GuestReady {
3071 ty: read_ty,
3072 flat_abi: read_flat_abi,
3073 options: read_options,
3074 address: read_address,
3075 count: read_count,
3076 handle: read_handle,
3077 } => {
3078 assert_eq!(flat_abi, read_flat_abi);
3079
3080 if let TransmitIndex::Future(_) = ty {
3081 transmit.done = true;
3082 }
3083
3084 let write_complete = count == 0 || read_count > 0;
3106 let read_complete = count > 0;
3107 let read_buffer_remaining = count < read_count;
3108
3109 let read_handle_rep = transmit.read_handle.rep();
3110
3111 let count = count.min(read_count);
3112
3113 self.copy(
3114 store.as_context_mut(),
3115 flat_abi,
3116 ty,
3117 &options,
3118 address,
3119 read_ty,
3120 &read_options,
3121 read_address,
3122 count,
3123 rep,
3124 )?;
3125
3126 let instance = self.id().get_mut(store.0);
3127 let types = instance.component().types();
3128 let item_size = payload(ty, types)
3129 .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3130 .unwrap_or(0);
3131 let concurrent_state = store.0.concurrent_state_mut();
3132 if read_complete {
3133 let count = u32::try_from(count).unwrap();
3134 let total = if let Some(Event::StreamRead {
3135 code: ReturnCode::Completed(old_total),
3136 ..
3137 }) = concurrent_state.take_event(read_handle_rep)?
3138 {
3139 count + old_total
3140 } else {
3141 count
3142 };
3143
3144 let code = ReturnCode::completed(ty.kind(), total);
3145
3146 concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
3147 }
3148
3149 if read_buffer_remaining {
3150 let transmit = concurrent_state.get_mut(transmit_id)?;
3151 transmit.read = ReadState::GuestReady {
3152 ty: read_ty,
3153 flat_abi: read_flat_abi,
3154 options: read_options,
3155 address: read_address + (count * item_size),
3156 count: read_count - count,
3157 handle: read_handle,
3158 };
3159 }
3160
3161 if write_complete {
3162 ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3163 } else {
3164 set_guest_ready(concurrent_state)?;
3165 ReturnCode::Blocked
3166 }
3167 }
3168
3169 ReadState::HostReady {
3170 consume,
3171 guest_offset,
3172 cancel,
3173 cancel_waker,
3174 } => {
3175 assert!(cancel_waker.is_none());
3176 assert!(!cancel);
3177 assert_eq!(0, guest_offset);
3178
3179 if let TransmitIndex::Future(_) = ty {
3180 transmit.done = true;
3181 }
3182
3183 set_guest_ready(concurrent_state)?;
3184 self.consume(store.0, ty.kind(), transmit_id, consume, 0, false)?
3185 }
3186
3187 ReadState::HostToHost { .. } => unreachable!(),
3188
3189 ReadState::Open => {
3190 set_guest_ready(concurrent_state)?;
3191 ReturnCode::Blocked
3192 }
3193
3194 ReadState::Dropped => {
3195 if let TransmitIndex::Future(_) = ty {
3196 transmit.done = true;
3197 }
3198
3199 ReturnCode::Dropped(0)
3200 }
3201 };
3202
3203 if result == ReturnCode::Blocked && !options.async_() {
3204 result = self.wait_for_write(store.0, transmit_handle)?;
3205 }
3206
3207 if result != ReturnCode::Blocked {
3208 *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3209 TransmitLocalState::Write {
3210 done: matches!(
3211 (result, ty),
3212 (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3213 ),
3214 };
3215 }
3216
3217 log::trace!(
3218 "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3219 );
3220
3221 Ok(result)
3222 }
3223
3224 pub(super) fn guest_read<T: 'static>(
3226 self,
3227 mut store: StoreContextMut<T>,
3228 ty: TransmitIndex,
3229 options: OptionsIndex,
3230 flat_abi: Option<FlatAbi>,
3231 handle: u32,
3232 address: u32,
3233 count: u32,
3234 ) -> Result<ReturnCode> {
3235 let address = usize::try_from(address).unwrap();
3236 let count = usize::try_from(count).unwrap();
3237 let options = Options::new_index(store.0, self, options);
3238 self.check_bounds(store.0, &options, ty, address, count)?;
3239 let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3240 let TransmitLocalState::Read { done } = *state else {
3241 bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
3242 };
3243
3244 if done {
3245 bail!("cannot read from stream after being notified that the writable end dropped");
3246 }
3247
3248 *state = TransmitLocalState::Busy;
3249 let transmit_handle = TableId::<TransmitHandle>::new(rep);
3250 let concurrent_state = store.0.concurrent_state_mut();
3251 let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3252 let transmit = concurrent_state.get_mut(transmit_id)?;
3253 log::trace!(
3254 "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3255 transmit.write
3256 );
3257
3258 if transmit.done {
3259 bail!("cannot read from future after previous read succeeded");
3260 }
3261
3262 let new_state = if let WriteState::Dropped = &transmit.write {
3263 WriteState::Dropped
3264 } else {
3265 WriteState::Open
3266 };
3267
3268 let set_guest_ready = |me: &mut ConcurrentState| {
3269 let transmit = me.get_mut(transmit_id)?;
3270 assert!(matches!(&transmit.read, ReadState::Open));
3271 transmit.read = ReadState::GuestReady {
3272 ty,
3273 flat_abi,
3274 options,
3275 address,
3276 count,
3277 handle,
3278 };
3279 Ok::<_, crate::Error>(())
3280 };
3281
3282 let mut result = match mem::replace(&mut transmit.write, new_state) {
3283 WriteState::GuestReady {
3284 instance: _,
3285 ty: write_ty,
3286 flat_abi: write_flat_abi,
3287 options: write_options,
3288 address: write_address,
3289 count: write_count,
3290 handle: write_handle,
3291 } => {
3292 assert_eq!(flat_abi, write_flat_abi);
3293
3294 if let TransmitIndex::Future(_) = ty {
3295 transmit.done = true;
3296 }
3297
3298 let write_handle_rep = transmit.write_handle.rep();
3299
3300 let write_complete = write_count == 0 || count > 0;
3305 let read_complete = write_count > 0;
3306 let write_buffer_remaining = count < write_count;
3307
3308 let count = count.min(write_count);
3309
3310 self.copy(
3311 store.as_context_mut(),
3312 flat_abi,
3313 write_ty,
3314 &write_options,
3315 write_address,
3316 ty,
3317 &options,
3318 address,
3319 count,
3320 rep,
3321 )?;
3322
3323 let instance = self.id().get_mut(store.0);
3324 let types = instance.component().types();
3325 let item_size = payload(ty, types)
3326 .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3327 .unwrap_or(0);
3328 let concurrent_state = store.0.concurrent_state_mut();
3329
3330 if write_complete {
3331 let count = u32::try_from(count).unwrap();
3332 let total = if let Some(Event::StreamWrite {
3333 code: ReturnCode::Completed(old_total),
3334 ..
3335 }) = concurrent_state.take_event(write_handle_rep)?
3336 {
3337 count + old_total
3338 } else {
3339 count
3340 };
3341
3342 let code = ReturnCode::completed(ty.kind(), total);
3343
3344 concurrent_state.send_write_result(
3345 write_ty,
3346 transmit_id,
3347 write_handle,
3348 code,
3349 )?;
3350 }
3351
3352 if write_buffer_remaining {
3353 let transmit = concurrent_state.get_mut(transmit_id)?;
3354 transmit.write = WriteState::GuestReady {
3355 instance: self,
3356 ty: write_ty,
3357 flat_abi: write_flat_abi,
3358 options: write_options,
3359 address: write_address + (count * item_size),
3360 count: write_count - count,
3361 handle: write_handle,
3362 };
3363 }
3364
3365 if read_complete {
3366 ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3367 } else {
3368 set_guest_ready(concurrent_state)?;
3369 ReturnCode::Blocked
3370 }
3371 }
3372
3373 WriteState::HostReady {
3374 produce,
3375 try_into,
3376 guest_offset,
3377 cancel,
3378 cancel_waker,
3379 } => {
3380 assert!(cancel_waker.is_none());
3381 assert!(!cancel);
3382 assert_eq!(0, guest_offset);
3383
3384 if let TransmitIndex::Future(_) = ty {
3385 transmit.done = true;
3386 }
3387
3388 set_guest_ready(concurrent_state)?;
3389
3390 self.produce(store.0, ty.kind(), transmit_id, produce, try_into, 0, false)?
3391 }
3392
3393 WriteState::Open => {
3394 set_guest_ready(concurrent_state)?;
3395 ReturnCode::Blocked
3396 }
3397
3398 WriteState::Dropped => ReturnCode::Dropped(0),
3399 };
3400
3401 if result == ReturnCode::Blocked && !options.async_() {
3402 result = self.wait_for_read(store.0, transmit_handle)?;
3403 }
3404
3405 if result != ReturnCode::Blocked {
3406 *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3407 TransmitLocalState::Read {
3408 done: matches!(
3409 (result, ty),
3410 (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3411 ),
3412 };
3413 }
3414
3415 log::trace!(
3416 "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3417 );
3418
3419 Ok(result)
3420 }
3421
3422 fn wait_for_write(
3423 self,
3424 store: &mut StoreOpaque,
3425 handle: TableId<TransmitHandle>,
3426 ) -> Result<ReturnCode> {
3427 let waitable = Waitable::Transmit(handle);
3428 store.wait_for_event(waitable)?;
3429 let event = waitable.take_event(store.concurrent_state_mut())?;
3430 if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3431 event
3432 {
3433 waitable.on_delivery(self.id().get_mut(store), event);
3434 Ok(code)
3435 } else {
3436 unreachable!()
3437 }
3438 }
3439
3440 fn cancel_write(
3442 self,
3443 store: &mut StoreOpaque,
3444 transmit_id: TableId<TransmitState>,
3445 async_: bool,
3446 ) -> Result<ReturnCode> {
3447 let state = store.concurrent_state_mut();
3448 let transmit = state.get_mut(transmit_id)?;
3449 log::trace!(
3450 "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3451 transmit.read,
3452 transmit.write
3453 );
3454
3455 let code = if let Some(event) =
3456 Waitable::Transmit(transmit.write_handle).take_event(state)?
3457 {
3458 let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3459 unreachable!();
3460 };
3461 match (code, event) {
3462 (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3463 ReturnCode::Cancelled(count)
3464 }
3465 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3466 _ => unreachable!(),
3467 }
3468 } else if let ReadState::HostReady {
3469 cancel,
3470 cancel_waker,
3471 ..
3472 } = &mut state.get_mut(transmit_id)?.read
3473 {
3474 *cancel = true;
3475 if let Some(waker) = cancel_waker.take() {
3476 waker.wake();
3477 }
3478
3479 if async_ {
3480 ReturnCode::Blocked
3481 } else {
3482 let handle = store
3483 .concurrent_state_mut()
3484 .get_mut(transmit_id)?
3485 .write_handle;
3486 self.wait_for_write(store, handle)?
3487 }
3488 } else {
3489 ReturnCode::Cancelled(0)
3490 };
3491
3492 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3493
3494 match &transmit.write {
3495 WriteState::GuestReady { .. } => {
3496 transmit.write = WriteState::Open;
3497 }
3498 WriteState::HostReady { .. } => todo!("support host write cancellation"),
3499 WriteState::Open | WriteState::Dropped => {}
3500 }
3501
3502 log::trace!("cancelled write {transmit_id:?}: {code:?}");
3503
3504 Ok(code)
3505 }
3506
3507 fn wait_for_read(
3508 self,
3509 store: &mut StoreOpaque,
3510 handle: TableId<TransmitHandle>,
3511 ) -> Result<ReturnCode> {
3512 let waitable = Waitable::Transmit(handle);
3513 store.wait_for_event(waitable)?;
3514 let event = waitable.take_event(store.concurrent_state_mut())?;
3515 if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
3516 event
3517 {
3518 waitable.on_delivery(self.id().get_mut(store), event);
3519 Ok(code)
3520 } else {
3521 unreachable!()
3522 }
3523 }
3524
3525 fn cancel_read(
3527 self,
3528 store: &mut StoreOpaque,
3529 transmit_id: TableId<TransmitState>,
3530 async_: bool,
3531 ) -> Result<ReturnCode> {
3532 let state = store.concurrent_state_mut();
3533 let transmit = state.get_mut(transmit_id)?;
3534 log::trace!(
3535 "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3536 transmit.read,
3537 transmit.write
3538 );
3539
3540 let code = if let Some(event) =
3541 Waitable::Transmit(transmit.read_handle).take_event(state)?
3542 {
3543 let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3544 unreachable!();
3545 };
3546 match (code, event) {
3547 (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3548 ReturnCode::Cancelled(count)
3549 }
3550 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3551 _ => unreachable!(),
3552 }
3553 } else if let WriteState::HostReady {
3554 cancel,
3555 cancel_waker,
3556 ..
3557 } = &mut state.get_mut(transmit_id)?.write
3558 {
3559 *cancel = true;
3560 if let Some(waker) = cancel_waker.take() {
3561 waker.wake();
3562 }
3563
3564 if async_ {
3565 ReturnCode::Blocked
3566 } else {
3567 let handle = store
3568 .concurrent_state_mut()
3569 .get_mut(transmit_id)?
3570 .read_handle;
3571 self.wait_for_read(store, handle)?
3572 }
3573 } else {
3574 ReturnCode::Cancelled(0)
3575 };
3576
3577 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3578
3579 match &transmit.read {
3580 ReadState::GuestReady { .. } => {
3581 transmit.read = ReadState::Open;
3582 }
3583 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
3584 todo!("support host read cancellation")
3585 }
3586 ReadState::Open | ReadState::Dropped => {}
3587 }
3588
3589 log::trace!("cancelled read {transmit_id:?}: {code:?}");
3590
3591 Ok(code)
3592 }
3593
3594 fn guest_cancel_write(
3596 self,
3597 store: &mut StoreOpaque,
3598 ty: TransmitIndex,
3599 async_: bool,
3600 writer: u32,
3601 ) -> Result<ReturnCode> {
3602 let (rep, state) =
3603 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
3604 let id = TableId::<TransmitHandle>::new(rep);
3605 log::trace!("guest cancel write {id:?} (handle {writer})");
3606 match state {
3607 TransmitLocalState::Write { .. } => {
3608 bail!("stream or future write cancelled when no write is pending")
3609 }
3610 TransmitLocalState::Read { .. } => {
3611 bail!("passed read end to `{{stream|future}}.cancel-write`")
3612 }
3613 TransmitLocalState::Busy => {}
3614 }
3615 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3616 let code = self.cancel_write(store, transmit_id, async_)?;
3617 if !matches!(code, ReturnCode::Blocked) {
3618 let state =
3619 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
3620 .1;
3621 if let TransmitLocalState::Busy = state {
3622 *state = TransmitLocalState::Write { done: false };
3623 }
3624 }
3625 Ok(code)
3626 }
3627
3628 fn guest_cancel_read(
3630 self,
3631 store: &mut StoreOpaque,
3632 ty: TransmitIndex,
3633 async_: bool,
3634 reader: u32,
3635 ) -> Result<ReturnCode> {
3636 let (rep, state) =
3637 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
3638 let id = TableId::<TransmitHandle>::new(rep);
3639 log::trace!("guest cancel read {id:?} (handle {reader})");
3640 match state {
3641 TransmitLocalState::Read { .. } => {
3642 bail!("stream or future read cancelled when no read is pending")
3643 }
3644 TransmitLocalState::Write { .. } => {
3645 bail!("passed write end to `{{stream|future}}.cancel-read`")
3646 }
3647 TransmitLocalState::Busy => {}
3648 }
3649 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3650 let code = self.cancel_read(store, transmit_id, async_)?;
3651 if !matches!(code, ReturnCode::Blocked) {
3652 let state =
3653 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
3654 .1;
3655 if let TransmitLocalState::Busy = state {
3656 *state = TransmitLocalState::Read { done: false };
3657 }
3658 }
3659 Ok(code)
3660 }
3661
3662 fn guest_drop_readable(
3664 self,
3665 store: &mut StoreOpaque,
3666 ty: TransmitIndex,
3667 reader: u32,
3668 ) -> Result<()> {
3669 let table = self.id().get_mut(store).table_for_transmit(ty);
3670 let (rep, _is_done) = match ty {
3671 TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
3672 TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
3673 };
3674 let kind = match ty {
3675 TransmitIndex::Stream(_) => TransmitKind::Stream,
3676 TransmitIndex::Future(_) => TransmitKind::Future,
3677 };
3678 let id = TableId::<TransmitHandle>::new(rep);
3679 log::trace!("guest_drop_readable: drop reader {id:?}");
3680 store.host_drop_reader(id, kind)
3681 }
3682
3683 pub(crate) fn error_context_new(
3685 self,
3686 store: &mut StoreOpaque,
3687 caller: RuntimeComponentInstanceIndex,
3688 ty: TypeComponentLocalErrorContextTableIndex,
3689 options: OptionsIndex,
3690 debug_msg_address: u32,
3691 debug_msg_len: u32,
3692 ) -> Result<u32> {
3693 self.id().get(store).check_may_leave(caller)?;
3694 let options = Options::new_index(store, self, options);
3695 let lift_ctx = &mut LiftContext::new(store, &options, self);
3696 let address = usize::try_from(debug_msg_address)?;
3698 let len = usize::try_from(debug_msg_len)?;
3699 lift_ctx
3700 .memory()
3701 .get(address..)
3702 .and_then(|b| b.get(..len))
3703 .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3704 let message = WasmStr::new(address, len, lift_ctx)?;
3705
3706 let err_ctx = ErrorContextState {
3708 debug_msg: message
3709 .to_str_from_memory(options.memory(store))?
3710 .to_string(),
3711 };
3712 let state = store.concurrent_state_mut();
3713 let table_id = state.push(err_ctx)?;
3714 let global_ref_count_idx =
3715 TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
3716
3717 let _ = state
3719 .global_error_context_ref_counts
3720 .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
3721
3722 let local_idx = self
3729 .id()
3730 .get_mut(store)
3731 .table_for_error_context(ty)
3732 .error_context_insert(table_id.rep())?;
3733
3734 Ok(local_idx)
3735 }
3736
3737 pub(super) fn error_context_debug_message<T>(
3739 self,
3740 store: StoreContextMut<T>,
3741 ty: TypeComponentLocalErrorContextTableIndex,
3742 options: OptionsIndex,
3743 err_ctx_handle: u32,
3744 debug_msg_address: u32,
3745 ) -> Result<()> {
3746 let handle_table_id_rep = self
3748 .id()
3749 .get_mut(store.0)
3750 .table_for_error_context(ty)
3751 .error_context_rep(err_ctx_handle)?;
3752
3753 let state = store.0.concurrent_state_mut();
3754 let ErrorContextState { debug_msg } =
3756 state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
3757 let debug_msg = debug_msg.clone();
3758
3759 let options = Options::new_index(store.0, self, options);
3760 let types = self.id().get(store.0).component().types().clone();
3761 let lower_cx = &mut LowerContext::new(store, &options, &types, self);
3762 let debug_msg_address = usize::try_from(debug_msg_address)?;
3763 let offset = lower_cx
3765 .as_slice_mut()
3766 .get(debug_msg_address..)
3767 .and_then(|b| b.get(..debug_msg.bytes().len()))
3768 .map(|_| debug_msg_address)
3769 .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3770 debug_msg
3771 .as_str()
3772 .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
3773
3774 Ok(())
3775 }
3776
3777 pub(crate) fn future_cancel_read(
3779 self,
3780 store: &mut StoreOpaque,
3781 caller: RuntimeComponentInstanceIndex,
3782 ty: TypeFutureTableIndex,
3783 async_: bool,
3784 reader: u32,
3785 ) -> Result<u32> {
3786 self.id().get(store).check_may_leave(caller)?;
3787 self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
3788 .map(|v| v.encode())
3789 }
3790
3791 pub(crate) fn future_cancel_write(
3793 self,
3794 store: &mut StoreOpaque,
3795 caller: RuntimeComponentInstanceIndex,
3796 ty: TypeFutureTableIndex,
3797 async_: bool,
3798 writer: u32,
3799 ) -> Result<u32> {
3800 self.id().get(store).check_may_leave(caller)?;
3801 self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
3802 .map(|v| v.encode())
3803 }
3804
3805 pub(crate) fn stream_cancel_read(
3807 self,
3808 store: &mut StoreOpaque,
3809 caller: RuntimeComponentInstanceIndex,
3810 ty: TypeStreamTableIndex,
3811 async_: bool,
3812 reader: u32,
3813 ) -> Result<u32> {
3814 self.id().get(store).check_may_leave(caller)?;
3815 self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
3816 .map(|v| v.encode())
3817 }
3818
3819 pub(crate) fn stream_cancel_write(
3821 self,
3822 store: &mut StoreOpaque,
3823 caller: RuntimeComponentInstanceIndex,
3824 ty: TypeStreamTableIndex,
3825 async_: bool,
3826 writer: u32,
3827 ) -> Result<u32> {
3828 self.id().get(store).check_may_leave(caller)?;
3829 self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
3830 .map(|v| v.encode())
3831 }
3832
3833 pub(crate) fn future_drop_readable(
3835 self,
3836 store: &mut StoreOpaque,
3837 caller: RuntimeComponentInstanceIndex,
3838 ty: TypeFutureTableIndex,
3839 reader: u32,
3840 ) -> Result<()> {
3841 self.id().get(store).check_may_leave(caller)?;
3842 self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
3843 }
3844
3845 pub(crate) fn stream_drop_readable(
3847 self,
3848 store: &mut StoreOpaque,
3849 caller: RuntimeComponentInstanceIndex,
3850 ty: TypeStreamTableIndex,
3851 reader: u32,
3852 ) -> Result<()> {
3853 self.id().get(store).check_may_leave(caller)?;
3854 self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
3855 }
3856
3857 fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
3861 let (write, read) = store.concurrent_state_mut().new_transmit()?;
3862
3863 let table = self.id().get_mut(store).table_for_transmit(ty);
3864 let (read_handle, write_handle) = match ty {
3865 TransmitIndex::Future(ty) => (
3866 table.future_insert_read(ty, read.rep())?,
3867 table.future_insert_write(ty, write.rep())?,
3868 ),
3869 TransmitIndex::Stream(ty) => (
3870 table.stream_insert_read(ty, read.rep())?,
3871 table.stream_insert_write(ty, write.rep())?,
3872 ),
3873 };
3874
3875 let state = store.concurrent_state_mut();
3876 state.get_mut(read)?.common.handle = Some(read_handle);
3877 state.get_mut(write)?.common.handle = Some(write_handle);
3878
3879 Ok(ResourcePair {
3880 write: write_handle,
3881 read: read_handle,
3882 })
3883 }
3884
3885 pub(crate) fn error_context_drop(
3887 self,
3888 store: &mut StoreOpaque,
3889 caller: RuntimeComponentInstanceIndex,
3890 ty: TypeComponentLocalErrorContextTableIndex,
3891 error_context: u32,
3892 ) -> Result<()> {
3893 let instance = self.id().get_mut(store);
3894 instance.check_may_leave(caller)?;
3895
3896 let local_handle_table = instance.table_for_error_context(ty);
3897
3898 let rep = local_handle_table.error_context_drop(error_context)?;
3899
3900 let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
3901
3902 let state = store.concurrent_state_mut();
3903 let GlobalErrorContextRefCount(global_ref_count) = state
3904 .global_error_context_ref_counts
3905 .get_mut(&global_ref_count_idx)
3906 .expect("retrieve concurrent state for error context during drop");
3907
3908 assert!(*global_ref_count >= 1);
3910 *global_ref_count -= 1;
3911 if *global_ref_count == 0 {
3912 state
3913 .global_error_context_ref_counts
3914 .remove(&global_ref_count_idx);
3915
3916 state
3917 .delete(TableId::<ErrorContextState>::new(rep))
3918 .context("deleting component-global error context data")?;
3919 }
3920
3921 Ok(())
3922 }
3923
3924 fn guest_transfer(
3927 self,
3928 store: &mut StoreOpaque,
3929 src_idx: u32,
3930 src: TransmitIndex,
3931 dst: TransmitIndex,
3932 ) -> Result<u32> {
3933 let mut instance = self.id().get_mut(store);
3934 let src_table = instance.as_mut().table_for_transmit(src);
3935 let (rep, is_done) = match src {
3936 TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
3937 TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
3938 };
3939 if is_done {
3940 bail!("cannot lift after being notified that the writable end dropped");
3941 }
3942 let dst_table = instance.table_for_transmit(dst);
3943 let handle = match dst {
3944 TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
3945 TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
3946 }?;
3947 store
3948 .concurrent_state_mut()
3949 .get_mut(TableId::<TransmitHandle>::new(rep))?
3950 .common
3951 .handle = Some(handle);
3952 Ok(handle)
3953 }
3954
3955 pub(crate) fn future_new(
3957 self,
3958 store: &mut StoreOpaque,
3959 caller: RuntimeComponentInstanceIndex,
3960 ty: TypeFutureTableIndex,
3961 ) -> Result<ResourcePair> {
3962 self.id().get(store).check_may_leave(caller)?;
3963 self.guest_new(store, TransmitIndex::Future(ty))
3964 }
3965
3966 pub(crate) fn stream_new(
3968 self,
3969 store: &mut StoreOpaque,
3970 caller: RuntimeComponentInstanceIndex,
3971 ty: TypeStreamTableIndex,
3972 ) -> Result<ResourcePair> {
3973 self.id().get(store).check_may_leave(caller)?;
3974 self.guest_new(store, TransmitIndex::Stream(ty))
3975 }
3976
3977 pub(crate) fn future_transfer(
3980 self,
3981 store: &mut StoreOpaque,
3982 src_idx: u32,
3983 src: TypeFutureTableIndex,
3984 dst: TypeFutureTableIndex,
3985 ) -> Result<u32> {
3986 self.guest_transfer(
3987 store,
3988 src_idx,
3989 TransmitIndex::Future(src),
3990 TransmitIndex::Future(dst),
3991 )
3992 }
3993
3994 pub(crate) fn stream_transfer(
3997 self,
3998 store: &mut StoreOpaque,
3999 src_idx: u32,
4000 src: TypeStreamTableIndex,
4001 dst: TypeStreamTableIndex,
4002 ) -> Result<u32> {
4003 self.guest_transfer(
4004 store,
4005 src_idx,
4006 TransmitIndex::Stream(src),
4007 TransmitIndex::Stream(dst),
4008 )
4009 }
4010
4011 pub(crate) fn error_context_transfer(
4013 self,
4014 store: &mut StoreOpaque,
4015 src_idx: u32,
4016 src: TypeComponentLocalErrorContextTableIndex,
4017 dst: TypeComponentLocalErrorContextTableIndex,
4018 ) -> Result<u32> {
4019 let mut instance = self.id().get_mut(store);
4020 let rep = instance
4021 .as_mut()
4022 .table_for_error_context(src)
4023 .error_context_rep(src_idx)?;
4024 let dst_idx = instance
4025 .table_for_error_context(dst)
4026 .error_context_insert(rep)?;
4027
4028 let global_ref_count = store
4032 .concurrent_state_mut()
4033 .global_error_context_ref_counts
4034 .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4035 .context("global ref count present for existing (sub)component error context")?;
4036 global_ref_count.0 += 1;
4037
4038 Ok(dst_idx)
4039 }
4040}
4041
4042impl ComponentInstance {
4043 fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4044 let (tables, types) = self.guest_tables();
4045 let runtime_instance = match ty {
4046 TransmitIndex::Stream(ty) => types[ty].instance,
4047 TransmitIndex::Future(ty) => types[ty].instance,
4048 };
4049 &mut tables[runtime_instance]
4050 }
4051
4052 fn table_for_error_context(
4053 self: Pin<&mut Self>,
4054 ty: TypeComponentLocalErrorContextTableIndex,
4055 ) -> &mut HandleTable {
4056 let (tables, types) = self.guest_tables();
4057 let runtime_instance = types[ty].instance;
4058 &mut tables[runtime_instance]
4059 }
4060
4061 fn get_mut_by_index(
4062 self: Pin<&mut Self>,
4063 ty: TransmitIndex,
4064 index: u32,
4065 ) -> Result<(u32, &mut TransmitLocalState)> {
4066 get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4067 }
4068}
4069
4070impl ConcurrentState {
4071 fn send_write_result(
4072 &mut self,
4073 ty: TransmitIndex,
4074 id: TableId<TransmitState>,
4075 handle: u32,
4076 code: ReturnCode,
4077 ) -> Result<()> {
4078 let write_handle = self.get_mut(id)?.write_handle.rep();
4079 self.set_event(
4080 write_handle,
4081 match ty {
4082 TransmitIndex::Future(ty) => Event::FutureWrite {
4083 code,
4084 pending: Some((ty, handle)),
4085 },
4086 TransmitIndex::Stream(ty) => Event::StreamWrite {
4087 code,
4088 pending: Some((ty, handle)),
4089 },
4090 },
4091 )
4092 }
4093
4094 fn send_read_result(
4095 &mut self,
4096 ty: TransmitIndex,
4097 id: TableId<TransmitState>,
4098 handle: u32,
4099 code: ReturnCode,
4100 ) -> Result<()> {
4101 let read_handle = self.get_mut(id)?.read_handle.rep();
4102 self.set_event(
4103 read_handle,
4104 match ty {
4105 TransmitIndex::Future(ty) => Event::FutureRead {
4106 code,
4107 pending: Some((ty, handle)),
4108 },
4109 TransmitIndex::Stream(ty) => Event::StreamRead {
4110 code,
4111 pending: Some((ty, handle)),
4112 },
4113 },
4114 )
4115 }
4116
4117 fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4118 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4119 }
4120
4121 fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4122 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4123 }
4124
4125 fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4136 let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4137
4138 fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
4139 let (ReturnCode::Completed(count)
4140 | ReturnCode::Dropped(count)
4141 | ReturnCode::Cancelled(count)) = old
4142 else {
4143 unreachable!()
4144 };
4145
4146 match new {
4147 ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
4148 ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
4149 _ => unreachable!(),
4150 }
4151 }
4152
4153 let event = match (waitable.take_event(self)?, event) {
4154 (None, _) => event,
4155 (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4156 (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4157 (
4158 Some(Event::StreamWrite {
4159 code: old_code,
4160 pending: old_pending,
4161 }),
4162 Event::StreamWrite { code, pending },
4163 ) => Event::StreamWrite {
4164 code: update_code(old_code, code),
4165 pending: old_pending.or(pending),
4166 },
4167 (
4168 Some(Event::StreamRead {
4169 code: old_code,
4170 pending: old_pending,
4171 }),
4172 Event::StreamRead { code, pending },
4173 ) => Event::StreamRead {
4174 code: update_code(old_code, code),
4175 pending: old_pending.or(pending),
4176 },
4177 _ => unreachable!(),
4178 };
4179
4180 waitable.set_event(self, Some(event))
4181 }
4182
4183 fn new_transmit(&mut self) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4186 let state_id = self.push(TransmitState::default())?;
4187
4188 let write = self.push(TransmitHandle::new(state_id))?;
4189 let read = self.push(TransmitHandle::new(state_id))?;
4190
4191 let state = self.get_mut(state_id)?;
4192 state.write_handle = write;
4193 state.read_handle = read;
4194
4195 log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4196
4197 Ok((write, read))
4198 }
4199
4200 fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4202 let state = self.delete(state_id)?;
4203 self.delete(state.write_handle)?;
4204 self.delete(state.read_handle)?;
4205
4206 log::trace!(
4207 "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4208 state.write_handle,
4209 state.read_handle,
4210 );
4211
4212 Ok(())
4213 }
4214}
4215
4216pub(crate) struct ResourcePair {
4217 pub(crate) write: u32,
4218 pub(crate) read: u32,
4219}
4220
4221#[cfg(test)]
4222mod tests {
4223 use super::*;
4224 use crate::{Engine, Store};
4225 use core::future::pending;
4226 use core::pin::pin;
4227 use std::sync::LazyLock;
4228
4229 static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);
4230
4231 fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
4232 where
4233 T: FutureProducer<()>,
4234 {
4235 rx.poll_produce(
4236 &mut Context::from_waker(Waker::noop()),
4237 Store::new(&ENGINE, ()).as_context_mut(),
4238 finish,
4239 )
4240 }
4241
4242 #[test]
4243 fn future_producer() {
4244 let mut fut = pin!(async { anyhow::Ok(()) });
4245 assert!(matches!(
4246 poll_future_producer(fut.as_mut(), false),
4247 Poll::Ready(Ok(Some(()))),
4248 ));
4249
4250 let mut fut = pin!(async { anyhow::Ok(()) });
4251 assert!(matches!(
4252 poll_future_producer(fut.as_mut(), true),
4253 Poll::Ready(Ok(Some(()))),
4254 ));
4255
4256 let mut fut = pin!(pending::<Result<()>>());
4257 assert!(matches!(
4258 poll_future_producer(fut.as_mut(), false),
4259 Poll::Pending,
4260 ));
4261 assert!(matches!(
4262 poll_future_producer(fut.as_mut(), true),
4263 Poll::Ready(Ok(None)),
4264 ));
4265
4266 let (tx, rx) = oneshot::channel();
4267 let mut rx = pin!(rx);
4268 assert!(matches!(
4269 poll_future_producer(rx.as_mut(), false),
4270 Poll::Pending,
4271 ));
4272 assert!(matches!(
4273 poll_future_producer(rx.as_mut(), true),
4274 Poll::Ready(Ok(None)),
4275 ));
4276 tx.send(()).unwrap();
4277 assert!(matches!(
4278 poll_future_producer(rx.as_mut(), true),
4279 Poll::Ready(Ok(Some(()))),
4280 ));
4281
4282 let (tx, rx) = oneshot::channel();
4283 let mut rx = pin!(rx);
4284 tx.send(()).unwrap();
4285 assert!(matches!(
4286 poll_future_producer(rx.as_mut(), false),
4287 Poll::Ready(Ok(Some(()))),
4288 ));
4289
4290 let (tx, rx) = oneshot::channel::<()>();
4291 let mut rx = pin!(rx);
4292 drop(tx);
4293 assert!(matches!(
4294 poll_future_producer(rx.as_mut(), false),
4295 Poll::Ready(Err(..)),
4296 ));
4297
4298 let (tx, rx) = oneshot::channel::<()>();
4299 let mut rx = pin!(rx);
4300 drop(tx);
4301 assert!(matches!(
4302 poll_future_producer(rx.as_mut(), true),
4303 Poll::Ready(Err(..)),
4304 ));
4305 }
4306}