wasmtime_wasi_http/handler.rs
//! Provides utilities useful for dispatching incoming HTTP requests to
//! `wasi:http/handler` guest instances.
3
4#[cfg(feature = "p2")]
5use crate::p2;
6#[cfg(feature = "p2")]
7use crate::p2::bindings::http::types as p2_types;
8#[cfg(feature = "p3")]
9use crate::p3;
10use bytes::Bytes;
11use futures::{
12 channel::oneshot,
13 future::{Either, FutureExt},
14 stream::{FuturesUnordered, Stream},
15};
16use http_body_util::{BodyExt, combinators::UnsyncBoxBody};
17#[cfg(feature = "p3")]
18use p3::bindings::http::types as p3_types;
19use std::collections::VecDeque;
20use std::collections::btree_map::{BTreeMap, Entry};
21use std::error;
22use std::fmt;
23use std::future;
24use std::mem;
25use std::ops::DerefMut;
26use std::pin::{Pin, pin};
27use std::sync::{
28 Arc, Mutex,
29 atomic::{AtomicBool, AtomicUsize, Ordering::Relaxed},
30};
31use std::task::{Context, Poll};
32use std::time::Instant;
33use tokio::sync::Notify;
34use wasmtime::component::{Accessor, GuestTaskId, Resource, TypedFuncCallConcurrent};
35#[cfg(feature = "p2")]
36use wasmtime::error::Context as _;
37use wasmtime::{AsContextMut, Result, Store, StoreContextMut, format_err};
38
/// Represents either a `wasi:http/types@0.2.x` or `wasi:http/types@0.3.x` `error-code`.
///
/// Each variant only exists when the matching crate feature (`p2` or `p3`) is
/// enabled. `From` impls in this module convert to and from the
/// version-specific error codes.
pub enum ErrorCode {
    /// A `wasi:http/types@0.2.x` `error-code`.
    #[cfg(feature = "p2")]
    P2(p2_types::ErrorCode),
    /// A `wasi:http/types@0.3.x` `error-code`.
    #[cfg(feature = "p3")]
    P3(p3_types::ErrorCode),
}
48
#[cfg(feature = "p2")]
impl From<p2_types::ErrorCode> for ErrorCode {
    /// Wraps a `wasi:http/types@0.2.x` `error-code` in [`ErrorCode::P2`].
    fn from(p2_code: p2_types::ErrorCode) -> ErrorCode {
        ErrorCode::P2(p2_code)
    }
}
55
#[cfg(feature = "p3")]
impl From<p3_types::ErrorCode> for ErrorCode {
    /// Wraps a `wasi:http/types@0.3.x` `error-code` in [`ErrorCode::P3`].
    fn from(p3_code: p3_types::ErrorCode) -> ErrorCode {
        ErrorCode::P3(p3_code)
    }
}
62
#[cfg(feature = "p2")]
impl From<ErrorCode> for p2_types::ErrorCode {
    /// Extracts a `wasi:http/types@0.2.x` `error-code`, translating from the
    /// `0.3.x` representation when that is what the wrapper holds.
    fn from(wrapped: ErrorCode) -> Self {
        match wrapped {
            ErrorCode::P2(p2_code) => p2_code,
            #[cfg(feature = "p3")]
            ErrorCode::P3(p3_code) => Self::from(p3_code),
        }
    }
}
73
#[cfg(feature = "p3")]
impl From<ErrorCode> for p3_types::ErrorCode {
    /// Extracts a `wasi:http/types@0.3.x` `error-code`, translating from the
    /// `0.2.x` representation when that is what the wrapper holds.
    fn from(wrapped: ErrorCode) -> Self {
        match wrapped {
            #[cfg(feature = "p2")]
            ErrorCode::P2(p2_code) => Self::from(p2_code),
            ErrorCode::P3(p3_code) => p3_code,
        }
    }
}
84
#[cfg(all(feature = "p2", feature = "p3"))]
impl From<p2_types::ErrorCode> for p3_types::ErrorCode {
    /// Translates a `wasi:http/types@0.2.x` `error-code` into the `0.3.x`
    /// variant of the same name, moving any payload across field by field.
    fn from(code: p2_types::ErrorCode) -> Self {
        use p2_types::ErrorCode as Src;

        /// Re-packs a p2 DNS error payload as its p3 counterpart.
        fn dns(src: p2_types::DnsErrorPayload) -> p3_types::DnsErrorPayload {
            p3_types::DnsErrorPayload {
                rcode: src.rcode,
                info_code: src.info_code,
            }
        }

        /// Re-packs a p2 TLS alert payload as its p3 counterpart.
        fn tls_alert(src: p2_types::TlsAlertReceivedPayload) -> p3_types::TlsAlertReceivedPayload {
            p3_types::TlsAlertReceivedPayload {
                alert_id: src.alert_id,
                alert_message: src.alert_message,
            }
        }

        /// Re-packs a p2 field-size payload as its p3 counterpart.
        fn field_size(src: p2_types::FieldSizePayload) -> p3_types::FieldSizePayload {
            p3_types::FieldSizePayload {
                field_name: src.field_name,
                field_size: src.field_size,
            }
        }

        match code {
            // Variants without a payload.
            Src::DnsTimeout => Self::DnsTimeout,
            Src::DestinationNotFound => Self::DestinationNotFound,
            Src::DestinationUnavailable => Self::DestinationUnavailable,
            Src::DestinationIpProhibited => Self::DestinationIpProhibited,
            Src::DestinationIpUnroutable => Self::DestinationIpUnroutable,
            Src::ConnectionRefused => Self::ConnectionRefused,
            Src::ConnectionTerminated => Self::ConnectionTerminated,
            Src::ConnectionTimeout => Self::ConnectionTimeout,
            Src::ConnectionReadTimeout => Self::ConnectionReadTimeout,
            Src::ConnectionWriteTimeout => Self::ConnectionWriteTimeout,
            Src::ConnectionLimitReached => Self::ConnectionLimitReached,
            Src::TlsProtocolError => Self::TlsProtocolError,
            Src::TlsCertificateError => Self::TlsCertificateError,
            Src::HttpRequestDenied => Self::HttpRequestDenied,
            Src::HttpRequestLengthRequired => Self::HttpRequestLengthRequired,
            Src::HttpRequestMethodInvalid => Self::HttpRequestMethodInvalid,
            Src::HttpRequestUriInvalid => Self::HttpRequestUriInvalid,
            Src::HttpRequestUriTooLong => Self::HttpRequestUriTooLong,
            Src::HttpResponseIncomplete => Self::HttpResponseIncomplete,
            Src::HttpResponseTimeout => Self::HttpResponseTimeout,
            Src::HttpUpgradeFailed => Self::HttpUpgradeFailed,
            Src::HttpProtocolError => Self::HttpProtocolError,
            Src::LoopDetected => Self::LoopDetected,
            Src::ConfigurationError => Self::ConfigurationError,

            // Variants whose payload is passed through unchanged.
            Src::HttpRequestBodySize(size) => Self::HttpRequestBodySize(size),
            Src::HttpRequestHeaderSectionSize(size) => Self::HttpRequestHeaderSectionSize(size),
            Src::HttpRequestTrailerSectionSize(size) => Self::HttpRequestTrailerSectionSize(size),
            Src::HttpResponseHeaderSectionSize(size) => Self::HttpResponseHeaderSectionSize(size),
            Src::HttpResponseBodySize(size) => Self::HttpResponseBodySize(size),
            Src::HttpResponseTrailerSectionSize(size) => {
                Self::HttpResponseTrailerSectionSize(size)
            }
            Src::HttpResponseTransferCoding(coding) => Self::HttpResponseTransferCoding(coding),
            Src::HttpResponseContentCoding(coding) => Self::HttpResponseContentCoding(coding),
            Src::InternalError(message) => Self::InternalError(message),

            // Variants whose payload is a version-specific record.
            Src::DnsError(src) => Self::DnsError(dns(src)),
            Src::TlsAlertReceived(src) => Self::TlsAlertReceived(tls_alert(src)),
            Src::HttpRequestHeaderSize(src) => Self::HttpRequestHeaderSize(src.map(field_size)),
            Src::HttpRequestTrailerSize(src) => Self::HttpRequestTrailerSize(field_size(src)),
            Src::HttpResponseHeaderSize(src) => Self::HttpResponseHeaderSize(field_size(src)),
            Src::HttpResponseTrailerSize(src) => Self::HttpResponseTrailerSize(field_size(src)),
        }
    }
}
173
#[cfg(all(feature = "p2", feature = "p3"))]
impl From<p3_types::ErrorCode> for p2_types::ErrorCode {
    /// Translates a `wasi:http/types@0.3.x` `error-code` into the `0.2.x`
    /// variant of the same name, moving any payload across field by field.
    fn from(code: p3_types::ErrorCode) -> Self {
        use p3_types::ErrorCode as Src;

        /// Re-packs a p3 DNS error payload as its p2 counterpart.
        fn dns(src: p3_types::DnsErrorPayload) -> p2_types::DnsErrorPayload {
            p2_types::DnsErrorPayload {
                rcode: src.rcode,
                info_code: src.info_code,
            }
        }

        /// Re-packs a p3 TLS alert payload as its p2 counterpart.
        fn tls_alert(src: p3_types::TlsAlertReceivedPayload) -> p2_types::TlsAlertReceivedPayload {
            p2_types::TlsAlertReceivedPayload {
                alert_id: src.alert_id,
                alert_message: src.alert_message,
            }
        }

        /// Re-packs a p3 field-size payload as its p2 counterpart.
        fn field_size(src: p3_types::FieldSizePayload) -> p2_types::FieldSizePayload {
            p2_types::FieldSizePayload {
                field_name: src.field_name,
                field_size: src.field_size,
            }
        }

        match code {
            // Variants without a payload.
            Src::DnsTimeout => Self::DnsTimeout,
            Src::DestinationNotFound => Self::DestinationNotFound,
            Src::DestinationUnavailable => Self::DestinationUnavailable,
            Src::DestinationIpProhibited => Self::DestinationIpProhibited,
            Src::DestinationIpUnroutable => Self::DestinationIpUnroutable,
            Src::ConnectionRefused => Self::ConnectionRefused,
            Src::ConnectionTerminated => Self::ConnectionTerminated,
            Src::ConnectionTimeout => Self::ConnectionTimeout,
            Src::ConnectionReadTimeout => Self::ConnectionReadTimeout,
            Src::ConnectionWriteTimeout => Self::ConnectionWriteTimeout,
            Src::ConnectionLimitReached => Self::ConnectionLimitReached,
            Src::TlsProtocolError => Self::TlsProtocolError,
            Src::TlsCertificateError => Self::TlsCertificateError,
            Src::HttpRequestDenied => Self::HttpRequestDenied,
            Src::HttpRequestLengthRequired => Self::HttpRequestLengthRequired,
            Src::HttpRequestMethodInvalid => Self::HttpRequestMethodInvalid,
            Src::HttpRequestUriInvalid => Self::HttpRequestUriInvalid,
            Src::HttpRequestUriTooLong => Self::HttpRequestUriTooLong,
            Src::HttpResponseIncomplete => Self::HttpResponseIncomplete,
            Src::HttpResponseTimeout => Self::HttpResponseTimeout,
            Src::HttpUpgradeFailed => Self::HttpUpgradeFailed,
            Src::HttpProtocolError => Self::HttpProtocolError,
            Src::LoopDetected => Self::LoopDetected,
            Src::ConfigurationError => Self::ConfigurationError,

            // Variants whose payload is passed through unchanged.
            Src::HttpRequestBodySize(size) => Self::HttpRequestBodySize(size),
            Src::HttpRequestHeaderSectionSize(size) => Self::HttpRequestHeaderSectionSize(size),
            Src::HttpRequestTrailerSectionSize(size) => Self::HttpRequestTrailerSectionSize(size),
            Src::HttpResponseHeaderSectionSize(size) => Self::HttpResponseHeaderSectionSize(size),
            Src::HttpResponseBodySize(size) => Self::HttpResponseBodySize(size),
            Src::HttpResponseTrailerSectionSize(size) => {
                Self::HttpResponseTrailerSectionSize(size)
            }
            Src::HttpResponseTransferCoding(coding) => Self::HttpResponseTransferCoding(coding),
            Src::HttpResponseContentCoding(coding) => Self::HttpResponseContentCoding(coding),
            Src::InternalError(message) => Self::InternalError(message),

            // Variants whose payload is a version-specific record.
            Src::DnsError(src) => Self::DnsError(dns(src)),
            Src::TlsAlertReceived(src) => Self::TlsAlertReceived(tls_alert(src)),
            Src::HttpRequestHeaderSize(src) => Self::HttpRequestHeaderSize(src.map(field_size)),
            Src::HttpRequestTrailerSize(src) => Self::HttpRequestTrailerSize(field_size(src)),
            Src::HttpResponseHeaderSize(src) => Self::HttpResponseHeaderSize(field_size(src)),
            Src::HttpResponseTrailerSize(src) => Self::HttpResponseTrailerSize(field_size(src)),
        }
    }
}
262
263/// Represents either a p2 or p3 `WasiHttpCtxView` getter.
264pub enum ViewFn<T> {
265 /// A p2 getter.
266 #[cfg(feature = "p2")]
267 P2(fn(&mut T) -> crate::p2::WasiHttpCtxView),
268 /// A p3 getter.
269 #[cfg(feature = "p3")]
270 P3(fn(&mut T) -> p3::WasiHttpCtxView),
271}
272
273impl<T> Clone for ViewFn<T> {
274 fn clone(&self) -> Self {
275 match self {
276 #[cfg(feature = "p2")]
277 &Self::P2(view) => Self::P2(view),
278 #[cfg(feature = "p3")]
279 &Self::P3(view) => Self::P3(view),
280 }
281 }
282}
283
// Implemented by hand rather than derived: `#[derive(Copy)]` would add a
// `T: Copy` bound, but `ViewFn` only stores function pointers, which are
// `Copy` regardless of `T`.
impl<T> Copy for ViewFn<T> {}
285
/// A request to be handled using `ProxyHandler::handle`.
///
/// The body is a boxed stream of [`Bytes`] whose error type is [`ErrorCode`].
pub type Request = http::Request<UnsyncBoxBody<Bytes, ErrorCode>>;
288
/// A response returned by `ProxyHandler::handle`.
///
/// The body is a boxed stream of [`Bytes`] whose error type is
/// [`wasmtime::Error`].
pub type Response = http::Response<UnsyncBoxBody<Bytes, wasmtime::Error>>;
291
/// Represents either a `wasi:http/incoming-handler@0.2.x` or
/// `wasi:http/handler@0.3.x` pre-instance.
///
/// Each variant only exists when the matching crate feature is enabled. Use
/// [`ProxyPre::instantiate_async`] to turn a pre-instance into a [`Proxy`].
pub enum ProxyPre<T: 'static> {
    /// A `wasi:http/incoming-handler@0.2.x` pre-instance.
    #[cfg(feature = "p2")]
    P2(p2::bindings::ProxyPre<T>),
    /// A `wasi:http/handler@0.3.x` pre-instance.
    #[cfg(feature = "p3")]
    P3(p3::bindings::ServicePre<T>),
}
302
303impl<T: 'static> ProxyPre<T> {
304 /// Instantiates the pre-instance.
305 pub async fn instantiate_async(&self, store: impl AsContextMut<Data = T>) -> Result<Proxy>
306 where
307 T: Send,
308 {
309 Ok(match self {
310 #[cfg(feature = "p2")]
311 Self::P2(pre) => Proxy::P2(pre.instantiate_async(store).await?),
312 #[cfg(feature = "p3")]
313 Self::P3(pre) => Proxy::P3(pre.instantiate_async(store).await?),
314 })
315 }
316}
317
/// Represents either a `wasi:http/incoming-handler@0.2.x` or
/// `wasi:http/handler@0.3.x` instance.
///
/// Values of this type are produced by [`ProxyPre::instantiate_async`]. Each
/// variant only exists when the matching crate feature is enabled.
pub enum Proxy {
    /// A `wasi:http/incoming-handler@0.2.x` instance.
    #[cfg(feature = "p2")]
    P2(p2::bindings::Proxy),
    /// A `wasi:http/handler@0.3.x` instance.
    #[cfg(feature = "p3")]
    P3(p3::bindings::Service),
}
328
/// Async MPMC channel where each item is delivered to at most one consumer.
struct Queue<T> {
    /// The queued items; consumers take from the front.
    queue: Mutex<VecDeque<T>>,
    /// Notification that `pop` waits on while the queue is empty.
    // NOTE(review): the producer side is not visible here; presumably it
    // signals this after pushing an item -- confirm against the push site.
    notify_push: Notify,
}
334
335impl<T> Default for Queue<T> {
336 fn default() -> Self {
337 Self {
338 queue: Default::default(),
339 notify_push: Default::default(),
340 }
341 }
342}
343
impl<T> Queue<T> {
    /// Returns `true` if no items are currently queued.
    ///
    /// Panics if the queue's mutex has been poisoned.
    fn is_empty(&self) -> bool {
        self.queue.lock().unwrap().is_empty()
    }

    /// Removes and returns the item at the front of the queue without
    /// waiting, or returns `None` if the queue is empty.
    ///
    /// Panics if the queue's mutex has been poisoned.
    fn try_pop(&self) -> Option<T> {
        self.queue.lock().unwrap().pop_front()
    }

    /// Waits until an item is available, then removes and returns it.
    async fn pop(&self) -> T {
        // This code comes from the Unbounded MPMC Channel example in [the
        // `tokio::sync::Notify`
        // docs](https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html).

        let mut notified = pin!(self.notify_push.notified());

        loop {
            // Enable the notification future _before_ checking the queue, so
            // that a notification arriving between the check and the `await`
            // below is not lost.
            notified.as_mut().enable();
            if let Some(item) = self.try_pop() {
                return item;
            }
            // The queue was empty; wait for a notification.
            notified.as_mut().await;
            // The notification was consumed, and another consumer may take
            // the item before we do, so arm a fresh future before retrying.
            notified.set(self.notify_push.notified());
        }
    }
}
370
/// Represents the status of a `ProxyHandler` worker task.
///
/// The current status is passed to [`WorkerExpiration::poll`] together with
/// the time at which the worker entered it.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum WorkerStatus {
    /// The worker is not handling any requests, nor is it doing any post-return
    /// work. It _might_ be doing background work which the guest has indicated
    /// can be interrupted and/or abandoned at any time, i.e. does not prevent
    /// the instance from being disposed.
    Idle,
    /// The worker is handling one or more requests, waiting for each to
    /// either produce a response or expire.
    Requests,
    /// All requests handled so far have either produced a response or expired,
    /// but the guest has post-return work which needs to finish before the
    /// worker can be considered idle.
    PostReturn,
}
387
/// Represents the application-specific expiration policy of a `ProxyHandler`
/// worker.
///
/// [`HandlerState::instantiate`] returns an implementation of this trait for
/// each component instance (and thus each worker) created. The worker uses it
/// to determine when to exit.
pub trait WorkerExpiration: 'static + Send + Sync {
    /// Poll whether the worker has expired.
    ///
    /// This will return `Poll::Ready(())` if the worker has expired, meaning
    /// the component instance should be dropped. Otherwise, it will return
    /// `Poll::Pending` and wake the `Waker` if and when it should be polled
    /// again.
    ///
    /// `state` represents the current status of the worker, and `start`
    /// represents when it transitioned into that status (or in the case of
    /// `WorkerStatus::Requests`, when the most recent outstanding request
    /// was accepted).
    fn poll(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        state: WorkerStatus,
        start: Instant,
    ) -> Poll<()>;
}
412
/// Represents the application-specific state of a `ProxyHandler` worker.
///
/// [`HandlerState::instantiate`] returns an implementation of this trait for
/// each component instance (and thus each worker) created. The worker uses it
/// to determine how many requests to accept, how long to wait for the guest to
/// produce responses, etc.
pub trait WorkerState: 'static + Send + Sync {
    /// The type of the associated data for [`Store`] belonging to this worker.
    type StoreData: Send;

    /// An opaque unique identifier that hosts can assign to requests, which is
    /// threaded from [`ProxyHandler::handle`] into
    /// [`WorkerState::on_request_start`].
    type RequestId: Send + Sync;

    /// Indicate whether the worker should accept another request given the
    /// current number it is already handling concurrently and the total it has
    /// handled so far.
    fn should_accept_request(&self, concurrent_count: usize, total_count: usize) -> ShouldAccept;

    /// Notification that a request has been accepted by the worker.
    ///
    /// This method can be used to record anything within `store`, if necessary.
    /// The `task` corresponding to the component-model-level async task about
    /// to be created is additionally passed here.
    ///
    /// If the future returned by this function resolves before the guest has
    /// produced a response, the request will be considered "expired" and the
    /// original `ProxyHandler::handle` future will resolve to an
    /// `Err(ExpirationError.into())`. In addition, the worker
    /// will stop accepting new requests but will continue running until all
    /// requests that have been accepted by the worker have either produced a
    /// response or expired, at which point the status of the worker will
    /// transition to either `WorkerStatus::PostReturn` or `WorkerStatus::Idle`.
    ///
    /// Note that the returned future is polled from within the
    /// `Store::run_concurrent` event loop, and due to #11869 and #11870, it may
    /// not be polled at all for arbitrary lengths of time. Consequently, the
    /// [`WorkerExpiration`] implementation (which is polled from _outside_ the
    /// `Store::run_concurrent` event loop) must also enforce request expiration
    /// as a second level of defence if desired.
    ///
    /// For example, if a request timeout of N seconds is to be enforced, the
    /// [`WorkerExpiration::poll`] implementation, when called with
    /// `WorkerStatus::Requests`, should calculate the time elapsed since the
    /// most recent outstanding request was accepted as indicated by the `start`
    /// parameter. If that time is greater than N seconds, we can expire the
    /// instance immediately, confident that all outstanding requests have
    /// expired.
    ///
    /// Once #11869 and #11870 have been addressed, this "second level of
    /// defence" will no longer be necessary.
    fn on_request_start(
        &self,
        store: StoreContextMut<'_, Self::StoreData>,
        id: Self::RequestId,
        task: GuestTaskId,
    ) -> Pin<Box<dyn Future<Output = ()> + 'static + Send + Sync>>;

    /// Dispose of the store belonging to the now-exited worker.
    ///
    /// This may be used to e.g. collect metrics from the store or its
    /// associated data before the store is dropped, as well as e.g. retry
    /// failed instantiations after the store is dropped.
    ///
    /// If the store is being dropped due to an error (e.g. a guest trap or a
    /// host panic) `result` will be `Err(_)`; otherwise it will be `Ok(())`.
    fn drop(&self, store: Store<Self::StoreData>, result: Result<(), wasmtime::Error>);
}
482
/// Represents the combination of a store and instance with which to handle
/// requests.
///
/// This is what [`HandlerState::instantiate`] returns for each new worker.
pub struct Instance<T: 'static, E: WorkerExpiration, S: WorkerState> {
    /// The store to use to handle requests.
    pub store: Store<T>,
    /// The instance to use to handle requests.
    pub proxy: Proxy,
    /// `WasiHttpCtxView` getter function.
    pub view: ViewFn<T>,
    /// Decides when the worker has expired; see [`WorkerExpiration`].
    pub expiration: E,
    /// Per-worker application state; see [`WorkerState`].
    pub state: S,
}
497
/// Indicates whether a worker should accept new requests.
///
/// Returned by [`WorkerState::should_accept_request`]. Derives the same
/// traits as [`WorkerStatus`] so values can be logged, copied and compared.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum ShouldAccept {
    /// Yes, it should.
    Yes,
    /// No, it shouldn't (but ask again later).
    No,
    /// No, it shouldn't (and don't ask again).
    Never,
}
507
/// Represents the application-specific state of a web server.
///
/// The handler calls [`Self::instantiate`] each time a worker starts, to
/// obtain the store and component instance that worker will use.
pub trait HandlerState: 'static + Sync + Send + Sized {
    /// The type of the associated data for [`Store`]s created using
    /// [`Self::instantiate`].
    type StoreData: Send;
    /// The type of the `WorkerExpiration` implementation to be returned from
    /// [`Self::instantiate`].
    type WorkerExpiration: WorkerExpiration;
    /// The type of the `WorkerState` implementation to be returned from
    /// [`Self::instantiate`]. Its store data type must match
    /// [`Self::StoreData`].
    type WorkerState: WorkerState<StoreData = Self::StoreData>;

    /// Create a new store and instance for handling one or more requests.
    ///
    /// Note that the implementer is responsible for applying a timeout to the
    /// guest instantiation if appropriate (e.g. as part of an overall request
    /// timeout).
    ///
    /// If this returns an error, the requests the worker was started for are
    /// failed with an `InstantiationError`.
    fn instantiate(
        &self,
    ) -> impl Future<
        Output = Result<Instance<Self::StoreData, Self::WorkerExpiration, Self::WorkerState>>,
    > + Send;
}
531
/// State held behind a `ProxyHandler` and used by every worker it starts.
struct ProxyHandlerInner<S: HandlerState> {
    /// The application's [`HandlerState`], used to instantiate new workers.
    state: S,
    /// Requests waiting to be picked up by a worker.
    request_queue: Queue<WorkerRequest<S>>,
    /// The number of workers currently marked as available to accept
    /// requests (maintained by `Worker::set_available`).
    worker_count: AtomicUsize,
}
537
/// Tracks request start times.
///
/// Maps each start time to the number of outstanding requests accepted at
/// that instant, so the most recently accepted outstanding request can be
/// reported while the worker is handling requests.
#[derive(Default)]
struct StartTimes(BTreeMap<Instant, usize>);

impl StartTimes {
    /// Records one more outstanding request accepted at `time`.
    fn add(&mut self, time: Instant) {
        self.0
            .entry(time)
            .and_modify(|outstanding| *outstanding += 1)
            .or_insert(1);
    }

    /// Forgets one outstanding request accepted at `time`.
    ///
    /// Panics if no request with that start time is currently recorded.
    fn remove(&mut self, time: Instant) {
        match self.0.entry(time) {
            // Last request with this start time: drop the key entirely.
            Entry::Occupied(slot) if *slot.get() == 1 => {
                slot.remove();
            }
            Entry::Occupied(mut slot) if *slot.get() > 1 => {
                *slot.get_mut() -= 1;
            }
            _ => unreachable!(),
        }
    }

    /// Returns the latest start time still recorded, if any.
    fn most_recent(&self) -> Option<Instant> {
        self.0.keys().next_back().copied()
    }
}
569
/// A unit of work handed to a worker: the host-assigned request id, the
/// request itself, and the channel on which to send back either the response
/// or an error.
type WorkerRequest<S> = (
    <<S as HandlerState>::WorkerState as WorkerState>::RequestId,
    Request,
    oneshot::Sender<Result<Response, wasmtime::Error>>,
);
575
/// A task which instantiates a component and uses it to handle requests.
struct Worker<S>
where
    S: HandlerState,
{
    /// The handler this worker belongs to; gives access to the shared request
    /// queue and available-worker count.
    handler: ProxyHandler<S>,
    /// Whether this worker is currently counted in the handler's
    /// `worker_count`. Only changed through `set_available`.
    available: bool,
}
583
584impl<S> Worker<S>
585where
586 S: HandlerState,
587{
588 fn set_available(&mut self, available: bool) {
589 if available != self.available {
590 self.available = available;
591 if available {
592 self.handler.0.worker_count.fetch_add(1, Relaxed);
593 } else {
594 // Decrement the count _before_ checking if the request queue is
595 // empty. This helps ensure that `ProxyHandler::spawn` sees the
596 // new value before deciding whether to spawn a new worker.
597 let count = self.handler.0.worker_count.fetch_sub(1, Relaxed);
598 assert!(count >= 1);
599
600 // This addresses what would otherwise be a race condition in
601 // `ProxyHandler::spawn` where it only starts a worker if the
602 // available worker count is zero. If we decrement the count to
603 // zero right after `ProxyHandler::spawn` checks it, then no
604 // worker will be started; thus it becomes our responsibility to
605 // start a worker here instead.
606 if count == 1 && !self.handler.0.request_queue.is_empty() {
607 self.handler.start_worker(None);
608 }
609 }
610 }
611 }
612
613 async fn run(self, request: Option<WorkerRequest<S>>) {
614 match self.handler.0.state.instantiate().await {
615 Ok(Instance {
616 store,
617 proxy,
618 view,
619 expiration,
620 state,
621 }) => {
622 self.run_(store, proxy, view, expiration, state, request)
623 .await
624 }
625
626 Err(error) => {
627 let error = Arc::new(error);
628 if let Some((request_id, request, tx)) = request {
629 _ = tx.send(Err(InstantiationError {
630 request_id,
631 request: Mutex::new(request),
632 error,
633 }
634 .into()));
635 } else {
636 // In this case, the worker was spawned to handle any queued
637 // requests. Since we can't handle those requests, we send
638 // them all an instantiation error.
639 for (request_id, request, tx) in mem::take(
640 self.handler
641 .0
642 .request_queue
643 .queue
644 .lock()
645 .unwrap()
646 .deref_mut(),
647 ) {
648 _ = tx.send(Err(InstantiationError {
649 request_id,
650 request: Mutex::new(request),
651 error: error.clone(),
652 }
653 .into()));
654 }
655 }
656 }
657 }
658 }
659
660 async fn run_(
661 mut self,
662 store: Store<S::StoreData>,
663 proxy: Proxy,
664 view: ViewFn<S::StoreData>,
665 expiration: S::WorkerExpiration,
666 state: S::WorkerState,
667 request: Option<WorkerRequest<S>>,
668 ) {
        // NB: The code that follows is rather subtle in that it is structured
670 // carefully to give the `HandlerState` implementation full control over
671 // the component instance lifetime. Specifically, we must keep the
672 // `HandlerState` informed of the worker's state and how long it has
673 // been in that state, as well as allow it to expire the instance based
674 // on whatever combination of timeouts, dynamic resource usage, etc. it
675 // may take into consideration.
676 //
677 // Note that, when more than one request is handled concurrently in the
678 // same instance, we must stop accepting new requests as soon as any
679 // existing request reaches its expiration. This serves to cap the
680 // amount of time we need to keep the instance alive before _all_
681 // requests have either completed or expired.
682 //
683 // As of this writing, there's an additional wrinkle that makes tracking
684 // expiration particularly tricky: per #11869 and #11870, busy guest
685 // loops, epoch interruption, and host functions registered using
686 // `Linker::func_{wrap,new}_async` all require blocking, exclusive
687 // access to the `Store`, which effectively prevents the
688 // `StoreContextMut::run_concurrent` event loop from making progress.
689 // That, in turn, prevents any concurrent tasks from executing, and also
690 // prevents the `AsyncFnOnce` passed to `run_concurrent` from being
691 // polled. Consequently, we must poll `S::WorkerState` from _outside_
692 // the `run_concurrent` future to ensure expirations are enforced. Once
693 // the aforementioned issues have been addressed, we'll be able to
694 // simplify the code and eliminate the need for communication between
695 // the "inside" future and the "outside" one.
696
697 // Wrap `store` in an object which, prior to leaving this scope, will
698 // pass the `store` to `HandlerState::drop`.
699 struct Dropper<S: HandlerState> {
700 state: S::WorkerState,
701 store: Option<Store<S::StoreData>>,
702 }
703
704 impl<S: HandlerState> Drop for Dropper<S> {
705 fn drop(&mut self) {
706 if let Some(store) = self.store.take() {
707 self.state
708 .drop(store, Err(wasmtime::format_err!("worker panicked")));
709 }
710 }
711 }
712
713 let mut dropper = Dropper::<S> {
714 state,
715 store: Some(store),
716 };
717
718 let proxy = &proxy;
719
720 let accept_concurrent = AtomicBool::new(true);
721 let status = Mutex::new((WorkerStatus::Idle, Instant::now()));
722 let mut expiration = pin!(expiration);
723
724 let function = async |accessor: &Accessor<_>| {
725 let mut reuse_count = 0;
726 let mut may_accept = true;
727 let mut futures = FuturesUnordered::new();
728 let mut start_times = StartTimes::default();
729
730 let accept_request = |(request_id, request, tx): WorkerRequest<S>,
731 futures: &mut FuturesUnordered<_>,
732 start_times: &mut StartTimes,
733 reuse_count: &mut usize| {
734 // Set `accept_concurrent` to false, conservatively assuming
735 // that the new task will be CPU-bound, at least to begin with.
736 // Only once the `StoreContextMut::run_concurrent` event loop
737 // returns `Pending` will we set `accept_concurrent` back to
738 // true and consider accepting more requests.
739 //
740 // This approach avoids taking on more than one CPU-bound task
741 // at a time, which would hurt throughput vs. leaving the
742 // additional requests for other workers to handle.
743 accept_concurrent.store(false, Relaxed);
744 *reuse_count += 1;
745
746 let prepared = accessor.with(|mut store| {
747 let prepared = Prepared::new(store.as_context_mut(), proxy, request, view, tx);
748 match prepared {
749 Ok(prepared) => {
750 // Notify the `HandlerState` that we're starting to
751 // handle a request and retrieve the deadline by
752 // which it must produce a response.
753 //
754 // If it fails to produce a response by the
755 // deadline, we'll stop accepting new requests and
756 // eventually exit the worker.
757 let expiration = dropper.state.on_request_start(
758 store.as_context_mut(),
759 request_id,
760 prepared.task(),
761 );
762 Ok((prepared, expiration))
763 }
764 Err(e) => Err(e),
765 }
766 });
767
768 let start_time = Instant::now();
769 start_times.add(start_time);
770 *status.try_lock().unwrap() = (WorkerStatus::Requests, start_time);
771
772 futures.push(async move {
773 let (prepared, expiration) = prepared?;
774 let sent = prepared.run(accessor, expiration).await?;
775 wasmtime::error::Ok((sent, start_time))
776 });
777 };
778
779 if let Some(req) = request {
780 accept_request(req, &mut futures, &mut start_times, &mut reuse_count);
781 }
782
783 // This is the main driver loop for this worker. This is modeled as
784 // a `poll_fn` which internally loops around the possible events.
785 // Events are sourced from the locals here, pinned outside of the
786 // `poll_fn` closure.
787 let mut futures = pin!(futures);
788 let handler = self.handler.clone();
789 let mut incoming_requests = pin!(futures::stream::unfold(
790 &handler.0.request_queue,
791 |queue| async move {
792 let pair = queue.pop().await;
793 Some((pair, queue))
794 }
795 ));
796 future::poll_fn(|cx| {
797 loop {
798 // First, and crucially first, poll `futures`. This way
799 // we'll discover any tasks that may have timed out, at
800 // which point we'll stop accepting new tasks altogether
801 // (see below for details). This is especially important in
802 // the case where the task was blocked on a synchronous call
803 // to a host function which has exclusive access to the
804 // `Store`; once that call finishes, the first thing we need
805 // to do is time out the task. If we were to poll for a new
806 // task first, then we'd have to wait for _that_ task to
807 // finish or time out before we could kill the instance.
808 match futures.as_mut().poll_next(cx) {
809 // A request either produced a response or expired.
810 Poll::Ready(Some(Ok((responded, start_time)))) => {
811 // Remove its start time from the map and update the
812 // state.
813 start_times.remove(start_time);
814 *status.try_lock().unwrap() =
815 if let Some(start_time) = start_times.most_recent() {
816 (WorkerStatus::Requests, start_time)
817 } else {
818 (WorkerStatus::PostReturn, Instant::now())
819 };
820
821 if responded {
822 // Response produced; carry on!
823 } else {
                                // Request expired; stop accepting new requests, but
                                // continue polling any other in-progress tasks until
                                // they have either finished or expired.
827 // This effectively kicks off a "graceful shutdown"
828 // of the worker, allowing any other concurrent
829 // tasks time to finish before we drop the instance.
830 may_accept = false;
831 }
832 }
833
834 // Instance trapped.
835 Poll::Ready(Some(Err(error))) => {
836 break Poll::Ready(Err(error));
837 }
838
839 Poll::Ready(None) | Poll::Pending => {}
840 }
841
842 // At this point `futures` is either empty or it's `Pending`
843 // meaning nothing is ready. Note that `Pending` here
844 // doesn't necessarily mean all tasks are blocked on I/O.
845 // They might simply be waiting for some deferred work to be
846 // done by the next turn of the
847 // `StoreContextMut::run_concurrent` event loop. Therefore,
848 // we check `accept_concurrent` here and only advertise we
849 // have capacity for another task if either we have no tasks
850 // at all or all our tasks really are blocked on I/O.
851 self.set_available(
852 may_accept
853 && match dropper
854 .state
855 .should_accept_request(futures.len(), reuse_count)
856 {
857 ShouldAccept::Yes => {
858 futures.is_empty() || accept_concurrent.load(Relaxed)
859 }
860 ShouldAccept::No => false,
861 ShouldAccept::Never => {
862 may_accept = false;
863 false
864 }
865 },
866 );
867
868 // If we're available for accepting more requests after the
869 // deduction above, then try to accept a new task. If that's
870 // successful then push it into `futures` and turn this loop
871 // again to see where we're at next time around.
872 if self.available
873 && let Poll::Ready(Some(req)) = incoming_requests.as_mut().poll_next(cx)
874 {
875 accept_request(req, &mut futures, &mut start_times, &mut reuse_count);
876 continue;
877 }
878
879 // If, at this point, we still have some requests that are
880 // being processed then go ahead and bail out of this
881 // singular call to `poll` by saying we're not ready yet.
882 // This means we unconditionally wait for events within
883 // `futures` and we're also registered, optionally, for
884 // listening for incoming connections. That's all the events
885 // we're interested in, so this iteration of `poll` is complete.
886 if !futures.is_empty() {
887 break Poll::Pending;
888 }
889
890 // At this point `futures` is empty, and we haven't gotten
891 // any incoming tasks. Check the store we're using to see if
892 // there are any "interesting" tasks around. These are tasks
893 // which act as effectively strong references to this worker
894 // to keep it running. If there are still interesting tasks,
895 // then we're done with this iteration of `poll`. We'll get
896 // woken up when anything changes, but otherwise it's time
897 // to let something else happen.
898 if accessor.poll_no_interesting_tasks(cx).is_pending() {
899 break Poll::Pending;
900 }
901
902 // And now at this point we (a) have no `futures`, (b) no
903 // new requests are available, and (c) the store is
904 // completely devoid of interesting work. In this situation
905 // if we're not actually capable of accepting any more work,
906 // then we're completely done and it's time to exit this
907 // worker.
908 if !may_accept {
909 break Poll::Ready(Ok(()));
910 }
911
912 // Finally, at this point we're idle but still eligible to
913 // accept new work, so update the state if appropriate and
914 // then return pending while we wait for new work.
915 {
916 let mut status = status.try_lock().unwrap();
917 if status.0 != WorkerStatus::Idle {
918 *status = (WorkerStatus::Idle, Instant::now());
919 }
920 }
921 break Poll::Pending;
922 }
923 })
924 .await
925 };
926
927 let result = {
928 let mut future = pin!(
929 dropper
930 .store
931 .as_mut()
932 .unwrap()
933 .run_concurrent(function)
934 .map(|v| v.flatten())
935 );
936
937 future::poll_fn(|cx| {
938 let poll = future.as_mut().poll(cx);
939 if poll.is_pending() {
940 // If the future returns `Pending`, that's either because it's
941 // idle (in which case it can definitely accept a new request) or
942 // because all its tasks are awaiting I/O, in which case it may
943 // have capacity for additional tasks to run concurrently.
944 //
945 // However, per #11869 and #11870, if one of the tasks is
946 // blocked on a sync call to a host function which has exclusive
947 // access to the `Store`, the `StoreContextMut::run_concurrent`
948 // event loop will be unable to make progress until that call
949 // finishes. Similarly, if the task loops indefinitely, subject
950 // only to epoch interruption, the event loop will also be
951 // stuck. Either way, any request expirations created inside
952 // the `AsyncFnOnce` we passed to `run_concurrent` won't have a
953 // chance to trigger. Consequently, we poll for instance
954 // expiration here, outside the event loop, based on the most
955 // recently recorded state of the worker.
956
957 let (status, start) = *status.try_lock().unwrap();
958
959 if let Poll::Ready(()) = expiration.as_mut().poll(cx, status, start) {
960 return Poll::Ready(match status {
961 WorkerStatus::Requests | WorkerStatus::PostReturn => {
962 Err(format_err!("guest timed out"))
963 }
964 WorkerStatus::Idle => Ok(()),
965 });
966 }
967
968 // Otherwise, if the instance has not yet expired, we set
969 // `accept_concurrent` to true and, if it wasn't already true
970 // before, poll the future one more time so it can ask for
971 // another request if appropriate.
972 if !accept_concurrent.swap(true, Relaxed) {
973 return future.as_mut().poll(cx);
974 }
975 }
976
977 poll
978 })
979 .await
980 };
981
982 dropper.state.drop(dropper.store.take().unwrap(), result);
983 }
984}
985
986impl<S> Drop for Worker<S>
987where
988 S: HandlerState,
989{
990 fn drop(&mut self) {
991 self.set_available(false);
992 }
993}
994
995/// Represents the state of a web server.
996///
997/// Note that this supports optional instance reuse, enabled when
998/// `S::WorkerState::should_accept_request` returns [`ShouldAccept::Yes`] more
999/// than once for a given instance. See [`WorkerState`] for details.
1000pub struct ProxyHandler<S: HandlerState>(Arc<ProxyHandlerInner<S>>);
1001
1002impl<S: HandlerState> Clone for ProxyHandler<S> {
1003 fn clone(&self) -> Self {
1004 Self(self.0.clone())
1005 }
1006}
1007
1008/// This error is returned if, when handling the request, a new worker and
1009/// associated instance needed to be created, but instantiation failed, e.g. due
1010/// to reaching a pooling allocator limit or running out of memory. In this
1011/// case, the caller may be able to recover and retry (e.g. after waiting for
1012/// existing instances to be dropped and/or freeing memory used by caches,
1013/// etc.). Otherwise, it will probably need to return an HTTP 500 error.
1014pub struct InstantiationError<T> {
1015 /// The ID of the request which was originally configured,
1016 pub request_id: T,
1017 /// The original request passed to `ProxyHandler::handle`.
1018 ///
1019 /// This is wrapped in a `Mutex` to satisfy the `Send + Sync` bounds
1020 /// required by `wasmtime::Error`.
1021 pub request: Mutex<Request>,
1022 /// The original instantiation error.
1023 ///
1024 /// This is wrapped in an `Arc` because a single instantiation error may
1025 /// affect multiple requests, and each caller will be given a clone.
1026 pub error: Arc<wasmtime::Error>,
1027}
1028
1029impl<T> fmt::Display for InstantiationError<T> {
1030 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1031 write!(f, "instantiation error: {}", self.error)
1032 }
1033}
1034
1035impl<T> fmt::Debug for InstantiationError<T> {
1036 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1037 write!(f, "instantiation error: {:?}", self.error)
1038 }
1039}
1040
1041impl<T> error::Error for InstantiationError<T> {}
1042
/// Error returned when the expiration obtained from
/// `HandlerState::on_request_start` elapsed before the guest produced a
/// response.
pub struct ExpirationError;

impl fmt::Debug for ExpirationError {
    /// Writes the fixed, human-readable message; formatting flags are
    /// ignored.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("guest timed out")
    }
}

impl fmt::Display for ExpirationError {
    /// Produces exactly the same text as the `Debug` implementation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <Self as fmt::Debug>::fmt(self, f)
    }
}

impl error::Error for ExpirationError {}
1060
/// Error returned when a worker trapped or panicked and therefore failed to
/// produce a result.
pub struct TrapOrPanicError;

impl fmt::Debug for TrapOrPanicError {
    /// Writes the fixed, human-readable message; formatting flags are
    /// ignored.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("worker trapped or panicked")
    }
}

impl fmt::Display for TrapOrPanicError {
    /// Produces exactly the same text as the `Debug` implementation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <Self as fmt::Debug>::fmt(self, f)
    }
}

impl error::Error for TrapOrPanicError {}
1077
1078impl<S> ProxyHandler<S>
1079where
1080 S: HandlerState,
1081{
1082 /// Create a new `ProxyHandler` with the specified application state and
1083 /// pre-instance.
1084 pub fn new(state: S) -> Self {
1085 Self(Arc::new(ProxyHandlerInner {
1086 state,
1087 request_queue: Default::default(),
1088 worker_count: AtomicUsize::from(0),
1089 }))
1090 }
1091
1092 /// Handle the specified request, returning a response on success or the
1093 /// tuple of the request and error on failure.
1094 ///
1095 /// This function will return a `wasmtime::Error` on failure, which may be
1096 /// downcast to a more specific type in certain scenarios:
1097 ///
1098 /// - [`InstantiationError`] if a new worker was created to handle the
1099 /// request but could not instantiate the guest component.
1100 ///
1101 /// - [`ExpirationError`] if the request expired before it produced a
1102 /// response. See [`WorkerState::on_request_start`] for details.
1103 ///
1104 /// - [`TrapOrPanicError`] if the worker responsible for handling the
1105 /// request trapped or panicked before it produced a response. This may be
1106 /// used when a trap occurs but cannot be traced to a specific request,
1107 /// e.g. during concurrent request handling.
1108 ///
1109 /// In other failure cases (e.g. `wasi:http/types#error-code` return values
1110 /// and/or traps when executing synchronous WASIp2 handler functions), the
1111 /// original error returned by the handler will be returned.
1112 ///
1113 /// # Backpressure
1114 ///
1115 /// Note that this API does not implement any form of backpressure to limit
1116 /// the number of in-flight `Request`s being processed. This function
1117 /// may spawn new tokio tasks, instantiate new modules under new stores, and
1118 /// queue up pending `Request`s while waiting for previous instances. In all
1119 /// of these situations invoking this function will consume some host-side
1120 /// resources until the request is done.
1121 ///
1122 /// Embedders using this API must ensure to take this into account. If an
1123 /// infinite number of requests can be fed into this function then it's
1124 /// recommended to take a semaphore, for example, around this function call
1125 /// to limit the number of concurrent requests that are being processed.
1126 pub async fn handle(
1127 &self,
1128 id: <S::WorkerState as WorkerState>::RequestId,
1129 request: Request,
1130 ) -> Result<Response, wasmtime::Error> {
1131 let (tx, rx) = oneshot::channel();
1132 let req = (id, request, tx);
1133 if self.0.worker_count.load(Relaxed) == 0 {
1134 // There are no available workers; skip the queue and pass
1135 // the request directly to the worker, which improves
1136 // performance as measured by `wasmtime-server-rps.sh` by
1137 // about 15%.
1138 self.start_worker(Some(req));
1139 } else {
1140 let mut queue = self.0.request_queue.queue.lock().unwrap();
1141 queue.push_back(req);
1142
1143 // Start a new worker to handle the request if the last worker just
1144 // went unavailable. See also `Worker::set_available` for what
1145 // happens if the available worker count goes to zero right after we
1146 // check it here, and note that we only check the count _after_
1147 // we've pushed the request to the queue.
1148 //
1149 // The upshot is that at least one (or more) of the
1150 // following will happen:
1151 //
1152 // - An existing worker will accept the request
1153 // - We'll start a new worker here to accept the request
1154 // - `Worker::set_available` will start a new worker to accept the request
1155 //
1156 // I.e. it should not be possible for the request to be orphaned
1157 // indefinitely in the queue without being accepted except in the
1158 // case of a panic or an instantiation error. In the case of an
1159 // instantiation error, we'll give the request back to the caller in
1160 // an `Err(_)`, allowing the application to decide what to do next.
1161 if self.0.worker_count.load(Relaxed) == 0 {
1162 let req = queue.pop_back().unwrap();
1163 drop(queue);
1164 self.start_worker(Some(req));
1165 } else {
1166 drop(queue);
1167 self.0.request_queue.notify_push.notify_one();
1168 }
1169 }
1170
1171 rx.await.map_err(|_| TrapOrPanicError)?
1172 }
1173
1174 /// Return a reference to the application state.
1175 pub fn state(&self) -> &S {
1176 &self.0.state
1177 }
1178
1179 fn start_worker(&self, request: Option<WorkerRequest<S>>) {
1180 tokio::spawn(
1181 Worker {
1182 handler: self.clone(),
1183 available: false,
1184 }
1185 .run(request),
1186 );
1187 }
1188}
1189
/// Representation of a "prepared" call for a guest, used to extract the
/// `GuestTaskId` before actually executing any handlers.
///
/// A value is created by `Prepared::new`, which starts the concurrent call,
/// and consumed by `Prepared::run`, which finishes it. There is one variant
/// per supported WASI HTTP version, each gated on the matching cargo feature.
///
/// Right now this is a bit gross since it has to type out a bunch of types by
/// hand.
pub enum Prepared<'a, T: 'static> {
    #[doc(hidden)]
    #[cfg(feature = "p2")]
    P2 {
        // Guest bindings whose `wasi_http_incoming_handler` function is called.
        guest: &'a p2::bindings::Proxy,
        // The started-but-unfinished call; takes the incoming request and the
        // response outparam and returns nothing.
        call: TypedFuncCallConcurrent<
            T,
            (
                Resource<p2_types::IncomingRequest>,
                Resource<p2_types::ResponseOutparam>,
            ),
            (),
        >,
        // Sender for the final result, shared with the `response-outparam`
        // callback; whichever side takes it out of the `Option` first sends.
        tx: Arc<Mutex<Option<oneshot::Sender<Result<Response, wasmtime::Error>>>>>,
    },
    #[doc(hidden)]
    #[cfg(feature = "p3")]
    P3 {
        // Guest bindings whose `wasi_http_handler` function is called.
        guest: &'a p3::bindings::Service,
        // The started-but-unfinished call; takes the request resource and
        // returns either a response resource or an error code.
        call: TypedFuncCallConcurrent<
            T,
            (Resource<p3_types::Request>,),
            (Result<Resource<p3_types::Response>, p3_types::ErrorCode>,),
        >,
        // Sender for the final result of the request.
        tx: oneshot::Sender<Result<Response, wasmtime::Error>>,
        // Boxed future obtained from `p3::Request::from_http`; handed to
        // `into_http_with_getter` when the response is converted.
        request_io_result: Pin<Box<dyn Future<Output = Result<(), p3_types::ErrorCode>> + Send>>,
        // Projects the store data to the WASI HTTP context view.
        view: fn(&mut T) -> p3::WasiHttpCtxView,
    },
}
1224
1225impl<'a, T: Send> Prepared<'a, T> {
1226 /// Creates a new prepared request.
1227 pub fn new(
1228 mut store: StoreContextMut<'_, T>,
1229 proxy: &'a Proxy,
1230 request: Request,
1231 view: ViewFn<T>,
1232 tx: oneshot::Sender<Result<Response, wasmtime::Error>>,
1233 ) -> Result<Prepared<'a, T>> {
1234 match (proxy, view) {
1235 #[cfg(feature = "p3")]
1236 (Proxy::P3(guest), ViewFn::P3(view)) => {
1237 let (request, body) = request.into_parts();
1238 let body = body.map_err(p3_types::ErrorCode::from);
1239 let request = http::Request::from_parts(request, body);
1240 let (request, request_io_result) = p3::Request::from_http(request);
1241 let request = view(store.data_mut()).table.push(request)?;
1242
1243 Ok(Prepared::P3 {
1244 tx,
1245 request_io_result: Box::pin(request_io_result),
1246 guest,
1247 view,
1248 call: guest
1249 .wasi_http_handler()
1250 .func_handle()
1251 .start_call_concurrent(store, (request,))?,
1252 })
1253 }
1254 #[cfg(feature = "p2")]
1255 (Proxy::P2(guest), ViewFn::P2(view)) => {
1256 // Here we wrap the sender in an `Arc<Mutex<Option<_>>>`, with one
1257 // clone used in the `response-outparam` and the other used to send
1258 // an error if the request expires or the handler returns without
1259 // producing a response.
1260 let tx = Arc::new(Mutex::new(Some(tx)));
1261
1262 let request =
1263 view(store.data_mut()).new_incoming_request(p2_types::Scheme::Http, request)?;
1264
1265 let out = view(store.data_mut()).new_response_outparam_from_callback({
1266 let tx = tx.clone();
1267 move |value| {
1268 if let Some(tx) = tx.lock().unwrap().take() {
1269 _ = tx.send(
1270 value
1271 .map(|v| {
1272 v.map(move |body| {
1273 body.map_err(wasmtime::Error::from).boxed_unsync()
1274 })
1275 })
1276 .map_err(wasmtime::Error::from),
1277 );
1278 }
1279 }
1280 })?;
1281
1282 Ok(Prepared::P2 {
1283 guest,
1284 tx,
1285 call: guest
1286 .wasi_http_incoming_handler()
1287 .func_handle()
1288 .start_call_concurrent(store, (request, out))?,
1289 })
1290 }
1291 #[cfg(all(feature = "p2", feature = "p3"))]
1292 _ => unreachable!(),
1293 }
1294 }
1295
1296 fn task(&self) -> GuestTaskId {
1297 match self {
1298 #[cfg(feature = "p3")]
1299 Prepared::P3 { call, .. } => call.task(),
1300 #[cfg(feature = "p2")]
1301 Prepared::P2 { call, .. } => call.task(),
1302 }
1303 }
1304
1305 /// Executes this request to completion.
1306 pub async fn run(
1307 self,
1308 accessor: &Accessor<T>,
1309 expiration: impl Future<Output = ()>,
1310 ) -> Result<bool> {
1311 let expiration = pin!(expiration);
1312
1313 match self {
1314 #[cfg(feature = "p3")]
1315 Prepared::P3 {
1316 guest,
1317 call,
1318 tx,
1319 request_io_result,
1320 view,
1321 } => {
1322 let handle =
1323 pin!(async move {
1324 let response = guest
1325 .wasi_http_handler()
1326 .func_handle()
1327 .finish_call_concurrent(accessor, call)
1328 .await?
1329 .0?;
1330
1331 let response = accessor.with(|mut store| {
1332 let response = view(store.get()).table.delete(response)?;
1333 Ok::<_, wasmtime::Error>(response.into_http_with_getter(
1334 &mut store,
1335 request_io_result,
1336 view,
1337 )?)
1338 })?;
1339
1340 Ok(response
1341 .map(move |body| body.map_err(wasmtime::Error::from).boxed_unsync()))
1342 });
1343
1344 // TODO: We should also use `oneshot::Sender::poll_close` to be
1345 // notified when the receiver is dropped, in which case we should
1346 // expire the request since the response is no longer of interest to
1347 // the original `ProxyHandler::handle` caller.
1348 let (result, sent) = match futures::future::select(handle, expiration).await {
1349 Either::Left((result, _)) => (result, true),
1350 // TODO: We should also send a cancel request to the expired
1351 // task to give it a chance to shut down gracefully, but as of
1352 // this writing Wasmtime does not yet provide an API for doing
1353 // that. See issue #11833. Instead, we let it continue running
1354 // as a background task until it either returns a response
1355 // (which we'll ignore) or the instance itself has expired.
1356 Either::Right(((), _)) => (Err(ExpirationError.into()), false),
1357 };
1358
1359 _ = tx.send(result);
1360
1361 Ok(sent)
1362 }
1363 #[cfg(feature = "p2")]
1364 Prepared::P2 { guest, call, tx } => {
1365 let handle = pin!(
1366 guest
1367 .wasi_http_incoming_handler()
1368 .func_handle()
1369 .finish_call_concurrent(accessor, call)
1370 );
1371
1372 const MESSAGE: &str = "guest never invoked `response-outparam::set` method";
1373
1374 struct Dropper(
1375 Arc<Mutex<Option<oneshot::Sender<Result<Response, wasmtime::Error>>>>>,
1376 );
1377
1378 impl Drop for Dropper {
1379 fn drop(&mut self) {
1380 if let Some(tx) = self.0.lock().unwrap().take() {
1381 _ = tx.send(Err(format_err!("{MESSAGE}")));
1382 }
1383 }
1384 }
1385
1386 let tx = Dropper(tx);
1387
1388 // See corresponding TODO comment for the p3 case above.
1389 let (result, sent) = match futures::future::select(handle, expiration).await {
1390 Either::Left((result, _)) => (result.context(MESSAGE), true),
1391 // See corresponding TODO comment for the p3 case above.
1392 Either::Right(((), _)) => (Err(ExpirationError.into()), false),
1393 };
1394
1395 if let Some(tx) = tx.0.lock().unwrap().take() {
1396 _ = tx.send(result.and_then(|()| Err(format_err!("{MESSAGE}"))));
1397 }
1398
1399 Ok(sent)
1400 }
1401 }
1402 }
1403}