wasmtime_wasi_http/
handler.rs

1//! Provides utilities useful for dispatching incoming HTTP requests
2//! `wasi:http/handler` guest instances.
3
4#[cfg(feature = "p3")]
5use crate::p3;
6use futures::stream::{FuturesUnordered, StreamExt};
7use std::collections::VecDeque;
8use std::collections::btree_map::{BTreeMap, Entry};
9use std::future;
10use std::pin::{Pin, pin};
11use std::sync::{
12    Arc, Mutex,
13    atomic::{
14        AtomicBool, AtomicU64, AtomicUsize,
15        Ordering::{Relaxed, SeqCst},
16    },
17};
18use std::task::Poll;
19use std::time::{Duration, Instant};
20use tokio::sync::Notify;
21use wasmtime::AsContextMut;
22use wasmtime::component::Accessor;
23use wasmtime::{Result, Store, StoreContextMut, format_err};
24
25/// Alternative p2 bindings generated with `exports: { default: async | store }`
26/// so we can use `TypedFunc::call_concurrent` with both p2 and p3 instances.
27pub mod p2 {
28    #[expect(missing_docs, reason = "bindgen-generated code")]
29    pub mod bindings {
30        wasmtime::component::bindgen!({
31            path: "wit",
32            world: "wasi:http/proxy",
33            imports: { default: tracing },
34            exports: { default: async | store },
35            require_store_data_send: true,
36            with: {
37                // http is in this crate
38                "wasi:http": crate::bindings::http,
39                // Upstream package dependencies
40                "wasi:io": wasmtime_wasi::p2::bindings::io,
41            }
42        });
43
44        pub use wasi::*;
45    }
46}
47
48/// Represents either a `wasi:http/incoming-handler@0.2.x` or
49/// `wasi:http/handler@0.3.x` pre-instance.
50pub enum ProxyPre<T: 'static> {
51    /// A `wasi:http/incoming-handler@0.2.x` pre-instance.
52    P2(p2::bindings::ProxyPre<T>),
53    /// A `wasi:http/handler@0.3.x` pre-instance.
54    #[cfg(feature = "p3")]
55    P3(p3::bindings::ServicePre<T>),
56}
57
58impl<T: 'static> ProxyPre<T> {
59    async fn instantiate_async(&self, store: impl AsContextMut<Data = T>) -> Result<Proxy>
60    where
61        T: Send,
62    {
63        Ok(match self {
64            Self::P2(pre) => Proxy::P2(pre.instantiate_async(store).await?),
65            #[cfg(feature = "p3")]
66            Self::P3(pre) => Proxy::P3(pre.instantiate_async(store).await?),
67        })
68    }
69}
70
71/// Represents either a `wasi:http/incoming-handler@0.2.x` or
72/// `wasi:http/handler@0.3.x` instance.
73pub enum Proxy {
74    /// A `wasi:http/incoming-handler@0.2.x` instance.
75    P2(p2::bindings::Proxy),
76    /// A `wasi:http/handler@0.3.x` instance.
77    #[cfg(feature = "p3")]
78    P3(p3::bindings::Service),
79}
80
81/// Represents a task to run using a `wasi:http/incoming-handler@0.2.x` or
82/// `wasi:http/handler@0.3.x` instance.
83pub type TaskFn<T> = Box<
84    dyn for<'a> FnOnce(&'a Accessor<T>, &'a Proxy) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
85        + Send,
86>;
87
88/// Async MPMC channel where each item is delivered to at most one consumer.
89struct Queue<T> {
90    queue: Mutex<VecDeque<T>>,
91    notify: Notify,
92}
93
94impl<T> Default for Queue<T> {
95    fn default() -> Self {
96        Self {
97            queue: Default::default(),
98            notify: Default::default(),
99        }
100    }
101}
102
103impl<T> Queue<T> {
104    fn is_empty(&self) -> bool {
105        self.queue.lock().unwrap().is_empty()
106    }
107
108    fn push(&self, item: T) {
109        self.queue.lock().unwrap().push_back(item);
110        self.notify.notify_one();
111    }
112
113    fn try_pop(&self) -> Option<T> {
114        self.queue.lock().unwrap().pop_front()
115    }
116
117    async fn pop(&self) -> T {
118        // This code comes from the Unbound MPMC Channel example in [the
119        // `tokio::sync::Notify`
120        // docs](https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html).
121
122        let mut notified = pin!(self.notify.notified());
123
124        loop {
125            notified.as_mut().enable();
126            if let Some(item) = self.try_pop() {
127                return item;
128            }
129            notified.as_mut().await;
130            notified.set(self.notify.notified());
131        }
132    }
133}
134
135/// Bundles a [`Store`] with a callback to write a profile (if configured).
136pub struct StoreBundle<T: 'static> {
137    /// The [`Store`] to use to handle requests.
138    pub store: Store<T>,
139    /// Callback to write a profile (if enabled) once all requests have been
140    /// handled.
141    pub write_profile: Box<dyn FnOnce(StoreContextMut<T>) + Send>,
142}
143
144/// Represents the application-specific state of a web server.
145pub trait HandlerState: 'static + Sync + Send {
146    /// The type of the associated data for [`Store`]s created using
147    /// [`new_store`].
148    type StoreData: Send;
149
150    /// Create a new [`Store`] for handling one or more requests.
151    ///
152    /// The `req_id` parameter is the value passed in the call to
153    /// [`ProxyHandler::spawn`] that created the worker to which the new `Store`
154    /// will belong.  See that function's documentation for details.
155    fn new_store(&self, req_id: Option<u64>) -> Result<StoreBundle<Self::StoreData>>;
156
157    /// Maximum time allowed to handle a request.
158    ///
159    /// In practice, a guest may be allowed to run up to 2x this time in the
160    /// case of instance reuse to avoid penalizing concurrent requests being
161    /// handled by the same instance.
162    fn request_timeout(&self) -> Duration;
163
164    /// Maximum time to keep an idle instance around before dropping it.
165    fn idle_instance_timeout(&self) -> Duration;
166
167    /// Maximum number of requests to handle using a single instance before
168    /// dropping it.
169    fn max_instance_reuse_count(&self) -> usize;
170
171    /// Maximum number of requests to handle concurrently using a single
172    /// instance.
173    fn max_instance_concurrent_reuse_count(&self) -> usize;
174
175    /// Called when a worker exits with an error.
176    fn handle_worker_error(&self, error: wasmtime::Error);
177}
178
179struct ProxyHandlerInner<S: HandlerState> {
180    state: S,
181    instance_pre: ProxyPre<S::StoreData>,
182    next_id: AtomicU64,
183    task_queue: Queue<TaskFn<S::StoreData>>,
184    worker_count: AtomicUsize,
185}
186
187/// Helper utility to track the start times of tasks accepted by a worker.
188///
189/// This is used to ensure that timeouts are enforced even when the
190/// `StoreContextMut::run_concurrent` event loop is unable to make progress due
191/// to the guest either busy looping or being blocked on a synchronous call to a
192/// host function which has exclusive access to the `Store`.
193#[derive(Default)]
194struct StartTimes(BTreeMap<Instant, usize>);
195
196impl StartTimes {
197    fn add(&mut self, time: Instant) {
198        *self.0.entry(time).or_insert(0) += 1;
199    }
200
201    fn remove(&mut self, time: Instant) {
202        let Entry::Occupied(mut entry) = self.0.entry(time) else {
203            unreachable!()
204        };
205        match *entry.get() {
206            0 => unreachable!(),
207            1 => {
208                entry.remove();
209            }
210            _ => {
211                *entry.get_mut() -= 1;
212            }
213        }
214    }
215
216    fn earliest(&self) -> Option<Instant> {
217        self.0.first_key_value().map(|(&k, _)| k)
218    }
219}
220
221struct Worker<S>
222where
223    S: HandlerState,
224{
225    handler: ProxyHandler<S>,
226    available: bool,
227}
228
229impl<S> Worker<S>
230where
231    S: HandlerState,
232{
233    fn set_available(&mut self, available: bool) {
234        if available != self.available {
235            self.available = available;
236            if available {
237                self.handler.0.worker_count.fetch_add(1, Relaxed);
238            } else {
239                // Here we use `SeqCst` to ensure the load/store is ordered
240                // correctly with respect to the `Queue::is_empty` check we do
241                // below.
242                let count = self.handler.0.worker_count.fetch_sub(1, SeqCst);
243                // This addresses what would otherwise be a race condition in
244                // `ProxyHandler::spawn` where it only starts a worker if the
245                // available worker count is zero.  If we decrement the count to
246                // zero right after `ProxyHandler::spawn` checks it, then no
247                // worker will be started; thus it becomes our responsibility to
248                // start a worker here instead.
249                if count == 1 && !self.handler.0.task_queue.is_empty() {
250                    self.handler.start_worker(None, None);
251                }
252            }
253        }
254    }
255
256    async fn run(mut self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) {
257        if let Err(error) = self.run_(task, req_id).await {
258            self.handler.0.state.handle_worker_error(error);
259        }
260    }
261
262    async fn run_(
263        &mut self,
264        task: Option<TaskFn<S::StoreData>>,
265        req_id: Option<u64>,
266    ) -> Result<()> {
267        // NB: The code the follows is rather subtle in that it is structured
268        // carefully to provide a few key invariants related to how instance
269        // reuse and request timeouts interact:
270        //
271        // - A task must never be allowed to run for more than 2x the request
272        // timeout, if any.
273        //
274        // - Every task we accept here must be allowed to run for at least 1x
275        // the request timeout, if any.
276        //
277        // - When more than one task is run concurrently in the same instance,
278        // we must stop accepting new tasks as soon as any existing task reaches
279        // the request timeout.  This serves to cap the amount of time we need
280        // to keep the instance alive before _all_ tasks have either completed
281        // or timed out.
282        //
283        // As of this writing, there's an additional wrinkle that makes
284        // guaranteeing those invariants particularly tricky: per #11869 and
285        // #11870, busy guest loops, epoch interruption, and host functions
286        // registered using `Linker::func_{wrap,new}_async` all require
287        // blocking, exclusive access to the `Store`, which effectively prevents
288        // the `StoreContextMut::run_concurrent` event loop from making
289        // progress.  That, in turn, prevents any concurrent tasks from
290        // executing, and also prevents the `AsyncFnOnce` passed to
291        // `run_concurrent` from being polled.  Consequently, we must rely on a
292        // "second line of defense" to ensure tasks are timed out promptly,
293        // which is to check for timeouts _outside_ the `run_concurrent` future.
294        // Once the aforementioned issues have been addressed, we'll be able to
295        // remove that check and its associated baggage.
296
297        let handler = &self.handler.0;
298
299        let StoreBundle {
300            mut store,
301            write_profile,
302        } = handler.state.new_store(req_id)?;
303
304        let request_timeout = handler.state.request_timeout();
305        let idle_instance_timeout = handler.state.idle_instance_timeout();
306        let max_instance_reuse_count = handler.state.max_instance_reuse_count();
307        let max_instance_concurrent_reuse_count =
308            handler.state.max_instance_concurrent_reuse_count();
309
310        let proxy = &handler.instance_pre.instantiate_async(&mut store).await?;
311        let accept_concurrent = AtomicBool::new(true);
312        let task_start_times = Mutex::new(StartTimes::default());
313
314        let mut future = pin!(store.run_concurrent(async |accessor| {
315            let mut reuse_count = 0;
316            let mut timed_out = false;
317            let mut futures = FuturesUnordered::new();
318
319            let accept_task = |task: TaskFn<S::StoreData>,
320                               futures: &mut FuturesUnordered<_>,
321                               reuse_count: &mut usize| {
322                // Set `accept_concurrent` to false, conservatively assuming
323                // that the new task will be CPU-bound, at least to begin with.
324                // Only once the `StoreContextMut::run_concurrent` event loop
325                // returns `Pending` will we set `accept_concurrent` back to
326                // true and consider accepting more tasks.
327                //
328                // This approach avoids taking on more than one CPU-bound task
329                // at a time, which would hurt throughput vs. leaving the
330                // additional tasks for other workers to handle.
331                accept_concurrent.store(false, Relaxed);
332                *reuse_count += 1;
333
334                let start_time = Instant::now().checked_add(request_timeout);
335                if let Some(start_time) = start_time {
336                    task_start_times.lock().unwrap().add(start_time);
337                }
338
339                futures.push(tokio::time::timeout(request_timeout, async move {
340                    (task)(accessor, proxy).await;
341                    start_time
342                }));
343            };
344
345            if let Some(task) = task {
346                accept_task(task, &mut futures, &mut reuse_count);
347            }
348
349            let handler = self.handler.clone();
350            while !(futures.is_empty() && reuse_count >= max_instance_reuse_count) {
351                let new_task = {
352                    let future_count = futures.len();
353                    let mut next_future = pin!(async {
354                        if futures.is_empty() {
355                            future::pending().await
356                        } else {
357                            futures.next().await.unwrap()
358                        }
359                    });
360                    let mut next_task = pin!(tokio::time::timeout(
361                        if future_count == 0 {
362                            idle_instance_timeout
363                        } else {
364                            Duration::MAX
365                        },
366                        handler.0.task_queue.pop()
367                    ));
368                    // Poll any existing tasks, and if they're all `Pending`
369                    // _and_ we haven't reached any reuse limits yet, poll for a
370                    // new task from the queue.
371                    //
372                    // Note the the order of operations here is important.  By
373                    // polling `next_future` first, we'll disover any tasks that
374                    // may have timed out, at which point we'll stop accepting
375                    // new tasks altogether (see below for details).  This is
376                    // especially imporant in the case where the task was
377                    // blocked on a synchronous call to a host function which
378                    // has exclusive access to the `Store`; once that call
379                    // finishes, the first think we need to do is time out the
380                    // task.  If we were to poll for a new task first, then we'd
381                    // have to wait for _that_ task to finish or time out before
382                    // we could kill the instance.
383                    future::poll_fn(|cx| match next_future.as_mut().poll(cx) {
384                        Poll::Pending => {
385                            // Note that `Pending` here doesn't necessarily mean
386                            // all tasks are blocked on I/O.  They might simply
387                            // be waiting for some deferred work to be done by
388                            // the next turn of the
389                            // `StoreContextMut::run_concurrent` event loop.
390                            // Therefore, we check `accept_concurrent` here and
391                            // only advertise we have capacity for another task
392                            // if either we have no tasks at all or all our
393                            // tasks really are blocked on I/O.
394                            self.set_available(
395                                reuse_count < max_instance_reuse_count
396                                    && future_count < max_instance_concurrent_reuse_count
397                                    && (future_count == 0 || accept_concurrent.load(Relaxed)),
398                            );
399
400                            if self.available {
401                                next_task.as_mut().poll(cx).map(Some)
402                            } else {
403                                Poll::Pending
404                            }
405                        }
406                        Poll::Ready(Ok(start_time)) => {
407                            // Task completed; carry on!
408                            if let Some(start_time) = start_time {
409                                task_start_times.lock().unwrap().remove(start_time);
410                            }
411                            Poll::Ready(None)
412                        }
413                        Poll::Ready(Err(_)) => {
414                            // Task timed out; stop accepting new tasks, but
415                            // continue polling until any other, in-progress
416                            // tasks until they have either finished or timed
417                            // out.  This effectively kicks off a "graceful
418                            // shutdown" of the worker, allowing any other
419                            // concurrent tasks time to finish before we drop
420                            // the instance.
421                            //
422                            // TODO: We should also send a cancel request to the
423                            // timed-out task to give it a chance to shut down
424                            // gracefully (and delay dropping the instance for a
425                            // reasonable amount of time), but as of this
426                            // writing Wasmtime does not yet provide an API for
427                            // doing that.  See issue #11833.
428                            timed_out = true;
429                            reuse_count = max_instance_reuse_count;
430                            Poll::Ready(None)
431                        }
432                    })
433                    .await
434                };
435
436                match new_task {
437                    Some(Ok(task)) => {
438                        accept_task(task, &mut futures, &mut reuse_count);
439                    }
440                    Some(Err(_)) => break,
441                    None => {}
442                }
443            }
444
445            accessor.with(|mut access| write_profile(access.as_context_mut()));
446
447            if timed_out {
448                Err(format_err!("guest timed out"))
449            } else {
450                wasmtime::error::Ok(())
451            }
452        }));
453
454        let mut sleep = pin!(tokio::time::sleep(Duration::MAX));
455
456        future::poll_fn(|cx| {
457            let poll = future.as_mut().poll(cx);
458            if poll.is_pending() {
459                // If the future returns `Pending`, that's either because it's
460                // idle (in which case it can definitely accept a new task) or
461                // because all its tasks are awaiting I/O, in which case it may
462                // have capacity for additional tasks to run concurrently.
463                //
464                // However, if one of the tasks is blocked on a sync call to a
465                // host function which has exclusive access to the `Store`, the
466                // `StoreContextMut::run_concurrent` event loop will be unable
467                // to make progress until that call finishes.  Similarly, if the
468                // task loops indefinitely, subject only to epoch interruption,
469                // the event loop will also be stuck.  Either way, any task
470                // timeouts created inside the `AsyncFnOnce` we passed to
471                // `run_concurrent` won't have a chance to trigger.
472                // Consequently, we need to _also_ enforce timeouts here,
473                // outside the event loop.
474                //
475                // Therefore, we check if the oldest outstanding task has been
476                // running for at least `request_timeout*2`, which is the
477                // maximum time needed for any other concurrent tasks to
478                // complete or time out, at which point we can safely discard
479                // the instance.  If that deadline has not yet arrived, we
480                // schedule a wakeup to occur when it does.
481                //
482                // We uphold the "never kill an instance with a task which has
483                // been running for less than the request timeout" invariant
484                // here by noting that this timeout will only trigger if the
485                // `AsyncFnOnce` we passed to `run_concurrent` has been unable
486                // to run for at least the past `request_timeout` amount of
487                // time, meaning it can't possibly have accepted a task newer
488                // than that.
489                if let Some(deadline) = task_start_times
490                    .lock()
491                    .unwrap()
492                    .earliest()
493                    .and_then(|v| v.checked_add(request_timeout.saturating_mul(2)))
494                {
495                    sleep.as_mut().reset(deadline.into());
496                    // Note that this will schedule a wakeup for later if the
497                    // deadline has not yet arrived:
498                    if sleep.as_mut().poll(cx).is_ready() {
499                        // Deadline has been reached; kill the instance with an
500                        // error.
501                        return Poll::Ready(Err(format_err!("guest timed out")));
502                    }
503                }
504
505                // Otherwise, if no timeouts have elapsed, we set
506                // `accept_concurrent` to true and, if it wasn't already true
507                // before, poll the future one more time so it can ask for
508                // another task if appropriate.
509                if !accept_concurrent.swap(true, Relaxed) {
510                    return future.as_mut().poll(cx);
511                }
512            }
513
514            poll
515        })
516        .await?
517    }
518}
519
520impl<S> Drop for Worker<S>
521where
522    S: HandlerState,
523{
524    fn drop(&mut self) {
525        self.set_available(false);
526    }
527}
528
529/// Represents the state of a web server.
530///
531/// Note that this supports optional instance reuse, enabled when
532/// `S::max_instance_reuse_count()` returns a number greater than one.  See
533/// [`Self::push`] for details.
534pub struct ProxyHandler<S: HandlerState>(Arc<ProxyHandlerInner<S>>);
535
536impl<S: HandlerState> Clone for ProxyHandler<S> {
537    fn clone(&self) -> Self {
538        Self(self.0.clone())
539    }
540}
541
542impl<S> ProxyHandler<S>
543where
544    S: HandlerState,
545{
546    /// Create a new `ProxyHandler` with the specified application state and
547    /// pre-instance.
548    pub fn new(state: S, instance_pre: ProxyPre<S::StoreData>) -> Self {
549        Self(Arc::new(ProxyHandlerInner {
550            state,
551            instance_pre,
552            next_id: AtomicU64::from(0),
553            task_queue: Default::default(),
554            worker_count: AtomicUsize::from(0),
555        }))
556    }
557
558    /// Push a task to the task queue for this handler.
559    ///
560    /// This will either spawn a new background worker to run the task or
561    /// deliver it to an already-running worker.
562    ///
563    /// The `req_id` will be passed to `<S as HandlerState>::new_store` _if_ a
564    /// new worker is started for this task.  It is intended to be used as a
565    /// "request identifier" corresponding to that task and can be used e.g. to
566    /// prefix all logging from the `Store` with that identifier.  Note that a
567    /// non-`None` value only makes sense when `<S as
568    /// HandlerState>::max_instance_reuse_count == 1`; otherwise the identifier
569    /// will not match subsequent tasks handled by the worker.
570    pub fn spawn(&self, req_id: Option<u64>, task: TaskFn<S::StoreData>) {
571        match self.0.state.max_instance_reuse_count() {
572            0 => panic!("`max_instance_reuse_count` must be at least 1"),
573            _ => {
574                if self.0.worker_count.load(Relaxed) == 0 {
575                    // There are no available workers; skip the queue and pass
576                    // the task directly to the worker, which improves
577                    // performance as measured by `wasmtime-server-rps.sh` by
578                    // about 15%.
579                    self.start_worker(Some(task), req_id);
580                } else {
581                    self.0.task_queue.push(task);
582                    // Start a new worker to handle the task if the last worker
583                    // just went unavailable.  See also `Worker::set_available`
584                    // for what happens if the available worker count goes to
585                    // zero right after we check it here, and note that we only
586                    // check the count _after_ we've pushed the task to the
587                    // queue.  We use `SeqCst` here to ensure that we get an
588                    // updated view of `worker_count` as it exists after the
589                    // `Queue::push` above.
590                    //
591                    // The upshot is that at least one (or more) of the
592                    // following will happen:
593                    //
594                    // - An existing worker will accept the task
595                    // - We'll start a new worker here to accept the task
596                    // - `Worker::set_available` will start a new worker to accept the task
597                    //
598                    // I.e. it should not be possible for the task to be
599                    // orphaned indefinitely in the queue without being
600                    // accepted.
601                    if self.0.worker_count.load(SeqCst) == 0 {
602                        self.start_worker(None, None);
603                    }
604                }
605            }
606        }
607    }
608
609    /// Generate a unique request ID.
610    pub fn next_req_id(&self) -> u64 {
611        self.0.next_id.fetch_add(1, Relaxed)
612    }
613
614    /// Return a reference to the application state.
615    pub fn state(&self) -> &S {
616        &self.0.state
617    }
618
619    /// Return a reference to the pre-instance.
620    pub fn instance_pre(&self) -> &ProxyPre<S::StoreData> {
621        &self.0.instance_pre
622    }
623
624    fn start_worker(&self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) {
625        tokio::spawn(
626            Worker {
627                handler: self.clone(),
628                available: false,
629            }
630            .run(task, req_id),
631        );
632    }
633}