wasmtime_wasi_http/
handler.rs

//! Provides utilities useful for dispatching incoming HTTP requests to
//! `wasi:http/handler` guest instances.
3
4#[cfg(feature = "p3")]
5use crate::p3;
6use anyhow::{Result, anyhow};
7use futures::stream::{FuturesUnordered, StreamExt};
8use std::collections::VecDeque;
9use std::collections::btree_map::{BTreeMap, Entry};
10use std::future;
11use std::pin::{Pin, pin};
12use std::sync::{
13    Arc, Mutex,
14    atomic::{
15        AtomicBool, AtomicU64, AtomicUsize,
16        Ordering::{Relaxed, SeqCst},
17    },
18};
19use std::task::Poll;
20use std::time::{Duration, Instant};
21use tokio::sync::Notify;
22use wasmtime::AsContextMut;
23use wasmtime::component::Accessor;
24use wasmtime::{Store, StoreContextMut};
25
/// Alternative p2 bindings generated with `exports: { default: async | store }`
/// so we can use `TypedFunc::call_concurrent` with both p2 and p3 instances.
pub mod p2 {
    // NB: documentation for this module is deliberately left to plain `//`
    // comments; the `#[expect(missing_docs)]` below covers the generated code.
    #[expect(missing_docs, reason = "bindgen-generated code")]
    pub mod bindings {
        // Generate host bindings for the `wasi:http/proxy` world from the WIT
        // files found under the `wit` path.
        wasmtime::component::bindgen!({
            path: "wit",
            world: "wasi:http/proxy",
            imports: { default: tracing },
            exports: { default: async | store },
            require_store_data_send: true,
            with: {
                // `wasi:http` is already bound elsewhere in this crate, so
                // reuse those bindings rather than generating them again.
                "wasi:http": crate::bindings::http,
                // Upstream package dependencies: reuse the `wasi:io` bindings
                // provided by `wasmtime_wasi`.
                "wasi:io": wasmtime_wasi::p2::bindings::io,
            }
        });

        // Re-export everything from the generated `wasi` module at this level.
        pub use wasi::*;
    }
}
48
/// Represents either a `wasi:http/incoming-handler@0.2.x` or
/// `wasi:http/handler@0.3.x` pre-instance.
///
/// The `P3` variant only exists when the `p3` crate feature is enabled.
pub enum ProxyPre<T: 'static> {
    /// A `wasi:http/incoming-handler@0.2.x` pre-instance.
    P2(p2::bindings::ProxyPre<T>),
    /// A `wasi:http/handler@0.3.x` pre-instance.
    #[cfg(feature = "p3")]
    P3(p3::bindings::ProxyPre<T>),
}
58
59impl<T: 'static> ProxyPre<T> {
60    async fn instantiate_async(&self, store: impl AsContextMut<Data = T>) -> Result<Proxy>
61    where
62        T: Send,
63    {
64        Ok(match self {
65            Self::P2(pre) => Proxy::P2(pre.instantiate_async(store).await?),
66            #[cfg(feature = "p3")]
67            Self::P3(pre) => Proxy::P3(pre.instantiate_async(store).await?),
68        })
69    }
70}
71
/// Represents either a `wasi:http/incoming-handler@0.2.x` or
/// `wasi:http/handler@0.3.x` instance.
///
/// The `P3` variant only exists when the `p3` crate feature is enabled.
pub enum Proxy {
    /// A `wasi:http/incoming-handler@0.2.x` instance.
    P2(p2::bindings::Proxy),
    /// A `wasi:http/handler@0.3.x` instance.
    #[cfg(feature = "p3")]
    P3(p3::bindings::Proxy),
}
81
/// Represents a task to run using a `wasi:http/incoming-handler@0.2.x` or
/// `wasi:http/handler@0.3.x` instance.
///
/// A task is a boxed, `Send`, one-shot closure.  It is handed a borrowed
/// [`Accessor`] and the [`Proxy`] instance to use, and returns a boxed,
/// pinned, `Send` future (borrowing both arguments) that resolves to `()`
/// once the task is done.
pub type TaskFn<T> = Box<
    dyn for<'a> FnOnce(&'a Accessor<T>, &'a Proxy) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
        + Send,
>;
88
/// Async MPMC channel where each item is delivered to at most one consumer.
struct Queue<T> {
    /// Pending items in FIFO order: pushed at the back, popped from the front.
    queue: Mutex<VecDeque<T>>,
    /// Used to wake a single waiting consumer whenever an item is pushed.
    notify: Notify,
}
94
95impl<T> Default for Queue<T> {
96    fn default() -> Self {
97        Self {
98            queue: Default::default(),
99            notify: Default::default(),
100        }
101    }
102}
103
104impl<T> Queue<T> {
105    fn is_empty(&self) -> bool {
106        self.queue.lock().unwrap().is_empty()
107    }
108
109    fn push(&self, item: T) {
110        self.queue.lock().unwrap().push_back(item);
111        self.notify.notify_one();
112    }
113
114    fn try_pop(&self) -> Option<T> {
115        self.queue.lock().unwrap().pop_front()
116    }
117
118    async fn pop(&self) -> T {
119        // This code comes from the Unbound MPMC Channel example in [the
120        // `tokio::sync::Notify`
121        // docs](https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html).
122
123        let mut notified = pin!(self.notify.notified());
124
125        loop {
126            notified.as_mut().enable();
127            if let Some(item) = self.try_pop() {
128                return item;
129            }
130            notified.as_mut().await;
131            notified.set(self.notify.notified());
132        }
133    }
134}
135
/// Bundles a [`Store`] with a callback to write a profile (if configured).
pub struct StoreBundle<T: 'static> {
    /// The [`Store`] to use to handle requests.
    pub store: Store<T>,
    /// Callback to write a profile (if enabled) once all requests have been
    /// handled.
    ///
    /// This is a one-shot, `Send` closure that receives a
    /// [`StoreContextMut`] for the store.
    pub write_profile: Box<dyn FnOnce(StoreContextMut<T>) + Send>,
}
144
/// Represents the application-specific state of a web server.
pub trait HandlerState: 'static + Sync + Send {
    /// The type of the associated data for [`Store`]s created using
    /// [`Self::new_store`].
    type StoreData: Send;

    /// Create a new [`Store`] for handling one or more requests.
    ///
    /// The `req_id` parameter is the value passed in the call to
    /// [`ProxyHandler::spawn`] that created the worker to which the new `Store`
    /// will belong.  See that function's documentation for details.
    fn new_store(&self, req_id: Option<u64>) -> Result<StoreBundle<Self::StoreData>>;

    /// Maximum time allowed to handle a request.
    ///
    /// In practice, a guest may be allowed to run up to 2x this time in the
    /// case of instance reuse to avoid penalizing concurrent requests being
    /// handled by the same instance.
    fn request_timeout(&self) -> Duration;

    /// Maximum time to keep an idle instance around before dropping it.
    fn idle_instance_timeout(&self) -> Duration;

    /// Maximum number of requests to handle using a single instance before
    /// dropping it.
    ///
    /// Must be at least 1; [`ProxyHandler::spawn`] panics if this returns 0.
    fn max_instance_reuse_count(&self) -> usize;

    /// Maximum number of requests to handle concurrently using a single
    /// instance.
    fn max_instance_concurrent_reuse_count(&self) -> usize;

    /// Called when a worker exits with an error.
    fn handle_worker_error(&self, error: anyhow::Error);
}
179
/// Shared state behind a [`ProxyHandler`]; held in an `Arc` so that the
/// handler and all of its workers refer to the same data.
struct ProxyHandlerInner<S: HandlerState> {
    /// Application-specific state and configuration.
    state: S,
    /// Pre-instance from which each worker instantiates its guest.
    instance_pre: ProxyPre<S::StoreData>,
    /// Counter backing `ProxyHandler::next_req_id`.
    next_id: AtomicU64,
    /// Tasks waiting to be picked up by an available worker.
    task_queue: Queue<TaskFn<S::StoreData>>,
    /// Number of workers currently advertising capacity for another task
    /// (maintained by `Worker::set_available`).
    worker_count: AtomicUsize,
}
187
/// Helper utility to track the times associated with tasks accepted by a
/// worker.
///
/// This is used to ensure that timeouts are enforced even when the
/// `StoreContextMut::run_concurrent` event loop is unable to make progress due
/// to the guest either busy looping or being blocked on a synchronous call to a
/// host function which has exclusive access to the `Store`.
///
/// Internally this is a multiset: each distinct [`Instant`] maps to the number
/// of outstanding tasks recorded with that exact instant.
#[derive(Default)]
struct StartTimes(BTreeMap<Instant, usize>);

impl StartTimes {
    /// Record one more task associated with `time`.
    fn add(&mut self, time: Instant) {
        self.0
            .entry(time)
            .and_modify(|count| *count += 1)
            .or_insert(1);
    }

    /// Forget one task previously recorded with `time`.
    ///
    /// # Panics
    ///
    /// Panics if no task is currently recorded for `time`; callers must pair
    /// every `remove` with an earlier `add` of the same instant.
    fn remove(&mut self, time: Instant) {
        match self.0.entry(time) {
            // Last task for this instant: drop the key entirely so that
            // `earliest` never reports an instant with no outstanding tasks.
            Entry::Occupied(slot) if *slot.get() == 1 => {
                slot.remove();
            }
            Entry::Occupied(mut slot) if *slot.get() > 1 => {
                *slot.get_mut() -= 1;
            }
            // Either the key is missing or its count is zero; neither can
            // happen if `add`/`remove` calls are paired.
            _ => unreachable!(),
        }
    }

    /// Return the earliest recorded instant, or `None` if nothing is being
    /// tracked.
    fn earliest(&self) -> Option<Instant> {
        self.0.keys().next().copied()
    }
}
221
/// A background worker which owns one guest instance and uses it to run tasks
/// taken from its [`ProxyHandler`].
struct Worker<S>
where
    S: HandlerState,
{
    /// The handler this worker belongs to.
    handler: ProxyHandler<S>,
    /// Whether this worker is currently counted in the handler's
    /// `worker_count`, i.e. is advertising capacity for another task.
    available: bool,
}
229
230impl<S> Worker<S>
231where
232    S: HandlerState,
233{
234    fn set_available(&mut self, available: bool) {
235        if available != self.available {
236            self.available = available;
237            if available {
238                self.handler.0.worker_count.fetch_add(1, Relaxed);
239            } else {
240                // Here we use `SeqCst` to ensure the load/store is ordered
241                // correctly with respect to the `Queue::is_empty` check we do
242                // below.
243                let count = self.handler.0.worker_count.fetch_sub(1, SeqCst);
244                // This addresses what would otherwise be a race condition in
245                // `ProxyHandler::spawn` where it only starts a worker if the
246                // available worker count is zero.  If we decrement the count to
247                // zero right after `ProxyHandler::spawn` checks it, then no
248                // worker will be started; thus it becomes our responsibility to
249                // start a worker here instead.
250                if count == 1 && !self.handler.0.task_queue.is_empty() {
251                    self.handler.start_worker(None, None);
252                }
253            }
254        }
255    }
256
257    async fn run(mut self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) {
258        if let Err(error) = self.run_(task, req_id).await {
259            self.handler.0.state.handle_worker_error(error);
260        }
261    }
262
263    async fn run_(
264        &mut self,
265        task: Option<TaskFn<S::StoreData>>,
266        req_id: Option<u64>,
267    ) -> Result<()> {
268        // NB: The code the follows is rather subtle in that it is structured
269        // carefully to provide a few key invariants related to how instance
270        // reuse and request timeouts interact:
271        //
272        // - A task must never be allowed to run for more than 2x the request
273        // timeout, if any.
274        //
275        // - Every task we accept here must be allowed to run for at least 1x
276        // the request timeout, if any.
277        //
278        // - When more than one task is run concurrently in the same instance,
279        // we must stop accepting new tasks as soon as any existing task reaches
280        // the request timeout.  This serves to cap the amount of time we need
281        // to keep the instance alive before _all_ tasks have either completed
282        // or timed out.
283        //
284        // As of this writing, there's an additional wrinkle that makes
285        // guaranteeing those invariants particularly tricky: per #11869 and
286        // #11870, busy guest loops, epoch interruption, and host functions
287        // registered using `Linker::func_{wrap,new}_async` all require
288        // blocking, exclusive access to the `Store`, which effectively prevents
289        // the `StoreContextMut::run_concurrent` event loop from making
290        // progress.  That, in turn, prevents any concurrent tasks from
291        // executing, and also prevents the `AsyncFnOnce` passed to
292        // `run_concurrent` from being polled.  Consequently, we must rely on a
293        // "second line of defense" to ensure tasks are timed out promptly,
294        // which is to check for timeouts _outside_ the `run_concurrent` future.
295        // Once the aforementioned issues have been addressed, we'll be able to
296        // remove that check and its associated baggage.
297
298        let handler = &self.handler.0;
299
300        let StoreBundle {
301            mut store,
302            write_profile,
303        } = handler.state.new_store(req_id)?;
304
305        let request_timeout = handler.state.request_timeout();
306        let idle_instance_timeout = handler.state.idle_instance_timeout();
307        let max_instance_reuse_count = handler.state.max_instance_reuse_count();
308        let max_instance_concurrent_reuse_count =
309            handler.state.max_instance_concurrent_reuse_count();
310
311        let proxy = &handler.instance_pre.instantiate_async(&mut store).await?;
312        let accept_concurrent = AtomicBool::new(true);
313        let task_start_times = Mutex::new(StartTimes::default());
314
315        let mut future = pin!(store.run_concurrent(async |accessor| {
316            let mut reuse_count = 0;
317            let mut timed_out = false;
318            let mut futures = FuturesUnordered::new();
319
320            let accept_task = |task: TaskFn<S::StoreData>,
321                               futures: &mut FuturesUnordered<_>,
322                               reuse_count: &mut usize| {
323                // Set `accept_concurrent` to false, conservatively assuming
324                // that the new task will be CPU-bound, at least to begin with.
325                // Only once the `StoreContextMut::run_concurrent` event loop
326                // returns `Pending` will we set `accept_concurrent` back to
327                // true and consider accepting more tasks.
328                //
329                // This approach avoids taking on more than one CPU-bound task
330                // at a time, which would hurt throughput vs. leaving the
331                // additional tasks for other workers to handle.
332                accept_concurrent.store(false, Relaxed);
333                *reuse_count += 1;
334
335                let start_time = Instant::now().checked_add(request_timeout);
336                if let Some(start_time) = start_time {
337                    task_start_times.lock().unwrap().add(start_time);
338                }
339
340                futures.push(tokio::time::timeout(request_timeout, async move {
341                    (task)(accessor, proxy).await;
342                    start_time
343                }));
344            };
345
346            if let Some(task) = task {
347                accept_task(task, &mut futures, &mut reuse_count);
348            }
349
350            let handler = self.handler.clone();
351            while !(futures.is_empty() && reuse_count >= max_instance_reuse_count) {
352                let new_task = {
353                    let future_count = futures.len();
354                    let mut next_future = pin!(async {
355                        if futures.is_empty() {
356                            future::pending().await
357                        } else {
358                            futures.next().await.unwrap()
359                        }
360                    });
361                    let mut next_task = pin!(tokio::time::timeout(
362                        if future_count == 0 {
363                            idle_instance_timeout
364                        } else {
365                            Duration::MAX
366                        },
367                        handler.0.task_queue.pop()
368                    ));
369                    // Poll any existing tasks, and if they're all `Pending`
370                    // _and_ we haven't reached any reuse limits yet, poll for a
371                    // new task from the queue.
372                    //
373                    // Note the the order of operations here is important.  By
374                    // polling `next_future` first, we'll disover any tasks that
375                    // may have timed out, at which point we'll stop accepting
376                    // new tasks altogether (see below for details).  This is
377                    // especially imporant in the case where the task was
378                    // blocked on a synchronous call to a host function which
379                    // has exclusive access to the `Store`; once that call
380                    // finishes, the first think we need to do is time out the
381                    // task.  If we were to poll for a new task first, then we'd
382                    // have to wait for _that_ task to finish or time out before
383                    // we could kill the instance.
384                    future::poll_fn(|cx| match next_future.as_mut().poll(cx) {
385                        Poll::Pending => {
386                            // Note that `Pending` here doesn't necessarily mean
387                            // all tasks are blocked on I/O.  They might simply
388                            // be waiting for some deferred work to be done by
389                            // the next turn of the
390                            // `StoreContextMut::run_concurrent` event loop.
391                            // Therefore, we check `accept_concurrent` here and
392                            // only advertise we have capacity for another task
393                            // if either we have no tasks at all or all our
394                            // tasks really are blocked on I/O.
395                            self.set_available(
396                                reuse_count < max_instance_reuse_count
397                                    && future_count < max_instance_concurrent_reuse_count
398                                    && (future_count == 0 || accept_concurrent.load(Relaxed)),
399                            );
400
401                            if self.available {
402                                next_task.as_mut().poll(cx).map(Some)
403                            } else {
404                                Poll::Pending
405                            }
406                        }
407                        Poll::Ready(Ok(start_time)) => {
408                            // Task completed; carry on!
409                            if let Some(start_time) = start_time {
410                                task_start_times.lock().unwrap().remove(start_time);
411                            }
412                            Poll::Ready(None)
413                        }
414                        Poll::Ready(Err(_)) => {
415                            // Task timed out; stop accepting new tasks, but
416                            // continue polling until any other, in-progress
417                            // tasks until they have either finished or timed
418                            // out.  This effectively kicks off a "graceful
419                            // shutdown" of the worker, allowing any other
420                            // concurrent tasks time to finish before we drop
421                            // the instance.
422                            //
423                            // TODO: We should also send a cancel request to the
424                            // timed-out task to give it a chance to shut down
425                            // gracefully (and delay dropping the instance for a
426                            // reasonable amount of time), but as of this
427                            // writing Wasmtime does not yet provide an API for
428                            // doing that.  See issue #11833.
429                            timed_out = true;
430                            reuse_count = max_instance_reuse_count;
431                            Poll::Ready(None)
432                        }
433                    })
434                    .await
435                };
436
437                match new_task {
438                    Some(Ok(task)) => {
439                        accept_task(task, &mut futures, &mut reuse_count);
440                    }
441                    Some(Err(_)) => break,
442                    None => {}
443                }
444            }
445
446            accessor.with(|mut access| write_profile(access.as_context_mut()));
447
448            if timed_out {
449                Err(anyhow!("guest timed out"))
450            } else {
451                anyhow::Ok(())
452            }
453        }));
454
455        let mut sleep = pin!(tokio::time::sleep(Duration::MAX));
456
457        future::poll_fn(|cx| {
458            let poll = future.as_mut().poll(cx);
459            if poll.is_pending() {
460                // If the future returns `Pending`, that's either because it's
461                // idle (in which case it can definitely accept a new task) or
462                // because all its tasks are awaiting I/O, in which case it may
463                // have capacity for additional tasks to run concurrently.
464                //
465                // However, if one of the tasks is blocked on a sync call to a
466                // host function which has exclusive access to the `Store`, the
467                // `StoreContextMut::run_concurrent` event loop will be unable
468                // to make progress until that call finishes.  Similarly, if the
469                // task loops indefinitely, subject only to epoch interruption,
470                // the event loop will also be stuck.  Either way, any task
471                // timeouts created inside the `AsyncFnOnce` we passed to
472                // `run_concurrent` won't have a chance to trigger.
473                // Consequently, we need to _also_ enforce timeouts here,
474                // outside the event loop.
475                //
476                // Therefore, we check if the oldest outstanding task has been
477                // running for at least `request_timeout*2`, which is the
478                // maximum time needed for any other concurrent tasks to
479                // complete or time out, at which point we can safely discard
480                // the instance.  If that deadline has not yet arrived, we
481                // schedule a wakeup to occur when it does.
482                //
483                // We uphold the "never kill an instance with a task which has
484                // been running for less than the request timeout" invariant
485                // here by noting that this timeout will only trigger if the
486                // `AsyncFnOnce` we passed to `run_concurrent` has been unable
487                // to run for at least the past `request_timeout` amount of
488                // time, meaning it can't possibly have accepted a task newer
489                // than that.
490                if let Some(deadline) = task_start_times
491                    .lock()
492                    .unwrap()
493                    .earliest()
494                    .and_then(|v| v.checked_add(request_timeout.saturating_mul(2)))
495                {
496                    sleep.as_mut().reset(deadline.into());
497                    // Note that this will schedule a wakeup for later if the
498                    // deadline has not yet arrived:
499                    if sleep.as_mut().poll(cx).is_ready() {
500                        // Deadline has been reached; kill the instance with an
501                        // error.
502                        return Poll::Ready(Err(anyhow!("guest timed out")));
503                    }
504                }
505
506                // Otherwise, if no timeouts have elapsed, we set
507                // `accept_concurrent` to true and, if it wasn't already true
508                // before, poll the future one more time so it can ask for
509                // another task if appropriate.
510                if !accept_concurrent.swap(true, Relaxed) {
511                    return future.as_mut().poll(cx);
512                }
513            }
514
515            poll
516        })
517        .await?
518    }
519}
520
/// Ensures a worker stops advertising capacity once it goes away.
impl<S> Drop for Worker<S>
where
    S: HandlerState,
{
    /// Mark the worker unavailable.  If it was still counted as available,
    /// `set_available` decrements the handler's `worker_count` and, when this
    /// was the last available worker and tasks remain queued, starts a
    /// replacement worker.
    fn drop(&mut self) {
        self.set_available(false);
    }
}
529
/// Represents the state of a web server.
///
/// Note that this supports optional instance reuse, enabled when
/// `S::max_instance_reuse_count()` returns a number greater than one.  See
/// [`Self::spawn`] for details.
///
/// This is a cheaply cloneable handle: all clones share the same inner state.
pub struct ProxyHandler<S: HandlerState>(Arc<ProxyHandlerInner<S>>);
536
537impl<S: HandlerState> Clone for ProxyHandler<S> {
538    fn clone(&self) -> Self {
539        Self(self.0.clone())
540    }
541}
542
543impl<S> ProxyHandler<S>
544where
545    S: HandlerState,
546{
547    /// Create a new `ProxyHandler` with the specified application state and
548    /// pre-instance.
549    pub fn new(state: S, instance_pre: ProxyPre<S::StoreData>) -> Self {
550        Self(Arc::new(ProxyHandlerInner {
551            state,
552            instance_pre,
553            next_id: AtomicU64::from(0),
554            task_queue: Default::default(),
555            worker_count: AtomicUsize::from(0),
556        }))
557    }
558
559    /// Push a task to the task queue for this handler.
560    ///
561    /// This will either spawn a new background worker to run the task or
562    /// deliver it to an already-running worker.
563    ///
564    /// The `req_id` will be passed to `<S as HandlerState>::new_store` _if_ a
565    /// new worker is started for this task.  It is intended to be used as a
566    /// "request identifier" corresponding to that task and can be used e.g. to
567    /// prefix all logging from the `Store` with that identifier.  Note that a
568    /// non-`None` value only makes sense when `<S as
569    /// HandlerState>::max_instance_reuse_count == 1`; otherwise the identifier
570    /// will not match subsequent tasks handled by the worker.
571    pub fn spawn(&self, req_id: Option<u64>, task: TaskFn<S::StoreData>) {
572        match self.0.state.max_instance_reuse_count() {
573            0 => panic!("`max_instance_reuse_count` must be at least 1"),
574            _ => {
575                if self.0.worker_count.load(Relaxed) == 0 {
576                    // There are no available workers; skip the queue and pass
577                    // the task directly to the worker, which improves
578                    // performance as measured by `wasmtime-server-rps.sh` by
579                    // about 15%.
580                    self.start_worker(Some(task), req_id);
581                } else {
582                    self.0.task_queue.push(task);
583                    // Start a new worker to handle the task if the last worker
584                    // just went unavailable.  See also `Worker::set_available`
585                    // for what happens if the available worker count goes to
586                    // zero right after we check it here, and note that we only
587                    // check the count _after_ we've pushed the task to the
588                    // queue.  We use `SeqCst` here to ensure that we get an
589                    // updated view of `worker_count` as it exists after the
590                    // `Queue::push` above.
591                    //
592                    // The upshot is that at least one (or more) of the
593                    // following will happen:
594                    //
595                    // - An existing worker will accept the task
596                    // - We'll start a new worker here to accept the task
597                    // - `Worker::set_available` will start a new worker to accept the task
598                    //
599                    // I.e. it should not be possible for the task to be
600                    // orphaned indefinitely in the queue without being
601                    // accepted.
602                    if self.0.worker_count.load(SeqCst) == 0 {
603                        self.start_worker(None, None);
604                    }
605                }
606            }
607        }
608    }
609
610    /// Generate a unique request ID.
611    pub fn next_req_id(&self) -> u64 {
612        self.0.next_id.fetch_add(1, Relaxed)
613    }
614
615    /// Return a reference to the application state.
616    pub fn state(&self) -> &S {
617        &self.0.state
618    }
619
620    /// Return a reference to the pre-instance.
621    pub fn instance_pre(&self) -> &ProxyPre<S::StoreData> {
622        &self.0.instance_pre
623    }
624
625    fn start_worker(&self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) {
626        tokio::spawn(
627            Worker {
628                handler: self.clone(),
629                available: false,
630            }
631            .run(task, req_id),
632        );
633    }
634}