Skip to main content

wasmtime/runtime/component/concurrent/
future_stream_any.rs

1//! Implementation of [`FutureAny`] and [`StreamAny`].
2
3use crate::component::concurrent::futures_and_streams::{self, TransmitOrigin};
4use crate::component::concurrent::{TableId, TransmitHandle};
5use crate::component::func::{LiftContext, LowerContext, bad_type_info, desc};
6use crate::component::matching::InstanceType;
7use crate::component::types::{self, FutureType, StreamType};
8use crate::component::{
9    ComponentInstanceId, ComponentType, FutureReader, Lift, Lower, StreamReader,
10};
11use crate::store::StoreOpaque;
12use crate::{AsContextMut, Result, bail, error::Context};
13use std::any::TypeId;
14use std::mem::MaybeUninit;
15use wasmtime_environ::component::{
16    CanonicalAbiInfo, InterfaceType, TypeFutureTableIndex, TypeStreamTableIndex,
17};
18
19/// Represents a type-erased component model `future`.
20///
21/// This type is similar to [`ResourceAny`](crate::component::ResourceAny)
22/// where it's a static guarantee that it represents a component model
23/// `future`, but it does not contain any information about the underlying type
24/// that is associated with this future. This is intended to be used in
25/// "dynamically typed" situations where embedders may not know ahead of time
26/// the type of a `future` being used by component that is loaded.
27///
28/// # Closing futures
29///
30/// A [`FutureAny`] represents a resource that is owned by a [`Store`]. Proper
31/// disposal of a future requires invoking the [`FutureAny::close`] method to
32/// ensure that this handle does not leak. If [`FutureAny::close`] is not
33/// called then memory will not be leaked once the owning [`Store`] is dropped,
34/// but the resource handle will be leaked until the [`Store`] is dropped.
35///
36/// [`Store`]: crate::Store
37#[derive(Debug, Clone, PartialEq)]
38pub struct FutureAny {
39    id: TableId<TransmitHandle>,
40    ty: PayloadType<FutureType>,
41}
42
43impl FutureAny {
44    fn lower_to_index<T>(&self, cx: &mut LowerContext<'_, T>, ty: InterfaceType) -> Result<u32> {
45        // Note that unlike `FutureReader<T>` we need to perform an extra
46        // typecheck to ensure that the dynamic type of this future matches
47        // what the guest we're lowering into expects. This couldn't happen
48        // before this point (see the `ComponentType::typecheck` implementation
49        // for this type), so do it now.
50        let future_ty = match ty {
51            InterfaceType::Future(payload) => payload,
52            _ => bad_type_info(),
53        };
54        let payload = cx.types[cx.types[future_ty].ty].payload.as_ref();
55        self.ty.typecheck_guest(
56            &cx.instance_type(),
57            payload,
58            FutureType::equivalent_payload_guest,
59        )?;
60
61        // Like `FutureReader<T>`, however, lowering "just" gets a u32.
62        futures_and_streams::lower_future_to_index(self.id, cx, ty)
63    }
64
65    /// Attempts to convert this [`FutureAny`] to a [`FutureReader<T>`]
66    /// with a statically known type.
67    ///
68    /// # Errors
69    ///
70    /// This function will return an error if `T` does not match the type of
71    /// value on this future.
72    pub fn try_into_future_reader<T>(self) -> Result<FutureReader<T>>
73    where
74        T: ComponentType + 'static,
75    {
76        self.ty
77            .typecheck_host::<T>(FutureType::equivalent_payload_host::<T>)?;
78        Ok(FutureReader::new_(self.id))
79    }
80
81    /// Attempts to convert `reader` to a [`FutureAny`], erasing its statically
82    /// known type.
83    ///
84    /// # Errors
85    ///
86    /// This function will return an error if `reader` does not belong to
87    /// `store`.
88    pub fn try_from_future_reader<T>(
89        mut store: impl AsContextMut,
90        reader: FutureReader<T>,
91    ) -> Result<Self>
92    where
93        T: ComponentType + 'static,
94    {
95        let store = store.as_context_mut();
96        let ty = match store.0.transmit_origin(reader.id())? {
97            TransmitOrigin::Host => PayloadType::new_host::<T>(),
98            TransmitOrigin::GuestFuture(id, ty) => PayloadType::new_guest_future(store.0, id, ty),
99            TransmitOrigin::GuestStream(..) => bail!("not a future"),
100        };
101        Ok(FutureAny {
102            id: reader.id(),
103            ty,
104        })
105    }
106
107    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
108        let id = futures_and_streams::lift_index_to_future(cx, ty, index)?;
109        let InterfaceType::Future(ty) = ty else {
110            unreachable!()
111        };
112        let ty = cx.types[ty].ty;
113        Ok(FutureAny {
114            id,
115            // Note that this future might actually be a host-originating
116            // future which means that this ascription of "the type is the
117            // guest" may be slightly in accurate. The guest, however, has the
118            // most accurate view of what type this future has so that should
119            // be reasonable to ascribe as the type here regardless.
120            ty: PayloadType::Guest(FutureType::from(ty, &cx.instance_type())),
121        })
122    }
123
124    /// Close this `FutureAny`.
125    ///
126    /// This will close this future and cause any write that happens later to
127    /// returned `DROPPED`.
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if this future has already been closed.
132    ///
133    /// # Panics
134    ///
135    /// Panics if the `store` does not own this future.
136    pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
137        futures_and_streams::future_close(store.as_context_mut().0, &mut self.id)
138    }
139}
140
141unsafe impl ComponentType for FutureAny {
142    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
143
144    type Lower = <u32 as ComponentType>::Lower;
145
146    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
147        match ty {
148            InterfaceType::Future(_) => Ok(()),
149            other => bail!("expected `future`, found `{}`", desc(other)),
150        }
151    }
152}
153
154unsafe impl Lower for FutureAny {
155    fn linear_lower_to_flat<T>(
156        &self,
157        cx: &mut LowerContext<'_, T>,
158        ty: InterfaceType,
159        dst: &mut MaybeUninit<Self::Lower>,
160    ) -> Result<()> {
161        self.lower_to_index(cx, ty)?
162            .linear_lower_to_flat(cx, InterfaceType::U32, dst)
163    }
164
165    fn linear_lower_to_memory<T>(
166        &self,
167        cx: &mut LowerContext<'_, T>,
168        ty: InterfaceType,
169        offset: usize,
170    ) -> Result<()> {
171        self.lower_to_index(cx, ty)?
172            .linear_lower_to_memory(cx, InterfaceType::U32, offset)
173    }
174}
175
176unsafe impl Lift for FutureAny {
177    fn linear_lift_from_flat(
178        cx: &mut LiftContext<'_>,
179        ty: InterfaceType,
180        src: &Self::Lower,
181    ) -> Result<Self> {
182        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
183        Self::lift_from_index(cx, ty, index)
184    }
185
186    fn linear_lift_from_memory(
187        cx: &mut LiftContext<'_>,
188        ty: InterfaceType,
189        bytes: &[u8],
190    ) -> Result<Self> {
191        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
192        Self::lift_from_index(cx, ty, index)
193    }
194}
195
196/// Represents a type-erased component model `stream`.
197///
198/// This type is similar to [`ResourceAny`](crate::component::ResourceAny)
199/// where it's a static guarantee that it represents a component model
200/// `stream`, but it does not contain any information about the underlying type
201/// that is associated with this stream. This is intended to be used in
202/// "dynamically typed" situations where embedders may not know ahead of time
203/// the type of a `stream` being used by component that is loaded.
204///
205/// # Closing streams
206///
207/// A [`StreamAny`] represents a resource that is owned by a [`Store`]. Proper
208/// disposal of a stream requires invoking the [`StreamAny::close`] method to
209/// ensure that this handle does not leak. If [`StreamAny::close`] is not
210/// called then memory will not be leaked once the owning [`Store`] is dropped,
211/// but the resource handle will be leaked until the [`Store`] is dropped.
212///
213/// [`Store`]: crate::Store
214#[derive(Debug, Clone, PartialEq)]
215pub struct StreamAny {
216    id: TableId<TransmitHandle>,
217    ty: PayloadType<StreamType>,
218}
219
220impl StreamAny {
221    fn lower_to_index<T>(&self, cx: &mut LowerContext<'_, T>, ty: InterfaceType) -> Result<u32> {
222        // See comments in `FutureAny::lower_to_index` for why this is
223        // different from `StreamReader`'s implementation.
224        let stream_ty = match ty {
225            InterfaceType::Stream(payload) => payload,
226            _ => bad_type_info(),
227        };
228        let payload = cx.types[cx.types[stream_ty].ty].payload.as_ref();
229        self.ty.typecheck_guest(
230            &cx.instance_type(),
231            payload,
232            StreamType::equivalent_payload_guest,
233        )?;
234        futures_and_streams::lower_stream_to_index(self.id, cx, ty)
235    }
236
237    /// Attempts to convert this [`StreamAny`] to a [`StreamReader<T>`]
238    /// with a statically known type.
239    ///
240    /// # Errors
241    ///
242    /// This function will return an error if `T` does not match the type of
243    /// value on this stream.
244    pub fn try_into_stream_reader<T>(self) -> Result<StreamReader<T>>
245    where
246        T: ComponentType + 'static,
247    {
248        self.ty
249            .typecheck_host::<T>(StreamType::equivalent_payload_host::<T>)?;
250        Ok(StreamReader::new_(self.id))
251    }
252
253    /// Attempts to convert `reader` to a [`StreamAny`], erasing its statically
254    /// known type.
255    ///
256    /// # Errors
257    ///
258    /// This function will return an error if `reader` does not belong to
259    /// `store`.
260    pub fn try_from_stream_reader<T>(
261        mut store: impl AsContextMut,
262        reader: StreamReader<T>,
263    ) -> Result<Self>
264    where
265        T: ComponentType + 'static,
266    {
267        let store = store.as_context_mut();
268        let ty = match store.0.transmit_origin(reader.id())? {
269            TransmitOrigin::Host => PayloadType::new_host::<T>(),
270            TransmitOrigin::GuestStream(id, ty) => PayloadType::new_guest_stream(store.0, id, ty),
271            TransmitOrigin::GuestFuture(..) => bail!("not a stream"),
272        };
273        Ok(StreamAny {
274            id: reader.id(),
275            ty,
276        })
277    }
278
279    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
280        let id = futures_and_streams::lift_index_to_stream(cx, ty, index)?;
281        let InterfaceType::Stream(ty) = ty else {
282            unreachable!()
283        };
284        let ty = cx.types[ty].ty;
285        Ok(StreamAny {
286            id,
287            // Note that this stream might actually be a host-originating, but
288            // see the documentation in `FutureAny::lift_from_index` for why
289            // this should be ok.
290            ty: PayloadType::Guest(StreamType::from(ty, &cx.instance_type())),
291        })
292    }
293
294    /// Close this `StreamAny`.
295    ///
296    /// This will close this stream and cause any write that happens later to
297    /// returned `DROPPED`.
298    ///
299    /// # Errors
300    ///
301    /// Returns an error if this stream has already been closed.
302    ///
303    /// # Panics
304    ///
305    /// Panics if the `store` does not own this stream.
306    pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
307        futures_and_streams::future_close(store.as_context_mut().0, &mut self.id)
308    }
309}
310
311unsafe impl ComponentType for StreamAny {
312    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
313
314    type Lower = <u32 as ComponentType>::Lower;
315
316    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
317        match ty {
318            InterfaceType::Stream(_) => Ok(()),
319            other => bail!("expected `stream`, found `{}`", desc(other)),
320        }
321    }
322}
323
324unsafe impl Lower for StreamAny {
325    fn linear_lower_to_flat<T>(
326        &self,
327        cx: &mut LowerContext<'_, T>,
328        ty: InterfaceType,
329        dst: &mut MaybeUninit<Self::Lower>,
330    ) -> Result<()> {
331        self.lower_to_index(cx, ty)?
332            .linear_lower_to_flat(cx, InterfaceType::U32, dst)
333    }
334
335    fn linear_lower_to_memory<T>(
336        &self,
337        cx: &mut LowerContext<'_, T>,
338        ty: InterfaceType,
339        offset: usize,
340    ) -> Result<()> {
341        self.lower_to_index(cx, ty)?
342            .linear_lower_to_memory(cx, InterfaceType::U32, offset)
343    }
344}
345
346unsafe impl Lift for StreamAny {
347    fn linear_lift_from_flat(
348        cx: &mut LiftContext<'_>,
349        ty: InterfaceType,
350        src: &Self::Lower,
351    ) -> Result<Self> {
352        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
353        Self::lift_from_index(cx, ty, index)
354    }
355
356    fn linear_lift_from_memory(
357        cx: &mut LiftContext<'_>,
358        ty: InterfaceType,
359        bytes: &[u8],
360    ) -> Result<Self> {
361        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
362        Self::lift_from_index(cx, ty, index)
363    }
364}
365
366#[derive(Debug, Clone)]
367enum PayloadType<T> {
368    Guest(T),
369    Host {
370        id: TypeId,
371        typecheck: fn(Option<&InterfaceType>, &InstanceType<'_>) -> Result<()>,
372    },
373}
374
375impl<T: PartialEq> PartialEq for PayloadType<T> {
376    fn eq(&self, other: &Self) -> bool {
377        match (self, other) {
378            (PayloadType::Guest(a), PayloadType::Guest(b)) => a == b,
379            (PayloadType::Guest(_), _) => false,
380            (PayloadType::Host { id: a_id, .. }, PayloadType::Host { id: b_id, .. }) => {
381                a_id == b_id
382            }
383            (PayloadType::Host { .. }, _) => false,
384        }
385    }
386}
387
388impl PayloadType<FutureType> {
389    fn new_guest_future(
390        store: &StoreOpaque,
391        id: ComponentInstanceId,
392        ty: TypeFutureTableIndex,
393    ) -> Self {
394        let types = InstanceType::new(&store.component_instance(id));
395        let ty = types.types[ty].ty;
396        PayloadType::Guest(FutureType::from(ty, &types))
397    }
398}
399
400impl PayloadType<StreamType> {
401    fn new_guest_stream(
402        store: &StoreOpaque,
403        id: ComponentInstanceId,
404        ty: TypeStreamTableIndex,
405    ) -> Self {
406        let types = InstanceType::new(&store.component_instance(id));
407        let ty = types.types[ty].ty;
408        PayloadType::Guest(StreamType::from(ty, &types))
409    }
410}
411
412impl<T> PayloadType<T> {
413    fn new_host<P>() -> Self
414    where
415        P: ComponentType + 'static,
416    {
417        PayloadType::Host {
418            typecheck: types::typecheck_payload::<P>,
419            id: TypeId::of::<P>(),
420        }
421    }
422
423    fn typecheck_guest(
424        &self,
425        types: &InstanceType<'_>,
426        payload: Option<&InterfaceType>,
427        equivalent: fn(&T, &InstanceType<'_>, Option<&InterfaceType>) -> bool,
428    ) -> Result<()> {
429        match self {
430            Self::Guest(ty) => {
431                if equivalent(ty, types, payload) {
432                    Ok(())
433                } else {
434                    bail!("future payload types differ")
435                }
436            }
437            Self::Host { typecheck, .. } => {
438                typecheck(payload, types).context("future payload types differ")
439            }
440        }
441    }
442
443    fn typecheck_host<P>(&self, equivalent: fn(&T) -> Result<()>) -> Result<()>
444    where
445        P: ComponentType + 'static,
446    {
447        match self {
448            Self::Guest(ty) => equivalent(ty),
449            Self::Host { id, .. } => {
450                if *id == TypeId::of::<P>() {
451                    Ok(())
452                } else {
453                    bail!("future payload types differ")
454                }
455            }
456        }
457    }
458}