1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, QualifiedThreadId, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext};
5use crate::component::matching::InstanceType;
6use crate::component::types;
7use crate::component::values::ErrorContextAny;
8use crate::component::{
9 AsAccessor, ComponentInstanceId, ComponentType, FutureAny, Instance, Lift, Lower, StreamAny,
10 Val, WasmList,
11};
12use crate::store::{StoreOpaque, StoreToken};
13use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
14use crate::vm::{AlwaysMut, VMStore};
15use crate::{AsContext, AsContextMut, StoreContextMut, ValRaw};
16use crate::{
17 Error, Result, Trap, bail, bail_bug, ensure,
18 error::{Context as _, format_err},
19};
20use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
21use core::fmt;
22use core::future;
23use core::iter;
24use core::marker::PhantomData;
25use core::mem::{self, ManuallyDrop, MaybeUninit};
26use core::ops::{Deref, DerefMut};
27use core::pin::Pin;
28use core::task::{Context, Poll, Waker, ready};
29use futures::channel::oneshot;
30use futures::{FutureExt as _, stream};
31use std::any::{Any, TypeId};
32use std::boxed::Box;
33use std::io::Cursor;
34use std::string::String;
35use std::sync::{Arc, Mutex, MutexGuard};
36use std::vec::Vec;
37use wasmtime_environ::component::{
38 CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
39 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
40 TypeFutureTableIndex, TypeStreamTableIndex,
41};
42
43pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
44
45mod buffers;
46
47#[derive(Copy, Clone, Debug)]
50pub enum TransmitKind {
51 Stream,
52 Future,
53}
54
55#[derive(Copy, Clone, Debug, PartialEq)]
57pub enum ReturnCode {
58 Blocked,
59 Completed(ItemCount),
60 Dropped(ItemCount),
61 Cancelled(ItemCount),
62}
63
64impl ReturnCode {
65 pub fn encode(&self) -> u32 {
70 const BLOCKED: u32 = 0xffff_ffff;
71 const COMPLETED: u32 = 0x0;
72 const DROPPED: u32 = 0x1;
73 const CANCELLED: u32 = 0x2;
74 match self {
75 ReturnCode::Blocked => BLOCKED,
76 ReturnCode::Completed(n) => (n.as_u32() << 4) | COMPLETED,
77 ReturnCode::Dropped(n) => (n.as_u32() << 4) | DROPPED,
78 ReturnCode::Cancelled(n) => (n.as_u32() << 4) | CANCELLED,
79 }
80 }
81
82 fn completed(kind: TransmitKind, count: ItemCount) -> Self {
85 Self::Completed(if let TransmitKind::Future = kind {
86 ItemCount::ZERO
87 } else {
88 count
89 })
90 }
91}
92
93#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
100#[repr(transparent)]
101pub struct ItemCount {
102 raw: u32,
103}
104
105impl ItemCount {
106 const MAX: u32 = 1 << 28;
107 const ZERO: ItemCount = ItemCount { raw: 0 };
108
109 fn new(count: u32) -> Result<Self, Trap> {
112 if count < Self::MAX {
113 Ok(Self { raw: count })
114 } else {
115 Err(Trap::StreamOpTooBig)
116 }
117 }
118
119 fn new_usize(count: usize) -> Result<Self, Trap> {
121 let count = u32::try_from(count).map_err(|_| Trap::StreamOpTooBig)?;
122 Self::new(count)
123 }
124
125 fn as_u32(&self) -> u32 {
126 self.raw
127 }
128
129 fn as_usize(&self) -> usize {
130 usize::try_from(self.raw).unwrap()
131 }
132
133 fn inc(&mut self, amt: usize) -> Result<(), Trap> {
136 let amt = u32::try_from(amt).map_err(|_| Trap::StreamOpTooBig)?;
137 let new_raw = self.raw.checked_add(amt).ok_or(Trap::StreamOpTooBig)?;
138 if new_raw < Self::MAX {
139 self.raw = new_raw;
140 Ok(())
141 } else {
142 Err(Trap::StreamOpTooBig)
143 }
144 }
145
146 fn add(&self, other: ItemCount) -> Result<ItemCount> {
152 match self.raw.checked_add(other.raw) {
153 Some(raw) => Ok(ItemCount::new(raw)?),
154 None => bail_bug!("overflow in `ItemCount::add`"),
155 }
156 }
157
158 fn sub(&self, other: ItemCount) -> Result<ItemCount> {
163 match self.raw.checked_sub(other.raw) {
164 Some(raw) => Ok(ItemCount { raw }),
165 None => bail_bug!("underflow in `ItemCount::sub`"),
166 }
167 }
168}
169
170impl fmt::Display for ItemCount {
171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172 self.raw.fmt(f)
173 }
174}
175
176impl fmt::Debug for ItemCount {
177 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178 self.raw.fmt(f)
179 }
180}
181
182impl PartialEq<u32> for ItemCount {
183 fn eq(&self, other: &u32) -> bool {
184 self.raw == *other
185 }
186}
187
188impl PartialOrd<u32> for ItemCount {
189 fn partial_cmp(&self, other: &u32) -> Option<std::cmp::Ordering> {
190 self.raw.partial_cmp(other)
191 }
192}
193
194#[derive(Copy, Clone, Debug)]
199pub enum TransmitIndex {
200 Stream(TypeStreamTableIndex),
201 Future(TypeFutureTableIndex),
202}
203
204impl TransmitIndex {
205 pub fn kind(&self) -> TransmitKind {
206 match self {
207 TransmitIndex::Stream(_) => TransmitKind::Stream,
208 TransmitIndex::Future(_) => TransmitKind::Future,
209 }
210 }
211
212 fn payload<'a>(&self, types: &'a ComponentTypes) -> Option<&'a InterfaceType> {
215 match self {
216 TransmitIndex::Stream(i) => {
217 let ty = types[*i].ty;
218 types[ty].payload.as_ref()
219 }
220 TransmitIndex::Future(i) => {
221 let ty = types[*i].ty;
222 types[ty].payload.as_ref()
223 }
224 }
225 }
226}
227
228fn get_mut_by_index_from(
231 handle_table: &mut HandleTable,
232 ty: TransmitIndex,
233 index: u32,
234) -> Result<(u32, &mut TransmitLocalState)> {
235 match ty {
236 TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
237 TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
238 }
239}
240
241fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
242 mut store: StoreContextMut<U>,
243 instance: Instance,
244 caller_thread: QualifiedThreadId,
245 options: OptionsIndex,
246 ty: TransmitIndex,
247 address: usize,
248 count: usize,
249 buffer: &mut B,
250) -> Result<()> {
251 let count = buffer.remaining().len().min(count);
252
253 let (lower, old_thread) = if T::MAY_REQUIRE_REALLOC {
257 let old_thread = store.0.set_thread(caller_thread)?;
258 (
259 &mut LowerContext::new(store.as_context_mut(), options, instance),
260 Some(old_thread),
261 )
262 } else {
263 (
264 &mut LowerContext::new_without_realloc(store.as_context_mut(), options, instance),
265 None,
266 )
267 };
268
269 if address % usize::try_from(T::ALIGN32)? != 0 {
270 bail!("read pointer not aligned");
271 }
272 lower
273 .as_slice_mut()
274 .get_mut(address..)
275 .and_then(|b| b.get_mut(..T::SIZE32 * count))
276 .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))?;
277
278 if let Some(ty) = ty.payload(lower.types) {
279 T::linear_store_list_to_memory(lower, *ty, address, &buffer.remaining()[..count])?;
280 }
281
282 if let Some(old_thread) = old_thread {
283 store.0.set_thread(old_thread)?;
284 }
285
286 buffer.skip(count);
287
288 Ok(())
289}
290
291fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
292 lift: &mut LiftContext<'_>,
293 ty: Option<InterfaceType>,
294 buffer: &mut B,
295 address: usize,
296 count: usize,
297) -> Result<()> {
298 let count = count.min(buffer.remaining_capacity());
299 if T::IS_RUST_UNIT_TYPE {
300 buffer.extend(
304 iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
305 )
306 } else {
307 let ty = match ty {
308 Some(ty) => ty,
309 None => bail_bug!("type required for non-unit lift"),
310 };
311 if address % usize::try_from(T::ALIGN32)? != 0 {
312 bail!("write pointer not aligned");
313 }
314 lift.memory()
315 .get(address..)
316 .and_then(|b| b.get(..T::SIZE32 * count))
317 .ok_or_else(|| crate::format_err!("write pointer out of bounds of memory"))?;
318
319 let list = &WasmList::new(address, count, lift, ty)?;
320 T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
321 }
322 Ok(())
323}
324
325#[derive(Debug, PartialEq, Eq, PartialOrd)]
327pub(super) struct ErrorContextState {
328 pub(crate) debug_msg: String,
330}
331
332#[derive(Debug, Clone, Copy, PartialEq, Eq)]
335pub(super) struct FlatAbi {
336 pub(super) size: u32,
337 pub(super) align: u32,
338}
339
340pub struct Destination<'a, T, B> {
342 id: TableId<TransmitState>,
343 buffer: &'a mut B,
344 host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
345 _phantom: PhantomData<fn() -> T>,
346}
347
348impl<'a, T, B> Destination<'a, T, B> {
349 pub fn reborrow(&mut self) -> Destination<'_, T, B> {
351 Destination {
352 id: self.id,
353 buffer: &mut *self.buffer,
354 host_buffer: self.host_buffer.as_deref_mut(),
355 _phantom: PhantomData,
356 }
357 }
358
359 pub fn take_buffer(&mut self) -> B
365 where
366 B: Default,
367 {
368 mem::take(self.buffer)
369 }
370
371 pub fn set_buffer(&mut self, buffer: B) {
381 *self.buffer = buffer;
382 }
383
384 pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
401 self.remaining_(store.as_context_mut().0).unwrap()
405 }
406
407 fn remaining_(&self, store: &mut StoreOpaque) -> Result<Option<usize>> {
408 let transmit = store.concurrent_state_mut().get_mut(self.id)?;
409
410 if let &ReadState::GuestReady { count, .. } = &transmit.read {
411 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
412 bail_bug!("expected WriteState::HostReady")
413 };
414
415 Ok(Some(count.as_usize() - guest_offset.as_usize()))
416 } else {
417 Ok(None)
418 }
419 }
420}
421
422impl<'a, B> Destination<'a, u8, B> {
423 pub fn as_direct<D>(
434 mut self,
435 store: StoreContextMut<'a, D>,
436 capacity: usize,
437 ) -> DirectDestination<'a, D> {
438 if let Some(buffer) = self.host_buffer.as_deref_mut() {
439 buffer.set_position(0);
440 if buffer.get_mut().is_empty() {
441 buffer.get_mut().resize(capacity, 0);
442 }
443 }
444
445 DirectDestination {
446 id: self.id,
447 host_buffer: self.host_buffer,
448 store,
449 }
450 }
451}
452
453pub struct DirectDestination<'a, D: 'static> {
456 id: TableId<TransmitState>,
457 host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
458 store: StoreContextMut<'a, D>,
459}
460
461impl<D: 'static> std::io::Write for DirectDestination<'_, D> {
462 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
463 let rem = self.remaining();
464 let n = rem.len().min(buf.len());
465 rem[..n].copy_from_slice(&buf[..n]);
466 self.mark_written(n);
467 Ok(n)
468 }
469
470 fn flush(&mut self) -> std::io::Result<()> {
471 Ok(())
472 }
473}
474
475impl<D: 'static> DirectDestination<'_, D> {
476 pub fn remaining(&mut self) -> &mut [u8] {
478 self.remaining_().unwrap()
482 }
483
484 fn remaining_(&mut self) -> Result<&mut [u8]> {
485 if let Some(buffer) = self.host_buffer.as_deref_mut() {
486 return Ok(buffer.get_mut());
487 }
488 let transmit = self
489 .store
490 .as_context_mut()
491 .0
492 .concurrent_state_mut()
493 .get_mut(self.id)?;
494
495 let &ReadState::GuestReady {
496 address,
497 count,
498 options,
499 instance,
500 ..
501 } = &transmit.read
502 else {
503 bail_bug!("expected ReadState::GuestReady")
504 };
505
506 let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
507 bail_bug!("expected WriteState::HostReady")
508 };
509
510 let memory = instance
511 .options_memory_mut(self.store.0, options)
512 .get_mut((address + guest_offset.as_usize())..)
513 .and_then(|b| b.get_mut(..(count.as_usize() - guest_offset.as_usize())));
514 match memory {
515 Some(memory) => Ok(memory),
516 None => bail_bug!("guest buffer unexpectedly out of bounds"),
517 }
518 }
519
520 pub fn mark_written(&mut self, count: usize) {
527 self.mark_written_(count).unwrap()
531 }
532
533 fn mark_written_(&mut self, count: usize) -> Result<()> {
534 if let Some(buffer) = self.host_buffer.as_deref_mut() {
535 buffer.set_position(
536 buffer
539 .position()
540 .checked_add(u64::try_from(count).unwrap())
541 .unwrap(),
542 );
543 } else {
544 let transmit = self
545 .store
546 .as_context_mut()
547 .0
548 .concurrent_state_mut()
549 .get_mut(self.id)?;
550
551 let ReadState::GuestReady {
552 count: read_count, ..
553 } = &transmit.read
554 else {
555 bail_bug!("expected ReadState::GuestReady")
556 };
557
558 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
559 bail_bug!("expected WriteState::HostReady");
560 };
561
562 if guest_offset.as_usize() + count > read_count.as_usize() {
563 panic!(
566 "write count ({count}) must be less than or equal to read count ({read_count})"
567 )
568 } else {
569 guest_offset.inc(count)?;
570 }
571 }
572 Ok(())
573 }
574}
575
576#[derive(Copy, Clone, Debug)]
578pub enum StreamResult {
579 Completed,
582 Cancelled,
587 Dropped,
590}
591
592pub trait StreamProducer<D>: Send + 'static {
594 type Item;
596
597 type Buffer: WriteBuffer<Self::Item> + Default;
599
600 fn poll_produce<'a>(
736 self: Pin<&mut Self>,
737 cx: &mut Context<'_>,
738 store: StoreContextMut<'a, D>,
739 destination: Destination<'a, Self::Item, Self::Buffer>,
740 finish: bool,
741 ) -> Poll<Result<StreamResult>>;
742
743 fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
749 Err(me)
750 }
751}
752
753impl<T, D> StreamProducer<D> for iter::Empty<T>
754where
755 T: Send + Sync + 'static,
756{
757 type Item = T;
758 type Buffer = Option<Self::Item>;
759
760 fn poll_produce<'a>(
761 self: Pin<&mut Self>,
762 _: &mut Context<'_>,
763 _: StoreContextMut<'a, D>,
764 _: Destination<'a, Self::Item, Self::Buffer>,
765 _: bool,
766 ) -> Poll<Result<StreamResult>> {
767 Poll::Ready(Ok(StreamResult::Dropped))
768 }
769}
770
771impl<T, D> StreamProducer<D> for stream::Empty<T>
772where
773 T: Send + Sync + 'static,
774{
775 type Item = T;
776 type Buffer = Option<Self::Item>;
777
778 fn poll_produce<'a>(
779 self: Pin<&mut Self>,
780 _: &mut Context<'_>,
781 _: StoreContextMut<'a, D>,
782 _: Destination<'a, Self::Item, Self::Buffer>,
783 _: bool,
784 ) -> Poll<Result<StreamResult>> {
785 Poll::Ready(Ok(StreamResult::Dropped))
786 }
787}
788
789impl<T, D> StreamProducer<D> for Vec<T>
790where
791 T: Unpin + Send + Sync + 'static,
792{
793 type Item = T;
794 type Buffer = VecBuffer<T>;
795
796 fn poll_produce<'a>(
797 self: Pin<&mut Self>,
798 _: &mut Context<'_>,
799 _: StoreContextMut<'a, D>,
800 mut dst: Destination<'a, Self::Item, Self::Buffer>,
801 _: bool,
802 ) -> Poll<Result<StreamResult>> {
803 dst.set_buffer(mem::take(self.get_mut()).into());
804 Poll::Ready(Ok(StreamResult::Dropped))
805 }
806}
807
808impl<T, D> StreamProducer<D> for Box<[T]>
809where
810 T: Unpin + Send + Sync + 'static,
811{
812 type Item = T;
813 type Buffer = VecBuffer<T>;
814
815 fn poll_produce<'a>(
816 self: Pin<&mut Self>,
817 _: &mut Context<'_>,
818 _: StoreContextMut<'a, D>,
819 mut dst: Destination<'a, Self::Item, Self::Buffer>,
820 _: bool,
821 ) -> Poll<Result<StreamResult>> {
822 dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
823 Poll::Ready(Ok(StreamResult::Dropped))
824 }
825}
826
827#[cfg(feature = "component-model-async-bytes")]
828impl<D> StreamProducer<D> for bytes::Bytes {
829 type Item = u8;
830 type Buffer = Cursor<Self>;
831
832 fn poll_produce<'a>(
833 self: Pin<&mut Self>,
834 _: &mut Context<'_>,
835 _store: StoreContextMut<'a, D>,
836 mut dst: Destination<'a, Self::Item, Self::Buffer>,
837 _: bool,
838 ) -> Poll<Result<StreamResult>> {
839 dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
840 Poll::Ready(Ok(StreamResult::Dropped))
841 }
842}
843
844#[cfg(feature = "component-model-async-bytes")]
845impl<D> StreamProducer<D> for bytes::BytesMut {
846 type Item = u8;
847 type Buffer = Cursor<Self>;
848
849 fn poll_produce<'a>(
850 self: Pin<&mut Self>,
851 _: &mut Context<'_>,
852 _store: StoreContextMut<'a, D>,
853 mut dst: Destination<'a, Self::Item, Self::Buffer>,
854 _: bool,
855 ) -> Poll<Result<StreamResult>> {
856 dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
857 Poll::Ready(Ok(StreamResult::Dropped))
858 }
859}
860
861pub struct Source<'a, T> {
863 id: TableId<TransmitState>,
864 host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
865}
866
867impl<'a, T> Source<'a, T> {
868 pub fn reborrow(&mut self) -> Source<'_, T> {
870 Source {
871 id: self.id,
872 host_buffer: self.host_buffer.as_deref_mut(),
873 }
874 }
875
876 pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
878 where
879 T: func::Lift + 'static,
880 B: ReadBuffer<T>,
881 {
882 if let Some(input) = &mut self.host_buffer {
883 let count = input.remaining().len().min(buffer.remaining_capacity());
884 buffer.move_from(*input, count);
885 } else {
886 let store = store.as_context_mut();
887 let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
888
889 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
890 bail_bug!("expected ReadState::HostReady");
891 };
892
893 let &WriteState::GuestReady {
894 ty,
895 address,
896 count,
897 options,
898 instance,
899 ..
900 } = &transmit.write
901 else {
902 bail_bug!("expected WriteState::GuestReady");
903 };
904
905 let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
906 let ty = ty.payload(cx.types);
907 let old_remaining = buffer.remaining_capacity();
908 lift::<T, B>(
909 cx,
910 ty.copied(),
911 buffer,
912 address + (T::SIZE32 * guest_offset.as_usize()),
913 count.as_usize() - guest_offset.as_usize(),
914 )?;
915
916 let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
917
918 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
919 bail_bug!("expected ReadState::HostReady");
920 };
921
922 guest_offset.inc(old_remaining - buffer.remaining_capacity())?;
923 }
924
925 Ok(())
926 }
927
928 pub fn remaining(&self, mut store: impl AsContextMut) -> usize
931 where
932 T: 'static,
933 {
934 self.remaining_(store.as_context_mut().0).unwrap()
938 }
939
940 fn remaining_(&self, store: &mut StoreOpaque) -> Result<usize>
941 where
942 T: 'static,
943 {
944 let transmit = store.concurrent_state_mut().get_mut(self.id)?;
945
946 if let &WriteState::GuestReady { count, .. } = &transmit.write {
947 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
948 bail_bug!("expected ReadState::HostReady")
949 };
950
951 Ok(count.as_usize() - guest_offset.as_usize())
952 } else if let Some(host_buffer) = &self.host_buffer {
953 Ok(host_buffer.remaining().len())
954 } else {
955 bail_bug!("expected either WriteState::GuestReady or host buffer")
956 }
957 }
958}
959
960impl<'a> Source<'a, u8> {
961 pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
963 DirectSource {
964 id: self.id,
965 host_buffer: self.host_buffer,
966 store,
967 }
968 }
969}
970
971pub struct DirectSource<'a, D: 'static> {
974 id: TableId<TransmitState>,
975 host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
976 store: StoreContextMut<'a, D>,
977}
978
979impl<D: 'static> std::io::Read for DirectSource<'_, D> {
980 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
981 let rem = self.remaining();
982 let n = rem.len().min(buf.len());
983 buf[..n].copy_from_slice(&rem[..n]);
984 self.mark_read(n);
985 Ok(n)
986 }
987}
988
989impl<D: 'static> DirectSource<'_, D> {
990 pub fn remaining(&mut self) -> &[u8] {
992 self.remaining_().unwrap()
996 }
997
998 fn remaining_(&mut self) -> Result<&[u8]> {
999 if let Some(buffer) = self.host_buffer.as_deref_mut() {
1000 return Ok(buffer.remaining());
1001 }
1002 let transmit = self
1003 .store
1004 .as_context_mut()
1005 .0
1006 .concurrent_state_mut()
1007 .get_mut(self.id)?;
1008
1009 let &WriteState::GuestReady {
1010 address,
1011 count,
1012 options,
1013 instance,
1014 ..
1015 } = &transmit.write
1016 else {
1017 bail_bug!("expected WriteState::GuestReady")
1018 };
1019
1020 let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
1021 bail_bug!("expected ReadState::HostReady")
1022 };
1023
1024 let memory = instance
1025 .options_memory(self.store.0, options)
1026 .get((address + guest_offset.as_usize())..)
1027 .and_then(|b| b.get(..(count.as_usize() - guest_offset.as_usize())));
1028 match memory {
1029 Some(memory) => Ok(memory),
1030 None => bail_bug!("guest buffer unexpectedly out of bounds"),
1031 }
1032 }
1033
1034 pub fn mark_read(&mut self, count: usize) {
1041 self.mark_read_(count).unwrap()
1045 }
1046
1047 fn mark_read_(&mut self, count: usize) -> Result<()> {
1048 if let Some(buffer) = self.host_buffer.as_deref_mut() {
1049 buffer.skip(count);
1050 return Ok(());
1051 }
1052
1053 let transmit = self
1054 .store
1055 .as_context_mut()
1056 .0
1057 .concurrent_state_mut()
1058 .get_mut(self.id)?;
1059
1060 let WriteState::GuestReady {
1061 count: write_count, ..
1062 } = &transmit.write
1063 else {
1064 bail_bug!("expected WriteState::GuestReady");
1065 };
1066
1067 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
1068 bail_bug!("expected ReadState::HostReady");
1069 };
1070
1071 if guest_offset.as_usize() + count > write_count.as_usize() {
1072 panic!("read count ({count}) must be less than or equal to write count ({write_count})")
1074 } else {
1075 guest_offset.inc(count)?;
1076 }
1077 Ok(())
1078 }
1079}
1080
1081pub trait StreamConsumer<D>: Send + 'static {
1083 type Item;
1085
1086 fn poll_consume(
1169 self: Pin<&mut Self>,
1170 cx: &mut Context<'_>,
1171 store: StoreContextMut<D>,
1172 source: Source<'_, Self::Item>,
1173 finish: bool,
1174 ) -> Poll<Result<StreamResult>>;
1175}
1176
1177pub trait FutureProducer<D>: Send + 'static {
1179 type Item;
1181
1182 fn poll_produce(
1192 self: Pin<&mut Self>,
1193 cx: &mut Context<'_>,
1194 store: StoreContextMut<D>,
1195 finish: bool,
1196 ) -> Poll<Result<Option<Self::Item>>>;
1197}
1198
1199impl<T, E, D, Fut> FutureProducer<D> for Fut
1200where
1201 E: Into<Error>,
1202 Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
1203{
1204 type Item = T;
1205
1206 fn poll_produce<'a>(
1207 self: Pin<&mut Self>,
1208 cx: &mut Context<'_>,
1209 _: StoreContextMut<'a, D>,
1210 finish: bool,
1211 ) -> Poll<Result<Option<T>>> {
1212 match self.poll(cx) {
1213 Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
1214 Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
1215 Poll::Pending if finish => Poll::Ready(Ok(None)),
1216 Poll::Pending => Poll::Pending,
1217 }
1218 }
1219}
1220
1221pub trait FutureConsumer<D>: Send + 'static {
1223 type Item;
1225
1226 fn poll_consume(
1238 self: Pin<&mut Self>,
1239 cx: &mut Context<'_>,
1240 store: StoreContextMut<D>,
1241 source: Source<'_, Self::Item>,
1242 finish: bool,
1243 ) -> Poll<Result<()>>;
1244}
1245
1246pub struct FutureReader<T> {
1253 id: TableId<TransmitHandle>,
1254 _phantom: PhantomData<T>,
1255}
1256
1257impl<T> FutureReader<T> {
1258 pub fn new<S: AsContextMut>(
1267 mut store: S,
1268 producer: impl FutureProducer<S::Data, Item = T>,
1269 ) -> Result<Self>
1270 where
1271 T: func::Lower + func::Lift + Send + Sync + 'static,
1272 {
1273 ensure!(
1274 store.as_context().0.concurrency_support(),
1275 "concurrency support is not enabled"
1276 );
1277
1278 struct Producer<P>(P);
1279
1280 impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
1281 for Producer<P>
1282 {
1283 type Item = P::Item;
1284 type Buffer = Option<P::Item>;
1285
1286 fn poll_produce<'a>(
1287 self: Pin<&mut Self>,
1288 cx: &mut Context<'_>,
1289 store: StoreContextMut<D>,
1290 mut destination: Destination<'a, Self::Item, Self::Buffer>,
1291 finish: bool,
1292 ) -> Poll<Result<StreamResult>> {
1293 let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1296
1297 Poll::Ready(Ok(
1298 if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
1299 destination.set_buffer(Some(value));
1300
1301 StreamResult::Completed
1308 } else {
1309 StreamResult::Cancelled
1310 },
1311 ))
1312 }
1313 }
1314
1315 Ok(Self::new_(
1316 store
1317 .as_context_mut()
1318 .new_transmit(TransmitKind::Future, Producer(producer))?,
1319 ))
1320 }
1321
1322 pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1323 Self {
1324 id,
1325 _phantom: PhantomData,
1326 }
1327 }
1328
1329 pub(super) fn id(&self) -> TableId<TransmitHandle> {
1330 self.id
1331 }
1332
1333 pub fn pipe<S: AsContextMut>(
1343 self,
1344 mut store: S,
1345 consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
1346 ) -> Result<()>
1347 where
1348 T: func::Lift + 'static,
1349 {
1350 struct Consumer<C>(C);
1351
1352 impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
1353 for Consumer<C>
1354 {
1355 type Item = T;
1356
1357 fn poll_consume(
1358 self: Pin<&mut Self>,
1359 cx: &mut Context<'_>,
1360 mut store: StoreContextMut<D>,
1361 mut source: Source<Self::Item>,
1362 finish: bool,
1363 ) -> Poll<Result<StreamResult>> {
1364 let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1367
1368 ready!(consumer.poll_consume(
1369 cx,
1370 store.as_context_mut(),
1371 source.reborrow(),
1372 finish
1373 ))?;
1374
1375 Poll::Ready(Ok(if source.remaining(store) == 0 {
1376 StreamResult::Completed
1382 } else {
1383 StreamResult::Cancelled
1384 }))
1385 }
1386 }
1387
1388 store
1389 .as_context_mut()
1390 .set_consumer(self.id, TransmitKind::Future, Consumer(consumer))
1391 }
1392
1393 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1395 let id = lift_index_to_future(cx, ty, index)?;
1396 Ok(Self::new_(id))
1397 }
1398
1399 pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
1417 future_close(store.as_context_mut().0, &mut self.id)
1418 }
1419
1420 pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
1422 accessor.as_accessor().with(|access| self.close(access))
1423 }
1424
1425 pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1431 where
1432 A: AsAccessor,
1433 {
1434 GuardedFutureReader::new(accessor, self)
1435 }
1436
1437 pub fn try_into_future_any(self, store: impl AsContextMut) -> Result<FutureAny>
1444 where
1445 T: ComponentType + 'static,
1446 {
1447 FutureAny::try_from_future_reader(store, self)
1448 }
1449
1450 pub fn try_from_future_any(future: FutureAny) -> Result<Self>
1457 where
1458 T: ComponentType + 'static,
1459 {
1460 future.try_into_future_reader()
1461 }
1462}
1463
1464impl<T> fmt::Debug for FutureReader<T> {
1465 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1466 f.debug_struct("FutureReader")
1467 .field("id", &self.id)
1468 .finish()
1469 }
1470}
1471
1472pub(super) fn future_close(
1473 store: &mut StoreOpaque,
1474 id: &mut TableId<TransmitHandle>,
1475) -> Result<()> {
1476 let id = mem::replace(id, TableId::new(u32::MAX));
1477 store.host_drop_reader(id, TransmitKind::Future)
1478}
1479
1480pub(super) fn lift_index_to_future(
1482 cx: &mut LiftContext<'_>,
1483 ty: InterfaceType,
1484 index: u32,
1485) -> Result<TableId<TransmitHandle>> {
1486 match ty {
1487 InterfaceType::Future(src) => {
1488 let handle_table = cx
1489 .instance_mut()
1490 .table_for_transmit(TransmitIndex::Future(src));
1491 let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
1492 if is_done {
1493 bail!("cannot lift future after being notified that the writable end dropped");
1494 }
1495 let id = TableId::<TransmitHandle>::new(rep);
1496 let concurrent_state = cx.concurrent_state_mut();
1497 let future = concurrent_state.get_mut(id)?;
1498 future.common.handle = None;
1499 let state = future.state;
1500
1501 if concurrent_state.get_mut(state)?.done {
1502 bail!("cannot lift future after previous read succeeded");
1503 }
1504
1505 Ok(id)
1506 }
1507 _ => func::bad_type_info(),
1508 }
1509}
1510
1511pub(super) fn lower_future_to_index<U>(
1513 id: TableId<TransmitHandle>,
1514 cx: &mut LowerContext<'_, U>,
1515 ty: InterfaceType,
1516) -> Result<u32> {
1517 match ty {
1518 InterfaceType::Future(dst) => {
1519 let concurrent_state = cx.store.0.concurrent_state_mut();
1520 let state = concurrent_state.get_mut(id)?.state;
1521 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1522
1523 let handle = cx
1524 .instance_mut()
1525 .table_for_transmit(TransmitIndex::Future(dst))
1526 .future_insert_read(dst, rep)?;
1527
1528 cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1529
1530 Ok(handle)
1531 }
1532 _ => func::bad_type_info(),
1533 }
1534}
1535
1536unsafe impl<T: ComponentType> ComponentType for FutureReader<T> {
1539 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1540
1541 type Lower = <u32 as func::ComponentType>::Lower;
1542
1543 fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1544 match ty {
1545 InterfaceType::Future(ty) => {
1546 let ty = types.types[*ty].ty;
1547 types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1548 }
1549 other => bail!("expected `future`, found `{}`", func::desc(other)),
1550 }
1551 }
1552}
1553
1554unsafe impl<T: ComponentType> func::Lower for FutureReader<T> {
1556 fn linear_lower_to_flat<U>(
1557 &self,
1558 cx: &mut LowerContext<'_, U>,
1559 ty: InterfaceType,
1560 dst: &mut MaybeUninit<Self::Lower>,
1561 ) -> Result<()> {
1562 lower_future_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1563 }
1564
1565 fn linear_lower_to_memory<U>(
1566 &self,
1567 cx: &mut LowerContext<'_, U>,
1568 ty: InterfaceType,
1569 offset: usize,
1570 ) -> Result<()> {
1571 lower_future_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1572 cx,
1573 InterfaceType::U32,
1574 offset,
1575 )
1576 }
1577}
1578
1579unsafe impl<T: ComponentType> func::Lift for FutureReader<T> {
1581 fn linear_lift_from_flat(
1582 cx: &mut LiftContext<'_>,
1583 ty: InterfaceType,
1584 src: &Self::Lower,
1585 ) -> Result<Self> {
1586 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1587 Self::lift_from_index(cx, ty, index)
1588 }
1589
1590 fn linear_lift_from_memory(
1591 cx: &mut LiftContext<'_>,
1592 ty: InterfaceType,
1593 bytes: &[u8],
1594 ) -> Result<Self> {
1595 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1596 Self::lift_from_index(cx, ty, index)
1597 }
1598}
1599
1600pub struct GuardedFutureReader<T, A>
1608where
1609 A: AsAccessor,
1610{
1611 reader: Option<FutureReader<T>>,
1615 accessor: A,
1616}
1617
1618impl<T, A> GuardedFutureReader<T, A>
1619where
1620 A: AsAccessor,
1621{
1622 pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1630 assert!(
1631 accessor
1632 .as_accessor()
1633 .with(|a| a.as_context().0.concurrency_support())
1634 );
1635 Self {
1636 reader: Some(reader),
1637 accessor,
1638 }
1639 }
1640
1641 pub fn into_future(self) -> FutureReader<T> {
1644 self.into()
1645 }
1646}
1647
1648impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1649where
1650 A: AsAccessor,
1651{
1652 fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1653 guard.reader.take().unwrap()
1654 }
1655}
1656
1657impl<T, A> Drop for GuardedFutureReader<T, A>
1658where
1659 A: AsAccessor,
1660{
1661 fn drop(&mut self) {
1662 if let Some(reader) = &mut self.reader {
1663 let result = reader.close_with(&self.accessor);
1666 debug_assert!(result.is_ok());
1667 }
1668 }
1669}
1670
1671pub struct StreamReader<T> {
1678 id: TableId<TransmitHandle>,
1679 _phantom: PhantomData<T>,
1680}
1681
1682impl<T> StreamReader<T> {
1683 pub fn new<S: AsContextMut>(
1692 mut store: S,
1693 producer: impl StreamProducer<S::Data, Item = T>,
1694 ) -> Result<Self>
1695 where
1696 T: func::Lower + func::Lift + Send + Sync + 'static,
1697 {
1698 ensure!(
1699 store.as_context().0.concurrency_support(),
1700 "concurrency support is not enabled",
1701 );
1702 Ok(Self::new_(
1703 store
1704 .as_context_mut()
1705 .new_transmit(TransmitKind::Stream, producer)?,
1706 ))
1707 }
1708
1709 pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1710 Self {
1711 id,
1712 _phantom: PhantomData,
1713 }
1714 }
1715
1716 pub(super) fn id(&self) -> TableId<TransmitHandle> {
1717 self.id
1718 }
1719
1720 pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
1742 let store = store.as_context_mut();
1743 let state = store.0.concurrent_state_mut();
1744 let id = state.get_mut(self.id).unwrap().state;
1745 if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
1746 match try_into(TypeId::of::<V>()) {
1747 Some(result) => {
1748 self.close(store).unwrap();
1749 Ok(*result.downcast::<V>().unwrap())
1750 }
1751 None => Err(self),
1752 }
1753 } else {
1754 Err(self)
1755 }
1756 }
1757
1758 pub fn pipe<S: AsContextMut>(
1768 self,
1769 mut store: S,
1770 consumer: impl StreamConsumer<S::Data, Item = T>,
1771 ) -> Result<()>
1772 where
1773 T: 'static,
1774 {
1775 store
1776 .as_context_mut()
1777 .set_consumer(self.id, TransmitKind::Stream, consumer)
1778 }
1779
1780 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1782 let id = lift_index_to_stream(cx, ty, index)?;
1783 Ok(Self::new_(id))
1784 }
1785
1786 pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
1802 stream_close(store.as_context_mut().0, &mut self.id)
1803 }
1804
1805 pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
1807 accessor.as_accessor().with(|access| self.close(access))
1808 }
1809
1810 pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1816 where
1817 A: AsAccessor,
1818 {
1819 GuardedStreamReader::new(accessor, self)
1820 }
1821
1822 pub fn try_into_stream_any(self, store: impl AsContextMut) -> Result<StreamAny>
1829 where
1830 T: ComponentType + 'static,
1831 {
1832 StreamAny::try_from_stream_reader(store, self)
1833 }
1834
1835 pub fn try_from_stream_any(stream: StreamAny) -> Result<Self>
1842 where
1843 T: ComponentType + 'static,
1844 {
1845 stream.try_into_stream_reader()
1846 }
1847}
1848
1849impl<T> fmt::Debug for StreamReader<T> {
1850 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1851 f.debug_struct("StreamReader")
1852 .field("id", &self.id)
1853 .finish()
1854 }
1855}
1856
1857pub(super) fn stream_close(
1858 store: &mut StoreOpaque,
1859 id: &mut TableId<TransmitHandle>,
1860) -> Result<()> {
1861 let id = mem::replace(id, TableId::new(u32::MAX));
1862 store.host_drop_reader(id, TransmitKind::Stream)
1863}
1864
1865pub(super) fn lift_index_to_stream(
1867 cx: &mut LiftContext<'_>,
1868 ty: InterfaceType,
1869 index: u32,
1870) -> Result<TableId<TransmitHandle>> {
1871 match ty {
1872 InterfaceType::Stream(src) => {
1873 let handle_table = cx
1874 .instance_mut()
1875 .table_for_transmit(TransmitIndex::Stream(src));
1876 let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1877 if is_done {
1878 bail!("cannot lift stream after being notified that the writable end dropped");
1879 }
1880 let id = TableId::<TransmitHandle>::new(rep);
1881 cx.concurrent_state_mut().get_mut(id)?.common.handle = None;
1882 Ok(id)
1883 }
1884 _ => func::bad_type_info(),
1885 }
1886}
1887
1888pub(super) fn lower_stream_to_index<U>(
1890 id: TableId<TransmitHandle>,
1891 cx: &mut LowerContext<'_, U>,
1892 ty: InterfaceType,
1893) -> Result<u32> {
1894 match ty {
1895 InterfaceType::Stream(dst) => {
1896 let concurrent_state = cx.store.0.concurrent_state_mut();
1897 let state = concurrent_state.get_mut(id)?.state;
1898 let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1899
1900 let handle = cx
1901 .instance_mut()
1902 .table_for_transmit(TransmitIndex::Stream(dst))
1903 .stream_insert_read(dst, rep)?;
1904
1905 cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1906
1907 Ok(handle)
1908 }
1909 _ => func::bad_type_info(),
1910 }
1911}
1912
1913unsafe impl<T: ComponentType> ComponentType for StreamReader<T> {
1916 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1917
1918 type Lower = <u32 as func::ComponentType>::Lower;
1919
1920 fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1921 match ty {
1922 InterfaceType::Stream(ty) => {
1923 let ty = types.types[*ty].ty;
1924 types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1925 }
1926 other => bail!("expected `stream`, found `{}`", func::desc(other)),
1927 }
1928 }
1929}
1930
1931unsafe impl<T: ComponentType> func::Lower for StreamReader<T> {
1933 fn linear_lower_to_flat<U>(
1934 &self,
1935 cx: &mut LowerContext<'_, U>,
1936 ty: InterfaceType,
1937 dst: &mut MaybeUninit<Self::Lower>,
1938 ) -> Result<()> {
1939 lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1940 }
1941
1942 fn linear_lower_to_memory<U>(
1943 &self,
1944 cx: &mut LowerContext<'_, U>,
1945 ty: InterfaceType,
1946 offset: usize,
1947 ) -> Result<()> {
1948 lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1949 cx,
1950 InterfaceType::U32,
1951 offset,
1952 )
1953 }
1954}
1955
1956unsafe impl<T: ComponentType> func::Lift for StreamReader<T> {
1958 fn linear_lift_from_flat(
1959 cx: &mut LiftContext<'_>,
1960 ty: InterfaceType,
1961 src: &Self::Lower,
1962 ) -> Result<Self> {
1963 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1964 Self::lift_from_index(cx, ty, index)
1965 }
1966
1967 fn linear_lift_from_memory(
1968 cx: &mut LiftContext<'_>,
1969 ty: InterfaceType,
1970 bytes: &[u8],
1971 ) -> Result<Self> {
1972 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1973 Self::lift_from_index(cx, ty, index)
1974 }
1975}
1976
1977pub struct GuardedStreamReader<T, A>
1985where
1986 A: AsAccessor,
1987{
1988 reader: Option<StreamReader<T>>,
1992 accessor: A,
1993}
1994
1995impl<T, A> GuardedStreamReader<T, A>
1996where
1997 A: AsAccessor,
1998{
1999 pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
2008 assert!(
2009 accessor
2010 .as_accessor()
2011 .with(|a| a.as_context().0.concurrency_support())
2012 );
2013 Self {
2014 reader: Some(reader),
2015 accessor,
2016 }
2017 }
2018
2019 pub fn into_stream(self) -> StreamReader<T> {
2022 self.into()
2023 }
2024}
2025
2026impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
2027where
2028 A: AsAccessor,
2029{
2030 fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
2031 guard.reader.take().unwrap()
2032 }
2033}
2034
2035impl<T, A> Drop for GuardedStreamReader<T, A>
2036where
2037 A: AsAccessor,
2038{
2039 fn drop(&mut self) {
2040 if let Some(reader) = &mut self.reader {
2041 let result = reader.close_with(&self.accessor);
2044 debug_assert!(result.is_ok());
2045 }
2046 }
2047}
2048
2049pub struct ErrorContext {
2051 rep: u32,
2052}
2053
2054impl ErrorContext {
2055 pub(crate) fn new(rep: u32) -> Self {
2056 Self { rep }
2057 }
2058
2059 pub fn into_val(self) -> Val {
2061 Val::ErrorContext(ErrorContextAny(self.rep))
2062 }
2063
2064 pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
2066 let Val::ErrorContext(ErrorContextAny(rep)) = value else {
2067 bail!("expected `error-context`; got `{}`", value.desc());
2068 };
2069 Ok(Self::new(*rep))
2070 }
2071
2072 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
2073 match ty {
2074 InterfaceType::ErrorContext(src) => {
2075 let rep = cx
2076 .instance_mut()
2077 .table_for_error_context(src)
2078 .error_context_rep(index)?;
2079
2080 Ok(Self { rep })
2081 }
2082 _ => func::bad_type_info(),
2083 }
2084 }
2085}
2086
2087pub(crate) fn lower_error_context_to_index<U>(
2088 rep: u32,
2089 cx: &mut LowerContext<'_, U>,
2090 ty: InterfaceType,
2091) -> Result<u32> {
2092 match ty {
2093 InterfaceType::ErrorContext(dst) => {
2094 let tbl = cx.instance_mut().table_for_error_context(dst);
2095 tbl.error_context_insert(rep)
2096 }
2097 _ => func::bad_type_info(),
2098 }
2099}
2100unsafe impl func::ComponentType for ErrorContext {
2103 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
2104
2105 type Lower = <u32 as func::ComponentType>::Lower;
2106
2107 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
2108 match ty {
2109 InterfaceType::ErrorContext(_) => Ok(()),
2110 other => bail!("expected `error`, found `{}`", func::desc(other)),
2111 }
2112 }
2113}
2114
2115unsafe impl func::Lower for ErrorContext {
2117 fn linear_lower_to_flat<T>(
2118 &self,
2119 cx: &mut LowerContext<'_, T>,
2120 ty: InterfaceType,
2121 dst: &mut MaybeUninit<Self::Lower>,
2122 ) -> Result<()> {
2123 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
2124 cx,
2125 InterfaceType::U32,
2126 dst,
2127 )
2128 }
2129
2130 fn linear_lower_to_memory<T>(
2131 &self,
2132 cx: &mut LowerContext<'_, T>,
2133 ty: InterfaceType,
2134 offset: usize,
2135 ) -> Result<()> {
2136 lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
2137 cx,
2138 InterfaceType::U32,
2139 offset,
2140 )
2141 }
2142}
2143
2144unsafe impl func::Lift for ErrorContext {
2146 fn linear_lift_from_flat(
2147 cx: &mut LiftContext<'_>,
2148 ty: InterfaceType,
2149 src: &Self::Lower,
2150 ) -> Result<Self> {
2151 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
2152 Self::lift_from_index(cx, ty, index)
2153 }
2154
2155 fn linear_lift_from_memory(
2156 cx: &mut LiftContext<'_>,
2157 ty: InterfaceType,
2158 bytes: &[u8],
2159 ) -> Result<Self> {
2160 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
2161 Self::lift_from_index(cx, ty, index)
2162 }
2163}
2164
2165pub(super) struct TransmitHandle {
2167 pub(super) common: WaitableCommon,
2168 state: TableId<TransmitState>,
2170}
2171
2172impl TransmitHandle {
2173 fn new(state: TableId<TransmitState>) -> Self {
2174 Self {
2175 common: WaitableCommon::default(),
2176 state,
2177 }
2178 }
2179}
2180
2181impl TableDebug for TransmitHandle {
2182 fn type_name() -> &'static str {
2183 "TransmitHandle"
2184 }
2185}
2186
2187struct TransmitState {
2189 write_handle: TableId<TransmitHandle>,
2191 read_handle: TableId<TransmitHandle>,
2193 write: WriteState,
2195 read: ReadState,
2197 done: bool,
2199 pub(super) origin: TransmitOrigin,
2202}
2203
2204#[derive(Copy, Clone)]
2205pub(super) enum TransmitOrigin {
2206 Host,
2207 GuestFuture(ComponentInstanceId, TypeFutureTableIndex),
2208 GuestStream(ComponentInstanceId, TypeStreamTableIndex),
2209}
2210
2211impl TransmitState {
2212 fn new(origin: TransmitOrigin) -> Self {
2213 Self {
2214 write_handle: TableId::new(u32::MAX),
2215 read_handle: TableId::new(u32::MAX),
2216 read: ReadState::Open,
2217 write: WriteState::Open,
2218 done: false,
2219 origin,
2220 }
2221 }
2222}
2223
2224impl TableDebug for TransmitState {
2225 fn type_name() -> &'static str {
2226 "TransmitState"
2227 }
2228}
2229
2230impl TransmitOrigin {
2231 fn guest(id: ComponentInstanceId, index: TransmitIndex) -> Self {
2232 match index {
2233 TransmitIndex::Future(ty) => TransmitOrigin::GuestFuture(id, ty),
2234 TransmitIndex::Stream(ty) => TransmitOrigin::GuestStream(id, ty),
2235 }
2236 }
2237}
2238
2239type PollStream = Box<
2240 dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
2241>;
2242
2243type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
2244
2245enum WriteState {
2247 Open,
2249 GuestReady {
2251 instance: Instance,
2252 caller: RuntimeComponentInstanceIndex,
2253 ty: TransmitIndex,
2254 flat_abi: Option<FlatAbi>,
2255 options: OptionsIndex,
2256 address: usize,
2257 count: ItemCount,
2258 handle: u32,
2259 },
2260 HostReady {
2262 produce: PollStream,
2263 try_into: TryInto,
2264 guest_offset: ItemCount,
2265 cancel: bool,
2266 cancel_waker: Option<Waker>,
2267 },
2268 Dropped,
2270}
2271
2272impl fmt::Debug for WriteState {
2273 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2274 match self {
2275 Self::Open => f.debug_tuple("Open").finish(),
2276 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2277 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2278 Self::Dropped => f.debug_tuple("Dropped").finish(),
2279 }
2280 }
2281}
2282
2283enum ReadState {
2285 Open,
2287 GuestReady {
2289 ty: TransmitIndex,
2290 caller_instance: RuntimeComponentInstanceIndex,
2291 caller_thread: QualifiedThreadId,
2292 flat_abi: Option<FlatAbi>,
2293 instance: Instance,
2294 options: OptionsIndex,
2295 address: usize,
2296 count: ItemCount,
2297 handle: u32,
2298 },
2299 HostReady {
2301 consume: PollStream,
2302 guest_offset: ItemCount,
2303 cancel: bool,
2304 cancel_waker: Option<Waker>,
2305 },
2306 HostToHost {
2308 accept: Box<
2309 dyn for<'a> Fn(
2310 &'a mut UntypedWriteBuffer<'a>,
2311 )
2312 -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
2313 + Send
2314 + Sync,
2315 >,
2316 buffer: Vec<u8>,
2317 limit: usize,
2318 },
2319 Dropped,
2321}
2322
2323impl fmt::Debug for ReadState {
2324 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2325 match self {
2326 Self::Open => f.debug_tuple("Open").finish(),
2327 Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2328 Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2329 Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
2330 Self::Dropped => f.debug_tuple("Dropped").finish(),
2331 }
2332 }
2333}
2334
2335fn return_code(kind: TransmitKind, state: StreamResult, count: ItemCount) -> Result<ReturnCode> {
2336 Ok(match state {
2337 StreamResult::Dropped => ReturnCode::Dropped(count),
2338 StreamResult::Completed => ReturnCode::completed(kind, count),
2339 StreamResult::Cancelled => ReturnCode::Cancelled(count),
2340 })
2341}
2342
2343impl StoreOpaque {
2344 fn pipe_from_guest(
2345 &mut self,
2346 kind: TransmitKind,
2347 id: TableId<TransmitState>,
2348 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2349 ) {
2350 let future = async move {
2351 let stream_state = future.await?;
2352 tls::get(|store| {
2353 let state = store.concurrent_state_mut();
2354 let transmit = state.get_mut(id)?;
2355 let ReadState::HostReady {
2356 consume,
2357 guest_offset,
2358 ..
2359 } = mem::replace(&mut transmit.read, ReadState::Open)
2360 else {
2361 bail_bug!("expected ReadState::HostReady")
2362 };
2363 let code = return_code(kind, stream_state, guest_offset)?;
2364 transmit.read = match stream_state {
2365 StreamResult::Dropped => ReadState::Dropped,
2366 StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2367 consume,
2368 guest_offset: ItemCount::ZERO,
2369 cancel: false,
2370 cancel_waker: None,
2371 },
2372 };
2373 let WriteState::GuestReady { ty, handle, .. } =
2374 mem::replace(&mut transmit.write, WriteState::Open)
2375 else {
2376 bail_bug!("expected WriteState::GuestReady")
2377 };
2378 state.send_write_result(ty, id, handle, code)?;
2379 Ok(())
2380 })
2381 };
2382
2383 self.concurrent_state_mut().push_future(future.boxed());
2384 }
2385
2386 fn pipe_to_guest(
2387 &mut self,
2388 kind: TransmitKind,
2389 id: TableId<TransmitState>,
2390 future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2391 ) {
2392 let future = async move {
2393 let stream_state = future.await?;
2394 tls::get(|store| {
2395 let state = store.concurrent_state_mut();
2396 let transmit = state.get_mut(id)?;
2397 let WriteState::HostReady {
2398 produce,
2399 try_into,
2400 guest_offset,
2401 ..
2402 } = mem::replace(&mut transmit.write, WriteState::Open)
2403 else {
2404 bail_bug!("expected WriteState::HostReady")
2405 };
2406 let code = return_code(kind, stream_state, guest_offset)?;
2407 transmit.write = match stream_state {
2408 StreamResult::Dropped => WriteState::Dropped,
2409 StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2410 produce,
2411 try_into,
2412 guest_offset: ItemCount::ZERO,
2413 cancel: false,
2414 cancel_waker: None,
2415 },
2416 };
2417 let ReadState::GuestReady { ty, handle, .. } =
2418 mem::replace(&mut transmit.read, ReadState::Open)
2419 else {
2420 bail_bug!("expected ReadState::GuestReady")
2421 };
2422 state.send_read_result(ty, id, handle, code)?;
2423 Ok(())
2424 })
2425 };
2426
2427 self.concurrent_state_mut().push_future(future.boxed());
2428 }
2429
2430 fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
2432 let state = self.concurrent_state_mut();
2433 let transmit_id = state.get_mut(id)?.state;
2434 let transmit = state
2435 .get_mut(transmit_id)
2436 .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2437 log::trace!(
2438 "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2439 transmit.read,
2440 transmit.write
2441 );
2442
2443 transmit.read = ReadState::Dropped;
2444
2445 let new_state = if let WriteState::Dropped = &transmit.write {
2448 WriteState::Dropped
2449 } else {
2450 WriteState::Open
2451 };
2452
2453 let write_handle = transmit.write_handle;
2454
2455 match mem::replace(&mut transmit.write, new_state) {
2456 WriteState::GuestReady { ty, handle, .. } => {
2459 state.update_event(
2460 write_handle.rep(),
2461 match ty {
2462 TransmitIndex::Future(ty) => Event::FutureWrite {
2463 code: ReturnCode::Dropped(ItemCount::ZERO),
2464 pending: Some((ty, handle)),
2465 },
2466 TransmitIndex::Stream(ty) => Event::StreamWrite {
2467 code: ReturnCode::Dropped(ItemCount::ZERO),
2468 pending: Some((ty, handle)),
2469 },
2470 },
2471 )?;
2472 }
2473
2474 WriteState::HostReady { .. } => {}
2475
2476 WriteState::Open => {
2477 state.update_event(
2478 write_handle.rep(),
2479 match kind {
2480 TransmitKind::Future => Event::FutureWrite {
2481 code: ReturnCode::Dropped(ItemCount::ZERO),
2482 pending: None,
2483 },
2484 TransmitKind::Stream => Event::StreamWrite {
2485 code: ReturnCode::Dropped(ItemCount::ZERO),
2486 pending: None,
2487 },
2488 },
2489 )?;
2490 }
2491
2492 WriteState::Dropped => {
2493 log::trace!("host_drop_reader delete {transmit_id:?}");
2494 state.delete_transmit(transmit_id)?;
2495 }
2496 }
2497 Ok(())
2498 }
2499
2500 fn host_drop_writer(
2502 &mut self,
2503 id: TableId<TransmitHandle>,
2504 on_drop_open: Option<fn() -> Result<()>>,
2505 ) -> Result<()> {
2506 let state = self.concurrent_state_mut();
2507 let transmit_id = state.get_mut(id)?.state;
2508 let transmit = state
2509 .get_mut(transmit_id)
2510 .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2511 log::trace!(
2512 "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2513 transmit.read,
2514 transmit.write
2515 );
2516
2517 match &mut transmit.write {
2519 WriteState::GuestReady { .. } => {
2520 bail_bug!("can't call `host_drop_writer` on a guest-owned writer");
2521 }
2522 WriteState::HostReady { .. } => {}
2523 v @ WriteState::Open => {
2524 if let (Some(on_drop_open), false) = (
2525 on_drop_open,
2526 transmit.done || matches!(transmit.read, ReadState::Dropped),
2527 ) {
2528 on_drop_open()?;
2529 } else {
2530 *v = WriteState::Dropped;
2531 }
2532 }
2533 WriteState::Dropped => bail_bug!("write state is already dropped"),
2534 }
2535
2536 let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
2537
2538 let new_state = if let ReadState::Dropped = &transmit.read {
2544 ReadState::Dropped
2545 } else {
2546 ReadState::Open
2547 };
2548
2549 let read_handle = transmit.read_handle;
2550
2551 match mem::replace(&mut transmit.read, new_state) {
2553 ReadState::GuestReady { ty, handle, .. } => {
2557 self.concurrent_state_mut().update_event(
2559 read_handle.rep(),
2560 match ty {
2561 TransmitIndex::Future(ty) => Event::FutureRead {
2562 code: ReturnCode::Dropped(ItemCount::ZERO),
2563 pending: Some((ty, handle)),
2564 },
2565 TransmitIndex::Stream(ty) => Event::StreamRead {
2566 code: ReturnCode::Dropped(ItemCount::ZERO),
2567 pending: Some((ty, handle)),
2568 },
2569 },
2570 )?;
2571 }
2572
2573 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2574
2575 ReadState::Open => {
2577 self.concurrent_state_mut().update_event(
2578 read_handle.rep(),
2579 match on_drop_open {
2580 Some(_) => Event::FutureRead {
2581 code: ReturnCode::Dropped(ItemCount::ZERO),
2582 pending: None,
2583 },
2584 None => Event::StreamRead {
2585 code: ReturnCode::Dropped(ItemCount::ZERO),
2586 pending: None,
2587 },
2588 },
2589 )?;
2590 }
2591
2592 ReadState::Dropped => {
2595 log::trace!("host_drop_writer delete {transmit_id:?}");
2596 self.concurrent_state_mut().delete_transmit(transmit_id)?;
2597 }
2598 }
2599 Ok(())
2600 }
2601
2602 pub(super) fn transmit_origin(
2603 &mut self,
2604 id: TableId<TransmitHandle>,
2605 ) -> Result<TransmitOrigin> {
2606 let state = self.concurrent_state_mut();
2607 let state_id = state.get_mut(id)?.state;
2608 Ok(state.get_mut(state_id)?.origin)
2609 }
2610}
2611
2612impl<T> StoreContextMut<'_, T> {
2613 fn new_transmit<P: StreamProducer<T>>(
2614 mut self,
2615 kind: TransmitKind,
2616 producer: P,
2617 ) -> Result<TableId<TransmitHandle>>
2618 where
2619 P::Item: func::Lower,
2620 {
2621 let token = StoreToken::new(self.as_context_mut());
2622 let state = self.0.concurrent_state_mut();
2623 let (_, read) = state.new_transmit(TransmitOrigin::Host)?;
2624 let producer = Arc::new(LockedState::new((Box::pin(producer), P::Buffer::default())));
2625 let id = state.get_mut(read)?.state;
2626 let mut dropped = false;
2627 let produce = Box::new({
2628 let producer = producer.clone();
2629 move || {
2630 let producer = producer.clone();
2631 async move {
2632 let mut state = producer.take()?;
2633 let (mine, buffer) = &mut *state;
2634
2635 let (result, cancelled) = if buffer.remaining().is_empty() {
2636 future::poll_fn(|cx| {
2637 tls::get(|store| {
2638 let transmit = store.concurrent_state_mut().get_mut(id)?;
2639
2640 let &WriteState::HostReady { cancel, .. } = &transmit.write else {
2641 bail_bug!("expected WriteState::HostReady")
2642 };
2643
2644 let mut host_buffer =
2645 if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
2646 Some(Cursor::new(mem::take(buffer)))
2647 } else {
2648 None
2649 };
2650
2651 let poll = mine.as_mut().poll_produce(
2652 cx,
2653 token.as_context_mut(store),
2654 Destination {
2655 id,
2656 buffer,
2657 host_buffer: host_buffer.as_mut(),
2658 _phantom: PhantomData,
2659 },
2660 cancel,
2661 );
2662
2663 let transmit = store.concurrent_state_mut().get_mut(id)?;
2664
2665 let host_offset = if let (
2666 Some(host_buffer),
2667 ReadState::HostToHost { buffer, limit, .. },
2668 ) = (host_buffer, &mut transmit.read)
2669 {
2670 *limit = usize::try_from(host_buffer.position())?;
2671 *buffer = host_buffer.into_inner();
2672 *limit
2673 } else {
2674 0
2675 };
2676
2677 {
2678 let WriteState::HostReady {
2679 guest_offset,
2680 cancel,
2681 cancel_waker,
2682 ..
2683 } = &mut transmit.write
2684 else {
2685 bail_bug!("expected WriteState::HostReady")
2686 };
2687
2688 if poll.is_pending() {
2689 if !buffer.remaining().is_empty()
2690 || *guest_offset > 0
2691 || host_offset > 0
2692 {
2693 bail!(
2694 "StreamProducer::poll_produce returned Poll::Pending \
2695 after producing at least one item"
2696 )
2697 }
2698 *cancel_waker = Some(cx.waker().clone());
2699 } else {
2700 *cancel_waker = None;
2701 *cancel = false;
2702 }
2703 }
2704
2705 Ok(poll.map(|v| v.map(|result| (result, cancel))))
2706 })?
2707 })
2708 .await?
2709 } else {
2710 (StreamResult::Completed, false)
2711 };
2712
2713 let (guest_offset, host_offset, count) = tls::get(|store| {
2714 let transmit = store.concurrent_state_mut().get_mut(id)?;
2715 let (count, host_offset) = match &transmit.read {
2716 &ReadState::GuestReady { count, .. } => (count.as_u32(), 0),
2717 &ReadState::HostToHost { limit, .. } => (1, limit),
2718 _ => bail_bug!("invalid read state"),
2719 };
2720 let guest_offset = match &transmit.write {
2721 &WriteState::HostReady { guest_offset, .. } => guest_offset,
2722 _ => bail_bug!("invalid write state"),
2723 };
2724 Ok((guest_offset, host_offset, count))
2725 })?;
2726
2727 match result {
2728 StreamResult::Completed => {
2729 if count > 1
2730 && buffer.remaining().is_empty()
2731 && guest_offset == 0
2732 && host_offset == 0
2733 {
2734 bail!(
2735 "StreamProducer::poll_produce returned StreamResult::Completed \
2736 without producing any items"
2737 );
2738 }
2739 }
2740 StreamResult::Cancelled => {
2741 if !cancelled {
2742 bail!(
2743 "StreamProducer::poll_produce returned StreamResult::Cancelled \
2744 without being given a `finish` parameter value of true"
2745 );
2746 }
2747 }
2748 StreamResult::Dropped => {
2749 dropped = true;
2750 }
2751 }
2752
2753 let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;
2754
2755 drop(state);
2756
2757 if write_buffer {
2758 write(token, id, producer.clone(), kind).await?;
2759 }
2760
2761 Ok(if dropped {
2762 if producer.with(|p| p.1.remaining().is_empty())? {
2763 StreamResult::Dropped
2764 } else {
2765 StreamResult::Completed
2766 }
2767 } else {
2768 result
2769 })
2770 }
2771 .boxed()
2772 }
2773 });
2774 let try_into = Box::new(move |ty| {
2775 let (mine, buffer) = producer.try_lock().ok()?.take()?;
2776 match P::try_into(mine, ty) {
2777 Ok(value) => Some(value),
2778 Err(mine) => {
2779 *producer.try_lock().ok()? = Some((mine, buffer));
2780 None
2781 }
2782 }
2783 });
2784 state.get_mut(id)?.write = WriteState::HostReady {
2785 produce,
2786 try_into,
2787 guest_offset: ItemCount::ZERO,
2788 cancel: false,
2789 cancel_waker: None,
2790 };
2791 Ok(read)
2792 }
2793
2794 fn set_consumer<C: StreamConsumer<T>>(
2795 mut self,
2796 id: TableId<TransmitHandle>,
2797 kind: TransmitKind,
2798 consumer: C,
2799 ) -> Result<()> {
2800 let token = StoreToken::new(self.as_context_mut());
2801 let state = self.0.concurrent_state_mut();
2802 let id = state.get_mut(id)?.state;
2803 let transmit = state.get_mut(id)?;
2804 let consumer = Arc::new(LockedState::new(Box::pin(consumer)));
2805 let consume_with_buffer = {
2806 let consumer = consumer.clone();
2807 async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
2808 let mut mine = consumer.take()?;
2809
2810 let host_buffer_remaining_before =
2811 host_buffer.as_deref_mut().map(|v| v.remaining().len());
2812
2813 let (result, cancelled) = future::poll_fn(|cx| {
2814 tls::get(|store| {
2815 let cancel = match &store.concurrent_state_mut().get_mut(id)?.read {
2816 &ReadState::HostReady { cancel, .. } => cancel,
2817 ReadState::Open => false,
2818 _ => bail_bug!("unexpected read state"),
2819 };
2820
2821 let poll = mine.as_mut().poll_consume(
2822 cx,
2823 token.as_context_mut(store),
2824 Source {
2825 id,
2826 host_buffer: host_buffer.as_deref_mut(),
2827 },
2828 cancel,
2829 );
2830
2831 if let ReadState::HostReady {
2832 cancel_waker,
2833 cancel,
2834 ..
2835 } = &mut store.concurrent_state_mut().get_mut(id)?.read
2836 {
2837 if poll.is_pending() {
2838 *cancel_waker = Some(cx.waker().clone());
2839 } else {
2840 *cancel_waker = None;
2841 *cancel = false;
2842 }
2843 }
2844
2845 Ok(poll.map(|v| v.map(|result| (result, cancel))))
2846 })?
2847 })
2848 .await?;
2849
2850 let (guest_offset, count) = tls::get(|store| {
2851 let transmit = store.concurrent_state_mut().get_mut(id)?;
2852 Ok((
2853 match &transmit.read {
2854 &ReadState::HostReady { guest_offset, .. } => guest_offset,
2855 ReadState::Open => ItemCount::ZERO,
2856 _ => bail_bug!("invalid read state"),
2857 },
2858 match &transmit.write {
2859 WriteState::GuestReady { count, .. } => count.as_usize(),
2860 WriteState::HostReady { .. } => match host_buffer_remaining_before {
2861 Some(n) => n,
2862 None => bail_bug!("host_buffer_remaining_before should be set"),
2863 },
2864 _ => bail_bug!("invalid write state"),
2865 },
2866 ))
2867 })?;
2868
2869 match result {
2870 StreamResult::Completed => {
2871 if count > 0
2872 && guest_offset == 0
2873 && host_buffer_remaining_before
2874 .zip(host_buffer.map(|v| v.remaining().len()))
2875 .map(|(before, after)| before == after)
2876 .unwrap_or(false)
2877 {
2878 bail!(
2879 "StreamConsumer::poll_consume returned StreamResult::Completed \
2880 without consuming any items"
2881 );
2882 }
2883
2884 if let TransmitKind::Future = kind {
2885 tls::get(|store| {
2886 store.concurrent_state_mut().get_mut(id)?.done = true;
2887 crate::error::Ok(())
2888 })?;
2889 }
2890 }
2891 StreamResult::Cancelled => {
2892 if !cancelled {
2893 bail!(
2894 "StreamConsumer::poll_consume returned StreamResult::Cancelled \
2895 without being given a `finish` parameter value of true"
2896 );
2897 }
2898 }
2899 StreamResult::Dropped => {}
2900 }
2901
2902 Ok(result)
2903 }
2904 };
2905 let consume = {
2906 let consume = consume_with_buffer.clone();
2907 Box::new(move || {
2908 let consume = consume.clone();
2909 async move { consume(None).await }.boxed()
2910 })
2911 };
2912
2913 match &transmit.write {
2914 WriteState::Open => {
2915 transmit.read = ReadState::HostReady {
2916 consume,
2917 guest_offset: ItemCount::ZERO,
2918 cancel: false,
2919 cancel_waker: None,
2920 };
2921 }
2922 &WriteState::GuestReady { .. } => {
2923 let future = consume();
2924 transmit.read = ReadState::HostReady {
2925 consume,
2926 guest_offset: ItemCount::ZERO,
2927 cancel: false,
2928 cancel_waker: None,
2929 };
2930 self.0.pipe_from_guest(kind, id, future);
2931 }
2932 WriteState::HostReady { .. } => {
2933 let WriteState::HostReady { produce, .. } = mem::replace(
2934 &mut transmit.write,
2935 WriteState::HostReady {
2936 produce: Box::new(|| {
2937 Box::pin(async { bail_bug!("unexpected invocation of `produce`") })
2938 }),
2939 try_into: Box::new(|_| None),
2940 guest_offset: ItemCount::ZERO,
2941 cancel: false,
2942 cancel_waker: None,
2943 },
2944 ) else {
2945 bail_bug!("expected WriteState::HostReady")
2946 };
2947
2948 transmit.read = ReadState::HostToHost {
2949 accept: Box::new(move |input| {
2950 let consume = consume_with_buffer.clone();
2951 async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
2952 }),
2953 buffer: Vec::new(),
2954 limit: 0,
2955 };
2956
2957 let future = async move {
2958 loop {
2959 if tls::get(|store| {
2960 crate::error::Ok(matches!(
2961 store.concurrent_state_mut().get_mut(id)?.read,
2962 ReadState::Dropped
2963 ))
2964 })? {
2965 break Ok(());
2966 }
2967
2968 match produce().await? {
2969 StreamResult::Completed | StreamResult::Cancelled => {}
2970 StreamResult::Dropped => break Ok(()),
2971 }
2972
2973 if let TransmitKind::Future = kind {
2974 break Ok(());
2975 }
2976 }
2977 }
2978 .map(move |result| {
2979 tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
2980 result
2981 });
2982
2983 state.push_future(Box::pin(future));
2984 }
2985 WriteState::Dropped => {
2986 let reader = transmit.read_handle;
2987 self.0.host_drop_reader(reader, kind)?;
2988 }
2989 }
2990 Ok(())
2991 }
2992}
2993
2994async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
2995 token: StoreToken<D>,
2996 id: TableId<TransmitState>,
2997 pair: Arc<LockedState<(P, B)>>,
2998 kind: TransmitKind,
2999) -> Result<()> {
3000 let (read, guest_offset) = tls::get(|store| {
3001 let transmit = store.concurrent_state_mut().get_mut(id)?;
3002
3003 let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
3004 Some(guest_offset)
3005 } else {
3006 None
3007 };
3008
3009 crate::error::Ok((
3010 mem::replace(&mut transmit.read, ReadState::Open),
3011 guest_offset,
3012 ))
3013 })?;
3014
3015 match read {
3016 ReadState::GuestReady {
3017 ty,
3018 flat_abi,
3019 options,
3020 address,
3021 count,
3022 handle,
3023 instance,
3024 caller_instance,
3025 caller_thread,
3026 } => {
3027 let guest_offset = match guest_offset {
3028 Some(i) => i,
3029 None => bail_bug!("guest_offset should be present if ready"),
3030 };
3031
3032 if let TransmitKind::Future = kind {
3033 tls::get(|store| {
3034 store.concurrent_state_mut().get_mut(id)?.done = true;
3035 crate::error::Ok(())
3036 })?;
3037 }
3038
3039 let old_remaining = pair.with(|p| p.1.remaining().len())?;
3040 let accept = {
3041 let pair = pair.clone();
3042 move |mut store: StoreContextMut<D>| {
3043 let mut state = pair.take()?;
3044 lower::<T, B, D>(
3045 store.as_context_mut(),
3046 instance,
3047 caller_thread,
3048 options,
3049 ty,
3050 address + (T::SIZE32 * guest_offset.as_usize()),
3051 count.as_usize() - guest_offset.as_usize(),
3052 &mut state.1,
3053 )?;
3054 crate::error::Ok(())
3055 }
3056 };
3057
3058 if guest_offset < count {
3059 if T::MAY_REQUIRE_REALLOC {
3060 let (tx, rx) = oneshot::channel();
3065 tls::get(move |store| {
3066 store
3067 .concurrent_state_mut()
3068 .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
3069 move |store| {
3070 _ = tx.send(accept(token.as_context_mut(store))?);
3071 Ok(())
3072 },
3073 ))))
3074 });
3075 rx.await?
3076 } else {
3077 tls::get(|store| accept(token.as_context_mut(store)))?
3082 };
3083 }
3084
3085 tls::get(|store| {
3086 let count = old_remaining - pair.with(|p| p.1.remaining().len())?;
3087
3088 let transmit = store.concurrent_state_mut().get_mut(id)?;
3089
3090 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3091 bail_bug!("expected WriteState::HostReady")
3092 };
3093
3094 guest_offset.inc(count)?;
3095
3096 transmit.read = ReadState::GuestReady {
3097 ty,
3098 flat_abi,
3099 options,
3100 address,
3101 count: ItemCount::new_usize(count)?,
3102 handle,
3103 instance,
3104 caller_instance,
3105 caller_thread,
3106 };
3107
3108 crate::error::Ok(())
3109 })?;
3110
3111 Ok(())
3112 }
3113
3114 ReadState::HostToHost {
3115 accept,
3116 mut buffer,
3117 limit,
3118 } => {
3119 let mut state = StreamResult::Completed;
3120 let mut position = 0;
3121
3122 while !matches!(state, StreamResult::Dropped) && position < limit {
3123 let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
3124 state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
3125 (buffer, position, _) = slice_buffer.into_parts();
3126 }
3127
3128 {
3129 let mut pair = pair.take()?;
3130 let (_, buffer) = &mut *pair;
3131
3132 while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
3133 state = accept(&mut UntypedWriteBuffer::new(buffer)).await?;
3134 }
3135 }
3136
3137 tls::get(|store| {
3138 store.concurrent_state_mut().get_mut(id)?.read = match state {
3139 StreamResult::Dropped => ReadState::Dropped,
3140 StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
3141 accept,
3142 buffer,
3143 limit: 0,
3144 },
3145 };
3146
3147 crate::error::Ok(())
3148 })?;
3149 Ok(())
3150 }
3151
3152 _ => bail_bug!("unexpected read state"),
3153 }
3154}
3155
3156impl Instance {
3157 fn consume(
3160 self,
3161 store: &mut dyn VMStore,
3162 kind: TransmitKind,
3163 transmit_id: TableId<TransmitState>,
3164 consume: PollStream,
3165 guest_offset: ItemCount,
3166 cancel: bool,
3167 ) -> Result<ReturnCode> {
3168 let mut future = consume();
3169 store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
3170 consume,
3171 guest_offset,
3172 cancel,
3173 cancel_waker: None,
3174 };
3175 let poll = tls::set(store, || {
3176 future
3177 .as_mut()
3178 .poll(&mut Context::from_waker(&Waker::noop()))
3179 });
3180
3181 Ok(match poll {
3182 Poll::Ready(state) => {
3183 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3184 let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
3185 bail_bug!("expected ReadState::HostReady")
3186 };
3187 let code = return_code(kind, state?, mem::replace(guest_offset, ItemCount::ZERO))?;
3188 transmit.write = WriteState::Open;
3189 code
3190 }
3191 Poll::Pending => {
3192 store.pipe_from_guest(kind, transmit_id, future);
3193 ReturnCode::Blocked
3194 }
3195 })
3196 }
3197
3198 fn produce(
3201 self,
3202 store: &mut dyn VMStore,
3203 kind: TransmitKind,
3204 transmit_id: TableId<TransmitState>,
3205 produce: PollStream,
3206 try_into: TryInto,
3207 guest_offset: ItemCount,
3208 cancel: bool,
3209 ) -> Result<ReturnCode> {
3210 let mut future = produce();
3211 store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
3212 produce,
3213 try_into,
3214 guest_offset,
3215 cancel,
3216 cancel_waker: None,
3217 };
3218 let poll = tls::set(store, || {
3219 future
3220 .as_mut()
3221 .poll(&mut Context::from_waker(&Waker::noop()))
3222 });
3223
3224 Ok(match poll {
3225 Poll::Ready(state) => {
3226 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3227 let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3228 bail_bug!("expected WriteState::HostReady")
3229 };
3230 let code = return_code(kind, state?, mem::replace(guest_offset, ItemCount::ZERO))?;
3231 transmit.read = ReadState::Open;
3232 code
3233 }
3234 Poll::Pending => {
3235 store.pipe_to_guest(kind, transmit_id, future);
3236 ReturnCode::Blocked
3237 }
3238 })
3239 }
3240
3241 pub(super) fn guest_drop_writable(
3243 self,
3244 store: &mut StoreOpaque,
3245 ty: TransmitIndex,
3246 writer: u32,
3247 ) -> Result<()> {
3248 let table = self.id().get_mut(store).table_for_transmit(ty);
3249 let transmit_rep = match ty {
3250 TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
3251 TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
3252 };
3253
3254 let id = TableId::<TransmitHandle>::new(transmit_rep);
3255 log::trace!("guest_drop_writable: drop writer {id:?}");
3256 match ty {
3257 TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
3258 TransmitIndex::Future(_) => store.host_drop_writer(
3259 id,
3260 Some(|| {
3261 Err(format_err!(
3262 "cannot drop future write end without first writing a value"
3263 ))
3264 }),
3265 ),
3266 }
3267 }
3268
3269 fn copy<T: 'static>(
3272 store: StoreContextMut<T>,
3273 flat_abi: Option<FlatAbi>,
3274 write_instance: Instance,
3275 write_caller_instance: RuntimeComponentInstanceIndex,
3276 write_ty: TransmitIndex,
3277 write_options: OptionsIndex,
3278 write_address: usize,
3279 read_instance: Instance,
3280 read_caller_instance: RuntimeComponentInstanceIndex,
3281 read_caller_thread: QualifiedThreadId,
3282 read_ty: TransmitIndex,
3283 read_options: OptionsIndex,
3284 read_address: usize,
3285 count: ItemCount,
3286 rep: u32,
3287 ) -> Result<()> {
3288 let (write_component, store) = write_instance.component_and_store_mut(store.0);
3289 let (read_component, mut store) = read_instance.component_and_store_mut(store);
3290 let write_types = write_component.types();
3291 let read_types = read_component.types();
3292 let count = count.as_usize();
3293
3294 let write_payload_ty = write_ty.payload(write_types);
3297 let write_abi = match write_payload_ty {
3298 Some(ty) => write_types.canonical_abi(ty),
3299 None => &CanonicalAbiInfo::ZERO,
3300 };
3301 let write_length_in_bytes = match flat_abi {
3302 Some(abi) => usize::try_from(abi.size)? * count,
3303 None => usize::try_from(write_abi.size32)? * count,
3304 };
3305 if write_length_in_bytes > 0 {
3306 if write_address % usize::try_from(write_abi.align32)? != 0 {
3307 bail!("write pointer not aligned");
3308 }
3309 write_instance
3310 .options_memory(store, write_options)
3311 .get(write_address..)
3312 .and_then(|b| b.get(..write_length_in_bytes))
3313 .ok_or_else(|| crate::format_err!("write pointer out of bounds"))?;
3314 }
3315
3316 let read_payload_ty = read_ty.payload(read_types);
3317 let read_abi = match read_payload_ty {
3318 Some(ty) => read_types.canonical_abi(ty),
3319 None => &CanonicalAbiInfo::ZERO,
3320 };
3321 let read_length_in_bytes = match flat_abi {
3322 Some(abi) => usize::try_from(abi.size)? * count,
3323 None => usize::try_from(read_abi.size32)? * count,
3324 };
3325 if read_length_in_bytes > 0 {
3326 if read_address % usize::try_from(read_abi.align32)? != 0 {
3327 bail!("read pointer not aligned");
3328 }
3329 read_instance
3330 .options_memory(store, read_options)
3331 .get(read_address..)
3332 .and_then(|b| b.get(..read_length_in_bytes))
3333 .ok_or_else(|| crate::format_err!("read pointer out of bounds"))?;
3334 }
3335
3336 if write_caller_instance == read_caller_instance
3337 && !allow_intra_component_read_write(write_payload_ty)
3338 {
3339 bail!(
3340 "cannot read from and write to intra-component future/stream with non-numeric payload"
3341 )
3342 }
3343
3344 match (write_ty, read_ty) {
3345 (TransmitIndex::Future(_), TransmitIndex::Future(_)) => {
3346 if count != 1 {
3347 bail_bug!("futures can only send 1 item");
3348 }
3349
3350 let val = write_payload_ty
3351 .map(|ty| {
3352 let lift = &mut LiftContext::new(store, write_options, write_instance);
3353 let bytes = &lift.memory()[write_address..][..write_length_in_bytes];
3354 Val::load(lift, *ty, bytes)
3355 })
3356 .transpose()?;
3357
3358 if let Some(val) = val {
3359 let old_thread = store.set_thread(read_caller_thread)?;
3363 let lower =
3364 &mut LowerContext::new(store.as_context_mut(), read_options, read_instance);
3365 let ptr = func::validate_inbounds_dynamic(
3366 read_abi,
3367 lower.as_slice_mut(),
3368 &ValRaw::u32(read_address.try_into()?),
3369 )?;
3370 let ty = match read_payload_ty {
3371 Some(ty) => ty,
3372 None => bail_bug!("expected read payload type to be present"),
3373 };
3374 val.store(lower, *ty, ptr)?;
3375 store.set_thread(old_thread)?;
3376 }
3377 }
3378 (TransmitIndex::Stream(_), TransmitIndex::Stream(_)) => {
3379 if write_length_in_bytes == 0 {
3380 return Ok(());
3381 }
3382 let write_payload_ty = match write_payload_ty {
3383 Some(ty) => ty,
3384 None => bail_bug!("expected write payload type to be present"),
3385 };
3386 let read_payload_ty = match read_payload_ty {
3387 Some(ty) => ty,
3388 None => bail_bug!("expected read payload type to be present"),
3389 };
3390 if flat_abi.is_some() {
3391 let store_opaque = store.store_opaque_mut();
3393
3394 assert_eq!(read_length_in_bytes, write_length_in_bytes);
3395
3396 if read_instance
3397 .options_memory(store_opaque, read_options)
3398 .as_ptr()
3399 == write_instance
3400 .options_memory(store_opaque, write_options)
3401 .as_ptr()
3402 {
3403 let memory = read_instance.options_memory_mut(store_opaque, read_options);
3404 memory.copy_within(
3405 write_address..write_address + write_length_in_bytes,
3406 read_address,
3407 );
3408 } else {
3409 let src = write_instance.options_memory(store_opaque, write_options)
3410 [write_address..][..write_length_in_bytes]
3411 .as_ptr();
3412 let dst = read_instance.options_memory_mut(store_opaque, read_options)
3413 [read_address..][..read_length_in_bytes]
3414 .as_mut_ptr();
3415
3416 unsafe {
3426 src.copy_to_nonoverlapping(dst, write_length_in_bytes);
3427 }
3428 }
3429 } else {
3430 let store_opaque = store.store_opaque_mut();
3431 let lift = &mut LiftContext::new(store_opaque, write_options, write_instance);
3432 let bytes = &lift.memory()[write_address..][..write_length_in_bytes];
3433 lift.consume_fuel_array(count, size_of::<Val>())?;
3434
3435 let values = (0..count)
3436 .map(|index| {
3437 let size = usize::try_from(write_abi.size32)?;
3438 Val::load(lift, *write_payload_ty, &bytes[(index * size)..][..size])
3439 })
3440 .collect::<Result<Vec<_>>>()?;
3441
3442 let id = TableId::<TransmitHandle>::new(rep);
3443 log::trace!("copy values {values:?} for {id:?}");
3444
3445 let old_thread = store.set_thread(read_caller_thread)?;
3449 let lower =
3450 &mut LowerContext::new(store.as_context_mut(), read_options, read_instance);
3451 let mut ptr = read_address;
3452 for value in values {
3453 value.store(lower, *read_payload_ty, ptr)?;
3454 ptr += usize::try_from(read_abi.size32)?;
3455 }
3456 store.set_thread(old_thread)?;
3457 }
3458 }
3459 _ => bail_bug!("mismatched transmit types in copy"),
3460 }
3461
3462 Ok(())
3463 }
3464
3465 fn check_bounds(
3466 self,
3467 store: &StoreOpaque,
3468 options: OptionsIndex,
3469 ty: TransmitIndex,
3470 address: usize,
3471 count: usize,
3472 ) -> Result<()> {
3473 let types = self.id().get(store).component().types();
3474 let size = usize::try_from(
3475 match ty {
3476 TransmitIndex::Future(ty) => types[types[ty].ty]
3477 .payload
3478 .map(|ty| types.canonical_abi(&ty).size32),
3479 TransmitIndex::Stream(ty) => types[types[ty].ty]
3480 .payload
3481 .map(|ty| types.canonical_abi(&ty).size32),
3482 }
3483 .unwrap_or(0),
3484 )?;
3485
3486 if count > 0 && size > 0 {
3487 self.options_memory(store, options)
3488 .get(address..)
3489 .and_then(|b| b.get(..(size * count)))
3490 .map(drop)
3491 .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))
3492 } else {
3493 Ok(())
3494 }
3495 }
3496
3497 pub(super) fn guest_write<T: 'static>(
3499 self,
3500 mut store: StoreContextMut<T>,
3501 caller: RuntimeComponentInstanceIndex,
3502 ty: TransmitIndex,
3503 options: OptionsIndex,
3504 flat_abi: Option<FlatAbi>,
3505 handle: u32,
3506 address: u32,
3507 count: u32,
3508 ) -> Result<ReturnCode> {
3509 let count = ItemCount::new(count)?;
3510
3511 if !self.options(store.0, options).async_ {
3512 store.0.check_blocking()?;
3516 }
3517
3518 let address = usize::try_from(address)?;
3519 self.check_bounds(store.0, options, ty, address, count.as_usize())?;
3520 let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3521 let TransmitLocalState::Write { done } = *state else {
3522 bail!(Trap::ConcurrentFutureStreamOp);
3523 };
3524
3525 if done {
3526 bail!("cannot write after being notified that the readable end dropped");
3527 }
3528
3529 *state = TransmitLocalState::Busy;
3530 let transmit_handle = TableId::<TransmitHandle>::new(rep);
3531 let concurrent_state = store.0.concurrent_state_mut();
3532 let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3533 let transmit = concurrent_state.get_mut(transmit_id)?;
3534 log::trace!(
3535 "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3536 transmit.read
3537 );
3538
3539 if transmit.done {
3540 bail!("cannot write to future after previous write succeeded or readable end dropped");
3541 }
3542
3543 let new_state = if let ReadState::Dropped = &transmit.read {
3544 ReadState::Dropped
3545 } else {
3546 ReadState::Open
3547 };
3548
3549 let set_guest_ready = |me: &mut ConcurrentState| {
3550 let transmit = me.get_mut(transmit_id)?;
3551 if !matches!(&transmit.write, WriteState::Open) {
3552 bail_bug!("expected `WriteState::Open`; got `{:?}`", transmit.write);
3553 }
3554 transmit.write = WriteState::GuestReady {
3555 instance: self,
3556 caller,
3557 ty,
3558 flat_abi,
3559 options,
3560 address,
3561 count,
3562 handle,
3563 };
3564 Ok::<_, crate::Error>(())
3565 };
3566
3567 let mut result = match mem::replace(&mut transmit.read, new_state) {
3568 ReadState::GuestReady {
3569 ty: read_ty,
3570 flat_abi: read_flat_abi,
3571 options: read_options,
3572 address: read_address,
3573 count: read_count,
3574 handle: read_handle,
3575 instance: read_instance,
3576 caller_instance: read_caller_instance,
3577 caller_thread: read_caller_thread,
3578 } => {
3579 if flat_abi != read_flat_abi {
3580 bail_bug!("expected flat ABI calculations to be the same");
3581 }
3582
3583 if let TransmitIndex::Future(_) = ty {
3584 transmit.done = true;
3585 }
3586
3587 let write_complete = count == 0 || read_count > 0;
3609 let read_complete = count > 0;
3610 let read_buffer_remaining = count < read_count;
3611
3612 let read_handle_rep = transmit.read_handle.rep();
3613
3614 let count = count.min(read_count);
3615
3616 Instance::copy(
3617 store.as_context_mut(),
3618 flat_abi,
3619 self,
3620 caller,
3621 ty,
3622 options,
3623 address,
3624 read_instance,
3625 read_caller_instance,
3626 read_caller_thread,
3627 read_ty,
3628 read_options,
3629 read_address,
3630 count,
3631 rep,
3632 )?;
3633
3634 let instance = read_instance.id().get(store.0);
3635 let types = instance.component().types();
3636 let item_size = match read_ty.payload(types) {
3637 Some(ty) => usize::try_from(types.canonical_abi(ty).size32)?,
3638 None => 0,
3639 };
3640 let concurrent_state = store.0.concurrent_state_mut();
3641 if read_complete {
3642 let total = if let Some(Event::StreamRead {
3643 code: ReturnCode::Completed(old_total),
3644 ..
3645 }) = concurrent_state.take_event(read_handle_rep)?
3646 {
3647 count.add(old_total)?
3648 } else {
3649 count
3650 };
3651
3652 let code = ReturnCode::completed(ty.kind(), total);
3653
3654 concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
3655 }
3656
3657 if read_buffer_remaining || (count == 0 && read_count == 0) {
3664 let transmit = concurrent_state.get_mut(transmit_id)?;
3665 transmit.read = ReadState::GuestReady {
3666 ty: read_ty,
3667 flat_abi: read_flat_abi,
3668 options: read_options,
3669 address: read_address + (count.as_usize() * item_size),
3670 count: read_count.sub(count)?,
3671 handle: read_handle,
3672 instance: read_instance,
3673 caller_instance: read_caller_instance,
3674 caller_thread: read_caller_thread,
3675 };
3676 }
3677
3678 if write_complete {
3679 ReturnCode::completed(ty.kind(), count)
3680 } else {
3681 set_guest_ready(concurrent_state)?;
3682 ReturnCode::Blocked
3683 }
3684 }
3685
3686 ReadState::HostReady {
3687 consume,
3688 guest_offset,
3689 cancel,
3690 cancel_waker,
3691 } => {
3692 if cancel_waker.is_some() {
3693 bail_bug!("expected cancel_waker to be none");
3694 }
3695 if cancel {
3696 bail_bug!("expected cancel to be false");
3697 }
3698 if guest_offset != 0 {
3699 bail_bug!("expected guest_offset to be 0");
3700 }
3701
3702 if let TransmitIndex::Future(_) = ty {
3703 transmit.done = true;
3704 }
3705
3706 set_guest_ready(concurrent_state)?;
3707 self.consume(
3708 store.0,
3709 ty.kind(),
3710 transmit_id,
3711 consume,
3712 ItemCount::ZERO,
3713 false,
3714 )?
3715 }
3716
3717 ReadState::HostToHost { .. } => bail_bug!("unexpected HostToHost"),
3718
3719 ReadState::Open => {
3720 set_guest_ready(concurrent_state)?;
3721 ReturnCode::Blocked
3722 }
3723
3724 ReadState::Dropped => {
3725 if let TransmitIndex::Future(_) = ty {
3726 transmit.done = true;
3727 }
3728
3729 ReturnCode::Dropped(ItemCount::ZERO)
3730 }
3731 };
3732
3733 if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3734 result = self.wait_for_write(store.0, transmit_handle)?;
3735 }
3736
3737 if result != ReturnCode::Blocked {
3738 *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3739 TransmitLocalState::Write {
3740 done: matches!(result, ReturnCode::Dropped(_)),
3741 };
3742 }
3743
3744 log::trace!(
3745 "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3746 );
3747
3748 Ok(result)
3749 }
3750
3751 pub(super) fn guest_read<T: 'static>(
3753 self,
3754 mut store: StoreContextMut<T>,
3755 caller_instance: RuntimeComponentInstanceIndex,
3756 ty: TransmitIndex,
3757 options: OptionsIndex,
3758 flat_abi: Option<FlatAbi>,
3759 handle: u32,
3760 address: u32,
3761 count: u32,
3762 ) -> Result<ReturnCode> {
3763 let count = ItemCount::new(count)?;
3764
3765 if !self.options(store.0, options).async_ {
3766 store.0.check_blocking()?;
3770 }
3771
3772 let address = usize::try_from(address)?;
3773 self.check_bounds(store.0, options, ty, address, count.as_usize())?;
3774 let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3775 let TransmitLocalState::Read { done } = *state else {
3776 bail!(Trap::ConcurrentFutureStreamOp);
3777 };
3778
3779 if done {
3780 bail!("cannot read after being notified that the writable end dropped");
3781 }
3782
3783 *state = TransmitLocalState::Busy;
3784 let transmit_handle = TableId::<TransmitHandle>::new(rep);
3785 let concurrent_state = store.0.concurrent_state_mut();
3786 let caller_thread = concurrent_state.current_guest_thread()?;
3787 let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3788 let transmit = concurrent_state.get_mut(transmit_id)?;
3789 log::trace!(
3790 "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3791 transmit.write
3792 );
3793
3794 if transmit.done {
3795 bail!("cannot read from future after previous read succeeded");
3796 }
3797
3798 let new_state = if let WriteState::Dropped = &transmit.write {
3799 WriteState::Dropped
3800 } else {
3801 WriteState::Open
3802 };
3803
3804 let set_guest_ready = |me: &mut ConcurrentState| {
3805 let transmit = me.get_mut(transmit_id)?;
3806 if !matches!(&transmit.read, ReadState::Open) {
3807 bail_bug!("expected `ReadState::Open`; got `{:?}`", transmit.read);
3808 }
3809 transmit.read = ReadState::GuestReady {
3810 ty,
3811 flat_abi,
3812 options,
3813 address,
3814 count,
3815 handle,
3816 instance: self,
3817 caller_instance,
3818 caller_thread,
3819 };
3820 Ok::<_, crate::Error>(())
3821 };
3822
3823 let mut result = match mem::replace(&mut transmit.write, new_state) {
3824 WriteState::GuestReady {
3825 instance: write_instance,
3826 ty: write_ty,
3827 flat_abi: write_flat_abi,
3828 options: write_options,
3829 address: write_address,
3830 count: write_count,
3831 handle: write_handle,
3832 caller: write_caller,
3833 } => {
3834 if flat_abi != write_flat_abi {
3835 bail_bug!("expected flat ABI calculations to be the same");
3836 }
3837
3838 if let TransmitIndex::Future(_) = ty {
3839 transmit.done = true;
3840 }
3841
3842 let write_handle_rep = transmit.write_handle.rep();
3843
3844 let write_complete = write_count == 0 || count > 0;
3849 let read_complete = write_count > 0;
3850 let write_buffer_remaining = count < write_count;
3851
3852 let count = count.min(write_count);
3853
3854 Instance::copy(
3855 store.as_context_mut(),
3856 flat_abi,
3857 write_instance,
3858 write_caller,
3859 write_ty,
3860 write_options,
3861 write_address,
3862 self,
3863 caller_instance,
3864 caller_thread,
3865 ty,
3866 options,
3867 address,
3868 count,
3869 rep,
3870 )?;
3871
3872 let instance = write_instance.id().get(store.0);
3873 let types = instance.component().types();
3874 let item_size = match write_ty.payload(types) {
3875 Some(ty) => usize::try_from(types.canonical_abi(ty).size32)?,
3876 None => 0,
3877 };
3878 let concurrent_state = store.0.concurrent_state_mut();
3879
3880 if write_complete {
3881 let total = if let Some(Event::StreamWrite {
3882 code: ReturnCode::Completed(old_total),
3883 ..
3884 }) = concurrent_state.take_event(write_handle_rep)?
3885 {
3886 count.add(old_total)?
3887 } else {
3888 count
3889 };
3890
3891 let code = ReturnCode::completed(ty.kind(), total);
3892
3893 concurrent_state.send_write_result(
3894 write_ty,
3895 transmit_id,
3896 write_handle,
3897 code,
3898 )?;
3899 }
3900
3901 if write_buffer_remaining {
3902 let transmit = concurrent_state.get_mut(transmit_id)?;
3903 transmit.write = WriteState::GuestReady {
3904 instance: write_instance,
3905 caller: write_caller,
3906 ty: write_ty,
3907 flat_abi: write_flat_abi,
3908 options: write_options,
3909 address: write_address + (count.as_usize() * item_size),
3910 count: write_count.sub(count)?,
3911 handle: write_handle,
3912 };
3913 }
3914
3915 if read_complete {
3916 ReturnCode::completed(ty.kind(), count)
3917 } else {
3918 set_guest_ready(concurrent_state)?;
3919 ReturnCode::Blocked
3920 }
3921 }
3922
3923 WriteState::HostReady {
3924 produce,
3925 try_into,
3926 guest_offset,
3927 cancel,
3928 cancel_waker,
3929 } => {
3930 if cancel_waker.is_some() {
3931 bail_bug!("expected cancel_waker to be none");
3932 }
3933 if cancel {
3934 bail_bug!("expected cancel to be false");
3935 }
3936 if guest_offset != 0 {
3937 bail_bug!("expected guest_offset to be 0");
3938 }
3939
3940 set_guest_ready(concurrent_state)?;
3941
3942 let code = self.produce(
3943 store.0,
3944 ty.kind(),
3945 transmit_id,
3946 produce,
3947 try_into,
3948 ItemCount::ZERO,
3949 false,
3950 )?;
3951
3952 if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) {
3953 store.0.concurrent_state_mut().get_mut(transmit_id)?.done = true;
3954 }
3955
3956 code
3957 }
3958
3959 WriteState::Open => {
3960 set_guest_ready(concurrent_state)?;
3961 ReturnCode::Blocked
3962 }
3963
3964 WriteState::Dropped => ReturnCode::Dropped(ItemCount::ZERO),
3965 };
3966
3967 if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3968 result = self.wait_for_read(store.0, transmit_handle)?;
3969 }
3970
3971 if result != ReturnCode::Blocked {
3972 *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3973 TransmitLocalState::Read {
3974 done: matches!(
3975 (result, ty),
3976 (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3977 ),
3978 };
3979 }
3980
3981 log::trace!(
3982 "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3983 );
3984
3985 Ok(result)
3986 }
3987
3988 fn wait_for_write(
3989 self,
3990 store: &mut StoreOpaque,
3991 handle: TableId<TransmitHandle>,
3992 ) -> Result<ReturnCode> {
3993 let waitable = Waitable::Transmit(handle);
3994 store.wait_for_event(waitable)?;
3995 let event = waitable.take_event(store.concurrent_state_mut())?;
3996 if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3997 event
3998 {
3999 waitable.on_delivery(store, self, event)?;
4000 Ok(code)
4001 } else {
4002 bail_bug!("expected either a stream or future write event")
4003 }
4004 }
4005
4006 fn cancel_write(
4008 self,
4009 store: &mut StoreOpaque,
4010 transmit_id: TableId<TransmitState>,
4011 async_: bool,
4012 ) -> Result<ReturnCode> {
4013 let state = store.concurrent_state_mut();
4014 let transmit = state.get_mut(transmit_id)?;
4015 log::trace!(
4016 "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
4017 transmit.read,
4018 transmit.write
4019 );
4020 let waitable = Waitable::Transmit(transmit.write_handle);
4021
4022 let code = if let Some(event) = waitable.take_event(state)? {
4023 let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
4024 bail_bug!("expected either a stream or future write event")
4025 };
4026 waitable.on_delivery(store, self, event)?;
4027 match (code, event) {
4028 (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
4029 ReturnCode::Cancelled(count)
4030 }
4031 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
4032 _ => bail_bug!("unexpected code/event combo"),
4033 }
4034 } else if let ReadState::HostReady {
4035 cancel,
4036 cancel_waker,
4037 ..
4038 } = &mut state.get_mut(transmit_id)?.read
4039 {
4040 *cancel = true;
4041 if let Some(waker) = cancel_waker.take() {
4042 waker.wake();
4043 }
4044
4045 if async_ {
4046 ReturnCode::Blocked
4047 } else {
4048 let handle = store
4049 .concurrent_state_mut()
4050 .get_mut(transmit_id)?
4051 .write_handle;
4052 self.wait_for_write(store, handle)?
4053 }
4054 } else {
4055 ReturnCode::Cancelled(ItemCount::ZERO)
4056 };
4057
4058 if !matches!(code, ReturnCode::Blocked) {
4059 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
4060
4061 match &transmit.write {
4062 WriteState::GuestReady { .. } => {
4063 transmit.write = WriteState::Open;
4064 }
4065 WriteState::HostReady { .. } => bail_bug!("support host write cancellation"),
4066 WriteState::Open | WriteState::Dropped => {}
4067 }
4068 }
4069
4070 log::trace!("cancelled write {transmit_id:?}: {code:?}");
4071
4072 Ok(code)
4073 }
4074
4075 fn wait_for_read(
4076 self,
4077 store: &mut StoreOpaque,
4078 handle: TableId<TransmitHandle>,
4079 ) -> Result<ReturnCode> {
4080 let waitable = Waitable::Transmit(handle);
4081 store.wait_for_event(waitable)?;
4082 let event = waitable.take_event(store.concurrent_state_mut())?;
4083 if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
4084 event
4085 {
4086 waitable.on_delivery(store, self, event)?;
4087 Ok(code)
4088 } else {
4089 bail_bug!("expected either a stream or future read event")
4090 }
4091 }
4092
4093 fn cancel_read(
4095 self,
4096 store: &mut StoreOpaque,
4097 transmit_id: TableId<TransmitState>,
4098 async_: bool,
4099 ) -> Result<ReturnCode> {
4100 let state = store.concurrent_state_mut();
4101 let transmit = state.get_mut(transmit_id)?;
4102 log::trace!(
4103 "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
4104 transmit.read,
4105 transmit.write
4106 );
4107
4108 let waitable = Waitable::Transmit(transmit.read_handle);
4109 let code = if let Some(event) = waitable.take_event(state)? {
4110 let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
4111 bail_bug!("expected either a stream or future read event")
4112 };
4113 waitable.on_delivery(store, self, event)?;
4114 match (code, event) {
4115 (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
4116 ReturnCode::Cancelled(count)
4117 }
4118 (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
4119 _ => bail_bug!("unexpected code/event combo"),
4120 }
4121 } else if let WriteState::HostReady {
4122 cancel,
4123 cancel_waker,
4124 ..
4125 } = &mut state.get_mut(transmit_id)?.write
4126 {
4127 *cancel = true;
4128 if let Some(waker) = cancel_waker.take() {
4129 waker.wake();
4130 }
4131
4132 if async_ {
4133 ReturnCode::Blocked
4134 } else {
4135 let handle = store
4136 .concurrent_state_mut()
4137 .get_mut(transmit_id)?
4138 .read_handle;
4139 self.wait_for_read(store, handle)?
4140 }
4141 } else {
4142 ReturnCode::Cancelled(ItemCount::ZERO)
4143 };
4144
4145 if !matches!(code, ReturnCode::Blocked) {
4146 let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
4147
4148 match &transmit.read {
4149 ReadState::GuestReady { .. } => {
4150 transmit.read = ReadState::Open;
4151 }
4152 ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
4153 bail_bug!("support host read cancellation")
4154 }
4155 ReadState::Open | ReadState::Dropped => {}
4156 }
4157 }
4158
4159 log::trace!("cancelled read {transmit_id:?}: {code:?}");
4160
4161 Ok(code)
4162 }
4163
4164 fn guest_cancel_write(
4166 self,
4167 store: &mut StoreOpaque,
4168 ty: TransmitIndex,
4169 async_: bool,
4170 writer: u32,
4171 ) -> Result<ReturnCode> {
4172 if !async_ {
4173 store.check_blocking()?;
4177 }
4178
4179 let (rep, state) =
4180 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
4181 let id = TableId::<TransmitHandle>::new(rep);
4182 log::trace!("guest cancel write {id:?} (handle {writer})");
4183 match state {
4184 TransmitLocalState::Write { .. } => {
4185 bail!("stream or future write cancelled when no write is pending")
4186 }
4187 TransmitLocalState::Read { .. } => {
4188 bail!("passed read end to `{{stream|future}}.cancel-write`")
4189 }
4190 TransmitLocalState::Busy => {}
4191 }
4192 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
4193 let code = self.cancel_write(store, transmit_id, async_)?;
4194 if !matches!(code, ReturnCode::Blocked) {
4195 let state =
4196 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
4197 .1;
4198 if let TransmitLocalState::Busy = state {
4199 *state = TransmitLocalState::Write { done: false };
4200 }
4201 }
4202 Ok(code)
4203 }
4204
4205 fn guest_cancel_read(
4207 self,
4208 store: &mut StoreOpaque,
4209 ty: TransmitIndex,
4210 async_: bool,
4211 reader: u32,
4212 ) -> Result<ReturnCode> {
4213 if !async_ {
4214 store.check_blocking()?;
4218 }
4219
4220 let (rep, state) =
4221 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
4222 let id = TableId::<TransmitHandle>::new(rep);
4223 log::trace!("guest cancel read {id:?} (handle {reader})");
4224 match state {
4225 TransmitLocalState::Read { .. } => {
4226 bail!("stream or future read cancelled when no read is pending")
4227 }
4228 TransmitLocalState::Write { .. } => {
4229 bail!("passed write end to `{{stream|future}}.cancel-read`")
4230 }
4231 TransmitLocalState::Busy => {}
4232 }
4233 let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
4234 let code = self.cancel_read(store, transmit_id, async_)?;
4235 if !matches!(code, ReturnCode::Blocked) {
4236 let state =
4237 get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
4238 .1;
4239 if let TransmitLocalState::Busy = state {
4240 *state = TransmitLocalState::Read { done: false };
4241 }
4242 }
4243 Ok(code)
4244 }
4245
4246 fn guest_drop_readable(
4248 self,
4249 store: &mut StoreOpaque,
4250 ty: TransmitIndex,
4251 reader: u32,
4252 ) -> Result<()> {
4253 let table = self.id().get_mut(store).table_for_transmit(ty);
4254 let (rep, _is_done) = match ty {
4255 TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
4256 TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
4257 };
4258 let kind = match ty {
4259 TransmitIndex::Stream(_) => TransmitKind::Stream,
4260 TransmitIndex::Future(_) => TransmitKind::Future,
4261 };
4262 let id = TableId::<TransmitHandle>::new(rep);
4263 log::trace!("guest_drop_readable: drop reader {id:?}");
4264 store.host_drop_reader(id, kind)
4265 }
4266
4267 pub(crate) fn error_context_new(
4269 self,
4270 store: &mut StoreOpaque,
4271 ty: TypeComponentLocalErrorContextTableIndex,
4272 options: OptionsIndex,
4273 debug_msg_address: u32,
4274 debug_msg_len: u32,
4275 ) -> Result<u32> {
4276 let lift_ctx = &mut LiftContext::new(store, options, self);
4277 let debug_msg = String::linear_lift_from_flat(
4278 lift_ctx,
4279 InterfaceType::String,
4280 &[ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)],
4281 )?;
4282
4283 let err_ctx = ErrorContextState { debug_msg };
4285 let state = store.concurrent_state_mut();
4286 let table_id = state.push(err_ctx)?;
4287 let global_ref_count_idx =
4288 TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
4289
4290 let _ = state
4292 .global_error_context_ref_counts
4293 .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
4294
4295 let local_idx = self
4302 .id()
4303 .get_mut(store)
4304 .table_for_error_context(ty)
4305 .error_context_insert(table_id.rep())?;
4306
4307 Ok(local_idx)
4308 }
4309
4310 pub(super) fn error_context_debug_message<T>(
4312 self,
4313 store: StoreContextMut<T>,
4314 ty: TypeComponentLocalErrorContextTableIndex,
4315 options: OptionsIndex,
4316 err_ctx_handle: u32,
4317 debug_msg_address: u32,
4318 ) -> Result<()> {
4319 let handle_table_id_rep = self
4321 .id()
4322 .get_mut(store.0)
4323 .table_for_error_context(ty)
4324 .error_context_rep(err_ctx_handle)?;
4325
4326 let state = store.0.concurrent_state_mut();
4327 let ErrorContextState { debug_msg } =
4329 state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
4330 let debug_msg = debug_msg.clone();
4331
4332 let lower_cx = &mut LowerContext::new(store, options, self);
4333 let debug_msg_address = usize::try_from(debug_msg_address)?;
4334 let offset = lower_cx
4340 .as_slice_mut()
4341 .get(debug_msg_address..)
4342 .and_then(|b| b.get(..8))
4343 .map(|_| debug_msg_address)
4344 .ok_or_else(|| crate::format_err!("invalid debug message pointer: out of bounds"))?;
4345 debug_msg
4346 .as_str()
4347 .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
4348
4349 Ok(())
4350 }
4351
4352 pub(crate) fn future_cancel_read(
4354 self,
4355 store: &mut StoreOpaque,
4356 ty: TypeFutureTableIndex,
4357 async_: bool,
4358 reader: u32,
4359 ) -> Result<u32> {
4360 self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
4361 .map(|v| v.encode())
4362 }
4363
4364 pub(crate) fn future_cancel_write(
4366 self,
4367 store: &mut StoreOpaque,
4368 ty: TypeFutureTableIndex,
4369 async_: bool,
4370 writer: u32,
4371 ) -> Result<u32> {
4372 self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
4373 .map(|v| v.encode())
4374 }
4375
4376 pub(crate) fn stream_cancel_read(
4378 self,
4379 store: &mut StoreOpaque,
4380 ty: TypeStreamTableIndex,
4381 async_: bool,
4382 reader: u32,
4383 ) -> Result<u32> {
4384 self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
4385 .map(|v| v.encode())
4386 }
4387
4388 pub(crate) fn stream_cancel_write(
4390 self,
4391 store: &mut StoreOpaque,
4392 ty: TypeStreamTableIndex,
4393 async_: bool,
4394 writer: u32,
4395 ) -> Result<u32> {
4396 self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
4397 .map(|v| v.encode())
4398 }
4399
4400 pub(crate) fn future_drop_readable(
4402 self,
4403 store: &mut StoreOpaque,
4404 ty: TypeFutureTableIndex,
4405 reader: u32,
4406 ) -> Result<()> {
4407 self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
4408 }
4409
4410 pub(crate) fn stream_drop_readable(
4412 self,
4413 store: &mut StoreOpaque,
4414 ty: TypeStreamTableIndex,
4415 reader: u32,
4416 ) -> Result<()> {
4417 self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
4418 }
4419
4420 fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
4424 let (write, read) = store
4425 .concurrent_state_mut()
4426 .new_transmit(TransmitOrigin::guest(self.id().instance(), ty))?;
4427
4428 let table = self.id().get_mut(store).table_for_transmit(ty);
4429 let (read_handle, write_handle) = match ty {
4430 TransmitIndex::Future(ty) => (
4431 table.future_insert_read(ty, read.rep())?,
4432 table.future_insert_write(ty, write.rep())?,
4433 ),
4434 TransmitIndex::Stream(ty) => (
4435 table.stream_insert_read(ty, read.rep())?,
4436 table.stream_insert_write(ty, write.rep())?,
4437 ),
4438 };
4439
4440 let state = store.concurrent_state_mut();
4441 state.get_mut(read)?.common.handle = Some(read_handle);
4442 state.get_mut(write)?.common.handle = Some(write_handle);
4443
4444 Ok(ResourcePair {
4445 write: write_handle,
4446 read: read_handle,
4447 })
4448 }
4449
4450 pub(crate) fn error_context_drop(
4452 self,
4453 store: &mut StoreOpaque,
4454 ty: TypeComponentLocalErrorContextTableIndex,
4455 error_context: u32,
4456 ) -> Result<()> {
4457 let instance = self.id().get_mut(store);
4458
4459 let local_handle_table = instance.table_for_error_context(ty);
4460
4461 let rep = local_handle_table.error_context_drop(error_context)?;
4462
4463 let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
4464
4465 let state = store.concurrent_state_mut();
4466 let Some(GlobalErrorContextRefCount(global_ref_count)) = state
4467 .global_error_context_ref_counts
4468 .get_mut(&global_ref_count_idx)
4469 else {
4470 bail_bug!("retrieve concurrent state for error context during drop")
4471 };
4472
4473 if *global_ref_count < 1 {
4475 bail_bug!("ref count unexpectedly zero");
4476 }
4477 *global_ref_count -= 1;
4478 if *global_ref_count == 0 {
4479 state
4480 .global_error_context_ref_counts
4481 .remove(&global_ref_count_idx);
4482
4483 state
4484 .delete(TableId::<ErrorContextState>::new(rep))
4485 .context("deleting component-global error context data")?;
4486 }
4487
4488 Ok(())
4489 }
4490
4491 fn guest_transfer(
4494 self,
4495 store: &mut StoreOpaque,
4496 src_idx: u32,
4497 src: TransmitIndex,
4498 dst: TransmitIndex,
4499 ) -> Result<u32> {
4500 let mut instance = self.id().get_mut(store);
4501 let src_table = instance.as_mut().table_for_transmit(src);
4502 let (rep, is_done) = match src {
4503 TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
4504 TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
4505 };
4506 if is_done {
4507 bail!("cannot lift after being notified that the writable end dropped");
4508 }
4509 let dst_table = instance.table_for_transmit(dst);
4510 let handle = match dst {
4511 TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
4512 TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
4513 }?;
4514 store
4515 .concurrent_state_mut()
4516 .get_mut(TableId::<TransmitHandle>::new(rep))?
4517 .common
4518 .handle = Some(handle);
4519 Ok(handle)
4520 }
4521
4522 pub(crate) fn future_new(
4524 self,
4525 store: &mut StoreOpaque,
4526 ty: TypeFutureTableIndex,
4527 ) -> Result<ResourcePair> {
4528 self.guest_new(store, TransmitIndex::Future(ty))
4529 }
4530
4531 pub(crate) fn stream_new(
4533 self,
4534 store: &mut StoreOpaque,
4535 ty: TypeStreamTableIndex,
4536 ) -> Result<ResourcePair> {
4537 self.guest_new(store, TransmitIndex::Stream(ty))
4538 }
4539
4540 pub(crate) fn future_transfer(
4543 self,
4544 store: &mut StoreOpaque,
4545 src_idx: u32,
4546 src: TypeFutureTableIndex,
4547 dst: TypeFutureTableIndex,
4548 ) -> Result<u32> {
4549 self.guest_transfer(
4550 store,
4551 src_idx,
4552 TransmitIndex::Future(src),
4553 TransmitIndex::Future(dst),
4554 )
4555 }
4556
4557 pub(crate) fn stream_transfer(
4560 self,
4561 store: &mut StoreOpaque,
4562 src_idx: u32,
4563 src: TypeStreamTableIndex,
4564 dst: TypeStreamTableIndex,
4565 ) -> Result<u32> {
4566 self.guest_transfer(
4567 store,
4568 src_idx,
4569 TransmitIndex::Stream(src),
4570 TransmitIndex::Stream(dst),
4571 )
4572 }
4573
4574 pub(crate) fn error_context_transfer(
4576 self,
4577 store: &mut StoreOpaque,
4578 src_idx: u32,
4579 src: TypeComponentLocalErrorContextTableIndex,
4580 dst: TypeComponentLocalErrorContextTableIndex,
4581 ) -> Result<u32> {
4582 let mut instance = self.id().get_mut(store);
4583 let rep = instance
4584 .as_mut()
4585 .table_for_error_context(src)
4586 .error_context_rep(src_idx)?;
4587 let dst_idx = instance
4588 .table_for_error_context(dst)
4589 .error_context_insert(rep)?;
4590
4591 let global_ref_count = store
4595 .concurrent_state_mut()
4596 .global_error_context_ref_counts
4597 .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4598 .context("global ref count present for existing (sub)component error context")?;
4599
4600 global_ref_count.0 = global_ref_count
4601 .0
4602 .checked_add(1)
4603 .ok_or_else(|| format_err!(Trap::ReferenceCountOverflow))?;
4604
4605 Ok(dst_idx)
4606 }
4607}
4608
4609impl ComponentInstance {
4610 fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4611 let (states, types) = self.instance_states();
4612 let runtime_instance = match ty {
4613 TransmitIndex::Stream(ty) => types[ty].instance,
4614 TransmitIndex::Future(ty) => types[ty].instance,
4615 };
4616 states[runtime_instance].handle_table()
4617 }
4618
4619 fn table_for_error_context(
4620 self: Pin<&mut Self>,
4621 ty: TypeComponentLocalErrorContextTableIndex,
4622 ) -> &mut HandleTable {
4623 let (states, types) = self.instance_states();
4624 let runtime_instance = types[ty].instance;
4625 states[runtime_instance].handle_table()
4626 }
4627
4628 fn get_mut_by_index(
4629 self: Pin<&mut Self>,
4630 ty: TransmitIndex,
4631 index: u32,
4632 ) -> Result<(u32, &mut TransmitLocalState)> {
4633 get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4634 }
4635}
4636
4637impl ConcurrentState {
4638 fn send_write_result(
4639 &mut self,
4640 ty: TransmitIndex,
4641 id: TableId<TransmitState>,
4642 handle: u32,
4643 code: ReturnCode,
4644 ) -> Result<()> {
4645 let write_handle = self.get_mut(id)?.write_handle.rep();
4646 self.set_event(
4647 write_handle,
4648 match ty {
4649 TransmitIndex::Future(ty) => Event::FutureWrite {
4650 code,
4651 pending: Some((ty, handle)),
4652 },
4653 TransmitIndex::Stream(ty) => Event::StreamWrite {
4654 code,
4655 pending: Some((ty, handle)),
4656 },
4657 },
4658 )
4659 }
4660
4661 fn send_read_result(
4662 &mut self,
4663 ty: TransmitIndex,
4664 id: TableId<TransmitState>,
4665 handle: u32,
4666 code: ReturnCode,
4667 ) -> Result<()> {
4668 let read_handle = self.get_mut(id)?.read_handle.rep();
4669 self.set_event(
4670 read_handle,
4671 match ty {
4672 TransmitIndex::Future(ty) => Event::FutureRead {
4673 code,
4674 pending: Some((ty, handle)),
4675 },
4676 TransmitIndex::Stream(ty) => Event::StreamRead {
4677 code,
4678 pending: Some((ty, handle)),
4679 },
4680 },
4681 )
4682 }
4683
4684 fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4685 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4686 }
4687
4688 fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4689 Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4690 }
4691
4692 fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4703 let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4704
4705 fn update_code(old: ReturnCode, new: ReturnCode) -> Result<ReturnCode> {
4706 let (ReturnCode::Completed(count)
4707 | ReturnCode::Dropped(count)
4708 | ReturnCode::Cancelled(count)) = old
4709 else {
4710 bail_bug!("unexpected old return code")
4711 };
4712
4713 Ok(match new {
4714 ReturnCode::Dropped(ItemCount::ZERO) => ReturnCode::Dropped(count),
4715 ReturnCode::Cancelled(ItemCount::ZERO) => ReturnCode::Cancelled(count),
4716 _ => bail_bug!("unexpected new return code"),
4717 })
4718 }
4719
4720 let event = match (waitable.take_event(self)?, event) {
4721 (None, _) => event,
4722 (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4723 (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4724 (
4725 Some(Event::StreamWrite {
4726 code: old_code,
4727 pending: old_pending,
4728 }),
4729 Event::StreamWrite { code, pending },
4730 ) => Event::StreamWrite {
4731 code: update_code(old_code, code)?,
4732 pending: old_pending.or(pending),
4733 },
4734 (
4735 Some(Event::StreamRead {
4736 code: old_code,
4737 pending: old_pending,
4738 }),
4739 Event::StreamRead { code, pending },
4740 ) => Event::StreamRead {
4741 code: update_code(old_code, code)?,
4742 pending: old_pending.or(pending),
4743 },
4744 _ => bail_bug!("unexpected event combination"),
4745 };
4746
4747 waitable.set_event(self, Some(event))
4748 }
4749
4750 fn new_transmit(
4753 &mut self,
4754 origin: TransmitOrigin,
4755 ) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4756 let state_id = self.push(TransmitState::new(origin))?;
4757
4758 let write = self.push(TransmitHandle::new(state_id))?;
4759 let read = self.push(TransmitHandle::new(state_id))?;
4760
4761 let state = self.get_mut(state_id)?;
4762 state.write_handle = write;
4763 state.read_handle = read;
4764
4765 log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4766
4767 Ok((write, read))
4768 }
4769
4770 fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4772 let state = self.delete(state_id)?;
4773 self.delete(state.write_handle)?;
4774 self.delete(state.read_handle)?;
4775
4776 log::trace!(
4777 "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4778 state.write_handle,
4779 state.read_handle,
4780 );
4781
4782 Ok(())
4783 }
4784}
4785
4786pub(crate) struct ResourcePair {
4787 pub(crate) write: u32,
4788 pub(crate) read: u32,
4789}
4790
4791impl Waitable {
4792 pub(super) fn on_delivery(
4795 &self,
4796 store: &mut StoreOpaque,
4797 instance: Instance,
4798 event: Event,
4799 ) -> Result<()> {
4800 let instance = instance.id().get_mut(store);
4801 let (rep, state, code) = match event {
4802 Event::FutureRead {
4803 pending: Some((ty, handle)),
4804 code,
4805 }
4806 | Event::FutureWrite {
4807 pending: Some((ty, handle)),
4808 code,
4809 } => {
4810 let runtime_instance = instance.component().types()[ty].instance;
4811 let (rep, state) = instance.instance_states().0[runtime_instance]
4812 .handle_table()
4813 .future_rep(ty, handle)?;
4814 (rep, state, code)
4815 }
4816 Event::StreamRead {
4817 pending: Some((ty, handle)),
4818 code,
4819 }
4820 | Event::StreamWrite {
4821 pending: Some((ty, handle)),
4822 code,
4823 } => {
4824 let runtime_instance = instance.component().types()[ty].instance;
4825 let (rep, state) = instance.instance_states().0[runtime_instance]
4826 .handle_table()
4827 .stream_rep(ty, handle)?;
4828 (rep, state, code)
4829 }
4830 _ => return Ok(()),
4831 };
4832 if rep != self.rep() {
4833 bail_bug!("unexpected rep mismatch");
4834 }
4835 if *state != TransmitLocalState::Busy {
4836 bail_bug!("expected state to be busy");
4837 }
4838 let done = matches!(code, ReturnCode::Dropped(_));
4839 *state = match event {
4840 Event::FutureRead { .. } | Event::StreamRead { .. } => {
4841 TransmitLocalState::Read { done }
4842 }
4843 Event::FutureWrite { .. } | Event::StreamWrite { .. } => {
4844 TransmitLocalState::Write { done }
4845 }
4846 _ => bail_bug!("unexpected event for stream"),
4847 };
4848
4849 let transmit_handle = TableId::<TransmitHandle>::new(rep);
4850 let state = store.concurrent_state_mut();
4851 let transmit_id = state.get_mut(transmit_handle)?.state;
4852 let transmit = state.get_mut(transmit_id)?;
4853
4854 match event {
4855 Event::StreamRead { .. } => {
4856 transmit.read = ReadState::Open;
4857 }
4858 Event::StreamWrite { .. } => transmit.write = WriteState::Open,
4859 _ => {}
4860 }
4861 Ok(())
4862 }
4863}
4864
4865fn allow_intra_component_read_write(ty: Option<&InterfaceType>) -> bool {
4869 matches!(
4870 ty,
4871 None | Some(
4872 InterfaceType::S8
4873 | InterfaceType::U8
4874 | InterfaceType::S16
4875 | InterfaceType::U16
4876 | InterfaceType::S32
4877 | InterfaceType::U32
4878 | InterfaceType::S64
4879 | InterfaceType::U64
4880 | InterfaceType::Float32
4881 | InterfaceType::Float64
4882 )
4883 )
4884}
4885
4886struct LockedState<T> {
4890 inner: Mutex<Option<T>>,
4891}
4892
4893impl<T> LockedState<T> {
4894 fn new(value: T) -> Self {
4896 Self {
4897 inner: Mutex::new(Some(value)),
4898 }
4899 }
4900
4901 fn try_lock(&self) -> Result<MutexGuard<'_, Option<T>>> {
4910 match self.inner.try_lock() {
4911 Ok(lock) => Ok(lock),
4912 Err(_) => bail_bug!("should not have contention on state lock"),
4913 }
4914 }
4915
4916 fn take(&self) -> Result<LockedStateGuard<'_, T>> {
4923 let result = self.try_lock()?.take();
4924 match result {
4925 Some(result) => Ok(LockedStateGuard {
4926 value: ManuallyDrop::new(result),
4927 state: self,
4928 }),
4929 None => bail_bug!("lock value unexpectedly missing"),
4930 }
4931 }
4932
4933 fn with<R>(&self, f: impl FnOnce(&mut T) -> R) -> Result<R> {
4942 let mut inner = self.try_lock()?;
4943 match &mut *inner {
4944 Some(state) => Ok(f(state)),
4945 None => bail_bug!("lock value unexpectedly missing"),
4946 }
4947 }
4948}
4949
4950struct LockedStateGuard<'a, T> {
4953 value: ManuallyDrop<T>,
4954 state: &'a LockedState<T>,
4955}
4956
4957impl<T> Deref for LockedStateGuard<'_, T> {
4958 type Target = T;
4959
4960 fn deref(&self) -> &T {
4961 &self.value
4962 }
4963}
4964
4965impl<T> DerefMut for LockedStateGuard<'_, T> {
4966 fn deref_mut(&mut self) -> &mut T {
4967 &mut self.value
4968 }
4969}
4970
4971impl<T> Drop for LockedStateGuard<'_, T> {
4972 fn drop(&mut self) {
4973 let value = unsafe { ManuallyDrop::take(&mut self.value) };
4978
4979 if let Ok(mut lock) = self.state.try_lock() {
4983 *lock = Some(value);
4984 }
4985 }
4986}
4987
4988#[cfg(test)]
4989mod tests {
4990 use super::*;
4991 use crate::{Engine, Store};
4992 use core::future::pending;
4993 use core::pin::pin;
4994 use std::sync::LazyLock;
4995
4996 static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);
4997
4998 fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
4999 where
5000 T: FutureProducer<()>,
5001 {
5002 rx.poll_produce(
5003 &mut Context::from_waker(Waker::noop()),
5004 Store::new(&ENGINE, ()).as_context_mut(),
5005 finish,
5006 )
5007 }
5008
5009 #[test]
5010 fn future_producer() {
5011 let mut fut = pin!(async { crate::error::Ok(()) });
5012 assert!(matches!(
5013 poll_future_producer(fut.as_mut(), false),
5014 Poll::Ready(Ok(Some(()))),
5015 ));
5016
5017 let mut fut = pin!(async { crate::error::Ok(()) });
5018 assert!(matches!(
5019 poll_future_producer(fut.as_mut(), true),
5020 Poll::Ready(Ok(Some(()))),
5021 ));
5022
5023 let mut fut = pin!(pending::<Result<()>>());
5024 assert!(matches!(
5025 poll_future_producer(fut.as_mut(), false),
5026 Poll::Pending,
5027 ));
5028 assert!(matches!(
5029 poll_future_producer(fut.as_mut(), true),
5030 Poll::Ready(Ok(None)),
5031 ));
5032
5033 let (tx, rx) = oneshot::channel();
5034 let mut rx = pin!(rx);
5035 assert!(matches!(
5036 poll_future_producer(rx.as_mut(), false),
5037 Poll::Pending,
5038 ));
5039 assert!(matches!(
5040 poll_future_producer(rx.as_mut(), true),
5041 Poll::Ready(Ok(None)),
5042 ));
5043 tx.send(()).unwrap();
5044 assert!(matches!(
5045 poll_future_producer(rx.as_mut(), true),
5046 Poll::Ready(Ok(Some(()))),
5047 ));
5048
5049 let (tx, rx) = oneshot::channel();
5050 let mut rx = pin!(rx);
5051 tx.send(()).unwrap();
5052 assert!(matches!(
5053 poll_future_producer(rx.as_mut(), false),
5054 Poll::Ready(Ok(Some(()))),
5055 ));
5056
5057 let (tx, rx) = oneshot::channel::<()>();
5058 let mut rx = pin!(rx);
5059 drop(tx);
5060 assert!(matches!(
5061 poll_future_producer(rx.as_mut(), false),
5062 Poll::Ready(Err(..)),
5063 ));
5064
5065 let (tx, rx) = oneshot::channel::<()>();
5066 let mut rx = pin!(rx);
5067 drop(tx);
5068 assert!(matches!(
5069 poll_future_producer(rx.as_mut(), true),
5070 Poll::Ready(Err(..)),
5071 ));
5072 }
5073}