1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, QualifiedThreadId, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext};
5use crate::component::matching::InstanceType;
6use crate::component::types;
7use crate::component::values::ErrorContextAny;
8use crate::component::{
9 AsAccessor, ComponentInstanceId, ComponentType, FutureAny, Instance, Lift, Lower, StreamAny,
10 Val, WasmList,
11};
12use crate::store::{StoreOpaque, StoreToken};
13use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
14use crate::vm::{AlwaysMut, VMStore};
15use crate::{AsContext, AsContextMut, StoreContextMut, ValRaw};
16use crate::{
17 Error, Result, Trap, bail, bail_bug, ensure,
18 error::{Context as _, format_err},
19};
20use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
21use core::fmt;
22use core::future;
23use core::iter;
24use core::marker::PhantomData;
25use core::mem::{self, ManuallyDrop, MaybeUninit};
26use core::ops::{Deref, DerefMut};
27use core::pin::Pin;
28use core::task::{Context, Poll, Waker, ready};
29use futures::channel::oneshot;
30use futures::{FutureExt as _, stream};
31use std::any::{Any, TypeId};
32use std::boxed::Box;
33use std::io::Cursor;
34use std::string::String;
35use std::sync::{Arc, Mutex, MutexGuard};
36use std::vec::Vec;
37use wasmtime_environ::component::{
38 CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
39 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
40 TypeFutureTableIndex, TypeStreamTableIndex,
41};
42
43pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
44
45mod buffers;
46
/// Distinguishes the two kinds of transmit channel handled in this module:
/// streams and futures.
#[derive(Copy, Clone, Debug)]
pub enum TransmitKind {
    /// The channel is a stream.
    Stream,
    /// The channel is a future.
    Future,
}
54
/// Outcome of a transmit operation, convertible to a packed `u32` through
/// [`ReturnCode::encode`].
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ReturnCode {
    /// No item count attached; encoded as the sentinel `0xffff_ffff`.
    Blocked,
    /// Carries an item count; encoded with tag `0x0` in the low four bits.
    Completed(ItemCount),
    /// Carries an item count; encoded with tag `0x1` in the low four bits.
    Dropped(ItemCount),
    /// Carries an item count; encoded with tag `0x2` in the low four bits.
    Cancelled(ItemCount),
}
63
64impl ReturnCode {
65 pub fn encode(&self) -> u32 {
70 const BLOCKED: u32 = 0xffff_ffff;
71 const COMPLETED: u32 = 0x0;
72 const DROPPED: u32 = 0x1;
73 const CANCELLED: u32 = 0x2;
74 match self {
75 ReturnCode::Blocked => BLOCKED,
76 ReturnCode::Completed(n) => (n.as_u32() << 4) | COMPLETED,
77 ReturnCode::Dropped(n) => (n.as_u32() << 4) | DROPPED,
78 ReturnCode::Cancelled(n) => (n.as_u32() << 4) | CANCELLED,
79 }
80 }
81
82 fn completed(kind: TransmitKind, count: ItemCount) -> Self {
85 Self::Completed(if let TransmitKind::Future = kind {
86 ItemCount::ZERO
87 } else {
88 count
89 })
90 }
91}
92
/// A count of items, wrapping a raw `u32`.
///
/// `#[repr(transparent)]` gives this type the same layout as the `u32` it
/// wraps.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[repr(transparent)]
pub struct ItemCount {
    /// The underlying count.
    raw: u32,
}
104
105impl ItemCount {
106 const MAX: u32 = 1 << 28;
107 const ZERO: ItemCount = ItemCount { raw: 0 };
108
109 fn new(count: u32) -> Result<Self, Trap> {
112 if count < Self::MAX {
113 Ok(Self { raw: count })
114 } else {
115 Err(Trap::StreamOpTooBig)
116 }
117 }
118
119 fn new_usize(count: usize) -> Result<Self, Trap> {
121 let count = u32::try_from(count).map_err(|_| Trap::StreamOpTooBig)?;
122 Self::new(count)
123 }
124
125 fn as_u32(&self) -> u32 {
126 self.raw
127 }
128
129 fn as_usize(&self) -> usize {
130 usize::try_from(self.raw).unwrap()
131 }
132
133 fn inc(&mut self, amt: usize) -> Result<(), Trap> {
136 let amt = u32::try_from(amt).map_err(|_| Trap::StreamOpTooBig)?;
137 let new_raw = self.raw.checked_add(amt).ok_or(Trap::StreamOpTooBig)?;
138 if new_raw < Self::MAX {
139 self.raw = new_raw;
140 Ok(())
141 } else {
142 Err(Trap::StreamOpTooBig)
143 }
144 }
145
146 fn add(&self, other: ItemCount) -> Result<ItemCount> {
152 match self.raw.checked_add(other.raw) {
153 Some(raw) => Ok(ItemCount::new(raw)?),
154 None => bail_bug!("overflow in `ItemCount::add`"),
155 }
156 }
157
158 fn sub(&self, other: ItemCount) -> Result<ItemCount> {
163 match self.raw.checked_sub(other.raw) {
164 Some(raw) => Ok(ItemCount { raw }),
165 None => bail_bug!("underflow in `ItemCount::sub`"),
166 }
167 }
168}
169
/// Formats the count by delegating to the formatting of the raw `u32`.
impl fmt::Display for ItemCount {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // NOTE(review): which `fmt` trait method this resolves to depends on the
        // traits in scope for this module — confirm before restructuring.
        self.raw.fmt(f)
    }
}
175
/// Formats the count by delegating to the formatting of the raw `u32`, so the
/// output is the bare number rather than a struct dump.
impl fmt::Debug for ItemCount {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // NOTE(review): which `fmt` trait method this resolves to depends on the
        // traits in scope for this module — confirm before restructuring.
        self.raw.fmt(f)
    }
}
181
182impl PartialEq<u32> for ItemCount {
183 fn eq(&self, other: &u32) -> bool {
184 self.raw == *other
185 }
186}
187
188impl PartialOrd<u32> for ItemCount {
189 fn partial_cmp(&self, other: &u32) -> Option<std::cmp::Ordering> {
190 self.raw.partial_cmp(other)
191 }
192}
193
/// A stream or future table index, tagged with which of the two it is.
#[derive(Copy, Clone, Debug)]
pub enum TransmitIndex {
    /// Index of a stream table type.
    Stream(TypeStreamTableIndex),
    /// Index of a future table type.
    Future(TypeFutureTableIndex),
}
203
204impl TransmitIndex {
205 pub fn kind(&self) -> TransmitKind {
206 match self {
207 TransmitIndex::Stream(_) => TransmitKind::Stream,
208 TransmitIndex::Future(_) => TransmitKind::Future,
209 }
210 }
211
212 fn payload<'a>(&self, types: &'a ComponentTypes) -> Option<&'a InterfaceType> {
215 match self {
216 TransmitIndex::Stream(i) => {
217 let ty = types[*i].ty;
218 types[ty].payload.as_ref()
219 }
220 TransmitIndex::Future(i) => {
221 let ty = types[*i].ty;
222 types[ty].payload.as_ref()
223 }
224 }
225 }
226}
227
228fn get_mut_by_index_from(
231 handle_table: &mut HandleTable,
232 ty: TransmitIndex,
233 index: u32,
234) -> Result<(u32, &mut TransmitLocalState)> {
235 match ty {
236 TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
237 TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
238 }
239}
240
/// Lowers items from `buffer` into guest linear memory starting at `address`.
///
/// At most `count` items are written, and never more than `buffer` currently
/// holds. The items that were written are then skipped in `buffer`.
///
/// Fails if `address` is not aligned to `T::ALIGN32`, if the destination range
/// does not fit in memory, or if any of the underlying operations fail.
fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
    mut store: StoreContextMut<U>,
    instance: Instance,
    caller_thread: QualifiedThreadId,
    options: OptionsIndex,
    ty: TransmitIndex,
    address: usize,
    count: usize,
    buffer: &mut B,
) -> Result<()> {
    // Never lower more items than the buffer actually has available.
    let count = buffer.remaining().len().min(count);

    let (lower, old_thread) = if T::MAY_REQUIRE_REALLOC {
        // Switch to `caller_thread` while lowering and remember the previous
        // thread so it can be restored below (presumably because realloc may
        // run guest code on that thread — TODO confirm).
        let old_thread = store.0.set_thread(caller_thread)?;
        (
            &mut LowerContext::new(store.as_context_mut(), options, instance),
            Some(old_thread),
        )
    } else {
        (
            &mut LowerContext::new_without_realloc(store.as_context_mut(), options, instance),
            None,
        )
    };

    // NOTE(review): the early returns below (`?` / `bail!`) skip the thread
    // restoration at the end of this function — confirm that is acceptable.
    if address % usize::try_from(T::ALIGN32)? != 0 {
        bail!("read pointer not aligned");
    }
    // Bounds-check the destination range; the resulting slice itself is unused.
    lower
        .as_slice_mut()
        .get_mut(address..)
        .and_then(|b| b.get_mut(..T::SIZE32 * count))
        .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))?;

    // Without a payload type there is nothing to store in memory.
    if let Some(ty) = ty.payload(lower.types) {
        T::linear_store_list_to_memory(lower, *ty, address, &buffer.remaining()[..count])?;
    }

    // Restore the thread that was current before lowering, if it was changed.
    if let Some(old_thread) = old_thread {
        store.0.set_thread(old_thread)?;
    }

    // Mark the lowered items as consumed.
    buffer.skip(count);

    Ok(())
}
290
291fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
292 lift: &mut LiftContext<'_>,
293 ty: Option<InterfaceType>,
294 buffer: &mut B,
295 address: usize,
296 count: usize,
297) -> Result<()> {
298 let count = count.min(buffer.remaining_capacity());
299 if T::IS_RUST_UNIT_TYPE {
300 buffer.extend(
304 iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
305 )
306 } else {
307 let ty = match ty {
308 Some(ty) => ty,
309 None => bail_bug!("type required for non-unit lift"),
310 };
311 if address % usize::try_from(T::ALIGN32)? != 0 {
312 bail!("write pointer not aligned");
313 }
314 lift.memory()
315 .get(address..)
316 .and_then(|b| b.get(..T::SIZE32 * count))
317 .ok_or_else(|| crate::format_err!("write pointer out of bounds of memory"))?;
318
319 let list = &WasmList::new(address, count, lift, ty)?;
320 T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
321 }
322 Ok(())
323}
324
/// State stored for an error context.
#[derive(Debug, PartialEq, Eq, PartialOrd)]
pub(super) struct ErrorContextState {
    /// The debug message string held by this error context.
    pub(crate) debug_msg: String,
}
331
/// A size/alignment pair.
///
/// NOTE(review): presumably both values are in bytes and describe a payload's
/// flat representation — confirm against the users of this type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct FlatAbi {
    /// The size component.
    pub(super) size: u32,
    /// The alignment component.
    pub(super) align: u32,
}
339
/// Handle handed to a [`StreamProducer`] describing where produced items of
/// type `T` should go, backed by a buffer of type `B`.
pub struct Destination<'a, T, B> {
    /// Identifies the transmit state this destination belongs to.
    id: TableId<TransmitState>,
    /// The producer-visible buffer; see `take_buffer` / `set_buffer`.
    buffer: &'a mut B,
    /// Optional host-side byte buffer, used by `as_direct` when present.
    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
    /// Ties the item type `T` to this handle without storing a `T`.
    _phantom: PhantomData<fn() -> T>,
}
347
348impl<'a, T, B> Destination<'a, T, B> {
349 pub fn reborrow(&mut self) -> Destination<'_, T, B> {
351 Destination {
352 id: self.id,
353 buffer: &mut *self.buffer,
354 host_buffer: self.host_buffer.as_deref_mut(),
355 _phantom: PhantomData,
356 }
357 }
358
359 pub fn take_buffer(&mut self) -> B
365 where
366 B: Default,
367 {
368 mem::take(self.buffer)
369 }
370
371 pub fn set_buffer(&mut self, buffer: B) {
381 *self.buffer = buffer;
382 }
383
384 pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
401 self.remaining_(store.as_context_mut().0).unwrap()
405 }
406
407 fn remaining_(&self, store: &mut StoreOpaque) -> Result<Option<usize>> {
408 let transmit = store.concurrent_state_mut().get_mut(self.id)?;
409
410 if let &ReadState::GuestReady { count, .. } = &transmit.read {
411 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
412 bail_bug!("expected WriteState::HostReady")
413 };
414
415 Ok(Some(count.as_usize() - guest_offset.as_usize()))
416 } else {
417 Ok(None)
418 }
419 }
420}
421
422impl<'a, B> Destination<'a, u8, B> {
423 pub fn as_direct<D>(
434 mut self,
435 store: StoreContextMut<'a, D>,
436 capacity: usize,
437 ) -> DirectDestination<'a, D> {
438 if let Some(buffer) = self.host_buffer.as_deref_mut() {
439 buffer.set_position(0);
440 if buffer.get_mut().is_empty() {
441 buffer.get_mut().resize(capacity, 0);
442 }
443 }
444
445 DirectDestination {
446 id: self.id,
447 host_buffer: self.host_buffer,
448 store,
449 }
450 }
451}
452
/// Byte-oriented view of a [`Destination`], created by
/// `Destination::as_direct`, exposing the writable region as a `&mut [u8]`.
pub struct DirectDestination<'a, D: 'static> {
    /// Identifies the transmit state this destination belongs to.
    id: TableId<TransmitState>,
    /// Host-side byte buffer; when present it is used instead of guest memory.
    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
    /// Store used to reach the transmit state and the guest memory.
    store: StoreContextMut<'a, D>,
}
460
461impl<D: 'static> std::io::Write for DirectDestination<'_, D> {
462 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
463 let rem = self.remaining();
464 let n = rem.len().min(buf.len());
465 rem[..n].copy_from_slice(&buf[..n]);
466 self.mark_written(n);
467 Ok(n)
468 }
469
470 fn flush(&mut self) -> std::io::Result<()> {
471 Ok(())
472 }
473}
474
475impl<D: 'static> DirectDestination<'_, D> {
476 pub fn remaining(&mut self) -> &mut [u8] {
478 self.remaining_().unwrap()
482 }
483
484 fn remaining_(&mut self) -> Result<&mut [u8]> {
485 if let Some(buffer) = self.host_buffer.as_deref_mut() {
486 return Ok(buffer.get_mut());
487 }
488 let transmit = self
489 .store
490 .as_context_mut()
491 .0
492 .concurrent_state_mut()
493 .get_mut(self.id)?;
494
495 let &ReadState::GuestReady {
496 address,
497 count,
498 options,
499 instance,
500 ..
501 } = &transmit.read
502 else {
503 bail_bug!("expected ReadState::GuestReady")
504 };
505
506 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
507 bail_bug!("expected WriteState::HostReady")
508 };
509
510 let memory = instance
511 .options_memory_mut(self.store.0, options)
512 .get_mut((address + guest_offset.as_usize())..)
513 .and_then(|b| b.get_mut(..(count.as_usize() - guest_offset.as_usize())));
514 match memory {
515 Some(memory) => Ok(memory),
516 None => bail_bug!("guest buffer unexpectedly out of bounds"),
517 }
518 }
519
520 pub fn mark_written(&mut self, count: usize) {
527 self.mark_written_(count).unwrap()
531 }
532
533 fn mark_written_(&mut self, count: usize) -> Result<()> {
534 if let Some(buffer) = self.host_buffer.as_deref_mut() {
535 buffer.set_position(
536 buffer
539 .position()
540 .checked_add(u64::try_from(count).unwrap())
541 .unwrap(),
542 );
543 } else {
544 let transmit = self
545 .store
546 .as_context_mut()
547 .0
548 .concurrent_state_mut()
549 .get_mut(self.id)?;
550
551 let ReadState::GuestReady {
552 count: read_count, ..
553 } = &transmit.read
554 else {
555 bail_bug!("expected ReadState::GuestReady")
556 };
557
558 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
559 bail_bug!("expected WriteState::HostReady");
560 };
561
562 if guest_offset.as_usize() + count > read_count.as_usize() {
563 panic!(
566 "write count ({count}) must be less than or equal to read count ({read_count})"
567 )
568 } else {
569 guest_offset.inc(count)?;
570 }
571 }
572 Ok(())
573 }
574}
575
/// Result reported by a stream producer or consumer after a poll completes.
#[derive(Copy, Clone, Debug)]
pub enum StreamResult {
    /// The operation completed.
    Completed,
    /// The operation was cancelled.
    Cancelled,
    /// The stream end was dropped; the producers in this file return this
    /// when they have nothing further to produce.
    Dropped,
}
591
/// A host-side source of items for a stream, polled with access to the store.
pub trait StreamProducer<D>: Send + 'static {
    /// The type of item produced.
    type Item;

    /// The buffer type through which items are handed to the
    /// [`Destination`].
    type Buffer: WriteBuffer<Self::Item> + Default;

    /// Polls the producer, delivering items through `destination`.
    ///
    /// NOTE(review): `finish` presumably asks the producer to wrap up without
    /// returning `Poll::Pending` — confirm against the callers.
    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<'a, D>,
        destination: Destination<'a, Self::Item, Self::Buffer>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;

    /// Attempts to convert the producer into a value of the type identified
    /// by `_ty`. The default implementation always declines and hands the
    /// producer back unchanged.
    fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
        Err(me)
    }
}
752
753impl<T, D> StreamProducer<D> for iter::Empty<T>
754where
755 T: Send + Sync + 'static,
756{
757 type Item = T;
758 type Buffer = Option<Self::Item>;
759
760 fn poll_produce<'a>(
761 self: Pin<&mut Self>,
762 _: &mut Context<'_>,
763 _: StoreContextMut<'a, D>,
764 _: Destination<'a, Self::Item, Self::Buffer>,
765 _: bool,
766 ) -> Poll<Result<StreamResult>> {
767 Poll::Ready(Ok(StreamResult::Dropped))
768 }
769}
770
771impl<T, D> StreamProducer<D> for stream::Empty<T>
772where
773 T: Send + Sync + 'static,
774{
775 type Item = T;
776 type Buffer = Option<Self::Item>;
777
778 fn poll_produce<'a>(
779 self: Pin<&mut Self>,
780 _: &mut Context<'_>,
781 _: StoreContextMut<'a, D>,
782 _: Destination<'a, Self::Item, Self::Buffer>,
783 _: bool,
784 ) -> Poll<Result<StreamResult>> {
785 Poll::Ready(Ok(StreamResult::Dropped))
786 }
787}
788
789impl<T, D> StreamProducer<D> for Vec<T>
790where
791 T: Unpin + Send + Sync + 'static,
792{
793 type Item = T;
794 type Buffer = VecBuffer<T>;
795
796 fn poll_produce<'a>(
797 self: Pin<&mut Self>,
798 _: &mut Context<'_>,
799 _: StoreContextMut<'a, D>,
800 mut dst: Destination<'a, Self::Item, Self::Buffer>,
801 _: bool,
802 ) -> Poll<Result<StreamResult>> {
803 dst.set_buffer(mem::take(self.get_mut()).into());
804 Poll::Ready(Ok(StreamResult::Dropped))
805 }
806}
807
808impl<T, D> StreamProducer<D> for Box<[T]>
809where
810 T: Unpin + Send + Sync + 'static,
811{
812 type Item = T;
813 type Buffer = VecBuffer<T>;
814
815 fn poll_produce<'a>(
816 self: Pin<&mut Self>,
817 _: &mut Context<'_>,
818 _: StoreContextMut<'a, D>,
819 mut dst: Destination<'a, Self::Item, Self::Buffer>,
820 _: bool,
821 ) -> Poll<Result<StreamResult>> {
822 dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
823 Poll::Ready(Ok(StreamResult::Dropped))
824 }
825}
826
/// `bytes::Bytes` hands over its whole contents, wrapped in a `Cursor`, and
/// then reports `Dropped`.
#[cfg(feature = "component-model-async-bytes")]
impl<D> StreamProducer<D> for bytes::Bytes {
    type Item = u8;
    type Buffer = Cursor<Self>;

    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        _store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _finish: bool,
    ) -> Poll<Result<StreamResult>> {
        // Move the bytes out, leaving a default value behind in `self`.
        let payload = mem::take(self.get_mut());
        dst.set_buffer(Cursor::new(payload));
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
843
/// `bytes::BytesMut` hands over its whole contents, wrapped in a `Cursor`,
/// and then reports `Dropped`.
#[cfg(feature = "component-model-async-bytes")]
impl<D> StreamProducer<D> for bytes::BytesMut {
    type Item = u8;
    type Buffer = Cursor<Self>;

    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        _store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _finish: bool,
    ) -> Poll<Result<StreamResult>> {
        // Move the bytes out, leaving a default value behind in `self`.
        let payload = mem::take(self.get_mut());
        dst.set_buffer(Cursor::new(payload));
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
860
/// Handle handed to a consumer describing where items of type `T` can be read
/// from: either a host-side buffer or the guest writer's memory.
pub struct Source<'a, T> {
    /// Identifies the transmit state this source belongs to.
    id: TableId<TransmitState>,
    /// Host-side buffer of items; when present it is read instead of guest
    /// memory.
    host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
}
866
867impl<'a, T> Source<'a, T> {
868 pub fn reborrow(&mut self) -> Source<'_, T> {
870 Source {
871 id: self.id,
872 host_buffer: self.host_buffer.as_deref_mut(),
873 }
874 }
875
876 pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
878 where
879 T: func::Lift + 'static,
880 B: ReadBuffer<T>,
881 {
882 if let Some(input) = &mut self.host_buffer {
883 let count = input.remaining().len().min(buffer.remaining_capacity());
884 buffer.move_from(*input, count);
885 } else {
886 let store = store.as_context_mut();
887 let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
888
889 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
890 bail_bug!("expected ReadState::HostReady");
891 };
892
893 let &WriteState::GuestReady {
894 ty,
895 address,
896 count,
897 options,
898 instance,
899 ..
900 } = &transmit.write
901 else {
902 bail_bug!("expected WriteState::GuestReady");
903 };
904
905 let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
906 let ty = ty.payload(cx.types);
907 let old_remaining = buffer.remaining_capacity();
908 lift::<T, B>(
909 cx,
910 ty.copied(),
911 buffer,
912 address + (T::SIZE32 * guest_offset.as_usize()),
913 count.as_usize() - guest_offset.as_usize(),
914 )?;
915
916 let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
917
918 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
919 bail_bug!("expected ReadState::HostReady");
920 };
921
922 guest_offset.inc(old_remaining - buffer.remaining_capacity())?;
923 }
924
925 Ok(())
926 }
927
928 pub fn remaining(&self, mut store: impl AsContextMut) -> usize
931 where
932 T: 'static,
933 {
934 self.remaining_(store.as_context_mut().0).unwrap()
938 }
939
940 fn remaining_(&self, store: &mut StoreOpaque) -> Result<usize>
941 where
942 T: 'static,
943 {
944 let transmit = store.concurrent_state_mut().get_mut(self.id)?;
945
946 if let &WriteState::GuestReady { count, .. } = &transmit.write {
947 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
948 bail_bug!("expected ReadState::HostReady")
949 };
950
951 Ok(count.as_usize() - guest_offset.as_usize())
952 } else if let Some(host_buffer) = &self.host_buffer {
953 Ok(host_buffer.remaining().len())
954 } else {
955 bail_bug!("expected either WriteState::GuestReady or host buffer")
956 }
957 }
958}
959
960impl<'a> Source<'a, u8> {
961 pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
963 DirectSource {
964 id: self.id,
965 host_buffer: self.host_buffer,
966 store,
967 }
968 }
969}
970
/// Byte-oriented view of a [`Source`], created by `Source::as_direct`,
/// exposing the readable region as a `&[u8]`.
pub struct DirectSource<'a, D: 'static> {
    /// Identifies the transmit state this source belongs to.
    id: TableId<TransmitState>,
    /// Host-side byte buffer; when present it is read instead of guest memory.
    host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
    /// Store used to reach the transmit state and the guest memory.
    store: StoreContextMut<'a, D>,
}
978
979impl<D: 'static> std::io::Read for DirectSource<'_, D> {
980 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
981 let rem = self.remaining();
982 let n = rem.len().min(buf.len());
983 buf[..n].copy_from_slice(&rem[..n]);
984 self.mark_read(n);
985 Ok(n)
986 }
987}
988
989impl<D: 'static> DirectSource<'_, D> {
990 pub fn remaining(&mut self) -> &[u8] {
992 self.remaining_().unwrap()
996 }
997
998 fn remaining_(&mut self) -> Result<&[u8]> {
999 if let Some(buffer) = self.host_buffer.as_deref_mut() {
1000 return Ok(buffer.remaining());
1001 }
1002 let transmit = self
1003 .store
1004 .as_context_mut()
1005 .0
1006 .concurrent_state_mut()
1007 .get_mut(self.id)?;
1008
1009 let &WriteState::GuestReady {
1010 address,
1011 count,
1012 options,
1013 instance,
1014 ..
1015 } = &transmit.write
1016 else {
1017 bail_bug!("expected WriteState::GuestReady")
1018 };
1019
1020 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
1021 bail_bug!("expected ReadState::HostReady")
1022 };
1023
1024 let memory = instance
1025 .options_memory(self.store.0, options)
1026 .get((address + guest_offset.as_usize())..)
1027 .and_then(|b| b.get(..(count.as_usize() - guest_offset.as_usize())));
1028 match memory {
1029 Some(memory) => Ok(memory),
1030 None => bail_bug!("guest buffer unexpectedly out of bounds"),
1031 }
1032 }
1033
1034 pub fn mark_read(&mut self, count: usize) {
1041 self.mark_read_(count).unwrap()
1045 }
1046
1047 fn mark_read_(&mut self, count: usize) -> Result<()> {
1048 if let Some(buffer) = self.host_buffer.as_deref_mut() {
1049 buffer.skip(count);
1050 return Ok(());
1051 }
1052
1053 let transmit = self
1054 .store
1055 .as_context_mut()
1056 .0
1057 .concurrent_state_mut()
1058 .get_mut(self.id)?;
1059
1060 let WriteState::GuestReady {
1061 count: write_count, ..
1062 } = &transmit.write
1063 else {
1064 bail_bug!("expected WriteState::GuestReady");
1065 };
1066
1067 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
1068 bail_bug!("expected ReadState::HostReady");
1069 };
1070
1071 if guest_offset.as_usize() + count > write_count.as_usize() {
1072 panic!("read count ({count}) must be less than or equal to write count ({write_count})")
1074 } else {
1075 guest_offset.inc(count)?;
1076 }
1077 Ok(())
1078 }
1079}
1080
/// A host-side sink for the items of a stream, polled with access to the
/// store.
pub trait StreamConsumer<D>: Send + 'static {
    /// The type of item consumed.
    type Item;

    /// Polls the consumer, letting it read items from `source`.
    ///
    /// NOTE(review): `finish` presumably asks the consumer to wrap up without
    /// returning `Poll::Pending` — confirm against the callers.
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;
}
1176
/// A host-side source of the single value of a future.
pub trait FutureProducer<D>: Send + 'static {
    /// The type of value produced.
    type Item;

    /// Polls the producer for its value.
    ///
    /// `Ok(Some(value))` delivers the value, while `Ok(None)` reports that no
    /// value was produced. The blanket implementation for futures in this
    /// file returns `Ok(None)` when `finish` is set and the future is still
    /// pending.
    fn poll_produce(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        finish: bool,
    ) -> Poll<Result<Option<Self::Item>>>;
}
1198
1199impl<T, E, D, Fut> FutureProducer<D> for Fut
1200where
1201 E: Into<Error>,
1202 Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
1203{
1204 type Item = T;
1205
1206 fn poll_produce<'a>(
1207 self: Pin<&mut Self>,
1208 cx: &mut Context<'_>,
1209 _: StoreContextMut<'a, D>,
1210 finish: bool,
1211 ) -> Poll<Result<Option<T>>> {
1212 match self.poll(cx) {
1213 Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
1214 Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
1215 Poll::Pending if finish => Poll::Ready(Ok(None)),
1216 Poll::Pending => Poll::Pending,
1217 }
1218 }
1219}
1220
/// A host-side sink for the single value of a future.
pub trait FutureConsumer<D>: Send + 'static {
    /// The type of value consumed.
    type Item;

    /// Polls the consumer, letting it read the value from `source`.
    ///
    /// NOTE(review): `finish` presumably asks the consumer to wrap up without
    /// returning `Poll::Pending` — confirm against the callers.
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<()>>;
}
1245
/// Host-side handle to the readable end of a future carrying a `T`.
pub struct FutureReader<T> {
    /// Table id of the transmit handle backing this reader.
    id: TableId<TransmitHandle>,
    /// Ties the payload type `T` to this handle without storing a `T`.
    _phantom: PhantomData<T>,
}
1256
1257impl<T> FutureReader<T> {
1258 pub fn new<S: AsContextMut>(
1267 mut store: S,
1268 producer: impl FutureProducer<S::Data, Item = T>,
1269 ) -> Result<Self>
1270 where
1271 T: func::Lower + func::Lift + Send + Sync + 'static,
1272 {
1273 ensure!(
1274 store.as_context().0.concurrency_support(),
1275 "concurrency support is not enabled"
1276 );
1277
1278 struct Producer<P>(P);
1279
1280 impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
1281 for Producer<P>
1282 {
1283 type Item = P::Item;
1284 type Buffer = Option<P::Item>;
1285
1286 fn poll_produce<'a>(
1287 self: Pin<&mut Self>,
1288 cx: &mut Context<'_>,
1289 store: StoreContextMut<D>,
1290 mut destination: Destination<'a, Self::Item, Self::Buffer>,
1291 finish: bool,
1292 ) -> Poll<Result<StreamResult>> {
1293 let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1296
1297 Poll::Ready(Ok(
1298 if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
1299 destination.set_buffer(Some(value));
1300
1301 StreamResult::Completed
1308 } else {
1309 StreamResult::Cancelled
1310 },
1311 ))
1312 }
1313 }
1314
1315 Ok(Self::new_(
1316 store
1317 .as_context_mut()
1318 .new_transmit(TransmitKind::Future, Producer(producer))?,
1319 ))
1320 }
1321
1322 pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1323 Self {
1324 id,
1325 _phantom: PhantomData,
1326 }
1327 }
1328
1329 pub(super) fn id(&self) -> TableId<TransmitHandle> {
1330 self.id
1331 }
1332
1333 pub fn pipe<S: AsContextMut>(
1343 self,
1344 mut store: S,
1345 consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
1346 ) -> Result<()>
1347 where
1348 T: func::Lift + 'static,
1349 {
1350 struct Consumer<C>(C);
1351
1352 impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
1353 for Consumer<C>
1354 {
1355 type Item = T;
1356
1357 fn poll_consume(
1358 self: Pin<&mut Self>,
1359 cx: &mut Context<'_>,
1360 mut store: StoreContextMut<D>,
1361 mut source: Source<Self::Item>,
1362 finish: bool,
1363 ) -> Poll<Result<StreamResult>> {
1364 let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1367
1368 ready!(consumer.poll_consume(
1369 cx,
1370 store.as_context_mut(),
1371 source.reborrow(),
1372 finish
1373 ))?;
1374
1375 Poll::Ready(Ok(if source.remaining(store) == 0 {
1376 StreamResult::Completed
1382 } else {
1383 StreamResult::Cancelled
1384 }))
1385 }
1386 }
1387
1388 store
1389 .as_context_mut()
1390 .set_consumer(self.id, TransmitKind::Future, Consumer(consumer))
1391 }
1392
1393 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1395 let id = lift_index_to_future(cx, ty, index)?;
1396 Ok(Self::new_(id))
1397 }
1398
1399 pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
1417 future_close(store.as_context_mut().0, &mut self.id)
1418 }
1419
1420 pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
1422 accessor.as_accessor().with(|access| self.close(access))
1423 }
1424
1425 pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1431 where
1432 A: AsAccessor,
1433 {
1434 GuardedFutureReader::new(accessor, self)
1435 }
1436
1437 pub fn try_into_future_any(self, store: impl AsContextMut) -> Result<FutureAny>
1444 where
1445 T: ComponentType + 'static,
1446 {
1447 FutureAny::try_from_future_reader(store, self)
1448 }
1449
1450 pub fn try_from_future_any(future: FutureAny) -> Result<Self>
1457 where
1458 T: ComponentType + 'static,
1459 {
1460 future.try_into_future_reader()
1461 }
1462}
1463
1464impl<T> fmt::Debug for FutureReader<T> {
1465 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1466 f.debug_struct("FutureReader")
1467 .field("id", &self.id)
1468 .finish()
1469 }
1470}
1471
1472pub(super) fn future_close(
1473 store: &mut StoreOpaque,
1474 id: &mut TableId<TransmitHandle>,
1475) -> Result<()> {
1476 let id = mem::replace(id, TableId::new(u32::MAX));
1477 store.host_drop_reader(id, TransmitKind::Future)
1478}
1479
1480pub(super) fn lift_index_to_future(
1482 cx: &mut LiftContext<'_>,
1483 ty: InterfaceType,
1484 index: u32,
1485) -> Result<TableId<TransmitHandle>> {
1486 match ty {
1487 InterfaceType::Future(src) => {
1488 let handle_table = cx
1489 .instance_mut()
1490 .table_for_transmit(TransmitIndex::Future(src));
1491 let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
1492 if is_done {
1493 bail!("cannot lift future after being notified that the writable end dropped");
1494 }
1495 let id = TableId::<TransmitHandle>::new(rep);
1496 let concurrent_state = cx.concurrent_state_mut();
1497 let future = concurrent_state.get_mut(id)?;
1498 future.common.handle = None;
1499 let state = future.state;
1500
1501 if concurrent_state.get_mut(state)?.done {
1502 bail!("cannot lift future after previous read succeeded");
1503 }
1504
1505 Ok(id)
1506 }
1507 _ => func::bad_type_info(),
1508 }
1509}
1510
1511pub(super) fn lower_future_to_index<U>(
1513 id: TableId<TransmitHandle>,
1514 cx: &mut LowerContext<'_, U>,
1515 ty: InterfaceType,
1516) -> Result<u32> {
1517 match ty {
1518 InterfaceType::Future(dst) => {
1519 let concurrent_state = cx.store.0.concurrent_state_mut();
1520 let state = concurrent_state.get_mut(id)?.state;
1521 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1522
1523 let handle = cx
1524 .instance_mut()
1525 .table_for_transmit(TransmitIndex::Future(dst))
1526 .future_insert_read(dst, rep)?;
1527
1528 cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1529
1530 Ok(handle)
1531 }
1532 _ => func::bad_type_info(),
1533 }
1534}
1535
1536unsafe impl<T: ComponentType> ComponentType for FutureReader<T> {
1539 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1540
1541 type Lower = <u32 as func::ComponentType>::Lower;
1542
1543 fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1544 match ty {
1545 InterfaceType::Future(ty) => {
1546 let ty = types.types[*ty].ty;
1547 types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1548 }
1549 other => bail!("expected `future`, found `{}`", func::desc(other)),
1550 }
1551 }
1552}
1553
1554unsafe impl<T: ComponentType> func::Lower for FutureReader<T> {
1556 fn linear_lower_to_flat<U>(
1557 &self,
1558 cx: &mut LowerContext<'_, U>,
1559 ty: InterfaceType,
1560 dst: &mut MaybeUninit<Self::Lower>,
1561 ) -> Result<()> {
1562 lower_future_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1563 }
1564
1565 fn linear_lower_to_memory<U>(
1566 &self,
1567 cx: &mut LowerContext<'_, U>,
1568 ty: InterfaceType,
1569 offset: usize,
1570 ) -> Result<()> {
1571 lower_future_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1572 cx,
1573 InterfaceType::U32,
1574 offset,
1575 )
1576 }
1577}
1578
1579unsafe impl<T: ComponentType> func::Lift for FutureReader<T> {
1581 fn linear_lift_from_flat(
1582 cx: &mut LiftContext<'_>,
1583 ty: InterfaceType,
1584 src: &Self::Lower,
1585 ) -> Result<Self> {
1586 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1587 Self::lift_from_index(cx, ty, index)
1588 }
1589
1590 fn linear_lift_from_memory(
1591 cx: &mut LiftContext<'_>,
1592 ty: InterfaceType,
1593 bytes: &[u8],
1594 ) -> Result<Self> {
1595 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1596 Self::lift_from_index(cx, ty, index)
1597 }
1598}
1599
/// A [`FutureReader`] paired with an accessor so that the reader is closed
/// automatically when this guard is dropped.
pub struct GuardedFutureReader<T, A>
where
    A: AsAccessor,
{
    /// The guarded reader; `None` once it has been taken back out through the
    /// `From` conversion.
    reader: Option<FutureReader<T>>,
    /// Accessor used to reach the store when closing the reader on drop.
    accessor: A,
}
1617
1618impl<T, A> GuardedFutureReader<T, A>
1619where
1620 A: AsAccessor,
1621{
1622 pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1630 assert!(
1631 accessor
1632 .as_accessor()
1633 .with(|a| a.as_context().0.concurrency_support())
1634 );
1635 Self {
1636 reader: Some(reader),
1637 accessor,
1638 }
1639 }
1640
1641 pub fn into_future(self) -> FutureReader<T> {
1644 self.into()
1645 }
1646}
1647
1648impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1649where
1650 A: AsAccessor,
1651{
1652 fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1653 guard.reader.take().unwrap()
1654 }
1655}
1656
1657impl<T, A> Drop for GuardedFutureReader<T, A>
1658where
1659 A: AsAccessor,
1660{
1661 fn drop(&mut self) {
1662 if let Some(reader) = &mut self.reader {
1663 let result = reader.close_with(&self.accessor);
1666 debug_assert!(result.is_ok());
1667 }
1668 }
1669}
1670
/// Host-side handle to the readable end of a stream carrying items of type
/// `T`.
pub struct StreamReader<T> {
    /// Table id of the transmit handle backing this reader.
    id: TableId<TransmitHandle>,
    /// Ties the item type `T` to this handle without storing a `T`.
    _phantom: PhantomData<T>,
}
1681
1682impl<T> StreamReader<T> {
1683 pub fn new<S: AsContextMut>(
1692 mut store: S,
1693 producer: impl StreamProducer<S::Data, Item = T>,
1694 ) -> Result<Self>
1695 where
1696 T: func::Lower + func::Lift + Send + Sync + 'static,
1697 {
1698 ensure!(
1699 store.as_context().0.concurrency_support(),
1700 "concurrency support is not enabled",
1701 );
1702 Ok(Self::new_(
1703 store
1704 .as_context_mut()
1705 .new_transmit(TransmitKind::Stream, producer)?,
1706 ))
1707 }
1708
1709 pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1710 Self {
1711 id,
1712 _phantom: PhantomData,
1713 }
1714 }
1715
1716 pub(super) fn id(&self) -> TableId<TransmitHandle> {
1717 self.id
1718 }
1719
1720 pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
1742 let store = store.as_context_mut();
1743 let state = store.0.concurrent_state_mut();
1744 let id = state.get_mut(self.id).unwrap().state;
1745 if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
1746 match try_into(TypeId::of::<V>()) {
1747 Some(result) => {
1748 self.close(store).unwrap();
1749 Ok(*result.downcast::<V>().unwrap())
1750 }
1751 None => Err(self),
1752 }
1753 } else {
1754 Err(self)
1755 }
1756 }
1757
1758 pub fn pipe<S: AsContextMut>(
1768 self,
1769 mut store: S,
1770 consumer: impl StreamConsumer<S::Data, Item = T>,
1771 ) -> Result<()>
1772 where
1773 T: 'static,
1774 {
1775 store
1776 .as_context_mut()
1777 .set_consumer(self.id, TransmitKind::Stream, consumer)
1778 }
1779
1780 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1782 let id = lift_index_to_stream(cx, ty, index)?;
1783 Ok(Self::new_(id))
1784 }
1785
1786 pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
1802 stream_close(store.as_context_mut().0, &mut self.id)
1803 }
1804
1805 pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
1807 accessor.as_accessor().with(|access| self.close(access))
1808 }
1809
1810 pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1816 where
1817 A: AsAccessor,
1818 {
1819 GuardedStreamReader::new(accessor, self)
1820 }
1821
1822 pub fn try_into_stream_any(self, store: impl AsContextMut) -> Result<StreamAny>
1829 where
1830 T: ComponentType + 'static,
1831 {
1832 StreamAny::try_from_stream_reader(store, self)
1833 }
1834
1835 pub fn try_from_stream_any(stream: StreamAny) -> Result<Self>
1842 where
1843 T: ComponentType + 'static,
1844 {
1845 stream.try_into_stream_reader()
1846 }
1847}
1848
1849impl<T> fmt::Debug for StreamReader<T> {
1850 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1851 f.debug_struct("StreamReader")
1852 .field("id", &self.id)
1853 .finish()
1854 }
1855}
1856
1857pub(super) fn stream_close(
1858 store: &mut StoreOpaque,
1859 id: &mut TableId<TransmitHandle>,
1860) -> Result<()> {
1861 let id = mem::replace(id, TableId::new(u32::MAX));
1862 store.host_drop_reader(id, TransmitKind::Stream)
1863}
1864
1865pub(super) fn lift_index_to_stream(
1867 cx: &mut LiftContext<'_>,
1868 ty: InterfaceType,
1869 index: u32,
1870) -> Result<TableId<TransmitHandle>> {
1871 match ty {
1872 InterfaceType::Stream(src) => {
1873 let handle_table = cx
1874 .instance_mut()
1875 .table_for_transmit(TransmitIndex::Stream(src));
1876 let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1877 if is_done {
1878 bail!("cannot lift stream after being notified that the writable end dropped");
1879 }
1880 let id = TableId::<TransmitHandle>::new(rep);
1881 cx.concurrent_state_mut().get_mut(id)?.common.handle = None;
1882 Ok(id)
1883 }
1884 _ => func::bad_type_info(),
1885 }
1886}
1887
1888pub(super) fn lower_stream_to_index<U>(
1890 id: TableId<TransmitHandle>,
1891 cx: &mut LowerContext<'_, U>,
1892 ty: InterfaceType,
1893) -> Result<u32> {
1894 match ty {
1895 InterfaceType::Stream(dst) => {
1896 let concurrent_state = cx.store.0.concurrent_state_mut();
1897 let state = concurrent_state.get_mut(id)?.state;
1898 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1899
1900 let handle = cx
1901 .instance_mut()
1902 .table_for_transmit(TransmitIndex::Stream(dst))
1903 .stream_insert_read(dst, rep)?;
1904
1905 cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1906
1907 Ok(handle)
1908 }
1909 _ => func::bad_type_info(),
1910 }
1911}
1912
1913unsafe impl<T: ComponentType> ComponentType for StreamReader<T> {
1916 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1917
1918 type Lower = <u32 as func::ComponentType>::Lower;
1919
1920 fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1921 match ty {
1922 InterfaceType::Stream(ty) => {
1923 let ty = types.types[*ty].ty;
1924 types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1925 }
1926 other => bail!("expected `stream`, found `{}`", func::desc(other)),
1927 }
1928 }
1929}
1930
1931unsafe impl<T: ComponentType> func::Lower for StreamReader<T> {
1933 fn linear_lower_to_flat<U>(
1934 &self,
1935 cx: &mut LowerContext<'_, U>,
1936 ty: InterfaceType,
1937 dst: &mut MaybeUninit<Self::Lower>,
1938 ) -> Result<()> {
1939 lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1940 }
1941
1942 fn linear_lower_to_memory<U>(
1943 &self,
1944 cx: &mut LowerContext<'_, U>,
1945 ty: InterfaceType,
1946 offset: usize,
1947 ) -> Result<()> {
1948 lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1949 cx,
1950 InterfaceType::U32,
1951 offset,
1952 )
1953 }
1954}
1955
1956unsafe impl<T: ComponentType> func::Lift for StreamReader<T> {
1958 fn linear_lift_from_flat(
1959 cx: &mut LiftContext<'_>,
1960 ty: InterfaceType,
1961 src: &Self::Lower,
1962 ) -> Result<Self> {
1963 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1964 Self::lift_from_index(cx, ty, index)
1965 }
1966
1967 fn linear_lift_from_memory(
1968 cx: &mut LiftContext<'_>,
1969 ty: InterfaceType,
1970 bytes: &[u8],
1971 ) -> Result<Self> {
1972 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1973 Self::lift_from_index(cx, ty, index)
1974 }
1975}
1976
/// A `StreamReader` paired with an accessor so that the reader is closed
/// automatically when this guard is dropped.
pub struct GuardedStreamReader<T, A>
where
    A: AsAccessor,
{
    /// The guarded reader; `None` once it has been taken back out via the
    /// `From` conversion.
    reader: Option<StreamReader<T>>,
    /// Accessor used to reach the store when closing the reader on drop.
    accessor: A,
}
1994
1995impl<T, A> GuardedStreamReader<T, A>
1996where
1997 A: AsAccessor,
1998{
1999 pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
2008 assert!(
2009 accessor
2010 .as_accessor()
2011 .with(|a| a.as_context().0.concurrency_support())
2012 );
2013 Self {
2014 reader: Some(reader),
2015 accessor,
2016 }
2017 }
2018
2019 pub fn into_stream(self) -> StreamReader<T> {
2022 self.into()
2023 }
2024}
2025
2026impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
2027where
2028 A: AsAccessor,
2029{
2030 fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
2031 guard.reader.take().unwrap()
2032 }
2033}
2034
2035impl<T, A> Drop for GuardedStreamReader<T, A>
2036where
2037 A: AsAccessor,
2038{
2039 fn drop(&mut self) {
2040 if let Some(reader) = &mut self.reader {
2041 let result = reader.close_with(&self.accessor);
2044 debug_assert!(result.is_ok());
2045 }
2046 }
2047}
2048
/// Host-side handle to a component-model `error-context` value.
pub struct ErrorContext {
    /// Representation value identifying the error context; it is what gets
    /// inserted into, and looked up from, guest error-context tables.
    rep: u32,
}
2053
2054impl ErrorContext {
2055 pub(crate) fn new(rep: u32) -> Self {
2056 Self { rep }
2057 }
2058
2059 pub fn into_val(self) -> Val {
2061 Val::ErrorContext(ErrorContextAny(self.rep))
2062 }
2063
2064 pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
2066 let Val::ErrorContext(ErrorContextAny(rep)) = value else {
2067 bail!("expected `error-context`; got `{}`", value.desc());
2068 };
2069 Ok(Self::new(*rep))
2070 }
2071
2072 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
2073 match ty {
2074 InterfaceType::ErrorContext(src) => {
2075 let rep = cx
2076 .instance_mut()
2077 .table_for_error_context(src)
2078 .error_context_rep(index)?;
2079
2080 Ok(Self { rep })
2081 }
2082 _ => func::bad_type_info(),
2083 }
2084 }
2085}
2086
2087pub(crate) fn lower_error_context_to_index<U>(
2088 rep: u32,
2089 cx: &mut LowerContext<'_, U>,
2090 ty: InterfaceType,
2091) -> Result<u32> {
2092 match ty {
2093 InterfaceType::ErrorContext(dst) => {
2094 let tbl = cx.instance_mut().table_for_error_context(dst);
2095 tbl.error_context_insert(rep)
2096 }
2097 _ => func::bad_type_info(),
2098 }
2099}
2100unsafe impl func::ComponentType for ErrorContext {
2103 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
2104
2105 type Lower = <u32 as func::ComponentType>::Lower;
2106
2107 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
2108 match ty {
2109 InterfaceType::ErrorContext(_) => Ok(()),
2110 other => bail!("expected `error`, found `{}`", func::desc(other)),
2111 }
2112 }
2113}
2114
2115unsafe impl func::Lower for ErrorContext {
2117 fn linear_lower_to_flat<T>(
2118 &self,
2119 cx: &mut LowerContext<'_, T>,
2120 ty: InterfaceType,
2121 dst: &mut MaybeUninit<Self::Lower>,
2122 ) -> Result<()> {
2123 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
2124 cx,
2125 InterfaceType::U32,
2126 dst,
2127 )
2128 }
2129
2130 fn linear_lower_to_memory<T>(
2131 &self,
2132 cx: &mut LowerContext<'_, T>,
2133 ty: InterfaceType,
2134 offset: usize,
2135 ) -> Result<()> {
2136 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
2137 cx,
2138 InterfaceType::U32,
2139 offset,
2140 )
2141 }
2142}
2143
2144unsafe impl func::Lift for ErrorContext {
2146 fn linear_lift_from_flat(
2147 cx: &mut LiftContext<'_>,
2148 ty: InterfaceType,
2149 src: &Self::Lower,
2150 ) -> Result<Self> {
2151 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
2152 Self::lift_from_index(cx, ty, index)
2153 }
2154
2155 fn linear_lift_from_memory(
2156 cx: &mut LiftContext<'_>,
2157 ty: InterfaceType,
2158 bytes: &[u8],
2159 ) -> Result<Self> {
2160 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
2161 Self::lift_from_index(cx, ty, index)
2162 }
2163}
2164
/// Table entry for one end (read or write) of a stream or future.
pub(super) struct TransmitHandle {
    /// Waitable bookkeeping; its `handle` field holds the guest table index
    /// while the end is lowered into a guest and is cleared when lifted.
    pub(super) common: WaitableCommon,
    /// Id of the `TransmitState` this end belongs to.
    state: TableId<TransmitState>,
}
2171
2172impl TransmitHandle {
2173 fn new(state: TableId<TransmitState>) -> Self {
2174 Self {
2175 common: WaitableCommon::default(),
2176 state,
2177 }
2178 }
2179}
2180
impl TableDebug for TransmitHandle {
    /// Name reported for this entry type by the table's debug machinery.
    fn type_name() -> &'static str {
        "TransmitHandle"
    }
}
2186
/// Shared state for a stream or future, referenced by both of its ends.
struct TransmitState {
    /// Handle id of the write end.
    write_handle: TableId<TransmitHandle>,
    /// Handle id of the read end.
    read_handle: TableId<TransmitHandle>,
    /// Current state of the write end.
    write: WriteState,
    /// Current state of the read end.
    read: ReadState,
    /// Set to `true` for `TransmitKind::Future` transmits once a consumer
    /// completes or a guest read is serviced.
    done: bool,
    /// Where this transmit was created (host or a specific guest type).
    pub(super) origin: TransmitOrigin,
}
2203
/// Records where a transmit was created.
#[derive(Copy, Clone)]
pub(super) enum TransmitOrigin {
    /// Created by the host.
    Host,
    /// Created by a guest as a future, with the instance id and future table
    /// type index it was created under.
    GuestFuture(ComponentInstanceId, TypeFutureTableIndex),
    /// Created by a guest as a stream, with the instance id and stream table
    /// type index it was created under.
    GuestStream(ComponentInstanceId, TypeStreamTableIndex),
}
2210
2211impl TransmitState {
2212 fn new(origin: TransmitOrigin) -> Self {
2213 Self {
2214 write_handle: TableId::new(u32::MAX),
2215 read_handle: TableId::new(u32::MAX),
2216 read: ReadState::Open,
2217 write: WriteState::Open,
2218 done: false,
2219 origin,
2220 }
2221 }
2222}
2223
impl TableDebug for TransmitState {
    /// Name reported for this entry type by the table's debug machinery.
    fn type_name() -> &'static str {
        "TransmitState"
    }
}
2229
2230impl TransmitOrigin {
2231 fn guest(id: ComponentInstanceId, index: TransmitIndex) -> Self {
2232 match index {
2233 TransmitIndex::Future(ty) => TransmitOrigin::GuestFuture(id, ty),
2234 TransmitIndex::Stream(ty) => TransmitOrigin::GuestStream(id, ty),
2235 }
2236 }
2237}
2238
/// Boxed callback that, each time it is called, returns a fresh boxed future
/// resolving to the `StreamResult` of one produce or consume step.
type PollStream = Box<
    dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
>;
2242
/// Boxed callback that, given a `TypeId`, optionally yields a type-erased
/// value; used by `StreamReader::try_into` to recover a host producer.
type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
2244
/// State of the write end of a transmit.
enum WriteState {
    /// Initial state assigned by `TransmitState::new`; also restored when a
    /// pending guest write is taken out of the state.
    Open,
    /// A guest-owned writer with a recorded write request.
    // NOTE(review): field meanings below are inferred from names and usage in
    // this file — confirm against the code that constructs this variant.
    GuestReady {
        /// Component instance associated with the write.
        instance: Instance,
        /// Runtime index of the calling component instance.
        caller: RuntimeComponentInstanceIndex,
        /// Stream or future table type of the handle.
        ty: TransmitIndex,
        /// Optional flat ABI description for the payload.
        flat_abi: Option<FlatAbi>,
        /// Canonical options index used for the transfer.
        options: OptionsIndex,
        /// Guest address of the buffer.
        address: usize,
        /// Number of items in the request.
        count: ItemCount,
        /// Guest handle index, reported back in write events.
        handle: u32,
    },
    /// A host-owned writer backed by a producer.
    HostReady {
        /// Callback starting one produce step.
        produce: PollStream,
        /// Callback used to convert the producer back into a host value.
        try_into: TryInto,
        /// Items delivered to the guest so far in the current operation;
        /// reset to zero when the operation's result is reported.
        guest_offset: ItemCount,
        /// Cancellation flag passed to the producer's poll; cleared once the
        /// poll returns ready.
        cancel: bool,
        /// Waker stored while the producer's poll is pending.
        cancel_waker: Option<Waker>,
    },
    /// The write end has been dropped.
    Dropped,
}
2271
2272impl fmt::Debug for WriteState {
2273 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2274 match self {
2275 Self::Open => f.debug_tuple("Open").finish(),
2276 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2277 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2278 Self::Dropped => f.debug_tuple("Dropped").finish(),
2279 }
2280 }
2281}
2282
/// State of the read end of a transmit.
enum ReadState {
    /// Initial state assigned by `TransmitState::new`; also restored when a
    /// pending read is taken out of the state.
    Open,
    /// A guest-owned reader with a recorded read request.
    // NOTE(review): field meanings below are inferred from names and usage in
    // this file — confirm against the code that constructs this variant.
    GuestReady {
        /// Stream or future table type of the handle.
        ty: TransmitIndex,
        /// Runtime index of the calling component instance.
        caller_instance: RuntimeComponentInstanceIndex,
        /// Thread that issued the read; passed on when lowering items.
        caller_thread: QualifiedThreadId,
        /// Optional flat ABI description for the payload.
        flat_abi: Option<FlatAbi>,
        /// Component instance associated with the read.
        instance: Instance,
        /// Canonical options index used for the transfer.
        options: OptionsIndex,
        /// Guest address of the destination buffer.
        address: usize,
        /// Number of items in the request.
        count: ItemCount,
        /// Guest handle index, reported back in read events.
        handle: u32,
    },
    /// A host-owned reader backed by a consumer, fed by a guest writer.
    HostReady {
        /// Callback starting one consume step.
        consume: PollStream,
        /// Items taken from the guest so far in the current operation; reset
        /// to zero when the operation's result is reported.
        guest_offset: ItemCount,
        /// Cancellation flag passed to the consumer's poll; cleared once the
        /// poll returns ready.
        cancel: bool,
        /// Waker stored while the consumer's poll is pending.
        cancel_waker: Option<Waker>,
    },
    /// A host consumer attached to a host producer.
    HostToHost {
        /// Callback that feeds an untyped write buffer to the consumer.
        accept: Box<
            dyn for<'a> Fn(
                    &'a mut UntypedWriteBuffer<'a>,
                )
                    -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
                + Send
                + Sync,
        >,
        /// Byte buffer lent to the producer (wrapped in a `Cursor`) during a
        /// produce step.
        buffer: Vec<u8>,
        /// Cursor position recorded after the last produce step.
        limit: usize,
    },
    /// The read end has been dropped.
    Dropped,
}
2322
2323impl fmt::Debug for ReadState {
2324 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2325 match self {
2326 Self::Open => f.debug_tuple("Open").finish(),
2327 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2328 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2329 Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
2330 Self::Dropped => f.debug_tuple("Dropped").finish(),
2331 }
2332 }
2333}
2334
2335fn return_code(kind: TransmitKind, state: StreamResult, count: ItemCount) -> Result<ReturnCode> {
2336 Ok(match state {
2337 StreamResult::Dropped => ReturnCode::Dropped(count),
2338 StreamResult::Completed => ReturnCode::completed(kind, count),
2339 StreamResult::Cancelled => ReturnCode::Cancelled(count),
2340 })
2341}
2342
2343impl StoreOpaque {
    /// Queues a task that awaits `future` (one host consume step) and then
    /// reports its outcome to the guest writer of transmit `id`.
    ///
    /// When the task runs it expects the read state to be
    /// `ReadState::HostReady` and the write state to be
    /// `WriteState::GuestReady`; anything else is reported via `bail_bug!`.
    fn pipe_from_guest(
        &mut self,
        kind: TransmitKind,
        id: TableId<TransmitState>,
        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
    ) {
        let future = async move {
            let stream_state = future.await?;
            tls::get(|store| {
                let state = store.concurrent_state_mut();
                let transmit = state.get_mut(id)?;
                // Take the read state out so `consume` can be moved into the
                // replacement state below.
                let ReadState::HostReady {
                    consume,
                    guest_offset,
                    ..
                } = mem::replace(&mut transmit.read, ReadState::Open)
                else {
                    bail_bug!("expected ReadState::HostReady")
                };
                let code = return_code(kind, stream_state, guest_offset)?;
                // Either mark the reader dropped or re-arm it with a zeroed
                // offset and cleared cancellation state.
                transmit.read = match stream_state {
                    StreamResult::Dropped => ReadState::Dropped,
                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
                        consume,
                        guest_offset: ItemCount::ZERO,
                        cancel: false,
                        cancel_waker: None,
                    },
                };
                // The pending guest write is consumed; the writer goes back
                // to `Open` and is notified of the result.
                let WriteState::GuestReady { ty, handle, .. } =
                    mem::replace(&mut transmit.write, WriteState::Open)
                else {
                    bail_bug!("expected WriteState::GuestReady")
                };
                state.send_write_result(ty, id, handle, code)?;
                Ok(())
            })
        };

        self.concurrent_state_mut().push_future(future.boxed());
    }
2385
    /// Queues a task that awaits `future` (one host produce step) and then
    /// reports its outcome to the guest reader of transmit `id`.
    ///
    /// When the task runs it expects the write state to be
    /// `WriteState::HostReady` and the read state to be
    /// `ReadState::GuestReady`; anything else is reported via `bail_bug!`.
    fn pipe_to_guest(
        &mut self,
        kind: TransmitKind,
        id: TableId<TransmitState>,
        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
    ) {
        let future = async move {
            let stream_state = future.await?;
            tls::get(|store| {
                let state = store.concurrent_state_mut();
                let transmit = state.get_mut(id)?;
                // Take the write state out so `produce` and `try_into` can be
                // moved into the replacement state below.
                let WriteState::HostReady {
                    produce,
                    try_into,
                    guest_offset,
                    ..
                } = mem::replace(&mut transmit.write, WriteState::Open)
                else {
                    bail_bug!("expected WriteState::HostReady")
                };
                let code = return_code(kind, stream_state, guest_offset)?;
                // Either mark the writer dropped or re-arm it with a zeroed
                // offset and cleared cancellation state.
                transmit.write = match stream_state {
                    StreamResult::Dropped => WriteState::Dropped,
                    StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
                        produce,
                        try_into,
                        guest_offset: ItemCount::ZERO,
                        cancel: false,
                        cancel_waker: None,
                    },
                };
                // The pending guest read is consumed; the reader goes back to
                // `Open` and is notified of the result.
                let ReadState::GuestReady { ty, handle, .. } =
                    mem::replace(&mut transmit.read, ReadState::Open)
                else {
                    bail_bug!("expected ReadState::GuestReady")
                };
                state.send_read_result(ty, id, handle, code)?;
                Ok(())
            })
        };

        self.concurrent_state_mut().push_future(future.boxed());
    }
2429
    /// Marks the read end behind handle `id` as dropped and notifies or
    /// cleans up the write end accordingly.
    ///
    /// `kind` selects the event type (`FutureWrite` or `StreamWrite`) used
    /// when the writer had no pending operation.
    ///
    /// # Errors
    ///
    /// Fails if either table lookup fails, or if updating the writer's event
    /// or deleting the transmit fails.
    fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
        let state = self.concurrent_state_mut();
        let transmit_id = state.get_mut(id)?.state;
        let transmit = state
            .get_mut(transmit_id)
            .with_context(|| format!("error closing reader {transmit_id:?}"))?;
        log::trace!(
            "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
            transmit.read,
            transmit.write
        );

        transmit.read = ReadState::Dropped;

        // A writer that is already dropped stays dropped; any other write
        // state is reset to `Open`.
        let new_state = if let WriteState::Dropped = &transmit.write {
            WriteState::Dropped
        } else {
            WriteState::Open
        };

        let write_handle = transmit.write_handle;

        match mem::replace(&mut transmit.write, new_state) {
            // A guest write was pending: report it as dropped with zero items
            // transferred, carrying the pending handle.
            WriteState::GuestReady { ty, handle, .. } => {
                state.update_event(
                    write_handle.rep(),
                    match ty {
                        TransmitIndex::Future(ty) => Event::FutureWrite {
                            code: ReturnCode::Dropped(ItemCount::ZERO),
                            pending: Some((ty, handle)),
                        },
                        TransmitIndex::Stream(ty) => Event::StreamWrite {
                            code: ReturnCode::Dropped(ItemCount::ZERO),
                            pending: Some((ty, handle)),
                        },
                    },
                )?;
            }

            // Host writers need no notification here.
            WriteState::HostReady { .. } => {}

            // No write was pending: record a dropped event with no pending
            // handle on the write end.
            WriteState::Open => {
                state.update_event(
                    write_handle.rep(),
                    match kind {
                        TransmitKind::Future => Event::FutureWrite {
                            code: ReturnCode::Dropped(ItemCount::ZERO),
                            pending: None,
                        },
                        TransmitKind::Stream => Event::StreamWrite {
                            code: ReturnCode::Dropped(ItemCount::ZERO),
                            pending: None,
                        },
                    },
                )?;
            }

            // Both ends are now dropped, so the transmit can be deleted.
            WriteState::Dropped => {
                log::trace!("host_drop_reader delete {transmit_id:?}");
                state.delete_transmit(transmit_id)?;
            }
        }
        Ok(())
    }
2499
2500 fn host_drop_writer(
2502 &mut self,
2503 id: TableId<TransmitHandle>,
2504 on_drop_open: Option<fn() -> Result<()>>,
2505 ) -> Result<()> {
2506 let state = self.concurrent_state_mut();
2507 let transmit_id = state.get_mut(id)?.state;
2508 let transmit = state
2509 .get_mut(transmit_id)
2510 .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2511 log::trace!(
2512 "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2513 transmit.read,
2514 transmit.write
2515 );
2516
2517 match &mut transmit.write {
2519 WriteState::GuestReady { .. } => {
2520 bail_bug!("can't call `host_drop_writer` on a guest-owned writer");
2521 }
2522 WriteState::HostReady { .. } => {}
2523 v @ WriteState::Open => {
2524 if let (Some(on_drop_open), false) = (
2525 on_drop_open,
2526 transmit.done || matches!(transmit.read, ReadState::Dropped),
2527 ) {
2528 on_drop_open()?;
2529 } else {
2530 *v = WriteState::Dropped;
2531 }
2532 }
2533 WriteState::Dropped => bail_bug!("write state is already dropped"),
2534 }
2535
2536 let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
2537
2538 let new_state = if let ReadState::Dropped = &transmit.read {
2544 ReadState::Dropped
2545 } else {
2546 ReadState::Open
2547 };
2548
2549 let read_handle = transmit.read_handle;
2550
2551 match mem::replace(&mut transmit.read, new_state) {
2553 ReadState::GuestReady { ty, handle, .. } => {
2557 self.concurrent_state_mut().update_event(
2559 read_handle.rep(),
2560 match ty {
2561 TransmitIndex::Future(ty) => Event::FutureRead {
2562 code: ReturnCode::Dropped(ItemCount::ZERO),
2563 pending: Some((ty, handle)),
2564 },
2565 TransmitIndex::Stream(ty) => Event::StreamRead {
2566 code: ReturnCode::Dropped(ItemCount::ZERO),
2567 pending: Some((ty, handle)),
2568 },
2569 },
2570 )?;
2571 }
2572
2573 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2574
2575 ReadState::Open => {
2577 self.concurrent_state_mut().update_event(
2578 read_handle.rep(),
2579 match on_drop_open {
2580 Some(_) => Event::FutureRead {
2581 code: ReturnCode::Dropped(ItemCount::ZERO),
2582 pending: None,
2583 },
2584 None => Event::StreamRead {
2585 code: ReturnCode::Dropped(ItemCount::ZERO),
2586 pending: None,
2587 },
2588 },
2589 )?;
2590 }
2591
2592 ReadState::Dropped => {
2595 log::trace!("host_drop_writer delete {transmit_id:?}");
2596 self.concurrent_state_mut().delete_transmit(transmit_id)?;
2597 }
2598 }
2599 Ok(())
2600 }
2601
2602 pub(super) fn transmit_origin(
2603 &mut self,
2604 id: TableId<TransmitHandle>,
2605 ) -> Result<TransmitOrigin> {
2606 let state = self.concurrent_state_mut();
2607 let state_id = state.get_mut(id)?.state;
2608 Ok(state.get_mut(state_id)?.origin)
2609 }
2610}
2611
2612impl<T> StoreContextMut<'_, T> {
    /// Creates a host-origin transmit whose write end is driven by `producer`
    /// and returns the handle id of its read end.
    ///
    /// The write state is initialised to `WriteState::HostReady` with two
    /// callbacks sharing the producer and its buffer:
    /// * `produce` runs one `poll_produce` step, validates the result, and
    ///   forwards any buffered or host-to-host items via `write`;
    /// * `try_into` tries to convert the producer into a value of a requested
    ///   type, putting the producer back if the conversion is refused.
    fn new_transmit<P: StreamProducer<T>>(
        mut self,
        kind: TransmitKind,
        producer: P,
    ) -> Result<TableId<TransmitHandle>>
    where
        P::Item: func::Lower,
    {
        let token = StoreToken::new(self.as_context_mut());
        let state = self.0.concurrent_state_mut();
        let (_, read) = state.new_transmit(TransmitOrigin::Host)?;
        let producer = Arc::new(LockedState::new((Box::pin(producer), P::Buffer::default())));
        let id = state.get_mut(read)?.state;
        let mut dropped = false;
        let produce = Box::new({
            let producer = producer.clone();
            move || {
                let producer = producer.clone();
                async move {
                    let mut state = producer.take()?;
                    let (mine, buffer) = &mut *state;

                    // Only poll the producer when nothing is left over in the
                    // buffer from an earlier step.
                    let (result, cancelled) = if buffer.remaining().is_empty() {
                        future::poll_fn(|cx| {
                            tls::get(|store| {
                                let transmit = store.concurrent_state_mut().get_mut(id)?;

                                let &WriteState::HostReady { cancel, .. } = &transmit.write else {
                                    bail_bug!("expected WriteState::HostReady")
                                };

                                // For host-to-host transfers, lend the read
                                // side's byte buffer to the producer.
                                let mut host_buffer =
                                    if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
                                        Some(Cursor::new(mem::take(buffer)))
                                    } else {
                                        None
                                    };

                                let poll = mine.as_mut().poll_produce(
                                    cx,
                                    token.as_context_mut(store),
                                    Destination {
                                        id,
                                        buffer,
                                        host_buffer: host_buffer.as_mut(),
                                        _phantom: PhantomData,
                                    },
                                    cancel,
                                );

                                let transmit = store.concurrent_state_mut().get_mut(id)?;

                                // Hand the byte buffer back and record how far
                                // the producer wrote into it.
                                let host_offset = if let (
                                    Some(host_buffer),
                                    ReadState::HostToHost { buffer, limit, .. },
                                ) = (host_buffer, &mut transmit.read)
                                {
                                    *limit = usize::try_from(host_buffer.position())?;
                                    *buffer = host_buffer.into_inner();
                                    *limit
                                } else {
                                    0
                                };

                                // Scoped so the `cancel` binding below (a
                                // `&mut bool`) does not shadow the copied
                                // `cancel` used in the final `Ok`.
                                {
                                    let WriteState::HostReady {
                                        guest_offset,
                                        cancel,
                                        cancel_waker,
                                        ..
                                    } = &mut transmit.write
                                    else {
                                        bail_bug!("expected WriteState::HostReady")
                                    };

                                    if poll.is_pending() {
                                        if !buffer.remaining().is_empty()
                                            || *guest_offset > 0
                                            || host_offset > 0
                                        {
                                            bail!(
                                                "StreamProducer::poll_produce returned Poll::Pending \
                                                 after producing at least one item"
                                            )
                                        }
                                        *cancel_waker = Some(cx.waker().clone());
                                    } else {
                                        *cancel_waker = None;
                                        *cancel = false;
                                    }
                                }

                                Ok(poll.map(|v| v.map(|result| (result, cancel))))
                            })?
                        })
                        .await?
                    } else {
                        (StreamResult::Completed, false)
                    };

                    let (guest_offset, host_offset, count) = tls::get(|store| {
                        let transmit = store.concurrent_state_mut().get_mut(id)?;
                        let (count, host_offset) = match &transmit.read {
                            &ReadState::GuestReady { count, .. } => (count.as_u32(), 0),
                            &ReadState::HostToHost { limit, .. } => (1, limit),
                            _ => bail_bug!("invalid read state"),
                        };
                        let guest_offset = match &transmit.write {
                            &WriteState::HostReady { guest_offset, .. } => guest_offset,
                            _ => bail_bug!("invalid write state"),
                        };
                        Ok((guest_offset, host_offset, count))
                    })?;

                    // Validate the producer's result against what it did.
                    match result {
                        StreamResult::Completed => {
                            if count > 1
                                && buffer.remaining().is_empty()
                                && guest_offset == 0
                                && host_offset == 0
                            {
                                bail!(
                                    "StreamProducer::poll_produce returned StreamResult::Completed \
                                     without producing any items"
                                );
                            }
                        }
                        StreamResult::Cancelled => {
                            if !cancelled {
                                bail!(
                                    "StreamProducer::poll_produce returned StreamResult::Cancelled \
                                     without being given a `finish` parameter value of true"
                                );
                            }
                        }
                        StreamResult::Dropped => {
                            dropped = true;
                        }
                    }

                    let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;

                    // Release the producer state before `write`, which takes
                    // it again.
                    drop(state);

                    if write_buffer {
                        write(token, id, producer.clone(), kind).await?;
                    }

                    // A dropped producer is only reported as dropped once its
                    // buffer has been fully drained.
                    Ok(if dropped {
                        if producer.with(|p| p.1.remaining().is_empty())? {
                            StreamResult::Dropped
                        } else {
                            StreamResult::Completed
                        }
                    } else {
                        result
                    })
                }
                .boxed()
            }
        });
        let try_into = Box::new(move |ty| {
            let (mine, buffer) = producer.try_lock().ok()?.take()?;
            match P::try_into(mine, ty) {
                Ok(value) => Some(value),
                Err(mine) => {
                    // Conversion refused: put the producer and buffer back.
                    *producer.try_lock().ok()? = Some((mine, buffer));
                    None
                }
            }
        });
        state.get_mut(id)?.write = WriteState::HostReady {
            produce,
            try_into,
            guest_offset: ItemCount::ZERO,
            cancel: false,
            cancel_waker: None,
        };
        Ok(read)
    }
2793
    /// Attaches `consumer` to the read end of the transmit behind handle `id`.
    ///
    /// What happens depends on the current write state:
    /// * `Open`: the read state becomes `HostReady` and nothing runs yet;
    /// * `GuestReady`: a consume step is started immediately and piped back to
    ///   the guest via `pipe_from_guest`;
    /// * `HostReady`: the read state becomes `HostToHost` and a task is queued
    ///   that repeatedly calls the producer until either side is dropped (or
    ///   after one step for futures), then deletes the transmit;
    /// * `Dropped`: the reader is dropped via `host_drop_reader`.
    fn set_consumer<C: StreamConsumer<T>>(
        mut self,
        id: TableId<TransmitHandle>,
        kind: TransmitKind,
        consumer: C,
    ) -> Result<()> {
        let token = StoreToken::new(self.as_context_mut());
        let state = self.0.concurrent_state_mut();
        let id = state.get_mut(id)?.state;
        let transmit = state.get_mut(id)?;
        let consumer = Arc::new(LockedState::new(Box::pin(consumer)));
        // One consume step; `host_buffer` is `Some` only for host-to-host
        // transfers.
        let consume_with_buffer = {
            let consumer = consumer.clone();
            async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
                let mut mine = consumer.take()?;

                let host_buffer_remaining_before =
                    host_buffer.as_deref_mut().map(|v| v.remaining().len());

                let (result, cancelled) = future::poll_fn(|cx| {
                    tls::get(|store| {
                        let cancel = match &store.concurrent_state_mut().get_mut(id)?.read {
                            &ReadState::HostReady { cancel, .. } => cancel,
                            ReadState::Open => false,
                            _ => bail_bug!("unexpected read state"),
                        };

                        let poll = mine.as_mut().poll_consume(
                            cx,
                            token.as_context_mut(store),
                            Source {
                                id,
                                host_buffer: host_buffer.as_deref_mut(),
                            },
                            cancel,
                        );

                        // Store a waker while pending; clear the cancellation
                        // state once the poll is ready.
                        if let ReadState::HostReady {
                            cancel_waker,
                            cancel,
                            ..
                        } = &mut store.concurrent_state_mut().get_mut(id)?.read
                        {
                            if poll.is_pending() {
                                *cancel_waker = Some(cx.waker().clone());
                            } else {
                                *cancel_waker = None;
                                *cancel = false;
                            }
                        }

                        Ok(poll.map(|v| v.map(|result| (result, cancel))))
                    })?
                })
                .await?;

                let (guest_offset, count) = tls::get(|store| {
                    let transmit = store.concurrent_state_mut().get_mut(id)?;
                    Ok((
                        match &transmit.read {
                            &ReadState::HostReady { guest_offset, .. } => guest_offset,
                            ReadState::Open => ItemCount::ZERO,
                            _ => bail_bug!("invalid read state"),
                        },
                        match &transmit.write {
                            WriteState::GuestReady { count, .. } => count.as_usize(),
                            WriteState::HostReady { .. } => match host_buffer_remaining_before {
                                Some(n) => n,
                                None => bail_bug!("host_buffer_remaining_before should be set"),
                            },
                            _ => bail_bug!("invalid write state"),
                        },
                    ))
                })?;

                // Validate the consumer's result against what it did.
                match result {
                    StreamResult::Completed => {
                        if count > 0
                            && guest_offset == 0
                            && host_buffer_remaining_before
                                .zip(host_buffer.map(|v| v.remaining().len()))
                                .map(|(before, after)| before == after)
                                .unwrap_or(false)
                        {
                            bail!(
                                "StreamConsumer::poll_consume returned StreamResult::Completed \
                                 without consuming any items"
                            );
                        }

                        // A future is finished after one completed consume.
                        if let TransmitKind::Future = kind {
                            tls::get(|store| {
                                store.concurrent_state_mut().get_mut(id)?.done = true;
                                crate::error::Ok(())
                            })?;
                        }
                    }
                    StreamResult::Cancelled => {
                        if !cancelled {
                            bail!(
                                "StreamConsumer::poll_consume returned StreamResult::Cancelled \
                                 without being given a `finish` parameter value of true"
                            );
                        }
                    }
                    StreamResult::Dropped => {}
                }

                Ok(result)
            }
        };
        // Buffer-less variant used when the writer is a guest.
        let consume = {
            let consume = consume_with_buffer.clone();
            Box::new(move || {
                let consume = consume.clone();
                async move { consume(None).await }.boxed()
            })
        };

        match &transmit.write {
            WriteState::Open => {
                transmit.read = ReadState::HostReady {
                    consume,
                    guest_offset: ItemCount::ZERO,
                    cancel: false,
                    cancel_waker: None,
                };
            }
            &WriteState::GuestReady { .. } => {
                // Start the consume step before `consume` is moved into the
                // read state.
                let future = consume();
                transmit.read = ReadState::HostReady {
                    consume,
                    guest_offset: ItemCount::ZERO,
                    cancel: false,
                    cancel_waker: None,
                };
                self.0.pipe_from_guest(kind, id, future);
            }
            WriteState::HostReady { .. } => {
                // Take ownership of `produce`, leaving stub callbacks in the
                // write state.
                let WriteState::HostReady { produce, .. } = mem::replace(
                    &mut transmit.write,
                    WriteState::HostReady {
                        produce: Box::new(|| {
                            Box::pin(async { bail_bug!("unexpected invocation of `produce`") })
                        }),
                        try_into: Box::new(|_| None),
                        guest_offset: ItemCount::ZERO,
                        cancel: false,
                        cancel_waker: None,
                    },
                ) else {
                    bail_bug!("expected WriteState::HostReady")
                };

                transmit.read = ReadState::HostToHost {
                    accept: Box::new(move |input| {
                        let consume = consume_with_buffer.clone();
                        async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
                    }),
                    buffer: Vec::new(),
                    limit: 0,
                };

                let future = async move {
                    loop {
                        // Stop once the reader has been dropped.
                        if tls::get(|store| {
                            crate::error::Ok(matches!(
                                store.concurrent_state_mut().get_mut(id)?.read,
                                ReadState::Dropped
                            ))
                        })? {
                            break Ok(());
                        }

                        match produce().await? {
                            StreamResult::Completed | StreamResult::Cancelled => {}
                            StreamResult::Dropped => break Ok(()),
                        }

                        // Futures run a single produce step.
                        if let TransmitKind::Future = kind {
                            break Ok(());
                        }
                    }
                }
                // Delete the transmit whether the loop succeeded or failed; a
                // deletion error takes precedence over the loop's result.
                .map(move |result| {
                    tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
                    result
                });

                state.push_future(Box::pin(future));
            }
            WriteState::Dropped => {
                let reader = transmit.read_handle;
                self.0.host_drop_reader(reader, kind)?;
            }
        }
        Ok(())
    }
2992}
2993
/// Delivers items buffered by a host writer to the reader currently recorded
/// on the transmit `id`.
///
/// The transmit's `ReadState` is swapped out (leaving `ReadState::Open`
/// behind) and handled according to its variant:
///
/// * `ReadState::GuestReady` -- items from the buffer held in `pair` are
///   passed to `lower`, targeting the guest buffer just past the current
///   `WriteState::HostReady::guest_offset`.  Afterwards the offset is advanced
///   by the number of items consumed from the buffer and a
///   `ReadState::GuestReady` is stored back on the transmit.
/// * `ReadState::HostToHost` -- the reader-held `buffer` (up to `limit`) and
///   then the buffer held in `pair` are fed to the reader's `accept` callback
///   until they are drained or the callback reports `StreamResult::Dropped`.
///
/// Any other read state is reported through `bail_bug!`.
///
/// When `kind` is `TransmitKind::Future` and a guest reader is present, the
/// transmit is marked `done` before anything is lowered.
async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
    token: StoreToken<D>,
    id: TableId<TransmitState>,
    pair: Arc<LockedState<(P, B)>>,
    kind: TransmitKind,
) -> Result<()> {
    // Take the read state out of the transmit and, if the write end is in
    // `HostReady`, capture how far into the guest buffer we have already
    // written.
    let (read, guest_offset) = tls::get(|store| {
        let transmit = store.concurrent_state_mut().get_mut(id)?;

        let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
            Some(guest_offset)
        } else {
            None
        };

        crate::error::Ok((
            mem::replace(&mut transmit.read, ReadState::Open),
            guest_offset,
        ))
    })?;

    match read {
        ReadState::GuestReady {
            ty,
            flat_abi,
            options,
            address,
            count,
            handle,
            instance,
            caller_instance,
            caller_thread,
        } => {
            // A guest reader is only expected while the write end is
            // `HostReady`, in which case `guest_offset` was captured above.
            let guest_offset = match guest_offset {
                Some(i) => i,
                None => bail_bug!("guest_offset should be present if ready"),
            };

            if let TransmitKind::Future = kind {
                tls::get(|store| {
                    store.concurrent_state_mut().get_mut(id)?.done = true;
                    crate::error::Ok(())
                })?;
            }

            // Remember how many items are buffered so that the number actually
            // consumed by `lower` can be computed afterwards.
            let old_remaining = pair.with(|p| p.1.remaining().len())?;
            // `accept` calls `lower` with the guest address advanced past the
            // first `guest_offset` items and a capacity of
            // `count - guest_offset` items.
            let accept = {
                let pair = pair.clone();
                move |mut store: StoreContextMut<D>| {
                    let mut state = pair.take()?;
                    lower::<T, B, D>(
                        store.as_context_mut(),
                        instance,
                        caller_thread,
                        options,
                        ty,
                        address + (T::SIZE32 * guest_offset.as_usize()),
                        count.as_usize() - guest_offset.as_usize(),
                        &mut state.1,
                    )?;
                    crate::error::Ok(())
                }
            };

            // Only lower when the guest buffer still has room.
            if guest_offset < count {
                if T::MAY_REQUIRE_REALLOC {
                    // Run `accept` as a high-priority worker item and wait for
                    // its result over a oneshot channel.
                    // NOTE(review): presumably this is because lowering may
                    // need to call the guest's realloc -- confirm.
                    let (tx, rx) = oneshot::channel();
                    tls::get(move |store| {
                        store
                            .concurrent_state_mut()
                            .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
                                move |store| {
                                    _ = tx.send(accept(token.as_context_mut(store))?);
                                    Ok(())
                                },
                            ))))
                    });
                    rx.await?
                } else {
                    // Otherwise lower directly on the current store.
                    tls::get(|store| accept(token.as_context_mut(store)))?
                };
            }

            tls::get(|store| {
                // Number of items consumed from the host buffer by this call.
                // Note that this shadows the guest's original `count`.
                let count = old_remaining - pair.with(|p| p.1.remaining().len())?;

                let transmit = store.concurrent_state_mut().get_mut(id)?;

                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
                    bail_bug!("expected WriteState::HostReady")
                };

                guest_offset.inc(count)?;

                // Put the guest reader back.
                // NOTE(review): the `count` stored here is the shadowed
                // "items consumed by this call" value rather than the guest's
                // original buffer capacity -- confirm this is intended.
                transmit.read = ReadState::GuestReady {
                    ty,
                    flat_abi,
                    options,
                    address,
                    count: ItemCount::new_usize(count)?,
                    handle,
                    instance,
                    caller_instance,
                    caller_thread,
                };

                crate::error::Ok(())
            })?;

            Ok(())
        }

        ReadState::HostToHost {
            accept,
            mut buffer,
            limit,
        } => {
            let mut state = StreamResult::Completed;
            let mut position = 0;

            // First offer the items already held on the read side, from
            // `position` up to `limit`, until they are consumed or the reader
            // reports that it has been dropped.
            while !matches!(state, StreamResult::Dropped) && position < limit {
                let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
                state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
                (buffer, position, _) = slice_buffer.into_parts();
            }

            // Then offer the writer's own buffer until it is empty or the
            // reader reports that it has been dropped.
            {
                let mut pair = pair.take()?;
                let (_, buffer) = &mut *pair;

                while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
                    state = accept(&mut UntypedWriteBuffer::new(buffer)).await?;
                }
            }

            // Record the reader's final state: dropped, or still host-to-host
            // with nothing left to replay from `buffer` (`limit: 0`).
            tls::get(|store| {
                store.concurrent_state_mut().get_mut(id)?.read = match state {
                    StreamResult::Dropped => ReadState::Dropped,
                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
                        accept,
                        buffer,
                        limit: 0,
                    },
                };

                crate::error::Ok(())
            })?;
            Ok(())
        }

        _ => bail_bug!("unexpected read state"),
    }
}
3155
3156impl Instance {
3157 fn consume(
3160 self,
3161 store: &mut dyn VMStore,
3162 kind: TransmitKind,
3163 transmit_id: TableId<TransmitState>,
3164 consume: PollStream,
3165 guest_offset: ItemCount,
3166 cancel: bool,
3167 ) -> Result<ReturnCode> {
3168 let mut future = consume();
3169 store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
3170 consume,
3171 guest_offset,
3172 cancel,
3173 cancel_waker: None,
3174 };
3175 let poll = tls::set(store, || {
3176 future
3177 .as_mut()
3178 .poll(&mut Context::from_waker(&Waker::noop()))
3179 });
3180
3181 Ok(match poll {
3182 Poll::Ready(state) => {
3183 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3184 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
3185 bail_bug!("expected ReadState::HostReady")
3186 };
3187 let code = return_code(kind, state?, mem::replace(guest_offset, ItemCount::ZERO))?;
3188 transmit.write = WriteState::Open;
3189 code
3190 }
3191 Poll::Pending => {
3192 store.pipe_from_guest(kind, transmit_id, future);
3193 ReturnCode::Blocked
3194 }
3195 })
3196 }
3197
3198 fn produce(
3201 self,
3202 store: &mut dyn VMStore,
3203 kind: TransmitKind,
3204 transmit_id: TableId<TransmitState>,
3205 produce: PollStream,
3206 try_into: TryInto,
3207 guest_offset: ItemCount,
3208 cancel: bool,
3209 ) -> Result<ReturnCode> {
3210 let mut future = produce();
3211 store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
3212 produce,
3213 try_into,
3214 guest_offset,
3215 cancel,
3216 cancel_waker: None,
3217 };
3218 let poll = tls::set(store, || {
3219 future
3220 .as_mut()
3221 .poll(&mut Context::from_waker(&Waker::noop()))
3222 });
3223
3224 Ok(match poll {
3225 Poll::Ready(state) => {
3226 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3227 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3228 bail_bug!("expected WriteState::HostReady")
3229 };
3230 let code = return_code(kind, state?, mem::replace(guest_offset, ItemCount::ZERO))?;
3231 transmit.read = ReadState::Open;
3232 code
3233 }
3234 Poll::Pending => {
3235 store.pipe_to_guest(kind, transmit_id, future);
3236 ReturnCode::Blocked
3237 }
3238 })
3239 }
3240
3241 pub(super) fn guest_drop_writable(
3243 self,
3244 store: &mut StoreOpaque,
3245 ty: TransmitIndex,
3246 writer: u32,
3247 ) -> Result<()> {
3248 let table = self.id().get_mut(store).table_for_transmit(ty);
3249 let transmit_rep = match ty {
3250 TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
3251 TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
3252 };
3253
3254 let id = TableId::<TransmitHandle>::new(transmit_rep);
3255 log::trace!("guest_drop_writable: drop writer {id:?}");
3256 match ty {
3257 TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
3258 TransmitIndex::Future(_) => store.host_drop_writer(
3259 id,
3260 Some(|| {
3261 Err(format_err!(
3262 "cannot drop future write end without first writing a value"
3263 ))
3264 }),
3265 ),
3266 }
3267 }
3268
    /// Copies `count` items from a guest writer's buffer to a guest reader's
    /// buffer.
    ///
    /// Both buffers are validated first: when the byte length is non-zero the
    /// address must be aligned to the payload's `align32` and the whole range
    /// must lie inside the linear memory selected by the respective options.
    /// A copy where writer and reader are the same component instance is
    /// rejected unless `allow_intra_component_read_write` accepts the payload
    /// type.
    ///
    /// * Future to future: exactly one item is expected; it is loaded as a
    ///   `Val` from the writer and stored into the reader.
    /// * Stream to stream: when `flat_abi` is `Some` the bytes are copied
    ///   directly between the memories, otherwise every item is loaded as a
    ///   `Val` and stored one by one.
    ///
    /// `rep` is only used for trace logging.
    ///
    /// # Errors
    ///
    /// Returns an error for misaligned or out-of-bounds pointers, for a
    /// disallowed intra-component copy, for failures while lifting or
    /// lowering values, and (as an internal bug) for mismatched transmit
    /// kinds or a future copy whose `count` is not 1.
    fn copy<T: 'static>(
        self,
        store: StoreContextMut<T>,
        flat_abi: Option<FlatAbi>,
        write_caller_instance: RuntimeComponentInstanceIndex,
        write_ty: TransmitIndex,
        write_options: OptionsIndex,
        write_address: usize,
        read_caller_instance: RuntimeComponentInstanceIndex,
        read_caller_thread: QualifiedThreadId,
        read_ty: TransmitIndex,
        read_options: OptionsIndex,
        read_address: usize,
        count: ItemCount,
        rep: u32,
    ) -> Result<()> {
        let (component, mut store) = self.component_and_store_mut(store.0);
        let types = component.types();
        let count = count.as_usize();

        // Validate the writer's buffer.  A transmit without a payload type is
        // treated as having the zero-sized ABI.
        let write_payload_ty = write_ty.payload(types);
        let write_abi = match write_payload_ty {
            Some(ty) => types.canonical_abi(ty),
            None => &CanonicalAbiInfo::ZERO,
        };
        let write_length_in_bytes = match flat_abi {
            Some(abi) => usize::try_from(abi.size)? * count,
            None => usize::try_from(write_abi.size32)? * count,
        };
        if write_length_in_bytes > 0 {
            if write_address % usize::try_from(write_abi.align32)? != 0 {
                bail!("write pointer not aligned");
            }
            self.options_memory(store, write_options)
                .get(write_address..)
                .and_then(|b| b.get(..write_length_in_bytes))
                .ok_or_else(|| crate::format_err!("write pointer out of bounds"))?;
        }

        // Validate the reader's buffer in the same way.
        let read_payload_ty = read_ty.payload(types);
        let read_abi = match read_payload_ty {
            Some(ty) => types.canonical_abi(ty),
            None => &CanonicalAbiInfo::ZERO,
        };
        let read_length_in_bytes = match flat_abi {
            Some(abi) => usize::try_from(abi.size)? * count,
            None => usize::try_from(read_abi.size32)? * count,
        };
        if read_length_in_bytes > 0 {
            if read_address % usize::try_from(read_abi.align32)? != 0 {
                bail!("read pointer not aligned");
            }
            self.options_memory(store, read_options)
                .get(read_address..)
                .and_then(|b| b.get(..read_length_in_bytes))
                .ok_or_else(|| crate::format_err!("read pointer out of bounds"))?;
        }

        if write_caller_instance == read_caller_instance
            && !allow_intra_component_read_write(write_payload_ty)
        {
            bail!(
                "cannot read from and write to intra-component future/stream with non-numeric payload"
            )
        }

        match (write_ty, read_ty) {
            (TransmitIndex::Future(_), TransmitIndex::Future(_)) => {
                if count != 1 {
                    bail_bug!("futures can only send 1 item");
                }

                // Lift the single value out of the writer's memory, if the
                // future has a payload at all.
                let val = write_payload_ty
                    .map(|ty| {
                        let lift = &mut LiftContext::new(store, write_options, self);
                        let bytes = &lift.memory()[write_address..][..write_length_in_bytes];
                        Val::load(lift, *ty, bytes)
                    })
                    .transpose()?;

                if let Some(val) = val {
                    // Lower on behalf of the reader's thread and switch back
                    // afterwards.  Note that an error from lowering returns
                    // early without restoring the previous thread.
                    let old_thread = store.set_thread(read_caller_thread)?;
                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
                    let ptr = func::validate_inbounds_dynamic(
                        read_abi,
                        lower.as_slice_mut(),
                        &ValRaw::u32(read_address.try_into()?),
                    )?;
                    let ty = match read_payload_ty {
                        Some(ty) => ty,
                        None => bail_bug!("expected read payload type to be present"),
                    };
                    val.store(lower, *ty, ptr)?;
                    store.set_thread(old_thread)?;
                }
            }
            (TransmitIndex::Stream(_), TransmitIndex::Stream(_)) => {
                // Nothing to copy (zero items or zero-sized payload).
                if write_length_in_bytes == 0 {
                    return Ok(());
                }
                let write_payload_ty = match write_payload_ty {
                    Some(ty) => ty,
                    None => bail_bug!("expected write payload type to be present"),
                };
                let read_payload_ty = match read_payload_ty {
                    Some(ty) => ty,
                    None => bail_bug!("expected read payload type to be present"),
                };
                if flat_abi.is_some() {
                    // Bytewise copy between the two buffers.
                    let store_opaque = store.store_opaque_mut();

                    assert_eq!(read_length_in_bytes, write_length_in_bytes);

                    if self.options_memory(store_opaque, read_options).as_ptr()
                        == self.options_memory(store_opaque, write_options).as_ptr()
                    {
                        // Same memory: the ranges may overlap, so use
                        // `copy_within`.
                        let memory = self.options_memory_mut(store_opaque, read_options);
                        memory.copy_within(
                            write_address..write_address + write_length_in_bytes,
                            read_address,
                        );
                    } else {
                        let src = self.options_memory(store_opaque, write_options)[write_address..]
                            [..write_length_in_bytes]
                            .as_ptr();
                        let dst = self.options_memory_mut(store_opaque, read_options)
                            [read_address..][..read_length_in_bytes]
                            .as_mut_ptr();

                        // SAFETY: `src` and `dst` each point at the start of a
                        // bounds-checked slice of `write_length_in_bytes` /
                        // `read_length_in_bytes` bytes, and those lengths were
                        // asserted equal above.  This branch is only taken
                        // when the two memories have different base pointers.
                        // NOTE(review): non-overlap relies on distinct base
                        // pointers implying disjoint memories -- confirm.
                        unsafe {
                            src.copy_to_nonoverlapping(dst, write_length_in_bytes);
                        }
                    }
                } else {
                    // Item-by-item copy through `Val`.
                    let store_opaque = store.store_opaque_mut();
                    let lift = &mut LiftContext::new(store_opaque, write_options, self);
                    let bytes = &lift.memory()[write_address..][..write_length_in_bytes];
                    // NOTE(review): presumably charges fuel for the
                    // intermediate `Vec<Val>` built below -- confirm.
                    lift.consume_fuel_array(count, size_of::<Val>())?;

                    let values = (0..count)
                        .map(|index| {
                            let size = usize::try_from(write_abi.size32)?;
                            Val::load(lift, *write_payload_ty, &bytes[(index * size)..][..size])
                        })
                        .collect::<Result<Vec<_>>>()?;

                    let id = TableId::<TransmitHandle>::new(rep);
                    log::trace!("copy values {values:?} for {id:?}");

                    // Lower on behalf of the reader's thread and switch back
                    // afterwards.  Note that an error from lowering returns
                    // early without restoring the previous thread.
                    let old_thread = store.set_thread(read_caller_thread)?;
                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
                    let mut ptr = read_address;
                    for value in values {
                        value.store(lower, *read_payload_ty, ptr)?;
                        ptr += usize::try_from(read_abi.size32)?;
                    }
                    store.set_thread(old_thread)?;
                }
            }
            _ => bail_bug!("mismatched transmit types in copy"),
        }

        Ok(())
    }
3453
3454 fn check_bounds(
3455 self,
3456 store: &StoreOpaque,
3457 options: OptionsIndex,
3458 ty: TransmitIndex,
3459 address: usize,
3460 count: usize,
3461 ) -> Result<()> {
3462 let types = self.id().get(store).component().types();
3463 let size = usize::try_from(
3464 match ty {
3465 TransmitIndex::Future(ty) => types[types[ty].ty]
3466 .payload
3467 .map(|ty| types.canonical_abi(&ty).size32),
3468 TransmitIndex::Stream(ty) => types[types[ty].ty]
3469 .payload
3470 .map(|ty| types.canonical_abi(&ty).size32),
3471 }
3472 .unwrap_or(0),
3473 )?;
3474
3475 if count > 0 && size > 0 {
3476 self.options_memory(store, options)
3477 .get(address..)
3478 .and_then(|b| b.get(..(size * count)))
3479 .map(drop)
3480 .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))
3481 } else {
3482 Ok(())
3483 }
3484 }
3485
    /// Performs a guest-initiated write of `count` items at `address` to the
    /// writable end `handle` of a stream or future.
    ///
    /// The steps are:
    ///
    /// 1. For callers without the `async_` option, `check_blocking` must
    ///    succeed.
    /// 2. The guest buffer is bounds-checked.
    /// 3. The handle must currently be `TransmitLocalState::Write` (otherwise
    ///    `Trap::ConcurrentFutureStreamOp` is raised) and not `done`; it is
    ///    marked `Busy` for the duration of the operation.
    /// 4. The transmit's read state is taken and handled: a waiting guest
    ///    reader gets the items copied directly, a host reader is driven via
    ///    `consume`, an open reader leaves the write parked as
    ///    `WriteState::GuestReady`, and a dropped reader yields
    ///    `ReturnCode::Dropped`.
    /// 5. If the result is `Blocked` and the caller is not async, the call
    ///    waits via `wait_for_write`.
    /// 6. Unless the result is still `Blocked`, the handle goes back to
    ///    `TransmitLocalState::Write`, with `done` set only for a stream whose
    ///    result is `Dropped`.
    pub(super) fn guest_write<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        caller: RuntimeComponentInstanceIndex,
        ty: TransmitIndex,
        options: OptionsIndex,
        flat_abi: Option<FlatAbi>,
        handle: u32,
        address: u32,
        count: u32,
    ) -> Result<ReturnCode> {
        let count = ItemCount::new(count)?;

        // Synchronous callers may have to wait below, so make sure that is
        // permitted up front.
        if !self.options(store.0, options).async_ {
            store.0.check_blocking()?;
        }

        let address = usize::try_from(address)?;
        self.check_bounds(store.0, options, ty, address, count.as_usize())?;
        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
        let TransmitLocalState::Write { done } = *state else {
            bail!(Trap::ConcurrentFutureStreamOp);
        };

        if done {
            bail!("cannot write to stream after being notified that the readable end dropped");
        }

        *state = TransmitLocalState::Busy;
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let concurrent_state = store.0.concurrent_state_mut();
        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
        let transmit = concurrent_state.get_mut(transmit_id)?;
        log::trace!(
            "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
            transmit.read
        );

        if transmit.done {
            bail!("cannot write to future after previous write succeeded or readable end dropped");
        }

        // State to leave in `transmit.read` while the taken state is being
        // processed: a dropped reader stays dropped, anything else becomes
        // `Open`.
        let new_state = if let ReadState::Dropped = &transmit.read {
            ReadState::Dropped
        } else {
            ReadState::Open
        };

        // Parks this write on the transmit as `WriteState::GuestReady`; the
        // write end is expected to be `Open` at that point.
        let set_guest_ready = |me: &mut ConcurrentState| {
            let transmit = me.get_mut(transmit_id)?;
            if !matches!(&transmit.write, WriteState::Open) {
                bail_bug!("expected `WriteState::Open`; got `{:?}`", transmit.write);
            }
            transmit.write = WriteState::GuestReady {
                instance: self,
                caller,
                ty,
                flat_abi,
                options,
                address,
                count,
                handle,
            };
            Ok::<_, crate::Error>(())
        };

        let mut result = match mem::replace(&mut transmit.read, new_state) {
            // A guest reader is already waiting: copy directly between the
            // two guest buffers.
            ReadState::GuestReady {
                ty: read_ty,
                flat_abi: read_flat_abi,
                options: read_options,
                address: read_address,
                count: read_count,
                handle: read_handle,
                instance: read_instance,
                caller_instance: read_caller_instance,
                caller_thread: read_caller_thread,
            } => {
                if flat_abi != read_flat_abi {
                    bail_bug!("expected flat ABI calculations to be the same");
                }

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                // A zero-length write completes immediately; a non-empty write
                // completes only if the reader offered a non-empty buffer.
                let write_complete = count == 0 || read_count > 0;
                // The read completes only if this write offers at least one
                // item.
                let read_complete = count > 0;
                // Whether the reader's buffer still has room after this copy.
                let read_buffer_remaining = count < read_count;

                let read_handle_rep = transmit.read_handle.rep();

                // From here on `count` is the number of items transferred.
                let count = count.min(read_count);

                self.copy(
                    store.as_context_mut(),
                    flat_abi,
                    caller,
                    ty,
                    options,
                    address,
                    read_caller_instance,
                    read_caller_thread,
                    read_ty,
                    read_options,
                    read_address,
                    count,
                    rep,
                )?;

                let instance = self.id().get_mut(store.0);
                let types = instance.component().types();
                let item_size = match ty.payload(types) {
                    Some(ty) => usize::try_from(types.canonical_abi(ty).size32)?,
                    None => 0,
                };
                let concurrent_state = store.0.concurrent_state_mut();
                if read_complete {
                    // If a `StreamRead` completion is already queued for the
                    // reader, take it and fold its item count into the total
                    // reported by the new result.
                    let total = if let Some(Event::StreamRead {
                        code: ReturnCode::Completed(old_total),
                        ..
                    }) = concurrent_state.take_event(read_handle_rep)?
                    {
                        count.add(old_total)?
                    } else {
                        count
                    };

                    let code = ReturnCode::completed(ty.kind(), total);

                    concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
                }

                // Keep the guest reader registered when its buffer still has
                // room, or when both sides were zero-length (in which case the
                // read did not complete above).  The buffer start and count
                // are advanced past the items just transferred.
                if read_buffer_remaining || (count == 0 && read_count == 0) {
                    let transmit = concurrent_state.get_mut(transmit_id)?;
                    transmit.read = ReadState::GuestReady {
                        ty: read_ty,
                        flat_abi: read_flat_abi,
                        options: read_options,
                        address: read_address + (count.as_usize() * item_size),
                        count: read_count.sub(count)?,
                        handle: read_handle,
                        instance: read_instance,
                        caller_instance: read_caller_instance,
                        caller_thread: read_caller_thread,
                    };
                }

                if write_complete {
                    ReturnCode::completed(ty.kind(), count)
                } else {
                    set_guest_ready(concurrent_state)?;
                    ReturnCode::Blocked
                }
            }

            // A host consumer is registered: park the write and let the
            // consumer pull from it.
            ReadState::HostReady {
                consume,
                guest_offset,
                cancel,
                cancel_waker,
            } => {
                if cancel_waker.is_some() {
                    bail_bug!("expected cancel_waker to be none");
                }
                if cancel {
                    bail_bug!("expected cancel to be false");
                }
                if guest_offset != 0 {
                    bail_bug!("expected guest_offset to be 0");
                }

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                set_guest_ready(concurrent_state)?;
                self.consume(
                    store.0,
                    ty.kind(),
                    transmit_id,
                    consume,
                    ItemCount::ZERO,
                    false,
                )?
            }

            ReadState::HostToHost { .. } => bail_bug!("unexpected HostToHost"),

            // Nobody is reading yet: park the write until a reader shows up.
            ReadState::Open => {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }

            // The reader is gone: nothing can be written.
            ReadState::Dropped => {
                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                ReturnCode::Dropped(ItemCount::ZERO)
            }
        };

        // Synchronous callers wait here for the write to finish.
        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
            result = self.wait_for_write(store.0, transmit_handle)?;
        }

        // The operation finished: release the handle from `Busy`.  Only a
        // stream whose reader dropped is marked `done`.
        if result != ReturnCode::Blocked {
            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
                TransmitLocalState::Write {
                    done: matches!(
                        (result, ty),
                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
                    ),
                };
        }

        log::trace!(
            "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
        );

        Ok(result)
    }
3740
    /// Performs a guest-initiated read of up to `count` items into `address`
    /// from the readable end `handle` of a stream or future.
    ///
    /// The steps are:
    ///
    /// 1. For callers without the `async_` option, `check_blocking` must
    ///    succeed.
    /// 2. The guest buffer is bounds-checked.
    /// 3. The handle must currently be `TransmitLocalState::Read` (otherwise
    ///    `Trap::ConcurrentFutureStreamOp` is raised) and not `done`; it is
    ///    marked `Busy` for the duration of the operation.
    /// 4. The transmit's write state is taken and handled: a waiting guest
    ///    writer has its items copied directly, a host writer is driven via
    ///    `produce`, an open writer leaves the read parked as
    ///    `ReadState::GuestReady`, and a dropped writer yields
    ///    `ReturnCode::Dropped`.
    /// 5. If the result is `Blocked` and the caller is not async, the call
    ///    waits via `wait_for_read`.
    /// 6. Unless the result is still `Blocked`, the handle goes back to
    ///    `TransmitLocalState::Read`, with `done` set only for a stream whose
    ///    result is `Dropped`.
    pub(super) fn guest_read<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        caller_instance: RuntimeComponentInstanceIndex,
        ty: TransmitIndex,
        options: OptionsIndex,
        flat_abi: Option<FlatAbi>,
        handle: u32,
        address: u32,
        count: u32,
    ) -> Result<ReturnCode> {
        let count = ItemCount::new(count)?;

        // Synchronous callers may have to wait below, so make sure that is
        // permitted up front.
        if !self.options(store.0, options).async_ {
            store.0.check_blocking()?;
        }

        let address = usize::try_from(address)?;
        self.check_bounds(store.0, options, ty, address, count.as_usize())?;
        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
        let TransmitLocalState::Read { done } = *state else {
            bail!(Trap::ConcurrentFutureStreamOp);
        };

        if done {
            bail!("cannot read from stream after being notified that the writable end dropped");
        }

        *state = TransmitLocalState::Busy;
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let concurrent_state = store.0.concurrent_state_mut();
        // Remember which guest thread issued the read; it is recorded in
        // `ReadState::GuestReady` and passed to `copy`.
        let caller_thread = concurrent_state.current_guest_thread()?;
        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
        let transmit = concurrent_state.get_mut(transmit_id)?;
        log::trace!(
            "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
            transmit.write
        );

        if transmit.done {
            bail!("cannot read from future after previous read succeeded");
        }

        // State to leave in `transmit.write` while the taken state is being
        // processed: a dropped writer stays dropped, anything else becomes
        // `Open`.
        let new_state = if let WriteState::Dropped = &transmit.write {
            WriteState::Dropped
        } else {
            WriteState::Open
        };

        // Parks this read on the transmit as `ReadState::GuestReady`; the
        // read end is expected to be `Open` at that point.
        let set_guest_ready = |me: &mut ConcurrentState| {
            let transmit = me.get_mut(transmit_id)?;
            if !matches!(&transmit.read, ReadState::Open) {
                bail_bug!("expected `ReadState::Open`; got `{:?}`", transmit.read);
            }
            transmit.read = ReadState::GuestReady {
                ty,
                flat_abi,
                options,
                address,
                count,
                handle,
                instance: self,
                caller_instance,
                caller_thread,
            };
            Ok::<_, crate::Error>(())
        };

        let mut result = match mem::replace(&mut transmit.write, new_state) {
            // A guest writer is already waiting: copy directly between the
            // two guest buffers.
            WriteState::GuestReady {
                instance: _,
                ty: write_ty,
                flat_abi: write_flat_abi,
                options: write_options,
                address: write_address,
                count: write_count,
                handle: write_handle,
                caller: write_caller,
            } => {
                if flat_abi != write_flat_abi {
                    bail_bug!("expected flat ABI calculations to be the same");
                }

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                let write_handle_rep = transmit.write_handle.rep();

                // A zero-length write completes immediately; a non-empty write
                // completes only if this read offers a non-empty buffer.
                let write_complete = write_count == 0 || count > 0;
                // The read completes only if the writer offered at least one
                // item.
                let read_complete = write_count > 0;
                // Whether the writer still has items left after this copy.
                let write_buffer_remaining = count < write_count;

                // From here on `count` is the number of items transferred.
                let count = count.min(write_count);

                self.copy(
                    store.as_context_mut(),
                    flat_abi,
                    write_caller,
                    write_ty,
                    write_options,
                    write_address,
                    caller_instance,
                    caller_thread,
                    ty,
                    options,
                    address,
                    count,
                    rep,
                )?;

                let instance = self.id().get_mut(store.0);
                let types = instance.component().types();
                let item_size = match ty.payload(types) {
                    Some(ty) => usize::try_from(types.canonical_abi(ty).size32)?,
                    None => 0,
                };
                let concurrent_state = store.0.concurrent_state_mut();

                if write_complete {
                    // If a `StreamWrite` completion is already queued for the
                    // writer, take it and fold its item count into the total
                    // reported by the new result.
                    let total = if let Some(Event::StreamWrite {
                        code: ReturnCode::Completed(old_total),
                        ..
                    }) = concurrent_state.take_event(write_handle_rep)?
                    {
                        count.add(old_total)?
                    } else {
                        count
                    };

                    let code = ReturnCode::completed(ty.kind(), total);

                    concurrent_state.send_write_result(
                        write_ty,
                        transmit_id,
                        write_handle,
                        code,
                    )?;
                }

                // Keep the guest writer registered for its remaining items,
                // with the buffer start and count advanced past the items
                // just transferred.
                if write_buffer_remaining {
                    let transmit = concurrent_state.get_mut(transmit_id)?;
                    transmit.write = WriteState::GuestReady {
                        instance: self,
                        caller: write_caller,
                        ty: write_ty,
                        flat_abi: write_flat_abi,
                        options: write_options,
                        address: write_address + (count.as_usize() * item_size),
                        count: write_count.sub(count)?,
                        handle: write_handle,
                    };
                }

                if read_complete {
                    ReturnCode::completed(ty.kind(), count)
                } else {
                    set_guest_ready(concurrent_state)?;
                    ReturnCode::Blocked
                }
            }

            // A host producer is registered: park the read and let the
            // producer fill it.
            WriteState::HostReady {
                produce,
                try_into,
                guest_offset,
                cancel,
                cancel_waker,
            } => {
                if cancel_waker.is_some() {
                    bail_bug!("expected cancel_waker to be none");
                }
                if cancel {
                    bail_bug!("expected cancel to be false");
                }
                if guest_offset != 0 {
                    bail_bug!("expected guest_offset to be 0");
                }

                set_guest_ready(concurrent_state)?;

                let code = self.produce(
                    store.0,
                    ty.kind(),
                    transmit_id,
                    produce,
                    try_into,
                    ItemCount::ZERO,
                    false,
                )?;

                // A future is finished once its single read has completed.
                if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) {
                    store.0.concurrent_state_mut().get_mut(transmit_id)?.done = true;
                }

                code
            }

            // Nobody is writing yet: park the read until a writer shows up.
            WriteState::Open => {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }

            // The writer is gone: nothing can be read.
            WriteState::Dropped => ReturnCode::Dropped(ItemCount::ZERO),
        };

        // Synchronous callers wait here for the read to finish.
        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
            result = self.wait_for_read(store.0, transmit_handle)?;
        }

        // The operation finished: release the handle from `Busy`.  Only a
        // stream whose writer dropped is marked `done`.
        if result != ReturnCode::Blocked {
            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
                TransmitLocalState::Read {
                    done: matches!(
                        (result, ty),
                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
                    ),
                };
        }

        log::trace!(
            "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
        );

        Ok(result)
    }
3975
3976 fn wait_for_write(
3977 self,
3978 store: &mut StoreOpaque,
3979 handle: TableId<TransmitHandle>,
3980 ) -> Result<ReturnCode> {
3981 let waitable = Waitable::Transmit(handle);
3982 store.wait_for_event(waitable)?;
3983 let event = waitable.take_event(store.concurrent_state_mut())?;
3984 if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3985 event
3986 {
3987 waitable.on_delivery(store, self, event)?;
3988 Ok(code)
3989 } else {
3990 bail_bug!("expected either a stream or future write event")
3991 }
3992 }
3993
3994 fn cancel_write(
3996 self,
3997 store: &mut StoreOpaque,
3998 transmit_id: TableId<TransmitState>,
3999 async_: bool,
4000 ) -> Result<ReturnCode> {
4001 let state = store.concurrent_state_mut();
4002 let transmit = state.get_mut(transmit_id)?;
4003 log::trace!(
4004 "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
4005 transmit.read,
4006 transmit.write
4007 );
4008 let waitable = Waitable::Transmit(transmit.write_handle);
4009
4010 let code = if let Some(event) = waitable.take_event(state)? {
4011 let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
4012 bail_bug!("expected either a stream or future write event")
4013 };
4014 waitable.on_delivery(store, self, event)?;
4015 match (code, event) {
4016 (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
4017 ReturnCode::Cancelled(count)
4018 }
4019 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
4020 _ => bail_bug!("unexpected code/event combo"),
4021 }
4022 } else if let ReadState::HostReady {
4023 cancel,
4024 cancel_waker,
4025 ..
4026 } = &mut state.get_mut(transmit_id)?.read
4027 {
4028 *cancel = true;
4029 if let Some(waker) = cancel_waker.take() {
4030 waker.wake();
4031 }
4032
4033 if async_ {
4034 ReturnCode::Blocked
4035 } else {
4036 let handle = store
4037 .concurrent_state_mut()
4038 .get_mut(transmit_id)?
4039 .write_handle;
4040 self.wait_for_write(store, handle)?
4041 }
4042 } else {
4043 ReturnCode::Cancelled(ItemCount::ZERO)
4044 };
4045
4046 if !matches!(code, ReturnCode::Blocked) {
4047 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
4048
4049 match &transmit.write {
4050 WriteState::GuestReady { .. } => {
4051 transmit.write = WriteState::Open;
4052 }
4053 WriteState::HostReady { .. } => bail_bug!("support host write cancellation"),
4054 WriteState::Open | WriteState::Dropped => {}
4055 }
4056 }
4057
4058 log::trace!("cancelled write {transmit_id:?}: {code:?}");
4059
4060 Ok(code)
4061 }
4062
4063 fn wait_for_read(
4064 self,
4065 store: &mut StoreOpaque,
4066 handle: TableId<TransmitHandle>,
4067 ) -> Result<ReturnCode> {
4068 let waitable = Waitable::Transmit(handle);
4069 store.wait_for_event(waitable)?;
4070 let event = waitable.take_event(store.concurrent_state_mut())?;
4071 if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
4072 event
4073 {
4074 waitable.on_delivery(store, self, event)?;
4075 Ok(code)
4076 } else {
4077 bail_bug!("expected either a stream or future read event")
4078 }
4079 }
4080
    /// Cancels the pending read on the transmit `transmit_id`.
    ///
    /// Three situations are distinguished:
    ///
    /// * A read event is already queued for the read handle: it is taken and
    ///   delivered, and its code is returned.  A `Completed(n)` code on a
    ///   stream read is reported as `Cancelled(n)`; `Dropped` and (future)
    ///   `Completed` codes are returned unchanged.
    /// * The write end is `WriteState::HostReady`: its `cancel` flag is set
    ///   and any registered `cancel_waker` is woken.  Async callers get
    ///   `ReturnCode::Blocked`; synchronous callers wait via `wait_for_read`.
    /// * Otherwise `ReturnCode::Cancelled(ItemCount::ZERO)` is returned.
    ///
    /// Unless the result is `Blocked`, a read end that is still
    /// `ReadState::GuestReady` is reset to `ReadState::Open`.
    fn cancel_read(
        self,
        store: &mut StoreOpaque,
        transmit_id: TableId<TransmitState>,
        async_: bool,
    ) -> Result<ReturnCode> {
        let state = store.concurrent_state_mut();
        let transmit = state.get_mut(transmit_id)?;
        log::trace!(
            "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
            transmit.read,
            transmit.write
        );

        let waitable = Waitable::Transmit(transmit.read_handle);
        let code = if let Some(event) = waitable.take_event(state)? {
            // The read already produced a result; deliver it now.
            let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
                bail_bug!("expected either a stream or future read event")
            };
            waitable.on_delivery(store, self, event)?;
            match (code, event) {
                (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
                    ReturnCode::Cancelled(count)
                }
                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
                _ => bail_bug!("unexpected code/event combo"),
            }
        } else if let WriteState::HostReady {
            cancel,
            cancel_waker,
            ..
        } = &mut state.get_mut(transmit_id)?.write
        {
            // A host producer is involved: ask it to cancel and wake it if it
            // registered a waker.
            *cancel = true;
            if let Some(waker) = cancel_waker.take() {
                waker.wake();
            }

            if async_ {
                ReturnCode::Blocked
            } else {
                let handle = store
                    .concurrent_state_mut()
                    .get_mut(transmit_id)?
                    .read_handle;
                self.wait_for_read(store, handle)?
            }
        } else {
            ReturnCode::Cancelled(ItemCount::ZERO)
        };

        // Once the cancellation has resolved, a parked guest read is no
        // longer pending.
        if !matches!(code, ReturnCode::Blocked) {
            let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;

            match &transmit.read {
                ReadState::GuestReady { .. } => {
                    transmit.read = ReadState::Open;
                }
                ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
                    bail_bug!("support host read cancellation")
                }
                ReadState::Open | ReadState::Dropped => {}
            }
        }

        log::trace!("cancelled read {transmit_id:?}: {code:?}");

        Ok(code)
    }
4151
4152 fn guest_cancel_write(
4154 self,
4155 store: &mut StoreOpaque,
4156 ty: TransmitIndex,
4157 async_: bool,
4158 writer: u32,
4159 ) -> Result<ReturnCode> {
4160 if !async_ {
4161 store.check_blocking()?;
4165 }
4166
4167 let (rep, state) =
4168 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
4169 let id = TableId::<TransmitHandle>::new(rep);
4170 log::trace!("guest cancel write {id:?} (handle {writer})");
4171 match state {
4172 TransmitLocalState::Write { .. } => {
4173 bail!("stream or future write cancelled when no write is pending")
4174 }
4175 TransmitLocalState::Read { .. } => {
4176 bail!("passed read end to `{{stream|future}}.cancel-write`")
4177 }
4178 TransmitLocalState::Busy => {}
4179 }
4180 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
4181 let code = self.cancel_write(store, transmit_id, async_)?;
4182 if !matches!(code, ReturnCode::Blocked) {
4183 let state =
4184 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
4185 .1;
4186 if let TransmitLocalState::Busy = state {
4187 *state = TransmitLocalState::Write { done: false };
4188 }
4189 }
4190 Ok(code)
4191 }
4192
4193 fn guest_cancel_read(
4195 self,
4196 store: &mut StoreOpaque,
4197 ty: TransmitIndex,
4198 async_: bool,
4199 reader: u32,
4200 ) -> Result<ReturnCode> {
4201 if !async_ {
4202 store.check_blocking()?;
4206 }
4207
4208 let (rep, state) =
4209 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
4210 let id = TableId::<TransmitHandle>::new(rep);
4211 log::trace!("guest cancel read {id:?} (handle {reader})");
4212 match state {
4213 TransmitLocalState::Read { .. } => {
4214 bail!("stream or future read cancelled when no read is pending")
4215 }
4216 TransmitLocalState::Write { .. } => {
4217 bail!("passed write end to `{{stream|future}}.cancel-read`")
4218 }
4219 TransmitLocalState::Busy => {}
4220 }
4221 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
4222 let code = self.cancel_read(store, transmit_id, async_)?;
4223 if !matches!(code, ReturnCode::Blocked) {
4224 let state =
4225 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
4226 .1;
4227 if let TransmitLocalState::Busy = state {
4228 *state = TransmitLocalState::Read { done: false };
4229 }
4230 }
4231 Ok(code)
4232 }
4233
4234 fn guest_drop_readable(
4236 self,
4237 store: &mut StoreOpaque,
4238 ty: TransmitIndex,
4239 reader: u32,
4240 ) -> Result<()> {
4241 let table = self.id().get_mut(store).table_for_transmit(ty);
4242 let (rep, _is_done) = match ty {
4243 TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
4244 TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
4245 };
4246 let kind = match ty {
4247 TransmitIndex::Stream(_) => TransmitKind::Stream,
4248 TransmitIndex::Future(_) => TransmitKind::Future,
4249 };
4250 let id = TableId::<TransmitHandle>::new(rep);
4251 log::trace!("guest_drop_readable: drop reader {id:?}");
4252 store.host_drop_reader(id, kind)
4253 }
4254
4255 pub(crate) fn error_context_new(
4257 self,
4258 store: &mut StoreOpaque,
4259 ty: TypeComponentLocalErrorContextTableIndex,
4260 options: OptionsIndex,
4261 debug_msg_address: u32,
4262 debug_msg_len: u32,
4263 ) -> Result<u32> {
4264 let lift_ctx = &mut LiftContext::new(store, options, self);
4265 let debug_msg = String::linear_lift_from_flat(
4266 lift_ctx,
4267 InterfaceType::String,
4268 &[ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)],
4269 )?;
4270
4271 let err_ctx = ErrorContextState { debug_msg };
4273 let state = store.concurrent_state_mut();
4274 let table_id = state.push(err_ctx)?;
4275 let global_ref_count_idx =
4276 TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
4277
4278 let _ = state
4280 .global_error_context_ref_counts
4281 .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
4282
4283 let local_idx = self
4290 .id()
4291 .get_mut(store)
4292 .table_for_error_context(ty)
4293 .error_context_insert(table_id.rep())?;
4294
4295 Ok(local_idx)
4296 }
4297
4298 pub(super) fn error_context_debug_message<T>(
4300 self,
4301 store: StoreContextMut<T>,
4302 ty: TypeComponentLocalErrorContextTableIndex,
4303 options: OptionsIndex,
4304 err_ctx_handle: u32,
4305 debug_msg_address: u32,
4306 ) -> Result<()> {
4307 let handle_table_id_rep = self
4309 .id()
4310 .get_mut(store.0)
4311 .table_for_error_context(ty)
4312 .error_context_rep(err_ctx_handle)?;
4313
4314 let state = store.0.concurrent_state_mut();
4315 let ErrorContextState { debug_msg } =
4317 state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
4318 let debug_msg = debug_msg.clone();
4319
4320 let lower_cx = &mut LowerContext::new(store, options, self);
4321 let debug_msg_address = usize::try_from(debug_msg_address)?;
4322 let offset = lower_cx
4328 .as_slice_mut()
4329 .get(debug_msg_address..)
4330 .and_then(|b| b.get(..8))
4331 .map(|_| debug_msg_address)
4332 .ok_or_else(|| crate::format_err!("invalid debug message pointer: out of bounds"))?;
4333 debug_msg
4334 .as_str()
4335 .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
4336
4337 Ok(())
4338 }
4339
4340 pub(crate) fn future_cancel_read(
4342 self,
4343 store: &mut StoreOpaque,
4344 ty: TypeFutureTableIndex,
4345 async_: bool,
4346 reader: u32,
4347 ) -> Result<u32> {
4348 self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
4349 .map(|v| v.encode())
4350 }
4351
4352 pub(crate) fn future_cancel_write(
4354 self,
4355 store: &mut StoreOpaque,
4356 ty: TypeFutureTableIndex,
4357 async_: bool,
4358 writer: u32,
4359 ) -> Result<u32> {
4360 self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
4361 .map(|v| v.encode())
4362 }
4363
4364 pub(crate) fn stream_cancel_read(
4366 self,
4367 store: &mut StoreOpaque,
4368 ty: TypeStreamTableIndex,
4369 async_: bool,
4370 reader: u32,
4371 ) -> Result<u32> {
4372 self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
4373 .map(|v| v.encode())
4374 }
4375
4376 pub(crate) fn stream_cancel_write(
4378 self,
4379 store: &mut StoreOpaque,
4380 ty: TypeStreamTableIndex,
4381 async_: bool,
4382 writer: u32,
4383 ) -> Result<u32> {
4384 self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
4385 .map(|v| v.encode())
4386 }
4387
4388 pub(crate) fn future_drop_readable(
4390 self,
4391 store: &mut StoreOpaque,
4392 ty: TypeFutureTableIndex,
4393 reader: u32,
4394 ) -> Result<()> {
4395 self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
4396 }
4397
4398 pub(crate) fn stream_drop_readable(
4400 self,
4401 store: &mut StoreOpaque,
4402 ty: TypeStreamTableIndex,
4403 reader: u32,
4404 ) -> Result<()> {
4405 self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
4406 }
4407
4408 fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
4412 let (write, read) = store
4413 .concurrent_state_mut()
4414 .new_transmit(TransmitOrigin::guest(self.id().instance(), ty))?;
4415
4416 let table = self.id().get_mut(store).table_for_transmit(ty);
4417 let (read_handle, write_handle) = match ty {
4418 TransmitIndex::Future(ty) => (
4419 table.future_insert_read(ty, read.rep())?,
4420 table.future_insert_write(ty, write.rep())?,
4421 ),
4422 TransmitIndex::Stream(ty) => (
4423 table.stream_insert_read(ty, read.rep())?,
4424 table.stream_insert_write(ty, write.rep())?,
4425 ),
4426 };
4427
4428 let state = store.concurrent_state_mut();
4429 state.get_mut(read)?.common.handle = Some(read_handle);
4430 state.get_mut(write)?.common.handle = Some(write_handle);
4431
4432 Ok(ResourcePair {
4433 write: write_handle,
4434 read: read_handle,
4435 })
4436 }
4437
4438 pub(crate) fn error_context_drop(
4440 self,
4441 store: &mut StoreOpaque,
4442 ty: TypeComponentLocalErrorContextTableIndex,
4443 error_context: u32,
4444 ) -> Result<()> {
4445 let instance = self.id().get_mut(store);
4446
4447 let local_handle_table = instance.table_for_error_context(ty);
4448
4449 let rep = local_handle_table.error_context_drop(error_context)?;
4450
4451 let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
4452
4453 let state = store.concurrent_state_mut();
4454 let Some(GlobalErrorContextRefCount(global_ref_count)) = state
4455 .global_error_context_ref_counts
4456 .get_mut(&global_ref_count_idx)
4457 else {
4458 bail_bug!("retrieve concurrent state for error context during drop")
4459 };
4460
4461 if *global_ref_count < 1 {
4463 bail_bug!("ref count unexpectedly zero");
4464 }
4465 *global_ref_count -= 1;
4466 if *global_ref_count == 0 {
4467 state
4468 .global_error_context_ref_counts
4469 .remove(&global_ref_count_idx);
4470
4471 state
4472 .delete(TableId::<ErrorContextState>::new(rep))
4473 .context("deleting component-global error context data")?;
4474 }
4475
4476 Ok(())
4477 }
4478
4479 fn guest_transfer(
4482 self,
4483 store: &mut StoreOpaque,
4484 src_idx: u32,
4485 src: TransmitIndex,
4486 dst: TransmitIndex,
4487 ) -> Result<u32> {
4488 let mut instance = self.id().get_mut(store);
4489 let src_table = instance.as_mut().table_for_transmit(src);
4490 let (rep, is_done) = match src {
4491 TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
4492 TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
4493 };
4494 if is_done {
4495 bail!("cannot lift after being notified that the writable end dropped");
4496 }
4497 let dst_table = instance.table_for_transmit(dst);
4498 let handle = match dst {
4499 TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
4500 TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
4501 }?;
4502 store
4503 .concurrent_state_mut()
4504 .get_mut(TableId::<TransmitHandle>::new(rep))?
4505 .common
4506 .handle = Some(handle);
4507 Ok(handle)
4508 }
4509
4510 pub(crate) fn future_new(
4512 self,
4513 store: &mut StoreOpaque,
4514 ty: TypeFutureTableIndex,
4515 ) -> Result<ResourcePair> {
4516 self.guest_new(store, TransmitIndex::Future(ty))
4517 }
4518
4519 pub(crate) fn stream_new(
4521 self,
4522 store: &mut StoreOpaque,
4523 ty: TypeStreamTableIndex,
4524 ) -> Result<ResourcePair> {
4525 self.guest_new(store, TransmitIndex::Stream(ty))
4526 }
4527
4528 pub(crate) fn future_transfer(
4531 self,
4532 store: &mut StoreOpaque,
4533 src_idx: u32,
4534 src: TypeFutureTableIndex,
4535 dst: TypeFutureTableIndex,
4536 ) -> Result<u32> {
4537 self.guest_transfer(
4538 store,
4539 src_idx,
4540 TransmitIndex::Future(src),
4541 TransmitIndex::Future(dst),
4542 )
4543 }
4544
4545 pub(crate) fn stream_transfer(
4548 self,
4549 store: &mut StoreOpaque,
4550 src_idx: u32,
4551 src: TypeStreamTableIndex,
4552 dst: TypeStreamTableIndex,
4553 ) -> Result<u32> {
4554 self.guest_transfer(
4555 store,
4556 src_idx,
4557 TransmitIndex::Stream(src),
4558 TransmitIndex::Stream(dst),
4559 )
4560 }
4561
4562 pub(crate) fn error_context_transfer(
4564 self,
4565 store: &mut StoreOpaque,
4566 src_idx: u32,
4567 src: TypeComponentLocalErrorContextTableIndex,
4568 dst: TypeComponentLocalErrorContextTableIndex,
4569 ) -> Result<u32> {
4570 let mut instance = self.id().get_mut(store);
4571 let rep = instance
4572 .as_mut()
4573 .table_for_error_context(src)
4574 .error_context_rep(src_idx)?;
4575 let dst_idx = instance
4576 .table_for_error_context(dst)
4577 .error_context_insert(rep)?;
4578
4579 let global_ref_count = store
4583 .concurrent_state_mut()
4584 .global_error_context_ref_counts
4585 .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4586 .context("global ref count present for existing (sub)component error context")?;
4587
4588 global_ref_count.0 = global_ref_count
4589 .0
4590 .checked_add(1)
4591 .ok_or_else(|| format_err!(Trap::ReferenceCountOverflow))?;
4592
4593 Ok(dst_idx)
4594 }
4595}
4596
4597impl ComponentInstance {
4598 fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4599 let (states, types) = self.instance_states();
4600 let runtime_instance = match ty {
4601 TransmitIndex::Stream(ty) => types[ty].instance,
4602 TransmitIndex::Future(ty) => types[ty].instance,
4603 };
4604 states[runtime_instance].handle_table()
4605 }
4606
4607 fn table_for_error_context(
4608 self: Pin<&mut Self>,
4609 ty: TypeComponentLocalErrorContextTableIndex,
4610 ) -> &mut HandleTable {
4611 let (states, types) = self.instance_states();
4612 let runtime_instance = types[ty].instance;
4613 states[runtime_instance].handle_table()
4614 }
4615
4616 fn get_mut_by_index(
4617 self: Pin<&mut Self>,
4618 ty: TransmitIndex,
4619 index: u32,
4620 ) -> Result<(u32, &mut TransmitLocalState)> {
4621 get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4622 }
4623}
4624
4625impl ConcurrentState {
4626 fn send_write_result(
4627 &mut self,
4628 ty: TransmitIndex,
4629 id: TableId<TransmitState>,
4630 handle: u32,
4631 code: ReturnCode,
4632 ) -> Result<()> {
4633 let write_handle = self.get_mut(id)?.write_handle.rep();
4634 self.set_event(
4635 write_handle,
4636 match ty {
4637 TransmitIndex::Future(ty) => Event::FutureWrite {
4638 code,
4639 pending: Some((ty, handle)),
4640 },
4641 TransmitIndex::Stream(ty) => Event::StreamWrite {
4642 code,
4643 pending: Some((ty, handle)),
4644 },
4645 },
4646 )
4647 }
4648
4649 fn send_read_result(
4650 &mut self,
4651 ty: TransmitIndex,
4652 id: TableId<TransmitState>,
4653 handle: u32,
4654 code: ReturnCode,
4655 ) -> Result<()> {
4656 let read_handle = self.get_mut(id)?.read_handle.rep();
4657 self.set_event(
4658 read_handle,
4659 match ty {
4660 TransmitIndex::Future(ty) => Event::FutureRead {
4661 code,
4662 pending: Some((ty, handle)),
4663 },
4664 TransmitIndex::Stream(ty) => Event::StreamRead {
4665 code,
4666 pending: Some((ty, handle)),
4667 },
4668 },
4669 )
4670 }
4671
4672 fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4673 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4674 }
4675
4676 fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4677 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4678 }
4679
4680 fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4691 let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4692
4693 fn update_code(old: ReturnCode, new: ReturnCode) -> Result<ReturnCode> {
4694 let (ReturnCode::Completed(count)
4695 | ReturnCode::Dropped(count)
4696 | ReturnCode::Cancelled(count)) = old
4697 else {
4698 bail_bug!("unexpected old return code")
4699 };
4700
4701 Ok(match new {
4702 ReturnCode::Dropped(ItemCount::ZERO) => ReturnCode::Dropped(count),
4703 ReturnCode::Cancelled(ItemCount::ZERO) => ReturnCode::Cancelled(count),
4704 _ => bail_bug!("unexpected new return code"),
4705 })
4706 }
4707
4708 let event = match (waitable.take_event(self)?, event) {
4709 (None, _) => event,
4710 (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4711 (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4712 (
4713 Some(Event::StreamWrite {
4714 code: old_code,
4715 pending: old_pending,
4716 }),
4717 Event::StreamWrite { code, pending },
4718 ) => Event::StreamWrite {
4719 code: update_code(old_code, code)?,
4720 pending: old_pending.or(pending),
4721 },
4722 (
4723 Some(Event::StreamRead {
4724 code: old_code,
4725 pending: old_pending,
4726 }),
4727 Event::StreamRead { code, pending },
4728 ) => Event::StreamRead {
4729 code: update_code(old_code, code)?,
4730 pending: old_pending.or(pending),
4731 },
4732 _ => bail_bug!("unexpected event combination"),
4733 };
4734
4735 waitable.set_event(self, Some(event))
4736 }
4737
4738 fn new_transmit(
4741 &mut self,
4742 origin: TransmitOrigin,
4743 ) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4744 let state_id = self.push(TransmitState::new(origin))?;
4745
4746 let write = self.push(TransmitHandle::new(state_id))?;
4747 let read = self.push(TransmitHandle::new(state_id))?;
4748
4749 let state = self.get_mut(state_id)?;
4750 state.write_handle = write;
4751 state.read_handle = read;
4752
4753 log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4754
4755 Ok((write, read))
4756 }
4757
4758 fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4760 let state = self.delete(state_id)?;
4761 self.delete(state.write_handle)?;
4762 self.delete(state.read_handle)?;
4763
4764 log::trace!(
4765 "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4766 state.write_handle,
4767 state.read_handle,
4768 );
4769
4770 Ok(())
4771 }
4772}
4773
/// The two handle-table indices that make up a freshly created stream or
/// future.
pub(crate) struct ResourcePair {
    /// Index of the writable end.
    pub(crate) write: u32,
    /// Index of the readable end.
    pub(crate) read: u32,
}
4778
4779impl Waitable {
4780 pub(super) fn on_delivery(
4783 &self,
4784 store: &mut StoreOpaque,
4785 instance: Instance,
4786 event: Event,
4787 ) -> Result<()> {
4788 let instance = instance.id().get_mut(store);
4789 let (rep, state, code) = match event {
4790 Event::FutureRead {
4791 pending: Some((ty, handle)),
4792 code,
4793 }
4794 | Event::FutureWrite {
4795 pending: Some((ty, handle)),
4796 code,
4797 } => {
4798 let runtime_instance = instance.component().types()[ty].instance;
4799 let (rep, state) = instance.instance_states().0[runtime_instance]
4800 .handle_table()
4801 .future_rep(ty, handle)?;
4802 (rep, state, code)
4803 }
4804 Event::StreamRead {
4805 pending: Some((ty, handle)),
4806 code,
4807 }
4808 | Event::StreamWrite {
4809 pending: Some((ty, handle)),
4810 code,
4811 } => {
4812 let runtime_instance = instance.component().types()[ty].instance;
4813 let (rep, state) = instance.instance_states().0[runtime_instance]
4814 .handle_table()
4815 .stream_rep(ty, handle)?;
4816 (rep, state, code)
4817 }
4818 _ => return Ok(()),
4819 };
4820 if rep != self.rep() {
4821 bail_bug!("unexpected rep mismatch");
4822 }
4823 if *state != TransmitLocalState::Busy {
4824 bail_bug!("expected state to be busy");
4825 }
4826 let done = matches!(code, ReturnCode::Dropped(_));
4827 *state = match event {
4828 Event::FutureRead { .. } | Event::StreamRead { .. } => {
4829 TransmitLocalState::Read { done }
4830 }
4831 Event::FutureWrite { .. } | Event::StreamWrite { .. } => {
4832 TransmitLocalState::Write { done }
4833 }
4834 _ => bail_bug!("unexpected event for stream"),
4835 };
4836
4837 let transmit_handle = TableId::<TransmitHandle>::new(rep);
4838 let state = store.concurrent_state_mut();
4839 let transmit_id = state.get_mut(transmit_handle)?.state;
4840 let transmit = state.get_mut(transmit_id)?;
4841
4842 match event {
4843 Event::StreamRead { .. } => {
4844 transmit.read = ReadState::Open;
4845 }
4846 Event::StreamWrite { .. } => transmit.write = WriteState::Open,
4847 _ => {}
4848 }
4849 Ok(())
4850 }
4851}
4852
4853fn allow_intra_component_read_write(ty: Option<&InterfaceType>) -> bool {
4857 matches!(
4858 ty,
4859 None | Some(
4860 InterfaceType::S8
4861 | InterfaceType::U8
4862 | InterfaceType::S16
4863 | InterfaceType::U16
4864 | InterfaceType::S32
4865 | InterfaceType::U32
4866 | InterfaceType::S64
4867 | InterfaceType::U64
4868 | InterfaceType::Float32
4869 | InterfaceType::Float64
4870 )
4871 )
4872}
4873
/// A value of type `T` stored as an `Option` behind a `Mutex`, so it can be
/// taken out of the slot and put back later.
struct LockedState<T> {
    /// `Some` while the value is in place, `None` while it is taken out.
    inner: Mutex<Option<T>>,
}
4880
4881impl<T> LockedState<T> {
4882 fn new(value: T) -> Self {
4884 Self {
4885 inner: Mutex::new(Some(value)),
4886 }
4887 }
4888
4889 fn try_lock(&self) -> Result<MutexGuard<'_, Option<T>>> {
4898 match self.inner.try_lock() {
4899 Ok(lock) => Ok(lock),
4900 Err(_) => bail_bug!("should not have contention on state lock"),
4901 }
4902 }
4903
4904 fn take(&self) -> Result<LockedStateGuard<'_, T>> {
4911 let result = self.try_lock()?.take();
4912 match result {
4913 Some(result) => Ok(LockedStateGuard {
4914 value: ManuallyDrop::new(result),
4915 state: self,
4916 }),
4917 None => bail_bug!("lock value unexpectedly missing"),
4918 }
4919 }
4920
4921 fn with<R>(&self, f: impl FnOnce(&mut T) -> R) -> Result<R> {
4930 let mut inner = self.try_lock()?;
4931 match &mut *inner {
4932 Some(state) => Ok(f(state)),
4933 None => bail_bug!("lock value unexpectedly missing"),
4934 }
4935 }
4936}
4937
4938struct LockedStateGuard<'a, T> {
4941 value: ManuallyDrop<T>,
4942 state: &'a LockedState<T>,
4943}
4944
4945impl<T> Deref for LockedStateGuard<'_, T> {
4946 type Target = T;
4947
4948 fn deref(&self) -> &T {
4949 &self.value
4950 }
4951}
4952
4953impl<T> DerefMut for LockedStateGuard<'_, T> {
4954 fn deref_mut(&mut self) -> &mut T {
4955 &mut self.value
4956 }
4957}
4958
4959impl<T> Drop for LockedStateGuard<'_, T> {
4960 fn drop(&mut self) {
4961 let value = unsafe { ManuallyDrop::take(&mut self.value) };
4966
4967 if let Ok(mut lock) = self.state.try_lock() {
4971 *lock = Some(value);
4972 }
4973 }
4974}
4975
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Engine, Store};
    use core::future::pending;
    use core::pin::pin;
    use std::sync::LazyLock;

    /// Engine shared by the throwaway stores created for each poll.
    static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);

    /// Poll `rx` once through `FutureProducer::poll_produce`, using a no-op
    /// waker and a fresh store.
    fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
    where
        T: FutureProducer<()>,
    {
        let mut cx = Context::from_waker(Waker::noop());
        let mut store = Store::new(&ENGINE, ());
        rx.poll_produce(&mut cx, store.as_context_mut(), finish)
    }

    #[test]
    fn future_producer() {
        /// Poll `$fut` once with the given `finish` flag and assert that
        /// the outcome matches `$expected`.
        macro_rules! assert_poll {
            ($fut:ident, $finish:expr, $expected:pat) => {
                assert!(matches!(
                    poll_future_producer($fut.as_mut(), $finish),
                    $expected
                ))
            };
        }

        // An immediately-ready future produces its value whether or not
        // `finish` is set.
        let mut fut = pin!(async { crate::error::Ok(()) });
        assert_poll!(fut, false, Poll::Ready(Ok(Some(()))));

        let mut fut = pin!(async { crate::error::Ok(()) });
        assert_poll!(fut, true, Poll::Ready(Ok(Some(()))));

        // A never-ready future stays pending until `finish` is set, at which
        // point it reports that nothing was produced.
        let mut fut = pin!(pending::<Result<()>>());
        assert_poll!(fut, false, Poll::Pending);
        assert_poll!(fut, true, Poll::Ready(Ok(None)));

        // A oneshot receiver can still produce its value after an earlier
        // `finish` poll returned `None`.
        let (tx, rx) = oneshot::channel();
        let mut rx = pin!(rx);
        assert_poll!(rx, false, Poll::Pending);
        assert_poll!(rx, true, Poll::Ready(Ok(None)));
        tx.send(()).unwrap();
        assert_poll!(rx, true, Poll::Ready(Ok(Some(()))));

        // A value sent before the first poll is produced right away.
        let (tx, rx) = oneshot::channel();
        let mut rx = pin!(rx);
        tx.send(()).unwrap();
        assert_poll!(rx, false, Poll::Ready(Ok(Some(()))));

        // A dropped sender yields an error, with or without `finish`.
        let (tx, rx) = oneshot::channel::<()>();
        let mut rx = pin!(rx);
        drop(tx);
        assert_poll!(rx, false, Poll::Ready(Err(..)));

        let (tx, rx) = oneshot::channel::<()>();
        let mut rx = pin!(rx);
        drop(tx);
        assert_poll!(rx, true, Poll::Ready(Err(..)));
    }
}