wasmtime/runtime/component/concurrent/
future_stream_any.rs

1//! Implementation of [`FutureAny`] and [`StreamAny`].
2
3use crate::AsContextMut;
4use crate::component::concurrent::futures_and_streams::{self, TransmitOrigin};
5use crate::component::concurrent::{TableId, TransmitHandle};
6use crate::component::func::{LiftContext, LowerContext, bad_type_info, desc};
7use crate::component::matching::InstanceType;
8use crate::component::types::{self, FutureType, StreamType};
9use crate::component::{
10    ComponentInstanceId, ComponentType, FutureReader, Lift, Lower, StreamReader,
11};
12use crate::store::StoreOpaque;
13use anyhow::{Context, Result, bail};
14use std::any::TypeId;
15use std::mem::MaybeUninit;
16use wasmtime_environ::component::{
17    CanonicalAbiInfo, InterfaceType, TypeFutureTableIndex, TypeStreamTableIndex,
18};
19
20/// Represents a type-erased component model `future`.
21///
22/// This type is similar to [`ResourceAny`](crate::component::ResourceAny)
23/// where it's a static guarantee that it represents a component model
24/// `future`, but it does not contain any information about the underlying type
25/// that is associated with this future. This is intended to be used in
26/// "dynamically typed" situations where embedders may not know ahead of time
27/// the type of a `future` being used by component that is loaded.
28///
29/// # Closing futures
30///
31/// A [`FutureAny`] represents a resource that is owned by a [`Store`]. Proper
32/// disposal of a future requires invoking the [`FutureAny::close`] method to
33/// ensure that this handle does not leak. If [`FutureAny::close`] is not
34/// called then memory will not be leaked once the owning [`Store`] is dropped,
35/// but the resource handle will be leaked until the [`Store`] is dropped.
36///
37/// [`Store`]: crate::Store
38#[derive(Debug, Clone, PartialEq)]
39pub struct FutureAny {
40    id: TableId<TransmitHandle>,
41    ty: PayloadType<FutureType>,
42}
43
44impl FutureAny {
45    fn lower_to_index<T>(&self, cx: &mut LowerContext<'_, T>, ty: InterfaceType) -> Result<u32> {
46        // Note that unlike `FutureReader<T>` we need to perform an extra
47        // typecheck to ensure that the dynamic type of this future matches
48        // what the guest we're lowering into expects. This couldn't happen
49        // before this point (see the `ComponentType::typecheck` implementation
50        // for this type), so do it now.
51        let future_ty = match ty {
52            InterfaceType::Future(payload) => payload,
53            _ => bad_type_info(),
54        };
55        let payload = cx.types[cx.types[future_ty].ty].payload.as_ref();
56        self.ty.typecheck_guest(
57            &cx.instance_type(),
58            payload,
59            FutureType::equivalent_payload_guest,
60        )?;
61
62        // Like `FutureReader<T>`, however, lowering "just" gets a u32.
63        futures_and_streams::lower_future_to_index(self.id, cx, ty)
64    }
65
66    /// Attempts to convert this [`FutureAny`] to a [`FutureReader<T>`]
67    /// with a statically known type.
68    ///
69    /// # Errors
70    ///
71    /// This function will return an error if `T` does not match the type of
72    /// value on this future.
73    pub fn try_into_future_reader<T>(self) -> Result<FutureReader<T>>
74    where
75        T: ComponentType + 'static,
76    {
77        self.ty
78            .typecheck_host::<T>(FutureType::equivalent_payload_host::<T>)?;
79        Ok(FutureReader::new_(self.id))
80    }
81
82    /// Attempts to convert `reader` to a [`FutureAny`], erasing its statically
83    /// known type.
84    ///
85    /// # Errors
86    ///
87    /// This function will return an error if `reader` does not belong to
88    /// `store`.
89    pub fn try_from_future_reader<T>(
90        mut store: impl AsContextMut,
91        reader: FutureReader<T>,
92    ) -> Result<Self>
93    where
94        T: ComponentType + 'static,
95    {
96        let store = store.as_context_mut();
97        let ty = match store.0.transmit_origin(reader.id())? {
98            TransmitOrigin::Host => PayloadType::new_host::<T>(),
99            TransmitOrigin::GuestFuture(id, ty) => PayloadType::new_guest_future(store.0, id, ty),
100            TransmitOrigin::GuestStream(..) => bail!("not a future"),
101        };
102        Ok(FutureAny {
103            id: reader.id(),
104            ty,
105        })
106    }
107
108    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
109        let id = futures_and_streams::lift_index_to_future(cx, ty, index)?;
110        let InterfaceType::Future(ty) = ty else {
111            unreachable!()
112        };
113        let ty = cx.types[ty].ty;
114        Ok(FutureAny {
115            id,
116            // Note that this future might actually be a host-originating
117            // future which means that this ascription of "the type is the
118            // guest" may be slightly in accurate. The guest, however, has the
119            // most accurate view of what type this future has so that should
120            // be reasonable to ascribe as the type here regardless.
121            ty: PayloadType::Guest(FutureType::from(ty, &cx.instance_type())),
122        })
123    }
124
125    /// Close this `FutureAny`.
126    ///
127    /// This will close this future and cause any write that happens later to
128    /// returned `DROPPED`.
129    ///
130    /// # Panics
131    ///
132    /// Panics if the `store` does not own this future. Usage of this future
133    /// after calling `close` will also cause a panic.
134    pub fn close(&mut self, mut store: impl AsContextMut) {
135        futures_and_streams::future_close(store.as_context_mut().0, &mut self.id)
136    }
137}
138
139unsafe impl ComponentType for FutureAny {
140    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
141
142    type Lower = <u32 as ComponentType>::Lower;
143
144    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
145        match ty {
146            InterfaceType::Future(_) => Ok(()),
147            other => bail!("expected `future`, found `{}`", desc(other)),
148        }
149    }
150}
151
152unsafe impl Lower for FutureAny {
153    fn linear_lower_to_flat<T>(
154        &self,
155        cx: &mut LowerContext<'_, T>,
156        ty: InterfaceType,
157        dst: &mut MaybeUninit<Self::Lower>,
158    ) -> Result<()> {
159        self.lower_to_index(cx, ty)?
160            .linear_lower_to_flat(cx, InterfaceType::U32, dst)
161    }
162
163    fn linear_lower_to_memory<T>(
164        &self,
165        cx: &mut LowerContext<'_, T>,
166        ty: InterfaceType,
167        offset: usize,
168    ) -> Result<()> {
169        self.lower_to_index(cx, ty)?
170            .linear_lower_to_memory(cx, InterfaceType::U32, offset)
171    }
172}
173
174unsafe impl Lift for FutureAny {
175    fn linear_lift_from_flat(
176        cx: &mut LiftContext<'_>,
177        ty: InterfaceType,
178        src: &Self::Lower,
179    ) -> Result<Self> {
180        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
181        Self::lift_from_index(cx, ty, index)
182    }
183
184    fn linear_lift_from_memory(
185        cx: &mut LiftContext<'_>,
186        ty: InterfaceType,
187        bytes: &[u8],
188    ) -> Result<Self> {
189        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
190        Self::lift_from_index(cx, ty, index)
191    }
192}
193
194/// Represents a type-erased component model `stream`.
195///
196/// This type is similar to [`ResourceAny`](crate::component::ResourceAny)
197/// where it's a static guarantee that it represents a component model
198/// `stream`, but it does not contain any information about the underlying type
199/// that is associated with this stream. This is intended to be used in
200/// "dynamically typed" situations where embedders may not know ahead of time
201/// the type of a `stream` being used by component that is loaded.
202///
203/// # Closing streams
204///
205/// A [`StreamAny`] represents a resource that is owned by a [`Store`]. Proper
206/// disposal of a stream requires invoking the [`StreamAny::close`] method to
207/// ensure that this handle does not leak. If [`StreamAny::close`] is not
208/// called then memory will not be leaked once the owning [`Store`] is dropped,
209/// but the resource handle will be leaked until the [`Store`] is dropped.
210///
211/// [`Store`]: crate::Store
212#[derive(Debug, Clone, PartialEq)]
213pub struct StreamAny {
214    id: TableId<TransmitHandle>,
215    ty: PayloadType<StreamType>,
216}
217
218impl StreamAny {
219    fn lower_to_index<T>(&self, cx: &mut LowerContext<'_, T>, ty: InterfaceType) -> Result<u32> {
220        // See comments in `FutureAny::lower_to_index` for why this is
221        // different from `StreamReader`'s implementation.
222        let stream_ty = match ty {
223            InterfaceType::Stream(payload) => payload,
224            _ => bad_type_info(),
225        };
226        let payload = cx.types[cx.types[stream_ty].ty].payload.as_ref();
227        self.ty.typecheck_guest(
228            &cx.instance_type(),
229            payload,
230            StreamType::equivalent_payload_guest,
231        )?;
232        futures_and_streams::lower_stream_to_index(self.id, cx, ty)
233    }
234
235    /// Attempts to convert this [`StreamAny`] to a [`StreamReader<T>`]
236    /// with a statically known type.
237    ///
238    /// # Errors
239    ///
240    /// This function will return an error if `T` does not match the type of
241    /// value on this stream.
242    pub fn try_into_stream_reader<T>(self) -> Result<StreamReader<T>>
243    where
244        T: ComponentType + 'static,
245    {
246        self.ty
247            .typecheck_host::<T>(StreamType::equivalent_payload_host::<T>)?;
248        Ok(StreamReader::new_(self.id))
249    }
250
251    /// Attempts to convert `reader` to a [`StreamAny`], erasing its statically
252    /// known type.
253    ///
254    /// # Errors
255    ///
256    /// This function will return an error if `reader` does not belong to
257    /// `store`.
258    pub fn try_from_stream_reader<T>(
259        mut store: impl AsContextMut,
260        reader: StreamReader<T>,
261    ) -> Result<Self>
262    where
263        T: ComponentType + 'static,
264    {
265        let store = store.as_context_mut();
266        let ty = match store.0.transmit_origin(reader.id())? {
267            TransmitOrigin::Host => PayloadType::new_host::<T>(),
268            TransmitOrigin::GuestStream(id, ty) => PayloadType::new_guest_stream(store.0, id, ty),
269            TransmitOrigin::GuestFuture(..) => bail!("not a stream"),
270        };
271        Ok(StreamAny {
272            id: reader.id(),
273            ty,
274        })
275    }
276
277    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
278        let id = futures_and_streams::lift_index_to_stream(cx, ty, index)?;
279        let InterfaceType::Stream(ty) = ty else {
280            unreachable!()
281        };
282        let ty = cx.types[ty].ty;
283        Ok(StreamAny {
284            id,
285            // Note that this stream might actually be a host-originating, but
286            // see the documentation in `FutureAny::lift_from_index` for why
287            // this should be ok.
288            ty: PayloadType::Guest(StreamType::from(ty, &cx.instance_type())),
289        })
290    }
291
292    /// Close this `StreamAny`.
293    ///
294    /// This will close this stream and cause any write that happens later to
295    /// returned `DROPPED`.
296    ///
297    /// # Panics
298    ///
299    /// Panics if the `store` does not own this stream. Usage of this stream
300    /// after calling `close` will also cause a panic.
301    pub fn close(&mut self, mut store: impl AsContextMut) {
302        futures_and_streams::future_close(store.as_context_mut().0, &mut self.id)
303    }
304}
305
306unsafe impl ComponentType for StreamAny {
307    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
308
309    type Lower = <u32 as ComponentType>::Lower;
310
311    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
312        match ty {
313            InterfaceType::Stream(_) => Ok(()),
314            other => bail!("expected `stream`, found `{}`", desc(other)),
315        }
316    }
317}
318
319unsafe impl Lower for StreamAny {
320    fn linear_lower_to_flat<T>(
321        &self,
322        cx: &mut LowerContext<'_, T>,
323        ty: InterfaceType,
324        dst: &mut MaybeUninit<Self::Lower>,
325    ) -> Result<()> {
326        self.lower_to_index(cx, ty)?
327            .linear_lower_to_flat(cx, InterfaceType::U32, dst)
328    }
329
330    fn linear_lower_to_memory<T>(
331        &self,
332        cx: &mut LowerContext<'_, T>,
333        ty: InterfaceType,
334        offset: usize,
335    ) -> Result<()> {
336        self.lower_to_index(cx, ty)?
337            .linear_lower_to_memory(cx, InterfaceType::U32, offset)
338    }
339}
340
341unsafe impl Lift for StreamAny {
342    fn linear_lift_from_flat(
343        cx: &mut LiftContext<'_>,
344        ty: InterfaceType,
345        src: &Self::Lower,
346    ) -> Result<Self> {
347        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
348        Self::lift_from_index(cx, ty, index)
349    }
350
351    fn linear_lift_from_memory(
352        cx: &mut LiftContext<'_>,
353        ty: InterfaceType,
354        bytes: &[u8],
355    ) -> Result<Self> {
356        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
357        Self::lift_from_index(cx, ty, index)
358    }
359}
360
361#[derive(Debug, Clone)]
362enum PayloadType<T> {
363    Guest(T),
364    Host {
365        id: TypeId,
366        typecheck: fn(Option<&InterfaceType>, &InstanceType<'_>) -> Result<()>,
367    },
368}
369
370impl<T: PartialEq> PartialEq for PayloadType<T> {
371    fn eq(&self, other: &Self) -> bool {
372        match (self, other) {
373            (PayloadType::Guest(a), PayloadType::Guest(b)) => a == b,
374            (PayloadType::Guest(_), _) => false,
375            (PayloadType::Host { id: a_id, .. }, PayloadType::Host { id: b_id, .. }) => {
376                a_id == b_id
377            }
378            (PayloadType::Host { .. }, _) => false,
379        }
380    }
381}
382
383impl PayloadType<FutureType> {
384    fn new_guest_future(
385        store: &StoreOpaque,
386        id: ComponentInstanceId,
387        ty: TypeFutureTableIndex,
388    ) -> Self {
389        let types = InstanceType::new(&store.component_instance(id));
390        let ty = types.types[ty].ty;
391        PayloadType::Guest(FutureType::from(ty, &types))
392    }
393}
394
395impl PayloadType<StreamType> {
396    fn new_guest_stream(
397        store: &StoreOpaque,
398        id: ComponentInstanceId,
399        ty: TypeStreamTableIndex,
400    ) -> Self {
401        let types = InstanceType::new(&store.component_instance(id));
402        let ty = types.types[ty].ty;
403        PayloadType::Guest(StreamType::from(ty, &types))
404    }
405}
406
407impl<T> PayloadType<T> {
408    fn new_host<P>() -> Self
409    where
410        P: ComponentType + 'static,
411    {
412        PayloadType::Host {
413            typecheck: types::typecheck_payload::<P>,
414            id: TypeId::of::<P>(),
415        }
416    }
417
418    fn typecheck_guest(
419        &self,
420        types: &InstanceType<'_>,
421        payload: Option<&InterfaceType>,
422        equivalent: fn(&T, &InstanceType<'_>, Option<&InterfaceType>) -> bool,
423    ) -> Result<()> {
424        match self {
425            Self::Guest(ty) => {
426                if equivalent(ty, types, payload) {
427                    Ok(())
428                } else {
429                    bail!("future payload types differ")
430                }
431            }
432            Self::Host { typecheck, .. } => {
433                typecheck(payload, types).context("future payload types differ")
434            }
435        }
436    }
437
438    fn typecheck_host<P>(&self, equivalent: fn(&T) -> Result<()>) -> Result<()>
439    where
440        P: ComponentType + 'static,
441    {
442        match self {
443            Self::Guest(ty) => equivalent(ty),
444            Self::Host { id, .. } => {
445                if *id == TypeId::of::<P>() {
446                    Ok(())
447                } else {
448                    bail!("future payload types differ")
449                }
450            }
451        }
452    }
453}