Skip to main content

wasmtime/runtime/component/concurrent/
future_stream_any.rs

1//! Implementation of [`FutureAny`] and [`StreamAny`].
2
3use crate::component::concurrent::futures_and_streams::{self, TransmitOrigin};
4use crate::component::concurrent::{TableId, TransmitHandle};
5use crate::component::func::{LiftContext, LowerContext, bad_type_info, desc};
6use crate::component::matching::InstanceType;
7use crate::component::types::{self, FutureType, StreamType};
8use crate::component::{
9    ComponentInstanceId, ComponentType, FutureReader, Lift, Lower, StreamReader,
10};
11use crate::store::StoreOpaque;
12use crate::{AsContextMut, Result, bail, error::Context};
13use std::any::TypeId;
14use std::mem::MaybeUninit;
15use wasmtime_environ::component::{
16    CanonicalAbiInfo, InterfaceType, TypeFutureTableIndex, TypeStreamTableIndex,
17};
18
19/// Represents a type-erased component model `future`.
20///
21/// This type is similar to [`ResourceAny`](crate::component::ResourceAny)
22/// where it's a static guarantee that it represents a component model
23/// `future`, but it does not contain any information about the underlying type
24/// that is associated with this future. This is intended to be used in
25/// "dynamically typed" situations where embedders may not know ahead of time
26/// the type of a `future` being used by component that is loaded.
27///
28/// # Closing futures
29///
30/// A [`FutureAny`] represents a resource that is owned by a [`Store`]. Proper
31/// disposal of a future requires invoking the [`FutureAny::close`] method to
32/// ensure that this handle does not leak. If [`FutureAny::close`] is not
33/// called then memory will not be leaked once the owning [`Store`] is dropped,
34/// but the resource handle will be leaked until the [`Store`] is dropped.
35///
36/// [`Store`]: crate::Store
37#[derive(Debug, Clone, PartialEq)]
38pub struct FutureAny {
39    id: TableId<TransmitHandle>,
40    ty: PayloadType<FutureType>,
41}
42
43impl FutureAny {
44    fn lower_to_index<T>(&self, cx: &mut LowerContext<'_, T>, ty: InterfaceType) -> Result<u32> {
45        // Note that unlike `FutureReader<T>` we need to perform an extra
46        // typecheck to ensure that the dynamic type of this future matches
47        // what the guest we're lowering into expects. This couldn't happen
48        // before this point (see the `ComponentType::typecheck` implementation
49        // for this type), so do it now.
50        let future_ty = match ty {
51            InterfaceType::Future(payload) => payload,
52            _ => bad_type_info(),
53        };
54        let payload = cx.types[cx.types[future_ty].ty].payload.as_ref();
55        self.ty.typecheck_guest(
56            &cx.instance_type(),
57            payload,
58            FutureType::equivalent_payload_guest,
59        )?;
60
61        // Like `FutureReader<T>`, however, lowering "just" gets a u32.
62        futures_and_streams::lower_future_to_index(self.id, cx, ty)
63    }
64
65    /// Attempts to convert this [`FutureAny`] to a [`FutureReader<T>`]
66    /// with a statically known type.
67    ///
68    /// # Errors
69    ///
70    /// This function will return an error if `T` does not match the type of
71    /// value on this future.
72    pub fn try_into_future_reader<T>(self) -> Result<FutureReader<T>>
73    where
74        T: ComponentType + 'static,
75    {
76        self.ty
77            .typecheck_host::<T>(FutureType::equivalent_payload_host::<T>)?;
78        Ok(FutureReader::new_(self.id))
79    }
80
81    /// Attempts to convert `reader` to a [`FutureAny`], erasing its statically
82    /// known type.
83    ///
84    /// # Errors
85    ///
86    /// This function will return an error if `reader` does not belong to
87    /// `store`.
88    pub fn try_from_future_reader<T>(
89        mut store: impl AsContextMut,
90        reader: FutureReader<T>,
91    ) -> Result<Self>
92    where
93        T: ComponentType + 'static,
94    {
95        let store = store.as_context_mut();
96        let ty = match store.0.transmit_origin(reader.id())? {
97            TransmitOrigin::Host => PayloadType::new_host::<T>(),
98            TransmitOrigin::GuestFuture(id, ty) => PayloadType::new_guest_future(store.0, id, ty),
99            TransmitOrigin::GuestStream(..) => bail!("not a future"),
100        };
101        Ok(FutureAny {
102            id: reader.id(),
103            ty,
104        })
105    }
106
107    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
108        let id = futures_and_streams::lift_index_to_future(cx, ty, index)?;
109        let InterfaceType::Future(ty) = ty else {
110            unreachable!()
111        };
112        let ty = cx.types[ty].ty;
113        Ok(FutureAny {
114            id,
115            // Note that this future might actually be a host-originating
116            // future which means that this ascription of "the type is the
117            // guest" may be slightly in accurate. The guest, however, has the
118            // most accurate view of what type this future has so that should
119            // be reasonable to ascribe as the type here regardless.
120            ty: PayloadType::Guest(FutureType::from(ty, &cx.instance_type())),
121        })
122    }
123
124    /// Close this `FutureAny`.
125    ///
126    /// This will close this future and cause any write that happens later to
127    /// returned `DROPPED`.
128    ///
129    /// # Panics
130    ///
131    /// Panics if the `store` does not own this future. Usage of this future
132    /// after calling `close` will also cause a panic.
133    pub fn close(&mut self, mut store: impl AsContextMut) {
134        futures_and_streams::future_close(store.as_context_mut().0, &mut self.id)
135    }
136}
137
138unsafe impl ComponentType for FutureAny {
139    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
140
141    type Lower = <u32 as ComponentType>::Lower;
142
143    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
144        match ty {
145            InterfaceType::Future(_) => Ok(()),
146            other => bail!("expected `future`, found `{}`", desc(other)),
147        }
148    }
149}
150
151unsafe impl Lower for FutureAny {
152    fn linear_lower_to_flat<T>(
153        &self,
154        cx: &mut LowerContext<'_, T>,
155        ty: InterfaceType,
156        dst: &mut MaybeUninit<Self::Lower>,
157    ) -> Result<()> {
158        self.lower_to_index(cx, ty)?
159            .linear_lower_to_flat(cx, InterfaceType::U32, dst)
160    }
161
162    fn linear_lower_to_memory<T>(
163        &self,
164        cx: &mut LowerContext<'_, T>,
165        ty: InterfaceType,
166        offset: usize,
167    ) -> Result<()> {
168        self.lower_to_index(cx, ty)?
169            .linear_lower_to_memory(cx, InterfaceType::U32, offset)
170    }
171}
172
173unsafe impl Lift for FutureAny {
174    fn linear_lift_from_flat(
175        cx: &mut LiftContext<'_>,
176        ty: InterfaceType,
177        src: &Self::Lower,
178    ) -> Result<Self> {
179        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
180        Self::lift_from_index(cx, ty, index)
181    }
182
183    fn linear_lift_from_memory(
184        cx: &mut LiftContext<'_>,
185        ty: InterfaceType,
186        bytes: &[u8],
187    ) -> Result<Self> {
188        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
189        Self::lift_from_index(cx, ty, index)
190    }
191}
192
193/// Represents a type-erased component model `stream`.
194///
195/// This type is similar to [`ResourceAny`](crate::component::ResourceAny)
196/// where it's a static guarantee that it represents a component model
197/// `stream`, but it does not contain any information about the underlying type
198/// that is associated with this stream. This is intended to be used in
199/// "dynamically typed" situations where embedders may not know ahead of time
200/// the type of a `stream` being used by component that is loaded.
201///
202/// # Closing streams
203///
204/// A [`StreamAny`] represents a resource that is owned by a [`Store`]. Proper
205/// disposal of a stream requires invoking the [`StreamAny::close`] method to
206/// ensure that this handle does not leak. If [`StreamAny::close`] is not
207/// called then memory will not be leaked once the owning [`Store`] is dropped,
208/// but the resource handle will be leaked until the [`Store`] is dropped.
209///
210/// [`Store`]: crate::Store
211#[derive(Debug, Clone, PartialEq)]
212pub struct StreamAny {
213    id: TableId<TransmitHandle>,
214    ty: PayloadType<StreamType>,
215}
216
217impl StreamAny {
218    fn lower_to_index<T>(&self, cx: &mut LowerContext<'_, T>, ty: InterfaceType) -> Result<u32> {
219        // See comments in `FutureAny::lower_to_index` for why this is
220        // different from `StreamReader`'s implementation.
221        let stream_ty = match ty {
222            InterfaceType::Stream(payload) => payload,
223            _ => bad_type_info(),
224        };
225        let payload = cx.types[cx.types[stream_ty].ty].payload.as_ref();
226        self.ty.typecheck_guest(
227            &cx.instance_type(),
228            payload,
229            StreamType::equivalent_payload_guest,
230        )?;
231        futures_and_streams::lower_stream_to_index(self.id, cx, ty)
232    }
233
234    /// Attempts to convert this [`StreamAny`] to a [`StreamReader<T>`]
235    /// with a statically known type.
236    ///
237    /// # Errors
238    ///
239    /// This function will return an error if `T` does not match the type of
240    /// value on this stream.
241    pub fn try_into_stream_reader<T>(self) -> Result<StreamReader<T>>
242    where
243        T: ComponentType + 'static,
244    {
245        self.ty
246            .typecheck_host::<T>(StreamType::equivalent_payload_host::<T>)?;
247        Ok(StreamReader::new_(self.id))
248    }
249
250    /// Attempts to convert `reader` to a [`StreamAny`], erasing its statically
251    /// known type.
252    ///
253    /// # Errors
254    ///
255    /// This function will return an error if `reader` does not belong to
256    /// `store`.
257    pub fn try_from_stream_reader<T>(
258        mut store: impl AsContextMut,
259        reader: StreamReader<T>,
260    ) -> Result<Self>
261    where
262        T: ComponentType + 'static,
263    {
264        let store = store.as_context_mut();
265        let ty = match store.0.transmit_origin(reader.id())? {
266            TransmitOrigin::Host => PayloadType::new_host::<T>(),
267            TransmitOrigin::GuestStream(id, ty) => PayloadType::new_guest_stream(store.0, id, ty),
268            TransmitOrigin::GuestFuture(..) => bail!("not a stream"),
269        };
270        Ok(StreamAny {
271            id: reader.id(),
272            ty,
273        })
274    }
275
276    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
277        let id = futures_and_streams::lift_index_to_stream(cx, ty, index)?;
278        let InterfaceType::Stream(ty) = ty else {
279            unreachable!()
280        };
281        let ty = cx.types[ty].ty;
282        Ok(StreamAny {
283            id,
284            // Note that this stream might actually be a host-originating, but
285            // see the documentation in `FutureAny::lift_from_index` for why
286            // this should be ok.
287            ty: PayloadType::Guest(StreamType::from(ty, &cx.instance_type())),
288        })
289    }
290
291    /// Close this `StreamAny`.
292    ///
293    /// This will close this stream and cause any write that happens later to
294    /// returned `DROPPED`.
295    ///
296    /// # Panics
297    ///
298    /// Panics if the `store` does not own this stream. Usage of this stream
299    /// after calling `close` will also cause a panic.
300    pub fn close(&mut self, mut store: impl AsContextMut) {
301        futures_and_streams::future_close(store.as_context_mut().0, &mut self.id)
302    }
303}
304
305unsafe impl ComponentType for StreamAny {
306    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
307
308    type Lower = <u32 as ComponentType>::Lower;
309
310    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
311        match ty {
312            InterfaceType::Stream(_) => Ok(()),
313            other => bail!("expected `stream`, found `{}`", desc(other)),
314        }
315    }
316}
317
318unsafe impl Lower for StreamAny {
319    fn linear_lower_to_flat<T>(
320        &self,
321        cx: &mut LowerContext<'_, T>,
322        ty: InterfaceType,
323        dst: &mut MaybeUninit<Self::Lower>,
324    ) -> Result<()> {
325        self.lower_to_index(cx, ty)?
326            .linear_lower_to_flat(cx, InterfaceType::U32, dst)
327    }
328
329    fn linear_lower_to_memory<T>(
330        &self,
331        cx: &mut LowerContext<'_, T>,
332        ty: InterfaceType,
333        offset: usize,
334    ) -> Result<()> {
335        self.lower_to_index(cx, ty)?
336            .linear_lower_to_memory(cx, InterfaceType::U32, offset)
337    }
338}
339
340unsafe impl Lift for StreamAny {
341    fn linear_lift_from_flat(
342        cx: &mut LiftContext<'_>,
343        ty: InterfaceType,
344        src: &Self::Lower,
345    ) -> Result<Self> {
346        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
347        Self::lift_from_index(cx, ty, index)
348    }
349
350    fn linear_lift_from_memory(
351        cx: &mut LiftContext<'_>,
352        ty: InterfaceType,
353        bytes: &[u8],
354    ) -> Result<Self> {
355        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
356        Self::lift_from_index(cx, ty, index)
357    }
358}
359
360#[derive(Debug, Clone)]
361enum PayloadType<T> {
362    Guest(T),
363    Host {
364        id: TypeId,
365        typecheck: fn(Option<&InterfaceType>, &InstanceType<'_>) -> Result<()>,
366    },
367}
368
369impl<T: PartialEq> PartialEq for PayloadType<T> {
370    fn eq(&self, other: &Self) -> bool {
371        match (self, other) {
372            (PayloadType::Guest(a), PayloadType::Guest(b)) => a == b,
373            (PayloadType::Guest(_), _) => false,
374            (PayloadType::Host { id: a_id, .. }, PayloadType::Host { id: b_id, .. }) => {
375                a_id == b_id
376            }
377            (PayloadType::Host { .. }, _) => false,
378        }
379    }
380}
381
382impl PayloadType<FutureType> {
383    fn new_guest_future(
384        store: &StoreOpaque,
385        id: ComponentInstanceId,
386        ty: TypeFutureTableIndex,
387    ) -> Self {
388        let types = InstanceType::new(&store.component_instance(id));
389        let ty = types.types[ty].ty;
390        PayloadType::Guest(FutureType::from(ty, &types))
391    }
392}
393
394impl PayloadType<StreamType> {
395    fn new_guest_stream(
396        store: &StoreOpaque,
397        id: ComponentInstanceId,
398        ty: TypeStreamTableIndex,
399    ) -> Self {
400        let types = InstanceType::new(&store.component_instance(id));
401        let ty = types.types[ty].ty;
402        PayloadType::Guest(StreamType::from(ty, &types))
403    }
404}
405
406impl<T> PayloadType<T> {
407    fn new_host<P>() -> Self
408    where
409        P: ComponentType + 'static,
410    {
411        PayloadType::Host {
412            typecheck: types::typecheck_payload::<P>,
413            id: TypeId::of::<P>(),
414        }
415    }
416
417    fn typecheck_guest(
418        &self,
419        types: &InstanceType<'_>,
420        payload: Option<&InterfaceType>,
421        equivalent: fn(&T, &InstanceType<'_>, Option<&InterfaceType>) -> bool,
422    ) -> Result<()> {
423        match self {
424            Self::Guest(ty) => {
425                if equivalent(ty, types, payload) {
426                    Ok(())
427                } else {
428                    bail!("future payload types differ")
429                }
430            }
431            Self::Host { typecheck, .. } => {
432                typecheck(payload, types).context("future payload types differ")
433            }
434        }
435    }
436
437    fn typecheck_host<P>(&self, equivalent: fn(&T) -> Result<()>) -> Result<()>
438    where
439        P: ComponentType + 'static,
440    {
441        match self {
442            Self::Guest(ty) => equivalent(ty),
443            Self::Host { id, .. } => {
444                if *id == TypeId::of::<P>() {
445                    Ok(())
446                } else {
447                    bail!("future payload types differ")
448                }
449            }
450        }
451    }
452}