1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, WorkItem};
4use crate::component::func::{self, LiftContext, LowerContext, Options};
5use crate::component::matching::InstanceType;
6use crate::component::values::{ErrorContextAny, FutureAny, StreamAny};
7use crate::component::{AsAccessor, Instance, Lower, Val, WasmList, WasmStr};
8use crate::store::{StoreOpaque, StoreToken};
9use crate::vm::VMStore;
10use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
11use crate::{AsContextMut, StoreContextMut, ValRaw};
12use anyhow::{Context, Result, anyhow, bail};
13use buffers::Extender;
14use buffers::UntypedWriteBuffer;
15use futures::channel::oneshot;
16use std::boxed::Box;
17use std::fmt;
18use std::future;
19use std::iter;
20use std::marker::PhantomData;
21use std::mem::{self, MaybeUninit};
22use std::pin::Pin;
23use std::string::{String, ToString};
24use std::sync::{Arc, Mutex};
25use std::task::{Poll, Waker};
26use std::vec::Vec;
27use wasmtime_environ::component::{
28 CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex,
29 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
30 TypeFutureTableIndex, TypeStreamTableIndex,
31};
32
33pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
34
35mod buffers;
36
/// Distinguishes the two flavours of transmit handled in this file: streams
/// and futures.
#[derive(Copy, Clone, Debug)]
pub enum TransmitKind {
    /// A `stream` transmit.
    Stream,
    /// A `future` transmit.
    Future,
}
44
/// Outcome of a read or write operation, encodable as a single `u32` via
/// [`ReturnCode::encode`].
///
/// For `Completed`, the payload is the number of items transferred (always
/// `0` for futures, see `ReturnCode::completed`). The payloads of `Dropped`
/// and `Cancelled` presumably carry an item count as well — TODO confirm
/// against the callers.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ReturnCode {
    /// The operation has not completed yet; encoded as `0xffff_ffff`.
    Blocked,
    /// The operation completed; low tag bits `0x0`.
    Completed(u32),
    /// The other end was dropped; low tag bits `0x1`.
    Dropped(u32),
    /// The operation was cancelled; low tag bits `0x2`.
    Cancelled(u32),
}
53
54impl ReturnCode {
55 pub fn encode(&self) -> u32 {
60 const BLOCKED: u32 = 0xffff_ffff;
61 const COMPLETED: u32 = 0x0;
62 const DROPPED: u32 = 0x1;
63 const CANCELLED: u32 = 0x2;
64 match self {
65 ReturnCode::Blocked => BLOCKED,
66 ReturnCode::Completed(n) => {
67 debug_assert!(*n < (1 << 28));
68 (n << 4) | COMPLETED
69 }
70 ReturnCode::Dropped(n) => {
71 debug_assert!(*n < (1 << 28));
72 (n << 4) | DROPPED
73 }
74 ReturnCode::Cancelled(n) => {
75 debug_assert!(*n < (1 << 28));
76 (n << 4) | CANCELLED
77 }
78 }
79 }
80
81 fn completed(kind: TransmitKind, count: u32) -> Self {
84 Self::Completed(if let TransmitKind::Future = kind {
85 0
86 } else {
87 count
88 })
89 }
90}
91
/// A type-table index for either a stream or a future, letting code that is
/// generic over the two kinds carry a single value.
#[derive(Copy, Clone, Debug)]
pub enum TransmitIndex {
    /// Index into the stream type table.
    Stream(TypeStreamTableIndex),
    /// Index into the future type table.
    Future(TypeFutureTableIndex),
}
101
102impl TransmitIndex {
103 pub fn kind(&self) -> TransmitKind {
104 match self {
105 TransmitIndex::Stream(_) => TransmitKind::Stream,
106 TransmitIndex::Future(_) => TransmitKind::Future,
107 }
108 }
109}
110
/// Action associated with a pending write (the `post_write` field of
/// `WriteState::GuestReady` / `WriteState::HostReady`).
enum PostWrite {
    /// Presumably: leave the write end open once the write is consumed —
    /// TODO confirm against the code that consumes this value.
    Continue,
    /// The write end is to be dropped; `watch_writer` treats a pending write
    /// carrying this value the same as an already dropped writer.
    Drop,
}
118
/// Result of a host-side read or write: the caller's buffer handed back,
/// plus whether the other end turned out to be dropped.
struct HostResult<B> {
    /// The buffer passed to the operation, after any items were moved in or
    /// out of it.
    buffer: B,
    /// `true` when the other end of the transmit was found to be dropped.
    dropped: bool,
}
126
127fn payload(ty: TransmitIndex, types: &Arc<ComponentTypes>) -> Option<InterfaceType> {
130 match ty {
131 TransmitIndex::Future(ty) => types[types[ty].ty].payload,
132 TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
133 }
134}
135
136fn get_mut_by_index_from(
139 handle_table: &mut HandleTable,
140 ty: TransmitIndex,
141 index: u32,
142) -> Result<(u32, &mut TransmitLocalState)> {
143 match ty {
144 TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
145 TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
146 }
147}
148
149fn accept_reader<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
152 mut store: StoreContextMut<U>,
153 instance: Instance,
154 reader: Reader,
155 mut buffer: B,
156 kind: TransmitKind,
157) -> Result<(HostResult<B>, ReturnCode)> {
158 Ok(match reader {
159 Reader::Guest {
160 options,
161 ty,
162 address,
163 count,
164 } => {
165 let types = instance.id().get(store.0).component().types().clone();
166 let count = buffer.remaining().len().min(count);
167
168 let lower = &mut if T::MAY_REQUIRE_REALLOC {
169 LowerContext::new
170 } else {
171 LowerContext::new_without_realloc
172 }(store.as_context_mut(), options, &types, instance);
173
174 if address % usize::try_from(T::ALIGN32)? != 0 {
175 bail!("read pointer not aligned");
176 }
177 lower
178 .as_slice_mut()
179 .get_mut(address..)
180 .and_then(|b| b.get_mut(..T::SIZE32 * count))
181 .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
182
183 if let Some(ty) = payload(ty, &types) {
184 T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
185 }
186
187 buffer.skip(count);
188 (
189 HostResult {
190 buffer,
191 dropped: false,
192 },
193 ReturnCode::completed(kind, count.try_into().unwrap()),
194 )
195 }
196 Reader::Host { accept } => {
197 let count = buffer.remaining().len();
198 let mut untyped = UntypedWriteBuffer::new(&mut buffer);
199 let count = accept(&mut untyped, count);
200 (
201 HostResult {
202 buffer,
203 dropped: false,
204 },
205 ReturnCode::completed(kind, count.try_into().unwrap()),
206 )
207 }
208 Reader::End => (
209 HostResult {
210 buffer,
211 dropped: true,
212 },
213 ReturnCode::Dropped(0),
214 ),
215 })
216}
217
218fn accept_writer<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
221 writer: Writer,
222 mut buffer: B,
223 kind: TransmitKind,
224) -> Result<(HostResult<B>, ReturnCode)> {
225 Ok(match writer {
226 Writer::Guest {
227 lift,
228 ty,
229 address,
230 count,
231 } => {
232 let count = count.min(buffer.remaining_capacity());
233 if T::IS_RUST_UNIT_TYPE {
234 buffer.extend(
238 iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() })
239 .take(count),
240 )
241 } else {
242 let ty = ty.unwrap();
243 if address % usize::try_from(T::ALIGN32)? != 0 {
244 bail!("write pointer not aligned");
245 }
246 lift.memory()
247 .get(address..)
248 .and_then(|b| b.get(..T::SIZE32 * count))
249 .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
250
251 let list = &WasmList::new(address, count, lift, ty)?;
252 T::linear_lift_into_from_memory(lift, list, &mut Extender(&mut buffer))?
253 }
254 (
255 HostResult {
256 buffer,
257 dropped: false,
258 },
259 ReturnCode::completed(kind, count.try_into().unwrap()),
260 )
261 }
262 Writer::Host {
263 buffer: input,
264 count,
265 } => {
266 let count = count.min(buffer.remaining_capacity());
267 buffer.move_from(input.get_mut::<T>(), count);
268 (
269 HostResult {
270 buffer,
271 dropped: false,
272 },
273 ReturnCode::completed(kind, count.try_into().unwrap()),
274 )
275 }
276 Writer::End => (
277 HostResult {
278 buffer,
279 dropped: true,
280 },
281 ReturnCode::Dropped(0),
282 ),
283 })
284}
285
286async fn watch_reader(accessor: impl AsAccessor, instance: Instance, id: TableId<TransmitHandle>) {
289 future::poll_fn(|cx| {
290 accessor
291 .as_accessor()
292 .with(|mut access| {
293 let concurrent_state = instance.concurrent_state_mut(access.as_context_mut().0);
294 let state_id = concurrent_state.get(id)?.state;
295 let state = concurrent_state.get_mut(state_id)?;
296 anyhow::Ok(if matches!(&state.read, ReadState::Dropped) {
297 Poll::Ready(())
298 } else {
299 state.reader_watcher = Some(cx.waker().clone());
300 Poll::Pending
301 })
302 })
303 .unwrap_or(Poll::Ready(()))
304 })
305 .await
306}
307
308async fn watch_writer(accessor: impl AsAccessor, instance: Instance, id: TableId<TransmitHandle>) {
311 future::poll_fn(|cx| {
312 accessor
313 .as_accessor()
314 .with(|mut access| {
315 let concurrent_state = instance.concurrent_state_mut(access.as_context_mut().0);
316 let state_id = concurrent_state.get(id)?.state;
317 let state = concurrent_state.get_mut(state_id)?;
318 anyhow::Ok(
319 if matches!(
320 &state.write,
321 WriteState::Dropped
322 | WriteState::GuestReady {
323 post_write: PostWrite::Drop,
324 ..
325 }
326 | WriteState::HostReady {
327 post_write: PostWrite::Drop,
328 ..
329 }
330 ) {
331 Poll::Ready(())
332 } else {
333 state.writer_watcher = Some(cx.waker().clone());
334 Poll::Pending
335 },
336 )
337 })
338 .unwrap_or(Poll::Ready(()))
339 })
340 .await
341}
342
/// State stored for an error context.
#[derive(Debug, PartialEq, Eq, PartialOrd)]
pub(super) struct ErrorContextState {
    /// Debug message associated with the error context.
    pub(crate) debug_msg: String,
}
349
/// Size and alignment of a payload type. Stored as the optional `flat_abi`
/// field of pending guest reads and writes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct FlatAbi {
    /// Size of the type — presumably in bytes; TODO confirm.
    pub(super) size: u32,
    /// Alignment of the type — presumably in bytes; TODO confirm.
    pub(super) align: u32,
}
357
/// Host-side handle to the write end of a future carrying a `T`.
pub struct FutureWriter<T> {
    /// Produces the value passed to `host_drop_writer` when the writer is
    /// closed via [`FutureWriter::close`].
    default: fn() -> T,
    /// Table id of this write end's `TransmitHandle`.
    id: TableId<TransmitHandle>,
    /// Instance whose concurrent state owns the handle.
    instance: Instance,
}
369
370impl<T> FutureWriter<T> {
371 fn new(default: fn() -> T, id: TableId<TransmitHandle>, instance: Instance) -> Self {
372 Self {
373 default,
374 id,
375 instance,
376 }
377 }
378
379 pub async fn write(self, accessor: impl AsAccessor, value: T) -> bool
390 where
391 T: func::Lower + Send + Sync + 'static,
392 {
393 self.guard(accessor).write(value).await
394 }
395
396 async fn write_(&mut self, accessor: impl AsAccessor, value: T) -> bool
399 where
400 T: func::Lower + Send + Sync + 'static,
401 {
402 let accessor = accessor.as_accessor();
403
404 let result = self
405 .instance
406 .host_write_async(accessor, self.id, Some(value), TransmitKind::Future)
407 .await;
408
409 match result {
410 Ok(HostResult { dropped, .. }) => !dropped,
411 Err(_) => todo!("guarantee buffer recovery if `host_write` fails"),
412 }
413 }
414
415 pub async fn watch_reader(&mut self, accessor: impl AsAccessor) {
425 watch_reader(accessor, self.instance, self.id).await
426 }
427
428 pub fn close(&mut self, mut store: impl AsContextMut)
436 where
437 T: func::Lower + Send + Sync + 'static,
438 {
439 let id = mem::replace(&mut self.id, TableId::new(0));
440 let default = self.default;
441 self.instance
442 .host_drop_writer(store.as_context_mut(), id, Some(&move || Ok(default())))
443 .unwrap();
444 }
445
446 pub fn close_with(&mut self, accessor: impl AsAccessor)
448 where
449 T: func::Lower + Send + Sync + 'static,
450 {
451 accessor.as_accessor().with(|access| self.close(access))
452 }
453
454 pub fn guard<A>(self, accessor: A) -> GuardedFutureWriter<T, A>
460 where
461 T: func::Lower + Send + Sync + 'static,
462 A: AsAccessor,
463 {
464 GuardedFutureWriter::new(accessor, self)
465 }
466}
467
/// A [`FutureWriter`] paired with an accessor, so that the writer is closed
/// automatically when the guard is dropped.
pub struct GuardedFutureWriter<T, A>
where
    T: func::Lower + Send + Sync + 'static,
    A: AsAccessor,
{
    /// The wrapped writer; `None` once it has been extracted through the
    /// `From` conversion, in which case dropping the guard does nothing.
    writer: Option<FutureWriter<T>>,
    /// Accessor used to reach the store for writes and for closing on drop.
    accessor: A,
}
484
485impl<T, A> GuardedFutureWriter<T, A>
486where
487 T: func::Lower + Send + Sync + 'static,
488 A: AsAccessor,
489{
490 pub fn new(accessor: A, writer: FutureWriter<T>) -> Self {
493 Self {
494 writer: Some(writer),
495 accessor,
496 }
497 }
498
499 pub async fn write(mut self, value: T) -> bool
501 where
502 T: func::Lower + Send + Sync + 'static,
503 {
504 self.writer
505 .as_mut()
506 .unwrap()
507 .write_(&self.accessor, value)
508 .await
509 }
510
511 pub async fn watch_reader(&mut self) {
513 self.writer
514 .as_mut()
515 .unwrap()
516 .watch_reader(&self.accessor)
517 .await
518 }
519
520 pub fn into_future(self) -> FutureWriter<T> {
523 self.into()
524 }
525}
526
527impl<T, A> From<GuardedFutureWriter<T, A>> for FutureWriter<T>
528where
529 T: func::Lower + Send + Sync + 'static,
530 A: AsAccessor,
531{
532 fn from(mut guard: GuardedFutureWriter<T, A>) -> Self {
533 guard.writer.take().unwrap()
534 }
535}
536
537impl<T, A> Drop for GuardedFutureWriter<T, A>
538where
539 T: func::Lower + Send + Sync + 'static,
540 A: AsAccessor,
541{
542 fn drop(&mut self) {
543 if let Some(writer) = &mut self.writer {
544 writer.close_with(&self.accessor)
545 }
546 }
547}
548
/// Host-side handle to the read end of a future carrying a `T`.
pub struct FutureReader<T> {
    /// Instance whose concurrent state owns the handle.
    instance: Instance,
    /// Table id of this read end's `TransmitHandle`.
    id: TableId<TransmitHandle>,
    /// Ties the payload type `T` to the handle without storing a `T`.
    _phantom: PhantomData<T>,
}
560
561impl<T> FutureReader<T> {
562 fn new(id: TableId<TransmitHandle>, instance: Instance) -> Self {
563 Self {
564 instance,
565 id,
566 _phantom: PhantomData,
567 }
568 }
569
570 pub async fn read(self, accessor: impl AsAccessor) -> Option<T>
583 where
584 T: func::Lift + Send + 'static,
585 {
586 self.guard(accessor).read().await
587 }
588
589 async fn read_(&mut self, accessor: impl AsAccessor) -> Option<T>
590 where
591 T: func::Lift + Send + 'static,
592 {
593 let accessor = accessor.as_accessor();
594
595 let result = self
596 .instance
597 .host_read_async(accessor, self.id, None, TransmitKind::Future)
598 .await;
599
600 if let Ok(HostResult {
601 mut buffer,
602 dropped: false,
603 }) = result
604 {
605 buffer.take()
606 } else {
607 None
608 }
609 }
610
611 pub async fn watch_writer(&mut self, accessor: impl AsAccessor) {
621 watch_writer(accessor, self.instance, self.id).await;
622 }
623
624 pub fn into_val(self) -> Val {
627 Val::Future(FutureAny(self.id.rep()))
628 }
629
630 pub fn from_val(
632 mut store: impl AsContextMut<Data: Send>,
633 instance: Instance,
634 value: &Val,
635 ) -> Result<Self> {
636 let Val::Future(FutureAny(rep)) = value else {
637 bail!("expected `future`; got `{}`", value.desc());
638 };
639 let store = store.as_context_mut();
640 let id = TableId::<TransmitHandle>::new(*rep);
641 instance.concurrent_state_mut(store.0).get(id)?; Ok(Self::new(id, instance))
643 }
644
645 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
647 match ty {
648 InterfaceType::Future(src) => {
649 let handle_table = cx
650 .instance_mut()
651 .table_for_transmit(TransmitIndex::Future(src));
652 let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
653 if is_done {
654 bail!("cannot lift future after being notified that the writable end dropped");
655 }
656 let id = TableId::<TransmitHandle>::new(rep);
657 let concurrent_state = cx.instance_mut().concurrent_state_mut();
658 let future = concurrent_state.get_mut(id)?;
659 future.common.handle = None;
660 let state = future.state;
661
662 if concurrent_state.get(state)?.done {
663 bail!("cannot lift future after previous read succeeded");
664 }
665
666 Ok(Self::new(id, cx.instance_handle()))
667 }
668 _ => func::bad_type_info(),
669 }
670 }
671
672 pub fn close(&mut self, mut store: impl AsContextMut) {
680 let id = mem::replace(&mut self.id, TableId::new(0));
682 self.instance
683 .host_drop_reader(store.as_context_mut().0, id, TransmitKind::Future)
684 .unwrap();
685 }
686
687 pub fn close_with(&mut self, accessor: impl AsAccessor) {
689 accessor.as_accessor().with(|access| self.close(access))
690 }
691
692 pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
698 where
699 A: AsAccessor,
700 {
701 GuardedFutureReader::new(accessor, self)
702 }
703}
704
705impl<T> fmt::Debug for FutureReader<T> {
706 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
707 f.debug_struct("FutureReader")
708 .field("id", &self.id)
709 .field("instance", &self.instance)
710 .finish()
711 }
712}
713
714pub(crate) fn lower_future_to_index<U>(
716 rep: u32,
717 cx: &mut LowerContext<'_, U>,
718 ty: InterfaceType,
719) -> Result<u32> {
720 match ty {
721 InterfaceType::Future(dst) => {
722 let concurrent_state = cx.instance_mut().concurrent_state_mut();
723 let id = TableId::<TransmitHandle>::new(rep);
724 let state = concurrent_state.get(id)?.state;
725 let rep = concurrent_state.get(state)?.read_handle.rep();
726
727 let handle = cx
728 .instance_mut()
729 .table_for_transmit(TransmitIndex::Future(dst))
730 .future_insert_read(dst, rep)?;
731
732 cx.instance_mut()
733 .concurrent_state_mut()
734 .get_mut(id)?
735 .common
736 .handle = Some(handle);
737
738 Ok(handle)
739 }
740 _ => func::bad_type_info(),
741 }
742}
743
744unsafe impl<T: Send + Sync> func::ComponentType for FutureReader<T> {
747 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
748
749 type Lower = <u32 as func::ComponentType>::Lower;
750
751 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
752 match ty {
753 InterfaceType::Future(_) => Ok(()),
754 other => bail!("expected `future`, found `{}`", func::desc(other)),
755 }
756 }
757}
758
759unsafe impl<T: Send + Sync> func::Lower for FutureReader<T> {
761 fn linear_lower_to_flat<U>(
762 &self,
763 cx: &mut LowerContext<'_, U>,
764 ty: InterfaceType,
765 dst: &mut MaybeUninit<Self::Lower>,
766 ) -> Result<()> {
767 lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
768 cx,
769 InterfaceType::U32,
770 dst,
771 )
772 }
773
774 fn linear_lower_to_memory<U>(
775 &self,
776 cx: &mut LowerContext<'_, U>,
777 ty: InterfaceType,
778 offset: usize,
779 ) -> Result<()> {
780 lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
781 cx,
782 InterfaceType::U32,
783 offset,
784 )
785 }
786}
787
788unsafe impl<T: Send + Sync> func::Lift for FutureReader<T> {
790 fn linear_lift_from_flat(
791 cx: &mut LiftContext<'_>,
792 ty: InterfaceType,
793 src: &Self::Lower,
794 ) -> Result<Self> {
795 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
796 Self::lift_from_index(cx, ty, index)
797 }
798
799 fn linear_lift_from_memory(
800 cx: &mut LiftContext<'_>,
801 ty: InterfaceType,
802 bytes: &[u8],
803 ) -> Result<Self> {
804 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
805 Self::lift_from_index(cx, ty, index)
806 }
807}
808
/// A [`FutureReader`] paired with an accessor, so that the reader is closed
/// automatically when the guard is dropped.
pub struct GuardedFutureReader<T, A>
where
    A: AsAccessor,
{
    /// The wrapped reader; `None` once it has been extracted through the
    /// `From` conversion, in which case dropping the guard does nothing.
    reader: Option<FutureReader<T>>,
    /// Accessor used to reach the store for reads and for closing on drop.
    accessor: A,
}
824
825impl<T, A> GuardedFutureReader<T, A>
826where
827 A: AsAccessor,
828{
829 pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
831 Self {
832 reader: Some(reader),
833 accessor,
834 }
835 }
836
837 pub async fn read(mut self) -> Option<T>
839 where
840 T: func::Lift + Send + 'static,
841 {
842 self.reader.as_mut().unwrap().read_(&self.accessor).await
843 }
844
845 pub async fn watch_writer(&mut self) {
847 self.reader
848 .as_mut()
849 .unwrap()
850 .watch_writer(&self.accessor)
851 .await
852 }
853
854 pub fn into_future(self) -> FutureReader<T> {
857 self.into()
858 }
859}
860
861impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
862where
863 A: AsAccessor,
864{
865 fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
866 guard.reader.take().unwrap()
867 }
868}
869
870impl<T, A> Drop for GuardedFutureReader<T, A>
871where
872 A: AsAccessor,
873{
874 fn drop(&mut self) {
875 if let Some(reader) = &mut self.reader {
876 reader.close_with(&self.accessor)
877 }
878 }
879}
880
/// Host-side handle to the write end of a stream of `T` items.
pub struct StreamWriter<T> {
    /// Instance whose concurrent state owns the handle.
    instance: Instance,
    /// Table id of this write end's `TransmitHandle`.
    id: TableId<TransmitHandle>,
    /// Whether the most recent write reported the read end as dropped.
    closed: bool,
    /// Ties the item type `T` to the handle without storing a `T`.
    _phantom: PhantomData<T>,
}
893
894impl<T> StreamWriter<T> {
895 fn new(id: TableId<TransmitHandle>, instance: Instance) -> Self {
896 Self {
897 instance,
898 id,
899 closed: false,
900 _phantom: PhantomData,
901 }
902 }
903
904 pub fn is_closed(&self) -> bool {
907 self.closed
908 }
909
910 pub async fn write<B>(&mut self, accessor: impl AsAccessor, buffer: B) -> B
928 where
929 T: func::Lower + 'static,
930 B: WriteBuffer<T>,
931 {
932 let result = self
933 .instance
934 .host_write_async(
935 accessor.as_accessor(),
936 self.id,
937 buffer,
938 TransmitKind::Stream,
939 )
940 .await;
941
942 match result {
943 Ok(HostResult { buffer, dropped }) => {
944 if self.closed {
945 debug_assert!(dropped);
946 }
947 self.closed = dropped;
948 buffer
949 }
950 Err(_) => todo!("guarantee buffer recovery if `host_write` fails"),
951 }
952 }
953
954 pub async fn write_all<B>(&mut self, accessor: impl AsAccessor, mut buffer: B) -> B
967 where
968 T: func::Lower + 'static,
969 B: WriteBuffer<T>,
970 {
971 let accessor = accessor.as_accessor();
972 while !self.is_closed() && buffer.remaining().len() > 0 {
973 buffer = self.write(accessor, buffer).await;
974 }
975 buffer
976 }
977
978 pub async fn watch_reader(&mut self, accessor: impl AsAccessor) {
985 watch_reader(accessor, self.instance, self.id).await
986 }
987
988 pub fn close(&mut self, mut store: impl AsContextMut) {
996 let id = mem::replace(&mut self.id, TableId::new(0));
998 self.instance
999 .host_drop_writer(store.as_context_mut(), id, None::<&dyn Fn() -> Result<()>>)
1000 .unwrap()
1001 }
1002
1003 pub fn close_with(&mut self, accessor: impl AsAccessor) {
1005 accessor.as_accessor().with(|access| self.close(access))
1006 }
1007
1008 pub fn guard<A>(self, accessor: A) -> GuardedStreamWriter<T, A>
1014 where
1015 A: AsAccessor,
1016 {
1017 GuardedStreamWriter::new(accessor, self)
1018 }
1019}
1020
/// A [`StreamWriter`] paired with an accessor, so that the writer is closed
/// automatically when the guard is dropped.
pub struct GuardedStreamWriter<T, A>
where
    A: AsAccessor,
{
    /// The wrapped writer; `None` once it has been extracted through the
    /// `From` conversion, in which case dropping the guard does nothing.
    writer: Option<StreamWriter<T>>,
    /// Accessor used to reach the store for writes and for closing on drop.
    accessor: A,
}
1036
1037impl<T, A> GuardedStreamWriter<T, A>
1038where
1039 A: AsAccessor,
1040{
1041 pub fn new(accessor: A, writer: StreamWriter<T>) -> Self {
1043 Self {
1044 writer: Some(writer),
1045 accessor,
1046 }
1047 }
1048
1049 pub fn is_closed(&self) -> bool {
1051 self.writer.as_ref().unwrap().is_closed()
1052 }
1053
1054 pub async fn write<B>(&mut self, buffer: B) -> B
1056 where
1057 T: func::Lower + 'static,
1058 B: WriteBuffer<T>,
1059 {
1060 self.writer
1061 .as_mut()
1062 .unwrap()
1063 .write(&self.accessor, buffer)
1064 .await
1065 }
1066
1067 pub async fn write_all<B>(&mut self, buffer: B) -> B
1069 where
1070 T: func::Lower + 'static,
1071 B: WriteBuffer<T>,
1072 {
1073 self.writer
1074 .as_mut()
1075 .unwrap()
1076 .write_all(&self.accessor, buffer)
1077 .await
1078 }
1079
1080 pub async fn watch_reader(&mut self) {
1082 self.writer
1083 .as_mut()
1084 .unwrap()
1085 .watch_reader(&self.accessor)
1086 .await
1087 }
1088
1089 pub fn into_stream(self) -> StreamWriter<T> {
1092 self.into()
1093 }
1094}
1095
1096impl<T, A> From<GuardedStreamWriter<T, A>> for StreamWriter<T>
1097where
1098 A: AsAccessor,
1099{
1100 fn from(mut guard: GuardedStreamWriter<T, A>) -> Self {
1101 guard.writer.take().unwrap()
1102 }
1103}
1104
1105impl<T, A> Drop for GuardedStreamWriter<T, A>
1106where
1107 A: AsAccessor,
1108{
1109 fn drop(&mut self) {
1110 if let Some(writer) = &mut self.writer {
1111 writer.close_with(&self.accessor)
1112 }
1113 }
1114}
1115
/// Host-side handle to the read end of a stream of `T` items.
pub struct StreamReader<T> {
    /// Instance whose concurrent state owns the handle.
    instance: Instance,
    /// Table id of this read end's `TransmitHandle`.
    id: TableId<TransmitHandle>,
    /// Whether the most recent read reported the write end as dropped.
    closed: bool,
    /// Ties the item type `T` to the handle without storing a `T`.
    _phantom: PhantomData<T>,
}
1128
1129impl<T> StreamReader<T> {
1130 fn new(id: TableId<TransmitHandle>, instance: Instance) -> Self {
1131 Self {
1132 instance,
1133 id,
1134 closed: false,
1135 _phantom: PhantomData,
1136 }
1137 }
1138
1139 pub fn is_closed(&self) -> bool {
1142 self.closed
1143 }
1144
1145 pub async fn read<B>(&mut self, accessor: impl AsAccessor, buffer: B) -> B
1158 where
1159 T: func::Lift + 'static,
1160 B: ReadBuffer<T> + Send + 'static,
1161 {
1162 let result = self
1163 .instance
1164 .host_read_async(
1165 accessor.as_accessor(),
1166 self.id,
1167 buffer,
1168 TransmitKind::Stream,
1169 )
1170 .await;
1171
1172 match result {
1173 Ok(HostResult { buffer, dropped }) => {
1174 if self.closed {
1175 debug_assert!(dropped);
1176 }
1177 self.closed = dropped;
1178 buffer
1179 }
1180 Err(_) => {
1181 todo!("guarantee buffer recovery if `host_read` fails")
1182 }
1183 }
1184 }
1185
1186 pub async fn watch_writer(&mut self, accessor: impl AsAccessor) {
1193 watch_writer(accessor, self.instance, self.id).await
1194 }
1195
1196 pub fn into_val(self) -> Val {
1199 Val::Stream(StreamAny(self.id.rep()))
1200 }
1201
1202 pub fn from_val(
1204 mut store: impl AsContextMut<Data: Send>,
1205 instance: Instance,
1206 value: &Val,
1207 ) -> Result<Self> {
1208 let Val::Stream(StreamAny(rep)) = value else {
1209 bail!("expected `stream`; got `{}`", value.desc());
1210 };
1211 let store = store.as_context_mut();
1212 let id = TableId::<TransmitHandle>::new(*rep);
1213 instance.concurrent_state_mut(store.0).get(id)?; Ok(Self::new(id, instance))
1215 }
1216
1217 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1219 match ty {
1220 InterfaceType::Stream(src) => {
1221 let handle_table = cx
1222 .instance_mut()
1223 .table_for_transmit(TransmitIndex::Stream(src));
1224 let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1225 if is_done {
1226 bail!("cannot lift stream after being notified that the writable end dropped");
1227 }
1228 let id = TableId::<TransmitHandle>::new(rep);
1229 cx.instance_mut()
1230 .concurrent_state_mut()
1231 .get_mut(id)?
1232 .common
1233 .handle = None;
1234 Ok(Self::new(id, cx.instance_handle()))
1235 }
1236 _ => func::bad_type_info(),
1237 }
1238 }
1239
1240 pub fn close(&mut self, mut store: impl AsContextMut) {
1248 let id = mem::replace(&mut self.id, TableId::new(0));
1250 self.instance
1251 .host_drop_reader(store.as_context_mut().0, id, TransmitKind::Stream)
1252 .unwrap()
1253 }
1254
1255 pub fn close_with(&mut self, accessor: impl AsAccessor) {
1257 accessor.as_accessor().with(|access| self.close(access))
1258 }
1259
1260 pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1266 where
1267 A: AsAccessor,
1268 {
1269 GuardedStreamReader::new(accessor, self)
1270 }
1271}
1272
1273impl<T> fmt::Debug for StreamReader<T> {
1274 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1275 f.debug_struct("StreamReader")
1276 .field("id", &self.id)
1277 .field("instance", &self.instance)
1278 .finish()
1279 }
1280}
1281
1282pub(crate) fn lower_stream_to_index<U>(
1284 rep: u32,
1285 cx: &mut LowerContext<'_, U>,
1286 ty: InterfaceType,
1287) -> Result<u32> {
1288 match ty {
1289 InterfaceType::Stream(dst) => {
1290 let concurrent_state = cx.instance_mut().concurrent_state_mut();
1291 let id = TableId::<TransmitHandle>::new(rep);
1292 let state = concurrent_state.get(id)?.state;
1293 let rep = concurrent_state.get(state)?.read_handle.rep();
1294
1295 let handle = cx
1296 .instance_mut()
1297 .table_for_transmit(TransmitIndex::Stream(dst))
1298 .stream_insert_read(dst, rep)?;
1299
1300 cx.instance_mut()
1301 .concurrent_state_mut()
1302 .get_mut(id)?
1303 .common
1304 .handle = Some(handle);
1305
1306 Ok(handle)
1307 }
1308 _ => func::bad_type_info(),
1309 }
1310}
1311
1312unsafe impl<T: Send + Sync> func::ComponentType for StreamReader<T> {
1315 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1316
1317 type Lower = <u32 as func::ComponentType>::Lower;
1318
1319 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1320 match ty {
1321 InterfaceType::Stream(_) => Ok(()),
1322 other => bail!("expected `stream`, found `{}`", func::desc(other)),
1323 }
1324 }
1325}
1326
1327unsafe impl<T: Send + Sync> func::Lower for StreamReader<T> {
1329 fn linear_lower_to_flat<U>(
1330 &self,
1331 cx: &mut LowerContext<'_, U>,
1332 ty: InterfaceType,
1333 dst: &mut MaybeUninit<Self::Lower>,
1334 ) -> Result<()> {
1335 lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1336 cx,
1337 InterfaceType::U32,
1338 dst,
1339 )
1340 }
1341
1342 fn linear_lower_to_memory<U>(
1343 &self,
1344 cx: &mut LowerContext<'_, U>,
1345 ty: InterfaceType,
1346 offset: usize,
1347 ) -> Result<()> {
1348 lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1349 cx,
1350 InterfaceType::U32,
1351 offset,
1352 )
1353 }
1354}
1355
1356unsafe impl<T: Send + Sync> func::Lift for StreamReader<T> {
1358 fn linear_lift_from_flat(
1359 cx: &mut LiftContext<'_>,
1360 ty: InterfaceType,
1361 src: &Self::Lower,
1362 ) -> Result<Self> {
1363 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1364 Self::lift_from_index(cx, ty, index)
1365 }
1366
1367 fn linear_lift_from_memory(
1368 cx: &mut LiftContext<'_>,
1369 ty: InterfaceType,
1370 bytes: &[u8],
1371 ) -> Result<Self> {
1372 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1373 Self::lift_from_index(cx, ty, index)
1374 }
1375}
1376
/// A [`StreamReader`] paired with an accessor, so that the reader is closed
/// automatically when the guard is dropped.
pub struct GuardedStreamReader<T, A>
where
    A: AsAccessor,
{
    /// The wrapped reader; `None` once it has been extracted through the
    /// `From` conversion, in which case dropping the guard does nothing.
    reader: Option<StreamReader<T>>,
    /// Accessor used to reach the store for reads and for closing on drop.
    accessor: A,
}
1392
1393impl<T, A> GuardedStreamReader<T, A>
1394where
1395 A: AsAccessor,
1396{
1397 pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1400 Self {
1401 reader: Some(reader),
1402 accessor,
1403 }
1404 }
1405
1406 pub fn is_closed(&self) -> bool {
1408 self.reader.as_ref().unwrap().is_closed()
1409 }
1410
1411 pub async fn read<B>(&mut self, buffer: B) -> B
1413 where
1414 T: func::Lift + 'static,
1415 B: ReadBuffer<T> + Send + 'static,
1416 {
1417 self.reader
1418 .as_mut()
1419 .unwrap()
1420 .read(&self.accessor, buffer)
1421 .await
1422 }
1423
1424 pub async fn watch_writer(&mut self) {
1426 self.reader
1427 .as_mut()
1428 .unwrap()
1429 .watch_writer(&self.accessor)
1430 .await
1431 }
1432
1433 pub fn into_stream(self) -> StreamReader<T> {
1436 self.into()
1437 }
1438}
1439
1440impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1441where
1442 A: AsAccessor,
1443{
1444 fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1445 guard.reader.take().unwrap()
1446 }
1447}
1448
1449impl<T, A> Drop for GuardedStreamReader<T, A>
1450where
1451 A: AsAccessor,
1452{
1453 fn drop(&mut self) {
1454 if let Some(reader) = &mut self.reader {
1455 reader.close_with(&self.accessor)
1456 }
1457 }
1458}
1459
/// Host-side handle to an `error-context` value.
pub struct ErrorContext {
    /// Representation of the error context, as stored in a guest
    /// error-context table and in `Val::ErrorContext`.
    rep: u32,
}
1464
1465impl ErrorContext {
1466 pub(crate) fn new(rep: u32) -> Self {
1467 Self { rep }
1468 }
1469
1470 pub fn into_val(self) -> Val {
1472 Val::ErrorContext(ErrorContextAny(self.rep))
1473 }
1474
1475 pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1477 let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1478 bail!("expected `error-context`; got `{}`", value.desc());
1479 };
1480 Ok(Self::new(*rep))
1481 }
1482
1483 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1484 match ty {
1485 InterfaceType::ErrorContext(src) => {
1486 let rep = cx
1487 .instance_mut()
1488 .table_for_error_context(src)
1489 .error_context_rep(index)?;
1490
1491 Ok(Self { rep })
1492 }
1493 _ => func::bad_type_info(),
1494 }
1495 }
1496}
1497
1498pub(crate) fn lower_error_context_to_index<U>(
1499 rep: u32,
1500 cx: &mut LowerContext<'_, U>,
1501 ty: InterfaceType,
1502) -> Result<u32> {
1503 match ty {
1504 InterfaceType::ErrorContext(dst) => {
1505 let tbl = cx.instance_mut().table_for_error_context(dst);
1506 tbl.error_context_insert(rep)
1507 }
1508 _ => func::bad_type_info(),
1509 }
1510}
1511unsafe impl func::ComponentType for ErrorContext {
1514 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1515
1516 type Lower = <u32 as func::ComponentType>::Lower;
1517
1518 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1519 match ty {
1520 InterfaceType::ErrorContext(_) => Ok(()),
1521 other => bail!("expected `error`, found `{}`", func::desc(other)),
1522 }
1523 }
1524}
1525
1526unsafe impl func::Lower for ErrorContext {
1528 fn linear_lower_to_flat<T>(
1529 &self,
1530 cx: &mut LowerContext<'_, T>,
1531 ty: InterfaceType,
1532 dst: &mut MaybeUninit<Self::Lower>,
1533 ) -> Result<()> {
1534 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1535 cx,
1536 InterfaceType::U32,
1537 dst,
1538 )
1539 }
1540
1541 fn linear_lower_to_memory<T>(
1542 &self,
1543 cx: &mut LowerContext<'_, T>,
1544 ty: InterfaceType,
1545 offset: usize,
1546 ) -> Result<()> {
1547 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1548 cx,
1549 InterfaceType::U32,
1550 offset,
1551 )
1552 }
1553}
1554
1555unsafe impl func::Lift for ErrorContext {
1557 fn linear_lift_from_flat(
1558 cx: &mut LiftContext<'_>,
1559 ty: InterfaceType,
1560 src: &Self::Lower,
1561 ) -> Result<Self> {
1562 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1563 Self::lift_from_index(cx, ty, index)
1564 }
1565
1566 fn linear_lift_from_memory(
1567 cx: &mut LiftContext<'_>,
1568 ty: InterfaceType,
1569 bytes: &[u8],
1570 ) -> Result<Self> {
1571 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1572 Self::lift_from_index(cx, ty, index)
1573 }
1574}
1575
/// Table entry for one end (read or write) of a stream or future.
pub(super) struct TransmitHandle {
    /// Waitable bookkeeping. Its `handle` field is set to the guest handle
    /// index when the end is lowered into a guest table and cleared when it
    /// is lifted back out.
    pub(super) common: WaitableCommon,
    /// The `TransmitState` this end belongs to.
    state: TableId<TransmitState>,
}
1582
1583impl TransmitHandle {
1584 fn new(state: TableId<TransmitState>) -> Self {
1585 Self {
1586 common: WaitableCommon::default(),
1587 state,
1588 }
1589 }
1590}
1591
impl TableDebug for TransmitHandle {
    /// Name under which this entry type is reported by `TableDebug`.
    fn type_name() -> &'static str {
        "TransmitHandle"
    }
}
1597
/// State shared by the two ends of a stream or future.
struct TransmitState {
    /// Id of the `TransmitHandle` for the write end.
    write_handle: TableId<TransmitHandle>,
    /// Id of the `TransmitHandle` for the read end.
    read_handle: TableId<TransmitHandle>,
    /// Current state of the write end (see `WriteState`).
    write: WriteState,
    /// Current state of the read end (see `ReadState`).
    read: ReadState,
    /// Waker that is taken and woken when the writer is dropped
    /// (see `host_drop_writer`).
    writer_watcher: Option<Waker>,
    /// Waker that is taken and woken when the reader is dropped
    /// (see `host_drop_reader`).
    reader_watcher: Option<Waker>,
    /// Set once a future's value has been handed off (or, for a guest write,
    /// once the read end turned out to be dropped); `guest_write` and
    /// `guest_read` refuse further operations while this is set.
    done: bool,
}
1619
1620impl Default for TransmitState {
1621 fn default() -> Self {
1622 Self {
1623 write_handle: TableId::new(0),
1624 read_handle: TableId::new(0),
1625 read: ReadState::Open,
1626 write: WriteState::Open,
1627 reader_watcher: None,
1628 writer_watcher: None,
1629 done: false,
1630 }
1631 }
1632}
1633
1634impl TableDebug for TransmitState {
1635 fn type_name() -> &'static str {
1636 "TransmitState"
1637 }
1638}
1639
/// State of the write end of a stream or future.
enum WriteState {
    /// The write end exists but no write is currently pending.
    Open,
    /// A guest write is pending, waiting for a reader to consume it.
    GuestReady {
        /// Stream or future type the guest is writing with.
        ty: TransmitIndex,
        /// Flat ABI description of the payload, if any; when present for a
        /// stream, `copy` moves the items as raw bytes.
        flat_abi: Option<FlatAbi>,
        /// Canonical options the guest supplied for the write.
        options: Options,
        /// Address of the pending buffer in the writer's linear memory.
        address: usize,
        /// Number of items still pending in that buffer.
        count: usize,
        /// Guest handle index, reported back in the write event's `pending`
        /// field.
        handle: u32,
        /// Whether the write end should be marked dropped once this write
        /// has been consumed.
        post_write: PostWrite,
    },
    /// A host write is pending, waiting for a reader to consume it.
    HostReady {
        /// Callback invoked with the reader (or `Reader::End`) once one
        /// shows up; returns the resulting return code.
        accept:
            Box<dyn FnOnce(&mut dyn VMStore, Instance, Reader) -> Result<ReturnCode> + Send + Sync>,
        /// Whether the write end should be marked dropped once this write
        /// has been consumed.
        post_write: PostWrite,
    },
    /// The write end has been dropped.
    Dropped,
}
1663
1664impl fmt::Debug for WriteState {
1665 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1666 match self {
1667 Self::Open => f.debug_tuple("Open").finish(),
1668 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1669 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1670 Self::Dropped => f.debug_tuple("Dropped").finish(),
1671 }
1672 }
1673}
1674
/// State of the read end of a stream or future.
enum ReadState {
    /// The read end exists but no read is currently pending.
    Open,
    /// A guest read is pending, waiting for a writer to supply data.
    GuestReady {
        /// Stream or future type the guest is reading with.
        ty: TransmitIndex,
        /// Flat ABI description of the payload, if any; when present for a
        /// stream, `copy` moves the items as raw bytes.
        flat_abi: Option<FlatAbi>,
        /// Canonical options the guest supplied for the read.
        options: Options,
        /// Address of the destination buffer in the reader's linear memory.
        address: usize,
        /// Number of items the destination buffer can still accept.
        count: usize,
        /// Guest handle index, reported back in the read event's `pending`
        /// field.
        handle: u32,
    },
    /// A host read is pending, waiting for a writer to supply data.
    HostReady {
        /// Callback invoked with the writer (or `Writer::End`) once one
        /// shows up; returns the resulting return code.
        accept: Box<dyn FnOnce(Writer) -> Result<ReturnCode> + Send + Sync>,
    },
    /// The read end has been dropped.
    Dropped,
}
1695
1696impl fmt::Debug for ReadState {
1697 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1698 match self {
1699 Self::Open => f.debug_tuple("Open").finish(),
1700 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1701 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1702 Self::Dropped => f.debug_tuple("Dropped").finish(),
1703 }
1704 }
1705}
1706
/// Description of the writing side, handed to a pending host reader's
/// `accept` callback (see `ReadState::HostReady`).
enum Writer<'a> {
    /// The writer is a guest; items must be lifted from its linear memory.
    Guest {
        /// Context used to lift values out of the guest.
        lift: &'a mut LiftContext<'a>,
        /// Payload type of the stream or future, if it has one.
        ty: Option<InterfaceType>,
        /// Address of the guest's buffer.
        address: usize,
        /// Number of items in the guest's buffer.
        count: usize,
    },
    /// The writer is the host.
    Host {
        /// Type-erased view of the host's write buffer.
        buffer: &'a mut UntypedWriteBuffer<'a>,
        /// Number of items available in `buffer`.
        count: usize,
    },
    /// The write end is being dropped (see `host_drop_writer`).
    End,
}
1726
/// Description of the reading side, handed to a pending host writer's
/// `accept` callback (see `WriteState::HostReady`).
enum Reader<'a> {
    /// The reader is a guest; items must be lowered into its linear memory.
    Guest {
        /// Canonical options the guest supplied for the read.
        options: &'a Options,
        /// Stream or future type the guest is reading with.
        ty: TransmitIndex,
        /// Address of the guest's destination buffer.
        address: usize,
        /// Number of items the guest's buffer can accept.
        count: usize,
    },
    /// The reader is the host.
    Host {
        /// Callback given the type-erased write buffer and the number of
        /// items offered; returns the number of items it actually took.
        accept: Box<dyn FnOnce(&mut UntypedWriteBuffer, usize) -> usize + 'a>,
    },
    /// The read end is being dropped (see `host_drop_reader`).
    End,
}
1745
1746impl Instance {
1747 pub fn future<T: func::Lower + func::Lift + Send + Sync + 'static>(
1754 self,
1755 mut store: impl AsContextMut,
1756 default: fn() -> T,
1757 ) -> Result<(FutureWriter<T>, FutureReader<T>)> {
1758 let (write, read) = self
1759 .concurrent_state_mut(store.as_context_mut().0)
1760 .new_transmit()?;
1761
1762 Ok((
1763 FutureWriter::new(default, write, self),
1764 FutureReader::new(read, self),
1765 ))
1766 }
1767
1768 pub fn stream<T: func::Lower + func::Lift + Send + 'static>(
1771 self,
1772 mut store: impl AsContextMut,
1773 ) -> Result<(StreamWriter<T>, StreamReader<T>)> {
1774 let (write, read) = self
1775 .concurrent_state_mut(store.as_context_mut().0)
1776 .new_transmit()?;
1777
1778 Ok((
1779 StreamWriter::new(write, self),
1780 StreamReader::new(read, self),
1781 ))
1782 }
1783
    /// Write the contents of `buffer` from the host to the stream or future
    /// whose write end is `id`.
    ///
    /// What happens depends on the current state of the read end:
    ///
    /// * `Open`: nobody is reading yet, so the write is parked as
    ///   `WriteState::HostReady` and a receiver is returned.
    /// * `GuestReady`: the data is handed to the waiting guest reader and a
    ///   read event is set on the read handle. If lowering `T` may require a
    ///   realloc, that work is queued as a high-priority work item and a
    ///   receiver is returned instead of an immediate result.
    /// * `HostReady`: the waiting host reader's callback is invoked right away.
    /// * `Dropped`: the buffer is returned untouched with `dropped: true`.
    ///
    /// `post_write` records whether the write end should be considered dropped
    /// once this write has been delivered.
    ///
    /// # Returns
    ///
    /// `Ok(Ok(result))` if the write finished synchronously, `Ok(Err(rx))` if
    /// the result will be delivered later through `rx`.
    ///
    /// # Errors
    ///
    /// Fails if the handle or its state cannot be found, or if accepting the
    /// data fails.
    fn host_write<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U>(
        self,
        mut store: StoreContextMut<U>,
        id: TableId<TransmitHandle>,
        mut buffer: B,
        kind: TransmitKind,
        post_write: PostWrite,
    ) -> Result<Result<HostResult<B>, oneshot::Receiver<HostResult<B>>>> {
        let transmit_id = self.concurrent_state_mut(store.0).get(id)?.state;
        let transmit = self
            .concurrent_state_mut(store.0)
            .get_mut(transmit_id)
            .with_context(|| format!("retrieving state for transmit [{transmit_id:?}]"))?;
        log::trace!("host_write state {transmit_id:?}; {:?}", transmit.read);

        // Any pending read is consumed below; afterwards the read end is
        // `Open` again unless it was already dropped.
        let new_state = if let ReadState::Dropped = &transmit.read {
            ReadState::Dropped
        } else {
            ReadState::Open
        };

        // If the writer is going away after this write and the write will not
        // be parked (the read end is not `Open`), mark the write end dropped
        // right away.
        if matches!(post_write, PostWrite::Drop) && !matches!(transmit.read, ReadState::Open) {
            transmit.write = WriteState::Dropped;
        }

        Ok(match mem::replace(&mut transmit.read, new_state) {
            ReadState::Open => {
                assert!(matches!(&transmit.write, WriteState::Open));

                // Park the write; `accept` runs once a reader arrives and the
                // outcome is sent through the channel.
                let token = StoreToken::new(store.as_context_mut());
                let (tx, rx) = oneshot::channel();
                let state = WriteState::HostReady {
                    accept: Box::new(move |store, instance, reader| {
                        let (result, code) = accept_reader::<T, B, U>(
                            token.as_context_mut(store),
                            instance,
                            reader,
                            buffer,
                            kind,
                        )?;
                        // The receiver may already be gone; that is not an error.
                        _ = tx.send(result);
                        Ok(code)
                    }),
                    post_write,
                };
                self.concurrent_state_mut(store.0)
                    .get_mut(transmit_id)?
                    .write = state;

                Err(rx)
            }

            ReadState::GuestReady {
                ty,
                flat_abi: _,
                options,
                address,
                count,
                handle,
                ..
            } => {
                // A future carries a single value, so this write finishes it.
                if let TransmitKind::Future = kind {
                    transmit.done = true;
                }

                let read_handle = transmit.read_handle;
                // Hands the data to the guest reader and records the read
                // event on the read handle.
                let accept = move |mut store: StoreContextMut<U>| {
                    let (result, code) = accept_reader::<T, B, U>(
                        store.as_context_mut(),
                        self,
                        Reader::Guest {
                            options: &options,
                            ty,
                            address,
                            count,
                        },
                        buffer,
                        kind,
                    )?;

                    self.concurrent_state_mut(store.0).set_event(
                        read_handle.rep(),
                        match ty {
                            TransmitIndex::Future(ty) => Event::FutureRead {
                                code,
                                pending: Some((ty, handle)),
                            },
                            TransmitIndex::Stream(ty) => Event::StreamRead {
                                code,
                                pending: Some((ty, handle)),
                            },
                        },
                    )?;

                    anyhow::Ok(result)
                };

                if T::MAY_REQUIRE_REALLOC {
                    // Defer the hand-off to a queued work item and report the
                    // result through the channel.
                    // NOTE(review): presumably because a realloc calls into
                    // the guest, which must not happen from this call stack —
                    // confirm.
                    let (tx, rx) = oneshot::channel();
                    let token = StoreToken::new(store.as_context_mut());
                    self.concurrent_state_mut(store.0).push_high_priority(
                        WorkItem::WorkerFunction(Mutex::new(Box::new(move |store, _| {
                            _ = tx.send(accept(token.as_context_mut(store))?);
                            Ok(())
                        }))),
                    );
                    Err(rx)
                } else {
                    Ok(accept(store)?)
                }
            }

            ReadState::HostReady { accept } => {
                // Host-to-host transfer: give the waiting reader a type-erased
                // view of our buffer.
                let count = buffer.remaining().len();
                let mut untyped = UntypedWriteBuffer::new(&mut buffer);
                let code = accept(Writer::Host {
                    buffer: &mut untyped,
                    count,
                })?;
                // A host reader is only expected to answer with `Completed`
                // or `Dropped`.
                let (ReturnCode::Completed(_) | ReturnCode::Dropped(_)) = code else {
                    unreachable!()
                };

                Ok(HostResult {
                    buffer,
                    dropped: false,
                })
            }

            ReadState::Dropped => Ok(HostResult {
                buffer,
                dropped: true,
            }),
        })
    }
1928
1929 async fn host_write_async<T: func::Lower + Send + 'static, B: WriteBuffer<T>>(
1931 self,
1932 accessor: impl AsAccessor,
1933 id: TableId<TransmitHandle>,
1934 buffer: B,
1935 kind: TransmitKind,
1936 ) -> Result<HostResult<B>> {
1937 match accessor.as_accessor().with(move |mut access| {
1938 self.host_write(
1939 access.as_context_mut(),
1940 id,
1941 buffer,
1942 kind,
1943 PostWrite::Continue,
1944 )
1945 })? {
1946 Ok(result) => Ok(result),
1947 Err(rx) => Ok(rx.await?),
1948 }
1949 }
1950
    /// Read from the stream or future whose read end is `id` into `buffer`,
    /// on behalf of the host.
    ///
    /// What happens depends on the current state of the write end:
    ///
    /// * `Open`: nobody is writing yet, so the read is parked as
    ///   `ReadState::HostReady` and a receiver is returned.
    /// * `GuestReady`: the pending guest write is lifted into `buffer` and a
    ///   write event is set on the write handle.
    /// * `HostReady`: the pending host writer's callback is invoked with a
    ///   `Reader::Host` that moves items into `buffer`.
    /// * `Dropped`: the buffer is returned untouched with `dropped: true`.
    ///
    /// # Returns
    ///
    /// `Ok(Ok(result))` if the read finished synchronously, `Ok(Err(rx))` if
    /// the result will be delivered later through `rx`.
    ///
    /// # Errors
    ///
    /// Fails if the handle or its state cannot be found, or if accepting the
    /// data fails.
    fn host_read<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
        self,
        store: StoreContextMut<U>,
        id: TableId<TransmitHandle>,
        mut buffer: B,
        kind: TransmitKind,
    ) -> Result<Result<HostResult<B>, oneshot::Receiver<HostResult<B>>>> {
        let transmit_id = self.concurrent_state_mut(store.0).get(id)?.state;
        let transmit = self
            .concurrent_state_mut(store.0)
            .get_mut(transmit_id)
            .with_context(|| format!("retrieving state for transmit [{transmit_id:?}]"))?;
        log::trace!("host_read state {transmit_id:?}; {:?}", transmit.write);

        // Any pending write is consumed below; afterwards the write end is
        // `Open` again unless it was already dropped.
        let new_state = if let WriteState::Dropped = &transmit.write {
            WriteState::Dropped
        } else {
            WriteState::Open
        };

        Ok(match mem::replace(&mut transmit.write, new_state) {
            WriteState::Open => {
                assert!(matches!(&transmit.read, ReadState::Open));

                // Park the read; `accept` runs once a writer arrives and the
                // outcome is sent through the channel.
                let (tx, rx) = oneshot::channel();
                transmit.read = ReadState::HostReady {
                    accept: Box::new(move |writer| {
                        let (result, code) = accept_writer::<T, B, U>(writer, buffer, kind)?;
                        // The receiver may already be gone; that is not an error.
                        _ = tx.send(result);
                        Ok(code)
                    }),
                };

                Err(rx)
            }

            WriteState::GuestReady {
                ty,
                flat_abi: _,
                options,
                address,
                count,
                handle,
                post_write,
                ..
            } => {
                // A future carries a single value, so this read finishes it.
                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                let write_handle = transmit.write_handle;
                let lift = &mut LiftContext::new(store.0.store_opaque_mut(), &options, self);
                let (result, code) = accept_writer::<T, B, U>(
                    Writer::Guest {
                        ty: payload(ty, lift.types),
                        lift,
                        address,
                        count,
                    },
                    buffer,
                    kind,
                )?;

                let state = self.concurrent_state_mut(store.0);
                // If the guest writer asked to be dropped after this write,
                // mark it dropped and leave `pending` empty in the event.
                let pending = if let PostWrite::Drop = post_write {
                    state.get_mut(transmit_id)?.write = WriteState::Dropped;
                    false
                } else {
                    true
                };

                // Record the outcome of the write on the write handle.
                state.set_event(
                    write_handle.rep(),
                    match ty {
                        TransmitIndex::Future(ty) => Event::FutureWrite {
                            code,
                            pending: pending.then_some((ty, handle)),
                        },
                        TransmitIndex::Stream(ty) => Event::StreamWrite {
                            code,
                            pending: pending.then_some((ty, handle)),
                        },
                    },
                )?;

                Ok(result)
            }

            WriteState::HostReady { accept, post_write } => {
                // Host-to-host transfer: the writer calls back with its
                // type-erased buffer and we take as many items as fit.
                accept(
                    store.0,
                    self,
                    Reader::Host {
                        accept: Box::new(|input, count| {
                            let count = count.min(buffer.remaining_capacity());
                            buffer.move_from(input.get_mut::<T>(), count);
                            count
                        }),
                    },
                )?;

                if let PostWrite::Drop = post_write {
                    self.concurrent_state_mut(store.0)
                        .get_mut(transmit_id)?
                        .write = WriteState::Dropped;
                }

                Ok(HostResult {
                    buffer,
                    dropped: false,
                })
            }

            WriteState::Dropped => Ok(HostResult {
                buffer,
                dropped: true,
            }),
        })
    }
2071
2072 async fn host_read_async<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
2074 self,
2075 accessor: impl AsAccessor,
2076 id: TableId<TransmitHandle>,
2077 buffer: B,
2078 kind: TransmitKind,
2079 ) -> Result<HostResult<B>> {
2080 match accessor
2081 .as_accessor()
2082 .with(move |mut access| self.host_read(access.as_context_mut(), id, buffer, kind))?
2083 {
2084 Ok(result) => Ok(result),
2085 Err(rx) => Ok(rx.await?),
2086 }
2087 }
2088
    /// Drop the read end identified by `id`.
    ///
    /// Marks the read end `Dropped`, wakes `reader_watcher` if one is
    /// registered, and then notifies or cleans up the write end according to
    /// its state:
    ///
    /// * `GuestReady` with `PostWrite::Drop`: the transmit is deleted.
    /// * `GuestReady` otherwise: the write handle's event is updated to
    ///   `Dropped(0)` with the pending handle attached.
    /// * `HostReady`: the parked writer's callback is invoked with
    ///   `Reader::End`.
    /// * `Open`: the write handle's event is updated to `Dropped(0)` with no
    ///   pending handle; `kind` picks the future or stream event.
    /// * `Dropped`: both ends are gone, so the transmit is deleted.
    ///
    /// # Errors
    ///
    /// Fails if the handle or its state cannot be found, or if any
    /// notification or cleanup step fails.
    fn host_drop_reader(
        self,
        store: &mut dyn VMStore,
        id: TableId<TransmitHandle>,
        kind: TransmitKind,
    ) -> Result<()> {
        let transmit_id = self.concurrent_state_mut(store).get(id)?.state;
        let state = self.concurrent_state_mut(store);
        let transmit = state
            .get_mut(transmit_id)
            .with_context(|| format!("error closing reader {transmit_id:?}"))?;
        log::trace!(
            "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
            transmit.read,
            transmit.write
        );

        transmit.read = ReadState::Dropped;
        if let Some(waker) = transmit.reader_watcher.take() {
            waker.wake();
        }

        // Any pending write is consumed below; afterwards the write end is
        // `Open` again unless it was already dropped.
        let new_state = if let WriteState::Dropped = &transmit.write {
            WriteState::Dropped
        } else {
            WriteState::Open
        };

        let write_handle = transmit.write_handle;

        match mem::replace(&mut transmit.write, new_state) {
            WriteState::GuestReady {
                ty,
                handle,
                post_write,
                ..
            } => {
                if let PostWrite::Drop = post_write {
                    // The writer was already on its way out, so nothing is
                    // left to notify.
                    state.delete_transmit(transmit_id)?;
                } else {
                    // Tell the guest writer that nothing was transferred
                    // because the reader dropped.
                    state.update_event(
                        write_handle.rep(),
                        match ty {
                            TransmitIndex::Future(ty) => Event::FutureWrite {
                                code: ReturnCode::Dropped(0),
                                pending: Some((ty, handle)),
                            },
                            TransmitIndex::Stream(ty) => Event::StreamWrite {
                                code: ReturnCode::Dropped(0),
                                pending: Some((ty, handle)),
                            },
                        },
                    )?;
                };
            }

            WriteState::HostReady { accept, .. } => {
                accept(store, self, Reader::End)?;
            }

            WriteState::Open => {
                state.update_event(
                    write_handle.rep(),
                    match kind {
                        TransmitKind::Future => Event::FutureWrite {
                            code: ReturnCode::Dropped(0),
                            pending: None,
                        },
                        TransmitKind::Stream => Event::StreamWrite {
                            code: ReturnCode::Dropped(0),
                            pending: None,
                        },
                    },
                )?;
            }

            WriteState::Dropped => {
                log::trace!("host_drop_reader delete {transmit_id:?}");
                state.delete_transmit(transmit_id)?;
            }
        }
        Ok(())
    }
2177
2178 fn host_drop_writer<T: func::Lower + Send + 'static, U>(
2180 self,
2181 mut store: StoreContextMut<U>,
2182 id: TableId<TransmitHandle>,
2183 default: Option<&dyn Fn() -> Result<T>>,
2184 ) -> Result<()> {
2185 let transmit_id = self.concurrent_state_mut(store.0).get(id)?.state;
2186 let transmit = self
2187 .concurrent_state_mut(store.0)
2188 .get_mut(transmit_id)
2189 .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2190 log::trace!(
2191 "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2192 transmit.read,
2193 transmit.write
2194 );
2195
2196 if let Some(waker) = transmit.writer_watcher.take() {
2197 waker.wake();
2198 }
2199
2200 match &mut transmit.write {
2202 WriteState::GuestReady { .. } => {
2203 unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2204 }
2205 WriteState::HostReady { post_write, .. } => {
2206 *post_write = PostWrite::Drop;
2207 }
2208 v @ WriteState::Open => {
2209 if let (Some(default), false) = (
2210 default,
2211 transmit.done || matches!(transmit.read, ReadState::Dropped),
2212 ) {
2213 _ = self.host_write(
2216 store.as_context_mut(),
2217 id,
2218 Some(default()?),
2219 TransmitKind::Future,
2220 PostWrite::Drop,
2221 )?;
2222 } else {
2223 *v = WriteState::Dropped;
2224 }
2225 }
2226 WriteState::Dropped => unreachable!("write state is already dropped"),
2227 }
2228
2229 let transmit = self.concurrent_state_mut(store.0).get_mut(transmit_id)?;
2230
2231 let new_state = if let ReadState::Dropped = &transmit.read {
2237 ReadState::Dropped
2238 } else {
2239 ReadState::Open
2240 };
2241
2242 let read_handle = transmit.read_handle;
2243
2244 match mem::replace(&mut transmit.read, new_state) {
2246 ReadState::GuestReady { ty, handle, .. } => {
2250 self.concurrent_state_mut(store.0).update_event(
2252 read_handle.rep(),
2253 match ty {
2254 TransmitIndex::Future(ty) => Event::FutureRead {
2255 code: ReturnCode::Dropped(0),
2256 pending: Some((ty, handle)),
2257 },
2258 TransmitIndex::Stream(ty) => Event::StreamRead {
2259 code: ReturnCode::Dropped(0),
2260 pending: Some((ty, handle)),
2261 },
2262 },
2263 )?;
2264 }
2265
2266 ReadState::HostReady { accept } => {
2269 accept(Writer::End)?;
2270 }
2271
2272 ReadState::Open => {
2274 self.concurrent_state_mut(store.0).update_event(
2275 read_handle.rep(),
2276 match default {
2277 Some(_) => Event::FutureRead {
2278 code: ReturnCode::Dropped(0),
2279 pending: None,
2280 },
2281 None => Event::StreamRead {
2282 code: ReturnCode::Dropped(0),
2283 pending: None,
2284 },
2285 },
2286 )?;
2287 }
2288
2289 ReadState::Dropped => {
2292 log::trace!("host_drop_writer delete {transmit_id:?}");
2293 self.concurrent_state_mut(store.0)
2294 .delete_transmit(transmit_id)?;
2295 }
2296 }
2297 Ok(())
2298 }
2299
2300 pub(super) fn guest_drop_writable<T>(
2302 self,
2303 store: StoreContextMut<T>,
2304 ty: TransmitIndex,
2305 writer: u32,
2306 ) -> Result<()> {
2307 let table = self.id().get_mut(store.0).table_for_transmit(ty);
2308 let transmit_rep = match ty {
2309 TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
2310 TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
2311 };
2312
2313 let id = TableId::<TransmitHandle>::new(transmit_rep);
2314 log::trace!("guest_drop_writable: drop writer {id:?}");
2315 match ty {
2316 TransmitIndex::Stream(_) => {
2317 self.host_drop_writer(store, id, None::<&dyn Fn() -> Result<()>>)
2318 }
2319 TransmitIndex::Future(_) => self.host_drop_writer(
2320 store,
2321 id,
2322 Some(&|| {
2323 Err::<(), _>(anyhow!(
2324 "cannot drop future write end without first writing a value"
2325 ))
2326 }),
2327 ),
2328 }
2329 }
2330
    /// Copy `count` items from a guest writer's buffer into a guest reader's
    /// buffer.
    ///
    /// `write_ty`, `write_options` and `write_address` describe the source;
    /// `read_ty`, `read_options` and `read_address` describe the destination.
    /// `rep` is only used to label a trace message.
    ///
    /// * Futures: `count` must be 1. If the future has a payload type, the
    ///   value is loaded from the writer and stored into the reader.
    /// * Streams with a `flat_abi`: the items are copied as raw bytes.
    /// * Streams without a `flat_abi`: each item is loaded as a `Val` from the
    ///   writer and stored into the reader one at a time.
    ///
    /// # Errors
    ///
    /// Fails if either pointer is misaligned or out of bounds, or if loading
    /// or storing a value fails.
    ///
    /// # Panics
    ///
    /// Panics if `write_ty` and `read_ty` are not both futures or both
    /// streams, or if a future copy is requested with `count != 1`.
    fn copy<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        flat_abi: Option<FlatAbi>,
        write_ty: TransmitIndex,
        write_options: &Options,
        write_address: usize,
        read_ty: TransmitIndex,
        read_options: &Options,
        read_address: usize,
        count: usize,
        rep: u32,
    ) -> Result<()> {
        let types = self.id().get(store.0).component().types().clone();
        match (write_ty, read_ty) {
            (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
                assert_eq!(count, 1);

                // Load the value from the writer, if the future has a payload.
                let val = types[types[write_ty].ty]
                    .payload
                    .map(|ty| {
                        let abi = types.canonical_abi(&ty);
                        if write_address % usize::try_from(abi.align32)? != 0 {
                            bail!("write pointer not aligned");
                        }

                        let lift =
                            &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);
                        let bytes = lift
                            .memory()
                            .get(write_address..)
                            .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
                            .ok_or_else(|| {
                                anyhow::anyhow!("write pointer out of bounds of memory")
                            })?;

                        Val::load(lift, ty, bytes)
                    })
                    .transpose()?;

                // Store it into the reader's memory.
                if let Some(val) = val {
                    let lower =
                        &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
                    let ty = types[types[read_ty].ty].payload.unwrap();
                    // NOTE(review): presumably checks the read pointer's
                    // bounds and alignment — confirm against
                    // `validate_inbounds_dynamic`.
                    let ptr = func::validate_inbounds_dynamic(
                        types.canonical_abi(&ty),
                        lower.as_slice_mut(),
                        &ValRaw::u32(read_address.try_into().unwrap()),
                    )?;
                    val.store(lower, ty, ptr)?;
                }
            }
            (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
                if let Some(flat_abi) = flat_abi {
                    // Raw byte copy for payloads that have a flat ABI.
                    let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
                    // Nothing to check or copy for an empty transfer.
                    if length_in_bytes > 0 {
                        if write_address % usize::try_from(flat_abi.align)? != 0 {
                            bail!("write pointer not aligned");
                        }
                        if read_address % usize::try_from(flat_abi.align)? != 0 {
                            bail!("read pointer not aligned");
                        }

                        let store_opaque = store.0.store_opaque_mut();

                        {
                            let src = write_options
                                .memory(store_opaque)
                                .get(write_address..)
                                .and_then(|b| b.get(..length_in_bytes))
                                .ok_or_else(|| {
                                    anyhow::anyhow!("write pointer out of bounds of memory")
                                })?
                                .as_ptr();
                            let dst = read_options
                                .memory_mut(store_opaque)
                                .get_mut(read_address..)
                                .and_then(|b| b.get_mut(..length_in_bytes))
                                .ok_or_else(|| {
                                    anyhow::anyhow!("read pointer out of bounds of memory")
                                })?
                                .as_mut_ptr();
                            // SAFETY (as far as visible here): both pointers
                            // come from slices that were just checked to be
                            // `length_in_bytes` long, and `copy_to` permits
                            // the two ranges to overlap.
                            // NOTE(review): relies on the memory not moving
                            // between obtaining `src` and `dst` — confirm.
                            unsafe { src.copy_to(dst, length_in_bytes) };
                        }
                    }
                } else {
                    // No flat ABI: lift every item out of the writer first...
                    let store_opaque = store.0.store_opaque_mut();
                    let lift = &mut LiftContext::new(store_opaque, write_options, self);
                    let ty = types[types[write_ty].ty].payload.unwrap();
                    let abi = lift.types.canonical_abi(&ty);
                    let size = usize::try_from(abi.size32).unwrap();
                    if write_address % usize::try_from(abi.align32)? != 0 {
                        bail!("write pointer not aligned");
                    }
                    let bytes = lift
                        .memory()
                        .get(write_address..)
                        .and_then(|b| b.get(..size * count))
                        .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;

                    let values = (0..count)
                        .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
                        .collect::<Result<Vec<_>>>()?;

                    let id = TableId::<TransmitHandle>::new(rep);
                    log::trace!("copy values {values:?} for {id:?}");

                    // ...then lower them one by one into the reader.
                    let lower =
                        &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
                    let ty = types[types[read_ty].ty].payload.unwrap();
                    let abi = lower.types.canonical_abi(&ty);
                    if read_address % usize::try_from(abi.align32)? != 0 {
                        bail!("read pointer not aligned");
                    }
                    let size = usize::try_from(abi.size32).unwrap();
                    // Bounds check only; the resulting slice is not used.
                    lower
                        .as_slice_mut()
                        .get_mut(read_address..)
                        .and_then(|b| b.get_mut(..size * count))
                        .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
                    let mut ptr = read_address;
                    for value in values {
                        value.store(lower, ty, ptr)?;
                        ptr += size
                    }
                }
            }
            // A future can only be paired with a future, a stream with a stream.
            _ => unreachable!(),
        }

        Ok(())
    }
2469
    /// Write `count` items from guest memory at `address` to the stream or
    /// future whose writable end is the guest handle `handle`.
    ///
    /// The handle is marked `Busy` while the operation is in progress. What
    /// happens next depends on the state of the read end:
    ///
    /// * `GuestReady`: items are copied directly between the two guests'
    ///   buffers via `copy`, and the reader is notified with an event.
    /// * `HostReady`: the parked host reader's callback is invoked with the
    ///   guest's buffer.
    /// * `Open`: the write is parked as `WriteState::GuestReady` and
    ///   `ReturnCode::Blocked` is returned.
    /// * `Dropped`: `ReturnCode::Dropped(0)` is returned.
    ///
    /// Unless the result is `Blocked`, the handle is switched back to `Write`,
    /// with `done` set when a stream write reports that the reader dropped.
    ///
    /// # Errors
    ///
    /// Fails if the options are not async, if `handle` is not a writable
    /// handle in the `Write` state, if the handle or transmit is already
    /// done, or if copying or accepting the data fails.
    pub(super) fn guest_write<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        ty: TransmitIndex,
        options: OptionsIndex,
        flat_abi: Option<FlatAbi>,
        handle: u32,
        address: u32,
        count: u32,
    ) -> Result<ReturnCode> {
        let address = usize::try_from(address).unwrap();
        let count = usize::try_from(count).unwrap();
        let options = Options::new_index(store.0, self, options);
        if !options.async_() {
            bail!("synchronous stream and future writes not yet supported");
        }
        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
        let TransmitLocalState::Write { done } = *state else {
            bail!(
                "invalid handle {handle}; expected `Write`; got {:?}",
                *state
            );
        };

        if done {
            bail!("cannot write to stream after being notified that the readable end dropped");
        }

        // Mark the handle busy for the duration of the write.
        *state = TransmitLocalState::Busy;
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let concurrent_state = self.concurrent_state_mut(store.0);
        let transmit_id = concurrent_state.get(transmit_handle)?.state;
        let transmit = concurrent_state.get_mut(transmit_id)?;
        log::trace!(
            "guest_write {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
            transmit.read
        );

        if transmit.done {
            bail!("cannot write to future after previous write succeeded or readable end dropped");
        }

        // Any pending read is consumed below; afterwards the read end is
        // `Open` again unless it was already dropped.
        let new_state = if let ReadState::Dropped = &transmit.read {
            ReadState::Dropped
        } else {
            ReadState::Open
        };

        // Parks this write until a reader arrives.
        let set_guest_ready = |me: &mut ConcurrentState| {
            let transmit = me.get_mut(transmit_id)?;
            assert!(matches!(&transmit.write, WriteState::Open));
            transmit.write = WriteState::GuestReady {
                ty,
                flat_abi,
                options,
                address,
                count,
                handle,
                post_write: PostWrite::Continue,
            };
            Ok::<_, crate::Error>(())
        };

        let result = match mem::replace(&mut transmit.read, new_state) {
            ReadState::GuestReady {
                ty: read_ty,
                flat_abi: read_flat_abi,
                options: read_options,
                address: read_address,
                count: read_count,
                handle: read_handle,
            } => {
                assert_eq!(flat_abi, read_flat_abi);

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                // Zero-length operations are treated specially: a non-empty
                // write that meets a zero-length read stays pending, and a
                // zero-length write does not complete the read it meets.
                // NOTE(review): presumably this implements the "readiness
                // probe" meaning of zero-length reads/writes in the Component
                // Model spec — confirm.
                let write_complete = count == 0 || read_count > 0;
                let read_complete = count > 0;
                // The reader still has room left after this write.
                let read_buffer_remaining = count < read_count;

                let read_handle_rep = transmit.read_handle.rep();

                // Number of items actually transferred.
                let count = count.min(read_count);

                self.copy(
                    store.as_context_mut(),
                    flat_abi,
                    ty,
                    &options,
                    address,
                    read_ty,
                    &read_options,
                    read_address,
                    count,
                    rep,
                )?;

                let instance = self.id().get_mut(store.0);
                let types = instance.component().types();
                // Byte size of one item; 0 when there is no payload type.
                let item_size = payload(ty, types)
                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
                    .unwrap_or(0);
                let concurrent_state = instance.concurrent_state_mut();
                if read_complete {
                    let count = u32::try_from(count).unwrap();
                    // If an earlier write already left a `Completed` stream
                    // event on the read handle, fold its count into this one.
                    let total = if let Some(Event::StreamRead {
                        code: ReturnCode::Completed(old_total),
                        ..
                    }) = concurrent_state.take_event(read_handle_rep)?
                    {
                        count + old_total
                    } else {
                        count
                    };

                    let code = ReturnCode::completed(ty.kind(), total);

                    concurrent_state.set_event(
                        read_handle_rep,
                        match read_ty {
                            TransmitIndex::Future(ty) => Event::FutureRead {
                                code,
                                pending: Some((ty, read_handle)),
                            },
                            TransmitIndex::Stream(ty) => Event::StreamRead {
                                code,
                                pending: Some((ty, read_handle)),
                            },
                        },
                    )?;
                }

                if read_buffer_remaining {
                    // Keep the read pending for the unfilled rest of its
                    // buffer.
                    let transmit = concurrent_state.get_mut(transmit_id)?;
                    transmit.read = ReadState::GuestReady {
                        ty: read_ty,
                        flat_abi: read_flat_abi,
                        options: read_options,
                        address: read_address + (count * item_size),
                        count: read_count - count,
                        handle: read_handle,
                    };
                }

                if write_complete {
                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
                } else {
                    set_guest_ready(concurrent_state)?;
                    ReturnCode::Blocked
                }
            }

            ReadState::HostReady { accept } => {
                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                // Let the parked host reader lift the items straight out of
                // the guest's buffer.
                let lift = &mut LiftContext::new(store.0.store_opaque_mut(), &options, self);
                accept(Writer::Guest {
                    ty: payload(ty, lift.types),
                    lift,
                    address,
                    count,
                })?
            }

            ReadState::Open => {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }

            ReadState::Dropped => {
                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                ReturnCode::Dropped(0)
            }
        };

        // The write finished (one way or another): release the handle again.
        // A stream writer that saw `Dropped` is marked done.
        if result != ReturnCode::Blocked {
            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
                TransmitLocalState::Write {
                    done: matches!(
                        (result, ty),
                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
                    ),
                };
        }

        Ok(result)
    }
2686
    /// Read up to `count` items into guest memory at `address` from the stream
    /// or future whose readable end is the guest handle `handle`.
    ///
    /// The handle is marked `Busy` while the operation is in progress. What
    /// happens next depends on the state of the write end:
    ///
    /// * `GuestReady`: items are copied directly between the two guests'
    ///   buffers via `copy`, and the writer is notified with an event.
    /// * `HostReady`: the parked host writer's callback is invoked with the
    ///   guest's buffer.
    /// * `Open`: the read is parked as `ReadState::GuestReady` and
    ///   `ReturnCode::Blocked` is returned.
    /// * `Dropped`: `ReturnCode::Dropped(0)` is returned.
    ///
    /// Unless the result is `Blocked`, the handle is switched back to `Read`,
    /// with `done` set when a stream read reports that the writer dropped.
    ///
    /// # Errors
    ///
    /// Fails if the options are not async, if `handle` is not a readable
    /// handle in the `Read` state, if the handle or transmit is already done,
    /// or if copying or accepting the data fails.
    pub(super) fn guest_read<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        ty: TransmitIndex,
        options: OptionsIndex,
        flat_abi: Option<FlatAbi>,
        handle: u32,
        address: u32,
        count: u32,
    ) -> Result<ReturnCode> {
        let address = usize::try_from(address).unwrap();
        let options = Options::new_index(store.0, self, options);
        if !options.async_() {
            bail!("synchronous stream and future reads not yet supported");
        }
        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
        let TransmitLocalState::Read { done } = *state else {
            bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
        };

        if done {
            bail!("cannot read from stream after being notified that the writable end dropped");
        }

        // Mark the handle busy for the duration of the read.
        *state = TransmitLocalState::Busy;
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let concurrent_state = self.concurrent_state_mut(store.0);
        let transmit_id = concurrent_state.get(transmit_handle)?.state;
        let transmit = concurrent_state.get_mut(transmit_id)?;
        log::trace!(
            "guest_read {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
            transmit.write
        );

        if transmit.done {
            bail!("cannot read from future after previous read succeeded");
        }

        // Any pending write is consumed below; afterwards the write end is
        // `Open` again unless it was already dropped.
        let new_state = if let WriteState::Dropped = &transmit.write {
            WriteState::Dropped
        } else {
            WriteState::Open
        };

        // Parks this read until a writer arrives.
        let set_guest_ready = |me: &mut ConcurrentState| {
            let transmit = me.get_mut(transmit_id)?;
            assert!(matches!(&transmit.read, ReadState::Open));
            transmit.read = ReadState::GuestReady {
                ty,
                flat_abi,
                options,
                address,
                count: usize::try_from(count).unwrap(),
                handle,
            };
            Ok::<_, crate::Error>(())
        };

        let result = match mem::replace(&mut transmit.write, new_state) {
            WriteState::GuestReady {
                ty: write_ty,
                flat_abi: write_flat_abi,
                options: write_options,
                address: write_address,
                count: write_count,
                handle: write_handle,
                post_write,
            } => {
                assert_eq!(flat_abi, write_flat_abi);

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                let write_handle_rep = transmit.write_handle.rep();

                let count = usize::try_from(count).unwrap();

                // Zero-length operations are treated specially: a non-empty
                // write that meets a zero-length read stays pending, and a
                // zero-length write does not complete the read it meets.
                // NOTE(review): presumably this implements the "readiness
                // probe" meaning of zero-length reads/writes in the Component
                // Model spec — confirm.
                let write_complete = write_count == 0 || count > 0;
                let read_complete = write_count > 0;
                // The writer still has items left after this read.
                let write_buffer_remaining = count < write_count;

                // Number of items actually transferred.
                let count = count.min(write_count);

                self.copy(
                    store.as_context_mut(),
                    flat_abi,
                    write_ty,
                    &write_options,
                    write_address,
                    ty,
                    &options,
                    address,
                    count,
                    rep,
                )?;

                let instance = self.id().get_mut(store.0);
                let types = instance.component().types();
                // Byte size of one item; 0 when there is no payload type.
                let item_size = payload(ty, types)
                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
                    .unwrap_or(0);
                let concurrent_state = instance.concurrent_state_mut();
                // If the writer asked to be dropped after this write, mark it
                // dropped and leave `pending` empty in the event.
                let pending = if let PostWrite::Drop = post_write {
                    concurrent_state.get_mut(transmit_id)?.write = WriteState::Dropped;
                    false
                } else {
                    true
                };

                if write_complete {
                    let count = u32::try_from(count).unwrap();
                    // If an earlier read already left a `Completed` stream
                    // event on the write handle, fold its count into this one.
                    let total = if let Some(Event::StreamWrite {
                        code: ReturnCode::Completed(old_total),
                        ..
                    }) = concurrent_state.take_event(write_handle_rep)?
                    {
                        count + old_total
                    } else {
                        count
                    };

                    let code = ReturnCode::completed(ty.kind(), total);

                    concurrent_state.set_event(
                        write_handle_rep,
                        match write_ty {
                            TransmitIndex::Future(ty) => Event::FutureWrite {
                                code,
                                pending: pending.then_some((ty, write_handle)),
                            },
                            TransmitIndex::Stream(ty) => Event::StreamWrite {
                                code,
                                pending: pending.then_some((ty, write_handle)),
                            },
                        },
                    )?;
                }

                if write_buffer_remaining {
                    // Keep the write pending for the unread rest of its
                    // buffer.
                    let transmit = concurrent_state.get_mut(transmit_id)?;
                    transmit.write = WriteState::GuestReady {
                        ty: write_ty,
                        flat_abi: write_flat_abi,
                        options: write_options,
                        address: write_address + (count * item_size),
                        count: write_count - count,
                        handle: write_handle,
                        post_write,
                    };
                }

                if read_complete {
                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
                } else {
                    set_guest_ready(concurrent_state)?;
                    ReturnCode::Blocked
                }
            }

            WriteState::HostReady { accept, post_write } => {
                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                // Let the parked host writer lower its items straight into
                // the guest's buffer.
                let code = accept(
                    store.0,
                    self,
                    Reader::Guest {
                        options: &options,
                        ty,
                        address,
                        count: count.try_into().unwrap(),
                    },
                )?;

                if let PostWrite::Drop = post_write {
                    self.concurrent_state_mut(store.0)
                        .get_mut(transmit_id)?
                        .write = WriteState::Dropped;
                }

                code
            }

            WriteState::Open => {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }

            WriteState::Dropped => ReturnCode::Dropped(0),
        };

        // The read finished (one way or another): release the handle again.
        // A stream reader that saw `Dropped` is marked done.
        if result != ReturnCode::Blocked {
            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
                TransmitLocalState::Read {
                    done: matches!(
                        (result, ty),
                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
                    ),
                };
        }

        Ok(result)
    }
2897
2898 fn guest_drop_readable(
2900 self,
2901 store: &mut dyn VMStore,
2902 ty: TransmitIndex,
2903 reader: u32,
2904 ) -> Result<()> {
2905 let table = self.id().get_mut(store).table_for_transmit(ty);
2906 let (rep, _is_done) = match ty {
2907 TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
2908 TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
2909 };
2910 let kind = match ty {
2911 TransmitIndex::Stream(_) => TransmitKind::Stream,
2912 TransmitIndex::Future(_) => TransmitKind::Future,
2913 };
2914 let id = TableId::<TransmitHandle>::new(rep);
2915 log::trace!("guest_drop_readable: drop reader {id:?}");
2916 self.host_drop_reader(store, id, kind)
2917 }
2918
2919 pub(crate) fn error_context_new(
2921 self,
2922 store: &mut StoreOpaque,
2923 ty: TypeComponentLocalErrorContextTableIndex,
2924 options: OptionsIndex,
2925 debug_msg_address: u32,
2926 debug_msg_len: u32,
2927 ) -> Result<u32> {
2928 let options = Options::new_index(store, self, options);
2929 let lift_ctx = &mut LiftContext::new(store, &options, self);
2930 let address = usize::try_from(debug_msg_address)?;
2932 let len = usize::try_from(debug_msg_len)?;
2933 lift_ctx
2934 .memory()
2935 .get(address..)
2936 .and_then(|b| b.get(..len))
2937 .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
2938 let message = WasmStr::new(address, len, lift_ctx)?;
2939
2940 let err_ctx = ErrorContextState {
2942 debug_msg: message
2943 .to_str_from_memory(options.memory(store))?
2944 .to_string(),
2945 };
2946 let state = self.concurrent_state_mut(store);
2947 let table_id = state.push(err_ctx)?;
2948 let global_ref_count_idx =
2949 TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
2950
2951 let _ = state
2953 .global_error_context_ref_counts
2954 .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
2955
2956 let local_idx = self
2963 .id()
2964 .get_mut(store)
2965 .table_for_error_context(ty)
2966 .error_context_insert(table_id.rep())?;
2967
2968 Ok(local_idx)
2969 }
2970
2971 pub(super) fn error_context_debug_message<T>(
2973 self,
2974 store: StoreContextMut<T>,
2975 ty: TypeComponentLocalErrorContextTableIndex,
2976 options: OptionsIndex,
2977 err_ctx_handle: u32,
2978 debug_msg_address: u32,
2979 ) -> Result<()> {
2980 let handle_table_id_rep = self
2982 .id()
2983 .get_mut(store.0)
2984 .table_for_error_context(ty)
2985 .error_context_rep(err_ctx_handle)?;
2986
2987 let state = self.concurrent_state_mut(store.0);
2988 let ErrorContextState { debug_msg } =
2990 state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
2991 let debug_msg = debug_msg.clone();
2992
2993 let options = Options::new_index(store.0, self, options);
2994 let types = self.id().get(store.0).component().types().clone();
2995 let lower_cx = &mut LowerContext::new(store, &options, &types, self);
2996 let debug_msg_address = usize::try_from(debug_msg_address)?;
2997 let offset = lower_cx
2999 .as_slice_mut()
3000 .get(debug_msg_address..)
3001 .and_then(|b| b.get(..debug_msg.bytes().len()))
3002 .map(|_| debug_msg_address)
3003 .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3004 debug_msg
3005 .as_str()
3006 .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
3007
3008 Ok(())
3009 }
3010
3011 pub(crate) fn future_drop_readable(
3013 self,
3014 store: &mut dyn VMStore,
3015 ty: TypeFutureTableIndex,
3016 reader: u32,
3017 ) -> Result<()> {
3018 self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
3019 }
3020
3021 pub(crate) fn stream_drop_readable(
3023 self,
3024 store: &mut dyn VMStore,
3025 ty: TypeStreamTableIndex,
3026 reader: u32,
3027 ) -> Result<()> {
3028 self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
3029 }
3030}
3031
3032impl ComponentInstance {
3033 fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
3034 let (tables, types) = self.guest_tables();
3035 let runtime_instance = match ty {
3036 TransmitIndex::Stream(ty) => types[ty].instance,
3037 TransmitIndex::Future(ty) => types[ty].instance,
3038 };
3039 &mut tables[runtime_instance]
3040 }
3041
3042 fn table_for_error_context(
3043 self: Pin<&mut Self>,
3044 ty: TypeComponentLocalErrorContextTableIndex,
3045 ) -> &mut HandleTable {
3046 let (tables, types) = self.guest_tables();
3047 let runtime_instance = types[ty].instance;
3048 &mut tables[runtime_instance]
3049 }
3050
3051 fn get_mut_by_index(
3052 self: Pin<&mut Self>,
3053 ty: TransmitIndex,
3054 index: u32,
3055 ) -> Result<(u32, &mut TransmitLocalState)> {
3056 get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
3057 }
3058
3059 fn guest_new(mut self: Pin<&mut Self>, ty: TransmitIndex) -> Result<ResourcePair> {
3063 let (write, read) = self.as_mut().concurrent_state_mut().new_transmit()?;
3064
3065 let table = self.as_mut().table_for_transmit(ty);
3066 let (read_handle, write_handle) = match ty {
3067 TransmitIndex::Future(ty) => (
3068 table.future_insert_read(ty, read.rep())?,
3069 table.future_insert_write(ty, write.rep())?,
3070 ),
3071 TransmitIndex::Stream(ty) => (
3072 table.stream_insert_read(ty, read.rep())?,
3073 table.stream_insert_write(ty, write.rep())?,
3074 ),
3075 };
3076
3077 let state = self.as_mut().concurrent_state_mut();
3078 state.get_mut(read)?.common.handle = Some(read_handle);
3079 state.get_mut(write)?.common.handle = Some(write_handle);
3080
3081 Ok(ResourcePair {
3082 write: write_handle,
3083 read: read_handle,
3084 })
3085 }
3086
3087 fn guest_cancel_write(
3089 mut self: Pin<&mut Self>,
3090 ty: TransmitIndex,
3091 writer: u32,
3092 _async_: bool,
3093 ) -> Result<ReturnCode> {
3094 let (rep, state) = get_mut_by_index_from(self.as_mut().table_for_transmit(ty), ty, writer)?;
3095 let id = TableId::<TransmitHandle>::new(rep);
3096 log::trace!("guest cancel write {id:?} (handle {writer})");
3097 match state {
3098 TransmitLocalState::Write { .. } => {
3099 bail!("stream or future write cancelled when no write is pending")
3100 }
3101 TransmitLocalState::Read { .. } => {
3102 bail!("passed read end to `{{stream|future}}.cancel-write`")
3103 }
3104 TransmitLocalState::Busy => {
3105 *state = TransmitLocalState::Write { done: false };
3106 }
3107 }
3108 let state = self.concurrent_state_mut();
3109 let rep = state.get(id)?.state.rep();
3110 state.host_cancel_write(rep)
3111 }
3112
3113 fn guest_cancel_read(
3115 mut self: Pin<&mut Self>,
3116 ty: TransmitIndex,
3117 reader: u32,
3118 _async_: bool,
3119 ) -> Result<ReturnCode> {
3120 let (rep, state) = get_mut_by_index_from(self.as_mut().table_for_transmit(ty), ty, reader)?;
3121 let id = TableId::<TransmitHandle>::new(rep);
3122 log::trace!("guest cancel read {id:?} (handle {reader})");
3123 match state {
3124 TransmitLocalState::Read { .. } => {
3125 bail!("stream or future read cancelled when no read is pending")
3126 }
3127 TransmitLocalState::Write { .. } => {
3128 bail!("passed write end to `{{stream|future}}.cancel-read`")
3129 }
3130 TransmitLocalState::Busy => {
3131 *state = TransmitLocalState::Read { done: false };
3132 }
3133 }
3134 let state = self.concurrent_state_mut();
3135 let rep = state.get(id)?.state.rep();
3136 state.host_cancel_read(rep)
3137 }
3138
3139 pub(crate) fn error_context_drop(
3141 mut self: Pin<&mut Self>,
3142 ty: TypeComponentLocalErrorContextTableIndex,
3143 error_context: u32,
3144 ) -> Result<()> {
3145 let local_handle_table = self.as_mut().table_for_error_context(ty);
3146
3147 let rep = local_handle_table.error_context_drop(error_context)?;
3148
3149 let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
3150
3151 let state = self.concurrent_state_mut();
3152 let GlobalErrorContextRefCount(global_ref_count) = state
3153 .global_error_context_ref_counts
3154 .get_mut(&global_ref_count_idx)
3155 .expect("retrieve concurrent state for error context during drop");
3156
3157 assert!(*global_ref_count >= 1);
3159 *global_ref_count -= 1;
3160 if *global_ref_count == 0 {
3161 state
3162 .global_error_context_ref_counts
3163 .remove(&global_ref_count_idx);
3164
3165 state
3166 .delete(TableId::<ErrorContextState>::new(rep))
3167 .context("deleting component-global error context data")?;
3168 }
3169
3170 Ok(())
3171 }
3172
3173 fn guest_transfer(
3176 mut self: Pin<&mut Self>,
3177 src_idx: u32,
3178 src: TransmitIndex,
3179 dst: TransmitIndex,
3180 ) -> Result<u32> {
3181 let src_table = self.as_mut().table_for_transmit(src);
3182 let (rep, is_done) = match src {
3183 TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
3184 TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
3185 };
3186 if is_done {
3187 bail!("cannot lift after being notified that the writable end dropped");
3188 }
3189 let dst_table = self.as_mut().table_for_transmit(dst);
3190 let handle = match dst {
3191 TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
3192 TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
3193 }?;
3194 self.concurrent_state_mut()
3195 .get_mut(TableId::<TransmitHandle>::new(rep))?
3196 .common
3197 .handle = Some(handle);
3198 Ok(handle)
3199 }
3200
3201 pub(crate) fn future_new(
3203 self: Pin<&mut Self>,
3204 ty: TypeFutureTableIndex,
3205 ) -> Result<ResourcePair> {
3206 self.guest_new(TransmitIndex::Future(ty))
3207 }
3208
3209 pub(crate) fn future_cancel_write(
3211 self: Pin<&mut Self>,
3212 ty: TypeFutureTableIndex,
3213 async_: bool,
3214 writer: u32,
3215 ) -> Result<u32> {
3216 self.guest_cancel_write(TransmitIndex::Future(ty), writer, async_)
3217 .map(|result| result.encode())
3218 }
3219
3220 pub(crate) fn future_cancel_read(
3222 self: Pin<&mut Self>,
3223 ty: TypeFutureTableIndex,
3224 async_: bool,
3225 reader: u32,
3226 ) -> Result<u32> {
3227 self.guest_cancel_read(TransmitIndex::Future(ty), reader, async_)
3228 .map(|result| result.encode())
3229 }
3230
3231 pub(crate) fn stream_new(
3233 self: Pin<&mut Self>,
3234 ty: TypeStreamTableIndex,
3235 ) -> Result<ResourcePair> {
3236 self.guest_new(TransmitIndex::Stream(ty))
3237 }
3238
3239 pub(crate) fn stream_cancel_write(
3241 self: Pin<&mut Self>,
3242 ty: TypeStreamTableIndex,
3243 async_: bool,
3244 writer: u32,
3245 ) -> Result<u32> {
3246 self.guest_cancel_write(TransmitIndex::Stream(ty), writer, async_)
3247 .map(|result| result.encode())
3248 }
3249
3250 pub(crate) fn stream_cancel_read(
3252 self: Pin<&mut Self>,
3253 ty: TypeStreamTableIndex,
3254 async_: bool,
3255 reader: u32,
3256 ) -> Result<u32> {
3257 self.guest_cancel_read(TransmitIndex::Stream(ty), reader, async_)
3258 .map(|result| result.encode())
3259 }
3260
3261 pub(crate) fn future_transfer(
3264 self: Pin<&mut Self>,
3265 src_idx: u32,
3266 src: TypeFutureTableIndex,
3267 dst: TypeFutureTableIndex,
3268 ) -> Result<u32> {
3269 self.guest_transfer(
3270 src_idx,
3271 TransmitIndex::Future(src),
3272 TransmitIndex::Future(dst),
3273 )
3274 }
3275
3276 pub(crate) fn stream_transfer(
3279 self: Pin<&mut Self>,
3280 src_idx: u32,
3281 src: TypeStreamTableIndex,
3282 dst: TypeStreamTableIndex,
3283 ) -> Result<u32> {
3284 self.guest_transfer(
3285 src_idx,
3286 TransmitIndex::Stream(src),
3287 TransmitIndex::Stream(dst),
3288 )
3289 }
3290
3291 pub(crate) fn error_context_transfer(
3293 mut self: Pin<&mut Self>,
3294 src_idx: u32,
3295 src: TypeComponentLocalErrorContextTableIndex,
3296 dst: TypeComponentLocalErrorContextTableIndex,
3297 ) -> Result<u32> {
3298 let rep = self
3299 .as_mut()
3300 .table_for_error_context(src)
3301 .error_context_rep(src_idx)?;
3302 let dst_idx = self
3303 .as_mut()
3304 .table_for_error_context(dst)
3305 .error_context_insert(rep)?;
3306
3307 let global_ref_count = self
3311 .concurrent_state_mut()
3312 .global_error_context_ref_counts
3313 .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
3314 .context("global ref count present for existing (sub)component error context")?;
3315 global_ref_count.0 += 1;
3316
3317 Ok(dst_idx)
3318 }
3319}
3320
3321impl ConcurrentState {
3322 fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
3323 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
3324 }
3325
3326 fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
3327 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
3328 }
3329
3330 fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
3341 let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
3342
3343 fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
3344 let (ReturnCode::Completed(count)
3345 | ReturnCode::Dropped(count)
3346 | ReturnCode::Cancelled(count)) = old
3347 else {
3348 unreachable!()
3349 };
3350
3351 match new {
3352 ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
3353 ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
3354 _ => unreachable!(),
3355 }
3356 }
3357
3358 let event = match (waitable.take_event(self)?, event) {
3359 (None, _) => event,
3360 (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
3361 (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
3362 (
3363 Some(Event::StreamWrite {
3364 code: old_code,
3365 pending: old_pending,
3366 }),
3367 Event::StreamWrite { code, pending },
3368 ) => Event::StreamWrite {
3369 code: update_code(old_code, code),
3370 pending: old_pending.or(pending),
3371 },
3372 (
3373 Some(Event::StreamRead {
3374 code: old_code,
3375 pending: old_pending,
3376 }),
3377 Event::StreamRead { code, pending },
3378 ) => Event::StreamRead {
3379 code: update_code(old_code, code),
3380 pending: old_pending.or(pending),
3381 },
3382 _ => unreachable!(),
3383 };
3384
3385 waitable.set_event(self, Some(event))
3386 }
3387
3388 fn new_transmit(&mut self) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
3391 let state_id = self.push(TransmitState::default())?;
3392
3393 let write = self.push(TransmitHandle::new(state_id))?;
3394 let read = self.push(TransmitHandle::new(state_id))?;
3395
3396 let state = self.get_mut(state_id)?;
3397 state.write_handle = write;
3398 state.read_handle = read;
3399
3400 log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
3401
3402 Ok((write, read))
3403 }
3404
3405 fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
3407 let state = self.delete(state_id)?;
3408 self.delete(state.write_handle)?;
3409 self.delete(state.read_handle)?;
3410
3411 log::trace!(
3412 "delete transmit: state {state_id:?}; write {:?}; read {:?}",
3413 state.write_handle,
3414 state.read_handle,
3415 );
3416
3417 Ok(())
3418 }
3419
3420 fn host_cancel_write(&mut self, rep: u32) -> Result<ReturnCode> {
3426 let transmit_id = TableId::<TransmitState>::new(rep);
3427 let transmit = self.get_mut(transmit_id)?;
3428 log::trace!(
3429 "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3430 transmit.read,
3431 transmit.write
3432 );
3433
3434 let code = if let Some(event) =
3435 Waitable::Transmit(transmit.write_handle).take_event(self)?
3436 {
3437 let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3438 unreachable!();
3439 };
3440 match (code, event) {
3441 (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3442 ReturnCode::Cancelled(count)
3443 }
3444 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3445 _ => unreachable!(),
3446 }
3447 } else {
3448 ReturnCode::Cancelled(0)
3449 };
3450
3451 let transmit = self.get_mut(transmit_id)?;
3452
3453 match &transmit.write {
3454 WriteState::GuestReady { .. } | WriteState::HostReady { .. } => {
3455 transmit.write = WriteState::Open;
3456 }
3457
3458 WriteState::Open | WriteState::Dropped => {}
3459 }
3460
3461 log::trace!("cancelled write {transmit_id:?}");
3462
3463 Ok(code)
3464 }
3465
3466 fn host_cancel_read(&mut self, rep: u32) -> Result<ReturnCode> {
3472 let transmit_id = TableId::<TransmitState>::new(rep);
3473 let transmit = self.get_mut(transmit_id)?;
3474 log::trace!(
3475 "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3476 transmit.read,
3477 transmit.write
3478 );
3479
3480 let code = if let Some(event) = Waitable::Transmit(transmit.read_handle).take_event(self)? {
3481 let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3482 unreachable!();
3483 };
3484 match (code, event) {
3485 (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3486 ReturnCode::Cancelled(count)
3487 }
3488 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3489 _ => unreachable!(),
3490 }
3491 } else {
3492 ReturnCode::Cancelled(0)
3493 };
3494
3495 let transmit = self.get_mut(transmit_id)?;
3496
3497 match &transmit.read {
3498 ReadState::GuestReady { .. } | ReadState::HostReady { .. } => {
3499 transmit.read = ReadState::Open;
3500 }
3501
3502 ReadState::Open | ReadState::Dropped => {}
3503 }
3504
3505 log::trace!("cancelled read {transmit_id:?}");
3506
3507 Ok(code)
3508 }
3509}
3510
/// Guest handles for the two ends of a stream or future, as returned when a
/// new one is created.
pub(crate) struct ResourcePair {
    /// Guest handle of the writable end.
    pub(crate) write: u32,
    /// Guest handle of the readable end.
    pub(crate) read: u32,
}