1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, QualifiedThreadId, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext};
5use crate::component::matching::InstanceType;
6use crate::component::types;
7use crate::component::values::ErrorContextAny;
8use crate::component::{
9 AsAccessor, ComponentInstanceId, ComponentType, FutureAny, Instance, Lift, Lower, StreamAny,
10 Val, WasmList,
11};
12use crate::prelude::*;
13use crate::store::{StoreOpaque, StoreToken};
14use crate::try_mutex::{TryMutex, TryMutexGuard};
15use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
16use crate::vm::{AlwaysMut, VMStore};
17use crate::{AsContext, AsContextMut, StoreContextMut, ValRaw};
18use crate::{
19 Error, Result, Trap, bail, bail_bug, ensure,
20 error::{Context as _, format_err},
21};
22use alloc::sync::Arc;
23use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
24use core::any::{Any, TypeId};
25use core::fmt;
26use core::future;
27use core::iter;
28use core::marker::PhantomData;
29use core::mem::{self, ManuallyDrop, MaybeUninit};
30use core::ops::{Deref, DerefMut};
31use core::pin::Pin;
32use core::task::{Context, Poll, Waker, ready};
33use futures::channel::oneshot;
34use futures::{FutureExt as _, stream};
35use wasmtime_environ::component::{
36 CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
37 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
38 TypeFutureTableIndex, TypeStreamTableIndex,
39};
40
41pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
42
43mod buffers;
44
/// Selects which of the two transmit flavors handled in this module an
/// operation applies to.
#[derive(Copy, Clone, Debug)]
pub enum TransmitKind {
    /// The transmit is a stream.
    Stream,
    /// The transmit is a future.
    Future,
}
52
/// Status code for a stream/future operation.
///
/// Every variant except `Blocked` carries an [`ItemCount`];
/// `ReturnCode::encode` packs the variant and its count into one `u32`.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ReturnCode {
    /// Encoded as `0xffff_ffff`.
    // NOTE(review): presumably "the operation has not finished yet" --
    // confirm against the callers that produce this value.
    Blocked,
    /// Encoded with tag `0x0` in the low bits and the count above it.
    Completed(ItemCount),
    /// Encoded with tag `0x1` in the low bits and the count above it.
    Dropped(ItemCount),
    /// Encoded with tag `0x2` in the low bits and the count above it.
    Cancelled(ItemCount),
}
61
62impl ReturnCode {
63 pub fn encode(&self) -> u32 {
68 const BLOCKED: u32 = 0xffff_ffff;
69 const COMPLETED: u32 = 0x0;
70 const DROPPED: u32 = 0x1;
71 const CANCELLED: u32 = 0x2;
72 match self {
73 ReturnCode::Blocked => BLOCKED,
74 ReturnCode::Completed(n) => (n.as_u32() << 4) | COMPLETED,
75 ReturnCode::Dropped(n) => (n.as_u32() << 4) | DROPPED,
76 ReturnCode::Cancelled(n) => (n.as_u32() << 4) | CANCELLED,
77 }
78 }
79
80 fn completed(kind: TransmitKind, count: ItemCount) -> Self {
83 Self::Completed(if let TransmitKind::Future = kind {
84 ItemCount::ZERO
85 } else {
86 count
87 })
88 }
89}
90
/// A count of items, stored as a `u32`.
///
/// `#[repr(transparent)]` gives it the same layout as the wrapped `u32`.
/// The constructors in this file (`new`, `new_usize`, `inc`, `add`) reject
/// values of `1 << 28` or more, which leaves room for the four tag bits that
/// `ReturnCode::encode` places below the count.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[repr(transparent)]
pub struct ItemCount {
    /// The raw count.
    raw: u32,
}
102
103impl ItemCount {
104 const MAX: u32 = 1 << 28;
105 const ZERO: ItemCount = ItemCount { raw: 0 };
106
107 fn new(count: u32) -> Result<Self, Trap> {
110 if count < Self::MAX {
111 Ok(Self { raw: count })
112 } else {
113 Err(Trap::StreamOpTooBig)
114 }
115 }
116
117 fn new_usize(count: usize) -> Result<Self, Trap> {
119 let count = u32::try_from(count).map_err(|_| Trap::StreamOpTooBig)?;
120 Self::new(count)
121 }
122
123 fn as_u32(&self) -> u32 {
124 self.raw
125 }
126
127 fn as_usize(&self) -> usize {
128 usize::try_from(self.raw).unwrap()
129 }
130
131 fn inc(&mut self, amt: usize) -> Result<(), Trap> {
134 let amt = u32::try_from(amt).map_err(|_| Trap::StreamOpTooBig)?;
135 let new_raw = self.raw.checked_add(amt).ok_or(Trap::StreamOpTooBig)?;
136 if new_raw < Self::MAX {
137 self.raw = new_raw;
138 Ok(())
139 } else {
140 Err(Trap::StreamOpTooBig)
141 }
142 }
143
144 fn add(&self, other: ItemCount) -> Result<ItemCount> {
150 match self.raw.checked_add(other.raw) {
151 Some(raw) => Ok(ItemCount::new(raw)?),
152 None => bail_bug!("overflow in `ItemCount::add`"),
153 }
154 }
155
156 fn sub(&self, other: ItemCount) -> Result<ItemCount> {
161 match self.raw.checked_sub(other.raw) {
162 Some(raw) => Ok(ItemCount { raw }),
163 None => bail_bug!("underflow in `ItemCount::sub`"),
164 }
165 }
166}
167
/// Formats an `ItemCount` by delegating to the formatting of the wrapped
/// `u32`.
impl fmt::Display for ItemCount {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // NOTE(review): which formatting trait this `.fmt` call resolves to
        // depends on the traits in scope at the top of the file -- confirm
        // before replacing it with a fully qualified call.
        self.raw.fmt(f)
    }
}
173
/// Debug-formats an `ItemCount` as the bare wrapped number, not as a struct.
impl fmt::Debug for ItemCount {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // NOTE(review): which formatting trait this `.fmt` call resolves to
        // depends on the traits in scope at the top of the file -- confirm
        // before replacing it with a fully qualified call.
        self.raw.fmt(f)
    }
}
179
180impl PartialEq<u32> for ItemCount {
181 fn eq(&self, other: &u32) -> bool {
182 self.raw == *other
183 }
184}
185
186impl PartialOrd<u32> for ItemCount {
187 fn partial_cmp(&self, other: &u32) -> Option<core::cmp::Ordering> {
188 self.raw.partial_cmp(other)
189 }
190}
191
/// A table index for either a stream or a future, tagged with which of the
/// two it is.
#[derive(Copy, Clone, Debug)]
pub enum TransmitIndex {
    /// Index into a stream table.
    Stream(TypeStreamTableIndex),
    /// Index into a future table.
    Future(TypeFutureTableIndex),
}
201
202impl TransmitIndex {
203 pub fn kind(&self) -> TransmitKind {
204 match self {
205 TransmitIndex::Stream(_) => TransmitKind::Stream,
206 TransmitIndex::Future(_) => TransmitKind::Future,
207 }
208 }
209
210 fn payload<'a>(&self, types: &'a ComponentTypes) -> Option<&'a InterfaceType> {
213 match self {
214 TransmitIndex::Stream(i) => {
215 let ty = types[*i].ty;
216 types[ty].payload.as_ref()
217 }
218 TransmitIndex::Future(i) => {
219 let ty = types[*i].ty;
220 types[ty].payload.as_ref()
221 }
222 }
223 }
224}
225
226fn get_mut_by_index_from(
229 handle_table: &mut HandleTable,
230 ty: TransmitIndex,
231 index: u32,
232) -> Result<(u32, &mut TransmitLocalState)> {
233 match ty {
234 TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
235 TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
236 }
237}
238
/// Lowers up to `count` items from `buffer` into guest memory at `address`.
///
/// The number of items written is the smaller of `count` and the number of
/// items remaining in `buffer`; that many items are then skipped in `buffer`.
///
/// # Errors
///
/// Fails if `address` is not a multiple of `T::ALIGN32`, if the destination
/// range is out of bounds of the memory exposed by the lowering context, or
/// if switching threads or lowering the items fails.
fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
    mut store: StoreContextMut<U>,
    instance: Instance,
    caller_thread: QualifiedThreadId,
    options: OptionsIndex,
    ty: TransmitIndex,
    address: usize,
    count: usize,
    buffer: &mut B,
) -> Result<()> {
    // Never lower more than the buffer actually holds.
    let count = buffer.remaining().len().min(count);

    // When lowering `T` may need `realloc`, switch the store to
    // `caller_thread` first and remember the previous thread so it can be
    // restored below. Otherwise build a context without realloc support and
    // leave the current thread alone.
    let (lower, old_thread) = if T::MAY_REQUIRE_REALLOC {
        let old_thread = store.0.set_thread(caller_thread)?;
        (
            &mut LowerContext::new(store.as_context_mut(), options, instance),
            Some(old_thread),
        )
    } else {
        (
            &mut LowerContext::new_without_realloc(store.as_context_mut(), options, instance),
            None,
        )
    };

    // NOTE(review): every early return between here and the thread restore
    // below leaves `caller_thread` installed in the store -- confirm callers
    // treat such errors as fatal for the store.
    if address % usize::try_from(T::ALIGN32)? != 0 {
        bail!("read pointer not aligned");
    }
    // Bounds check only: the resulting slice is discarded.
    lower
        .as_slice_mut()
        .get_mut(address..)
        .and_then(|b| b.get_mut(..T::SIZE32 * count))
        .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))?;

    // Nothing is stored when the stream/future has no payload type.
    if let Some(ty) = ty.payload(lower.types) {
        T::linear_store_list_to_memory(lower, *ty, address, &buffer.remaining()[..count])?;
    }

    // Restore the thread that was current before the switch above.
    if let Some(old_thread) = old_thread {
        store.0.set_thread(old_thread)?;
    }

    // The lowered items are consumed from the buffer.
    buffer.skip(count);

    Ok(())
}
288
289fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
290 lift: &mut LiftContext<'_>,
291 ty: Option<InterfaceType>,
292 buffer: &mut B,
293 address: usize,
294 count: usize,
295) -> Result<()> {
296 let count = count.min(buffer.remaining_capacity());
297 if T::IS_RUST_UNIT_TYPE {
298 buffer.extend(
302 iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
303 )
304 } else {
305 let ty = match ty {
306 Some(ty) => ty,
307 None => bail_bug!("type required for non-unit lift"),
308 };
309 if address % usize::try_from(T::ALIGN32)? != 0 {
310 bail!("write pointer not aligned");
311 }
312 lift.memory()
313 .get(address..)
314 .and_then(|b| b.get(..T::SIZE32 * count))
315 .ok_or_else(|| crate::format_err!("write pointer out of bounds of memory"))?;
316
317 let list = &WasmList::new(address, count, lift, ty)?;
318 T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
319 }
320 Ok(())
321}
322
/// State stored for an error context.
#[derive(Debug, PartialEq, Eq, PartialOrd)]
pub(super) struct ErrorContextState {
    /// Debug message associated with the error context.
    pub(crate) debug_msg: String,
}
329
/// A size/alignment pair describing a flat ABI layout.
// NOTE(review): units are presumably bytes -- confirm where values are built.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct FlatAbi {
    /// Size of the value.
    pub(super) size: u32,
    /// Alignment of the value.
    pub(super) align: u32,
}
337
/// Borrowed host-side byte buffer used by `Destination` / `DirectDestination`
/// in place of guest memory.
struct HostBuffer<'a> {
    /// Backing storage; `Destination::as_direct` resizes it to the requested
    /// capacity.
    dst: &'a mut Vec<u8>,
    /// Running total of bytes reported through
    /// `DirectDestination::mark_written`; reset to zero by `as_direct`.
    marked_written: &'a mut usize,
}
342
343impl HostBuffer<'_> {
344 fn reborrow(&mut self) -> HostBuffer<'_> {
345 HostBuffer {
346 dst: &mut *self.dst,
347 marked_written: &mut *self.marked_written,
348 }
349 }
350}
351
/// The place a `StreamProducer` writes its items to.
///
/// `T` is the item type and `B` is the producer's buffer type.
pub struct Destination<'a, T, B> {
    /// Identifies the transmit state this destination belongs to.
    id: TableId<TransmitState>,
    /// The producer's buffer; see `take_buffer` and `set_buffer`.
    buffer: &'a mut B,
    /// Host-owned byte buffer, when present; used by `as_direct` instead of
    /// guest memory.
    host_buffer: Option<HostBuffer<'a>>,
    /// Ties the item type `T` to this value without storing a `T`.
    _phantom: PhantomData<fn() -> T>,
}
359
360impl<'a, T, B> Destination<'a, T, B> {
361 pub fn reborrow(&mut self) -> Destination<'_, T, B> {
363 Destination {
364 id: self.id,
365 buffer: &mut *self.buffer,
366 host_buffer: self.host_buffer.as_mut().map(|b| b.reborrow()),
367 _phantom: PhantomData,
368 }
369 }
370
371 pub fn take_buffer(&mut self) -> B
377 where
378 B: Default,
379 {
380 mem::take(self.buffer)
381 }
382
383 pub fn set_buffer(&mut self, buffer: B) {
393 *self.buffer = buffer;
394 }
395
396 pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
413 self.remaining_(store.as_context_mut().0).unwrap()
417 }
418
419 fn remaining_(&self, store: &mut StoreOpaque) -> Result<Option<usize>> {
420 let transmit = store.concurrent_state_mut()?.get_mut(self.id)?;
421
422 if let &ReadState::GuestReady { count, .. } = &transmit.read {
423 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
424 bail_bug!("expected WriteState::HostReady")
425 };
426
427 Ok(Some(count.as_usize() - guest_offset.as_usize()))
428 } else {
429 Ok(None)
430 }
431 }
432}
433
434impl<'a, B> Destination<'a, u8, B> {
435 pub fn as_direct<D>(
446 mut self,
447 store: StoreContextMut<'a, D>,
448 capacity: usize,
449 ) -> DirectDestination<'a, D> {
450 if let Some(buffer) = &mut self.host_buffer {
451 *buffer.marked_written = 0;
452 buffer.dst.resize(capacity, 0);
453 }
454
455 DirectDestination {
456 id: self.id,
457 host_buffer: self.host_buffer,
458 store,
459 }
460 }
461}
462
/// Byte-oriented view of a `Destination` that lets a producer write straight
/// into the reader's buffer (see `remaining` and `mark_written`).
pub struct DirectDestination<'a, D: 'static> {
    /// Identifies the transmit state this destination belongs to.
    id: TableId<TransmitState>,
    /// Host-owned buffer to write into; when `None`, guest memory is used.
    host_buffer: Option<HostBuffer<'a>>,
    /// Store used to reach the transmit state and guest memory.
    store: StoreContextMut<'a, D>,
}
470
/// `std::io::Write` adapter over `remaining` / `mark_written`.
#[cfg(feature = "std")]
impl<D: 'static> std::io::Write for DirectDestination<'_, D> {
    /// Copies as many bytes of `buf` as fit into the remaining space, marks
    /// them written and returns how many were copied.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let space = self.remaining();
        let len = buf.len().min(space.len());
        space[..len].copy_from_slice(&buf[..len]);
        self.mark_written(len);
        Ok(len)
    }

    /// No-op: there is nothing buffered at this layer.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
485
486impl<D: 'static> DirectDestination<'_, D> {
487 pub fn remaining(&mut self) -> &mut [u8] {
489 self.remaining_().unwrap()
493 }
494
495 fn remaining_(&mut self) -> Result<&mut [u8]> {
496 if let Some(buffer) = self.host_buffer.as_mut() {
497 return Ok(buffer.dst);
498 }
499 let transmit = self
500 .store
501 .as_context_mut()
502 .0
503 .concurrent_state_mut()?
504 .get_mut(self.id)?;
505
506 let &ReadState::GuestReady {
507 address,
508 count,
509 options,
510 instance,
511 ..
512 } = &transmit.read
513 else {
514 bail_bug!("expected ReadState::GuestReady")
515 };
516
517 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
518 bail_bug!("expected WriteState::HostReady")
519 };
520
521 let memory = instance
522 .options_memory_mut(self.store.0, options)
523 .get_mut((address + guest_offset.as_usize())..)
524 .and_then(|b| b.get_mut(..(count.as_usize() - guest_offset.as_usize())));
525 match memory {
526 Some(memory) => Ok(memory),
527 None => bail_bug!("guest buffer unexpectedly out of bounds"),
528 }
529 }
530
531 pub fn mark_written(&mut self, count: usize) {
538 self.mark_written_(count).unwrap()
542 }
543
544 fn mark_written_(&mut self, count: usize) -> Result<()> {
545 if let Some(buffer) = self.host_buffer.as_mut() {
546 *buffer.marked_written = buffer.marked_written.checked_add(count).unwrap();
549 } else {
550 let transmit = self
551 .store
552 .as_context_mut()
553 .0
554 .concurrent_state_mut()?
555 .get_mut(self.id)?;
556
557 let ReadState::GuestReady {
558 count: read_count, ..
559 } = &transmit.read
560 else {
561 bail_bug!("expected ReadState::GuestReady")
562 };
563
564 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
565 bail_bug!("expected WriteState::HostReady");
566 };
567
568 if guest_offset.as_usize() + count > read_count.as_usize() {
569 panic!(
572 "write count ({count}) must be less than or equal to read count ({read_count})"
573 )
574 } else {
575 guest_offset.inc(count)?;
576 }
577 }
578 Ok(())
579 }
580}
581
/// Result reported by `StreamProducer::poll_produce` and
/// `StreamConsumer::poll_consume`.
// NOTE(review): the descriptions below are inferred from how the adapters and
// built-in producers in this file use the variants -- confirm against the
// code that interprets them.
#[derive(Copy, Clone, Debug)]
pub enum StreamResult {
    /// The operation completed; e.g. the `FutureReader::new` adapter returns
    /// this after a value was produced.
    Completed,
    /// The operation was cancelled; e.g. the `FutureReader::new` adapter
    /// returns this when its producer yields `None`.
    Cancelled,
    /// Returned by the built-in producers in this file (empty iterators,
    /// `Vec`, boxed slices, ...) after handing over everything they have.
    Dropped,
}
597
/// A host-side producer of items for a stream.
///
/// `D` is the store's data type.
pub trait StreamProducer<D>: Send + 'static {
    /// Type of the items produced.
    type Item;

    /// Buffer type through which items are handed over; see
    /// `Destination::set_buffer` and `Destination::take_buffer`.
    type Buffer: WriteBuffer<Self::Item> + Default;

    /// Polls the producer for more items, to be delivered through
    /// `destination`.
    ///
    /// Returns `Poll::Pending` when nothing can be reported yet, otherwise a
    /// `StreamResult` or an error.
    // NOTE(review): presumably `finish == true` asks the producer to wrap up
    // rather than stay pending (the blanket `FutureProducer` impl in this
    // file treats it that way) -- confirm against the driving code.
    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<'a, D>,
        destination: Destination<'a, Self::Item, Self::Buffer>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;

    /// Attempts to convert the producer into a value of the type identified
    /// by `_ty`.
    ///
    /// The default implementation never converts and hands the producer back
    /// unchanged as `Err(me)`.
    fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
        Err(me)
    }
}
758
759impl<T, D> StreamProducer<D> for iter::Empty<T>
760where
761 T: Send + Sync + 'static,
762{
763 type Item = T;
764 type Buffer = Option<Self::Item>;
765
766 fn poll_produce<'a>(
767 self: Pin<&mut Self>,
768 _: &mut Context<'_>,
769 _: StoreContextMut<'a, D>,
770 _: Destination<'a, Self::Item, Self::Buffer>,
771 _: bool,
772 ) -> Poll<Result<StreamResult>> {
773 Poll::Ready(Ok(StreamResult::Dropped))
774 }
775}
776
777impl<T, D> StreamProducer<D> for stream::Empty<T>
778where
779 T: Send + Sync + 'static,
780{
781 type Item = T;
782 type Buffer = Option<Self::Item>;
783
784 fn poll_produce<'a>(
785 self: Pin<&mut Self>,
786 _: &mut Context<'_>,
787 _: StoreContextMut<'a, D>,
788 _: Destination<'a, Self::Item, Self::Buffer>,
789 _: bool,
790 ) -> Poll<Result<StreamResult>> {
791 Poll::Ready(Ok(StreamResult::Dropped))
792 }
793}
794
795impl<T, D> StreamProducer<D> for Vec<T>
796where
797 T: Unpin + Send + Sync + 'static,
798{
799 type Item = T;
800 type Buffer = VecBuffer<T>;
801
802 fn poll_produce<'a>(
803 self: Pin<&mut Self>,
804 _: &mut Context<'_>,
805 _: StoreContextMut<'a, D>,
806 mut dst: Destination<'a, Self::Item, Self::Buffer>,
807 _: bool,
808 ) -> Poll<Result<StreamResult>> {
809 dst.set_buffer(mem::take(self.get_mut()).into());
810 Poll::Ready(Ok(StreamResult::Dropped))
811 }
812}
813
814impl<T, D> StreamProducer<D> for Box<[T]>
815where
816 T: Unpin + Send + Sync + 'static,
817{
818 type Item = T;
819 type Buffer = VecBuffer<T>;
820
821 fn poll_produce<'a>(
822 self: Pin<&mut Self>,
823 _: &mut Context<'_>,
824 _: StoreContextMut<'a, D>,
825 mut dst: Destination<'a, Self::Item, Self::Buffer>,
826 _: bool,
827 ) -> Poll<Result<StreamResult>> {
828 dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
829 Poll::Ready(Ok(StreamResult::Dropped))
830 }
831}
832
/// A `bytes::Bytes` produces all of its bytes in one step and serves as its
/// own buffer type.
#[cfg(feature = "component-model-bytes")]
impl<D> StreamProducer<D> for bytes::Bytes {
    type Item = u8;
    type Buffer = Self;

    /// Moves the bytes into the destination's buffer, leaving `self` empty,
    /// and reports `StreamResult::Dropped`.
    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        _store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _finish: bool,
    ) -> Poll<Result<StreamResult>> {
        let bytes = mem::take(self.get_mut());
        dst.set_buffer(bytes);
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
849
/// A `bytes::BytesMut` produces all of its bytes in one step and serves as
/// its own buffer type.
#[cfg(feature = "component-model-bytes")]
impl<D> StreamProducer<D> for bytes::BytesMut {
    type Item = u8;
    type Buffer = Self;

    /// Moves the bytes into the destination's buffer, leaving `self` empty,
    /// and reports `StreamResult::Dropped`.
    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        _store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _finish: bool,
    ) -> Poll<Result<StreamResult>> {
        let bytes = mem::take(self.get_mut());
        dst.set_buffer(bytes);
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
866
/// The place a `StreamConsumer` or `FutureConsumer` reads its items from.
pub struct Source<'a, T> {
    /// Identifies the transmit state this source belongs to.
    id: TableId<TransmitState>,
    /// Host-owned buffer to read from; when `None`, items are lifted from the
    /// guest's pending write instead.
    host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
}
872
873impl<'a, T> Source<'a, T> {
874 pub fn reborrow(&mut self) -> Source<'_, T> {
876 Source {
877 id: self.id,
878 host_buffer: self.host_buffer.as_deref_mut(),
879 }
880 }
881
882 pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
884 where
885 T: func::Lift + 'static,
886 B: ReadBuffer<T>,
887 {
888 if let Some(input) = &mut self.host_buffer {
889 let count = input.remaining().len().min(buffer.remaining_capacity());
890 buffer.move_from(*input, count);
891 } else {
892 let store = store.as_context_mut();
893 let transmit = store.0.concurrent_state_mut()?.get_mut(self.id)?;
894
895 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
896 bail_bug!("expected ReadState::HostReady");
897 };
898
899 let &WriteState::GuestReady {
900 ty,
901 address,
902 count,
903 options,
904 instance,
905 ..
906 } = &transmit.write
907 else {
908 bail_bug!("expected WriteState::GuestReady");
909 };
910
911 let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
912 let ty = ty.payload(cx.types);
913 let old_remaining = buffer.remaining_capacity();
914 lift::<T, B>(
915 cx,
916 ty.copied(),
917 buffer,
918 address + (T::SIZE32 * guest_offset.as_usize()),
919 count.as_usize() - guest_offset.as_usize(),
920 )?;
921
922 let transmit = store.0.concurrent_state_mut()?.get_mut(self.id)?;
923
924 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
925 bail_bug!("expected ReadState::HostReady");
926 };
927
928 guest_offset.inc(old_remaining - buffer.remaining_capacity())?;
929 }
930
931 Ok(())
932 }
933
934 pub fn remaining(&self, mut store: impl AsContextMut) -> usize
937 where
938 T: 'static,
939 {
940 self.remaining_(store.as_context_mut().0).unwrap()
944 }
945
946 fn remaining_(&self, store: &mut StoreOpaque) -> Result<usize>
947 where
948 T: 'static,
949 {
950 let transmit = store.concurrent_state_mut()?.get_mut(self.id)?;
951
952 if let &WriteState::GuestReady { count, .. } = &transmit.write {
953 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
954 bail_bug!("expected ReadState::HostReady")
955 };
956
957 Ok(count.as_usize() - guest_offset.as_usize())
958 } else if let Some(host_buffer) = &self.host_buffer {
959 Ok(host_buffer.remaining().len())
960 } else {
961 bail_bug!("expected either WriteState::GuestReady or host buffer")
962 }
963 }
964}
965
966impl<'a> Source<'a, u8> {
967 pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
969 DirectSource {
970 id: self.id,
971 host_buffer: self.host_buffer,
972 store,
973 }
974 }
975}
976
/// Byte-oriented view of a `Source` that lets a consumer read straight from
/// the writer's buffer (see `remaining` and `mark_read`).
pub struct DirectSource<'a, D: 'static> {
    /// Identifies the transmit state this source belongs to.
    id: TableId<TransmitState>,
    /// Host-owned buffer to read from; when `None`, guest memory is used.
    host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
    /// Store used to reach the transmit state and guest memory.
    store: StoreContextMut<'a, D>,
}
984
/// `std::io::Read` adapter over `remaining` / `mark_read`.
#[cfg(feature = "std")]
impl<D: 'static> std::io::Read for DirectSource<'_, D> {
    /// Copies as many available bytes as fit into `buf`, marks them read and
    /// returns how many were copied.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let available = self.remaining();
        let len = buf.len().min(available.len());
        buf[..len].copy_from_slice(&available[..len]);
        self.mark_read(len);
        Ok(len)
    }
}
995
996impl<D: 'static> DirectSource<'_, D> {
997 pub fn remaining(&mut self) -> &[u8] {
999 self.remaining_().unwrap()
1003 }
1004
1005 fn remaining_(&mut self) -> Result<&[u8]> {
1006 if let Some(buffer) = self.host_buffer.as_deref_mut() {
1007 return Ok(buffer.remaining());
1008 }
1009 let transmit = self
1010 .store
1011 .as_context_mut()
1012 .0
1013 .concurrent_state_mut()?
1014 .get_mut(self.id)?;
1015
1016 let &WriteState::GuestReady {
1017 address,
1018 count,
1019 options,
1020 instance,
1021 ..
1022 } = &transmit.write
1023 else {
1024 bail_bug!("expected WriteState::GuestReady")
1025 };
1026
1027 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
1028 bail_bug!("expected ReadState::HostReady")
1029 };
1030
1031 let memory = instance
1032 .options_memory(self.store.0, options)
1033 .get((address + guest_offset.as_usize())..)
1034 .and_then(|b| b.get(..(count.as_usize() - guest_offset.as_usize())));
1035 match memory {
1036 Some(memory) => Ok(memory),
1037 None => bail_bug!("guest buffer unexpectedly out of bounds"),
1038 }
1039 }
1040
1041 pub fn mark_read(&mut self, count: usize) {
1048 self.mark_read_(count).unwrap()
1052 }
1053
1054 fn mark_read_(&mut self, count: usize) -> Result<()> {
1055 if let Some(buffer) = self.host_buffer.as_deref_mut() {
1056 buffer.skip(count);
1057 return Ok(());
1058 }
1059
1060 let transmit = self
1061 .store
1062 .as_context_mut()
1063 .0
1064 .concurrent_state_mut()?
1065 .get_mut(self.id)?;
1066
1067 let WriteState::GuestReady {
1068 count: write_count, ..
1069 } = &transmit.write
1070 else {
1071 bail_bug!("expected WriteState::GuestReady");
1072 };
1073
1074 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
1075 bail_bug!("expected ReadState::HostReady");
1076 };
1077
1078 if guest_offset.as_usize() + count > write_count.as_usize() {
1079 panic!("read count ({count}) must be less than or equal to write count ({write_count})")
1081 } else {
1082 guest_offset.inc(count)?;
1083 }
1084 Ok(())
1085 }
1086}
1087
/// A host-side consumer of items from a stream.
///
/// `D` is the store's data type.
pub trait StreamConsumer<D>: Send + 'static {
    /// Type of the items consumed.
    type Item;

    /// Polls the consumer to take items from `source`.
    ///
    /// Returns `Poll::Pending` when nothing can be reported yet, otherwise a
    /// `StreamResult` or an error.
    // NOTE(review): presumably `finish == true` asks the consumer to wrap up
    // rather than stay pending -- confirm against the driving code.
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;
}
1183
/// A host-side producer of the single value of a future.
///
/// `D` is the store's data type.
pub trait FutureProducer<D>: Send + 'static {
    /// Type of the value produced.
    type Item;

    /// Polls the producer for its value.
    ///
    /// `Ok(Some(value))` delivers the value. `Ok(None)` means no value is
    /// delivered; the `FutureReader::new` adapter in this file maps it to
    /// `StreamResult::Cancelled`.
    // NOTE(review): presumably `finish == true` asks the producer not to stay
    // pending (the blanket impl for `Future` in this file returns `Ok(None)`
    // in that case) -- confirm against the driving code.
    fn poll_produce(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        finish: bool,
    ) -> Poll<Result<Option<Self::Item>>>;
}
1205
1206impl<T, E, D, Fut> FutureProducer<D> for Fut
1207where
1208 E: Into<Error>,
1209 Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
1210{
1211 type Item = T;
1212
1213 fn poll_produce<'a>(
1214 self: Pin<&mut Self>,
1215 cx: &mut Context<'_>,
1216 _: StoreContextMut<'a, D>,
1217 finish: bool,
1218 ) -> Poll<Result<Option<T>>> {
1219 match self.poll(cx) {
1220 Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
1221 Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
1222 Poll::Pending if finish => Poll::Ready(Ok(None)),
1223 Poll::Pending => Poll::Pending,
1224 }
1225 }
1226}
1227
/// A host-side consumer of the single value of a future.
///
/// `D` is the store's data type.
pub trait FutureConsumer<D>: Send + 'static {
    /// Type of the value consumed.
    type Item;

    /// Polls the consumer to take the value from `source`.
    ///
    /// After this returns ready, the `FutureReader::pipe` adapter in this
    /// file reports `StreamResult::Completed` when `source` has nothing
    /// remaining and `StreamResult::Cancelled` otherwise.
    // NOTE(review): presumably `finish == true` asks the consumer to wrap up
    // rather than stay pending -- confirm against the driving code.
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<()>>;
}
1252
/// Handle to the readable end of a future whose value has type `T`.
pub struct FutureReader<T> {
    /// Table id of the underlying transmit handle. `close` replaces it with
    /// `TableId::new(u32::MAX)`.
    id: TableId<TransmitHandle>,
    /// Ties the value type `T` to this handle without storing a `T`.
    _phantom: PhantomData<T>,
}
1263
1264impl<T> FutureReader<T> {
1265 pub fn new<S: AsContextMut>(
1274 mut store: S,
1275 producer: impl FutureProducer<S::Data, Item = T>,
1276 ) -> Result<Self>
1277 where
1278 T: func::Lower + func::Lift + Send + Sync + 'static,
1279 {
1280 ensure!(
1281 store.as_context().0.concurrency_support(),
1282 "concurrency support is not enabled"
1283 );
1284
1285 struct Producer<P>(P);
1286
1287 impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
1288 for Producer<P>
1289 {
1290 type Item = P::Item;
1291 type Buffer = Option<P::Item>;
1292
1293 fn poll_produce<'a>(
1294 self: Pin<&mut Self>,
1295 cx: &mut Context<'_>,
1296 store: StoreContextMut<D>,
1297 mut destination: Destination<'a, Self::Item, Self::Buffer>,
1298 finish: bool,
1299 ) -> Poll<Result<StreamResult>> {
1300 let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1303
1304 Poll::Ready(Ok(
1305 if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
1306 destination.set_buffer(Some(value));
1307
1308 StreamResult::Completed
1315 } else {
1316 StreamResult::Cancelled
1317 },
1318 ))
1319 }
1320 }
1321
1322 Ok(Self::new_(
1323 store
1324 .as_context_mut()
1325 .new_transmit(TransmitKind::Future, Producer(producer))?,
1326 ))
1327 }
1328
1329 pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1330 Self {
1331 id,
1332 _phantom: PhantomData,
1333 }
1334 }
1335
1336 pub(super) fn id(&self) -> TableId<TransmitHandle> {
1337 self.id
1338 }
1339
1340 pub fn pipe<S: AsContextMut>(
1350 self,
1351 mut store: S,
1352 consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
1353 ) -> Result<()>
1354 where
1355 T: func::Lift + 'static,
1356 {
1357 struct Consumer<C>(C);
1358
1359 impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
1360 for Consumer<C>
1361 {
1362 type Item = T;
1363
1364 fn poll_consume(
1365 self: Pin<&mut Self>,
1366 cx: &mut Context<'_>,
1367 mut store: StoreContextMut<D>,
1368 mut source: Source<Self::Item>,
1369 finish: bool,
1370 ) -> Poll<Result<StreamResult>> {
1371 let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1374
1375 ready!(consumer.poll_consume(
1376 cx,
1377 store.as_context_mut(),
1378 source.reborrow(),
1379 finish
1380 ))?;
1381
1382 Poll::Ready(Ok(if source.remaining(store) == 0 {
1383 StreamResult::Completed
1389 } else {
1390 StreamResult::Cancelled
1391 }))
1392 }
1393 }
1394
1395 store
1396 .as_context_mut()
1397 .set_consumer(self.id, TransmitKind::Future, Consumer(consumer))
1398 }
1399
1400 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1402 let id = lift_index_to_future(cx, ty, index)?;
1403 Ok(Self::new_(id))
1404 }
1405
1406 pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
1424 future_close(store.as_context_mut().0, &mut self.id)
1425 }
1426
1427 pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
1429 accessor.as_accessor().with(|access| self.close(access))
1430 }
1431
1432 pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1438 where
1439 A: AsAccessor,
1440 {
1441 GuardedFutureReader::new(accessor, self)
1442 }
1443
1444 pub fn try_into_future_any(self, store: impl AsContextMut) -> Result<FutureAny>
1451 where
1452 T: ComponentType + 'static,
1453 {
1454 FutureAny::try_from_future_reader(store, self)
1455 }
1456
1457 pub fn try_from_future_any(future: FutureAny) -> Result<Self>
1464 where
1465 T: ComponentType + 'static,
1466 {
1467 future.try_into_future_reader()
1468 }
1469}
1470
1471impl<T> fmt::Debug for FutureReader<T> {
1472 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1473 f.debug_struct("FutureReader")
1474 .field("id", &self.id)
1475 .finish()
1476 }
1477}
1478
1479pub(super) fn future_close(
1480 store: &mut StoreOpaque,
1481 id: &mut TableId<TransmitHandle>,
1482) -> Result<()> {
1483 let id = mem::replace(id, TableId::new(u32::MAX));
1484 store.host_drop_reader(id, TransmitKind::Future)
1485}
1486
1487pub(super) fn lift_index_to_future(
1489 cx: &mut LiftContext<'_>,
1490 ty: InterfaceType,
1491 index: u32,
1492) -> Result<TableId<TransmitHandle>> {
1493 match ty {
1494 InterfaceType::Future(src) => {
1495 let (state, instance) = cx.concurrent_state_and_instance_mut();
1496 lift_index_to_transmit(instance, state, TransmitIndex::Future(src), index)
1497 }
1498 _ => func::bad_type_info(),
1499 }
1500}
1501
1502pub(super) fn lower_future_to_index<U>(
1504 id: TableId<TransmitHandle>,
1505 cx: &mut LowerContext<'_, U>,
1506 ty: InterfaceType,
1507) -> Result<u32> {
1508 match ty {
1509 InterfaceType::Future(dst) => {
1510 cx.instance_handle()
1511 .lower_transmit_to_index(cx.store.0, TransmitIndex::Future(dst), id)
1512 }
1513 _ => func::bad_type_info(),
1514 }
1515}
1516
1517unsafe impl<T: ComponentType> ComponentType for FutureReader<T> {
1520 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1521
1522 type Lower = <u32 as func::ComponentType>::Lower;
1523
1524 fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1525 match ty {
1526 InterfaceType::Future(ty) => {
1527 let ty = types.types[*ty].ty;
1528 types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1529 }
1530 other => bail!("expected `future`, found `{}`", func::desc(other)),
1531 }
1532 }
1533}
1534
1535unsafe impl<T: ComponentType> func::Lower for FutureReader<T> {
1537 fn linear_lower_to_flat<U>(
1538 &self,
1539 cx: &mut LowerContext<'_, U>,
1540 ty: InterfaceType,
1541 dst: &mut MaybeUninit<Self::Lower>,
1542 ) -> Result<()> {
1543 lower_future_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1544 }
1545
1546 fn linear_lower_to_memory<U>(
1547 &self,
1548 cx: &mut LowerContext<'_, U>,
1549 ty: InterfaceType,
1550 offset: usize,
1551 ) -> Result<()> {
1552 lower_future_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1553 cx,
1554 InterfaceType::U32,
1555 offset,
1556 )
1557 }
1558}
1559
1560unsafe impl<T: ComponentType> func::Lift for FutureReader<T> {
1562 fn linear_lift_from_flat(
1563 cx: &mut LiftContext<'_>,
1564 ty: InterfaceType,
1565 src: &Self::Lower,
1566 ) -> Result<Self> {
1567 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1568 Self::lift_from_index(cx, ty, index)
1569 }
1570
1571 fn linear_lift_from_memory(
1572 cx: &mut LiftContext<'_>,
1573 ty: InterfaceType,
1574 bytes: &[u8],
1575 ) -> Result<Self> {
1576 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1577 Self::lift_from_index(cx, ty, index)
1578 }
1579}
1580
/// A `FutureReader` paired with an accessor so that the reader is closed
/// automatically when the guard is dropped.
pub struct GuardedFutureReader<T, A>
where
    A: AsAccessor,
{
    /// The guarded reader. It is `None` only after the guard has been
    /// converted back into a plain `FutureReader`, which disarms the drop
    /// handler.
    reader: Option<FutureReader<T>>,
    /// Accessor used by the drop handler to close the reader.
    accessor: A,
}
1598
1599impl<T, A> GuardedFutureReader<T, A>
1600where
1601 A: AsAccessor,
1602{
1603 pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1611 assert!(
1612 accessor
1613 .as_accessor()
1614 .with(|a| a.as_context().0.concurrency_support())
1615 );
1616 Self {
1617 reader: Some(reader),
1618 accessor,
1619 }
1620 }
1621
1622 pub fn into_future(self) -> FutureReader<T> {
1625 self.into()
1626 }
1627}
1628
1629impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1630where
1631 A: AsAccessor,
1632{
1633 fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1634 guard.reader.take().unwrap()
1635 }
1636}
1637
1638impl<T, A> Drop for GuardedFutureReader<T, A>
1639where
1640 A: AsAccessor,
1641{
1642 fn drop(&mut self) {
1643 if let Some(reader) = &mut self.reader {
1644 let result = reader.close_with(&self.accessor);
1647 debug_assert!(result.is_ok());
1648 }
1649 }
1650}
1651
/// Handle to the readable end of a stream whose items have type `T`.
pub struct StreamReader<T> {
    /// Table id of the underlying transmit handle. `close` replaces it with
    /// `TableId::new(u32::MAX)`.
    id: TableId<TransmitHandle>,
    /// Ties the item type `T` to this handle without storing a `T`.
    _phantom: PhantomData<T>,
}
1662
1663impl<T> StreamReader<T> {
1664 pub fn new<S: AsContextMut>(
1673 mut store: S,
1674 producer: impl StreamProducer<S::Data, Item = T>,
1675 ) -> Result<Self>
1676 where
1677 T: func::Lower + func::Lift + Send + Sync + 'static,
1678 {
1679 ensure!(
1680 store.as_context().0.concurrency_support(),
1681 "concurrency support is not enabled",
1682 );
1683 Ok(Self::new_(
1684 store
1685 .as_context_mut()
1686 .new_transmit(TransmitKind::Stream, producer)?,
1687 ))
1688 }
1689
1690 pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1691 Self {
1692 id,
1693 _phantom: PhantomData,
1694 }
1695 }
1696
1697 pub(super) fn id(&self) -> TableId<TransmitHandle> {
1698 self.id
1699 }
1700
1701 pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
1723 let store = store.as_context_mut();
1724 let state = store.0.concurrent_state_mut_already_forced_current_thread();
1725 let id = state.get_mut(self.id).unwrap().state;
1726 if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
1727 match try_into(TypeId::of::<V>()) {
1728 Some(result) => {
1729 self.close(store).unwrap();
1730 Ok(*result.downcast::<V>().unwrap())
1731 }
1732 None => Err(self),
1733 }
1734 } else {
1735 Err(self)
1736 }
1737 }
1738
1739 pub fn pipe<S: AsContextMut>(
1749 self,
1750 mut store: S,
1751 consumer: impl StreamConsumer<S::Data, Item = T>,
1752 ) -> Result<()>
1753 where
1754 T: 'static,
1755 {
1756 store
1757 .as_context_mut()
1758 .set_consumer(self.id, TransmitKind::Stream, consumer)
1759 }
1760
1761 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1763 let id = lift_index_to_stream(cx, ty, index)?;
1764 Ok(Self::new_(id))
1765 }
1766
1767 pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
1783 stream_close(store.as_context_mut().0, &mut self.id)
1784 }
1785
1786 pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
1788 accessor.as_accessor().with(|access| self.close(access))
1789 }
1790
1791 pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1797 where
1798 A: AsAccessor,
1799 {
1800 GuardedStreamReader::new(accessor, self)
1801 }
1802
1803 pub fn try_into_stream_any(self, store: impl AsContextMut) -> Result<StreamAny>
1810 where
1811 T: ComponentType + 'static,
1812 {
1813 StreamAny::try_from_stream_reader(store, self)
1814 }
1815
1816 pub fn try_from_stream_any(stream: StreamAny) -> Result<Self>
1823 where
1824 T: ComponentType + 'static,
1825 {
1826 stream.try_into_stream_reader()
1827 }
1828}
1829
1830impl<T> fmt::Debug for StreamReader<T> {
1831 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1832 f.debug_struct("StreamReader")
1833 .field("id", &self.id)
1834 .finish()
1835 }
1836}
1837
1838pub(super) fn stream_close(
1839 store: &mut StoreOpaque,
1840 id: &mut TableId<TransmitHandle>,
1841) -> Result<()> {
1842 let id = mem::replace(id, TableId::new(u32::MAX));
1843 store.host_drop_reader(id, TransmitKind::Stream)
1844}
1845
1846pub(super) fn lift_index_to_stream(
1848 cx: &mut LiftContext<'_>,
1849 ty: InterfaceType,
1850 index: u32,
1851) -> Result<TableId<TransmitHandle>> {
1852 match ty {
1853 InterfaceType::Stream(src) => {
1854 let (state, instance) = cx.concurrent_state_and_instance_mut();
1855 lift_index_to_transmit(instance, state, TransmitIndex::Stream(src), index)
1856 }
1857 _ => func::bad_type_info(),
1858 }
1859}
1860
1861pub(super) fn lower_stream_to_index<U>(
1863 id: TableId<TransmitHandle>,
1864 cx: &mut LowerContext<'_, U>,
1865 ty: InterfaceType,
1866) -> Result<u32> {
1867 match ty {
1868 InterfaceType::Stream(dst) => {
1869 cx.instance_handle()
1870 .lower_transmit_to_index(cx.store.0, TransmitIndex::Stream(dst), id)
1871 }
1872 _ => func::bad_type_info(),
1873 }
1874}
1875
1876unsafe impl<T: ComponentType> ComponentType for StreamReader<T> {
1879 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1880
1881 type Lower = <u32 as func::ComponentType>::Lower;
1882
1883 fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1884 match ty {
1885 InterfaceType::Stream(ty) => {
1886 let ty = types.types[*ty].ty;
1887 types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1888 }
1889 other => bail!("expected `stream`, found `{}`", func::desc(other)),
1890 }
1891 }
1892}
1893
1894unsafe impl<T: ComponentType> func::Lower for StreamReader<T> {
1896 fn linear_lower_to_flat<U>(
1897 &self,
1898 cx: &mut LowerContext<'_, U>,
1899 ty: InterfaceType,
1900 dst: &mut MaybeUninit<Self::Lower>,
1901 ) -> Result<()> {
1902 lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1903 }
1904
1905 fn linear_lower_to_memory<U>(
1906 &self,
1907 cx: &mut LowerContext<'_, U>,
1908 ty: InterfaceType,
1909 offset: usize,
1910 ) -> Result<()> {
1911 lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1912 cx,
1913 InterfaceType::U32,
1914 offset,
1915 )
1916 }
1917}
1918
1919unsafe impl<T: ComponentType> func::Lift for StreamReader<T> {
1921 fn linear_lift_from_flat(
1922 cx: &mut LiftContext<'_>,
1923 ty: InterfaceType,
1924 src: &Self::Lower,
1925 ) -> Result<Self> {
1926 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1927 Self::lift_from_index(cx, ty, index)
1928 }
1929
1930 fn linear_lift_from_memory(
1931 cx: &mut LiftContext<'_>,
1932 ty: InterfaceType,
1933 bytes: &[u8],
1934 ) -> Result<Self> {
1935 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1936 Self::lift_from_index(cx, ty, index)
1937 }
1938}
1939
/// A [`StreamReader`] paired with an accessor so that the reader is closed
/// automatically when the guard is dropped.
pub struct GuardedStreamReader<T, A>
where
    A: AsAccessor,
{
    /// The guarded reader; `None` once it has been taken back out via the
    /// `From` conversion / `into_stream`.
    reader: Option<StreamReader<T>>,
    /// Accessor used to reach the store when closing the reader on drop.
    accessor: A,
}
1957
1958impl<T, A> GuardedStreamReader<T, A>
1959where
1960 A: AsAccessor,
1961{
1962 pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1971 assert!(
1972 accessor
1973 .as_accessor()
1974 .with(|a| a.as_context().0.concurrency_support())
1975 );
1976 Self {
1977 reader: Some(reader),
1978 accessor,
1979 }
1980 }
1981
1982 pub fn into_stream(self) -> StreamReader<T> {
1985 self.into()
1986 }
1987}
1988
1989impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1990where
1991 A: AsAccessor,
1992{
1993 fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1994 guard.reader.take().unwrap()
1995 }
1996}
1997
1998impl<T, A> Drop for GuardedStreamReader<T, A>
1999where
2000 A: AsAccessor,
2001{
2002 fn drop(&mut self) {
2003 if let Some(reader) = &mut self.reader {
2004 let result = reader.close_with(&self.accessor);
2007 debug_assert!(result.is_ok());
2008 }
2009 }
2010}
2011
/// Host-side representation of a component-model `error-context` value.
pub struct ErrorContext {
    /// Representation value stored in / looked up from an instance's
    /// error-context table.
    rep: u32,
}
2016
2017impl ErrorContext {
2018 pub(crate) fn new(rep: u32) -> Self {
2019 Self { rep }
2020 }
2021
2022 pub fn into_val(self) -> Val {
2024 Val::ErrorContext(ErrorContextAny(self.rep))
2025 }
2026
2027 pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
2029 let Val::ErrorContext(ErrorContextAny(rep)) = value else {
2030 bail!("expected `error-context`; got `{}`", value.desc());
2031 };
2032 Ok(Self::new(*rep))
2033 }
2034
2035 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
2036 match ty {
2037 InterfaceType::ErrorContext(src) => {
2038 let rep = cx
2039 .instance_mut()
2040 .table_for_error_context(src)
2041 .error_context_rep(index)?;
2042
2043 Ok(Self { rep })
2044 }
2045 _ => func::bad_type_info(),
2046 }
2047 }
2048}
2049
2050pub(crate) fn lower_error_context_to_index<U>(
2051 rep: u32,
2052 cx: &mut LowerContext<'_, U>,
2053 ty: InterfaceType,
2054) -> Result<u32> {
2055 match ty {
2056 InterfaceType::ErrorContext(dst) => {
2057 let tbl = cx.instance_mut().table_for_error_context(dst);
2058 tbl.error_context_insert(rep)
2059 }
2060 _ => func::bad_type_info(),
2061 }
2062}
2063unsafe impl func::ComponentType for ErrorContext {
2066 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
2067
2068 type Lower = <u32 as func::ComponentType>::Lower;
2069
2070 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
2071 match ty {
2072 InterfaceType::ErrorContext(_) => Ok(()),
2073 other => bail!("expected `error`, found `{}`", func::desc(other)),
2074 }
2075 }
2076}
2077
2078unsafe impl func::Lower for ErrorContext {
2080 fn linear_lower_to_flat<T>(
2081 &self,
2082 cx: &mut LowerContext<'_, T>,
2083 ty: InterfaceType,
2084 dst: &mut MaybeUninit<Self::Lower>,
2085 ) -> Result<()> {
2086 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
2087 cx,
2088 InterfaceType::U32,
2089 dst,
2090 )
2091 }
2092
2093 fn linear_lower_to_memory<T>(
2094 &self,
2095 cx: &mut LowerContext<'_, T>,
2096 ty: InterfaceType,
2097 offset: usize,
2098 ) -> Result<()> {
2099 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
2100 cx,
2101 InterfaceType::U32,
2102 offset,
2103 )
2104 }
2105}
2106
2107unsafe impl func::Lift for ErrorContext {
2109 fn linear_lift_from_flat(
2110 cx: &mut LiftContext<'_>,
2111 ty: InterfaceType,
2112 src: &Self::Lower,
2113 ) -> Result<Self> {
2114 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
2115 Self::lift_from_index(cx, ty, index)
2116 }
2117
2118 fn linear_lift_from_memory(
2119 cx: &mut LiftContext<'_>,
2120 ty: InterfaceType,
2121 bytes: &[u8],
2122 ) -> Result<Self> {
2123 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
2124 Self::lift_from_index(cx, ty, index)
2125 }
2126}
2127
/// Table entry for one end (read or write) of a stream or future.
pub(super) struct TransmitHandle {
    /// Bookkeeping shared by all waitables.
    pub(super) common: WaitableCommon,
    /// Id of the `TransmitState` this handle refers to.
    state: TableId<TransmitState>,
}
2134
2135impl TransmitHandle {
2136 fn new(state: TableId<TransmitState>) -> Self {
2137 Self {
2138 common: WaitableCommon::default(),
2139 state,
2140 }
2141 }
2142}
2143
impl TableDebug for TransmitHandle {
    /// Name reported for this entry type by the table's debug support.
    fn type_name() -> &'static str {
        "TransmitHandle"
    }
}
2149
/// State shared by the read and write ends of a single stream or future.
struct TransmitState {
    /// Handle of the write end (`u32::MAX` placeholder until assigned).
    write_handle: TableId<TransmitHandle>,
    /// Handle of the read end (`u32::MAX` placeholder until assigned).
    read_handle: TableId<TransmitHandle>,
    /// Current state of the write end.
    write: WriteState,
    /// Current state of the read end.
    read: ReadState,
    /// Set to `true` once a value has been delivered for a
    /// `TransmitKind::Future` transmit; consulted by `host_drop_writer`.
    done: bool,
    /// Whether the transmit was created by the host or by a guest instance.
    pub(super) origin: TransmitOrigin,
}
2166
/// Records who created a transmit.
#[derive(Copy, Clone)]
pub(super) enum TransmitOrigin {
    /// Created by the host.
    Host,
    /// Created by a guest instance as a future of the given table type.
    GuestFuture(ComponentInstanceId, TypeFutureTableIndex),
    /// Created by a guest instance as a stream of the given table type.
    GuestStream(ComponentInstanceId, TypeStreamTableIndex),
}
2173
2174impl TransmitState {
2175 fn new(origin: TransmitOrigin) -> Self {
2176 Self {
2177 write_handle: TableId::new(u32::MAX),
2178 read_handle: TableId::new(u32::MAX),
2179 read: ReadState::Open,
2180 write: WriteState::Open,
2181 done: false,
2182 origin,
2183 }
2184 }
2185}
2186
impl TableDebug for TransmitState {
    /// Name reported for this entry type by the table's debug support.
    fn type_name() -> &'static str {
        "TransmitState"
    }
}
2192
2193impl TransmitOrigin {
2194 fn guest(id: ComponentInstanceId, index: TransmitIndex) -> Self {
2195 match index {
2196 TransmitIndex::Future(ty) => TransmitOrigin::GuestFuture(id, ty),
2197 TransmitIndex::Stream(ty) => TransmitOrigin::GuestStream(id, ty),
2198 }
2199 }
2200}
2201
/// Factory for a host-side produce/consume operation: each call returns a new
/// boxed future that resolves to the `StreamResult` of one operation.
type PollStream = Box<
    dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
>;
2205
/// Callback that attempts to convert a host producer into a value of the type
/// identified by the given `TypeId`, returning `None` when that is not possible.
type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
2207
/// State of the write end of a transmit.
enum WriteState {
    /// The write end exists but no write is currently pending.
    Open,
    /// A guest has a write pending; the fields describe the guest-side request.
    GuestReady {
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TransmitIndex,
        flat_abi: Option<FlatAbi>,
        options: OptionsIndex,
        // NOTE(review): presumably the guest linear-memory address of the
        // items being written — confirm against the code that sets it.
        address: usize,
        count: ItemCount,
        handle: u32,
    },
    /// The write end is backed by a host producer.
    HostReady {
        /// Starts one produce operation on the host producer.
        produce: PollStream,
        /// Attempts to convert the producer back into a concrete host value.
        try_into: TryInto,
        /// Number of items already written to the guest for the current read.
        guest_offset: ItemCount,
        /// Cancellation flag passed to the producer's `poll_produce`; reset
        /// once a poll completes.
        cancel: bool,
        /// Waker saved while the producer is pending.
        // NOTE(review): presumably woken to deliver a cancellation — confirm.
        cancel_waker: Option<Waker>,
    },
    /// The write end has been dropped.
    Dropped,
}
2234
2235impl fmt::Debug for WriteState {
2236 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2237 match self {
2238 Self::Open => f.debug_tuple("Open").finish(),
2239 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2240 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2241 Self::Dropped => f.debug_tuple("Dropped").finish(),
2242 }
2243 }
2244}
2245
/// State of the read end of a transmit.
enum ReadState {
    /// The read end exists but no read is currently pending.
    Open,
    /// A guest has a read pending; the fields describe the guest-side request.
    GuestReady {
        ty: TransmitIndex,
        caller_instance: RuntimeComponentInstanceIndex,
        caller_thread: QualifiedThreadId,
        flat_abi: Option<FlatAbi>,
        instance: Instance,
        options: OptionsIndex,
        /// Base address that items are lowered to (see `write`).
        address: usize,
        count: ItemCount,
        handle: u32,
    },
    /// The read end is backed by a host consumer.
    HostReady {
        /// Starts one consume operation on the host consumer.
        consume: PollStream,
        /// Number of guest items consumed so far for the current write.
        guest_offset: ItemCount,
        /// Cancellation flag passed to the consumer's `poll_consume`; reset
        /// once a poll completes.
        cancel: bool,
        /// Waker saved while the consumer is pending.
        // NOTE(review): presumably woken to deliver a cancellation — confirm.
        cancel_waker: Option<Waker>,
    },
    /// Both ends are host-owned: a host producer feeds a host consumer.
    HostToHost {
        /// Hands a buffer of items to the host consumer.
        accept: Box<
            dyn for<'a> Fn(
                    &'a mut UntypedWriteBuffer<'a>,
                )
                    -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
                + Send
                + Sync,
        >,
        /// Byte buffer lent to the producer as its host destination buffer.
        buffer: Vec<u8>,
        /// Amount of `buffer` the producer marked as written.
        limit: usize,
    },
    /// The read end has been dropped.
    Dropped,
}
2285
2286impl fmt::Debug for ReadState {
2287 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2288 match self {
2289 Self::Open => f.debug_tuple("Open").finish(),
2290 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2291 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2292 Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
2293 Self::Dropped => f.debug_tuple("Dropped").finish(),
2294 }
2295 }
2296}
2297
2298fn return_code(kind: TransmitKind, state: StreamResult, count: ItemCount) -> Result<ReturnCode> {
2299 Ok(match state {
2300 StreamResult::Dropped => ReturnCode::Dropped(count),
2301 StreamResult::Completed => ReturnCode::completed(kind, count),
2302 StreamResult::Cancelled => ReturnCode::Cancelled(count),
2303 })
2304}
2305
2306impl StoreOpaque {
    /// Schedules `future` (one invocation of a host consumer) and, once it
    /// resolves, reports the result of the pending guest write.
    ///
    /// When the future completes, the transmit identified by `id` is expected
    /// to have `ReadState::HostReady` and `WriteState::GuestReady`; anything
    /// else is treated as an internal bug.
    fn pipe_from_guest(
        &mut self,
        kind: TransmitKind,
        id: TableId<TransmitState>,
        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
    ) {
        let future = async move {
            let stream_state = future.await?;
            tls::get(|store| {
                let state = store.concurrent_state_mut()?;
                let transmit = state.get_mut(id)?;
                // Take the read state out so its `consume` closure can be moved
                // into the replacement state below.
                let ReadState::HostReady {
                    consume,
                    guest_offset,
                    ..
                } = mem::replace(&mut transmit.read, ReadState::Open)
                else {
                    bail_bug!("expected ReadState::HostReady")
                };
                let code = return_code(kind, stream_state, guest_offset)?;
                // A dropped consumer closes the read end; otherwise re-arm it
                // with the offset and cancellation flags reset.
                transmit.read = match stream_state {
                    StreamResult::Dropped => ReadState::Dropped,
                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
                        consume,
                        guest_offset: ItemCount::ZERO,
                        cancel: false,
                        cancel_waker: None,
                    },
                };
                // The guest's pending write is finished; reset the write end
                // and send the result for the guest's handle.
                let WriteState::GuestReady { ty, handle, .. } =
                    mem::replace(&mut transmit.write, WriteState::Open)
                else {
                    bail_bug!("expected WriteState::GuestReady")
                };
                state.send_write_result(ty, id, handle, code)?;
                Ok(())
            })
        };

        self.concurrent_state_mut_already_forced_current_thread()
            .push_future(future.boxed());
    }
2349
    /// Schedules `future` (one invocation of a host producer) and, once it
    /// resolves, reports the result of the pending guest read.
    ///
    /// When the future completes, the transmit identified by `id` is expected
    /// to have `WriteState::HostReady` and `ReadState::GuestReady`; anything
    /// else is treated as an internal bug.
    fn pipe_to_guest(
        &mut self,
        kind: TransmitKind,
        id: TableId<TransmitState>,
        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
    ) {
        let future = async move {
            let stream_state = future.await?;
            tls::get(|store| {
                let state = store.concurrent_state_mut()?;
                let transmit = state.get_mut(id)?;
                // Take the write state out so its closures can be moved into
                // the replacement state below.
                let WriteState::HostReady {
                    produce,
                    try_into,
                    guest_offset,
                    ..
                } = mem::replace(&mut transmit.write, WriteState::Open)
                else {
                    bail_bug!("expected WriteState::HostReady")
                };
                let code = return_code(kind, stream_state, guest_offset)?;
                // A dropped producer closes the write end; otherwise re-arm it
                // with the offset and cancellation flags reset.
                transmit.write = match stream_state {
                    StreamResult::Dropped => WriteState::Dropped,
                    StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
                        produce,
                        try_into,
                        guest_offset: ItemCount::ZERO,
                        cancel: false,
                        cancel_waker: None,
                    },
                };
                // The guest's pending read is finished; reset the read end and
                // send the result for the guest's handle.
                let ReadState::GuestReady { ty, handle, .. } =
                    mem::replace(&mut transmit.read, ReadState::Open)
                else {
                    bail_bug!("expected ReadState::GuestReady")
                };
                state.send_read_result(ty, id, handle, code)?;
                Ok(())
            })
        };

        self.concurrent_state_mut_already_forced_current_thread()
            .push_future(future.boxed());
    }
2394
    /// Drops the host-owned read end identified by the handle `id`.
    ///
    /// The read state becomes `Dropped`. What happens next depends on the
    /// write end:
    /// * `GuestReady`: the pending guest write gets a `Dropped` event.
    /// * `Open`: a `Dropped` event with no pending write is recorded on the
    ///   write handle; `kind` selects the future or stream event.
    /// * `Dropped` or `HostReady`: the whole transmit is deleted.
    fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
        let state = self.concurrent_state_mut()?;
        // NOTE(review): presumably detaches the handle from any waitable set —
        // confirm against `Waitable::join`.
        Waitable::Transmit(id).join(state, None)?;
        let transmit_id = state.get_mut(id)?.state;
        let transmit = state
            .get_mut(transmit_id)
            .with_context(|| format!("error closing reader {transmit_id:?}"))?;
        log::trace!(
            "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
            transmit.read,
            transmit.write
        );

        transmit.read = ReadState::Dropped;

        // A write end that is already dropped stays dropped; any other write
        // state is reset to `Open`.
        let new_state = if let WriteState::Dropped = &transmit.write {
            WriteState::Dropped
        } else {
            WriteState::Open
        };

        let write_handle = transmit.write_handle;

        match mem::replace(&mut transmit.write, new_state) {
            // A guest write was in progress: complete it with `Dropped` and
            // zero items.
            WriteState::GuestReady { ty, handle, .. } => {
                state.update_event(
                    write_handle.rep(),
                    match ty {
                        TransmitIndex::Future(ty) => Event::FutureWrite {
                            code: ReturnCode::Dropped(ItemCount::ZERO),
                            pending: Some((ty, handle)),
                        },
                        TransmitIndex::Stream(ty) => Event::StreamWrite {
                            code: ReturnCode::Dropped(ItemCount::ZERO),
                            pending: Some((ty, handle)),
                        },
                    },
                )?;
            }

            // No write in progress: record the drop on the write handle.
            WriteState::Open => {
                state.update_event(
                    write_handle.rep(),
                    match kind {
                        TransmitKind::Future => Event::FutureWrite {
                            code: ReturnCode::Dropped(ItemCount::ZERO),
                            pending: None,
                        },
                        TransmitKind::Stream => Event::StreamWrite {
                            code: ReturnCode::Dropped(ItemCount::ZERO),
                            pending: None,
                        },
                    },
                )?;
            }

            // No guest writer is involved in these states, so the transmit is
            // removed outright.
            WriteState::Dropped | WriteState::HostReady { .. } => {
                log::trace!("host_drop_reader delete {transmit_id:?}");
                state.delete_transmit(transmit_id)?;
            }
        }
        Ok(())
    }
2467
2468 fn host_drop_writer(
2470 &mut self,
2471 id: TableId<TransmitHandle>,
2472 on_drop_open: Option<fn() -> Result<()>>,
2473 ) -> Result<()> {
2474 let state = self.concurrent_state_mut()?;
2475 Waitable::Transmit(id).join(state, None)?;
2476 let transmit_id = state.get_mut(id)?.state;
2477 let transmit = state
2478 .get_mut(transmit_id)
2479 .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2480 log::trace!(
2481 "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2482 transmit.read,
2483 transmit.write
2484 );
2485
2486 match &mut transmit.write {
2488 WriteState::GuestReady { .. } => {
2489 bail_bug!("can't call `host_drop_writer` on a guest-owned writer");
2490 }
2491 WriteState::HostReady { .. } => {}
2492 v @ WriteState::Open => {
2493 if let (Some(on_drop_open), false) = (on_drop_open, transmit.done) {
2494 on_drop_open()?;
2495 } else {
2496 *v = WriteState::Dropped;
2497 }
2498 }
2499 WriteState::Dropped => bail_bug!("write state is already dropped"),
2500 }
2501
2502 let transmit = self.concurrent_state_mut()?.get_mut(transmit_id)?;
2503
2504 let new_state = if let ReadState::Dropped = &transmit.read {
2510 ReadState::Dropped
2511 } else {
2512 ReadState::Open
2513 };
2514
2515 let read_handle = transmit.read_handle;
2516
2517 match mem::replace(&mut transmit.read, new_state) {
2519 ReadState::GuestReady { ty, handle, .. } => {
2523 self.concurrent_state_mut()?.update_event(
2525 read_handle.rep(),
2526 match ty {
2527 TransmitIndex::Future(ty) => Event::FutureRead {
2528 code: ReturnCode::Dropped(ItemCount::ZERO),
2529 pending: Some((ty, handle)),
2530 },
2531 TransmitIndex::Stream(ty) => Event::StreamRead {
2532 code: ReturnCode::Dropped(ItemCount::ZERO),
2533 pending: Some((ty, handle)),
2534 },
2535 },
2536 )?;
2537 }
2538
2539 ReadState::Open => {
2541 self.concurrent_state_mut()?.update_event(
2542 read_handle.rep(),
2543 match on_drop_open {
2544 Some(_) => Event::FutureRead {
2545 code: ReturnCode::Dropped(ItemCount::ZERO),
2546 pending: None,
2547 },
2548 None => Event::StreamRead {
2549 code: ReturnCode::Dropped(ItemCount::ZERO),
2550 pending: None,
2551 },
2552 },
2553 )?;
2554 }
2555
2556 ReadState::Dropped | ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
2563 log::trace!("host_drop_writer delete {transmit_id:?}");
2564 self.concurrent_state_mut()?.delete_transmit(transmit_id)?;
2565 }
2566 }
2567 Ok(())
2568 }
2569
2570 pub(super) fn transmit_origin(
2571 &mut self,
2572 id: TableId<TransmitHandle>,
2573 ) -> Result<TransmitOrigin> {
2574 let state = self.concurrent_state_mut()?;
2575 let state_id = state.get_mut(id)?.state;
2576 Ok(state.get_mut(state_id)?.origin)
2577 }
2578}
2579
2580impl<T> StoreContextMut<'_, T> {
    /// Creates a new host-originated transmit whose write end is driven by
    /// `producer`, and returns the handle of its read end.
    ///
    /// The producer and a default-constructed `P::Buffer` are stored together
    /// behind an `Arc<LockedState<..>>` shared by two closures installed in
    /// `WriteState::HostReady`:
    /// * `produce` runs one produce operation. It polls the producer if its
    ///   buffer is empty, validates the result, and forwards any buffered
    ///   items via `write`.
    /// * `try_into` attempts to convert the producer back into a concrete
    ///   value via `P::try_into`.
    fn new_transmit<P: StreamProducer<T>>(
        mut self,
        kind: TransmitKind,
        producer: P,
    ) -> Result<TableId<TransmitHandle>>
    where
        P::Item: func::Lower,
    {
        let token = StoreToken::new(self.as_context_mut());
        let state = self.0.concurrent_state_mut()?;
        let (_, read) = state.new_transmit(TransmitOrigin::Host)?;
        let producer = Arc::new(LockedState::new((Box::pin(producer), P::Buffer::default())));
        let id = state.get_mut(read)?.state;
        // `bool` is `Copy`, so each future created by `produce` gets its own
        // copy of this flag, starting at `false`.
        let mut dropped = false;
        let produce = Box::new({
            let producer = producer.clone();
            move || {
                let producer = producer.clone();
                async move {
                    // Take the producer and its buffer for the duration of this
                    // operation.
                    let mut state = producer.take()?;
                    let (mine, buffer) = &mut *state;

                    // Poll the producer only when nothing is left over in the
                    // buffer; leftovers are flushed first and reported as a
                    // plain completion.
                    let (result, cancelled) = if buffer.remaining().is_empty() {
                        future::poll_fn(|cx| {
                            tls::get(|store| {
                                let transmit = store.concurrent_state_mut()?.get_mut(id)?;

                                // The cancel flag as it was before this poll;
                                // it is passed to the producer and returned
                                // with the result.
                                let &WriteState::HostReady { cancel, .. } = &transmit.write else {
                                    bail_bug!("expected WriteState::HostReady")
                                };

                                // For host-to-host pipes, temporarily take the
                                // read side's byte buffer so the producer can
                                // write into it.
                                let mut host_written = 0;
                                let mut host_buffer =
                                    if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
                                        Some(mem::take(buffer))
                                    } else {
                                        None
                                    };

                                let poll = mine.as_mut().poll_produce(
                                    cx,
                                    token.as_context_mut(store),
                                    Destination {
                                        id,
                                        buffer,
                                        host_buffer: host_buffer.as_mut().map(|b| {
                                            HostBuffer {
                                                dst: b,
                                                marked_written: &mut host_written,
                                            }
                                        }),
                                        _phantom: PhantomData,
                                    },
                                    cancel,
                                );

                                // Re-fetch the transmit: the producer had
                                // access to the store during the poll.
                                let transmit = store.concurrent_state_mut()?.get_mut(id)?;

                                // Hand the byte buffer back and record how much
                                // of it the producer marked as written.
                                let host_offset = if let (
                                    Some(host_buffer),
                                    ReadState::HostToHost { buffer, limit, .. },
                                ) = (host_buffer, &mut transmit.read)
                                {
                                    *limit = host_written;
                                    *buffer = host_buffer;
                                    *limit
                                } else {
                                    0
                                };

                                {
                                    let WriteState::HostReady {
                                        guest_offset,
                                        cancel,
                                        cancel_waker,
                                        ..
                                    } = &mut transmit.write
                                    else {
                                        bail_bug!("expected WriteState::HostReady")
                                    };

                                    if poll.is_pending() {
                                        // Returning `Pending` after producing
                                        // items is rejected as a producer
                                        // contract violation.
                                        if !buffer.remaining().is_empty()
                                            || *guest_offset > 0
                                            || host_offset > 0
                                        {
                                            bail!(
                                                "StreamProducer::poll_produce returned Poll::Pending \
                                                 after producing at least one item"
                                            )
                                        }
                                        // Remember the waker while the producer
                                        // is pending.
                                        *cancel_waker = Some(cx.waker().clone());
                                    } else {
                                        *cancel_waker = None;
                                        *cancel = false;
                                    }
                                }

                                // `cancel` here is the pre-poll value read at
                                // the top of this closure.
                                Ok(poll.map(|v| v.map(|result| (result, cancel))))
                            })?
                        })
                        .await?
                    } else {
                        (StreamResult::Completed, false)
                    };

                    // Snapshot the offsets and the requested count to validate
                    // the producer's result.
                    let (guest_offset, host_offset, count) = tls::get(|store| {
                        let transmit = store.concurrent_state_mut()?.get_mut(id)?;
                        let (count, host_offset) = match &transmit.read {
                            &ReadState::GuestReady { count, .. } => (count.as_u32(), 0),
                            &ReadState::HostToHost { limit, .. } => (1, limit),
                            _ => bail_bug!("invalid read state"),
                        };
                        let guest_offset = match &transmit.write {
                            &WriteState::HostReady { guest_offset, .. } => guest_offset,
                            _ => bail_bug!("invalid write state"),
                        };
                        Ok((guest_offset, host_offset, count))
                    })?;

                    match result {
                        StreamResult::Completed => {
                            // Completing without producing anything is an
                            // error when more than one item was requested.
                            if count > 1
                                && buffer.remaining().is_empty()
                                && guest_offset == 0
                                && host_offset == 0
                            {
                                bail!(
                                    "StreamProducer::poll_produce returned StreamResult::Completed \
                                     without producing any items"
                                );
                            }
                        }
                        StreamResult::Cancelled => {
                            // `Cancelled` is only valid if cancellation was
                            // actually requested for this poll.
                            if !cancelled {
                                bail!(
                                    "StreamProducer::poll_produce returned StreamResult::Cancelled \
                                     without being given a `finish` parameter value of true"
                                );
                            }
                        }
                        StreamResult::Dropped => {
                            dropped = true;
                        }
                    }

                    let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;

                    // Release the producer/buffer before `write`, which takes
                    // the same shared state itself.
                    drop(state);

                    if write_buffer {
                        write(token, id, producer.clone(), kind).await?;
                    }

                    // Report `Dropped` only once the buffer is fully drained;
                    // otherwise report `Completed` so the remaining items can
                    // still be delivered.
                    Ok(if dropped {
                        if producer.with(|p| p.1.remaining().is_empty())? {
                            StreamResult::Dropped
                        } else {
                            StreamResult::Completed
                        }
                    } else {
                        result
                    })
                }
                .boxed()
            }
        });
        let try_into = Box::new(move |ty| {
            // Take the producer out of the shared slot. If the conversion is
            // refused, put it back together with its buffer.
            let (mine, buffer) = producer.try_lock().ok()?.take()?;
            match P::try_into(mine, ty) {
                Ok(value) => Some(value),
                Err(mine) => {
                    *producer.try_lock().ok()? = Some((mine, buffer));
                    None
                }
            }
        });
        state.get_mut(id)?.write = WriteState::HostReady {
            produce,
            try_into,
            guest_offset: ItemCount::ZERO,
            cancel: false,
            cancel_waker: None,
        };
        Ok(read)
    }
2767
    /// Installs `consumer` as the host-side sink for the transmit whose read
    /// handle is `id`.
    ///
    /// The action depends on the current write state:
    /// * `Open`: the read end becomes `HostReady` and waits for a write.
    /// * `GuestReady`: the read end becomes `HostReady` and a consume
    ///   operation is scheduled immediately via `pipe_from_guest`.
    /// * `HostReady`: the read end becomes `HostToHost` and a background
    ///   future repeatedly runs the host producer until either end is dropped.
    ///   For `TransmitKind::Future` it runs only once. The transmit is deleted
    ///   when that future finishes.
    /// * `Dropped`: the reader is dropped right away.
    fn set_consumer<C: StreamConsumer<T>>(
        mut self,
        id: TableId<TransmitHandle>,
        kind: TransmitKind,
        consumer: C,
    ) -> Result<()> {
        let token = StoreToken::new(self.as_context_mut());
        let state = self.0.concurrent_state_mut()?;
        // From here on `id` names the transmit state, not the handle.
        let id = state.get_mut(id)?.state;
        let transmit = state.get_mut(id)?;
        let consumer = Arc::new(LockedState::new(Box::pin(consumer)));
        // Runs one consume operation. `host_buffer` is `Some` only on the
        // host-to-host path, where items come from a host-side buffer.
        let consume_with_buffer = {
            let consumer = consumer.clone();
            async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
                let mut mine = consumer.take()?;

                // Remember how much was available so progress can be detected
                // after the poll.
                let host_buffer_remaining_before =
                    host_buffer.as_deref_mut().map(|v| v.remaining().len());

                let (result, cancelled) = future::poll_fn(|cx| {
                    tls::get(|store| {
                        // The cancel flag as it was before this poll; it is
                        // passed to the consumer and returned with the result.
                        let cancel = match &store.concurrent_state_mut()?.get_mut(id)?.read {
                            &ReadState::HostReady { cancel, .. } => cancel,
                            ReadState::Open => false,
                            _ => bail_bug!("unexpected read state"),
                        };

                        let poll = mine.as_mut().poll_consume(
                            cx,
                            token.as_context_mut(store),
                            Source {
                                id,
                                host_buffer: host_buffer.as_deref_mut(),
                            },
                            cancel,
                        );

                        // Remember the waker while pending; clear the waker
                        // and the cancel flag once the poll completes.
                        if let ReadState::HostReady {
                            cancel_waker,
                            cancel,
                            ..
                        } = &mut store.concurrent_state_mut()?.get_mut(id)?.read
                        {
                            if poll.is_pending() {
                                *cancel_waker = Some(cx.waker().clone());
                            } else {
                                *cancel_waker = None;
                                *cancel = false;
                            }
                        }

                        Ok(poll.map(|v| v.map(|result| (result, cancel))))
                    })?
                })
                .await?;

                // Snapshot the guest offset and the number of items that were
                // available, to validate the consumer's result.
                let (guest_offset, count) = tls::get(|store| {
                    let transmit = store.concurrent_state_mut()?.get_mut(id)?;
                    Ok((
                        match &transmit.read {
                            &ReadState::HostReady { guest_offset, .. } => guest_offset,
                            ReadState::Open => ItemCount::ZERO,
                            _ => bail_bug!("invalid read state"),
                        },
                        match &transmit.write {
                            WriteState::GuestReady { count, .. } => count.as_usize(),
                            WriteState::HostReady { .. } => match host_buffer_remaining_before {
                                Some(n) => n,
                                None => bail_bug!("host_buffer_remaining_before should be set"),
                            },
                            _ => bail_bug!("invalid write state"),
                        },
                    ))
                })?;

                match result {
                    StreamResult::Completed => {
                        // Completing without consuming anything is an error
                        // when items were available. This check only fires on
                        // the host-buffer path: with no host buffer the
                        // `unwrap_or(false)` disables it.
                        if count > 0
                            && guest_offset == 0
                            && host_buffer_remaining_before
                                .zip(host_buffer.map(|v| v.remaining().len()))
                                .map(|(before, after)| before == after)
                                .unwrap_or(false)
                        {
                            bail!(
                                "StreamConsumer::poll_consume returned StreamResult::Completed \
                                 without consuming any items"
                            );
                        }

                        // A future is marked done on its first completed
                        // consume.
                        if let TransmitKind::Future = kind {
                            tls::get(|store| {
                                store.concurrent_state_mut()?.get_mut(id)?.done = true;
                                crate::error::Ok(())
                            })?;
                        }
                    }
                    StreamResult::Cancelled => {
                        // `Cancelled` is only valid if cancellation was
                        // actually requested for this poll.
                        if !cancelled {
                            bail!(
                                "StreamConsumer::poll_consume returned StreamResult::Cancelled \
                                 without being given a `finish` parameter value of true"
                            );
                        }
                    }
                    StreamResult::Dropped => {}
                }

                Ok(result)
            }
        };
        // `PollStream`-shaped wrapper used when the writer is a guest: no host
        // buffer is supplied.
        let consume = {
            let consume = consume_with_buffer.clone();
            Box::new(move || {
                let consume = consume.clone();
                async move { consume(None).await }.boxed()
            })
        };

        match &transmit.write {
            WriteState::Open => {
                transmit.read = ReadState::HostReady {
                    consume,
                    guest_offset: ItemCount::ZERO,
                    cancel: false,
                    cancel_waker: None,
                };
            }
            &WriteState::GuestReady { .. } => {
                // A guest write is already pending: start consuming now. The
                // future is created before `consume` is moved into the state.
                let future = consume();
                transmit.read = ReadState::HostReady {
                    consume,
                    guest_offset: ItemCount::ZERO,
                    cancel: false,
                    cancel_waker: None,
                };
                self.0.pipe_from_guest(kind, id, future);
            }
            WriteState::HostReady { .. } => {
                // Move the real `produce` closure out of the write state and
                // leave placeholders behind. The stored state stays
                // `HostReady`, but its `produce` reports a bug if invoked.
                let WriteState::HostReady { produce, .. } = mem::replace(
                    &mut transmit.write,
                    WriteState::HostReady {
                        produce: Box::new(|| {
                            Box::pin(async { bail_bug!("unexpected invocation of `produce`") })
                        }),
                        try_into: Box::new(|_| None),
                        guest_offset: ItemCount::ZERO,
                        cancel: false,
                        cancel_waker: None,
                    },
                ) else {
                    bail_bug!("expected WriteState::HostReady")
                };

                transmit.read = ReadState::HostToHost {
                    accept: Box::new(move |input| {
                        let consume = consume_with_buffer.clone();
                        async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
                    }),
                    buffer: Vec::new(),
                    limit: 0,
                };

                // Drive the producer until the reader or the producer is
                // dropped. A future stops after a single round.
                let future = async move {
                    loop {
                        if tls::get(|store| {
                            crate::error::Ok(matches!(
                                store.concurrent_state_mut()?.get_mut(id)?.read,
                                ReadState::Dropped
                            ))
                        })? {
                            break Ok(());
                        }

                        match produce().await? {
                            StreamResult::Completed | StreamResult::Cancelled => {}
                            StreamResult::Dropped => break Ok(()),
                        }

                        if let TransmitKind::Future = kind {
                            break Ok(());
                        }
                    }
                }
                // Delete the transmit when the pump finishes; a deletion error
                // takes precedence over the loop's own result.
                .map(move |result| {
                    tls::get(|store| store.concurrent_state_mut()?.delete_transmit(id))?;
                    result
                });

                state.push_future(Box::pin(future));
            }
            WriteState::Dropped => {
                // Nothing will ever be written: drop the reader immediately.
                let reader = transmit.read_handle;
                self.0.host_drop_reader(reader, kind)?;
            }
        }
        Ok(())
    }
2966}
2967
/// Delivers the items buffered in `pair` (a host producer plus its buffer) to
/// the read end of the transmit `id`.
///
/// The read state is temporarily replaced with `ReadState::Open` and handled
/// by kind:
/// * `GuestReady`: items are lowered into the guest at `address`, offset by
///   what was already written. The write state's `guest_offset` is advanced
///   and the read state is put back as `GuestReady`.
/// * `HostToHost`: the consumer's `accept` callback is fed first from the
///   read side's byte buffer (up to `limit`), then from the producer's own
///   buffer, until everything is consumed or the consumer reports `Dropped`.
/// * Any other read state is reported as an internal bug.
async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
    token: StoreToken<D>,
    id: TableId<TransmitState>,
    pair: Arc<LockedState<(P, B)>>,
    kind: TransmitKind,
) -> Result<()> {
    let (read, guest_offset) = tls::get(|store| {
        let transmit = store.concurrent_state_mut()?.get_mut(id)?;

        let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
            Some(guest_offset)
        } else {
            None
        };

        // Take ownership of the read state; each branch below restores an
        // appropriate state when it finishes.
        crate::error::Ok((
            mem::replace(&mut transmit.read, ReadState::Open),
            guest_offset,
        ))
    })?;

    match read {
        ReadState::GuestReady {
            ty,
            flat_abi,
            options,
            address,
            count,
            handle,
            instance,
            caller_instance,
            caller_thread,
        } => {
            let guest_offset = match guest_offset {
                Some(i) => i,
                None => bail_bug!("guest_offset should be present if ready"),
            };

            // A future is marked done as soon as its value is being delivered.
            if let TransmitKind::Future = kind {
                tls::get(|store| {
                    store.concurrent_state_mut()?.get_mut(id)?.done = true;
                    crate::error::Ok(())
                })?;
            }

            let old_remaining = pair.with(|p| p.1.remaining().len())?;
            // Lowers as many buffered items as fit into the rest of the
            // guest's buffer, starting just past the items already written.
            let accept = {
                let pair = pair.clone();
                move |mut store: StoreContextMut<D>| {
                    let mut state = pair.take()?;
                    lower::<T, B, D>(
                        store.as_context_mut(),
                        instance,
                        caller_thread,
                        options,
                        ty,
                        address + (T::SIZE32 * guest_offset.as_usize()),
                        count.as_usize() - guest_offset.as_usize(),
                        &mut state.1,
                    )?;
                    crate::error::Ok(())
                }
            };

            if guest_offset < count {
                if T::MAY_REQUIRE_REALLOC {
                    // NOTE(review): lowering may call the guest's realloc, so
                    // it is deferred to a high-priority worker item and the
                    // outcome is awaited over a oneshot channel — presumably
                    // because guest code cannot be run from this context;
                    // confirm.
                    let (tx, rx) = oneshot::channel();
                    tls::get(move |store| {
                        store
                            .concurrent_state_mut()?
                            .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(
                                Box::new(move |store| {
                                    _ = tx.send(accept(token.as_context_mut(store))?);
                                    Ok(())
                                }),
                            )));
                        crate::error::Ok(())
                    })?;
                    match rx.await {
                        Ok(r) => r,
                        Err(oneshot::Canceled) => bail_bug!("work cancelled"),
                    }
                } else {
                    // No realloc needed: lower directly.
                    tls::get(|store| accept(token.as_context_mut(store)))?
                }
            }

            tls::get(|store| {
                // Number of items actually lowered. This shadows the outer
                // `count`, which was the guest's requested count.
                let count = old_remaining - pair.with(|p| p.1.remaining().len())?;

                let transmit = store.concurrent_state_mut()?.get_mut(id)?;

                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
                    bail_bug!("expected WriteState::HostReady")
                };

                guest_offset.inc(count)?;

                // Restore the guest's pending read.
                // NOTE(review): the restored `count` is the number of items
                // just lowered, not the count originally requested — confirm
                // this is intended.
                transmit.read = ReadState::GuestReady {
                    ty,
                    flat_abi,
                    options,
                    address,
                    count: ItemCount::new_usize(count)?,
                    handle,
                    instance,
                    caller_instance,
                    caller_thread,
                };

                crate::error::Ok(())
            })?;

            Ok(())
        }

        ReadState::HostToHost {
            accept,
            mut buffer,
            limit,
        } => {
            let mut state = StreamResult::Completed;
            let mut position = 0;

            // First drain what the producer wrote into the read side's byte
            // buffer (everything up to `limit`).
            while !matches!(state, StreamResult::Dropped) && position < limit {
                let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
                state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
                (buffer, position, _) = slice_buffer.into_parts();
            }

            {
                // Then drain whatever remains in the producer's own buffer.
                let mut pair = pair.take()?;
                let (_, buffer) = &mut *pair;

                while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
                    state = accept(&mut UntypedWriteBuffer::new(buffer)).await?;
                }
            }

            // Restore the read state with `limit` reset to zero, or mark it
            // dropped if the consumer went away.
            tls::get(|store| {
                store.concurrent_state_mut()?.get_mut(id)?.read = match state {
                    StreamResult::Dropped => ReadState::Dropped,
                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
                        accept,
                        buffer,
                        limit: 0,
                    },
                };

                crate::error::Ok(())
            })?;
            Ok(())
        }

        _ => bail_bug!("unexpected read state"),
    }
}
3133
3134impl Instance {
3135 fn consume(
3138 self,
3139 store: &mut dyn VMStore,
3140 kind: TransmitKind,
3141 transmit_id: TableId<TransmitState>,
3142 consume: PollStream,
3143 guest_offset: ItemCount,
3144 cancel: bool,
3145 ) -> Result<ReturnCode> {
3146 let mut future = consume();
3147 store.concurrent_state_mut()?.get_mut(transmit_id)?.read = ReadState::HostReady {
3148 consume,
3149 guest_offset,
3150 cancel,
3151 cancel_waker: None,
3152 };
3153 let poll = tls::set(store, || {
3154 future
3155 .as_mut()
3156 .poll(&mut Context::from_waker(&Waker::noop()))
3157 });
3158
3159 Ok(match poll {
3160 Poll::Ready(state) => {
3161 let transmit = store.concurrent_state_mut()?.get_mut(transmit_id)?;
3162 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
3163 bail_bug!("expected ReadState::HostReady")
3164 };
3165 let code = return_code(kind, state?, mem::replace(guest_offset, ItemCount::ZERO))?;
3166 transmit.write = WriteState::Open;
3167 code
3168 }
3169 Poll::Pending => {
3170 store.pipe_from_guest(kind, transmit_id, future);
3171 ReturnCode::Blocked
3172 }
3173 })
3174 }
3175
3176 fn produce(
3179 self,
3180 store: &mut dyn VMStore,
3181 kind: TransmitKind,
3182 transmit_id: TableId<TransmitState>,
3183 produce: PollStream,
3184 try_into: TryInto,
3185 guest_offset: ItemCount,
3186 cancel: bool,
3187 ) -> Result<ReturnCode> {
3188 let mut future = produce();
3189 store.concurrent_state_mut()?.get_mut(transmit_id)?.write = WriteState::HostReady {
3190 produce,
3191 try_into,
3192 guest_offset,
3193 cancel,
3194 cancel_waker: None,
3195 };
3196 let poll = tls::set(store, || {
3197 future
3198 .as_mut()
3199 .poll(&mut Context::from_waker(&Waker::noop()))
3200 });
3201
3202 Ok(match poll {
3203 Poll::Ready(state) => {
3204 let transmit = store.concurrent_state_mut()?.get_mut(transmit_id)?;
3205 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3206 bail_bug!("expected WriteState::HostReady")
3207 };
3208 let code = return_code(kind, state?, mem::replace(guest_offset, ItemCount::ZERO))?;
3209 transmit.read = ReadState::Open;
3210 code
3211 }
3212 Poll::Pending => {
3213 store.pipe_to_guest(kind, transmit_id, future);
3214 ReturnCode::Blocked
3215 }
3216 })
3217 }
3218
3219 pub(super) fn guest_drop_writable(
3221 self,
3222 store: &mut StoreOpaque,
3223 ty: TransmitIndex,
3224 writer: u32,
3225 ) -> Result<()> {
3226 let table = self.id().get_mut(store).table_for_transmit(ty);
3227 let transmit_rep = match ty {
3228 TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
3229 TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
3230 };
3231
3232 let id = TableId::<TransmitHandle>::new(transmit_rep);
3233 log::trace!("guest_drop_writable: drop writer {id:?}");
3234 match ty {
3235 TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
3236 TransmitIndex::Future(_) => store.host_drop_writer(
3237 id,
3238 Some(|| {
3239 Err(format_err!(
3240 "cannot drop future write end without first writing a value"
3241 ))
3242 }),
3243 ),
3244 }
3245 }
3246
3247 fn copy<T: 'static>(
3250 store: StoreContextMut<T>,
3251 flat_abi: Option<FlatAbi>,
3252 write_instance: Instance,
3253 write_caller_instance: RuntimeComponentInstanceIndex,
3254 write_ty: TransmitIndex,
3255 write_options: OptionsIndex,
3256 write_address: usize,
3257 read_instance: Instance,
3258 read_caller_instance: RuntimeComponentInstanceIndex,
3259 read_caller_thread: QualifiedThreadId,
3260 read_ty: TransmitIndex,
3261 read_options: OptionsIndex,
3262 read_address: usize,
3263 count: ItemCount,
3264 rep: u32,
3265 ) -> Result<()> {
3266 let (write_component, store) = write_instance.component_and_store_mut(store.0);
3267 let (read_component, mut store) = read_instance.component_and_store_mut(store);
3268 let write_types = write_component.types();
3269 let read_types = read_component.types();
3270 let count = count.as_usize();
3271
3272 let write_payload_ty = write_ty.payload(write_types);
3275 let write_abi = match write_payload_ty {
3276 Some(ty) => write_types.canonical_abi(ty),
3277 None => &CanonicalAbiInfo::ZERO,
3278 };
3279 let write_length_in_bytes = match flat_abi {
3280 Some(abi) => usize::try_from(abi.size)? * count,
3281 None => usize::try_from(write_abi.size32)? * count,
3282 };
3283 if write_length_in_bytes > 0 {
3284 if write_address % usize::try_from(write_abi.align32)? != 0 {
3285 bail!("write pointer not aligned");
3286 }
3287 write_instance
3288 .options_memory(store, write_options)
3289 .get(write_address..)
3290 .and_then(|b| b.get(..write_length_in_bytes))
3291 .ok_or_else(|| crate::format_err!("write pointer out of bounds"))?;
3292 }
3293
3294 let read_payload_ty = read_ty.payload(read_types);
3295 let read_abi = match read_payload_ty {
3296 Some(ty) => read_types.canonical_abi(ty),
3297 None => &CanonicalAbiInfo::ZERO,
3298 };
3299 let read_length_in_bytes = match flat_abi {
3300 Some(abi) => usize::try_from(abi.size)? * count,
3301 None => usize::try_from(read_abi.size32)? * count,
3302 };
3303 if read_length_in_bytes > 0 {
3304 if read_address % usize::try_from(read_abi.align32)? != 0 {
3305 bail!("read pointer not aligned");
3306 }
3307 read_instance
3308 .options_memory(store, read_options)
3309 .get(read_address..)
3310 .and_then(|b| b.get(..read_length_in_bytes))
3311 .ok_or_else(|| crate::format_err!("read pointer out of bounds"))?;
3312 }
3313
3314 if write_caller_instance == read_caller_instance
3315 && !allow_intra_component_read_write(write_payload_ty)
3316 {
3317 bail!(
3318 "cannot read from and write to intra-component future/stream with non-numeric payload"
3319 )
3320 }
3321
3322 match (write_ty, read_ty) {
3323 (TransmitIndex::Future(_), TransmitIndex::Future(_)) => {
3324 if count != 1 {
3325 bail_bug!("futures can only send 1 item");
3326 }
3327
3328 let val = write_payload_ty
3329 .map(|ty| {
3330 let lift = &mut LiftContext::new(store, write_options, write_instance);
3331 let bytes = &lift.memory()[write_address..][..write_length_in_bytes];
3332 Val::load(lift, *ty, bytes)
3333 })
3334 .transpose()?;
3335
3336 if let Some(val) = val {
3337 let old_thread = store.set_thread(read_caller_thread)?;
3341 let lower =
3342 &mut LowerContext::new(store.as_context_mut(), read_options, read_instance);
3343 let ptr = func::validate_inbounds_dynamic(
3344 read_abi,
3345 lower.as_slice_mut(),
3346 &ValRaw::u32(read_address.try_into()?),
3347 )?;
3348 let ty = match read_payload_ty {
3349 Some(ty) => ty,
3350 None => bail_bug!("expected read payload type to be present"),
3351 };
3352 val.store(lower, *ty, ptr)?;
3353 store.set_thread(old_thread)?;
3354 }
3355 }
3356 (TransmitIndex::Stream(_), TransmitIndex::Stream(_)) => {
3357 if write_length_in_bytes == 0 {
3358 return Ok(());
3359 }
3360 let write_payload_ty = match write_payload_ty {
3361 Some(ty) => ty,
3362 None => bail_bug!("expected write payload type to be present"),
3363 };
3364 let read_payload_ty = match read_payload_ty {
3365 Some(ty) => ty,
3366 None => bail_bug!("expected read payload type to be present"),
3367 };
3368 if flat_abi.is_some() {
3369 let store_opaque = store.store_opaque_mut();
3371
3372 assert_eq!(read_length_in_bytes, write_length_in_bytes);
3373
3374 if read_instance
3375 .options_memory(store_opaque, read_options)
3376 .as_ptr()
3377 == write_instance
3378 .options_memory(store_opaque, write_options)
3379 .as_ptr()
3380 {
3381 let memory = read_instance.options_memory_mut(store_opaque, read_options);
3382 memory.copy_within(
3383 write_address..write_address + write_length_in_bytes,
3384 read_address,
3385 );
3386 } else {
3387 let src = write_instance.options_memory(store_opaque, write_options)
3388 [write_address..][..write_length_in_bytes]
3389 .as_ptr();
3390 let dst = read_instance.options_memory_mut(store_opaque, read_options)
3391 [read_address..][..read_length_in_bytes]
3392 .as_mut_ptr();
3393
3394 unsafe {
3404 src.copy_to_nonoverlapping(dst, write_length_in_bytes);
3405 }
3406 }
3407 } else {
3408 let store_opaque = store.store_opaque_mut();
3409 let lift = &mut LiftContext::new(store_opaque, write_options, write_instance);
3410 let bytes = &lift.memory()[write_address..][..write_length_in_bytes];
3411 lift.consume_fuel_array(count, size_of::<Val>())?;
3412
3413 let values = (0..count)
3414 .map(|index| {
3415 let size = usize::try_from(write_abi.size32)?;
3416 Val::load(lift, *write_payload_ty, &bytes[(index * size)..][..size])
3417 })
3418 .collect::<Result<Vec<_>>>()?;
3419
3420 let id = TableId::<TransmitHandle>::new(rep);
3421 log::trace!("copy values {values:?} for {id:?}");
3422
3423 let old_thread = store.set_thread(read_caller_thread)?;
3427 let lower =
3428 &mut LowerContext::new(store.as_context_mut(), read_options, read_instance);
3429 let mut ptr = read_address;
3430 for value in values {
3431 value.store(lower, *read_payload_ty, ptr)?;
3432 ptr += usize::try_from(read_abi.size32)?;
3433 }
3434 store.set_thread(old_thread)?;
3435 }
3436 }
3437 _ => bail_bug!("mismatched transmit types in copy"),
3438 }
3439
3440 Ok(())
3441 }
3442
3443 fn check_bounds(
3444 self,
3445 store: &StoreOpaque,
3446 options: OptionsIndex,
3447 ty: TransmitIndex,
3448 address: usize,
3449 count: usize,
3450 ) -> Result<()> {
3451 let types = self.id().get(store).component().types();
3452 let size = usize::try_from(
3453 match ty {
3454 TransmitIndex::Future(ty) => types[types[ty].ty]
3455 .payload
3456 .map(|ty| types.canonical_abi(&ty).size32),
3457 TransmitIndex::Stream(ty) => types[types[ty].ty]
3458 .payload
3459 .map(|ty| types.canonical_abi(&ty).size32),
3460 }
3461 .unwrap_or(0),
3462 )?;
3463
3464 if count > 0 && size > 0 {
3465 self.options_memory(store, options)
3466 .get(address..)
3467 .and_then(|b| b.get(..size.checked_mul(count)?))
3468 .map(drop)
3469 .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))
3470 } else {
3471 Ok(())
3472 }
3473 }
3474
3475 pub(super) fn guest_write<T: 'static>(
3477 self,
3478 mut store: StoreContextMut<T>,
3479 caller: RuntimeComponentInstanceIndex,
3480 ty: TransmitIndex,
3481 options: OptionsIndex,
3482 flat_abi: Option<FlatAbi>,
3483 handle: u32,
3484 address: u32,
3485 count: u32,
3486 ) -> Result<ReturnCode> {
3487 let count = ItemCount::new(count)?;
3488
3489 if !self.options(store.0, options).async_ {
3490 store.0.check_blocking()?;
3494 }
3495
3496 let address = usize::try_from(address)?;
3497 self.check_bounds(store.0, options, ty, address, count.as_usize())?;
3498 let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3499 let TransmitLocalState::Write { done } = *state else {
3500 bail!(Trap::ConcurrentFutureStreamOp);
3501 };
3502
3503 if done {
3504 bail!("cannot write after being notified that the readable end dropped");
3505 }
3506
3507 *state = TransmitLocalState::Busy;
3508 let transmit_handle = TableId::<TransmitHandle>::new(rep);
3509 let concurrent_state = store.0.concurrent_state_mut()?;
3510 let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3511 let transmit = concurrent_state.get_mut(transmit_id)?;
3512 log::trace!(
3513 "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3514 transmit.read
3515 );
3516
3517 if transmit.done {
3518 bail!("cannot write to future after previous write succeeded or readable end dropped");
3519 }
3520
3521 let new_state = if let ReadState::Dropped = &transmit.read {
3522 ReadState::Dropped
3523 } else {
3524 ReadState::Open
3525 };
3526
3527 let set_guest_ready = |me: &mut ConcurrentState| {
3528 let transmit = me.get_mut(transmit_id)?;
3529 if !matches!(&transmit.write, WriteState::Open) {
3530 bail_bug!("expected `WriteState::Open`; got `{:?}`", transmit.write);
3531 }
3532 transmit.write = WriteState::GuestReady {
3533 instance: self,
3534 caller,
3535 ty,
3536 flat_abi,
3537 options,
3538 address,
3539 count,
3540 handle,
3541 };
3542 Ok::<_, crate::Error>(())
3543 };
3544
3545 let mut result = match mem::replace(&mut transmit.read, new_state) {
3546 ReadState::GuestReady {
3547 ty: read_ty,
3548 flat_abi: read_flat_abi,
3549 options: read_options,
3550 address: read_address,
3551 count: read_count,
3552 handle: read_handle,
3553 instance: read_instance,
3554 caller_instance: read_caller_instance,
3555 caller_thread: read_caller_thread,
3556 } => {
3557 if flat_abi != read_flat_abi {
3558 bail_bug!("expected flat ABI calculations to be the same");
3559 }
3560
3561 if let TransmitIndex::Future(_) = ty {
3562 transmit.done = true;
3563 }
3564
3565 let write_complete = count == 0 || read_count > 0;
3587 let read_complete = count > 0;
3588 let read_buffer_remaining = count < read_count;
3589
3590 let read_handle_rep = transmit.read_handle.rep();
3591
3592 let count = count.min(read_count);
3593
3594 Instance::copy(
3595 store.as_context_mut(),
3596 flat_abi,
3597 self,
3598 caller,
3599 ty,
3600 options,
3601 address,
3602 read_instance,
3603 read_caller_instance,
3604 read_caller_thread,
3605 read_ty,
3606 read_options,
3607 read_address,
3608 count,
3609 rep,
3610 )?;
3611
3612 let instance = read_instance.id().get(store.0);
3613 let types = instance.component().types();
3614 let item_size = match read_ty.payload(types) {
3615 Some(ty) => usize::try_from(types.canonical_abi(ty).size32)?,
3616 None => 0,
3617 };
3618 let concurrent_state = store.0.concurrent_state_mut()?;
3619 if read_complete {
3620 let total = if let Some(Event::StreamRead {
3621 code: ReturnCode::Completed(old_total),
3622 ..
3623 }) = concurrent_state.take_event(read_handle_rep)?
3624 {
3625 count.add(old_total)?
3626 } else {
3627 count
3628 };
3629
3630 let code = ReturnCode::completed(ty.kind(), total);
3631
3632 concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
3633 }
3634
3635 if read_buffer_remaining || (count == 0 && read_count == 0) {
3642 let transmit = concurrent_state.get_mut(transmit_id)?;
3643 transmit.read = ReadState::GuestReady {
3644 ty: read_ty,
3645 flat_abi: read_flat_abi,
3646 options: read_options,
3647 address: read_address + (count.as_usize() * item_size),
3648 count: read_count.sub(count)?,
3649 handle: read_handle,
3650 instance: read_instance,
3651 caller_instance: read_caller_instance,
3652 caller_thread: read_caller_thread,
3653 };
3654 }
3655
3656 if write_complete {
3657 ReturnCode::completed(ty.kind(), count)
3658 } else {
3659 set_guest_ready(concurrent_state)?;
3660 ReturnCode::Blocked
3661 }
3662 }
3663
3664 ReadState::HostReady {
3665 consume,
3666 guest_offset,
3667 cancel,
3668 cancel_waker,
3669 } => {
3670 if cancel_waker.is_some() {
3671 bail_bug!("expected cancel_waker to be none");
3672 }
3673 if cancel {
3674 bail_bug!("expected cancel to be false");
3675 }
3676 if guest_offset != 0 {
3677 bail_bug!("expected guest_offset to be 0");
3678 }
3679
3680 if let TransmitIndex::Future(_) = ty {
3681 transmit.done = true;
3682 }
3683
3684 set_guest_ready(concurrent_state)?;
3685 self.consume(
3686 store.0,
3687 ty.kind(),
3688 transmit_id,
3689 consume,
3690 ItemCount::ZERO,
3691 false,
3692 )?
3693 }
3694
3695 ReadState::HostToHost { .. } => bail_bug!("unexpected HostToHost"),
3696
3697 ReadState::Open => {
3698 set_guest_ready(concurrent_state)?;
3699 ReturnCode::Blocked
3700 }
3701
3702 ReadState::Dropped => {
3703 if let TransmitIndex::Future(_) = ty {
3704 transmit.done = true;
3705 }
3706
3707 ReturnCode::Dropped(ItemCount::ZERO)
3708 }
3709 };
3710
3711 if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3712 result = self.wait_for_write(store.0, transmit_handle)?;
3713 }
3714
3715 if result != ReturnCode::Blocked {
3716 *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3717 TransmitLocalState::Write {
3718 done: matches!(result, ReturnCode::Dropped(_)),
3719 };
3720 }
3721
3722 log::trace!(
3723 "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3724 );
3725
3726 Ok(result)
3727 }
3728
3729 pub(super) fn guest_read<T: 'static>(
3731 self,
3732 mut store: StoreContextMut<T>,
3733 caller_instance: RuntimeComponentInstanceIndex,
3734 ty: TransmitIndex,
3735 options: OptionsIndex,
3736 flat_abi: Option<FlatAbi>,
3737 handle: u32,
3738 address: u32,
3739 count: u32,
3740 ) -> Result<ReturnCode> {
3741 let count = ItemCount::new(count)?;
3742
3743 if !self.options(store.0, options).async_ {
3744 store.0.check_blocking()?;
3748 }
3749
3750 let address = usize::try_from(address)?;
3751 self.check_bounds(store.0, options, ty, address, count.as_usize())?;
3752 let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3753 let TransmitLocalState::Read { done } = *state else {
3754 bail!(Trap::ConcurrentFutureStreamOp);
3755 };
3756
3757 if done {
3758 bail!("cannot read after being notified that the writable end dropped");
3759 }
3760
3761 *state = TransmitLocalState::Busy;
3762 let transmit_handle = TableId::<TransmitHandle>::new(rep);
3763 let caller_thread = store.0.current_guest_thread()?;
3764 let concurrent_state = store.0.concurrent_state_mut()?;
3765 let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3766 let transmit = concurrent_state.get_mut(transmit_id)?;
3767 log::trace!(
3768 "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3769 transmit.write
3770 );
3771
3772 if transmit.done {
3773 bail!("cannot read from future after previous read succeeded");
3774 }
3775
3776 let new_state = if let WriteState::Dropped = &transmit.write {
3777 WriteState::Dropped
3778 } else {
3779 WriteState::Open
3780 };
3781
3782 let set_guest_ready = |me: &mut ConcurrentState| {
3783 let transmit = me.get_mut(transmit_id)?;
3784 if !matches!(&transmit.read, ReadState::Open) {
3785 bail_bug!("expected `ReadState::Open`; got `{:?}`", transmit.read);
3786 }
3787 transmit.read = ReadState::GuestReady {
3788 ty,
3789 flat_abi,
3790 options,
3791 address,
3792 count,
3793 handle,
3794 instance: self,
3795 caller_instance,
3796 caller_thread,
3797 };
3798 Ok::<_, crate::Error>(())
3799 };
3800
3801 let mut result = match mem::replace(&mut transmit.write, new_state) {
3802 WriteState::GuestReady {
3803 instance: write_instance,
3804 ty: write_ty,
3805 flat_abi: write_flat_abi,
3806 options: write_options,
3807 address: write_address,
3808 count: write_count,
3809 handle: write_handle,
3810 caller: write_caller,
3811 } => {
3812 if flat_abi != write_flat_abi {
3813 bail_bug!("expected flat ABI calculations to be the same");
3814 }
3815
3816 if let TransmitIndex::Future(_) = ty {
3817 transmit.done = true;
3818 }
3819
3820 let write_handle_rep = transmit.write_handle.rep();
3821
3822 let write_complete = write_count == 0 || count > 0;
3827 let read_complete = write_count > 0;
3828 let write_buffer_remaining = count < write_count;
3829
3830 let count = count.min(write_count);
3831
3832 Instance::copy(
3833 store.as_context_mut(),
3834 flat_abi,
3835 write_instance,
3836 write_caller,
3837 write_ty,
3838 write_options,
3839 write_address,
3840 self,
3841 caller_instance,
3842 caller_thread,
3843 ty,
3844 options,
3845 address,
3846 count,
3847 rep,
3848 )?;
3849
3850 let instance = write_instance.id().get(store.0);
3851 let types = instance.component().types();
3852 let item_size = match write_ty.payload(types) {
3853 Some(ty) => usize::try_from(types.canonical_abi(ty).size32)?,
3854 None => 0,
3855 };
3856 let concurrent_state = store.0.concurrent_state_mut()?;
3857
3858 if write_complete {
3859 let total = if let Some(Event::StreamWrite {
3860 code: ReturnCode::Completed(old_total),
3861 ..
3862 }) = concurrent_state.take_event(write_handle_rep)?
3863 {
3864 count.add(old_total)?
3865 } else {
3866 count
3867 };
3868
3869 let code = ReturnCode::completed(ty.kind(), total);
3870
3871 concurrent_state.send_write_result(
3872 write_ty,
3873 transmit_id,
3874 write_handle,
3875 code,
3876 )?;
3877 }
3878
3879 if write_buffer_remaining {
3880 let transmit = concurrent_state.get_mut(transmit_id)?;
3881 transmit.write = WriteState::GuestReady {
3882 instance: write_instance,
3883 caller: write_caller,
3884 ty: write_ty,
3885 flat_abi: write_flat_abi,
3886 options: write_options,
3887 address: write_address + (count.as_usize() * item_size),
3888 count: write_count.sub(count)?,
3889 handle: write_handle,
3890 };
3891 }
3892
3893 if read_complete {
3894 ReturnCode::completed(ty.kind(), count)
3895 } else {
3896 set_guest_ready(concurrent_state)?;
3897 ReturnCode::Blocked
3898 }
3899 }
3900
3901 WriteState::HostReady {
3902 produce,
3903 try_into,
3904 guest_offset,
3905 cancel,
3906 cancel_waker,
3907 } => {
3908 if cancel_waker.is_some() {
3909 bail_bug!("expected cancel_waker to be none");
3910 }
3911 if cancel {
3912 bail_bug!("expected cancel to be false");
3913 }
3914 if guest_offset != 0 {
3915 bail_bug!("expected guest_offset to be 0");
3916 }
3917
3918 set_guest_ready(concurrent_state)?;
3919
3920 let code = self.produce(
3921 store.0,
3922 ty.kind(),
3923 transmit_id,
3924 produce,
3925 try_into,
3926 ItemCount::ZERO,
3927 false,
3928 )?;
3929
3930 if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) {
3931 store.0.concurrent_state_mut()?.get_mut(transmit_id)?.done = true;
3932 }
3933
3934 code
3935 }
3936
3937 WriteState::Open => {
3938 set_guest_ready(concurrent_state)?;
3939 ReturnCode::Blocked
3940 }
3941
3942 WriteState::Dropped => ReturnCode::Dropped(ItemCount::ZERO),
3943 };
3944
3945 if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3946 result = self.wait_for_read(store.0, transmit_handle)?;
3947 }
3948
3949 if result != ReturnCode::Blocked {
3950 *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3951 TransmitLocalState::Read {
3952 done: matches!(
3953 (result, ty),
3954 (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3955 ),
3956 };
3957 }
3958
3959 log::trace!(
3960 "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3961 );
3962
3963 Ok(result)
3964 }
3965
3966 fn wait_for_write(
3967 self,
3968 store: &mut StoreOpaque,
3969 handle: TableId<TransmitHandle>,
3970 ) -> Result<ReturnCode> {
3971 let waitable = Waitable::Transmit(handle);
3972 store.wait_for_event(waitable)?;
3973 let event = waitable.take_event(store.concurrent_state_mut()?)?;
3974 if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3975 event
3976 {
3977 waitable.on_delivery(store, self, event)?;
3978 Ok(code)
3979 } else {
3980 bail_bug!("expected either a stream or future write event")
3981 }
3982 }
3983
3984 fn cancel_write(
3986 self,
3987 store: &mut StoreOpaque,
3988 transmit_id: TableId<TransmitState>,
3989 async_: bool,
3990 ) -> Result<ReturnCode> {
3991 let state = store.concurrent_state_mut()?;
3992 let transmit = state.get_mut(transmit_id)?;
3993 log::trace!(
3994 "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3995 transmit.read,
3996 transmit.write
3997 );
3998 let waitable = Waitable::Transmit(transmit.write_handle);
3999
4000 let code = if let Some(event) = waitable.take_event(state)? {
4001 let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
4002 bail_bug!("expected either a stream or future write event")
4003 };
4004 waitable.on_delivery(store, self, event)?;
4005 match (code, event) {
4006 (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
4007 ReturnCode::Cancelled(count)
4008 }
4009 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
4010 _ => bail_bug!("unexpected code/event combo"),
4011 }
4012 } else if let ReadState::HostReady {
4013 cancel,
4014 cancel_waker,
4015 ..
4016 } = &mut state.get_mut(transmit_id)?.read
4017 {
4018 *cancel = true;
4019 if let Some(waker) = cancel_waker.take() {
4020 waker.wake();
4021 }
4022
4023 if async_ {
4024 ReturnCode::Blocked
4025 } else {
4026 let handle = store
4027 .concurrent_state_mut()?
4028 .get_mut(transmit_id)?
4029 .write_handle;
4030 self.wait_for_write(store, handle)?
4031 }
4032 } else {
4033 ReturnCode::Cancelled(ItemCount::ZERO)
4034 };
4035
4036 if !matches!(code, ReturnCode::Blocked) {
4037 let transmit = store.concurrent_state_mut()?.get_mut(transmit_id)?;
4038
4039 match &transmit.write {
4040 WriteState::GuestReady { .. } => {
4041 transmit.write = WriteState::Open;
4042 }
4043 WriteState::HostReady { .. } => bail_bug!("support host write cancellation"),
4044 WriteState::Open | WriteState::Dropped => {}
4045 }
4046 }
4047
4048 log::trace!("cancelled write {transmit_id:?}: {code:?}");
4049
4050 Ok(code)
4051 }
4052
4053 fn wait_for_read(
4054 self,
4055 store: &mut StoreOpaque,
4056 handle: TableId<TransmitHandle>,
4057 ) -> Result<ReturnCode> {
4058 let waitable = Waitable::Transmit(handle);
4059 store.wait_for_event(waitable)?;
4060 let event = waitable.take_event(store.concurrent_state_mut()?)?;
4061 if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
4062 event
4063 {
4064 waitable.on_delivery(store, self, event)?;
4065 Ok(code)
4066 } else {
4067 bail_bug!("expected either a stream or future read event")
4068 }
4069 }
4070
4071 fn cancel_read(
4073 self,
4074 store: &mut StoreOpaque,
4075 transmit_id: TableId<TransmitState>,
4076 async_: bool,
4077 ) -> Result<ReturnCode> {
4078 let state = store.concurrent_state_mut()?;
4079 let transmit = state.get_mut(transmit_id)?;
4080 log::trace!(
4081 "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
4082 transmit.read,
4083 transmit.write
4084 );
4085
4086 let waitable = Waitable::Transmit(transmit.read_handle);
4087 let code = if let Some(event) = waitable.take_event(state)? {
4088 let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
4089 bail_bug!("expected either a stream or future read event")
4090 };
4091 waitable.on_delivery(store, self, event)?;
4092 match (code, event) {
4093 (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
4094 ReturnCode::Cancelled(count)
4095 }
4096 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
4097 _ => bail_bug!("unexpected code/event combo"),
4098 }
4099 } else if let WriteState::HostReady {
4100 cancel,
4101 cancel_waker,
4102 ..
4103 } = &mut state.get_mut(transmit_id)?.write
4104 {
4105 *cancel = true;
4106 if let Some(waker) = cancel_waker.take() {
4107 waker.wake();
4108 }
4109
4110 if async_ {
4111 ReturnCode::Blocked
4112 } else {
4113 let handle = store
4114 .concurrent_state_mut()?
4115 .get_mut(transmit_id)?
4116 .read_handle;
4117 self.wait_for_read(store, handle)?
4118 }
4119 } else {
4120 ReturnCode::Cancelled(ItemCount::ZERO)
4121 };
4122
4123 if !matches!(code, ReturnCode::Blocked) {
4124 let transmit = store.concurrent_state_mut()?.get_mut(transmit_id)?;
4125
4126 match &transmit.read {
4127 ReadState::GuestReady { .. } => {
4128 transmit.read = ReadState::Open;
4129 }
4130 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
4131 bail_bug!("support host read cancellation")
4132 }
4133 ReadState::Open | ReadState::Dropped => {}
4134 }
4135 }
4136
4137 log::trace!("cancelled read {transmit_id:?}: {code:?}");
4138
4139 Ok(code)
4140 }
4141
4142 fn guest_cancel_write(
4144 self,
4145 store: &mut StoreOpaque,
4146 ty: TransmitIndex,
4147 async_: bool,
4148 writer: u32,
4149 ) -> Result<ReturnCode> {
4150 if !async_ {
4151 store.check_blocking()?;
4155 }
4156
4157 let (rep, state) =
4158 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
4159 let id = TableId::<TransmitHandle>::new(rep);
4160 log::trace!("guest cancel write {id:?} (handle {writer})");
4161 match state {
4162 TransmitLocalState::Write { .. } => {
4163 bail!("stream or future write cancelled when no write is pending")
4164 }
4165 TransmitLocalState::Read { .. } => {
4166 bail!("passed read end to `{{stream|future}}.cancel-write`")
4167 }
4168 TransmitLocalState::Busy => {}
4169 }
4170 let transmit_id = store.concurrent_state_mut()?.get_mut(id)?.state;
4171 let code = self.cancel_write(store, transmit_id, async_)?;
4172 if !matches!(code, ReturnCode::Blocked) {
4173 let state =
4174 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
4175 .1;
4176 if let TransmitLocalState::Busy = state {
4177 *state = TransmitLocalState::Write { done: false };
4178 }
4179 }
4180 Ok(code)
4181 }
4182
4183 fn guest_cancel_read(
4185 self,
4186 store: &mut StoreOpaque,
4187 ty: TransmitIndex,
4188 async_: bool,
4189 reader: u32,
4190 ) -> Result<ReturnCode> {
4191 if !async_ {
4192 store.check_blocking()?;
4196 }
4197
4198 let (rep, state) =
4199 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
4200 let id = TableId::<TransmitHandle>::new(rep);
4201 log::trace!("guest cancel read {id:?} (handle {reader})");
4202 match state {
4203 TransmitLocalState::Read { .. } => {
4204 bail!("stream or future read cancelled when no read is pending")
4205 }
4206 TransmitLocalState::Write { .. } => {
4207 bail!("passed write end to `{{stream|future}}.cancel-read`")
4208 }
4209 TransmitLocalState::Busy => {}
4210 }
4211 let transmit_id = store.concurrent_state_mut()?.get_mut(id)?.state;
4212 let code = self.cancel_read(store, transmit_id, async_)?;
4213 if !matches!(code, ReturnCode::Blocked) {
4214 let state =
4215 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
4216 .1;
4217 if let TransmitLocalState::Busy = state {
4218 *state = TransmitLocalState::Read { done: false };
4219 }
4220 }
4221 Ok(code)
4222 }
4223
4224 fn guest_drop_readable(
4226 self,
4227 store: &mut StoreOpaque,
4228 ty: TransmitIndex,
4229 reader: u32,
4230 ) -> Result<()> {
4231 let table = self.id().get_mut(store).table_for_transmit(ty);
4232 let (rep, _is_done) = match ty {
4233 TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
4234 TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
4235 };
4236 let kind = match ty {
4237 TransmitIndex::Stream(_) => TransmitKind::Stream,
4238 TransmitIndex::Future(_) => TransmitKind::Future,
4239 };
4240 let id = TableId::<TransmitHandle>::new(rep);
4241 log::trace!("guest_drop_readable: drop reader {id:?}");
4242 store.host_drop_reader(id, kind)
4243 }
4244
4245 pub(crate) fn error_context_new(
4247 self,
4248 store: &mut StoreOpaque,
4249 ty: TypeComponentLocalErrorContextTableIndex,
4250 options: OptionsIndex,
4251 debug_msg_address: u32,
4252 debug_msg_len: u32,
4253 ) -> Result<u32> {
4254 let lift_ctx = &mut LiftContext::new(store, options, self);
4255 let debug_msg = String::linear_lift_from_flat(
4256 lift_ctx,
4257 InterfaceType::String,
4258 &[ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)],
4259 )?;
4260
4261 let err_ctx = ErrorContextState { debug_msg };
4263 let state = store.concurrent_state_mut()?;
4264 let table_id = state.push(err_ctx)?;
4265 let global_ref_count_idx =
4266 TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
4267
4268 let _ = state
4270 .global_error_context_ref_counts
4271 .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
4272
4273 let local_idx = self
4280 .id()
4281 .get_mut(store)
4282 .table_for_error_context(ty)
4283 .error_context_insert(table_id.rep())?;
4284
4285 Ok(local_idx)
4286 }
4287
4288 pub(super) fn error_context_debug_message<T>(
4290 self,
4291 store: StoreContextMut<T>,
4292 ty: TypeComponentLocalErrorContextTableIndex,
4293 options: OptionsIndex,
4294 err_ctx_handle: u32,
4295 debug_msg_address: u32,
4296 ) -> Result<()> {
4297 let handle_table_id_rep = self
4299 .id()
4300 .get_mut(store.0)
4301 .table_for_error_context(ty)
4302 .error_context_rep(err_ctx_handle)?;
4303
4304 let state = store.0.concurrent_state_mut()?;
4305 let ErrorContextState { debug_msg } =
4307 state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
4308 let debug_msg = debug_msg.clone();
4309
4310 let lower_cx = &mut LowerContext::new(store, options, self);
4311 let debug_msg_address = usize::try_from(debug_msg_address)?;
4312 let offset = lower_cx
4318 .as_slice_mut()
4319 .get(debug_msg_address..)
4320 .and_then(|b| b.get(..8))
4321 .map(|_| debug_msg_address)
4322 .ok_or_else(|| crate::format_err!("invalid debug message pointer: out of bounds"))?;
4323 debug_msg
4324 .as_str()
4325 .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
4326
4327 Ok(())
4328 }
4329
4330 pub(crate) fn future_cancel_read(
4332 self,
4333 store: &mut StoreOpaque,
4334 ty: TypeFutureTableIndex,
4335 async_: bool,
4336 reader: u32,
4337 ) -> Result<u32> {
4338 self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
4339 .map(|v| v.encode())
4340 }
4341
4342 pub(crate) fn future_cancel_write(
4344 self,
4345 store: &mut StoreOpaque,
4346 ty: TypeFutureTableIndex,
4347 async_: bool,
4348 writer: u32,
4349 ) -> Result<u32> {
4350 self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
4351 .map(|v| v.encode())
4352 }
4353
4354 pub(crate) fn stream_cancel_read(
4356 self,
4357 store: &mut StoreOpaque,
4358 ty: TypeStreamTableIndex,
4359 async_: bool,
4360 reader: u32,
4361 ) -> Result<u32> {
4362 self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
4363 .map(|v| v.encode())
4364 }
4365
4366 pub(crate) fn stream_cancel_write(
4368 self,
4369 store: &mut StoreOpaque,
4370 ty: TypeStreamTableIndex,
4371 async_: bool,
4372 writer: u32,
4373 ) -> Result<u32> {
4374 self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
4375 .map(|v| v.encode())
4376 }
4377
4378 pub(crate) fn future_drop_readable(
4380 self,
4381 store: &mut StoreOpaque,
4382 ty: TypeFutureTableIndex,
4383 reader: u32,
4384 ) -> Result<()> {
4385 self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
4386 }
4387
4388 pub(crate) fn stream_drop_readable(
4390 self,
4391 store: &mut StoreOpaque,
4392 ty: TypeStreamTableIndex,
4393 reader: u32,
4394 ) -> Result<()> {
4395 self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
4396 }
4397
4398 fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
4402 let (write, read) = store
4403 .concurrent_state_mut()?
4404 .new_transmit(TransmitOrigin::guest(self.id().instance(), ty))?;
4405
4406 let table = self.id().get_mut(store).table_for_transmit(ty);
4407 let (read_handle, write_handle) = match ty {
4408 TransmitIndex::Future(ty) => (
4409 table.future_insert_read(ty, read.rep())?,
4410 table.future_insert_write(ty, write.rep())?,
4411 ),
4412 TransmitIndex::Stream(ty) => (
4413 table.stream_insert_read(ty, read.rep())?,
4414 table.stream_insert_write(ty, write.rep())?,
4415 ),
4416 };
4417
4418 let state = store.concurrent_state_mut()?;
4419 state.get_mut(read)?.common.handle = Some(read_handle);
4420 state.get_mut(write)?.common.handle = Some(write_handle);
4421
4422 Ok(ResourcePair {
4423 write: write_handle,
4424 read: read_handle,
4425 })
4426 }
4427
4428 pub(crate) fn error_context_drop(
4430 self,
4431 store: &mut StoreOpaque,
4432 ty: TypeComponentLocalErrorContextTableIndex,
4433 error_context: u32,
4434 ) -> Result<()> {
4435 let instance = self.id().get_mut(store);
4436
4437 let local_handle_table = instance.table_for_error_context(ty);
4438
4439 let rep = local_handle_table.error_context_drop(error_context)?;
4440
4441 let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
4442
4443 let state = store.concurrent_state_mut()?;
4444 let Some(GlobalErrorContextRefCount(global_ref_count)) = state
4445 .global_error_context_ref_counts
4446 .get_mut(&global_ref_count_idx)
4447 else {
4448 bail_bug!("retrieve concurrent state for error context during drop")
4449 };
4450
4451 if *global_ref_count < 1 {
4453 bail_bug!("ref count unexpectedly zero");
4454 }
4455 *global_ref_count -= 1;
4456 if *global_ref_count == 0 {
4457 state
4458 .global_error_context_ref_counts
4459 .remove(&global_ref_count_idx);
4460
4461 state
4462 .delete(TableId::<ErrorContextState>::new(rep))
4463 .context("deleting component-global error context data")?;
4464 }
4465
4466 Ok(())
4467 }
4468
4469 fn guest_transfer(
4472 self,
4473 store: &mut StoreOpaque,
4474 src_idx: u32,
4475 src: TransmitIndex,
4476 dst: TransmitIndex,
4477 ) -> Result<u32> {
4478 let id = self.lift_index_to_transmit(store, src, src_idx)?;
4479 self.lower_transmit_to_index(store, dst, id)
4480 }
4481
4482 fn lift_index_to_transmit(
4483 self,
4484 store: &mut StoreOpaque,
4485 ty: TransmitIndex,
4486 src_idx: u32,
4487 ) -> Result<TableId<TransmitHandle>> {
4488 let (state, _, _, instance) = store.lift_context_parts(self);
4489 lift_index_to_transmit(instance, state.concurrent_state_mut(), ty, src_idx)
4490 }
4491
4492 fn lower_transmit_to_index(
4493 self,
4494 store: &mut StoreOpaque,
4495 ty: TransmitIndex,
4496 id: TableId<TransmitHandle>,
4497 ) -> Result<u32> {
4498 let (state, _, _, instance) = store.lift_context_parts(self);
4499 lower_transmit_to_index(instance, state.concurrent_state_mut(), ty, id)
4500 }
4501
4502 pub(crate) fn future_new(
4504 self,
4505 store: &mut StoreOpaque,
4506 ty: TypeFutureTableIndex,
4507 ) -> Result<ResourcePair> {
4508 self.guest_new(store, TransmitIndex::Future(ty))
4509 }
4510
4511 pub(crate) fn stream_new(
4513 self,
4514 store: &mut StoreOpaque,
4515 ty: TypeStreamTableIndex,
4516 ) -> Result<ResourcePair> {
4517 self.guest_new(store, TransmitIndex::Stream(ty))
4518 }
4519
4520 pub(crate) fn future_transfer(
4523 self,
4524 store: &mut StoreOpaque,
4525 src_idx: u32,
4526 src: TypeFutureTableIndex,
4527 dst: TypeFutureTableIndex,
4528 ) -> Result<u32> {
4529 self.guest_transfer(
4530 store,
4531 src_idx,
4532 TransmitIndex::Future(src),
4533 TransmitIndex::Future(dst),
4534 )
4535 }
4536
4537 pub(crate) fn stream_transfer(
4540 self,
4541 store: &mut StoreOpaque,
4542 src_idx: u32,
4543 src: TypeStreamTableIndex,
4544 dst: TypeStreamTableIndex,
4545 ) -> Result<u32> {
4546 self.guest_transfer(
4547 store,
4548 src_idx,
4549 TransmitIndex::Stream(src),
4550 TransmitIndex::Stream(dst),
4551 )
4552 }
4553
4554 pub(crate) fn error_context_transfer(
4556 self,
4557 store: &mut StoreOpaque,
4558 src_idx: u32,
4559 src: TypeComponentLocalErrorContextTableIndex,
4560 dst: TypeComponentLocalErrorContextTableIndex,
4561 ) -> Result<u32> {
4562 let mut instance = self.id().get_mut(store);
4563 let rep = instance
4564 .as_mut()
4565 .table_for_error_context(src)
4566 .error_context_rep(src_idx)?;
4567 let dst_idx = instance
4568 .table_for_error_context(dst)
4569 .error_context_insert(rep)?;
4570
4571 let global_ref_count = store
4575 .concurrent_state_mut()?
4576 .global_error_context_ref_counts
4577 .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4578 .context("global ref count present for existing (sub)component error context")?;
4579
4580 global_ref_count.0 = global_ref_count
4581 .0
4582 .checked_add(1)
4583 .ok_or_else(|| format_err!(Trap::ReferenceCountOverflow))?;
4584
4585 Ok(dst_idx)
4586 }
4587}
4588
4589fn lift_index_to_transmit(
4595 instance: Pin<&mut ComponentInstance>,
4596 concurrent_state: &mut ConcurrentState,
4597 ty: TransmitIndex,
4598 src_idx: u32,
4599) -> Result<TableId<TransmitHandle>> {
4600 let handle_table = instance.table_for_transmit(ty);
4601 let (rep, is_done) = match ty {
4602 TransmitIndex::Future(idx) => handle_table.future_remove_readable(idx, src_idx)?,
4603 TransmitIndex::Stream(idx) => handle_table.stream_remove_readable(idx, src_idx)?,
4604 };
4605 let desc = match ty {
4606 TransmitIndex::Future(_) => "future",
4607 TransmitIndex::Stream(_) => "stream",
4608 };
4609 if is_done {
4610 bail!("cannot lift {desc} after being notified that the writable end dropped");
4611 }
4612 let id = TableId::<TransmitHandle>::new(rep);
4613 let future = concurrent_state.get_mut(id)?;
4614 if future.common.set.is_some() {
4615 bail!("cannot lift {desc} while it's in a waitable set");
4616 }
4617 future.common.handle = None;
4618
4619 let state = future.state;
4620 if concurrent_state.get_mut(state)?.done {
4621 bail!("cannot lift {desc} after previous read succeeded");
4622 }
4623
4624 Ok(id)
4625}
4626
4627fn lower_transmit_to_index(
4630 instance: Pin<&mut ComponentInstance>,
4631 concurrent_state: &mut ConcurrentState,
4632 ty: TransmitIndex,
4633 id: TableId<TransmitHandle>,
4634) -> Result<u32> {
4635 let state = concurrent_state.get_mut(id)?.state;
4636 debug_assert_eq!(concurrent_state.get_mut(state)?.read_handle, id);
4637 let handle_table = instance.table_for_transmit(ty);
4638 let handle = match ty {
4639 TransmitIndex::Future(idx) => handle_table.future_insert_read(idx, id.rep()),
4640 TransmitIndex::Stream(idx) => handle_table.stream_insert_read(idx, id.rep()),
4641 }?;
4642 concurrent_state.get_mut(id)?.common.handle = Some(handle);
4643 Ok(handle)
4644}
4645
4646impl ComponentInstance {
4647 fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4648 let (states, types) = self.instance_states();
4649 let runtime_instance = match ty {
4650 TransmitIndex::Stream(ty) => types[ty].instance,
4651 TransmitIndex::Future(ty) => types[ty].instance,
4652 };
4653 states[runtime_instance].handle_table()
4654 }
4655
4656 fn table_for_error_context(
4657 self: Pin<&mut Self>,
4658 ty: TypeComponentLocalErrorContextTableIndex,
4659 ) -> &mut HandleTable {
4660 let (states, types) = self.instance_states();
4661 let runtime_instance = types[ty].instance;
4662 states[runtime_instance].handle_table()
4663 }
4664
4665 fn get_mut_by_index(
4666 self: Pin<&mut Self>,
4667 ty: TransmitIndex,
4668 index: u32,
4669 ) -> Result<(u32, &mut TransmitLocalState)> {
4670 get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4671 }
4672}
4673
4674impl ConcurrentState {
4675 fn send_write_result(
4676 &mut self,
4677 ty: TransmitIndex,
4678 id: TableId<TransmitState>,
4679 handle: u32,
4680 code: ReturnCode,
4681 ) -> Result<()> {
4682 let write_handle = self.get_mut(id)?.write_handle.rep();
4683 self.set_event(
4684 write_handle,
4685 match ty {
4686 TransmitIndex::Future(ty) => Event::FutureWrite {
4687 code,
4688 pending: Some((ty, handle)),
4689 },
4690 TransmitIndex::Stream(ty) => Event::StreamWrite {
4691 code,
4692 pending: Some((ty, handle)),
4693 },
4694 },
4695 )
4696 }
4697
4698 fn send_read_result(
4699 &mut self,
4700 ty: TransmitIndex,
4701 id: TableId<TransmitState>,
4702 handle: u32,
4703 code: ReturnCode,
4704 ) -> Result<()> {
4705 let read_handle = self.get_mut(id)?.read_handle.rep();
4706 self.set_event(
4707 read_handle,
4708 match ty {
4709 TransmitIndex::Future(ty) => Event::FutureRead {
4710 code,
4711 pending: Some((ty, handle)),
4712 },
4713 TransmitIndex::Stream(ty) => Event::StreamRead {
4714 code,
4715 pending: Some((ty, handle)),
4716 },
4717 },
4718 )
4719 }
4720
4721 fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4722 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4723 }
4724
4725 fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4726 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4727 }
4728
4729 fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4740 let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4741
4742 fn update_code(old: ReturnCode, new: ReturnCode) -> Result<ReturnCode> {
4743 let (ReturnCode::Completed(count)
4744 | ReturnCode::Dropped(count)
4745 | ReturnCode::Cancelled(count)) = old
4746 else {
4747 bail_bug!("unexpected old return code")
4748 };
4749
4750 Ok(match new {
4751 ReturnCode::Dropped(ItemCount::ZERO) => ReturnCode::Dropped(count),
4752 ReturnCode::Cancelled(ItemCount::ZERO) => ReturnCode::Cancelled(count),
4753 _ => bail_bug!("unexpected new return code"),
4754 })
4755 }
4756
4757 let event = match (waitable.take_event(self)?, event) {
4758 (None, _) => event,
4759 (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4760 (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4761 (
4762 Some(Event::StreamWrite {
4763 code: old_code,
4764 pending: old_pending,
4765 }),
4766 Event::StreamWrite { code, pending },
4767 ) => Event::StreamWrite {
4768 code: update_code(old_code, code)?,
4769 pending: old_pending.or(pending),
4770 },
4771 (
4772 Some(Event::StreamRead {
4773 code: old_code,
4774 pending: old_pending,
4775 }),
4776 Event::StreamRead { code, pending },
4777 ) => Event::StreamRead {
4778 code: update_code(old_code, code)?,
4779 pending: old_pending.or(pending),
4780 },
4781 _ => bail_bug!("unexpected event combination"),
4782 };
4783
4784 waitable.set_event(self, Some(event))
4785 }
4786
4787 fn new_transmit(
4790 &mut self,
4791 origin: TransmitOrigin,
4792 ) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4793 let state_id = self.push(TransmitState::new(origin))?;
4794
4795 let write = self.push(TransmitHandle::new(state_id))?;
4796 let read = self.push(TransmitHandle::new(state_id))?;
4797
4798 let state = self.get_mut(state_id)?;
4799 state.write_handle = write;
4800 state.read_handle = read;
4801
4802 log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4803
4804 Ok((write, read))
4805 }
4806
4807 fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4809 let state = self.delete(state_id)?;
4810 self.delete(state.write_handle)?;
4811 self.delete(state.read_handle)?;
4812
4813 log::trace!(
4814 "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4815 state.write_handle,
4816 state.read_handle,
4817 );
4818
4819 Ok(())
4820 }
4821}
4822
/// The pair of guest handles for the two ends of a stream or future, as
/// returned by `guest_new`.
pub(crate) struct ResourcePair {
    /// Handle of the write end.
    pub(crate) write: u32,
    /// Handle of the read end.
    pub(crate) read: u32,
}
4827
4828impl Waitable {
4829 pub(super) fn on_delivery(
4832 &self,
4833 store: &mut StoreOpaque,
4834 instance: Instance,
4835 event: Event,
4836 ) -> Result<()> {
4837 let instance = instance.id().get_mut(store);
4838 let (rep, state, code) = match event {
4839 Event::FutureRead {
4840 pending: Some((ty, handle)),
4841 code,
4842 }
4843 | Event::FutureWrite {
4844 pending: Some((ty, handle)),
4845 code,
4846 } => {
4847 let runtime_instance = instance.component().types()[ty].instance;
4848 let (rep, state) = instance.instance_states().0[runtime_instance]
4849 .handle_table()
4850 .future_rep(ty, handle)?;
4851 (rep, state, code)
4852 }
4853 Event::StreamRead {
4854 pending: Some((ty, handle)),
4855 code,
4856 }
4857 | Event::StreamWrite {
4858 pending: Some((ty, handle)),
4859 code,
4860 } => {
4861 let runtime_instance = instance.component().types()[ty].instance;
4862 let (rep, state) = instance.instance_states().0[runtime_instance]
4863 .handle_table()
4864 .stream_rep(ty, handle)?;
4865 (rep, state, code)
4866 }
4867 _ => return Ok(()),
4868 };
4869 if rep != self.rep() {
4870 bail_bug!("unexpected rep mismatch");
4871 }
4872 if *state != TransmitLocalState::Busy {
4873 bail_bug!("expected state to be busy");
4874 }
4875 let done = matches!(code, ReturnCode::Dropped(_));
4876 *state = match event {
4877 Event::FutureRead { .. } | Event::StreamRead { .. } => {
4878 TransmitLocalState::Read { done }
4879 }
4880 Event::FutureWrite { .. } | Event::StreamWrite { .. } => {
4881 TransmitLocalState::Write { done }
4882 }
4883 _ => bail_bug!("unexpected event for stream"),
4884 };
4885
4886 let transmit_handle = TableId::<TransmitHandle>::new(rep);
4887 let state = store.concurrent_state_mut()?;
4888 let transmit_id = state.get_mut(transmit_handle)?.state;
4889 let transmit = state.get_mut(transmit_id)?;
4890
4891 match event {
4892 Event::StreamRead { .. } => {
4893 transmit.read = ReadState::Open;
4894 }
4895 Event::StreamWrite { .. } => transmit.write = WriteState::Open,
4896 _ => {}
4897 }
4898 Ok(())
4899 }
4900}
4901
4902fn allow_intra_component_read_write(ty: Option<&InterfaceType>) -> bool {
4906 matches!(
4907 ty,
4908 None | Some(
4909 InterfaceType::S8
4910 | InterfaceType::U8
4911 | InterfaceType::S16
4912 | InterfaceType::U16
4913 | InterfaceType::S32
4914 | InterfaceType::U32
4915 | InterfaceType::S64
4916 | InterfaceType::U64
4917 | InterfaceType::Float32
4918 | InterfaceType::Float64
4919 )
4920 )
4921}
4922
/// A value guarded by a `TryMutex`.
///
/// The value is stored as an `Option` so that it can be moved out
/// temporarily (see `take`) and put back later.
struct LockedState<T> {
    /// `Some` while the value is in place; `None` while it is taken out.
    inner: TryMutex<Option<T>>,
}
4929
4930impl<T> LockedState<T> {
4931 fn new(value: T) -> Self {
4933 Self {
4934 inner: TryMutex::new(Some(value)),
4935 }
4936 }
4937
4938 fn try_lock(&self) -> Result<TryMutexGuard<'_, Option<T>>> {
4947 match self.inner.try_lock() {
4948 Some(lock) => Ok(lock),
4949 None => bail_bug!("should not have contention on state lock"),
4950 }
4951 }
4952
4953 fn take(&self) -> Result<LockedStateGuard<'_, T>> {
4960 let result = self.try_lock()?.take();
4961 match result {
4962 Some(result) => Ok(LockedStateGuard {
4963 value: ManuallyDrop::new(result),
4964 state: self,
4965 }),
4966 None => bail_bug!("lock value unexpectedly missing"),
4967 }
4968 }
4969
4970 fn with<R>(&self, f: impl FnOnce(&mut T) -> R) -> Result<R> {
4979 let mut inner = self.try_lock()?;
4980 match &mut *inner {
4981 Some(state) => Ok(f(state)),
4982 None => bail_bug!("lock value unexpectedly missing"),
4983 }
4984 }
4985}
4986
/// Holds a value that has been moved out of a `LockedState`.
///
/// The value is reachable through `Deref`/`DerefMut`; when the guard is
/// dropped, its `Drop` impl attempts to store the value back into `state`.
struct LockedStateGuard<'a, T> {
    /// The taken value; `ManuallyDrop` so that `Drop` can move it out.
    value: ManuallyDrop<T>,
    /// The `LockedState` the value was taken from.
    state: &'a LockedState<T>,
}
4993
4994impl<T> Deref for LockedStateGuard<'_, T> {
4995 type Target = T;
4996
4997 fn deref(&self) -> &T {
4998 &self.value
4999 }
5000}
5001
5002impl<T> DerefMut for LockedStateGuard<'_, T> {
5003 fn deref_mut(&mut self) -> &mut T {
5004 &mut self.value
5005 }
5006}
5007
impl<T> Drop for LockedStateGuard<'_, T> {
    /// Puts the taken value back into the originating `LockedState`.
    fn drop(&mut self) {
        // SAFETY: `self.value` is not accessed again after this call: the
        // remaining statements only use the moved-out `value` and
        // `self.state`, and this is the guard's destructor.
        let value = unsafe { ManuallyDrop::take(&mut self.value) };

        // Best effort: if the lock cannot be acquired, the error is ignored
        // and `value` is simply dropped at the end of this function.
        if let Ok(mut lock) = self.state.try_lock() {
            *lock = Some(value);
        }
    }
}
5024
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Engine, Store};
    use core::future::pending;
    use core::pin::pin;
    use std::sync::LazyLock;

    // Engine shared by every store created in these tests.
    static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);

    /// Polls `rx` once through `FutureProducer::poll_produce`, using a
    /// no-op waker and a freshly created store, and returns the result.
    fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
    where
        T: FutureProducer<()>,
    {
        rx.poll_produce(
            &mut Context::from_waker(Waker::noop()),
            Store::new(&ENGINE, ()).as_context_mut(),
            finish,
        )
    }

    /// Exercises `poll_produce` on plain futures and on oneshot receivers,
    /// with `finish` both unset and set.
    #[test]
    fn future_producer() {
        // An immediately-ready future produces its value with `finish` unset...
        let mut fut = pin!(async { crate::error::Ok(()) });
        assert!(matches!(
            poll_future_producer(fut.as_mut(), false),
            Poll::Ready(Ok(Some(()))),
        ));

        // ...and with `finish` set.
        let mut fut = pin!(async { crate::error::Ok(()) });
        assert!(matches!(
            poll_future_producer(fut.as_mut(), true),
            Poll::Ready(Ok(Some(()))),
        ));

        // A future that never resolves stays pending until `finish` is set,
        // at which point it yields `None`.
        let mut fut = pin!(pending::<Result<()>>());
        assert!(matches!(
            poll_future_producer(fut.as_mut(), false),
            Poll::Pending,
        ));
        assert!(matches!(
            poll_future_producer(fut.as_mut(), true),
            Poll::Ready(Ok(None)),
        ));

        // A oneshot receiver with a live sender behaves like a pending
        // future, and still yields the value if it is sent after a
        // `finish` poll returned `None`.
        let (tx, rx) = oneshot::channel();
        let mut rx = pin!(rx);
        assert!(matches!(
            poll_future_producer(rx.as_mut(), false),
            Poll::Pending,
        ));
        assert!(matches!(
            poll_future_producer(rx.as_mut(), true),
            Poll::Ready(Ok(None)),
        ));
        tx.send(()).unwrap();
        assert!(matches!(
            poll_future_producer(rx.as_mut(), true),
            Poll::Ready(Ok(Some(()))),
        ));

        // A value sent before the first poll is produced right away.
        let (tx, rx) = oneshot::channel();
        let mut rx = pin!(rx);
        tx.send(()).unwrap();
        assert!(matches!(
            poll_future_producer(rx.as_mut(), false),
            Poll::Ready(Ok(Some(()))),
        ));

        // Dropping the sender without sending yields an error with `finish`
        // unset...
        let (tx, rx) = oneshot::channel::<()>();
        let mut rx = pin!(rx);
        drop(tx);
        assert!(matches!(
            poll_future_producer(rx.as_mut(), false),
            Poll::Ready(Err(..)),
        ));

        // ...and with `finish` set.
        let (tx, rx) = oneshot::channel::<()>();
        let mut rx = pin!(rx);
        drop(tx);
        assert!(matches!(
            poll_future_producer(rx.as_mut(), true),
            Poll::Ready(Err(..)),
        ));
    }
}