1use crate::component::concurrent::futures_and_streams::{self, TransmitOrigin};
4use crate::component::concurrent::{TableId, TransmitHandle};
5use crate::component::func::{LiftContext, LowerContext, bad_type_info, desc};
6use crate::component::matching::InstanceType;
7use crate::component::types::{self, FutureType, StreamType};
8use crate::component::{
9 ComponentInstanceId, ComponentType, FutureReader, Lift, Lower, StreamReader,
10};
11use crate::store::StoreOpaque;
12use crate::{AsContextMut, Result, bail, error::Context};
13use std::any::TypeId;
14use std::mem::MaybeUninit;
15use wasmtime_environ::component::{
16 CanonicalAbiInfo, InterfaceType, TypeFutureTableIndex, TypeStreamTableIndex,
17};
18
19#[derive(Debug, Clone, PartialEq)]
38pub struct FutureAny {
39 id: TableId<TransmitHandle>,
40 ty: PayloadType<FutureType>,
41}
42
43impl FutureAny {
44 fn lower_to_index<T>(&self, cx: &mut LowerContext<'_, T>, ty: InterfaceType) -> Result<u32> {
45 let future_ty = match ty {
51 InterfaceType::Future(payload) => payload,
52 _ => bad_type_info(),
53 };
54 let payload = cx.types[cx.types[future_ty].ty].payload.as_ref();
55 self.ty.typecheck_guest(
56 &cx.instance_type(),
57 payload,
58 FutureType::equivalent_payload_guest,
59 )?;
60
61 futures_and_streams::lower_future_to_index(self.id, cx, ty)
63 }
64
65 pub fn try_into_future_reader<T>(self) -> Result<FutureReader<T>>
73 where
74 T: ComponentType + 'static,
75 {
76 self.ty
77 .typecheck_host::<T>(FutureType::equivalent_payload_host::<T>)?;
78 Ok(FutureReader::new_(self.id))
79 }
80
81 pub fn try_from_future_reader<T>(
89 mut store: impl AsContextMut,
90 reader: FutureReader<T>,
91 ) -> Result<Self>
92 where
93 T: ComponentType + 'static,
94 {
95 let store = store.as_context_mut();
96 let ty = match store.0.transmit_origin(reader.id())? {
97 TransmitOrigin::Host => PayloadType::new_host::<T>(),
98 TransmitOrigin::GuestFuture(id, ty) => PayloadType::new_guest_future(store.0, id, ty),
99 TransmitOrigin::GuestStream(..) => bail!("not a future"),
100 };
101 Ok(FutureAny {
102 id: reader.id(),
103 ty,
104 })
105 }
106
107 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
108 let id = futures_and_streams::lift_index_to_future(cx, ty, index)?;
109 let InterfaceType::Future(ty) = ty else {
110 unreachable!()
111 };
112 let ty = cx.types[ty].ty;
113 Ok(FutureAny {
114 id,
115 ty: PayloadType::Guest(FutureType::from(ty, &cx.instance_type())),
121 })
122 }
123
124 pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
137 futures_and_streams::future_close(store.as_context_mut().0, &mut self.id)
138 }
139}
140
141unsafe impl ComponentType for FutureAny {
142 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
143
144 type Lower = <u32 as ComponentType>::Lower;
145
146 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
147 match ty {
148 InterfaceType::Future(_) => Ok(()),
149 other => bail!("expected `future`, found `{}`", desc(other)),
150 }
151 }
152}
153
154unsafe impl Lower for FutureAny {
155 fn linear_lower_to_flat<T>(
156 &self,
157 cx: &mut LowerContext<'_, T>,
158 ty: InterfaceType,
159 dst: &mut MaybeUninit<Self::Lower>,
160 ) -> Result<()> {
161 self.lower_to_index(cx, ty)?
162 .linear_lower_to_flat(cx, InterfaceType::U32, dst)
163 }
164
165 fn linear_lower_to_memory<T>(
166 &self,
167 cx: &mut LowerContext<'_, T>,
168 ty: InterfaceType,
169 offset: usize,
170 ) -> Result<()> {
171 self.lower_to_index(cx, ty)?
172 .linear_lower_to_memory(cx, InterfaceType::U32, offset)
173 }
174}
175
176unsafe impl Lift for FutureAny {
177 fn linear_lift_from_flat(
178 cx: &mut LiftContext<'_>,
179 ty: InterfaceType,
180 src: &Self::Lower,
181 ) -> Result<Self> {
182 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
183 Self::lift_from_index(cx, ty, index)
184 }
185
186 fn linear_lift_from_memory(
187 cx: &mut LiftContext<'_>,
188 ty: InterfaceType,
189 bytes: &[u8],
190 ) -> Result<Self> {
191 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
192 Self::lift_from_index(cx, ty, index)
193 }
194}
195
196#[derive(Debug, Clone, PartialEq)]
215pub struct StreamAny {
216 id: TableId<TransmitHandle>,
217 ty: PayloadType<StreamType>,
218}
219
220impl StreamAny {
221 fn lower_to_index<T>(&self, cx: &mut LowerContext<'_, T>, ty: InterfaceType) -> Result<u32> {
222 let stream_ty = match ty {
225 InterfaceType::Stream(payload) => payload,
226 _ => bad_type_info(),
227 };
228 let payload = cx.types[cx.types[stream_ty].ty].payload.as_ref();
229 self.ty.typecheck_guest(
230 &cx.instance_type(),
231 payload,
232 StreamType::equivalent_payload_guest,
233 )?;
234 futures_and_streams::lower_stream_to_index(self.id, cx, ty)
235 }
236
237 pub fn try_into_stream_reader<T>(self) -> Result<StreamReader<T>>
245 where
246 T: ComponentType + 'static,
247 {
248 self.ty
249 .typecheck_host::<T>(StreamType::equivalent_payload_host::<T>)?;
250 Ok(StreamReader::new_(self.id))
251 }
252
253 pub fn try_from_stream_reader<T>(
261 mut store: impl AsContextMut,
262 reader: StreamReader<T>,
263 ) -> Result<Self>
264 where
265 T: ComponentType + 'static,
266 {
267 let store = store.as_context_mut();
268 let ty = match store.0.transmit_origin(reader.id())? {
269 TransmitOrigin::Host => PayloadType::new_host::<T>(),
270 TransmitOrigin::GuestStream(id, ty) => PayloadType::new_guest_stream(store.0, id, ty),
271 TransmitOrigin::GuestFuture(..) => bail!("not a stream"),
272 };
273 Ok(StreamAny {
274 id: reader.id(),
275 ty,
276 })
277 }
278
279 fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
280 let id = futures_and_streams::lift_index_to_stream(cx, ty, index)?;
281 let InterfaceType::Stream(ty) = ty else {
282 unreachable!()
283 };
284 let ty = cx.types[ty].ty;
285 Ok(StreamAny {
286 id,
287 ty: PayloadType::Guest(StreamType::from(ty, &cx.instance_type())),
291 })
292 }
293
294 pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
307 futures_and_streams::future_close(store.as_context_mut().0, &mut self.id)
308 }
309}
310
311unsafe impl ComponentType for StreamAny {
312 const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
313
314 type Lower = <u32 as ComponentType>::Lower;
315
316 fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
317 match ty {
318 InterfaceType::Stream(_) => Ok(()),
319 other => bail!("expected `stream`, found `{}`", desc(other)),
320 }
321 }
322}
323
324unsafe impl Lower for StreamAny {
325 fn linear_lower_to_flat<T>(
326 &self,
327 cx: &mut LowerContext<'_, T>,
328 ty: InterfaceType,
329 dst: &mut MaybeUninit<Self::Lower>,
330 ) -> Result<()> {
331 self.lower_to_index(cx, ty)?
332 .linear_lower_to_flat(cx, InterfaceType::U32, dst)
333 }
334
335 fn linear_lower_to_memory<T>(
336 &self,
337 cx: &mut LowerContext<'_, T>,
338 ty: InterfaceType,
339 offset: usize,
340 ) -> Result<()> {
341 self.lower_to_index(cx, ty)?
342 .linear_lower_to_memory(cx, InterfaceType::U32, offset)
343 }
344}
345
346unsafe impl Lift for StreamAny {
347 fn linear_lift_from_flat(
348 cx: &mut LiftContext<'_>,
349 ty: InterfaceType,
350 src: &Self::Lower,
351 ) -> Result<Self> {
352 let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
353 Self::lift_from_index(cx, ty, index)
354 }
355
356 fn linear_lift_from_memory(
357 cx: &mut LiftContext<'_>,
358 ty: InterfaceType,
359 bytes: &[u8],
360 ) -> Result<Self> {
361 let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
362 Self::lift_from_index(cx, ty, index)
363 }
364}
365
366#[derive(Debug, Clone)]
367enum PayloadType<T> {
368 Guest(T),
369 Host {
370 id: TypeId,
371 typecheck: fn(Option<&InterfaceType>, &InstanceType<'_>) -> Result<()>,
372 },
373}
374
375impl<T: PartialEq> PartialEq for PayloadType<T> {
376 fn eq(&self, other: &Self) -> bool {
377 match (self, other) {
378 (PayloadType::Guest(a), PayloadType::Guest(b)) => a == b,
379 (PayloadType::Guest(_), _) => false,
380 (PayloadType::Host { id: a_id, .. }, PayloadType::Host { id: b_id, .. }) => {
381 a_id == b_id
382 }
383 (PayloadType::Host { .. }, _) => false,
384 }
385 }
386}
387
388impl PayloadType<FutureType> {
389 fn new_guest_future(
390 store: &StoreOpaque,
391 id: ComponentInstanceId,
392 ty: TypeFutureTableIndex,
393 ) -> Self {
394 let types = InstanceType::new(&store.component_instance(id));
395 let ty = types.types[ty].ty;
396 PayloadType::Guest(FutureType::from(ty, &types))
397 }
398}
399
400impl PayloadType<StreamType> {
401 fn new_guest_stream(
402 store: &StoreOpaque,
403 id: ComponentInstanceId,
404 ty: TypeStreamTableIndex,
405 ) -> Self {
406 let types = InstanceType::new(&store.component_instance(id));
407 let ty = types.types[ty].ty;
408 PayloadType::Guest(StreamType::from(ty, &types))
409 }
410}
411
412impl<T> PayloadType<T> {
413 fn new_host<P>() -> Self
414 where
415 P: ComponentType + 'static,
416 {
417 PayloadType::Host {
418 typecheck: types::typecheck_payload::<P>,
419 id: TypeId::of::<P>(),
420 }
421 }
422
423 fn typecheck_guest(
424 &self,
425 types: &InstanceType<'_>,
426 payload: Option<&InterfaceType>,
427 equivalent: fn(&T, &InstanceType<'_>, Option<&InterfaceType>) -> bool,
428 ) -> Result<()> {
429 match self {
430 Self::Guest(ty) => {
431 if equivalent(ty, types, payload) {
432 Ok(())
433 } else {
434 bail!("future payload types differ")
435 }
436 }
437 Self::Host { typecheck, .. } => {
438 typecheck(payload, types).context("future payload types differ")
439 }
440 }
441 }
442
443 fn typecheck_host<P>(&self, equivalent: fn(&T) -> Result<()>) -> Result<()>
444 where
445 P: ComponentType + 'static,
446 {
447 match self {
448 Self::Guest(ty) => equivalent(ty),
449 Self::Host { id, .. } => {
450 if *id == TypeId::of::<P>() {
451 Ok(())
452 } else {
453 bail!("future payload types differ")
454 }
455 }
456 }
457 }
458}