Skip to main content

wasmtime_wasi_http/
handler.rs

//! Provides utilities useful for dispatching incoming HTTP requests to
//! `wasi:http/handler` guest instances.
3
4#[cfg(feature = "p2")]
5use crate::p2;
6#[cfg(feature = "p2")]
7use crate::p2::bindings::http::types as p2_types;
8#[cfg(feature = "p3")]
9use crate::p3;
10use bytes::Bytes;
11use futures::{
12    channel::oneshot,
13    future::{Either, FutureExt},
14    stream::{FuturesUnordered, Stream},
15};
16use http_body_util::{BodyExt, combinators::UnsyncBoxBody};
17#[cfg(feature = "p3")]
18use p3::bindings::http::types as p3_types;
19use std::collections::VecDeque;
20use std::collections::btree_map::{BTreeMap, Entry};
21use std::error;
22use std::fmt;
23use std::future;
24use std::mem;
25use std::ops::DerefMut;
26use std::pin::{Pin, pin};
27use std::sync::{
28    Arc, Mutex,
29    atomic::{AtomicBool, AtomicUsize, Ordering::Relaxed},
30};
31use std::task::{Context, Poll};
32use std::time::Instant;
33use tokio::sync::Notify;
34use wasmtime::component::{Accessor, GuestTaskId, Resource, TypedFuncCallConcurrent};
35#[cfg(feature = "p2")]
36use wasmtime::error::Context as _;
37use wasmtime::{AsContextMut, Result, Store, StoreContextMut, format_err};
38
/// Represents either a `wasi:http/types@0.2.x` or `wasi:http/types@0.3.x` `error-code`.
///
/// Which variants exist depends on the enabled `p2` / `p3` crate features.
/// `From` conversions are provided between this wrapper and each
/// version-specific `ErrorCode`, in both directions.
pub enum ErrorCode {
    /// A `wasi:http/types@0.2.x` `error-code`.
    #[cfg(feature = "p2")]
    P2(p2_types::ErrorCode),
    /// A `wasi:http/types@0.3.x` `error-code`.
    #[cfg(feature = "p3")]
    P3(p3_types::ErrorCode),
}
48
#[cfg(feature = "p2")]
impl From<p2_types::ErrorCode> for ErrorCode {
    /// Wraps a `wasi:http/types@0.2.x` `error-code` in [`ErrorCode::P2`].
    fn from(p2_code: p2_types::ErrorCode) -> Self {
        ErrorCode::P2(p2_code)
    }
}
55
#[cfg(feature = "p3")]
impl From<p3_types::ErrorCode> for ErrorCode {
    /// Wraps a `wasi:http/types@0.3.x` `error-code` in [`ErrorCode::P3`].
    fn from(p3_code: p3_types::ErrorCode) -> Self {
        ErrorCode::P3(p3_code)
    }
}
62
#[cfg(feature = "p2")]
impl From<ErrorCode> for p2_types::ErrorCode {
    /// Unwraps an [`ErrorCode`] into a `wasi:http/types@0.2.x` `error-code`,
    /// converting from the 0.3.x representation when necessary.
    fn from(wrapped: ErrorCode) -> Self {
        match wrapped {
            // Already the requested version: hand the payload back untouched.
            ErrorCode::P2(inner) => inner,
            #[cfg(feature = "p3")]
            ErrorCode::P3(inner) => Self::from(inner),
        }
    }
}
73
#[cfg(feature = "p3")]
impl From<ErrorCode> for p3_types::ErrorCode {
    /// Unwraps an [`ErrorCode`] into a `wasi:http/types@0.3.x` `error-code`,
    /// converting from the 0.2.x representation when necessary.
    fn from(wrapped: ErrorCode) -> Self {
        match wrapped {
            #[cfg(feature = "p2")]
            ErrorCode::P2(inner) => Self::from(inner),
            // Already the requested version: hand the payload back untouched.
            ErrorCode::P3(inner) => inner,
        }
    }
}
84
#[cfg(all(feature = "p2", feature = "p3"))]
impl From<p2_types::ErrorCode> for p3_types::ErrorCode {
    /// Converts a `wasi:http/types@0.2.x` `error-code` into the 0.3.x
    /// `error-code` variant of the same name.
    ///
    /// Payloads are moved across; record payloads are rebuilt field by field
    /// because the two versions use distinct generated types.
    fn from(code: p2_types::ErrorCode) -> Self {
        use p2_types::ErrorCode as Src;

        // Rebuilds the 0.2.x field-size record as its 0.3.x counterpart.
        fn field_size(payload: p2_types::FieldSizePayload) -> p3_types::FieldSizePayload {
            p3_types::FieldSizePayload {
                field_name: payload.field_name,
                field_size: payload.field_size,
            }
        }

        match code {
            Src::DnsTimeout => Self::DnsTimeout,
            Src::DnsError(payload) => Self::DnsError(p3_types::DnsErrorPayload {
                rcode: payload.rcode,
                info_code: payload.info_code,
            }),
            Src::DestinationNotFound => Self::DestinationNotFound,
            Src::DestinationUnavailable => Self::DestinationUnavailable,
            Src::DestinationIpProhibited => Self::DestinationIpProhibited,
            Src::DestinationIpUnroutable => Self::DestinationIpUnroutable,
            Src::ConnectionRefused => Self::ConnectionRefused,
            Src::ConnectionTerminated => Self::ConnectionTerminated,
            Src::ConnectionTimeout => Self::ConnectionTimeout,
            Src::ConnectionReadTimeout => Self::ConnectionReadTimeout,
            Src::ConnectionWriteTimeout => Self::ConnectionWriteTimeout,
            Src::ConnectionLimitReached => Self::ConnectionLimitReached,
            Src::TlsProtocolError => Self::TlsProtocolError,
            Src::TlsCertificateError => Self::TlsCertificateError,
            Src::TlsAlertReceived(payload) => {
                Self::TlsAlertReceived(p3_types::TlsAlertReceivedPayload {
                    alert_id: payload.alert_id,
                    alert_message: payload.alert_message,
                })
            }
            Src::HttpRequestDenied => Self::HttpRequestDenied,
            Src::HttpRequestLengthRequired => Self::HttpRequestLengthRequired,
            Src::HttpRequestBodySize(payload) => Self::HttpRequestBodySize(payload),
            Src::HttpRequestMethodInvalid => Self::HttpRequestMethodInvalid,
            Src::HttpRequestUriInvalid => Self::HttpRequestUriInvalid,
            Src::HttpRequestUriTooLong => Self::HttpRequestUriTooLong,
            Src::HttpRequestHeaderSectionSize(payload) => {
                Self::HttpRequestHeaderSectionSize(payload)
            }
            // The only field-size payload that is optional.
            Src::HttpRequestHeaderSize(payload) => {
                Self::HttpRequestHeaderSize(payload.map(field_size))
            }
            Src::HttpRequestTrailerSectionSize(payload) => {
                Self::HttpRequestTrailerSectionSize(payload)
            }
            Src::HttpRequestTrailerSize(payload) => {
                Self::HttpRequestTrailerSize(field_size(payload))
            }
            Src::HttpResponseIncomplete => Self::HttpResponseIncomplete,
            Src::HttpResponseHeaderSectionSize(payload) => {
                Self::HttpResponseHeaderSectionSize(payload)
            }
            Src::HttpResponseHeaderSize(payload) => {
                Self::HttpResponseHeaderSize(field_size(payload))
            }
            Src::HttpResponseBodySize(payload) => Self::HttpResponseBodySize(payload),
            Src::HttpResponseTrailerSectionSize(payload) => {
                Self::HttpResponseTrailerSectionSize(payload)
            }
            Src::HttpResponseTrailerSize(payload) => {
                Self::HttpResponseTrailerSize(field_size(payload))
            }
            Src::HttpResponseTransferCoding(payload) => Self::HttpResponseTransferCoding(payload),
            Src::HttpResponseContentCoding(payload) => Self::HttpResponseContentCoding(payload),
            Src::HttpResponseTimeout => Self::HttpResponseTimeout,
            Src::HttpUpgradeFailed => Self::HttpUpgradeFailed,
            Src::HttpProtocolError => Self::HttpProtocolError,
            Src::LoopDetected => Self::LoopDetected,
            Src::ConfigurationError => Self::ConfigurationError,
            Src::InternalError(payload) => Self::InternalError(payload),
        }
    }
}
173
#[cfg(all(feature = "p2", feature = "p3"))]
impl From<p3_types::ErrorCode> for p2_types::ErrorCode {
    /// Converts a `wasi:http/types@0.3.x` `error-code` into the 0.2.x
    /// `error-code` variant of the same name.
    ///
    /// Payloads are moved across; record payloads are rebuilt field by field
    /// because the two versions use distinct generated types.
    fn from(code: p3_types::ErrorCode) -> Self {
        use p3_types::ErrorCode as Src;

        // Rebuilds the 0.3.x field-size record as its 0.2.x counterpart.
        fn field_size(payload: p3_types::FieldSizePayload) -> p2_types::FieldSizePayload {
            p2_types::FieldSizePayload {
                field_name: payload.field_name,
                field_size: payload.field_size,
            }
        }

        match code {
            Src::DnsTimeout => Self::DnsTimeout,
            Src::DnsError(payload) => Self::DnsError(p2_types::DnsErrorPayload {
                rcode: payload.rcode,
                info_code: payload.info_code,
            }),
            Src::DestinationNotFound => Self::DestinationNotFound,
            Src::DestinationUnavailable => Self::DestinationUnavailable,
            Src::DestinationIpProhibited => Self::DestinationIpProhibited,
            Src::DestinationIpUnroutable => Self::DestinationIpUnroutable,
            Src::ConnectionRefused => Self::ConnectionRefused,
            Src::ConnectionTerminated => Self::ConnectionTerminated,
            Src::ConnectionTimeout => Self::ConnectionTimeout,
            Src::ConnectionReadTimeout => Self::ConnectionReadTimeout,
            Src::ConnectionWriteTimeout => Self::ConnectionWriteTimeout,
            Src::ConnectionLimitReached => Self::ConnectionLimitReached,
            Src::TlsProtocolError => Self::TlsProtocolError,
            Src::TlsCertificateError => Self::TlsCertificateError,
            Src::TlsAlertReceived(payload) => {
                Self::TlsAlertReceived(p2_types::TlsAlertReceivedPayload {
                    alert_id: payload.alert_id,
                    alert_message: payload.alert_message,
                })
            }
            Src::HttpRequestDenied => Self::HttpRequestDenied,
            Src::HttpRequestLengthRequired => Self::HttpRequestLengthRequired,
            Src::HttpRequestBodySize(payload) => Self::HttpRequestBodySize(payload),
            Src::HttpRequestMethodInvalid => Self::HttpRequestMethodInvalid,
            Src::HttpRequestUriInvalid => Self::HttpRequestUriInvalid,
            Src::HttpRequestUriTooLong => Self::HttpRequestUriTooLong,
            Src::HttpRequestHeaderSectionSize(payload) => {
                Self::HttpRequestHeaderSectionSize(payload)
            }
            // The only field-size payload that is optional.
            Src::HttpRequestHeaderSize(payload) => {
                Self::HttpRequestHeaderSize(payload.map(field_size))
            }
            Src::HttpRequestTrailerSectionSize(payload) => {
                Self::HttpRequestTrailerSectionSize(payload)
            }
            Src::HttpRequestTrailerSize(payload) => {
                Self::HttpRequestTrailerSize(field_size(payload))
            }
            Src::HttpResponseIncomplete => Self::HttpResponseIncomplete,
            Src::HttpResponseHeaderSectionSize(payload) => {
                Self::HttpResponseHeaderSectionSize(payload)
            }
            Src::HttpResponseHeaderSize(payload) => {
                Self::HttpResponseHeaderSize(field_size(payload))
            }
            Src::HttpResponseBodySize(payload) => Self::HttpResponseBodySize(payload),
            Src::HttpResponseTrailerSectionSize(payload) => {
                Self::HttpResponseTrailerSectionSize(payload)
            }
            Src::HttpResponseTrailerSize(payload) => {
                Self::HttpResponseTrailerSize(field_size(payload))
            }
            Src::HttpResponseTransferCoding(payload) => Self::HttpResponseTransferCoding(payload),
            Src::HttpResponseContentCoding(payload) => Self::HttpResponseContentCoding(payload),
            Src::HttpResponseTimeout => Self::HttpResponseTimeout,
            Src::HttpUpgradeFailed => Self::HttpUpgradeFailed,
            Src::HttpProtocolError => Self::HttpProtocolError,
            Src::LoopDetected => Self::LoopDetected,
            Src::ConfigurationError => Self::ConfigurationError,
            Src::InternalError(payload) => Self::InternalError(payload),
        }
    }
}
262
/// Represents either a p2 or p3 `WasiHttpCtxView` getter.
///
/// Each variant holds a plain function pointer which borrows the store data
/// `T` mutably and returns the `WasiHttpCtxView` for the corresponding WASI
/// version.
pub enum ViewFn<T> {
    /// A p2 getter.
    #[cfg(feature = "p2")]
    P2(fn(&mut T) -> crate::p2::WasiHttpCtxView),
    /// A p3 getter.
    #[cfg(feature = "p3")]
    P3(fn(&mut T) -> p3::WasiHttpCtxView),
}
272
273impl<T> Clone for ViewFn<T> {
274    fn clone(&self) -> Self {
275        match self {
276            #[cfg(feature = "p2")]
277            &Self::P2(view) => Self::P2(view),
278            #[cfg(feature = "p3")]
279            &Self::P3(view) => Self::P3(view),
280        }
281    }
282}
283
284impl<T> Copy for ViewFn<T> {}
285
/// A Request to be handled using `ProxyHandler::handle`.
///
/// The body is type-erased as an `UnsyncBoxBody` producing `Bytes` and
/// failing with an [`ErrorCode`].
pub type Request = http::Request<UnsyncBoxBody<Bytes, ErrorCode>>;

/// A Response returned by `ProxyHandler::handle`.
///
/// The body is type-erased as an `UnsyncBoxBody` producing `Bytes` and
/// failing with a `wasmtime::Error`.
pub type Response = http::Response<UnsyncBoxBody<Bytes, wasmtime::Error>>;
291
/// Represents either a `wasi:http/incoming-handler@0.2.x` or
/// `wasi:http/handler@0.3.x` pre-instance.
///
/// Use [`ProxyPre::instantiate_async`] to turn it into a [`Proxy`] of the
/// matching version.
pub enum ProxyPre<T: 'static> {
    /// A `wasi:http/incoming-handler@0.2.x` pre-instance.
    #[cfg(feature = "p2")]
    P2(p2::bindings::ProxyPre<T>),
    /// A `wasi:http/handler@0.3.x` pre-instance.
    #[cfg(feature = "p3")]
    P3(p3::bindings::ServicePre<T>),
}
302
303impl<T: 'static> ProxyPre<T> {
304    /// Instantiates the pre-instance.
305    pub async fn instantiate_async(&self, store: impl AsContextMut<Data = T>) -> Result<Proxy>
306    where
307        T: Send,
308    {
309        Ok(match self {
310            #[cfg(feature = "p2")]
311            Self::P2(pre) => Proxy::P2(pre.instantiate_async(store).await?),
312            #[cfg(feature = "p3")]
313            Self::P3(pre) => Proxy::P3(pre.instantiate_async(store).await?),
314        })
315    }
316}
317
/// Represents either a `wasi:http/incoming-handler@0.2.x` or
/// `wasi:http/handler@0.3.x` instance.
///
/// Values of this type are produced by [`ProxyPre::instantiate_async`].
pub enum Proxy {
    /// A `wasi:http/incoming-handler@0.2.x` instance.
    #[cfg(feature = "p2")]
    P2(p2::bindings::Proxy),
    /// A `wasi:http/handler@0.3.x` instance.
    #[cfg(feature = "p3")]
    P3(p3::bindings::Service),
}
328
/// Async MPMC channel where each item is delivered to at most one consumer.
struct Queue<T> {
    /// Pending items; consumers take from the front.
    queue: Mutex<VecDeque<T>>,
    /// Awaited by `pop` when the queue is empty.
    // NOTE(review): producers are presumably expected to notify this after
    // pushing; no push path is visible in this part of the file — confirm.
    notify_push: Notify,
}
334
335impl<T> Default for Queue<T> {
336    fn default() -> Self {
337        Self {
338            queue: Default::default(),
339            notify_push: Default::default(),
340        }
341    }
342}
343
344impl<T> Queue<T> {
345    fn is_empty(&self) -> bool {
346        self.queue.lock().unwrap().is_empty()
347    }
348
349    fn try_pop(&self) -> Option<T> {
350        self.queue.lock().unwrap().pop_front()
351    }
352
353    async fn pop(&self) -> T {
354        // This code comes from the Unbounded MPMC Channel example in [the
355        // `tokio::sync::Notify`
356        // docs](https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html).
357
358        let mut notified = pin!(self.notify_push.notified());
359
360        loop {
361            notified.as_mut().enable();
362            if let Some(item) = self.try_pop() {
363                return item;
364            }
365            notified.as_mut().await;
366            notified.set(self.notify_push.notified());
367        }
368    }
369}
370
/// Represents the status of a `ProxyHandler` worker task.
///
/// A value of this type is passed to [`WorkerExpiration::poll`] so the
/// implementation can decide whether the worker has expired.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum WorkerStatus {
    /// The worker is not handling any requests, nor is it doing any post-return
    /// work.  It _might_ be doing background work which the guest has indicated
    /// can be interrupted and/or abandoned at any time, i.e. does not prevent
    /// the instance from being disposed.
    Idle,
    /// The worker is handling one or more requests, waiting for each to
    /// either produce a response or expire.
    Requests,
    /// All requests handled so far have either produced a response or expired,
    /// but the guest has post-return work which needs to finish before the
    /// instance can be considered idle.
    PostReturn,
}
387
/// Represents the application-specific expiration policy of a `ProxyHandler`
/// worker.
///
/// [`HandlerState::instantiate`] returns an implementation of this trait for
/// each component instance (and thus each worker) created.  The worker uses it
/// to determine when to exit.
pub trait WorkerExpiration: 'static + Send + Sync {
    /// Poll whether the worker has expired.
    ///
    /// This will return `Poll::Ready(())` if the worker has expired, meaning
    /// the component instance should be dropped.  Otherwise, it will return
    /// `Poll::Pending` and wake the `Waker` if and when it should be polled
    /// again.
    ///
    /// `state` represents the current status of the worker, and `start`
    /// represents when it transitioned into that status (or in the case of
    /// `WorkerStatus::Requests`, when the most recent outstanding request
    /// was accepted).
    fn poll(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        state: WorkerStatus,
        start: Instant,
    ) -> Poll<()>;
}
412
/// Represents the application-specific state of a `ProxyHandler` worker.
///
/// [`HandlerState::instantiate`] returns an implementation of this trait for
/// each component instance (and thus each worker) created.  The worker uses it
/// to determine how many requests to accept, how long to wait for the guest to
/// produce responses, etc.
pub trait WorkerState: 'static + Send + Sync {
    /// The type of the associated data for [`Store`] belonging to this worker.
    type StoreData: Send;

    /// An opaque unique identifier that hosts can assign to requests which is
    /// threaded from [`ProxyHandler::handle`] into
    /// [`WorkerState::on_request_start`].
    type RequestId: Send + Sync;

    /// Indicate whether the worker should accept another request given the
    /// current number it is already handling concurrently and the total it has
    /// handled so far.
    fn should_accept_request(&self, concurrent_count: usize, total_count: usize) -> ShouldAccept;

    /// Notification that a request has been accepted by the worker.
    ///
    /// This method can be used to record anything within `store`, if necessary.
    /// The `task` corresponding to the component-model-level async task about
    /// to be created is additionally passed here.
    ///
    /// If the future returned by this function resolves before the guest has
    /// produced a response, the request will be considered "expired" and the
    /// original `ProxyHandler::handle` future will resolve to an
    /// `Err(ExpirationError.into())`.  In addition, the worker
    /// will stop accepting new requests but will continue running until all
    /// requests that have been accepted by the worker have either produced a
    /// response or expired, at which point the status of the worker will
    /// transition to either `WorkerStatus::PostReturn` or `WorkerStatus::Idle`.
    ///
    /// Note that the returned future is polled from within the
    /// `Store::run_concurrent` event loop, and due to #11869 and #11870, it may
    /// not be polled at all for arbitrary lengths of time.  Consequently, the
    /// [`WorkerExpiration`] implementation (which is polled from _outside_ the
    /// `Store::run_concurrent` event loop) must also enforce request expiration
    /// as a second level of defence if desired.
    ///
    /// For example, if a request timeout of N seconds is to be enforced, the
    /// [`WorkerExpiration::poll`] implementation, when called with
    /// `WorkerStatus::Requests` should calculate the time elapsed since the most
    /// recent outstanding request was accepted as indicated by the `start`
    /// parameter.  If that time is greater than N seconds, we can expire the
    /// instance immediately, confident that all outstanding requests have
    /// expired.
    ///
    /// Once #11869 and #11870 have been addressed, this "second level of
    /// defence" will no longer be necessary.
    fn on_request_start(
        &self,
        store: StoreContextMut<'_, Self::StoreData>,
        id: Self::RequestId,
        task: GuestTaskId,
    ) -> Pin<Box<dyn Future<Output = ()> + 'static + Send + Sync>>;

    /// Dispose of the store belonging to the now-exited worker.
    ///
    /// This may be used to e.g. collect metrics from the store or its
    /// associated data before the store is dropped, as well as e.g. retry
    /// failed instantiations after the store is dropped.
    ///
    /// If the store is being dropped due to an error (e.g. a guest trap or a
    /// host panic) `result` will be `Err(_)`; otherwise it will be `Ok(())`.
    fn drop(&self, store: Store<Self::StoreData>, result: Result<(), wasmtime::Error>);
}
482
/// Represents the combination of a store and instance with which to handle
/// requests.
///
/// This is the value produced by [`HandlerState::instantiate`]; it also
/// carries the per-worker [`WorkerExpiration`] and [`WorkerState`]
/// implementations.
pub struct Instance<T: 'static, E: WorkerExpiration, S: WorkerState> {
    /// The store to use to handle requests.
    pub store: Store<T>,
    /// The instance to use to handle requests.
    pub proxy: Proxy,
    /// `WasiHttpCtxView` getter function.
    pub view: ViewFn<T>,
    /// See [`WorkerExpiration`].
    pub expiration: E,
    /// See [`WorkerState`].
    pub state: S,
}
497
/// Indicates whether a worker should accept new requests.
///
/// Returned by [`WorkerState::should_accept_request`].
pub enum ShouldAccept {
    /// Yes, it should.
    Yes,
    /// No, it shouldn't (but ask again later).
    No,
    /// No, it shouldn't (and don't ask again).
    Never,
}
507
/// Represents the application-specific state of a web server.
///
/// The handler calls [`Self::instantiate`] whenever a worker needs a fresh
/// store and component instance.
pub trait HandlerState: 'static + Sync + Send + Sized {
    /// The type of the associated data for [`Store`]s created using
    /// [`Self::instantiate`].
    type StoreData: Send;
    /// The type of the `WorkerExpiration` implementation to be returned from
    /// [`Self::instantiate`].
    type WorkerExpiration: WorkerExpiration;
    /// The type of the `WorkerState` implementation to be returned from
    /// [`Self::instantiate`].
    type WorkerState: WorkerState<StoreData = Self::StoreData>;

    /// Create a new store and instance for handling one or more requests.
    ///
    /// Note that the implementer is responsible for applying a timeout to the
    /// guest instantiation if appropriate (e.g. as part of an overall request
    /// timeout).
    ///
    /// If this returns an error, the request(s) the worker was started for
    /// are failed with an instantiation error carrying that error.
    fn instantiate(
        &self,
    ) -> impl Future<
        Output = Result<Instance<Self::StoreData, Self::WorkerExpiration, Self::WorkerState>>,
    > + Send;
}
531
/// State shared between a `ProxyHandler` and its workers.
struct ProxyHandlerInner<S: HandlerState> {
    /// The application-provided [`HandlerState`], used to instantiate workers.
    state: S,
    /// Requests which have not yet been picked up by a worker.
    request_queue: Queue<WorkerRequest<S>>,
    /// Number of workers currently marked as available (see
    /// `Worker::set_available`).
    worker_count: AtomicUsize,
}
537
/// Tracks request start times.
///
/// Behaves as a multiset of [`Instant`]s: each key maps to the number of
/// outstanding requests recorded with that exact start time.  This makes it
/// cheap to look up the start time of the most recently accepted request
/// which is still outstanding.
// NOTE(review): presumably this feeds the `start` argument of
// `WorkerExpiration::poll`; the consumer is outside this part of the file.
#[derive(Default)]
struct StartTimes(BTreeMap<Instant, usize>);

impl StartTimes {
    /// Records one more outstanding request which started at `time`.
    fn add(&mut self, time: Instant) {
        let count = self.0.entry(time).or_default();
        *count += 1;
    }

    /// Forgets one outstanding request which started at `time`.
    ///
    /// # Panics
    ///
    /// Panics if `time` was not previously recorded with [`Self::add`].
    fn remove(&mut self, time: Instant) {
        match self.0.entry(time) {
            Entry::Vacant(_) => unreachable!(),
            Entry::Occupied(mut slot) => {
                let outstanding = *slot.get();
                if outstanding == 0 {
                    // Entries are dropped as soon as their count reaches zero.
                    unreachable!()
                } else if outstanding == 1 {
                    slot.remove();
                } else {
                    *slot.get_mut() = outstanding - 1;
                }
            }
        }
    }

    /// Returns the latest start time still being tracked, if any.
    fn most_recent(&self) -> Option<Instant> {
        self.0.keys().next_back().copied()
    }
}
569
/// A request as it travels from the handler to a worker: the host-assigned
/// request id, the request itself, and the channel on which the worker
/// delivers either the response or an error.
type WorkerRequest<S> = (
    <<S as HandlerState>::WorkerState as WorkerState>::RequestId,
    Request,
    oneshot::Sender<Result<Response, wasmtime::Error>>,
);
575
/// A worker task which instantiates a component and uses it to handle
/// requests on behalf of a `ProxyHandler`.
struct Worker<S>
where
    S: HandlerState,
{
    /// The handler this worker belongs to.
    handler: ProxyHandler<S>,
    /// Whether this worker is currently counted in the handler's
    /// `worker_count`; kept in sync by `set_available`.
    available: bool,
}
583
584impl<S> Worker<S>
585where
586    S: HandlerState,
587{
588    fn set_available(&mut self, available: bool) {
589        if available != self.available {
590            self.available = available;
591            if available {
592                self.handler.0.worker_count.fetch_add(1, Relaxed);
593            } else {
594                // Decrement the count _before_ checking if the request queue is
595                // empty.  This helps ensure that `ProxyHandler::spawn` sees the
596                // new value before deciding whether to spawn a new worker.
597                let count = self.handler.0.worker_count.fetch_sub(1, Relaxed);
598                assert!(count >= 1);
599
600                // This addresses what would otherwise be a race condition in
601                // `ProxyHandler::spawn` where it only starts a worker if the
602                // available worker count is zero.  If we decrement the count to
603                // zero right after `ProxyHandler::spawn` checks it, then no
604                // worker will be started; thus it becomes our responsibility to
605                // start a worker here instead.
606                if count == 1 && !self.handler.0.request_queue.is_empty() {
607                    self.handler.start_worker(None);
608                }
609            }
610        }
611    }
612
613    async fn run(self, request: Option<WorkerRequest<S>>) {
614        match self.handler.0.state.instantiate().await {
615            Ok(Instance {
616                store,
617                proxy,
618                view,
619                expiration,
620                state,
621            }) => {
622                self.run_(store, proxy, view, expiration, state, request)
623                    .await
624            }
625
626            Err(error) => {
627                let error = Arc::new(error);
628                if let Some((request_id, request, tx)) = request {
629                    _ = tx.send(Err(InstantiationError {
630                        request_id,
631                        request: Mutex::new(request),
632                        error,
633                    }
634                    .into()));
635                } else {
636                    // In this case, the worker was spawned to handle any queued
637                    // requests.  Since we can't handle those requests, we send
638                    // them all an instantiation error.
639                    for (request_id, request, tx) in mem::take(
640                        self.handler
641                            .0
642                            .request_queue
643                            .queue
644                            .lock()
645                            .unwrap()
646                            .deref_mut(),
647                    ) {
648                        _ = tx.send(Err(InstantiationError {
649                            request_id,
650                            request: Mutex::new(request),
651                            error: error.clone(),
652                        }
653                        .into()));
654                    }
655                }
656            }
657        }
658    }
659
660    async fn run_(
661        mut self,
662        store: Store<S::StoreData>,
663        proxy: Proxy,
664        view: ViewFn<S::StoreData>,
665        expiration: S::WorkerExpiration,
666        state: S::WorkerState,
667        request: Option<WorkerRequest<S>>,
668    ) {
        // NB: The code that follows is rather subtle in that it is structured
670        // carefully to give the `HandlerState` implementation full control over
671        // the component instance lifetime. Specifically, we must keep the
672        // `HandlerState` informed of the worker's state and how long it has
673        // been in that state, as well as allow it to expire the instance based
674        // on whatever combination of timeouts, dynamic resource usage, etc. it
675        // may take into consideration.
676        //
677        // Note that, when more than one request is handled concurrently in the
678        // same instance, we must stop accepting new requests as soon as any
679        // existing request reaches its expiration.  This serves to cap the
680        // amount of time we need to keep the instance alive before _all_
681        // requests have either completed or expired.
682        //
683        // As of this writing, there's an additional wrinkle that makes tracking
684        // expiration particularly tricky: per #11869 and #11870, busy guest
685        // loops, epoch interruption, and host functions registered using
686        // `Linker::func_{wrap,new}_async` all require blocking, exclusive
687        // access to the `Store`, which effectively prevents the
688        // `StoreContextMut::run_concurrent` event loop from making progress.
689        // That, in turn, prevents any concurrent tasks from executing, and also
690        // prevents the `AsyncFnOnce` passed to `run_concurrent` from being
691        // polled.  Consequently, we must poll `S::WorkerState` from _outside_
692        // the `run_concurrent` future to ensure expirations are enforced.  Once
693        // the aforementioned issues have been addressed, we'll be able to
694        // simplify the code and eliminate the need for communication between
695        // the "inside" future and the "outside" one.
696
        // Wrap `store` in an object which, prior to leaving this scope, will
        // pass the `store` to `WorkerState::drop`.
699        struct Dropper<S: HandlerState> {
700            state: S::WorkerState,
701            store: Option<Store<S::StoreData>>,
702        }
703
704        impl<S: HandlerState> Drop for Dropper<S> {
705            fn drop(&mut self) {
706                if let Some(store) = self.store.take() {
707                    self.state
708                        .drop(store, Err(wasmtime::format_err!("worker panicked")));
709                }
710            }
711        }
712
713        let mut dropper = Dropper::<S> {
714            state,
715            store: Some(store),
716        };
717
718        let proxy = &proxy;
719
720        let accept_concurrent = AtomicBool::new(true);
721        let status = Mutex::new((WorkerStatus::Idle, Instant::now()));
722        let mut expiration = pin!(expiration);
723
724        let function = async |accessor: &Accessor<_>| {
725            let mut reuse_count = 0;
726            let mut may_accept = true;
727            let mut futures = FuturesUnordered::new();
728            let mut start_times = StartTimes::default();
729
730            let accept_request = |(request_id, request, tx): WorkerRequest<S>,
731                                  futures: &mut FuturesUnordered<_>,
732                                  start_times: &mut StartTimes,
733                                  reuse_count: &mut usize| {
734                // Set `accept_concurrent` to false, conservatively assuming
735                // that the new task will be CPU-bound, at least to begin with.
736                // Only once the `StoreContextMut::run_concurrent` event loop
737                // returns `Pending` will we set `accept_concurrent` back to
738                // true and consider accepting more requests.
739                //
740                // This approach avoids taking on more than one CPU-bound task
741                // at a time, which would hurt throughput vs. leaving the
742                // additional requests for other workers to handle.
743                accept_concurrent.store(false, Relaxed);
744                *reuse_count += 1;
745
746                let prepared = accessor.with(|mut store| {
747                    let prepared = Prepared::new(store.as_context_mut(), proxy, request, view, tx);
748                    match prepared {
749                        Ok(prepared) => {
750                            // Notify the `HandlerState` that we're starting to
751                            // handle a request and retrieve the deadline by
752                            // which it must produce a response.
753                            //
754                            // If it fails to produce a response by the
755                            // deadline, we'll stop accepting new requests and
756                            // eventually exit the worker.
757                            let expiration = dropper.state.on_request_start(
758                                store.as_context_mut(),
759                                request_id,
760                                prepared.task(),
761                            );
762                            Ok((prepared, expiration))
763                        }
764                        Err(e) => Err(e),
765                    }
766                });
767
768                let start_time = Instant::now();
769                start_times.add(start_time);
770                *status.try_lock().unwrap() = (WorkerStatus::Requests, start_time);
771
772                futures.push(async move {
773                    let (prepared, expiration) = prepared?;
774                    let sent = prepared.run(accessor, expiration).await?;
775                    wasmtime::error::Ok((sent, start_time))
776                });
777            };
778
779            if let Some(req) = request {
780                accept_request(req, &mut futures, &mut start_times, &mut reuse_count);
781            }
782
783            // This is the main driver loop for this worker. This is modeled as
784            // a `poll_fn` which internally loops around the possible events.
785            // Events are sourced from the locals here, pinned outside of the
786            // `poll_fn` closure.
787            let mut futures = pin!(futures);
788            let handler = self.handler.clone();
789            let mut incoming_requests = pin!(futures::stream::unfold(
790                &handler.0.request_queue,
791                |queue| async move {
792                    let pair = queue.pop().await;
793                    Some((pair, queue))
794                }
795            ));
796            future::poll_fn(|cx| {
797                loop {
798                    // First, and crucially first, poll `futures`. This way
799                    // we'll discover any tasks that may have timed out, at
800                    // which point we'll stop accepting new tasks altogether
801                    // (see below for details). This is especially important in
802                    // the case where the task was blocked on a synchronous call
803                    // to a host function which has exclusive access to the
804                    // `Store`; once that call finishes, the first thing we need
805                    // to do is time out the task. If we were to poll for a new
806                    // task first, then we'd have to wait for _that_ task to
807                    // finish or time out before we could kill the instance.
808                    match futures.as_mut().poll_next(cx) {
809                        // A request either produced a response or expired.
810                        Poll::Ready(Some(Ok((responded, start_time)))) => {
811                            // Remove its start time from the map and update the
812                            // state.
813                            start_times.remove(start_time);
814                            *status.try_lock().unwrap() =
815                                if let Some(start_time) = start_times.most_recent() {
816                                    (WorkerStatus::Requests, start_time)
817                                } else {
818                                    (WorkerStatus::PostReturn, Instant::now())
819                                };
820
821                            if responded {
822                                // Response produced; carry on!
823                            } else {
824                                // Request expired; stop accepting new requests, but
825                                // continue polling until any other, in-progress
826                                // tasks until they have either finished or expired.
827                                // This effectively kicks off a "graceful shutdown"
828                                // of the worker, allowing any other concurrent
829                                // tasks time to finish before we drop the instance.
830                                may_accept = false;
831                            }
832                        }
833
834                        // Instance trapped.
835                        Poll::Ready(Some(Err(error))) => {
836                            break Poll::Ready(Err(error));
837                        }
838
839                        Poll::Ready(None) | Poll::Pending => {}
840                    }
841
842                    // At this point `futures` is either empty or it's `Pending`
843                    // meaning nothing is ready. Note that `Pending` here
844                    // doesn't necessarily mean all tasks are blocked on I/O.
845                    // They might simply be waiting for some deferred work to be
846                    // done by the next turn of the
847                    // `StoreContextMut::run_concurrent` event loop.  Therefore,
848                    // we check `accept_concurrent` here and only advertise we
849                    // have capacity for another task if either we have no tasks
850                    // at all or all our tasks really are blocked on I/O.
851                    self.set_available(
852                        may_accept
853                            && match dropper
854                                .state
855                                .should_accept_request(futures.len(), reuse_count)
856                            {
857                                ShouldAccept::Yes => {
858                                    futures.is_empty() || accept_concurrent.load(Relaxed)
859                                }
860                                ShouldAccept::No => false,
861                                ShouldAccept::Never => {
862                                    may_accept = false;
863                                    false
864                                }
865                            },
866                    );
867
868                    // If we're available for accepting more requests after the
869                    // deduction above, then try to accept a new task. If that's
870                    // successful then push it into `futures` and turn this loop
871                    // again to see where we're at next time around.
872                    if self.available
873                        && let Poll::Ready(Some(req)) = incoming_requests.as_mut().poll_next(cx)
874                    {
875                        accept_request(req, &mut futures, &mut start_times, &mut reuse_count);
876                        continue;
877                    }
878
879                    // If, at this point, we still have some requests that are
880                    // being processed then go ahead and bail out of this
881                    // singular call to `poll` by saying we're not ready yet.
882                    // This means we unconditionally wait for events within
883                    // `futures` and we're also registered, optionally, for
884                    // listening for incoming connections. That's all the events
885                    // we're interested in, so this iteration of `poll` is complete.
886                    if !futures.is_empty() {
887                        break Poll::Pending;
888                    }
889
890                    // At this point `futures` is empty, and we haven't gotten
891                    // any incoming tasks. Check the store we're using to see if
892                    // there are any "interesting" tasks around. These are tasks
893                    // which act as effectively strong references to this worker
894                    // to keep it running. If there are still interesting tasks,
895                    // then we're done with this iteration of `poll`. We'll get
896                    // woken up when anything changes, but otherwise it's time
897                    // to let something else happen.
898                    if accessor.poll_no_interesting_tasks(cx).is_pending() {
899                        break Poll::Pending;
900                    }
901
902                    // And now at this point we (a) have no `futures`, (b) no
903                    // new requests are available, and (c) the store is
904                    // completely devoid of interesting work. In this situation
905                    // if we're not actually capable of accepting any more work,
906                    // then we're completely done and it's time to exit this
907                    // worker.
908                    if !may_accept {
909                        break Poll::Ready(Ok(()));
910                    }
911
912                    // Finally, at this point we're idle but still eligible to
913                    // accept new work, so update the state if appropriate and
914                    // then return pending while we wait for new work.
915                    {
916                        let mut status = status.try_lock().unwrap();
917                        if status.0 != WorkerStatus::Idle {
918                            *status = (WorkerStatus::Idle, Instant::now());
919                        }
920                    }
921                    break Poll::Pending;
922                }
923            })
924            .await
925        };
926
927        let result = {
928            let mut future = pin!(
929                dropper
930                    .store
931                    .as_mut()
932                    .unwrap()
933                    .run_concurrent(function)
934                    .map(|v| v.flatten())
935            );
936
937            future::poll_fn(|cx| {
938                let poll = future.as_mut().poll(cx);
939                if poll.is_pending() {
940                    // If the future returns `Pending`, that's either because it's
941                    // idle (in which case it can definitely accept a new request) or
942                    // because all its tasks are awaiting I/O, in which case it may
943                    // have capacity for additional tasks to run concurrently.
944                    //
945                    // However, per #11869 and #11870, if one of the tasks is
946                    // blocked on a sync call to a host function which has exclusive
947                    // access to the `Store`, the `StoreContextMut::run_concurrent`
948                    // event loop will be unable to make progress until that call
949                    // finishes.  Similarly, if the task loops indefinitely, subject
950                    // only to epoch interruption, the event loop will also be
951                    // stuck.  Either way, any request expirations created inside
952                    // the `AsyncFnOnce` we passed to `run_concurrent` won't have a
953                    // chance to trigger.  Consequently, we poll for instance
954                    // expiration here, outside the event loop, based on the most
955                    // recently recorded state of the worker.
956
957                    let (status, start) = *status.try_lock().unwrap();
958
959                    if let Poll::Ready(()) = expiration.as_mut().poll(cx, status, start) {
960                        return Poll::Ready(match status {
961                            WorkerStatus::Requests | WorkerStatus::PostReturn => {
962                                Err(format_err!("guest timed out"))
963                            }
964                            WorkerStatus::Idle => Ok(()),
965                        });
966                    }
967
968                    // Otherwise, if the instance has not yet expired, we set
969                    // `accept_concurrent` to true and, if it wasn't already true
970                    // before, poll the future one more time so it can ask for
971                    // another request if appropriate.
972                    if !accept_concurrent.swap(true, Relaxed) {
973                        return future.as_mut().poll(cx);
974                    }
975                }
976
977                poll
978            })
979            .await
980        };
981
982        dropper.state.drop(dropper.store.take().unwrap(), result);
983    }
984}
985
986impl<S> Drop for Worker<S>
987where
988    S: HandlerState,
989{
990    fn drop(&mut self) {
991        self.set_available(false);
992    }
993}
994
995/// Represents the state of a web server.
996///
997/// Note that this supports optional instance reuse, enabled when
998/// `S::WorkerState::should_accept_request` returns [`ShouldAccept::Yes`] more
999/// than once for a given instance.  See [`WorkerState`] for details.
1000pub struct ProxyHandler<S: HandlerState>(Arc<ProxyHandlerInner<S>>);
1001
1002impl<S: HandlerState> Clone for ProxyHandler<S> {
1003    fn clone(&self) -> Self {
1004        Self(self.0.clone())
1005    }
1006}
1007
1008/// This error is returned if, when handling the request, a new worker and
1009/// associated instance needed to be created, but instantiation failed, e.g. due
1010/// to reaching a pooling allocator limit or running out of memory.  In this
1011/// case, the caller may be able to recover and retry (e.g. after waiting for
1012/// existing instances to be dropped and/or freeing memory used by caches,
1013/// etc.).  Otherwise, it will probably need to return an HTTP 500 error.
1014pub struct InstantiationError<T> {
1015    /// The ID of the request which was originally configured,
1016    pub request_id: T,
1017    /// The original request passed to `ProxyHandler::handle`.
1018    ///
1019    /// This is wrapped in a `Mutex` to satisfy the `Send + Sync` bounds
1020    /// required by `wasmtime::Error`.
1021    pub request: Mutex<Request>,
1022    /// The original instantiation error.
1023    ///
1024    /// This is wrapped in an `Arc` because a single instantiation error may
1025    /// affect multiple requests, and each caller will be given a clone.
1026    pub error: Arc<wasmtime::Error>,
1027}
1028
1029impl<T> fmt::Display for InstantiationError<T> {
1030    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1031        write!(f, "instantiation error: {}", self.error)
1032    }
1033}
1034
1035impl<T> fmt::Debug for InstantiationError<T> {
1036    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1037        write!(f, "instantiation error: {:?}", self.error)
1038    }
1039}
1040
1041impl<T> error::Error for InstantiationError<T> {}
1042
/// Error reported when the guest did not produce a response before the
/// expiration returned by `WorkerState::on_request_start` elapsed.
pub struct ExpirationError;

impl fmt::Display for ExpirationError {
    /// Writes the fixed, human-readable timeout message.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("guest timed out")
    }
}

impl fmt::Debug for ExpirationError {
    /// Same text as `Display`; this unit type has no further detail to show.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self, f)
    }
}

impl error::Error for ExpirationError {}
1060
/// Error reported when a worker trapped or panicked and therefore failed to
/// produce a result.
pub struct TrapOrPanicError;

impl fmt::Display for TrapOrPanicError {
    /// Writes the fixed, human-readable failure message.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("worker trapped or panicked")
    }
}

impl fmt::Debug for TrapOrPanicError {
    /// Same text as `Display`; this unit type has no further detail to show.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self, f)
    }
}

impl error::Error for TrapOrPanicError {}
1077
impl<S> ProxyHandler<S>
where
    S: HandlerState,
{
    /// Create a new `ProxyHandler` wrapping the specified application state.
    ///
    /// The handler starts out with a default-initialized request queue and a
    /// worker count of zero.
    pub fn new(state: S) -> Self {
        Self(Arc::new(ProxyHandlerInner {
            state,
            request_queue: Default::default(),
            worker_count: AtomicUsize::from(0),
        }))
    }

    /// Handle the specified request, returning a response on success or a
    /// `wasmtime::Error` on failure.
    ///
    /// `id` identifies the request and travels with it to whichever worker
    /// ends up accepting it.
    ///
    /// # Errors
    ///
    /// The returned `wasmtime::Error` may be downcast to a more specific type
    /// in certain scenarios:
    ///
    /// - [`InstantiationError`] if a new worker was created to handle the
    ///   request but could not instantiate the guest component.
    ///
    /// - [`ExpirationError`] if the request expired before it produced a
    ///   response.  See [`WorkerState::on_request_start`] for details.
    ///
    /// - [`TrapOrPanicError`] if the worker responsible for handling the
    ///   request trapped or panicked before it produced a response.  This may
    ///   be used when a trap occurs but cannot be traced to a specific
    ///   request, e.g. during concurrent request handling.
    ///
    /// In other failure cases (e.g. `wasi:http/types#error-code` return values
    /// and/or traps when executing synchronous WASIp2 handler functions), the
    /// original error returned by the handler will be returned.
    ///
    /// # Backpressure
    ///
    /// Note that this API does not implement any form of backpressure to limit
    /// the number of in-flight `Request`s being processed. This function
    /// may spawn new tokio tasks, instantiate new modules under new stores, and
    /// queue up pending `Request`s while waiting for previous instances. In all
    /// of these situations invoking this function will consume some host-side
    /// resources until the request is done.
    ///
    /// Embedders using this API must take this into account. If an unbounded
    /// number of requests can be fed into this function then it's recommended
    /// to take a semaphore, for example, around this function call to limit
    /// the number of concurrent requests that are being processed.
    pub async fn handle(
        &self,
        id: <S::WorkerState as WorkerState>::RequestId,
        request: Request,
    ) -> Result<Response, wasmtime::Error> {
        // The worker reports the outcome of this request through `tx`; we wait
        // for it on `rx` at the end of this function.
        let (tx, rx) = oneshot::channel();
        let req = (id, request, tx);
        if self.0.worker_count.load(Relaxed) == 0 {
            // There are no available workers; skip the queue and pass
            // the request directly to the worker, which improves
            // performance as measured by `wasmtime-server-rps.sh` by
            // about 15%.
            self.start_worker(Some(req));
        } else {
            let mut queue = self.0.request_queue.queue.lock().unwrap();
            queue.push_back(req);

            // Start a new worker to handle the request if the last worker just
            // went unavailable.  See also `Worker::set_available` for what
            // happens if the available worker count goes to zero right after we
            // check it here, and note that we only check the count _after_
            // we've pushed the request to the queue.
            //
            // The upshot is that at least one (or more) of the
            // following will happen:
            //
            // - An existing worker will accept the request
            // - We'll start a new worker here to accept the request
            // - `Worker::set_available` will start a new worker to accept the request
            //
            // I.e. it should not be possible for the request to be orphaned
            // indefinitely in the queue without being accepted except in the
            // case of a panic or an instantiation error.  In the case of an
            // instantiation error, we'll give the request back to the caller in
            // an `Err(_)`, allowing the application to decide what to do next.
            if self.0.worker_count.load(Relaxed) == 0 {
                // We still hold the queue lock, so the entry at the back is the
                // one we pushed above; take it back and hand it straight to a
                // fresh worker instead of leaving it queued.
                let req = queue.pop_back().unwrap();
                drop(queue);
                self.start_worker(Some(req));
            } else {
                // Release the lock before notifying so that a woken worker can
                // pop from the queue without contending with us.
                drop(queue);
                self.0.request_queue.notify_push.notify_one();
            }
        }

        // If the sender was dropped without ever sending, surface that as a
        // `TrapOrPanicError`; otherwise forward whatever result was sent.
        rx.await.map_err(|_| TrapOrPanicError)?
    }

    /// Return a reference to the application state.
    pub fn state(&self) -> &S {
        &self.0.state
    }

    /// Spawn a tokio task running a new `Worker` for this handler.
    ///
    /// The worker holds a clone of this handler and starts out with
    /// `available` set to `false`.  `request`, if present, is passed directly
    /// to `Worker::run` rather than going through the request queue.
    fn start_worker(&self, request: Option<WorkerRequest<S>>) {
        tokio::spawn(
            Worker {
                handler: self.clone(),
                available: false,
            }
            .run(request),
        );
    }
}
1189
/// Representation of a "prepared" call for a guest, used to extract the
/// `GuestTaskId` before actually executing any handlers.
///
/// A value is created by [`Prepared::new`], which starts the concurrent call,
/// and is consumed by [`Prepared::run`], which drives that call to completion
/// (or until the request expires).
///
/// Right now this is a bit gross since it has to type out a bunch of types by
/// hand.
pub enum Prepared<'a, T: 'static> {
    #[doc(hidden)]
    #[cfg(feature = "p2")]
    P2 {
        // Guest bindings whose `wasi:http/incoming-handler` export is called.
        guest: &'a p2::bindings::Proxy,
        // The already-started concurrent call, taking the incoming request and
        // the response outparam and returning nothing.
        call: TypedFuncCallConcurrent<
            T,
            (
                Resource<p2_types::IncomingRequest>,
                Resource<p2_types::ResponseOutparam>,
            ),
            (),
        >,
        // Sender for the final result, shared with the `response-outparam`
        // callback; whichever side takes it out of the `Option` first sends.
        tx: Arc<Mutex<Option<oneshot::Sender<Result<Response, wasmtime::Error>>>>>,
    },
    #[doc(hidden)]
    #[cfg(feature = "p3")]
    P3 {
        // Guest bindings whose `wasi:http/handler` export is called.
        guest: &'a p3::bindings::Service,
        // The already-started concurrent call, taking the request resource and
        // returning either a response resource or an `error-code`.
        call: TypedFuncCallConcurrent<
            T,
            (Resource<p3_types::Request>,),
            (Result<Resource<p3_types::Response>, p3_types::ErrorCode>,),
        >,
        // Sender for the final result of handling the request.
        tx: oneshot::Sender<Result<Response, wasmtime::Error>>,
        // Future obtained alongside the request from `p3::Request::from_http`;
        // it is handed to the response conversion in `run`.
        request_io_result: Pin<Box<dyn Future<Output = Result<(), p3_types::ErrorCode>> + Send>>,
        // Accessor from the store data to the `wasi:http` context view.
        view: fn(&mut T) -> p3::WasiHttpCtxView,
    },
}
1224
impl<'a, T: Send> Prepared<'a, T> {
    /// Creates a new prepared request.
    ///
    /// This registers `request` with the store via `view` and starts (but does
    /// not finish) the concurrent call to the guest's handler export, so that
    /// the call's `GuestTaskId` is available before the handler is driven.
    ///
    /// `tx` is the channel on which the eventual result is delivered.
    ///
    /// # Errors
    ///
    /// Returns an error if registering the request (or, for p2, the response
    /// outparam) fails, or if starting the concurrent call fails.
    ///
    /// # Panics
    ///
    /// When both the `p2` and `p3` features are enabled, panics if `proxy` and
    /// `view` are not of the same flavor.
    pub fn new(
        mut store: StoreContextMut<'_, T>,
        proxy: &'a Proxy,
        request: Request,
        view: ViewFn<T>,
        tx: oneshot::Sender<Result<Response, wasmtime::Error>>,
    ) -> Result<Prepared<'a, T>> {
        match (proxy, view) {
            #[cfg(feature = "p3")]
            (Proxy::P3(guest), ViewFn::P3(view)) => {
                // Convert the body's error type to the p3 `error-code` before
                // building the p3 request from it.
                let (request, body) = request.into_parts();
                let body = body.map_err(p3_types::ErrorCode::from);
                let request = http::Request::from_parts(request, body);
                let (request, request_io_result) = p3::Request::from_http(request);
                let request = view(store.data_mut()).table.push(request)?;

                Ok(Prepared::P3 {
                    tx,
                    request_io_result: Box::pin(request_io_result),
                    guest,
                    view,
                    call: guest
                        .wasi_http_handler()
                        .func_handle()
                        .start_call_concurrent(store, (request,))?,
                })
            }
            #[cfg(feature = "p2")]
            (Proxy::P2(guest), ViewFn::P2(view)) => {
                // Here we wrap the sender in an `Arc<Mutex<Option<_>>>`, with one
                // clone used in the `response-outparam` and the other used to send
                // an error if the request expires or the handler returns without
                // producing a response.
                let tx = Arc::new(Mutex::new(Some(tx)));

                let request =
                    view(store.data_mut()).new_incoming_request(p2_types::Scheme::Http, request)?;

                let out = view(store.data_mut()).new_response_outparam_from_callback({
                    let tx = tx.clone();
                    move |value| {
                        // Only send if nobody else has taken the sender yet.
                        if let Some(tx) = tx.lock().unwrap().take() {
                            _ = tx.send(
                                value
                                    .map(|v| {
                                        v.map(move |body| {
                                            body.map_err(wasmtime::Error::from).boxed_unsync()
                                        })
                                    })
                                    .map_err(wasmtime::Error::from),
                            );
                        }
                    }
                })?;

                Ok(Prepared::P2 {
                    guest,
                    tx,
                    call: guest
                        .wasi_http_incoming_handler()
                        .func_handle()
                        .start_call_concurrent(store, (request, out))?,
                })
            }
            // Mixing a p2 proxy with a p3 view (or vice versa) is not expected.
            #[cfg(all(feature = "p2", feature = "p3"))]
            _ => unreachable!(),
        }
    }

    /// Returns the `GuestTaskId` of the call started by [`Prepared::new`].
    fn task(&self) -> GuestTaskId {
        match self {
            #[cfg(feature = "p3")]
            Prepared::P3 { call, .. } => call.task(),
            #[cfg(feature = "p2")]
            Prepared::P2 { call, .. } => call.task(),
        }
    }

    /// Executes this request to completion.
    ///
    /// The handler call is raced against `expiration`.  The outcome of the
    /// request (a response, the handler's error, or an [`ExpirationError`]) is
    /// delivered through the sender given to [`Prepared::new`], not through
    /// this function's return value.
    ///
    /// Returns `Ok(true)` if the handler call finished before `expiration`
    /// resolved, and `Ok(false)` if `expiration` resolved first.
    pub async fn run(
        self,
        accessor: &Accessor<T>,
        expiration: impl Future<Output = ()>,
    ) -> Result<bool> {
        let expiration = pin!(expiration);

        match self {
            #[cfg(feature = "p3")]
            Prepared::P3 {
                guest,
                call,
                tx,
                request_io_result,
                view,
            } => {
                let handle =
                    pin!(async move {
                        // Finish the call; the first `?` propagates call
                        // failures and the second one the guest's `error-code`.
                        let response = guest
                            .wasi_http_handler()
                            .func_handle()
                            .finish_call_concurrent(accessor, call)
                            .await?
                            .0?;

                        // Take the response out of the resource table and turn
                        // it into an `http` response.
                        let response = accessor.with(|mut store| {
                            let response = view(store.get()).table.delete(response)?;
                            Ok::<_, wasmtime::Error>(response.into_http_with_getter(
                                &mut store,
                                request_io_result,
                                view,
                            )?)
                        })?;

                        Ok(response
                            .map(move |body| body.map_err(wasmtime::Error::from).boxed_unsync()))
                    });

                // TODO: We should also use `oneshot::Sender::poll_close` to be
                // notified when the receiver is dropped, in which case we should
                // expire the request since the response is no longer of interest to
                // the original `ProxyHandler::handle` caller.
                let (result, sent) = match futures::future::select(handle, expiration).await {
                    Either::Left((result, _)) => (result, true),
                    // TODO: We should also send a cancel request to the expired
                    // task to give it a chance to shut down gracefully, but as of
                    // this writing Wasmtime does not yet provide an API for doing
                    // that.  See issue #11833.  Instead, we let it continue running
                    // as a background task until it either returns a response
                    // (which we'll ignore) or the instance itself has expired.
                    Either::Right(((), _)) => (Err(ExpirationError.into()), false),
                };

                // The receiver may already be gone; ignoring the send failure is
                // deliberate.
                _ = tx.send(result);

                Ok(sent)
            }
            #[cfg(feature = "p2")]
            Prepared::P2 { guest, call, tx } => {
                let handle = pin!(
                    guest
                        .wasi_http_incoming_handler()
                        .func_handle()
                        .finish_call_concurrent(accessor, call)
                );

                const MESSAGE: &str = "guest never invoked `response-outparam::set` method";

                // Guard which, if the sender is still unclaimed when it is
                // dropped, reports `MESSAGE` to the receiver.
                struct Dropper(
                    Arc<Mutex<Option<oneshot::Sender<Result<Response, wasmtime::Error>>>>>,
                );

                impl Drop for Dropper {
                    fn drop(&mut self) {
                        if let Some(tx) = self.0.lock().unwrap().take() {
                            _ = tx.send(Err(format_err!("{MESSAGE}")));
                        }
                    }
                }

                let tx = Dropper(tx);

                // See corresponding TODO comment for the p3 case above.
                let (result, sent) = match futures::future::select(handle, expiration).await {
                    Either::Left((result, _)) => (result.context(MESSAGE), true),
                    // See corresponding TODO comment for the p3 case above.
                    Either::Right(((), _)) => (Err(ExpirationError.into()), false),
                };

                // If the sender is still here, the `response-outparam` callback
                // has not sent anything.  Forward the error we have, or, if the
                // call itself succeeded, report that no response was produced.
                if let Some(tx) = tx.0.lock().unwrap().take() {
                    _ = tx.send(result.and_then(|()| Err(format_err!("{MESSAGE}"))));
                }

                Ok(sent)
            }
        }
    }
}