1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, QualifiedThreadId, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext};
5use crate::component::matching::InstanceType;
6use crate::component::types;
7use crate::component::values::ErrorContextAny;
8use crate::component::{
9 AsAccessor, ComponentInstanceId, ComponentType, FutureAny, Instance, Lift, Lower, StreamAny,
10 Val, WasmList,
11};
12use crate::prelude::*;
13use crate::store::{StoreOpaque, StoreToken};
14use crate::try_mutex::{TryMutex, TryMutexGuard};
15use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
16use crate::vm::{AlwaysMut, VMStore};
17use crate::{AsContext, AsContextMut, StoreContextMut, ValRaw};
18use crate::{
19 Error, Result, Trap, bail, bail_bug, ensure,
20 error::{Context as _, format_err},
21};
22use alloc::sync::Arc;
23use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
24use core::any::{Any, TypeId};
25use core::fmt;
26use core::future;
27use core::iter;
28use core::marker::PhantomData;
29use core::mem::{self, ManuallyDrop, MaybeUninit};
30use core::ops::{Deref, DerefMut};
31use core::pin::Pin;
32use core::task::{Context, Poll, Waker, ready};
33use futures::channel::oneshot;
34use futures::{FutureExt as _, stream};
35use wasmtime_environ::component::{
36 CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
37 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
38 TypeFutureTableIndex, TypeStreamTableIndex,
39};
40
41pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
42
43mod buffers;
44
45#[derive(Copy, Clone, Debug)]
48pub enum TransmitKind {
49 Stream,
50 Future,
51}
52
53#[derive(Copy, Clone, Debug, PartialEq)]
55pub enum ReturnCode {
56 Blocked,
57 Completed(ItemCount),
58 Dropped(ItemCount),
59 Cancelled(ItemCount),
60}
61
62impl ReturnCode {
63 pub fn encode(&self) -> u32 {
68 const BLOCKED: u32 = 0xffff_ffff;
69 const COMPLETED: u32 = 0x0;
70 const DROPPED: u32 = 0x1;
71 const CANCELLED: u32 = 0x2;
72 match self {
73 ReturnCode::Blocked => BLOCKED,
74 ReturnCode::Completed(n) => (n.as_u32() << 4) | COMPLETED,
75 ReturnCode::Dropped(n) => (n.as_u32() << 4) | DROPPED,
76 ReturnCode::Cancelled(n) => (n.as_u32() << 4) | CANCELLED,
77 }
78 }
79
80 fn completed(kind: TransmitKind, count: ItemCount) -> Self {
83 Self::Completed(if let TransmitKind::Future = kind {
84 ItemCount::ZERO
85 } else {
86 count
87 })
88 }
89}
90
91#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
98#[repr(transparent)]
99pub struct ItemCount {
100 raw: u32,
101}
102
103impl ItemCount {
104 const MAX: u32 = 1 << 28;
105 const ZERO: ItemCount = ItemCount { raw: 0 };
106
107 fn new(count: u32) -> Result<Self, Trap> {
110 if count < Self::MAX {
111 Ok(Self { raw: count })
112 } else {
113 Err(Trap::StreamOpTooBig)
114 }
115 }
116
117 fn new_usize(count: usize) -> Result<Self, Trap> {
119 let count = u32::try_from(count).map_err(|_| Trap::StreamOpTooBig)?;
120 Self::new(count)
121 }
122
123 fn as_u32(&self) -> u32 {
124 self.raw
125 }
126
127 fn as_usize(&self) -> usize {
128 usize::try_from(self.raw).unwrap()
129 }
130
131 fn inc(&mut self, amt: usize) -> Result<(), Trap> {
134 let amt = u32::try_from(amt).map_err(|_| Trap::StreamOpTooBig)?;
135 let new_raw = self.raw.checked_add(amt).ok_or(Trap::StreamOpTooBig)?;
136 if new_raw < Self::MAX {
137 self.raw = new_raw;
138 Ok(())
139 } else {
140 Err(Trap::StreamOpTooBig)
141 }
142 }
143
144 fn add(&self, other: ItemCount) -> Result<ItemCount> {
150 match self.raw.checked_add(other.raw) {
151 Some(raw) => Ok(ItemCount::new(raw)?),
152 None => bail_bug!("overflow in `ItemCount::add`"),
153 }
154 }
155
156 fn sub(&self, other: ItemCount) -> Result<ItemCount> {
161 match self.raw.checked_sub(other.raw) {
162 Some(raw) => Ok(ItemCount { raw }),
163 None => bail_bug!("underflow in `ItemCount::sub`"),
164 }
165 }
166}
167
168impl fmt::Display for ItemCount {
169 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
170 self.raw.fmt(f)
171 }
172}
173
174impl fmt::Debug for ItemCount {
175 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176 self.raw.fmt(f)
177 }
178}
179
180impl PartialEq<u32> for ItemCount {
181 fn eq(&self, other: &u32) -> bool {
182 self.raw == *other
183 }
184}
185
186impl PartialOrd<u32> for ItemCount {
187 fn partial_cmp(&self, other: &u32) -> Option<core::cmp::Ordering> {
188 self.raw.partial_cmp(other)
189 }
190}
191
192#[derive(Copy, Clone, Debug)]
197pub enum TransmitIndex {
198 Stream(TypeStreamTableIndex),
199 Future(TypeFutureTableIndex),
200}
201
202impl TransmitIndex {
203 pub fn kind(&self) -> TransmitKind {
204 match self {
205 TransmitIndex::Stream(_) => TransmitKind::Stream,
206 TransmitIndex::Future(_) => TransmitKind::Future,
207 }
208 }
209
210 fn payload<'a>(&self, types: &'a ComponentTypes) -> Option<&'a InterfaceType> {
213 match self {
214 TransmitIndex::Stream(i) => {
215 let ty = types[*i].ty;
216 types[ty].payload.as_ref()
217 }
218 TransmitIndex::Future(i) => {
219 let ty = types[*i].ty;
220 types[ty].payload.as_ref()
221 }
222 }
223 }
224}
225
226fn get_mut_by_index_from(
229 handle_table: &mut HandleTable,
230 ty: TransmitIndex,
231 index: u32,
232) -> Result<(u32, &mut TransmitLocalState)> {
233 match ty {
234 TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
235 TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
236 }
237}
238
239fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
240 mut store: StoreContextMut<U>,
241 instance: Instance,
242 caller_thread: QualifiedThreadId,
243 options: OptionsIndex,
244 ty: TransmitIndex,
245 address: usize,
246 count: usize,
247 buffer: &mut B,
248) -> Result<()> {
249 let count = buffer.remaining().len().min(count);
250
251 let (lower, old_thread) = if T::MAY_REQUIRE_REALLOC {
255 let old_thread = store.0.set_thread(caller_thread)?;
256 (
257 &mut LowerContext::new(store.as_context_mut(), options, instance),
258 Some(old_thread),
259 )
260 } else {
261 (
262 &mut LowerContext::new_without_realloc(store.as_context_mut(), options, instance),
263 None,
264 )
265 };
266
267 if address % usize::try_from(T::ALIGN32)? != 0 {
268 bail!("read pointer not aligned");
269 }
270 lower
271 .as_slice_mut()
272 .get_mut(address..)
273 .and_then(|b| b.get_mut(..T::SIZE32 * count))
274 .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))?;
275
276 if let Some(ty) = ty.payload(lower.types) {
277 T::linear_store_list_to_memory(lower, *ty, address, &buffer.remaining()[..count])?;
278 }
279
280 if let Some(old_thread) = old_thread {
281 store.0.set_thread(old_thread)?;
282 }
283
284 buffer.skip(count);
285
286 Ok(())
287}
288
289fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
290 lift: &mut LiftContext<'_>,
291 ty: Option<InterfaceType>,
292 buffer: &mut B,
293 address: usize,
294 count: usize,
295) -> Result<()> {
296 let count = count.min(buffer.remaining_capacity());
297 if T::IS_RUST_UNIT_TYPE {
298 buffer.extend(
302 iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
303 )
304 } else {
305 let ty = match ty {
306 Some(ty) => ty,
307 None => bail_bug!("type required for non-unit lift"),
308 };
309 if address % usize::try_from(T::ALIGN32)? != 0 {
310 bail!("write pointer not aligned");
311 }
312 lift.memory()
313 .get(address..)
314 .and_then(|b| b.get(..T::SIZE32 * count))
315 .ok_or_else(|| crate::format_err!("write pointer out of bounds of memory"))?;
316
317 let list = &WasmList::new(address, count, lift, ty)?;
318 T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
319 }
320 Ok(())
321}
322
323#[derive(Debug, PartialEq, Eq, PartialOrd)]
325pub(super) struct ErrorContextState {
326 pub(crate) debug_msg: String,
328}
329
330#[derive(Debug, Clone, Copy, PartialEq, Eq)]
333pub(super) struct FlatAbi {
334 pub(super) size: u32,
335 pub(super) align: u32,
336}
337
338struct HostBuffer<'a> {
339 dst: &'a mut Vec<u8>,
340 marked_written: &'a mut usize,
341}
342
343impl HostBuffer<'_> {
344 fn reborrow(&mut self) -> HostBuffer<'_> {
345 HostBuffer {
346 dst: &mut *self.dst,
347 marked_written: &mut *self.marked_written,
348 }
349 }
350}
351
352pub struct Destination<'a, T, B> {
354 id: TableId<TransmitState>,
355 buffer: &'a mut B,
356 host_buffer: Option<HostBuffer<'a>>,
357 _phantom: PhantomData<fn() -> T>,
358}
359
360impl<'a, T, B> Destination<'a, T, B> {
361 pub fn reborrow(&mut self) -> Destination<'_, T, B> {
363 Destination {
364 id: self.id,
365 buffer: &mut *self.buffer,
366 host_buffer: self.host_buffer.as_mut().map(|b| b.reborrow()),
367 _phantom: PhantomData,
368 }
369 }
370
371 pub fn take_buffer(&mut self) -> B
377 where
378 B: Default,
379 {
380 mem::take(self.buffer)
381 }
382
383 pub fn set_buffer(&mut self, buffer: B) {
393 *self.buffer = buffer;
394 }
395
396 pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
413 self.remaining_(store.as_context_mut().0).unwrap()
417 }
418
419 fn remaining_(&self, store: &mut StoreOpaque) -> Result<Option<usize>> {
420 let transmit = store.concurrent_state_mut().get_mut(self.id)?;
421
422 if let &ReadState::GuestReady { count, .. } = &transmit.read {
423 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
424 bail_bug!("expected WriteState::HostReady")
425 };
426
427 Ok(Some(count.as_usize() - guest_offset.as_usize()))
428 } else {
429 Ok(None)
430 }
431 }
432}
433
434impl<'a, B> Destination<'a, u8, B> {
435 pub fn as_direct<D>(
446 mut self,
447 store: StoreContextMut<'a, D>,
448 capacity: usize,
449 ) -> DirectDestination<'a, D> {
450 if let Some(buffer) = &mut self.host_buffer {
451 *buffer.marked_written = 0;
452 buffer.dst.resize(capacity, 0);
453 }
454
455 DirectDestination {
456 id: self.id,
457 host_buffer: self.host_buffer,
458 store,
459 }
460 }
461}
462
463pub struct DirectDestination<'a, D: 'static> {
466 id: TableId<TransmitState>,
467 host_buffer: Option<HostBuffer<'a>>,
468 store: StoreContextMut<'a, D>,
469}
470
471#[cfg(feature = "std")]
472impl<D: 'static> std::io::Write for DirectDestination<'_, D> {
473 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
474 let rem = self.remaining();
475 let n = rem.len().min(buf.len());
476 rem[..n].copy_from_slice(&buf[..n]);
477 self.mark_written(n);
478 Ok(n)
479 }
480
481 fn flush(&mut self) -> std::io::Result<()> {
482 Ok(())
483 }
484}
485
486impl<D: 'static> DirectDestination<'_, D> {
487 pub fn remaining(&mut self) -> &mut [u8] {
489 self.remaining_().unwrap()
493 }
494
495 fn remaining_(&mut self) -> Result<&mut [u8]> {
496 if let Some(buffer) = self.host_buffer.as_mut() {
497 return Ok(buffer.dst);
498 }
499 let transmit = self
500 .store
501 .as_context_mut()
502 .0
503 .concurrent_state_mut()
504 .get_mut(self.id)?;
505
506 let &ReadState::GuestReady {
507 address,
508 count,
509 options,
510 instance,
511 ..
512 } = &transmit.read
513 else {
514 bail_bug!("expected ReadState::GuestReady")
515 };
516
517 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
518 bail_bug!("expected WriteState::HostReady")
519 };
520
521 let memory = instance
522 .options_memory_mut(self.store.0, options)
523 .get_mut((address + guest_offset.as_usize())..)
524 .and_then(|b| b.get_mut(..(count.as_usize() - guest_offset.as_usize())));
525 match memory {
526 Some(memory) => Ok(memory),
527 None => bail_bug!("guest buffer unexpectedly out of bounds"),
528 }
529 }
530
531 pub fn mark_written(&mut self, count: usize) {
538 self.mark_written_(count).unwrap()
542 }
543
544 fn mark_written_(&mut self, count: usize) -> Result<()> {
545 if let Some(buffer) = self.host_buffer.as_mut() {
546 *buffer.marked_written = buffer.marked_written.checked_add(count).unwrap();
549 } else {
550 let transmit = self
551 .store
552 .as_context_mut()
553 .0
554 .concurrent_state_mut()
555 .get_mut(self.id)?;
556
557 let ReadState::GuestReady {
558 count: read_count, ..
559 } = &transmit.read
560 else {
561 bail_bug!("expected ReadState::GuestReady")
562 };
563
564 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
565 bail_bug!("expected WriteState::HostReady");
566 };
567
568 if guest_offset.as_usize() + count > read_count.as_usize() {
569 panic!(
572 "write count ({count}) must be less than or equal to read count ({read_count})"
573 )
574 } else {
575 guest_offset.inc(count)?;
576 }
577 }
578 Ok(())
579 }
580}
581
582#[derive(Copy, Clone, Debug)]
584pub enum StreamResult {
585 Completed,
588 Cancelled,
593 Dropped,
596}
597
598pub trait StreamProducer<D>: Send + 'static {
600 type Item;
602
603 type Buffer: WriteBuffer<Self::Item> + Default;
605
606 fn poll_produce<'a>(
742 self: Pin<&mut Self>,
743 cx: &mut Context<'_>,
744 store: StoreContextMut<'a, D>,
745 destination: Destination<'a, Self::Item, Self::Buffer>,
746 finish: bool,
747 ) -> Poll<Result<StreamResult>>;
748
749 fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
755 Err(me)
756 }
757}
758
759impl<T, D> StreamProducer<D> for iter::Empty<T>
760where
761 T: Send + Sync + 'static,
762{
763 type Item = T;
764 type Buffer = Option<Self::Item>;
765
766 fn poll_produce<'a>(
767 self: Pin<&mut Self>,
768 _: &mut Context<'_>,
769 _: StoreContextMut<'a, D>,
770 _: Destination<'a, Self::Item, Self::Buffer>,
771 _: bool,
772 ) -> Poll<Result<StreamResult>> {
773 Poll::Ready(Ok(StreamResult::Dropped))
774 }
775}
776
777impl<T, D> StreamProducer<D> for stream::Empty<T>
778where
779 T: Send + Sync + 'static,
780{
781 type Item = T;
782 type Buffer = Option<Self::Item>;
783
784 fn poll_produce<'a>(
785 self: Pin<&mut Self>,
786 _: &mut Context<'_>,
787 _: StoreContextMut<'a, D>,
788 _: Destination<'a, Self::Item, Self::Buffer>,
789 _: bool,
790 ) -> Poll<Result<StreamResult>> {
791 Poll::Ready(Ok(StreamResult::Dropped))
792 }
793}
794
795impl<T, D> StreamProducer<D> for Vec<T>
796where
797 T: Unpin + Send + Sync + 'static,
798{
799 type Item = T;
800 type Buffer = VecBuffer<T>;
801
802 fn poll_produce<'a>(
803 self: Pin<&mut Self>,
804 _: &mut Context<'_>,
805 _: StoreContextMut<'a, D>,
806 mut dst: Destination<'a, Self::Item, Self::Buffer>,
807 _: bool,
808 ) -> Poll<Result<StreamResult>> {
809 dst.set_buffer(mem::take(self.get_mut()).into());
810 Poll::Ready(Ok(StreamResult::Dropped))
811 }
812}
813
814impl<T, D> StreamProducer<D> for Box<[T]>
815where
816 T: Unpin + Send + Sync + 'static,
817{
818 type Item = T;
819 type Buffer = VecBuffer<T>;
820
821 fn poll_produce<'a>(
822 self: Pin<&mut Self>,
823 _: &mut Context<'_>,
824 _: StoreContextMut<'a, D>,
825 mut dst: Destination<'a, Self::Item, Self::Buffer>,
826 _: bool,
827 ) -> Poll<Result<StreamResult>> {
828 dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
829 Poll::Ready(Ok(StreamResult::Dropped))
830 }
831}
832
833#[cfg(feature = "component-model-bytes")]
834impl<D> StreamProducer<D> for bytes::Bytes {
835 type Item = u8;
836 type Buffer = Self;
837
838 fn poll_produce<'a>(
839 self: Pin<&mut Self>,
840 _: &mut Context<'_>,
841 _store: StoreContextMut<'a, D>,
842 mut dst: Destination<'a, Self::Item, Self::Buffer>,
843 _: bool,
844 ) -> Poll<Result<StreamResult>> {
845 dst.set_buffer(mem::take(self.get_mut()));
846 Poll::Ready(Ok(StreamResult::Dropped))
847 }
848}
849
850#[cfg(feature = "component-model-bytes")]
851impl<D> StreamProducer<D> for bytes::BytesMut {
852 type Item = u8;
853 type Buffer = Self;
854
855 fn poll_produce<'a>(
856 self: Pin<&mut Self>,
857 _: &mut Context<'_>,
858 _store: StoreContextMut<'a, D>,
859 mut dst: Destination<'a, Self::Item, Self::Buffer>,
860 _: bool,
861 ) -> Poll<Result<StreamResult>> {
862 dst.set_buffer(mem::take(self.get_mut()));
863 Poll::Ready(Ok(StreamResult::Dropped))
864 }
865}
866
867pub struct Source<'a, T> {
869 id: TableId<TransmitState>,
870 host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
871}
872
873impl<'a, T> Source<'a, T> {
874 pub fn reborrow(&mut self) -> Source<'_, T> {
876 Source {
877 id: self.id,
878 host_buffer: self.host_buffer.as_deref_mut(),
879 }
880 }
881
882 pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
884 where
885 T: func::Lift + 'static,
886 B: ReadBuffer<T>,
887 {
888 if let Some(input) = &mut self.host_buffer {
889 let count = input.remaining().len().min(buffer.remaining_capacity());
890 buffer.move_from(*input, count);
891 } else {
892 let store = store.as_context_mut();
893 let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
894
895 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
896 bail_bug!("expected ReadState::HostReady");
897 };
898
899 let &WriteState::GuestReady {
900 ty,
901 address,
902 count,
903 options,
904 instance,
905 ..
906 } = &transmit.write
907 else {
908 bail_bug!("expected WriteState::GuestReady");
909 };
910
911 let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
912 let ty = ty.payload(cx.types);
913 let old_remaining = buffer.remaining_capacity();
914 lift::<T, B>(
915 cx,
916 ty.copied(),
917 buffer,
918 address + (T::SIZE32 * guest_offset.as_usize()),
919 count.as_usize() - guest_offset.as_usize(),
920 )?;
921
922 let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
923
924 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
925 bail_bug!("expected ReadState::HostReady");
926 };
927
928 guest_offset.inc(old_remaining - buffer.remaining_capacity())?;
929 }
930
931 Ok(())
932 }
933
934 pub fn remaining(&self, mut store: impl AsContextMut) -> usize
937 where
938 T: 'static,
939 {
940 self.remaining_(store.as_context_mut().0).unwrap()
944 }
945
946 fn remaining_(&self, store: &mut StoreOpaque) -> Result<usize>
947 where
948 T: 'static,
949 {
950 let transmit = store.concurrent_state_mut().get_mut(self.id)?;
951
952 if let &WriteState::GuestReady { count, .. } = &transmit.write {
953 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
954 bail_bug!("expected ReadState::HostReady")
955 };
956
957 Ok(count.as_usize() - guest_offset.as_usize())
958 } else if let Some(host_buffer) = &self.host_buffer {
959 Ok(host_buffer.remaining().len())
960 } else {
961 bail_bug!("expected either WriteState::GuestReady or host buffer")
962 }
963 }
964}
965
966impl<'a> Source<'a, u8> {
967 pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
969 DirectSource {
970 id: self.id,
971 host_buffer: self.host_buffer,
972 store,
973 }
974 }
975}
976
977pub struct DirectSource<'a, D: 'static> {
980 id: TableId<TransmitState>,
981 host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
982 store: StoreContextMut<'a, D>,
983}
984
985#[cfg(feature = "std")]
986impl<D: 'static> std::io::Read for DirectSource<'_, D> {
987 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
988 let rem = self.remaining();
989 let n = rem.len().min(buf.len());
990 buf[..n].copy_from_slice(&rem[..n]);
991 self.mark_read(n);
992 Ok(n)
993 }
994}
995
996impl<D: 'static> DirectSource<'_, D> {
997 pub fn remaining(&mut self) -> &[u8] {
999 self.remaining_().unwrap()
1003 }
1004
1005 fn remaining_(&mut self) -> Result<&[u8]> {
1006 if let Some(buffer) = self.host_buffer.as_deref_mut() {
1007 return Ok(buffer.remaining());
1008 }
1009 let transmit = self
1010 .store
1011 .as_context_mut()
1012 .0
1013 .concurrent_state_mut()
1014 .get_mut(self.id)?;
1015
1016 let &WriteState::GuestReady {
1017 address,
1018 count,
1019 options,
1020 instance,
1021 ..
1022 } = &transmit.write
1023 else {
1024 bail_bug!("expected WriteState::GuestReady")
1025 };
1026
1027 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
1028 bail_bug!("expected ReadState::HostReady")
1029 };
1030
1031 let memory = instance
1032 .options_memory(self.store.0, options)
1033 .get((address + guest_offset.as_usize())..)
1034 .and_then(|b| b.get(..(count.as_usize() - guest_offset.as_usize())));
1035 match memory {
1036 Some(memory) => Ok(memory),
1037 None => bail_bug!("guest buffer unexpectedly out of bounds"),
1038 }
1039 }
1040
1041 pub fn mark_read(&mut self, count: usize) {
1048 self.mark_read_(count).unwrap()
1052 }
1053
1054 fn mark_read_(&mut self, count: usize) -> Result<()> {
1055 if let Some(buffer) = self.host_buffer.as_deref_mut() {
1056 buffer.skip(count);
1057 return Ok(());
1058 }
1059
1060 let transmit = self
1061 .store
1062 .as_context_mut()
1063 .0
1064 .concurrent_state_mut()
1065 .get_mut(self.id)?;
1066
1067 let WriteState::GuestReady {
1068 count: write_count, ..
1069 } = &transmit.write
1070 else {
1071 bail_bug!("expected WriteState::GuestReady");
1072 };
1073
1074 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
1075 bail_bug!("expected ReadState::HostReady");
1076 };
1077
1078 if guest_offset.as_usize() + count > write_count.as_usize() {
1079 panic!("read count ({count}) must be less than or equal to write count ({write_count})")
1081 } else {
1082 guest_offset.inc(count)?;
1083 }
1084 Ok(())
1085 }
1086}
1087
1088pub trait StreamConsumer<D>: Send + 'static {
1090 type Item;
1092
1093 fn poll_consume(
1176 self: Pin<&mut Self>,
1177 cx: &mut Context<'_>,
1178 store: StoreContextMut<D>,
1179 source: Source<'_, Self::Item>,
1180 finish: bool,
1181 ) -> Poll<Result<StreamResult>>;
1182}
1183
1184pub trait FutureProducer<D>: Send + 'static {
1186 type Item;
1188
1189 fn poll_produce(
1199 self: Pin<&mut Self>,
1200 cx: &mut Context<'_>,
1201 store: StoreContextMut<D>,
1202 finish: bool,
1203 ) -> Poll<Result<Option<Self::Item>>>;
1204}
1205
1206impl<T, E, D, Fut> FutureProducer<D> for Fut
1207where
1208 E: Into<Error>,
1209 Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
1210{
1211 type Item = T;
1212
1213 fn poll_produce<'a>(
1214 self: Pin<&mut Self>,
1215 cx: &mut Context<'_>,
1216 _: StoreContextMut<'a, D>,
1217 finish: bool,
1218 ) -> Poll<Result<Option<T>>> {
1219 match self.poll(cx) {
1220 Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
1221 Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
1222 Poll::Pending if finish => Poll::Ready(Ok(None)),
1223 Poll::Pending => Poll::Pending,
1224 }
1225 }
1226}
1227
1228pub trait FutureConsumer<D>: Send + 'static {
1230 type Item;
1232
1233 fn poll_consume(
1245 self: Pin<&mut Self>,
1246 cx: &mut Context<'_>,
1247 store: StoreContextMut<D>,
1248 source: Source<'_, Self::Item>,
1249 finish: bool,
1250 ) -> Poll<Result<()>>;
1251}
1252
1253pub struct FutureReader<T> {
1260 id: TableId<TransmitHandle>,
1261 _phantom: PhantomData<T>,
1262}
1263
1264impl<T> FutureReader<T> {
1265 pub fn new<S: AsContextMut>(
1274 mut store: S,
1275 producer: impl FutureProducer<S::Data, Item = T>,
1276 ) -> Result<Self>
1277 where
1278 T: func::Lower + func::Lift + Send + Sync + 'static,
1279 {
1280 ensure!(
1281 store.as_context().0.concurrency_support(),
1282 "concurrency support is not enabled"
1283 );
1284
1285 struct Producer<P>(P);
1286
1287 impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
1288 for Producer<P>
1289 {
1290 type Item = P::Item;
1291 type Buffer = Option<P::Item>;
1292
1293 fn poll_produce<'a>(
1294 self: Pin<&mut Self>,
1295 cx: &mut Context<'_>,
1296 store: StoreContextMut<D>,
1297 mut destination: Destination<'a, Self::Item, Self::Buffer>,
1298 finish: bool,
1299 ) -> Poll<Result<StreamResult>> {
1300 let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1303
1304 Poll::Ready(Ok(
1305 if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
1306 destination.set_buffer(Some(value));
1307
1308 StreamResult::Completed
1315 } else {
1316 StreamResult::Cancelled
1317 },
1318 ))
1319 }
1320 }
1321
1322 Ok(Self::new_(
1323 store
1324 .as_context_mut()
1325 .new_transmit(TransmitKind::Future, Producer(producer))?,
1326 ))
1327 }
1328
1329 pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1330 Self {
1331 id,
1332 _phantom: PhantomData,
1333 }
1334 }
1335
1336 pub(super) fn id(&self) -> TableId<TransmitHandle> {
1337 self.id
1338 }
1339
1340 pub fn pipe<S: AsContextMut>(
1350 self,
1351 mut store: S,
1352 consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
1353 ) -> Result<()>
1354 where
1355 T: func::Lift + 'static,
1356 {
1357 struct Consumer<C>(C);
1358
1359 impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
1360 for Consumer<C>
1361 {
1362 type Item = T;
1363
1364 fn poll_consume(
1365 self: Pin<&mut Self>,
1366 cx: &mut Context<'_>,
1367 mut store: StoreContextMut<D>,
1368 mut source: Source<Self::Item>,
1369 finish: bool,
1370 ) -> Poll<Result<StreamResult>> {
1371 let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1374
1375 ready!(consumer.poll_consume(
1376 cx,
1377 store.as_context_mut(),
1378 source.reborrow(),
1379 finish
1380 ))?;
1381
1382 Poll::Ready(Ok(if source.remaining(store) == 0 {
1383 StreamResult::Completed
1389 } else {
1390 StreamResult::Cancelled
1391 }))
1392 }
1393 }
1394
1395 store
1396 .as_context_mut()
1397 .set_consumer(self.id, TransmitKind::Future, Consumer(consumer))
1398 }
1399
1400 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1402 let id = lift_index_to_future(cx, ty, index)?;
1403 Ok(Self::new_(id))
1404 }
1405
1406 pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
1424 future_close(store.as_context_mut().0, &mut self.id)
1425 }
1426
1427 pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
1429 accessor.as_accessor().with(|access| self.close(access))
1430 }
1431
1432 pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1438 where
1439 A: AsAccessor,
1440 {
1441 GuardedFutureReader::new(accessor, self)
1442 }
1443
1444 pub fn try_into_future_any(self, store: impl AsContextMut) -> Result<FutureAny>
1451 where
1452 T: ComponentType + 'static,
1453 {
1454 FutureAny::try_from_future_reader(store, self)
1455 }
1456
1457 pub fn try_from_future_any(future: FutureAny) -> Result<Self>
1464 where
1465 T: ComponentType + 'static,
1466 {
1467 future.try_into_future_reader()
1468 }
1469}
1470
1471impl<T> fmt::Debug for FutureReader<T> {
1472 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1473 f.debug_struct("FutureReader")
1474 .field("id", &self.id)
1475 .finish()
1476 }
1477}
1478
1479pub(super) fn future_close(
1480 store: &mut StoreOpaque,
1481 id: &mut TableId<TransmitHandle>,
1482) -> Result<()> {
1483 let id = mem::replace(id, TableId::new(u32::MAX));
1484 store.host_drop_reader(id, TransmitKind::Future)
1485}
1486
1487pub(super) fn lift_index_to_future(
1489 cx: &mut LiftContext<'_>,
1490 ty: InterfaceType,
1491 index: u32,
1492) -> Result<TableId<TransmitHandle>> {
1493 match ty {
1494 InterfaceType::Future(src) => {
1495 let (state, instance) = cx.concurrent_state_and_instance_mut();
1496 lift_index_to_transmit(instance, state, TransmitIndex::Future(src), index)
1497 }
1498 _ => func::bad_type_info(),
1499 }
1500}
1501
1502pub(super) fn lower_future_to_index<U>(
1504 id: TableId<TransmitHandle>,
1505 cx: &mut LowerContext<'_, U>,
1506 ty: InterfaceType,
1507) -> Result<u32> {
1508 match ty {
1509 InterfaceType::Future(dst) => {
1510 cx.instance_handle()
1511 .lower_transmit_to_index(cx.store.0, TransmitIndex::Future(dst), id)
1512 }
1513 _ => func::bad_type_info(),
1514 }
1515}
1516
1517unsafe impl<T: ComponentType> ComponentType for FutureReader<T> {
1520 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1521
1522 type Lower = <u32 as func::ComponentType>::Lower;
1523
1524 fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1525 match ty {
1526 InterfaceType::Future(ty) => {
1527 let ty = types.types[*ty].ty;
1528 types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1529 }
1530 other => bail!("expected `future`, found `{}`", func::desc(other)),
1531 }
1532 }
1533}
1534
1535unsafe impl<T: ComponentType> func::Lower for FutureReader<T> {
1537 fn linear_lower_to_flat<U>(
1538 &self,
1539 cx: &mut LowerContext<'_, U>,
1540 ty: InterfaceType,
1541 dst: &mut MaybeUninit<Self::Lower>,
1542 ) -> Result<()> {
1543 lower_future_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1544 }
1545
1546 fn linear_lower_to_memory<U>(
1547 &self,
1548 cx: &mut LowerContext<'_, U>,
1549 ty: InterfaceType,
1550 offset: usize,
1551 ) -> Result<()> {
1552 lower_future_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1553 cx,
1554 InterfaceType::U32,
1555 offset,
1556 )
1557 }
1558}
1559
1560unsafe impl<T: ComponentType> func::Lift for FutureReader<T> {
1562 fn linear_lift_from_flat(
1563 cx: &mut LiftContext<'_>,
1564 ty: InterfaceType,
1565 src: &Self::Lower,
1566 ) -> Result<Self> {
1567 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1568 Self::lift_from_index(cx, ty, index)
1569 }
1570
1571 fn linear_lift_from_memory(
1572 cx: &mut LiftContext<'_>,
1573 ty: InterfaceType,
1574 bytes: &[u8],
1575 ) -> Result<Self> {
1576 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1577 Self::lift_from_index(cx, ty, index)
1578 }
1579}
1580
1581pub struct GuardedFutureReader<T, A>
1589where
1590 A: AsAccessor,
1591{
1592 reader: Option<FutureReader<T>>,
1596 accessor: A,
1597}
1598
1599impl<T, A> GuardedFutureReader<T, A>
1600where
1601 A: AsAccessor,
1602{
1603 pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1611 assert!(
1612 accessor
1613 .as_accessor()
1614 .with(|a| a.as_context().0.concurrency_support())
1615 );
1616 Self {
1617 reader: Some(reader),
1618 accessor,
1619 }
1620 }
1621
1622 pub fn into_future(self) -> FutureReader<T> {
1625 self.into()
1626 }
1627}
1628
1629impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1630where
1631 A: AsAccessor,
1632{
1633 fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1634 guard.reader.take().unwrap()
1635 }
1636}
1637
1638impl<T, A> Drop for GuardedFutureReader<T, A>
1639where
1640 A: AsAccessor,
1641{
1642 fn drop(&mut self) {
1643 if let Some(reader) = &mut self.reader {
1644 let result = reader.close_with(&self.accessor);
1647 debug_assert!(result.is_ok());
1648 }
1649 }
1650}
1651
1652pub struct StreamReader<T> {
1659 id: TableId<TransmitHandle>,
1660 _phantom: PhantomData<T>,
1661}
1662
1663impl<T> StreamReader<T> {
1664 pub fn new<S: AsContextMut>(
1673 mut store: S,
1674 producer: impl StreamProducer<S::Data, Item = T>,
1675 ) -> Result<Self>
1676 where
1677 T: func::Lower + func::Lift + Send + Sync + 'static,
1678 {
1679 ensure!(
1680 store.as_context().0.concurrency_support(),
1681 "concurrency support is not enabled",
1682 );
1683 Ok(Self::new_(
1684 store
1685 .as_context_mut()
1686 .new_transmit(TransmitKind::Stream, producer)?,
1687 ))
1688 }
1689
1690 pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1691 Self {
1692 id,
1693 _phantom: PhantomData,
1694 }
1695 }
1696
1697 pub(super) fn id(&self) -> TableId<TransmitHandle> {
1698 self.id
1699 }
1700
1701 pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
1723 let store = store.as_context_mut();
1724 let state = store.0.concurrent_state_mut();
1725 let id = state.get_mut(self.id).unwrap().state;
1726 if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
1727 match try_into(TypeId::of::<V>()) {
1728 Some(result) => {
1729 self.close(store).unwrap();
1730 Ok(*result.downcast::<V>().unwrap())
1731 }
1732 None => Err(self),
1733 }
1734 } else {
1735 Err(self)
1736 }
1737 }
1738
1739 pub fn pipe<S: AsContextMut>(
1749 self,
1750 mut store: S,
1751 consumer: impl StreamConsumer<S::Data, Item = T>,
1752 ) -> Result<()>
1753 where
1754 T: 'static,
1755 {
1756 store
1757 .as_context_mut()
1758 .set_consumer(self.id, TransmitKind::Stream, consumer)
1759 }
1760
1761 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1763 let id = lift_index_to_stream(cx, ty, index)?;
1764 Ok(Self::new_(id))
1765 }
1766
1767 pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
1783 stream_close(store.as_context_mut().0, &mut self.id)
1784 }
1785
1786 pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
1788 accessor.as_accessor().with(|access| self.close(access))
1789 }
1790
1791 pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1797 where
1798 A: AsAccessor,
1799 {
1800 GuardedStreamReader::new(accessor, self)
1801 }
1802
1803 pub fn try_into_stream_any(self, store: impl AsContextMut) -> Result<StreamAny>
1810 where
1811 T: ComponentType + 'static,
1812 {
1813 StreamAny::try_from_stream_reader(store, self)
1814 }
1815
1816 pub fn try_from_stream_any(stream: StreamAny) -> Result<Self>
1823 where
1824 T: ComponentType + 'static,
1825 {
1826 stream.try_into_stream_reader()
1827 }
1828}
1829
1830impl<T> fmt::Debug for StreamReader<T> {
1831 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1832 f.debug_struct("StreamReader")
1833 .field("id", &self.id)
1834 .finish()
1835 }
1836}
1837
1838pub(super) fn stream_close(
1839 store: &mut StoreOpaque,
1840 id: &mut TableId<TransmitHandle>,
1841) -> Result<()> {
1842 let id = mem::replace(id, TableId::new(u32::MAX));
1843 store.host_drop_reader(id, TransmitKind::Stream)
1844}
1845
1846pub(super) fn lift_index_to_stream(
1848 cx: &mut LiftContext<'_>,
1849 ty: InterfaceType,
1850 index: u32,
1851) -> Result<TableId<TransmitHandle>> {
1852 match ty {
1853 InterfaceType::Stream(src) => {
1854 let (state, instance) = cx.concurrent_state_and_instance_mut();
1855 lift_index_to_transmit(instance, state, TransmitIndex::Stream(src), index)
1856 }
1857 _ => func::bad_type_info(),
1858 }
1859}
1860
1861pub(super) fn lower_stream_to_index<U>(
1863 id: TableId<TransmitHandle>,
1864 cx: &mut LowerContext<'_, U>,
1865 ty: InterfaceType,
1866) -> Result<u32> {
1867 match ty {
1868 InterfaceType::Stream(dst) => {
1869 cx.instance_handle()
1870 .lower_transmit_to_index(cx.store.0, TransmitIndex::Stream(dst), id)
1871 }
1872 _ => func::bad_type_info(),
1873 }
1874}
1875
1876unsafe impl<T: ComponentType> ComponentType for StreamReader<T> {
1879 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1880
1881 type Lower = <u32 as func::ComponentType>::Lower;
1882
1883 fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1884 match ty {
1885 InterfaceType::Stream(ty) => {
1886 let ty = types.types[*ty].ty;
1887 types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1888 }
1889 other => bail!("expected `stream`, found `{}`", func::desc(other)),
1890 }
1891 }
1892}
1893
1894unsafe impl<T: ComponentType> func::Lower for StreamReader<T> {
1896 fn linear_lower_to_flat<U>(
1897 &self,
1898 cx: &mut LowerContext<'_, U>,
1899 ty: InterfaceType,
1900 dst: &mut MaybeUninit<Self::Lower>,
1901 ) -> Result<()> {
1902 lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1903 }
1904
1905 fn linear_lower_to_memory<U>(
1906 &self,
1907 cx: &mut LowerContext<'_, U>,
1908 ty: InterfaceType,
1909 offset: usize,
1910 ) -> Result<()> {
1911 lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1912 cx,
1913 InterfaceType::U32,
1914 offset,
1915 )
1916 }
1917}
1918
1919unsafe impl<T: ComponentType> func::Lift for StreamReader<T> {
1921 fn linear_lift_from_flat(
1922 cx: &mut LiftContext<'_>,
1923 ty: InterfaceType,
1924 src: &Self::Lower,
1925 ) -> Result<Self> {
1926 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1927 Self::lift_from_index(cx, ty, index)
1928 }
1929
1930 fn linear_lift_from_memory(
1931 cx: &mut LiftContext<'_>,
1932 ty: InterfaceType,
1933 bytes: &[u8],
1934 ) -> Result<Self> {
1935 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1936 Self::lift_from_index(cx, ty, index)
1937 }
1938}
1939
1940pub struct GuardedStreamReader<T, A>
1948where
1949 A: AsAccessor,
1950{
1951 reader: Option<StreamReader<T>>,
1955 accessor: A,
1956}
1957
1958impl<T, A> GuardedStreamReader<T, A>
1959where
1960 A: AsAccessor,
1961{
1962 pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1971 assert!(
1972 accessor
1973 .as_accessor()
1974 .with(|a| a.as_context().0.concurrency_support())
1975 );
1976 Self {
1977 reader: Some(reader),
1978 accessor,
1979 }
1980 }
1981
1982 pub fn into_stream(self) -> StreamReader<T> {
1985 self.into()
1986 }
1987}
1988
1989impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1990where
1991 A: AsAccessor,
1992{
1993 fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1994 guard.reader.take().unwrap()
1995 }
1996}
1997
1998impl<T, A> Drop for GuardedStreamReader<T, A>
1999where
2000 A: AsAccessor,
2001{
2002 fn drop(&mut self) {
2003 if let Some(reader) = &mut self.reader {
2004 let result = reader.close_with(&self.accessor);
2007 debug_assert!(result.is_ok());
2008 }
2009 }
2010}
2011
2012pub struct ErrorContext {
2014 rep: u32,
2015}
2016
2017impl ErrorContext {
2018 pub(crate) fn new(rep: u32) -> Self {
2019 Self { rep }
2020 }
2021
2022 pub fn into_val(self) -> Val {
2024 Val::ErrorContext(ErrorContextAny(self.rep))
2025 }
2026
2027 pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
2029 let Val::ErrorContext(ErrorContextAny(rep)) = value else {
2030 bail!("expected `error-context`; got `{}`", value.desc());
2031 };
2032 Ok(Self::new(*rep))
2033 }
2034
2035 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
2036 match ty {
2037 InterfaceType::ErrorContext(src) => {
2038 let rep = cx
2039 .instance_mut()
2040 .table_for_error_context(src)
2041 .error_context_rep(index)?;
2042
2043 Ok(Self { rep })
2044 }
2045 _ => func::bad_type_info(),
2046 }
2047 }
2048}
2049
2050pub(crate) fn lower_error_context_to_index<U>(
2051 rep: u32,
2052 cx: &mut LowerContext<'_, U>,
2053 ty: InterfaceType,
2054) -> Result<u32> {
2055 match ty {
2056 InterfaceType::ErrorContext(dst) => {
2057 let tbl = cx.instance_mut().table_for_error_context(dst);
2058 tbl.error_context_insert(rep)
2059 }
2060 _ => func::bad_type_info(),
2061 }
2062}
2063unsafe impl func::ComponentType for ErrorContext {
2066 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
2067
2068 type Lower = <u32 as func::ComponentType>::Lower;
2069
2070 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
2071 match ty {
2072 InterfaceType::ErrorContext(_) => Ok(()),
2073 other => bail!("expected `error`, found `{}`", func::desc(other)),
2074 }
2075 }
2076}
2077
2078unsafe impl func::Lower for ErrorContext {
2080 fn linear_lower_to_flat<T>(
2081 &self,
2082 cx: &mut LowerContext<'_, T>,
2083 ty: InterfaceType,
2084 dst: &mut MaybeUninit<Self::Lower>,
2085 ) -> Result<()> {
2086 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
2087 cx,
2088 InterfaceType::U32,
2089 dst,
2090 )
2091 }
2092
2093 fn linear_lower_to_memory<T>(
2094 &self,
2095 cx: &mut LowerContext<'_, T>,
2096 ty: InterfaceType,
2097 offset: usize,
2098 ) -> Result<()> {
2099 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
2100 cx,
2101 InterfaceType::U32,
2102 offset,
2103 )
2104 }
2105}
2106
2107unsafe impl func::Lift for ErrorContext {
2109 fn linear_lift_from_flat(
2110 cx: &mut LiftContext<'_>,
2111 ty: InterfaceType,
2112 src: &Self::Lower,
2113 ) -> Result<Self> {
2114 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
2115 Self::lift_from_index(cx, ty, index)
2116 }
2117
2118 fn linear_lift_from_memory(
2119 cx: &mut LiftContext<'_>,
2120 ty: InterfaceType,
2121 bytes: &[u8],
2122 ) -> Result<Self> {
2123 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
2124 Self::lift_from_index(cx, ty, index)
2125 }
2126}
2127
2128pub(super) struct TransmitHandle {
2130 pub(super) common: WaitableCommon,
2131 state: TableId<TransmitState>,
2133}
2134
2135impl TransmitHandle {
2136 fn new(state: TableId<TransmitState>) -> Self {
2137 Self {
2138 common: WaitableCommon::default(),
2139 state,
2140 }
2141 }
2142}
2143
2144impl TableDebug for TransmitHandle {
2145 fn type_name() -> &'static str {
2146 "TransmitHandle"
2147 }
2148}
2149
2150struct TransmitState {
2152 write_handle: TableId<TransmitHandle>,
2154 read_handle: TableId<TransmitHandle>,
2156 write: WriteState,
2158 read: ReadState,
2160 done: bool,
2162 pub(super) origin: TransmitOrigin,
2165}
2166
2167#[derive(Copy, Clone)]
2168pub(super) enum TransmitOrigin {
2169 Host,
2170 GuestFuture(ComponentInstanceId, TypeFutureTableIndex),
2171 GuestStream(ComponentInstanceId, TypeStreamTableIndex),
2172}
2173
2174impl TransmitState {
2175 fn new(origin: TransmitOrigin) -> Self {
2176 Self {
2177 write_handle: TableId::new(u32::MAX),
2178 read_handle: TableId::new(u32::MAX),
2179 read: ReadState::Open,
2180 write: WriteState::Open,
2181 done: false,
2182 origin,
2183 }
2184 }
2185}
2186
2187impl TableDebug for TransmitState {
2188 fn type_name() -> &'static str {
2189 "TransmitState"
2190 }
2191}
2192
2193impl TransmitOrigin {
2194 fn guest(id: ComponentInstanceId, index: TransmitIndex) -> Self {
2195 match index {
2196 TransmitIndex::Future(ty) => TransmitOrigin::GuestFuture(id, ty),
2197 TransmitIndex::Stream(ty) => TransmitOrigin::GuestStream(id, ty),
2198 }
2199 }
2200}
2201
2202type PollStream = Box<
2203 dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
2204>;
2205
2206type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
2207
2208enum WriteState {
2210 Open,
2212 GuestReady {
2214 instance: Instance,
2215 caller: RuntimeComponentInstanceIndex,
2216 ty: TransmitIndex,
2217 flat_abi: Option<FlatAbi>,
2218 options: OptionsIndex,
2219 address: usize,
2220 count: ItemCount,
2221 handle: u32,
2222 },
2223 HostReady {
2225 produce: PollStream,
2226 try_into: TryInto,
2227 guest_offset: ItemCount,
2228 cancel: bool,
2229 cancel_waker: Option<Waker>,
2230 },
2231 Dropped,
2233}
2234
2235impl fmt::Debug for WriteState {
2236 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2237 match self {
2238 Self::Open => f.debug_tuple("Open").finish(),
2239 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2240 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2241 Self::Dropped => f.debug_tuple("Dropped").finish(),
2242 }
2243 }
2244}
2245
2246enum ReadState {
2248 Open,
2250 GuestReady {
2252 ty: TransmitIndex,
2253 caller_instance: RuntimeComponentInstanceIndex,
2254 caller_thread: QualifiedThreadId,
2255 flat_abi: Option<FlatAbi>,
2256 instance: Instance,
2257 options: OptionsIndex,
2258 address: usize,
2259 count: ItemCount,
2260 handle: u32,
2261 },
2262 HostReady {
2264 consume: PollStream,
2265 guest_offset: ItemCount,
2266 cancel: bool,
2267 cancel_waker: Option<Waker>,
2268 },
2269 HostToHost {
2271 accept: Box<
2272 dyn for<'a> Fn(
2273 &'a mut UntypedWriteBuffer<'a>,
2274 )
2275 -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
2276 + Send
2277 + Sync,
2278 >,
2279 buffer: Vec<u8>,
2280 limit: usize,
2281 },
2282 Dropped,
2284}
2285
2286impl fmt::Debug for ReadState {
2287 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2288 match self {
2289 Self::Open => f.debug_tuple("Open").finish(),
2290 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2291 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2292 Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
2293 Self::Dropped => f.debug_tuple("Dropped").finish(),
2294 }
2295 }
2296}
2297
2298fn return_code(kind: TransmitKind, state: StreamResult, count: ItemCount) -> Result<ReturnCode> {
2299 Ok(match state {
2300 StreamResult::Dropped => ReturnCode::Dropped(count),
2301 StreamResult::Completed => ReturnCode::completed(kind, count),
2302 StreamResult::Cancelled => ReturnCode::Cancelled(count),
2303 })
2304}
2305
2306impl StoreOpaque {
2307 fn pipe_from_guest(
2308 &mut self,
2309 kind: TransmitKind,
2310 id: TableId<TransmitState>,
2311 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2312 ) {
2313 let future = async move {
2314 let stream_state = future.await?;
2315 tls::get(|store| {
2316 let state = store.concurrent_state_mut();
2317 let transmit = state.get_mut(id)?;
2318 let ReadState::HostReady {
2319 consume,
2320 guest_offset,
2321 ..
2322 } = mem::replace(&mut transmit.read, ReadState::Open)
2323 else {
2324 bail_bug!("expected ReadState::HostReady")
2325 };
2326 let code = return_code(kind, stream_state, guest_offset)?;
2327 transmit.read = match stream_state {
2328 StreamResult::Dropped => ReadState::Dropped,
2329 StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2330 consume,
2331 guest_offset: ItemCount::ZERO,
2332 cancel: false,
2333 cancel_waker: None,
2334 },
2335 };
2336 let WriteState::GuestReady { ty, handle, .. } =
2337 mem::replace(&mut transmit.write, WriteState::Open)
2338 else {
2339 bail_bug!("expected WriteState::GuestReady")
2340 };
2341 state.send_write_result(ty, id, handle, code)?;
2342 Ok(())
2343 })
2344 };
2345
2346 self.concurrent_state_mut().push_future(future.boxed());
2347 }
2348
2349 fn pipe_to_guest(
2350 &mut self,
2351 kind: TransmitKind,
2352 id: TableId<TransmitState>,
2353 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2354 ) {
2355 let future = async move {
2356 let stream_state = future.await?;
2357 tls::get(|store| {
2358 let state = store.concurrent_state_mut();
2359 let transmit = state.get_mut(id)?;
2360 let WriteState::HostReady {
2361 produce,
2362 try_into,
2363 guest_offset,
2364 ..
2365 } = mem::replace(&mut transmit.write, WriteState::Open)
2366 else {
2367 bail_bug!("expected WriteState::HostReady")
2368 };
2369 let code = return_code(kind, stream_state, guest_offset)?;
2370 transmit.write = match stream_state {
2371 StreamResult::Dropped => WriteState::Dropped,
2372 StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2373 produce,
2374 try_into,
2375 guest_offset: ItemCount::ZERO,
2376 cancel: false,
2377 cancel_waker: None,
2378 },
2379 };
2380 let ReadState::GuestReady { ty, handle, .. } =
2381 mem::replace(&mut transmit.read, ReadState::Open)
2382 else {
2383 bail_bug!("expected ReadState::GuestReady")
2384 };
2385 state.send_read_result(ty, id, handle, code)?;
2386 Ok(())
2387 })
2388 };
2389
2390 self.concurrent_state_mut().push_future(future.boxed());
2391 }
2392
2393 fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
2395 let state = self.concurrent_state_mut();
2396 Waitable::Transmit(id).join(state, None)?;
2397 let transmit_id = state.get_mut(id)?.state;
2398 let transmit = state
2399 .get_mut(transmit_id)
2400 .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2401 log::trace!(
2402 "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2403 transmit.read,
2404 transmit.write
2405 );
2406
2407 transmit.read = ReadState::Dropped;
2408
2409 let new_state = if let WriteState::Dropped = &transmit.write {
2412 WriteState::Dropped
2413 } else {
2414 WriteState::Open
2415 };
2416
2417 let write_handle = transmit.write_handle;
2418
2419 match mem::replace(&mut transmit.write, new_state) {
2420 WriteState::GuestReady { ty, handle, .. } => {
2423 state.update_event(
2424 write_handle.rep(),
2425 match ty {
2426 TransmitIndex::Future(ty) => Event::FutureWrite {
2427 code: ReturnCode::Dropped(ItemCount::ZERO),
2428 pending: Some((ty, handle)),
2429 },
2430 TransmitIndex::Stream(ty) => Event::StreamWrite {
2431 code: ReturnCode::Dropped(ItemCount::ZERO),
2432 pending: Some((ty, handle)),
2433 },
2434 },
2435 )?;
2436 }
2437
2438 WriteState::Open => {
2439 state.update_event(
2440 write_handle.rep(),
2441 match kind {
2442 TransmitKind::Future => Event::FutureWrite {
2443 code: ReturnCode::Dropped(ItemCount::ZERO),
2444 pending: None,
2445 },
2446 TransmitKind::Stream => Event::StreamWrite {
2447 code: ReturnCode::Dropped(ItemCount::ZERO),
2448 pending: None,
2449 },
2450 },
2451 )?;
2452 }
2453
2454 WriteState::Dropped | WriteState::HostReady { .. } => {
2459 log::trace!("host_drop_reader delete {transmit_id:?}");
2460 state.delete_transmit(transmit_id)?;
2461 }
2462 }
2463 Ok(())
2464 }
2465
2466 fn host_drop_writer(
2468 &mut self,
2469 id: TableId<TransmitHandle>,
2470 on_drop_open: Option<fn() -> Result<()>>,
2471 ) -> Result<()> {
2472 let state = self.concurrent_state_mut();
2473 Waitable::Transmit(id).join(state, None)?;
2474 let transmit_id = state.get_mut(id)?.state;
2475 let transmit = state
2476 .get_mut(transmit_id)
2477 .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2478 log::trace!(
2479 "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2480 transmit.read,
2481 transmit.write
2482 );
2483
2484 match &mut transmit.write {
2486 WriteState::GuestReady { .. } => {
2487 bail_bug!("can't call `host_drop_writer` on a guest-owned writer");
2488 }
2489 WriteState::HostReady { .. } => {}
2490 v @ WriteState::Open => {
2491 if let (Some(on_drop_open), false) = (on_drop_open, transmit.done) {
2492 on_drop_open()?;
2493 } else {
2494 *v = WriteState::Dropped;
2495 }
2496 }
2497 WriteState::Dropped => bail_bug!("write state is already dropped"),
2498 }
2499
2500 let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
2501
2502 let new_state = if let ReadState::Dropped = &transmit.read {
2508 ReadState::Dropped
2509 } else {
2510 ReadState::Open
2511 };
2512
2513 let read_handle = transmit.read_handle;
2514
2515 match mem::replace(&mut transmit.read, new_state) {
2517 ReadState::GuestReady { ty, handle, .. } => {
2521 self.concurrent_state_mut().update_event(
2523 read_handle.rep(),
2524 match ty {
2525 TransmitIndex::Future(ty) => Event::FutureRead {
2526 code: ReturnCode::Dropped(ItemCount::ZERO),
2527 pending: Some((ty, handle)),
2528 },
2529 TransmitIndex::Stream(ty) => Event::StreamRead {
2530 code: ReturnCode::Dropped(ItemCount::ZERO),
2531 pending: Some((ty, handle)),
2532 },
2533 },
2534 )?;
2535 }
2536
2537 ReadState::Open => {
2539 self.concurrent_state_mut().update_event(
2540 read_handle.rep(),
2541 match on_drop_open {
2542 Some(_) => Event::FutureRead {
2543 code: ReturnCode::Dropped(ItemCount::ZERO),
2544 pending: None,
2545 },
2546 None => Event::StreamRead {
2547 code: ReturnCode::Dropped(ItemCount::ZERO),
2548 pending: None,
2549 },
2550 },
2551 )?;
2552 }
2553
2554 ReadState::Dropped | ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
2561 log::trace!("host_drop_writer delete {transmit_id:?}");
2562 self.concurrent_state_mut().delete_transmit(transmit_id)?;
2563 }
2564 }
2565 Ok(())
2566 }
2567
2568 pub(super) fn transmit_origin(
2569 &mut self,
2570 id: TableId<TransmitHandle>,
2571 ) -> Result<TransmitOrigin> {
2572 let state = self.concurrent_state_mut();
2573 let state_id = state.get_mut(id)?.state;
2574 Ok(state.get_mut(state_id)?.origin)
2575 }
2576}
2577
2578impl<T> StoreContextMut<'_, T> {
2579 fn new_transmit<P: StreamProducer<T>>(
2580 mut self,
2581 kind: TransmitKind,
2582 producer: P,
2583 ) -> Result<TableId<TransmitHandle>>
2584 where
2585 P::Item: func::Lower,
2586 {
2587 let token = StoreToken::new(self.as_context_mut());
2588 let state = self.0.concurrent_state_mut();
2589 let (_, read) = state.new_transmit(TransmitOrigin::Host)?;
2590 let producer = Arc::new(LockedState::new((Box::pin(producer), P::Buffer::default())));
2591 let id = state.get_mut(read)?.state;
2592 let mut dropped = false;
2593 let produce = Box::new({
2594 let producer = producer.clone();
2595 move || {
2596 let producer = producer.clone();
2597 async move {
2598 let mut state = producer.take()?;
2599 let (mine, buffer) = &mut *state;
2600
2601 let (result, cancelled) = if buffer.remaining().is_empty() {
2602 future::poll_fn(|cx| {
2603 tls::get(|store| {
2604 let transmit = store.concurrent_state_mut().get_mut(id)?;
2605
2606 let &WriteState::HostReady { cancel, .. } = &transmit.write else {
2607 bail_bug!("expected WriteState::HostReady")
2608 };
2609
2610 let mut host_written = 0;
2611 let mut host_buffer =
2612 if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
2613 Some(mem::take(buffer))
2614 } else {
2615 None
2616 };
2617
2618 let poll = mine.as_mut().poll_produce(
2619 cx,
2620 token.as_context_mut(store),
2621 Destination {
2622 id,
2623 buffer,
2624 host_buffer: host_buffer.as_mut().map(|b| {
2625 HostBuffer {
2626 dst: b,
2627 marked_written: &mut host_written,
2628 }
2629 }),
2630 _phantom: PhantomData,
2631 },
2632 cancel,
2633 );
2634
2635 let transmit = store.concurrent_state_mut().get_mut(id)?;
2636
2637 let host_offset = if let (
2638 Some(host_buffer),
2639 ReadState::HostToHost { buffer, limit, .. },
2640 ) = (host_buffer, &mut transmit.read)
2641 {
2642 *limit = host_written;
2643 *buffer = host_buffer;
2644 *limit
2645 } else {
2646 0
2647 };
2648
2649 {
2650 let WriteState::HostReady {
2651 guest_offset,
2652 cancel,
2653 cancel_waker,
2654 ..
2655 } = &mut transmit.write
2656 else {
2657 bail_bug!("expected WriteState::HostReady")
2658 };
2659
2660 if poll.is_pending() {
2661 if !buffer.remaining().is_empty()
2662 || *guest_offset > 0
2663 || host_offset > 0
2664 {
2665 bail!(
2666 "StreamProducer::poll_produce returned Poll::Pending \
2667 after producing at least one item"
2668 )
2669 }
2670 *cancel_waker = Some(cx.waker().clone());
2671 } else {
2672 *cancel_waker = None;
2673 *cancel = false;
2674 }
2675 }
2676
2677 Ok(poll.map(|v| v.map(|result| (result, cancel))))
2678 })?
2679 })
2680 .await?
2681 } else {
2682 (StreamResult::Completed, false)
2683 };
2684
2685 let (guest_offset, host_offset, count) = tls::get(|store| {
2686 let transmit = store.concurrent_state_mut().get_mut(id)?;
2687 let (count, host_offset) = match &transmit.read {
2688 &ReadState::GuestReady { count, .. } => (count.as_u32(), 0),
2689 &ReadState::HostToHost { limit, .. } => (1, limit),
2690 _ => bail_bug!("invalid read state"),
2691 };
2692 let guest_offset = match &transmit.write {
2693 &WriteState::HostReady { guest_offset, .. } => guest_offset,
2694 _ => bail_bug!("invalid write state"),
2695 };
2696 Ok((guest_offset, host_offset, count))
2697 })?;
2698
2699 match result {
2700 StreamResult::Completed => {
2701 if count > 1
2702 && buffer.remaining().is_empty()
2703 && guest_offset == 0
2704 && host_offset == 0
2705 {
2706 bail!(
2707 "StreamProducer::poll_produce returned StreamResult::Completed \
2708 without producing any items"
2709 );
2710 }
2711 }
2712 StreamResult::Cancelled => {
2713 if !cancelled {
2714 bail!(
2715 "StreamProducer::poll_produce returned StreamResult::Cancelled \
2716 without being given a `finish` parameter value of true"
2717 );
2718 }
2719 }
2720 StreamResult::Dropped => {
2721 dropped = true;
2722 }
2723 }
2724
2725 let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;
2726
2727 drop(state);
2728
2729 if write_buffer {
2730 write(token, id, producer.clone(), kind).await?;
2731 }
2732
2733 Ok(if dropped {
2734 if producer.with(|p| p.1.remaining().is_empty())? {
2735 StreamResult::Dropped
2736 } else {
2737 StreamResult::Completed
2738 }
2739 } else {
2740 result
2741 })
2742 }
2743 .boxed()
2744 }
2745 });
2746 let try_into = Box::new(move |ty| {
2747 let (mine, buffer) = producer.try_lock().ok()?.take()?;
2748 match P::try_into(mine, ty) {
2749 Ok(value) => Some(value),
2750 Err(mine) => {
2751 *producer.try_lock().ok()? = Some((mine, buffer));
2752 None
2753 }
2754 }
2755 });
2756 state.get_mut(id)?.write = WriteState::HostReady {
2757 produce,
2758 try_into,
2759 guest_offset: ItemCount::ZERO,
2760 cancel: false,
2761 cancel_waker: None,
2762 };
2763 Ok(read)
2764 }
2765
2766 fn set_consumer<C: StreamConsumer<T>>(
2767 mut self,
2768 id: TableId<TransmitHandle>,
2769 kind: TransmitKind,
2770 consumer: C,
2771 ) -> Result<()> {
2772 let token = StoreToken::new(self.as_context_mut());
2773 let state = self.0.concurrent_state_mut();
2774 let id = state.get_mut(id)?.state;
2775 let transmit = state.get_mut(id)?;
2776 let consumer = Arc::new(LockedState::new(Box::pin(consumer)));
2777 let consume_with_buffer = {
2778 let consumer = consumer.clone();
2779 async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
2780 let mut mine = consumer.take()?;
2781
2782 let host_buffer_remaining_before =
2783 host_buffer.as_deref_mut().map(|v| v.remaining().len());
2784
2785 let (result, cancelled) = future::poll_fn(|cx| {
2786 tls::get(|store| {
2787 let cancel = match &store.concurrent_state_mut().get_mut(id)?.read {
2788 &ReadState::HostReady { cancel, .. } => cancel,
2789 ReadState::Open => false,
2790 _ => bail_bug!("unexpected read state"),
2791 };
2792
2793 let poll = mine.as_mut().poll_consume(
2794 cx,
2795 token.as_context_mut(store),
2796 Source {
2797 id,
2798 host_buffer: host_buffer.as_deref_mut(),
2799 },
2800 cancel,
2801 );
2802
2803 if let ReadState::HostReady {
2804 cancel_waker,
2805 cancel,
2806 ..
2807 } = &mut store.concurrent_state_mut().get_mut(id)?.read
2808 {
2809 if poll.is_pending() {
2810 *cancel_waker = Some(cx.waker().clone());
2811 } else {
2812 *cancel_waker = None;
2813 *cancel = false;
2814 }
2815 }
2816
2817 Ok(poll.map(|v| v.map(|result| (result, cancel))))
2818 })?
2819 })
2820 .await?;
2821
2822 let (guest_offset, count) = tls::get(|store| {
2823 let transmit = store.concurrent_state_mut().get_mut(id)?;
2824 Ok((
2825 match &transmit.read {
2826 &ReadState::HostReady { guest_offset, .. } => guest_offset,
2827 ReadState::Open => ItemCount::ZERO,
2828 _ => bail_bug!("invalid read state"),
2829 },
2830 match &transmit.write {
2831 WriteState::GuestReady { count, .. } => count.as_usize(),
2832 WriteState::HostReady { .. } => match host_buffer_remaining_before {
2833 Some(n) => n,
2834 None => bail_bug!("host_buffer_remaining_before should be set"),
2835 },
2836 _ => bail_bug!("invalid write state"),
2837 },
2838 ))
2839 })?;
2840
2841 match result {
2842 StreamResult::Completed => {
2843 if count > 0
2844 && guest_offset == 0
2845 && host_buffer_remaining_before
2846 .zip(host_buffer.map(|v| v.remaining().len()))
2847 .map(|(before, after)| before == after)
2848 .unwrap_or(false)
2849 {
2850 bail!(
2851 "StreamConsumer::poll_consume returned StreamResult::Completed \
2852 without consuming any items"
2853 );
2854 }
2855
2856 if let TransmitKind::Future = kind {
2857 tls::get(|store| {
2858 store.concurrent_state_mut().get_mut(id)?.done = true;
2859 crate::error::Ok(())
2860 })?;
2861 }
2862 }
2863 StreamResult::Cancelled => {
2864 if !cancelled {
2865 bail!(
2866 "StreamConsumer::poll_consume returned StreamResult::Cancelled \
2867 without being given a `finish` parameter value of true"
2868 );
2869 }
2870 }
2871 StreamResult::Dropped => {}
2872 }
2873
2874 Ok(result)
2875 }
2876 };
2877 let consume = {
2878 let consume = consume_with_buffer.clone();
2879 Box::new(move || {
2880 let consume = consume.clone();
2881 async move { consume(None).await }.boxed()
2882 })
2883 };
2884
2885 match &transmit.write {
2886 WriteState::Open => {
2887 transmit.read = ReadState::HostReady {
2888 consume,
2889 guest_offset: ItemCount::ZERO,
2890 cancel: false,
2891 cancel_waker: None,
2892 };
2893 }
2894 &WriteState::GuestReady { .. } => {
2895 let future = consume();
2896 transmit.read = ReadState::HostReady {
2897 consume,
2898 guest_offset: ItemCount::ZERO,
2899 cancel: false,
2900 cancel_waker: None,
2901 };
2902 self.0.pipe_from_guest(kind, id, future);
2903 }
2904 WriteState::HostReady { .. } => {
2905 let WriteState::HostReady { produce, .. } = mem::replace(
2906 &mut transmit.write,
2907 WriteState::HostReady {
2908 produce: Box::new(|| {
2909 Box::pin(async { bail_bug!("unexpected invocation of `produce`") })
2910 }),
2911 try_into: Box::new(|_| None),
2912 guest_offset: ItemCount::ZERO,
2913 cancel: false,
2914 cancel_waker: None,
2915 },
2916 ) else {
2917 bail_bug!("expected WriteState::HostReady")
2918 };
2919
2920 transmit.read = ReadState::HostToHost {
2921 accept: Box::new(move |input| {
2922 let consume = consume_with_buffer.clone();
2923 async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
2924 }),
2925 buffer: Vec::new(),
2926 limit: 0,
2927 };
2928
2929 let future = async move {
2930 loop {
2931 if tls::get(|store| {
2932 crate::error::Ok(matches!(
2933 store.concurrent_state_mut().get_mut(id)?.read,
2934 ReadState::Dropped
2935 ))
2936 })? {
2937 break Ok(());
2938 }
2939
2940 match produce().await? {
2941 StreamResult::Completed | StreamResult::Cancelled => {}
2942 StreamResult::Dropped => break Ok(()),
2943 }
2944
2945 if let TransmitKind::Future = kind {
2946 break Ok(());
2947 }
2948 }
2949 }
2950 .map(move |result| {
2951 tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
2952 result
2953 });
2954
2955 state.push_future(Box::pin(future));
2956 }
2957 WriteState::Dropped => {
2958 let reader = transmit.read_handle;
2959 self.0.host_drop_reader(reader, kind)?;
2960 }
2961 }
2962 Ok(())
2963 }
2964}
2965
2966async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
2967 token: StoreToken<D>,
2968 id: TableId<TransmitState>,
2969 pair: Arc<LockedState<(P, B)>>,
2970 kind: TransmitKind,
2971) -> Result<()> {
2972 let (read, guest_offset) = tls::get(|store| {
2973 let transmit = store.concurrent_state_mut().get_mut(id)?;
2974
2975 let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
2976 Some(guest_offset)
2977 } else {
2978 None
2979 };
2980
2981 crate::error::Ok((
2982 mem::replace(&mut transmit.read, ReadState::Open),
2983 guest_offset,
2984 ))
2985 })?;
2986
2987 match read {
2988 ReadState::GuestReady {
2989 ty,
2990 flat_abi,
2991 options,
2992 address,
2993 count,
2994 handle,
2995 instance,
2996 caller_instance,
2997 caller_thread,
2998 } => {
2999 let guest_offset = match guest_offset {
3000 Some(i) => i,
3001 None => bail_bug!("guest_offset should be present if ready"),
3002 };
3003
3004 if let TransmitKind::Future = kind {
3005 tls::get(|store| {
3006 store.concurrent_state_mut().get_mut(id)?.done = true;
3007 crate::error::Ok(())
3008 })?;
3009 }
3010
3011 let old_remaining = pair.with(|p| p.1.remaining().len())?;
3012 let accept = {
3013 let pair = pair.clone();
3014 move |mut store: StoreContextMut<D>| {
3015 let mut state = pair.take()?;
3016 lower::<T, B, D>(
3017 store.as_context_mut(),
3018 instance,
3019 caller_thread,
3020 options,
3021 ty,
3022 address + (T::SIZE32 * guest_offset.as_usize()),
3023 count.as_usize() - guest_offset.as_usize(),
3024 &mut state.1,
3025 )?;
3026 crate::error::Ok(())
3027 }
3028 };
3029
3030 if guest_offset < count {
3031 if T::MAY_REQUIRE_REALLOC {
3032 let (tx, rx) = oneshot::channel();
3037 tls::get(move |store| {
3038 store
3039 .concurrent_state_mut()
3040 .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
3041 move |store| {
3042 _ = tx.send(accept(token.as_context_mut(store))?);
3043 Ok(())
3044 },
3045 ))))
3046 });
3047 match rx.await {
3048 Ok(r) => r,
3049 Err(oneshot::Canceled) => bail_bug!("work cancelled"),
3050 }
3051 } else {
3052 tls::get(|store| accept(token.as_context_mut(store)))?
3057 }
3058 }
3059
3060 tls::get(|store| {
3061 let count = old_remaining - pair.with(|p| p.1.remaining().len())?;
3062
3063 let transmit = store.concurrent_state_mut().get_mut(id)?;
3064
3065 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3066 bail_bug!("expected WriteState::HostReady")
3067 };
3068
3069 guest_offset.inc(count)?;
3070
3071 transmit.read = ReadState::GuestReady {
3072 ty,
3073 flat_abi,
3074 options,
3075 address,
3076 count: ItemCount::new_usize(count)?,
3077 handle,
3078 instance,
3079 caller_instance,
3080 caller_thread,
3081 };
3082
3083 crate::error::Ok(())
3084 })?;
3085
3086 Ok(())
3087 }
3088
3089 ReadState::HostToHost {
3090 accept,
3091 mut buffer,
3092 limit,
3093 } => {
3094 let mut state = StreamResult::Completed;
3095 let mut position = 0;
3096
3097 while !matches!(state, StreamResult::Dropped) && position < limit {
3098 let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
3099 state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
3100 (buffer, position, _) = slice_buffer.into_parts();
3101 }
3102
3103 {
3104 let mut pair = pair.take()?;
3105 let (_, buffer) = &mut *pair;
3106
3107 while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
3108 state = accept(&mut UntypedWriteBuffer::new(buffer)).await?;
3109 }
3110 }
3111
3112 tls::get(|store| {
3113 store.concurrent_state_mut().get_mut(id)?.read = match state {
3114 StreamResult::Dropped => ReadState::Dropped,
3115 StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
3116 accept,
3117 buffer,
3118 limit: 0,
3119 },
3120 };
3121
3122 crate::error::Ok(())
3123 })?;
3124 Ok(())
3125 }
3126
3127 _ => bail_bug!("unexpected read state"),
3128 }
3129}
3130
3131impl Instance {
3132 fn consume(
3135 self,
3136 store: &mut dyn VMStore,
3137 kind: TransmitKind,
3138 transmit_id: TableId<TransmitState>,
3139 consume: PollStream,
3140 guest_offset: ItemCount,
3141 cancel: bool,
3142 ) -> Result<ReturnCode> {
3143 let mut future = consume();
3144 store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
3145 consume,
3146 guest_offset,
3147 cancel,
3148 cancel_waker: None,
3149 };
3150 let poll = tls::set(store, || {
3151 future
3152 .as_mut()
3153 .poll(&mut Context::from_waker(&Waker::noop()))
3154 });
3155
3156 Ok(match poll {
3157 Poll::Ready(state) => {
3158 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3159 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
3160 bail_bug!("expected ReadState::HostReady")
3161 };
3162 let code = return_code(kind, state?, mem::replace(guest_offset, ItemCount::ZERO))?;
3163 transmit.write = WriteState::Open;
3164 code
3165 }
3166 Poll::Pending => {
3167 store.pipe_from_guest(kind, transmit_id, future);
3168 ReturnCode::Blocked
3169 }
3170 })
3171 }
3172
3173 fn produce(
3176 self,
3177 store: &mut dyn VMStore,
3178 kind: TransmitKind,
3179 transmit_id: TableId<TransmitState>,
3180 produce: PollStream,
3181 try_into: TryInto,
3182 guest_offset: ItemCount,
3183 cancel: bool,
3184 ) -> Result<ReturnCode> {
3185 let mut future = produce();
3186 store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
3187 produce,
3188 try_into,
3189 guest_offset,
3190 cancel,
3191 cancel_waker: None,
3192 };
3193 let poll = tls::set(store, || {
3194 future
3195 .as_mut()
3196 .poll(&mut Context::from_waker(&Waker::noop()))
3197 });
3198
3199 Ok(match poll {
3200 Poll::Ready(state) => {
3201 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3202 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3203 bail_bug!("expected WriteState::HostReady")
3204 };
3205 let code = return_code(kind, state?, mem::replace(guest_offset, ItemCount::ZERO))?;
3206 transmit.read = ReadState::Open;
3207 code
3208 }
3209 Poll::Pending => {
3210 store.pipe_to_guest(kind, transmit_id, future);
3211 ReturnCode::Blocked
3212 }
3213 })
3214 }
3215
3216 pub(super) fn guest_drop_writable(
3218 self,
3219 store: &mut StoreOpaque,
3220 ty: TransmitIndex,
3221 writer: u32,
3222 ) -> Result<()> {
3223 let table = self.id().get_mut(store).table_for_transmit(ty);
3224 let transmit_rep = match ty {
3225 TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
3226 TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
3227 };
3228
3229 let id = TableId::<TransmitHandle>::new(transmit_rep);
3230 log::trace!("guest_drop_writable: drop writer {id:?}");
3231 match ty {
3232 TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
3233 TransmitIndex::Future(_) => store.host_drop_writer(
3234 id,
3235 Some(|| {
3236 Err(format_err!(
3237 "cannot drop future write end without first writing a value"
3238 ))
3239 }),
3240 ),
3241 }
3242 }
3243
3244 fn copy<T: 'static>(
3247 store: StoreContextMut<T>,
3248 flat_abi: Option<FlatAbi>,
3249 write_instance: Instance,
3250 write_caller_instance: RuntimeComponentInstanceIndex,
3251 write_ty: TransmitIndex,
3252 write_options: OptionsIndex,
3253 write_address: usize,
3254 read_instance: Instance,
3255 read_caller_instance: RuntimeComponentInstanceIndex,
3256 read_caller_thread: QualifiedThreadId,
3257 read_ty: TransmitIndex,
3258 read_options: OptionsIndex,
3259 read_address: usize,
3260 count: ItemCount,
3261 rep: u32,
3262 ) -> Result<()> {
3263 let (write_component, store) = write_instance.component_and_store_mut(store.0);
3264 let (read_component, mut store) = read_instance.component_and_store_mut(store);
3265 let write_types = write_component.types();
3266 let read_types = read_component.types();
3267 let count = count.as_usize();
3268
3269 let write_payload_ty = write_ty.payload(write_types);
3272 let write_abi = match write_payload_ty {
3273 Some(ty) => write_types.canonical_abi(ty),
3274 None => &CanonicalAbiInfo::ZERO,
3275 };
3276 let write_length_in_bytes = match flat_abi {
3277 Some(abi) => usize::try_from(abi.size)? * count,
3278 None => usize::try_from(write_abi.size32)? * count,
3279 };
3280 if write_length_in_bytes > 0 {
3281 if write_address % usize::try_from(write_abi.align32)? != 0 {
3282 bail!("write pointer not aligned");
3283 }
3284 write_instance
3285 .options_memory(store, write_options)
3286 .get(write_address..)
3287 .and_then(|b| b.get(..write_length_in_bytes))
3288 .ok_or_else(|| crate::format_err!("write pointer out of bounds"))?;
3289 }
3290
3291 let read_payload_ty = read_ty.payload(read_types);
3292 let read_abi = match read_payload_ty {
3293 Some(ty) => read_types.canonical_abi(ty),
3294 None => &CanonicalAbiInfo::ZERO,
3295 };
3296 let read_length_in_bytes = match flat_abi {
3297 Some(abi) => usize::try_from(abi.size)? * count,
3298 None => usize::try_from(read_abi.size32)? * count,
3299 };
3300 if read_length_in_bytes > 0 {
3301 if read_address % usize::try_from(read_abi.align32)? != 0 {
3302 bail!("read pointer not aligned");
3303 }
3304 read_instance
3305 .options_memory(store, read_options)
3306 .get(read_address..)
3307 .and_then(|b| b.get(..read_length_in_bytes))
3308 .ok_or_else(|| crate::format_err!("read pointer out of bounds"))?;
3309 }
3310
3311 if write_caller_instance == read_caller_instance
3312 && !allow_intra_component_read_write(write_payload_ty)
3313 {
3314 bail!(
3315 "cannot read from and write to intra-component future/stream with non-numeric payload"
3316 )
3317 }
3318
3319 match (write_ty, read_ty) {
3320 (TransmitIndex::Future(_), TransmitIndex::Future(_)) => {
3321 if count != 1 {
3322 bail_bug!("futures can only send 1 item");
3323 }
3324
3325 let val = write_payload_ty
3326 .map(|ty| {
3327 let lift = &mut LiftContext::new(store, write_options, write_instance);
3328 let bytes = &lift.memory()[write_address..][..write_length_in_bytes];
3329 Val::load(lift, *ty, bytes)
3330 })
3331 .transpose()?;
3332
3333 if let Some(val) = val {
3334 let old_thread = store.set_thread(read_caller_thread)?;
3338 let lower =
3339 &mut LowerContext::new(store.as_context_mut(), read_options, read_instance);
3340 let ptr = func::validate_inbounds_dynamic(
3341 read_abi,
3342 lower.as_slice_mut(),
3343 &ValRaw::u32(read_address.try_into()?),
3344 )?;
3345 let ty = match read_payload_ty {
3346 Some(ty) => ty,
3347 None => bail_bug!("expected read payload type to be present"),
3348 };
3349 val.store(lower, *ty, ptr)?;
3350 store.set_thread(old_thread)?;
3351 }
3352 }
3353 (TransmitIndex::Stream(_), TransmitIndex::Stream(_)) => {
3354 if write_length_in_bytes == 0 {
3355 return Ok(());
3356 }
3357 let write_payload_ty = match write_payload_ty {
3358 Some(ty) => ty,
3359 None => bail_bug!("expected write payload type to be present"),
3360 };
3361 let read_payload_ty = match read_payload_ty {
3362 Some(ty) => ty,
3363 None => bail_bug!("expected read payload type to be present"),
3364 };
3365 if flat_abi.is_some() {
3366 let store_opaque = store.store_opaque_mut();
3368
3369 assert_eq!(read_length_in_bytes, write_length_in_bytes);
3370
3371 if read_instance
3372 .options_memory(store_opaque, read_options)
3373 .as_ptr()
3374 == write_instance
3375 .options_memory(store_opaque, write_options)
3376 .as_ptr()
3377 {
3378 let memory = read_instance.options_memory_mut(store_opaque, read_options);
3379 memory.copy_within(
3380 write_address..write_address + write_length_in_bytes,
3381 read_address,
3382 );
3383 } else {
3384 let src = write_instance.options_memory(store_opaque, write_options)
3385 [write_address..][..write_length_in_bytes]
3386 .as_ptr();
3387 let dst = read_instance.options_memory_mut(store_opaque, read_options)
3388 [read_address..][..read_length_in_bytes]
3389 .as_mut_ptr();
3390
3391 unsafe {
3401 src.copy_to_nonoverlapping(dst, write_length_in_bytes);
3402 }
3403 }
3404 } else {
3405 let store_opaque = store.store_opaque_mut();
3406 let lift = &mut LiftContext::new(store_opaque, write_options, write_instance);
3407 let bytes = &lift.memory()[write_address..][..write_length_in_bytes];
3408 lift.consume_fuel_array(count, size_of::<Val>())?;
3409
3410 let values = (0..count)
3411 .map(|index| {
3412 let size = usize::try_from(write_abi.size32)?;
3413 Val::load(lift, *write_payload_ty, &bytes[(index * size)..][..size])
3414 })
3415 .collect::<Result<Vec<_>>>()?;
3416
3417 let id = TableId::<TransmitHandle>::new(rep);
3418 log::trace!("copy values {values:?} for {id:?}");
3419
3420 let old_thread = store.set_thread(read_caller_thread)?;
3424 let lower =
3425 &mut LowerContext::new(store.as_context_mut(), read_options, read_instance);
3426 let mut ptr = read_address;
3427 for value in values {
3428 value.store(lower, *read_payload_ty, ptr)?;
3429 ptr += usize::try_from(read_abi.size32)?;
3430 }
3431 store.set_thread(old_thread)?;
3432 }
3433 }
3434 _ => bail_bug!("mismatched transmit types in copy"),
3435 }
3436
3437 Ok(())
3438 }
3439
3440 fn check_bounds(
3441 self,
3442 store: &StoreOpaque,
3443 options: OptionsIndex,
3444 ty: TransmitIndex,
3445 address: usize,
3446 count: usize,
3447 ) -> Result<()> {
3448 let types = self.id().get(store).component().types();
3449 let size = usize::try_from(
3450 match ty {
3451 TransmitIndex::Future(ty) => types[types[ty].ty]
3452 .payload
3453 .map(|ty| types.canonical_abi(&ty).size32),
3454 TransmitIndex::Stream(ty) => types[types[ty].ty]
3455 .payload
3456 .map(|ty| types.canonical_abi(&ty).size32),
3457 }
3458 .unwrap_or(0),
3459 )?;
3460
3461 if count > 0 && size > 0 {
3462 self.options_memory(store, options)
3463 .get(address..)
3464 .and_then(|b| b.get(..size.checked_mul(count)?))
3465 .map(drop)
3466 .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))
3467 } else {
3468 Ok(())
3469 }
3470 }
3471
3472 pub(super) fn guest_write<T: 'static>(
3474 self,
3475 mut store: StoreContextMut<T>,
3476 caller: RuntimeComponentInstanceIndex,
3477 ty: TransmitIndex,
3478 options: OptionsIndex,
3479 flat_abi: Option<FlatAbi>,
3480 handle: u32,
3481 address: u32,
3482 count: u32,
3483 ) -> Result<ReturnCode> {
3484 let count = ItemCount::new(count)?;
3485
3486 if !self.options(store.0, options).async_ {
3487 store.0.check_blocking()?;
3491 }
3492
3493 let address = usize::try_from(address)?;
3494 self.check_bounds(store.0, options, ty, address, count.as_usize())?;
3495 let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3496 let TransmitLocalState::Write { done } = *state else {
3497 bail!(Trap::ConcurrentFutureStreamOp);
3498 };
3499
3500 if done {
3501 bail!("cannot write after being notified that the readable end dropped");
3502 }
3503
3504 *state = TransmitLocalState::Busy;
3505 let transmit_handle = TableId::<TransmitHandle>::new(rep);
3506 let concurrent_state = store.0.concurrent_state_mut();
3507 let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3508 let transmit = concurrent_state.get_mut(transmit_id)?;
3509 log::trace!(
3510 "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3511 transmit.read
3512 );
3513
3514 if transmit.done {
3515 bail!("cannot write to future after previous write succeeded or readable end dropped");
3516 }
3517
3518 let new_state = if let ReadState::Dropped = &transmit.read {
3519 ReadState::Dropped
3520 } else {
3521 ReadState::Open
3522 };
3523
3524 let set_guest_ready = |me: &mut ConcurrentState| {
3525 let transmit = me.get_mut(transmit_id)?;
3526 if !matches!(&transmit.write, WriteState::Open) {
3527 bail_bug!("expected `WriteState::Open`; got `{:?}`", transmit.write);
3528 }
3529 transmit.write = WriteState::GuestReady {
3530 instance: self,
3531 caller,
3532 ty,
3533 flat_abi,
3534 options,
3535 address,
3536 count,
3537 handle,
3538 };
3539 Ok::<_, crate::Error>(())
3540 };
3541
3542 let mut result = match mem::replace(&mut transmit.read, new_state) {
3543 ReadState::GuestReady {
3544 ty: read_ty,
3545 flat_abi: read_flat_abi,
3546 options: read_options,
3547 address: read_address,
3548 count: read_count,
3549 handle: read_handle,
3550 instance: read_instance,
3551 caller_instance: read_caller_instance,
3552 caller_thread: read_caller_thread,
3553 } => {
3554 if flat_abi != read_flat_abi {
3555 bail_bug!("expected flat ABI calculations to be the same");
3556 }
3557
3558 if let TransmitIndex::Future(_) = ty {
3559 transmit.done = true;
3560 }
3561
3562 let write_complete = count == 0 || read_count > 0;
3584 let read_complete = count > 0;
3585 let read_buffer_remaining = count < read_count;
3586
3587 let read_handle_rep = transmit.read_handle.rep();
3588
3589 let count = count.min(read_count);
3590
3591 Instance::copy(
3592 store.as_context_mut(),
3593 flat_abi,
3594 self,
3595 caller,
3596 ty,
3597 options,
3598 address,
3599 read_instance,
3600 read_caller_instance,
3601 read_caller_thread,
3602 read_ty,
3603 read_options,
3604 read_address,
3605 count,
3606 rep,
3607 )?;
3608
3609 let instance = read_instance.id().get(store.0);
3610 let types = instance.component().types();
3611 let item_size = match read_ty.payload(types) {
3612 Some(ty) => usize::try_from(types.canonical_abi(ty).size32)?,
3613 None => 0,
3614 };
3615 let concurrent_state = store.0.concurrent_state_mut();
3616 if read_complete {
3617 let total = if let Some(Event::StreamRead {
3618 code: ReturnCode::Completed(old_total),
3619 ..
3620 }) = concurrent_state.take_event(read_handle_rep)?
3621 {
3622 count.add(old_total)?
3623 } else {
3624 count
3625 };
3626
3627 let code = ReturnCode::completed(ty.kind(), total);
3628
3629 concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
3630 }
3631
3632 if read_buffer_remaining || (count == 0 && read_count == 0) {
3639 let transmit = concurrent_state.get_mut(transmit_id)?;
3640 transmit.read = ReadState::GuestReady {
3641 ty: read_ty,
3642 flat_abi: read_flat_abi,
3643 options: read_options,
3644 address: read_address + (count.as_usize() * item_size),
3645 count: read_count.sub(count)?,
3646 handle: read_handle,
3647 instance: read_instance,
3648 caller_instance: read_caller_instance,
3649 caller_thread: read_caller_thread,
3650 };
3651 }
3652
3653 if write_complete {
3654 ReturnCode::completed(ty.kind(), count)
3655 } else {
3656 set_guest_ready(concurrent_state)?;
3657 ReturnCode::Blocked
3658 }
3659 }
3660
3661 ReadState::HostReady {
3662 consume,
3663 guest_offset,
3664 cancel,
3665 cancel_waker,
3666 } => {
3667 if cancel_waker.is_some() {
3668 bail_bug!("expected cancel_waker to be none");
3669 }
3670 if cancel {
3671 bail_bug!("expected cancel to be false");
3672 }
3673 if guest_offset != 0 {
3674 bail_bug!("expected guest_offset to be 0");
3675 }
3676
3677 if let TransmitIndex::Future(_) = ty {
3678 transmit.done = true;
3679 }
3680
3681 set_guest_ready(concurrent_state)?;
3682 self.consume(
3683 store.0,
3684 ty.kind(),
3685 transmit_id,
3686 consume,
3687 ItemCount::ZERO,
3688 false,
3689 )?
3690 }
3691
3692 ReadState::HostToHost { .. } => bail_bug!("unexpected HostToHost"),
3693
3694 ReadState::Open => {
3695 set_guest_ready(concurrent_state)?;
3696 ReturnCode::Blocked
3697 }
3698
3699 ReadState::Dropped => {
3700 if let TransmitIndex::Future(_) = ty {
3701 transmit.done = true;
3702 }
3703
3704 ReturnCode::Dropped(ItemCount::ZERO)
3705 }
3706 };
3707
3708 if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3709 result = self.wait_for_write(store.0, transmit_handle)?;
3710 }
3711
3712 if result != ReturnCode::Blocked {
3713 *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3714 TransmitLocalState::Write {
3715 done: matches!(result, ReturnCode::Dropped(_)),
3716 };
3717 }
3718
3719 log::trace!(
3720 "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3721 );
3722
3723 Ok(result)
3724 }
3725
3726 pub(super) fn guest_read<T: 'static>(
3728 self,
3729 mut store: StoreContextMut<T>,
3730 caller_instance: RuntimeComponentInstanceIndex,
3731 ty: TransmitIndex,
3732 options: OptionsIndex,
3733 flat_abi: Option<FlatAbi>,
3734 handle: u32,
3735 address: u32,
3736 count: u32,
3737 ) -> Result<ReturnCode> {
3738 let count = ItemCount::new(count)?;
3739
3740 if !self.options(store.0, options).async_ {
3741 store.0.check_blocking()?;
3745 }
3746
3747 let address = usize::try_from(address)?;
3748 self.check_bounds(store.0, options, ty, address, count.as_usize())?;
3749 let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3750 let TransmitLocalState::Read { done } = *state else {
3751 bail!(Trap::ConcurrentFutureStreamOp);
3752 };
3753
3754 if done {
3755 bail!("cannot read after being notified that the writable end dropped");
3756 }
3757
3758 *state = TransmitLocalState::Busy;
3759 let transmit_handle = TableId::<TransmitHandle>::new(rep);
3760 let concurrent_state = store.0.concurrent_state_mut();
3761 let caller_thread = concurrent_state.current_guest_thread()?;
3762 let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3763 let transmit = concurrent_state.get_mut(transmit_id)?;
3764 log::trace!(
3765 "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3766 transmit.write
3767 );
3768
3769 if transmit.done {
3770 bail!("cannot read from future after previous read succeeded");
3771 }
3772
3773 let new_state = if let WriteState::Dropped = &transmit.write {
3774 WriteState::Dropped
3775 } else {
3776 WriteState::Open
3777 };
3778
3779 let set_guest_ready = |me: &mut ConcurrentState| {
3780 let transmit = me.get_mut(transmit_id)?;
3781 if !matches!(&transmit.read, ReadState::Open) {
3782 bail_bug!("expected `ReadState::Open`; got `{:?}`", transmit.read);
3783 }
3784 transmit.read = ReadState::GuestReady {
3785 ty,
3786 flat_abi,
3787 options,
3788 address,
3789 count,
3790 handle,
3791 instance: self,
3792 caller_instance,
3793 caller_thread,
3794 };
3795 Ok::<_, crate::Error>(())
3796 };
3797
3798 let mut result = match mem::replace(&mut transmit.write, new_state) {
3799 WriteState::GuestReady {
3800 instance: write_instance,
3801 ty: write_ty,
3802 flat_abi: write_flat_abi,
3803 options: write_options,
3804 address: write_address,
3805 count: write_count,
3806 handle: write_handle,
3807 caller: write_caller,
3808 } => {
3809 if flat_abi != write_flat_abi {
3810 bail_bug!("expected flat ABI calculations to be the same");
3811 }
3812
3813 if let TransmitIndex::Future(_) = ty {
3814 transmit.done = true;
3815 }
3816
3817 let write_handle_rep = transmit.write_handle.rep();
3818
3819 let write_complete = write_count == 0 || count > 0;
3824 let read_complete = write_count > 0;
3825 let write_buffer_remaining = count < write_count;
3826
3827 let count = count.min(write_count);
3828
3829 Instance::copy(
3830 store.as_context_mut(),
3831 flat_abi,
3832 write_instance,
3833 write_caller,
3834 write_ty,
3835 write_options,
3836 write_address,
3837 self,
3838 caller_instance,
3839 caller_thread,
3840 ty,
3841 options,
3842 address,
3843 count,
3844 rep,
3845 )?;
3846
3847 let instance = write_instance.id().get(store.0);
3848 let types = instance.component().types();
3849 let item_size = match write_ty.payload(types) {
3850 Some(ty) => usize::try_from(types.canonical_abi(ty).size32)?,
3851 None => 0,
3852 };
3853 let concurrent_state = store.0.concurrent_state_mut();
3854
3855 if write_complete {
3856 let total = if let Some(Event::StreamWrite {
3857 code: ReturnCode::Completed(old_total),
3858 ..
3859 }) = concurrent_state.take_event(write_handle_rep)?
3860 {
3861 count.add(old_total)?
3862 } else {
3863 count
3864 };
3865
3866 let code = ReturnCode::completed(ty.kind(), total);
3867
3868 concurrent_state.send_write_result(
3869 write_ty,
3870 transmit_id,
3871 write_handle,
3872 code,
3873 )?;
3874 }
3875
3876 if write_buffer_remaining {
3877 let transmit = concurrent_state.get_mut(transmit_id)?;
3878 transmit.write = WriteState::GuestReady {
3879 instance: write_instance,
3880 caller: write_caller,
3881 ty: write_ty,
3882 flat_abi: write_flat_abi,
3883 options: write_options,
3884 address: write_address + (count.as_usize() * item_size),
3885 count: write_count.sub(count)?,
3886 handle: write_handle,
3887 };
3888 }
3889
3890 if read_complete {
3891 ReturnCode::completed(ty.kind(), count)
3892 } else {
3893 set_guest_ready(concurrent_state)?;
3894 ReturnCode::Blocked
3895 }
3896 }
3897
3898 WriteState::HostReady {
3899 produce,
3900 try_into,
3901 guest_offset,
3902 cancel,
3903 cancel_waker,
3904 } => {
3905 if cancel_waker.is_some() {
3906 bail_bug!("expected cancel_waker to be none");
3907 }
3908 if cancel {
3909 bail_bug!("expected cancel to be false");
3910 }
3911 if guest_offset != 0 {
3912 bail_bug!("expected guest_offset to be 0");
3913 }
3914
3915 set_guest_ready(concurrent_state)?;
3916
3917 let code = self.produce(
3918 store.0,
3919 ty.kind(),
3920 transmit_id,
3921 produce,
3922 try_into,
3923 ItemCount::ZERO,
3924 false,
3925 )?;
3926
3927 if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) {
3928 store.0.concurrent_state_mut().get_mut(transmit_id)?.done = true;
3929 }
3930
3931 code
3932 }
3933
3934 WriteState::Open => {
3935 set_guest_ready(concurrent_state)?;
3936 ReturnCode::Blocked
3937 }
3938
3939 WriteState::Dropped => ReturnCode::Dropped(ItemCount::ZERO),
3940 };
3941
3942 if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3943 result = self.wait_for_read(store.0, transmit_handle)?;
3944 }
3945
3946 if result != ReturnCode::Blocked {
3947 *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3948 TransmitLocalState::Read {
3949 done: matches!(
3950 (result, ty),
3951 (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3952 ),
3953 };
3954 }
3955
3956 log::trace!(
3957 "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3958 );
3959
3960 Ok(result)
3961 }
3962
3963 fn wait_for_write(
3964 self,
3965 store: &mut StoreOpaque,
3966 handle: TableId<TransmitHandle>,
3967 ) -> Result<ReturnCode> {
3968 let waitable = Waitable::Transmit(handle);
3969 store.wait_for_event(waitable)?;
3970 let event = waitable.take_event(store.concurrent_state_mut())?;
3971 if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3972 event
3973 {
3974 waitable.on_delivery(store, self, event)?;
3975 Ok(code)
3976 } else {
3977 bail_bug!("expected either a stream or future write event")
3978 }
3979 }
3980
3981 fn cancel_write(
3983 self,
3984 store: &mut StoreOpaque,
3985 transmit_id: TableId<TransmitState>,
3986 async_: bool,
3987 ) -> Result<ReturnCode> {
3988 let state = store.concurrent_state_mut();
3989 let transmit = state.get_mut(transmit_id)?;
3990 log::trace!(
3991 "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3992 transmit.read,
3993 transmit.write
3994 );
3995 let waitable = Waitable::Transmit(transmit.write_handle);
3996
3997 let code = if let Some(event) = waitable.take_event(state)? {
3998 let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3999 bail_bug!("expected either a stream or future write event")
4000 };
4001 waitable.on_delivery(store, self, event)?;
4002 match (code, event) {
4003 (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
4004 ReturnCode::Cancelled(count)
4005 }
4006 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
4007 _ => bail_bug!("unexpected code/event combo"),
4008 }
4009 } else if let ReadState::HostReady {
4010 cancel,
4011 cancel_waker,
4012 ..
4013 } = &mut state.get_mut(transmit_id)?.read
4014 {
4015 *cancel = true;
4016 if let Some(waker) = cancel_waker.take() {
4017 waker.wake();
4018 }
4019
4020 if async_ {
4021 ReturnCode::Blocked
4022 } else {
4023 let handle = store
4024 .concurrent_state_mut()
4025 .get_mut(transmit_id)?
4026 .write_handle;
4027 self.wait_for_write(store, handle)?
4028 }
4029 } else {
4030 ReturnCode::Cancelled(ItemCount::ZERO)
4031 };
4032
4033 if !matches!(code, ReturnCode::Blocked) {
4034 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
4035
4036 match &transmit.write {
4037 WriteState::GuestReady { .. } => {
4038 transmit.write = WriteState::Open;
4039 }
4040 WriteState::HostReady { .. } => bail_bug!("support host write cancellation"),
4041 WriteState::Open | WriteState::Dropped => {}
4042 }
4043 }
4044
4045 log::trace!("cancelled write {transmit_id:?}: {code:?}");
4046
4047 Ok(code)
4048 }
4049
4050 fn wait_for_read(
4051 self,
4052 store: &mut StoreOpaque,
4053 handle: TableId<TransmitHandle>,
4054 ) -> Result<ReturnCode> {
4055 let waitable = Waitable::Transmit(handle);
4056 store.wait_for_event(waitable)?;
4057 let event = waitable.take_event(store.concurrent_state_mut())?;
4058 if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
4059 event
4060 {
4061 waitable.on_delivery(store, self, event)?;
4062 Ok(code)
4063 } else {
4064 bail_bug!("expected either a stream or future read event")
4065 }
4066 }
4067
4068 fn cancel_read(
4070 self,
4071 store: &mut StoreOpaque,
4072 transmit_id: TableId<TransmitState>,
4073 async_: bool,
4074 ) -> Result<ReturnCode> {
4075 let state = store.concurrent_state_mut();
4076 let transmit = state.get_mut(transmit_id)?;
4077 log::trace!(
4078 "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
4079 transmit.read,
4080 transmit.write
4081 );
4082
4083 let waitable = Waitable::Transmit(transmit.read_handle);
4084 let code = if let Some(event) = waitable.take_event(state)? {
4085 let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
4086 bail_bug!("expected either a stream or future read event")
4087 };
4088 waitable.on_delivery(store, self, event)?;
4089 match (code, event) {
4090 (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
4091 ReturnCode::Cancelled(count)
4092 }
4093 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
4094 _ => bail_bug!("unexpected code/event combo"),
4095 }
4096 } else if let WriteState::HostReady {
4097 cancel,
4098 cancel_waker,
4099 ..
4100 } = &mut state.get_mut(transmit_id)?.write
4101 {
4102 *cancel = true;
4103 if let Some(waker) = cancel_waker.take() {
4104 waker.wake();
4105 }
4106
4107 if async_ {
4108 ReturnCode::Blocked
4109 } else {
4110 let handle = store
4111 .concurrent_state_mut()
4112 .get_mut(transmit_id)?
4113 .read_handle;
4114 self.wait_for_read(store, handle)?
4115 }
4116 } else {
4117 ReturnCode::Cancelled(ItemCount::ZERO)
4118 };
4119
4120 if !matches!(code, ReturnCode::Blocked) {
4121 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
4122
4123 match &transmit.read {
4124 ReadState::GuestReady { .. } => {
4125 transmit.read = ReadState::Open;
4126 }
4127 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
4128 bail_bug!("support host read cancellation")
4129 }
4130 ReadState::Open | ReadState::Dropped => {}
4131 }
4132 }
4133
4134 log::trace!("cancelled read {transmit_id:?}: {code:?}");
4135
4136 Ok(code)
4137 }
4138
4139 fn guest_cancel_write(
4141 self,
4142 store: &mut StoreOpaque,
4143 ty: TransmitIndex,
4144 async_: bool,
4145 writer: u32,
4146 ) -> Result<ReturnCode> {
4147 if !async_ {
4148 store.check_blocking()?;
4152 }
4153
4154 let (rep, state) =
4155 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
4156 let id = TableId::<TransmitHandle>::new(rep);
4157 log::trace!("guest cancel write {id:?} (handle {writer})");
4158 match state {
4159 TransmitLocalState::Write { .. } => {
4160 bail!("stream or future write cancelled when no write is pending")
4161 }
4162 TransmitLocalState::Read { .. } => {
4163 bail!("passed read end to `{{stream|future}}.cancel-write`")
4164 }
4165 TransmitLocalState::Busy => {}
4166 }
4167 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
4168 let code = self.cancel_write(store, transmit_id, async_)?;
4169 if !matches!(code, ReturnCode::Blocked) {
4170 let state =
4171 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
4172 .1;
4173 if let TransmitLocalState::Busy = state {
4174 *state = TransmitLocalState::Write { done: false };
4175 }
4176 }
4177 Ok(code)
4178 }
4179
4180 fn guest_cancel_read(
4182 self,
4183 store: &mut StoreOpaque,
4184 ty: TransmitIndex,
4185 async_: bool,
4186 reader: u32,
4187 ) -> Result<ReturnCode> {
4188 if !async_ {
4189 store.check_blocking()?;
4193 }
4194
4195 let (rep, state) =
4196 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
4197 let id = TableId::<TransmitHandle>::new(rep);
4198 log::trace!("guest cancel read {id:?} (handle {reader})");
4199 match state {
4200 TransmitLocalState::Read { .. } => {
4201 bail!("stream or future read cancelled when no read is pending")
4202 }
4203 TransmitLocalState::Write { .. } => {
4204 bail!("passed write end to `{{stream|future}}.cancel-read`")
4205 }
4206 TransmitLocalState::Busy => {}
4207 }
4208 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
4209 let code = self.cancel_read(store, transmit_id, async_)?;
4210 if !matches!(code, ReturnCode::Blocked) {
4211 let state =
4212 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
4213 .1;
4214 if let TransmitLocalState::Busy = state {
4215 *state = TransmitLocalState::Read { done: false };
4216 }
4217 }
4218 Ok(code)
4219 }
4220
4221 fn guest_drop_readable(
4223 self,
4224 store: &mut StoreOpaque,
4225 ty: TransmitIndex,
4226 reader: u32,
4227 ) -> Result<()> {
4228 let table = self.id().get_mut(store).table_for_transmit(ty);
4229 let (rep, _is_done) = match ty {
4230 TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
4231 TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
4232 };
4233 let kind = match ty {
4234 TransmitIndex::Stream(_) => TransmitKind::Stream,
4235 TransmitIndex::Future(_) => TransmitKind::Future,
4236 };
4237 let id = TableId::<TransmitHandle>::new(rep);
4238 log::trace!("guest_drop_readable: drop reader {id:?}");
4239 store.host_drop_reader(id, kind)
4240 }
4241
4242 pub(crate) fn error_context_new(
4244 self,
4245 store: &mut StoreOpaque,
4246 ty: TypeComponentLocalErrorContextTableIndex,
4247 options: OptionsIndex,
4248 debug_msg_address: u32,
4249 debug_msg_len: u32,
4250 ) -> Result<u32> {
4251 let lift_ctx = &mut LiftContext::new(store, options, self);
4252 let debug_msg = String::linear_lift_from_flat(
4253 lift_ctx,
4254 InterfaceType::String,
4255 &[ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)],
4256 )?;
4257
4258 let err_ctx = ErrorContextState { debug_msg };
4260 let state = store.concurrent_state_mut();
4261 let table_id = state.push(err_ctx)?;
4262 let global_ref_count_idx =
4263 TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
4264
4265 let _ = state
4267 .global_error_context_ref_counts
4268 .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
4269
4270 let local_idx = self
4277 .id()
4278 .get_mut(store)
4279 .table_for_error_context(ty)
4280 .error_context_insert(table_id.rep())?;
4281
4282 Ok(local_idx)
4283 }
4284
4285 pub(super) fn error_context_debug_message<T>(
4287 self,
4288 store: StoreContextMut<T>,
4289 ty: TypeComponentLocalErrorContextTableIndex,
4290 options: OptionsIndex,
4291 err_ctx_handle: u32,
4292 debug_msg_address: u32,
4293 ) -> Result<()> {
4294 let handle_table_id_rep = self
4296 .id()
4297 .get_mut(store.0)
4298 .table_for_error_context(ty)
4299 .error_context_rep(err_ctx_handle)?;
4300
4301 let state = store.0.concurrent_state_mut();
4302 let ErrorContextState { debug_msg } =
4304 state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
4305 let debug_msg = debug_msg.clone();
4306
4307 let lower_cx = &mut LowerContext::new(store, options, self);
4308 let debug_msg_address = usize::try_from(debug_msg_address)?;
4309 let offset = lower_cx
4315 .as_slice_mut()
4316 .get(debug_msg_address..)
4317 .and_then(|b| b.get(..8))
4318 .map(|_| debug_msg_address)
4319 .ok_or_else(|| crate::format_err!("invalid debug message pointer: out of bounds"))?;
4320 debug_msg
4321 .as_str()
4322 .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
4323
4324 Ok(())
4325 }
4326
4327 pub(crate) fn future_cancel_read(
4329 self,
4330 store: &mut StoreOpaque,
4331 ty: TypeFutureTableIndex,
4332 async_: bool,
4333 reader: u32,
4334 ) -> Result<u32> {
4335 self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
4336 .map(|v| v.encode())
4337 }
4338
4339 pub(crate) fn future_cancel_write(
4341 self,
4342 store: &mut StoreOpaque,
4343 ty: TypeFutureTableIndex,
4344 async_: bool,
4345 writer: u32,
4346 ) -> Result<u32> {
4347 self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
4348 .map(|v| v.encode())
4349 }
4350
4351 pub(crate) fn stream_cancel_read(
4353 self,
4354 store: &mut StoreOpaque,
4355 ty: TypeStreamTableIndex,
4356 async_: bool,
4357 reader: u32,
4358 ) -> Result<u32> {
4359 self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
4360 .map(|v| v.encode())
4361 }
4362
4363 pub(crate) fn stream_cancel_write(
4365 self,
4366 store: &mut StoreOpaque,
4367 ty: TypeStreamTableIndex,
4368 async_: bool,
4369 writer: u32,
4370 ) -> Result<u32> {
4371 self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
4372 .map(|v| v.encode())
4373 }
4374
4375 pub(crate) fn future_drop_readable(
4377 self,
4378 store: &mut StoreOpaque,
4379 ty: TypeFutureTableIndex,
4380 reader: u32,
4381 ) -> Result<()> {
4382 self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
4383 }
4384
4385 pub(crate) fn stream_drop_readable(
4387 self,
4388 store: &mut StoreOpaque,
4389 ty: TypeStreamTableIndex,
4390 reader: u32,
4391 ) -> Result<()> {
4392 self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
4393 }
4394
4395 fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
4399 let (write, read) = store
4400 .concurrent_state_mut()
4401 .new_transmit(TransmitOrigin::guest(self.id().instance(), ty))?;
4402
4403 let table = self.id().get_mut(store).table_for_transmit(ty);
4404 let (read_handle, write_handle) = match ty {
4405 TransmitIndex::Future(ty) => (
4406 table.future_insert_read(ty, read.rep())?,
4407 table.future_insert_write(ty, write.rep())?,
4408 ),
4409 TransmitIndex::Stream(ty) => (
4410 table.stream_insert_read(ty, read.rep())?,
4411 table.stream_insert_write(ty, write.rep())?,
4412 ),
4413 };
4414
4415 let state = store.concurrent_state_mut();
4416 state.get_mut(read)?.common.handle = Some(read_handle);
4417 state.get_mut(write)?.common.handle = Some(write_handle);
4418
4419 Ok(ResourcePair {
4420 write: write_handle,
4421 read: read_handle,
4422 })
4423 }
4424
4425 pub(crate) fn error_context_drop(
4427 self,
4428 store: &mut StoreOpaque,
4429 ty: TypeComponentLocalErrorContextTableIndex,
4430 error_context: u32,
4431 ) -> Result<()> {
4432 let instance = self.id().get_mut(store);
4433
4434 let local_handle_table = instance.table_for_error_context(ty);
4435
4436 let rep = local_handle_table.error_context_drop(error_context)?;
4437
4438 let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
4439
4440 let state = store.concurrent_state_mut();
4441 let Some(GlobalErrorContextRefCount(global_ref_count)) = state
4442 .global_error_context_ref_counts
4443 .get_mut(&global_ref_count_idx)
4444 else {
4445 bail_bug!("retrieve concurrent state for error context during drop")
4446 };
4447
4448 if *global_ref_count < 1 {
4450 bail_bug!("ref count unexpectedly zero");
4451 }
4452 *global_ref_count -= 1;
4453 if *global_ref_count == 0 {
4454 state
4455 .global_error_context_ref_counts
4456 .remove(&global_ref_count_idx);
4457
4458 state
4459 .delete(TableId::<ErrorContextState>::new(rep))
4460 .context("deleting component-global error context data")?;
4461 }
4462
4463 Ok(())
4464 }
4465
4466 fn guest_transfer(
4469 self,
4470 store: &mut StoreOpaque,
4471 src_idx: u32,
4472 src: TransmitIndex,
4473 dst: TransmitIndex,
4474 ) -> Result<u32> {
4475 let id = self.lift_index_to_transmit(store, src, src_idx)?;
4476 self.lower_transmit_to_index(store, dst, id)
4477 }
4478
4479 fn lift_index_to_transmit(
4480 self,
4481 store: &mut StoreOpaque,
4482 ty: TransmitIndex,
4483 src_idx: u32,
4484 ) -> Result<TableId<TransmitHandle>> {
4485 let (state, _, _, instance) = store.lift_context_parts(self);
4486 lift_index_to_transmit(instance, state.concurrent_state_mut(), ty, src_idx)
4487 }
4488
4489 fn lower_transmit_to_index(
4490 self,
4491 store: &mut StoreOpaque,
4492 ty: TransmitIndex,
4493 id: TableId<TransmitHandle>,
4494 ) -> Result<u32> {
4495 let (state, _, _, instance) = store.lift_context_parts(self);
4496 lower_transmit_to_index(instance, state.concurrent_state_mut(), ty, id)
4497 }
4498
4499 pub(crate) fn future_new(
4501 self,
4502 store: &mut StoreOpaque,
4503 ty: TypeFutureTableIndex,
4504 ) -> Result<ResourcePair> {
4505 self.guest_new(store, TransmitIndex::Future(ty))
4506 }
4507
4508 pub(crate) fn stream_new(
4510 self,
4511 store: &mut StoreOpaque,
4512 ty: TypeStreamTableIndex,
4513 ) -> Result<ResourcePair> {
4514 self.guest_new(store, TransmitIndex::Stream(ty))
4515 }
4516
4517 pub(crate) fn future_transfer(
4520 self,
4521 store: &mut StoreOpaque,
4522 src_idx: u32,
4523 src: TypeFutureTableIndex,
4524 dst: TypeFutureTableIndex,
4525 ) -> Result<u32> {
4526 self.guest_transfer(
4527 store,
4528 src_idx,
4529 TransmitIndex::Future(src),
4530 TransmitIndex::Future(dst),
4531 )
4532 }
4533
4534 pub(crate) fn stream_transfer(
4537 self,
4538 store: &mut StoreOpaque,
4539 src_idx: u32,
4540 src: TypeStreamTableIndex,
4541 dst: TypeStreamTableIndex,
4542 ) -> Result<u32> {
4543 self.guest_transfer(
4544 store,
4545 src_idx,
4546 TransmitIndex::Stream(src),
4547 TransmitIndex::Stream(dst),
4548 )
4549 }
4550
4551 pub(crate) fn error_context_transfer(
4553 self,
4554 store: &mut StoreOpaque,
4555 src_idx: u32,
4556 src: TypeComponentLocalErrorContextTableIndex,
4557 dst: TypeComponentLocalErrorContextTableIndex,
4558 ) -> Result<u32> {
4559 let mut instance = self.id().get_mut(store);
4560 let rep = instance
4561 .as_mut()
4562 .table_for_error_context(src)
4563 .error_context_rep(src_idx)?;
4564 let dst_idx = instance
4565 .table_for_error_context(dst)
4566 .error_context_insert(rep)?;
4567
4568 let global_ref_count = store
4572 .concurrent_state_mut()
4573 .global_error_context_ref_counts
4574 .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4575 .context("global ref count present for existing (sub)component error context")?;
4576
4577 global_ref_count.0 = global_ref_count
4578 .0
4579 .checked_add(1)
4580 .ok_or_else(|| format_err!(Trap::ReferenceCountOverflow))?;
4581
4582 Ok(dst_idx)
4583 }
4584}
4585
4586fn lift_index_to_transmit(
4592 instance: Pin<&mut ComponentInstance>,
4593 concurrent_state: &mut ConcurrentState,
4594 ty: TransmitIndex,
4595 src_idx: u32,
4596) -> Result<TableId<TransmitHandle>> {
4597 let handle_table = instance.table_for_transmit(ty);
4598 let (rep, is_done) = match ty {
4599 TransmitIndex::Future(idx) => handle_table.future_remove_readable(idx, src_idx)?,
4600 TransmitIndex::Stream(idx) => handle_table.stream_remove_readable(idx, src_idx)?,
4601 };
4602 let desc = match ty {
4603 TransmitIndex::Future(_) => "future",
4604 TransmitIndex::Stream(_) => "stream",
4605 };
4606 if is_done {
4607 bail!("cannot lift {desc} after being notified that the writable end dropped");
4608 }
4609 let id = TableId::<TransmitHandle>::new(rep);
4610 let future = concurrent_state.get_mut(id)?;
4611 if future.common.set.is_some() {
4612 bail!("cannot lift {desc} while it's in a waitable set");
4613 }
4614 future.common.handle = None;
4615
4616 let state = future.state;
4617 if concurrent_state.get_mut(state)?.done {
4618 bail!("cannot lift {desc} after previous read succeeded");
4619 }
4620
4621 Ok(id)
4622}
4623
4624fn lower_transmit_to_index(
4627 instance: Pin<&mut ComponentInstance>,
4628 concurrent_state: &mut ConcurrentState,
4629 ty: TransmitIndex,
4630 id: TableId<TransmitHandle>,
4631) -> Result<u32> {
4632 let state = concurrent_state.get_mut(id)?.state;
4633 debug_assert_eq!(concurrent_state.get_mut(state)?.read_handle, id);
4634 let handle_table = instance.table_for_transmit(ty);
4635 let handle = match ty {
4636 TransmitIndex::Future(idx) => handle_table.future_insert_read(idx, id.rep()),
4637 TransmitIndex::Stream(idx) => handle_table.stream_insert_read(idx, id.rep()),
4638 }?;
4639 concurrent_state.get_mut(id)?.common.handle = Some(handle);
4640 Ok(handle)
4641}
4642
4643impl ComponentInstance {
4644 fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4645 let (states, types) = self.instance_states();
4646 let runtime_instance = match ty {
4647 TransmitIndex::Stream(ty) => types[ty].instance,
4648 TransmitIndex::Future(ty) => types[ty].instance,
4649 };
4650 states[runtime_instance].handle_table()
4651 }
4652
4653 fn table_for_error_context(
4654 self: Pin<&mut Self>,
4655 ty: TypeComponentLocalErrorContextTableIndex,
4656 ) -> &mut HandleTable {
4657 let (states, types) = self.instance_states();
4658 let runtime_instance = types[ty].instance;
4659 states[runtime_instance].handle_table()
4660 }
4661
4662 fn get_mut_by_index(
4663 self: Pin<&mut Self>,
4664 ty: TransmitIndex,
4665 index: u32,
4666 ) -> Result<(u32, &mut TransmitLocalState)> {
4667 get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4668 }
4669}
4670
4671impl ConcurrentState {
4672 fn send_write_result(
4673 &mut self,
4674 ty: TransmitIndex,
4675 id: TableId<TransmitState>,
4676 handle: u32,
4677 code: ReturnCode,
4678 ) -> Result<()> {
4679 let write_handle = self.get_mut(id)?.write_handle.rep();
4680 self.set_event(
4681 write_handle,
4682 match ty {
4683 TransmitIndex::Future(ty) => Event::FutureWrite {
4684 code,
4685 pending: Some((ty, handle)),
4686 },
4687 TransmitIndex::Stream(ty) => Event::StreamWrite {
4688 code,
4689 pending: Some((ty, handle)),
4690 },
4691 },
4692 )
4693 }
4694
4695 fn send_read_result(
4696 &mut self,
4697 ty: TransmitIndex,
4698 id: TableId<TransmitState>,
4699 handle: u32,
4700 code: ReturnCode,
4701 ) -> Result<()> {
4702 let read_handle = self.get_mut(id)?.read_handle.rep();
4703 self.set_event(
4704 read_handle,
4705 match ty {
4706 TransmitIndex::Future(ty) => Event::FutureRead {
4707 code,
4708 pending: Some((ty, handle)),
4709 },
4710 TransmitIndex::Stream(ty) => Event::StreamRead {
4711 code,
4712 pending: Some((ty, handle)),
4713 },
4714 },
4715 )
4716 }
4717
4718 fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4719 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4720 }
4721
4722 fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4723 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4724 }
4725
4726 fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4737 let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4738
4739 fn update_code(old: ReturnCode, new: ReturnCode) -> Result<ReturnCode> {
4740 let (ReturnCode::Completed(count)
4741 | ReturnCode::Dropped(count)
4742 | ReturnCode::Cancelled(count)) = old
4743 else {
4744 bail_bug!("unexpected old return code")
4745 };
4746
4747 Ok(match new {
4748 ReturnCode::Dropped(ItemCount::ZERO) => ReturnCode::Dropped(count),
4749 ReturnCode::Cancelled(ItemCount::ZERO) => ReturnCode::Cancelled(count),
4750 _ => bail_bug!("unexpected new return code"),
4751 })
4752 }
4753
4754 let event = match (waitable.take_event(self)?, event) {
4755 (None, _) => event,
4756 (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4757 (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4758 (
4759 Some(Event::StreamWrite {
4760 code: old_code,
4761 pending: old_pending,
4762 }),
4763 Event::StreamWrite { code, pending },
4764 ) => Event::StreamWrite {
4765 code: update_code(old_code, code)?,
4766 pending: old_pending.or(pending),
4767 },
4768 (
4769 Some(Event::StreamRead {
4770 code: old_code,
4771 pending: old_pending,
4772 }),
4773 Event::StreamRead { code, pending },
4774 ) => Event::StreamRead {
4775 code: update_code(old_code, code)?,
4776 pending: old_pending.or(pending),
4777 },
4778 _ => bail_bug!("unexpected event combination"),
4779 };
4780
4781 waitable.set_event(self, Some(event))
4782 }
4783
4784 fn new_transmit(
4787 &mut self,
4788 origin: TransmitOrigin,
4789 ) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4790 let state_id = self.push(TransmitState::new(origin))?;
4791
4792 let write = self.push(TransmitHandle::new(state_id))?;
4793 let read = self.push(TransmitHandle::new(state_id))?;
4794
4795 let state = self.get_mut(state_id)?;
4796 state.write_handle = write;
4797 state.read_handle = read;
4798
4799 log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4800
4801 Ok((write, read))
4802 }
4803
4804 fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4806 let state = self.delete(state_id)?;
4807 self.delete(state.write_handle)?;
4808 self.delete(state.read_handle)?;
4809
4810 log::trace!(
4811 "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4812 state.write_handle,
4813 state.read_handle,
4814 );
4815
4816 Ok(())
4817 }
4818}
4819
4820pub(crate) struct ResourcePair {
4821 pub(crate) write: u32,
4822 pub(crate) read: u32,
4823}
4824
4825impl Waitable {
4826 pub(super) fn on_delivery(
4829 &self,
4830 store: &mut StoreOpaque,
4831 instance: Instance,
4832 event: Event,
4833 ) -> Result<()> {
4834 let instance = instance.id().get_mut(store);
4835 let (rep, state, code) = match event {
4836 Event::FutureRead {
4837 pending: Some((ty, handle)),
4838 code,
4839 }
4840 | Event::FutureWrite {
4841 pending: Some((ty, handle)),
4842 code,
4843 } => {
4844 let runtime_instance = instance.component().types()[ty].instance;
4845 let (rep, state) = instance.instance_states().0[runtime_instance]
4846 .handle_table()
4847 .future_rep(ty, handle)?;
4848 (rep, state, code)
4849 }
4850 Event::StreamRead {
4851 pending: Some((ty, handle)),
4852 code,
4853 }
4854 | Event::StreamWrite {
4855 pending: Some((ty, handle)),
4856 code,
4857 } => {
4858 let runtime_instance = instance.component().types()[ty].instance;
4859 let (rep, state) = instance.instance_states().0[runtime_instance]
4860 .handle_table()
4861 .stream_rep(ty, handle)?;
4862 (rep, state, code)
4863 }
4864 _ => return Ok(()),
4865 };
4866 if rep != self.rep() {
4867 bail_bug!("unexpected rep mismatch");
4868 }
4869 if *state != TransmitLocalState::Busy {
4870 bail_bug!("expected state to be busy");
4871 }
4872 let done = matches!(code, ReturnCode::Dropped(_));
4873 *state = match event {
4874 Event::FutureRead { .. } | Event::StreamRead { .. } => {
4875 TransmitLocalState::Read { done }
4876 }
4877 Event::FutureWrite { .. } | Event::StreamWrite { .. } => {
4878 TransmitLocalState::Write { done }
4879 }
4880 _ => bail_bug!("unexpected event for stream"),
4881 };
4882
4883 let transmit_handle = TableId::<TransmitHandle>::new(rep);
4884 let state = store.concurrent_state_mut();
4885 let transmit_id = state.get_mut(transmit_handle)?.state;
4886 let transmit = state.get_mut(transmit_id)?;
4887
4888 match event {
4889 Event::StreamRead { .. } => {
4890 transmit.read = ReadState::Open;
4891 }
4892 Event::StreamWrite { .. } => transmit.write = WriteState::Open,
4893 _ => {}
4894 }
4895 Ok(())
4896 }
4897}
4898
4899fn allow_intra_component_read_write(ty: Option<&InterfaceType>) -> bool {
4903 matches!(
4904 ty,
4905 None | Some(
4906 InterfaceType::S8
4907 | InterfaceType::U8
4908 | InterfaceType::S16
4909 | InterfaceType::U16
4910 | InterfaceType::S32
4911 | InterfaceType::U32
4912 | InterfaceType::S64
4913 | InterfaceType::U64
4914 | InterfaceType::Float32
4915 | InterfaceType::Float64
4916 )
4917 )
4918}
4919
4920struct LockedState<T> {
4924 inner: TryMutex<Option<T>>,
4925}
4926
4927impl<T> LockedState<T> {
4928 fn new(value: T) -> Self {
4930 Self {
4931 inner: TryMutex::new(Some(value)),
4932 }
4933 }
4934
4935 fn try_lock(&self) -> Result<TryMutexGuard<'_, Option<T>>> {
4944 match self.inner.try_lock() {
4945 Some(lock) => Ok(lock),
4946 None => bail_bug!("should not have contention on state lock"),
4947 }
4948 }
4949
4950 fn take(&self) -> Result<LockedStateGuard<'_, T>> {
4957 let result = self.try_lock()?.take();
4958 match result {
4959 Some(result) => Ok(LockedStateGuard {
4960 value: ManuallyDrop::new(result),
4961 state: self,
4962 }),
4963 None => bail_bug!("lock value unexpectedly missing"),
4964 }
4965 }
4966
4967 fn with<R>(&self, f: impl FnOnce(&mut T) -> R) -> Result<R> {
4976 let mut inner = self.try_lock()?;
4977 match &mut *inner {
4978 Some(state) => Ok(f(state)),
4979 None => bail_bug!("lock value unexpectedly missing"),
4980 }
4981 }
4982}
4983
4984struct LockedStateGuard<'a, T> {
4987 value: ManuallyDrop<T>,
4988 state: &'a LockedState<T>,
4989}
4990
4991impl<T> Deref for LockedStateGuard<'_, T> {
4992 type Target = T;
4993
4994 fn deref(&self) -> &T {
4995 &self.value
4996 }
4997}
4998
4999impl<T> DerefMut for LockedStateGuard<'_, T> {
5000 fn deref_mut(&mut self) -> &mut T {
5001 &mut self.value
5002 }
5003}
5004
5005impl<T> Drop for LockedStateGuard<'_, T> {
5006 fn drop(&mut self) {
5007 let value = unsafe { ManuallyDrop::take(&mut self.value) };
5012
5013 if let Ok(mut lock) = self.state.try_lock() {
5017 *lock = Some(value);
5018 }
5019 }
5020}
5021
5022#[cfg(test)]
5023mod tests {
5024 use super::*;
5025 use crate::{Engine, Store};
5026 use core::future::pending;
5027 use core::pin::pin;
5028 use std::sync::LazyLock;
5029
5030 static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);
5031
5032 fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
5033 where
5034 T: FutureProducer<()>,
5035 {
5036 rx.poll_produce(
5037 &mut Context::from_waker(Waker::noop()),
5038 Store::new(&ENGINE, ()).as_context_mut(),
5039 finish,
5040 )
5041 }
5042
5043 #[test]
5044 fn future_producer() {
5045 let mut fut = pin!(async { crate::error::Ok(()) });
5046 assert!(matches!(
5047 poll_future_producer(fut.as_mut(), false),
5048 Poll::Ready(Ok(Some(()))),
5049 ));
5050
5051 let mut fut = pin!(async { crate::error::Ok(()) });
5052 assert!(matches!(
5053 poll_future_producer(fut.as_mut(), true),
5054 Poll::Ready(Ok(Some(()))),
5055 ));
5056
5057 let mut fut = pin!(pending::<Result<()>>());
5058 assert!(matches!(
5059 poll_future_producer(fut.as_mut(), false),
5060 Poll::Pending,
5061 ));
5062 assert!(matches!(
5063 poll_future_producer(fut.as_mut(), true),
5064 Poll::Ready(Ok(None)),
5065 ));
5066
5067 let (tx, rx) = oneshot::channel();
5068 let mut rx = pin!(rx);
5069 assert!(matches!(
5070 poll_future_producer(rx.as_mut(), false),
5071 Poll::Pending,
5072 ));
5073 assert!(matches!(
5074 poll_future_producer(rx.as_mut(), true),
5075 Poll::Ready(Ok(None)),
5076 ));
5077 tx.send(()).unwrap();
5078 assert!(matches!(
5079 poll_future_producer(rx.as_mut(), true),
5080 Poll::Ready(Ok(Some(()))),
5081 ));
5082
5083 let (tx, rx) = oneshot::channel();
5084 let mut rx = pin!(rx);
5085 tx.send(()).unwrap();
5086 assert!(matches!(
5087 poll_future_producer(rx.as_mut(), false),
5088 Poll::Ready(Ok(Some(()))),
5089 ));
5090
5091 let (tx, rx) = oneshot::channel::<()>();
5092 let mut rx = pin!(rx);
5093 drop(tx);
5094 assert!(matches!(
5095 poll_future_producer(rx.as_mut(), false),
5096 Poll::Ready(Err(..)),
5097 ));
5098
5099 let (tx, rx) = oneshot::channel::<()>();
5100 let mut rx = pin!(rx);
5101 drop(tx);
5102 assert!(matches!(
5103 poll_future_producer(rx.as_mut(), true),
5104 Poll::Ready(Err(..)),
5105 ));
5106 }
5107}