wasmtime_wasi_http/handler.rs
//! Provides utilities useful for dispatching incoming HTTP requests to
//! `wasi:http/handler` guest instances.

#[cfg(feature = "p3")]
use crate::p3;
use anyhow::{Result, anyhow};
use futures::stream::{FuturesUnordered, StreamExt};
use std::collections::VecDeque;
use std::collections::btree_map::{BTreeMap, Entry};
use std::future;
use std::pin::{Pin, pin};
use std::sync::{
    Arc, Mutex,
    atomic::{
        AtomicBool, AtomicU64, AtomicUsize,
        Ordering::{Relaxed, SeqCst},
    },
};
use std::task::Poll;
use std::time::{Duration, Instant};
use tokio::sync::Notify;
use wasmtime::AsContextMut;
use wasmtime::component::Accessor;
use wasmtime::{Store, StoreContextMut};

/// Alternative p2 bindings generated with `exports: { default: async | store }`
/// so we can use `TypedFunc::call_concurrent` with both p2 and p3 instances.
pub mod p2 {
    #[expect(missing_docs, reason = "bindgen-generated code")]
    pub mod bindings {
        wasmtime::component::bindgen!({
            path: "wit",
            world: "wasi:http/proxy",
            imports: { default: tracing },
            exports: { default: async | store },
            require_store_data_send: true,
            with: {
                // http is in this crate
                "wasi:http": crate::bindings::http,
                // Upstream package dependencies
                "wasi:io": wasmtime_wasi::p2::bindings::io,
            }
        });

        pub use wasi::*;
    }
}

/// Represents either a `wasi:http/incoming-handler@0.2.x` or
/// `wasi:http/handler@0.3.x` pre-instance.
pub enum ProxyPre<T: 'static> {
    /// A `wasi:http/incoming-handler@0.2.x` pre-instance.
    P2(p2::bindings::ProxyPre<T>),
    /// A `wasi:http/handler@0.3.x` pre-instance.
    #[cfg(feature = "p3")]
    P3(p3::bindings::ProxyPre<T>),
}

impl<T: 'static> ProxyPre<T> {
    async fn instantiate_async(&self, store: impl AsContextMut<Data = T>) -> Result<Proxy>
    where
        T: Send,
    {
        Ok(match self {
            Self::P2(pre) => Proxy::P2(pre.instantiate_async(store).await?),
            #[cfg(feature = "p3")]
            Self::P3(pre) => Proxy::P3(pre.instantiate_async(store).await?),
        })
    }
}

/// Represents either a `wasi:http/incoming-handler@0.2.x` or
/// `wasi:http/handler@0.3.x` instance.
pub enum Proxy {
    /// A `wasi:http/incoming-handler@0.2.x` instance.
    P2(p2::bindings::Proxy),
    /// A `wasi:http/handler@0.3.x` instance.
    #[cfg(feature = "p3")]
    P3(p3::bindings::Proxy),
}

/// Represents a task to run using a `wasi:http/incoming-handler@0.2.x` or
/// `wasi:http/handler@0.3.x` instance.
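///
/// # Example
///
/// A minimal sketch of constructing a task, assuming a hypothetical
/// `MyStoreData` store type; a real task would dispatch one incoming request
/// by calling the proxy's exported `handle` function:
///
/// ```ignore
/// let task: TaskFn<MyStoreData> = Box::new(|_accessor, proxy| {
///     Box::pin(async move {
///         match proxy {
///             // A p2 guest exports `wasi:http/incoming-handler`.
///             Proxy::P2(_proxy) => { /* call `handle` here */ }
///             // A p3 guest exports `wasi:http/handler`.
///             #[cfg(feature = "p3")]
///             Proxy::P3(_proxy) => { /* call `handle` here */ }
///         }
///     })
/// });
/// ```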
pub type TaskFn<T> = Box<
    dyn for<'a> FnOnce(&'a Accessor<T>, &'a Proxy) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
        + Send,
>;

/// Async MPMC channel where each item is delivered to at most one consumer.
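///
/// A minimal usage sketch (illustrative only; this type is crate-private):
///
/// ```ignore
/// let queue = Queue::default();
/// queue.push(42);
/// assert_eq!(queue.try_pop(), Some(42));
/// // `queue.pop().await` suspends until another task calls `push`.
/// ```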
struct Queue<T> {
    queue: Mutex<VecDeque<T>>,
    notify: Notify,
}

impl<T> Default for Queue<T> {
    fn default() -> Self {
        Self {
            queue: Default::default(),
            notify: Default::default(),
        }
    }
}

impl<T> Queue<T> {
    fn is_empty(&self) -> bool {
        self.queue.lock().unwrap().is_empty()
    }

    fn push(&self, item: T) {
        self.queue.lock().unwrap().push_back(item);
        self.notify.notify_one();
    }

    fn try_pop(&self) -> Option<T> {
        self.queue.lock().unwrap().pop_front()
    }

    async fn pop(&self) -> T {
        // This code comes from the unbounded multi-producer, multi-consumer
        // (MPMC) channel example in [the `tokio::sync::Notify`
        // docs](https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html).

        let mut notified = pin!(self.notify.notified());

        loop {
            notified.as_mut().enable();
            if let Some(item) = self.try_pop() {
                return item;
            }
            notified.as_mut().await;
            notified.set(self.notify.notified());
        }
    }
}

/// Bundles a [`Store`] with a callback to write a profile (if configured).
pub struct StoreBundle<T: 'static> {
    /// The [`Store`] to use to handle requests.
    pub store: Store<T>,
    /// Callback to write a profile (if enabled) once all requests have been
    /// handled.
    pub write_profile: Box<dyn FnOnce(StoreContextMut<T>) + Send>,
}

/// Represents the application-specific state of a web server.
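///
/// # Example
///
/// A minimal sketch of an implementation, assuming a hypothetical `MyState`
/// type holding a preconfigured [`wasmtime::Engine`] and a hypothetical
/// `MyStoreData` type providing whatever host state the component needs:
///
/// ```ignore
/// struct MyState {
///     engine: wasmtime::Engine,
/// }
///
/// impl HandlerState for MyState {
///     type StoreData = MyStoreData;
///
///     fn new_store(&self, _req_id: Option<u64>) -> Result<StoreBundle<MyStoreData>> {
///         Ok(StoreBundle {
///             store: Store::new(&self.engine, MyStoreData::default()),
///             // No profiling configured in this sketch.
///             write_profile: Box::new(|_| {}),
///         })
///     }
///
///     fn request_timeout(&self) -> Duration {
///         Duration::from_secs(30)
///     }
///
///     fn idle_instance_timeout(&self) -> Duration {
///         Duration::from_secs(10)
///     }
///
///     fn max_instance_reuse_count(&self) -> usize {
///         1
///     }
///
///     fn max_instance_concurrent_reuse_count(&self) -> usize {
///         1
///     }
///
///     fn handle_worker_error(&self, error: anyhow::Error) {
///         eprintln!("worker error: {error:?}");
///     }
/// }
/// ```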
pub trait HandlerState: 'static + Sync + Send {
    /// The type of the associated data for [`Store`]s created using
    /// [`new_store`].
    type StoreData: Send;

    /// Create a new [`Store`] for handling one or more requests.
    ///
    /// The `req_id` parameter is the value passed in the call to
    /// [`ProxyHandler::spawn`] that created the worker to which the new `Store`
    /// will belong. See that function's documentation for details.
    fn new_store(&self, req_id: Option<u64>) -> Result<StoreBundle<Self::StoreData>>;

    /// Maximum time allowed to handle a request.
    ///
    /// In practice, a guest may be allowed to run up to 2x this time in the
    /// case of instance reuse to avoid penalizing concurrent requests being
    /// handled by the same instance.
    fn request_timeout(&self) -> Duration;

    /// Maximum time to keep an idle instance around before dropping it.
    fn idle_instance_timeout(&self) -> Duration;

    /// Maximum number of requests to handle using a single instance before
    /// dropping it.
    fn max_instance_reuse_count(&self) -> usize;

    /// Maximum number of requests to handle concurrently using a single
    /// instance.
    fn max_instance_concurrent_reuse_count(&self) -> usize;

    /// Called when a worker exits with an error.
    fn handle_worker_error(&self, error: anyhow::Error);
}

struct ProxyHandlerInner<S: HandlerState> {
    state: S,
    instance_pre: ProxyPre<S::StoreData>,
    next_id: AtomicU64,
    task_queue: Queue<TaskFn<S::StoreData>>,
    worker_count: AtomicUsize,
}

/// Helper utility to track the start times of tasks accepted by a worker.
///
/// This is used to ensure that timeouts are enforced even when the
/// `StoreContextMut::run_concurrent` event loop is unable to make progress due
/// to the guest either busy looping or being blocked on a synchronous call to a
/// host function which has exclusive access to the `Store`.
#[derive(Default)]
struct StartTimes(BTreeMap<Instant, usize>);

impl StartTimes {
    fn add(&mut self, time: Instant) {
        *self.0.entry(time).or_insert(0) += 1;
    }

    fn remove(&mut self, time: Instant) {
        let Entry::Occupied(mut entry) = self.0.entry(time) else {
            unreachable!()
        };
        match *entry.get() {
            0 => unreachable!(),
            1 => {
                entry.remove();
            }
            _ => {
                *entry.get_mut() -= 1;
            }
        }
    }

    fn earliest(&self) -> Option<Instant> {
        self.0.first_key_value().map(|(&k, _)| k)
    }
}

struct Worker<S>
where
    S: HandlerState,
{
    handler: ProxyHandler<S>,
    available: bool,
}

impl<S> Worker<S>
where
    S: HandlerState,
{
    fn set_available(&mut self, available: bool) {
        if available != self.available {
            self.available = available;
            if available {
                self.handler.0.worker_count.fetch_add(1, Relaxed);
            } else {
                // Here we use `SeqCst` to ensure the load/store is ordered
                // correctly with respect to the `Queue::is_empty` check we do
                // below.
                let count = self.handler.0.worker_count.fetch_sub(1, SeqCst);
                // This addresses what would otherwise be a race condition in
                // `ProxyHandler::spawn` where it only starts a worker if the
                // available worker count is zero. If we decrement the count to
                // zero right after `ProxyHandler::spawn` checks it, then no
                // worker will be started; thus it becomes our responsibility to
                // start a worker here instead.
                if count == 1 && !self.handler.0.task_queue.is_empty() {
                    self.handler.start_worker(None, None);
                }
            }
        }
    }

    async fn run(mut self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) {
        if let Err(error) = self.run_(task, req_id).await {
            self.handler.0.state.handle_worker_error(error);
        }
    }

    async fn run_(
        &mut self,
        task: Option<TaskFn<S::StoreData>>,
        req_id: Option<u64>,
    ) -> Result<()> {
        // NB: The code that follows is rather subtle in that it is structured
        // carefully to provide a few key invariants related to how instance
        // reuse and request timeouts interact:
        //
        // - A task must never be allowed to run for more than 2x the request
        //   timeout, if any.
        //
        // - Every task we accept here must be allowed to run for at least 1x
        //   the request timeout, if any.
        //
        // - When more than one task is run concurrently in the same instance,
        //   we must stop accepting new tasks as soon as any existing task
        //   reaches the request timeout. This serves to cap the amount of time
        //   we need to keep the instance alive before _all_ tasks have either
        //   completed or timed out.
        //
        // As of this writing, there's an additional wrinkle that makes
        // guaranteeing those invariants particularly tricky: per #11869 and
        // #11870, busy guest loops, epoch interruption, and host functions
        // registered using `Linker::func_{wrap,new}_async` all require
        // blocking, exclusive access to the `Store`, which effectively prevents
        // the `StoreContextMut::run_concurrent` event loop from making
        // progress. That, in turn, prevents any concurrent tasks from
        // executing, and also prevents the `AsyncFnOnce` passed to
        // `run_concurrent` from being polled. Consequently, we must rely on a
        // "second line of defense" to ensure tasks are timed out promptly,
        // which is to check for timeouts _outside_ the `run_concurrent` future.
        // Once the aforementioned issues have been addressed, we'll be able to
        // remove that check and its associated baggage.

        let handler = &self.handler.0;

        let StoreBundle {
            mut store,
            write_profile,
        } = handler.state.new_store(req_id)?;

        let request_timeout = handler.state.request_timeout();
        let idle_instance_timeout = handler.state.idle_instance_timeout();
        let max_instance_reuse_count = handler.state.max_instance_reuse_count();
        let max_instance_concurrent_reuse_count =
            handler.state.max_instance_concurrent_reuse_count();

        let proxy = &handler.instance_pre.instantiate_async(&mut store).await?;
        let accept_concurrent = AtomicBool::new(true);
        let task_start_times = Mutex::new(StartTimes::default());

        let mut future = pin!(store.run_concurrent(async |accessor| {
            let mut reuse_count = 0;
            let mut timed_out = false;
            let mut futures = FuturesUnordered::new();

            let accept_task = |task: TaskFn<S::StoreData>,
                               futures: &mut FuturesUnordered<_>,
                               reuse_count: &mut usize| {
                // Set `accept_concurrent` to false, conservatively assuming
                // that the new task will be CPU-bound, at least to begin with.
                // Only once the `StoreContextMut::run_concurrent` event loop
                // returns `Pending` will we set `accept_concurrent` back to
                // true and consider accepting more tasks.
                //
                // This approach avoids taking on more than one CPU-bound task
                // at a time, which would hurt throughput vs. leaving the
                // additional tasks for other workers to handle.
                accept_concurrent.store(false, Relaxed);
                *reuse_count += 1;

                let start_time = Instant::now().checked_add(request_timeout);
                if let Some(start_time) = start_time {
                    task_start_times.lock().unwrap().add(start_time);
                }

                futures.push(tokio::time::timeout(request_timeout, async move {
                    (task)(accessor, proxy).await;
                    start_time
                }));
            };

            if let Some(task) = task {
                accept_task(task, &mut futures, &mut reuse_count);
            }

            let handler = self.handler.clone();
            while !(futures.is_empty() && reuse_count >= max_instance_reuse_count) {
                let new_task = {
                    let future_count = futures.len();
                    let mut next_future = pin!(async {
                        if futures.is_empty() {
                            future::pending().await
                        } else {
                            futures.next().await.unwrap()
                        }
                    });
                    let mut next_task = pin!(tokio::time::timeout(
                        if future_count == 0 {
                            idle_instance_timeout
                        } else {
                            Duration::MAX
                        },
                        handler.0.task_queue.pop()
                    ));
                    // Poll any existing tasks, and if they're all `Pending`
                    // _and_ we haven't reached any reuse limits yet, poll for a
                    // new task from the queue.
                    //
                    // Note that the order of operations here is important. By
                    // polling `next_future` first, we'll discover any tasks
                    // that may have timed out, at which point we'll stop
                    // accepting new tasks altogether (see below for details).
                    // This is especially important in the case where the task
                    // was blocked on a synchronous call to a host function
                    // which has exclusive access to the `Store`; once that call
                    // finishes, the first thing we need to do is time out the
                    // task. If we were to poll for a new task first, then we'd
                    // have to wait for _that_ task to finish or time out before
                    // we could kill the instance.
                    future::poll_fn(|cx| match next_future.as_mut().poll(cx) {
                        Poll::Pending => {
                            // Note that `Pending` here doesn't necessarily mean
                            // all tasks are blocked on I/O. They might simply
                            // be waiting for some deferred work to be done by
                            // the next turn of the
                            // `StoreContextMut::run_concurrent` event loop.
                            // Therefore, we check `accept_concurrent` here and
                            // only advertise we have capacity for another task
                            // if either we have no tasks at all or all our
                            // tasks really are blocked on I/O.
                            self.set_available(
                                reuse_count < max_instance_reuse_count
                                    && future_count < max_instance_concurrent_reuse_count
                                    && (future_count == 0 || accept_concurrent.load(Relaxed)),
                            );

                            if self.available {
                                next_task.as_mut().poll(cx).map(Some)
                            } else {
                                Poll::Pending
                            }
                        }
                        Poll::Ready(Ok(start_time)) => {
                            // Task completed; carry on!
                            if let Some(start_time) = start_time {
                                task_start_times.lock().unwrap().remove(start_time);
                            }
                            Poll::Ready(None)
                        }
                        Poll::Ready(Err(_)) => {
                            // Task timed out; stop accepting new tasks, but
                            // continue polling any other in-progress tasks
                            // until they have either finished or timed out.
                            // This effectively kicks off a "graceful shutdown"
                            // of the worker, allowing any other concurrent
                            // tasks time to finish before we drop the instance.
                            //
                            // TODO: We should also send a cancel request to the
                            // timed-out task to give it a chance to shut down
                            // gracefully (and delay dropping the instance for a
                            // reasonable amount of time), but as of this
                            // writing Wasmtime does not yet provide an API for
                            // doing that. See issue #11833.
                            timed_out = true;
                            reuse_count = max_instance_reuse_count;
                            Poll::Ready(None)
                        }
                    })
                    .await
                };

                match new_task {
                    Some(Ok(task)) => {
                        accept_task(task, &mut futures, &mut reuse_count);
                    }
                    Some(Err(_)) => break,
                    None => {}
                }
            }

            accessor.with(|mut access| write_profile(access.as_context_mut()));

            if timed_out {
                Err(anyhow!("guest timed out"))
            } else {
                anyhow::Ok(())
            }
        }));

        let mut sleep = pin!(tokio::time::sleep(Duration::MAX));

        future::poll_fn(|cx| {
            let poll = future.as_mut().poll(cx);
            if poll.is_pending() {
                // If the future returns `Pending`, that's either because it's
                // idle (in which case it can definitely accept a new task) or
                // because all its tasks are awaiting I/O, in which case it may
                // have capacity for additional tasks to run concurrently.
                //
                // However, if one of the tasks is blocked on a sync call to a
                // host function which has exclusive access to the `Store`, the
                // `StoreContextMut::run_concurrent` event loop will be unable
                // to make progress until that call finishes. Similarly, if the
                // task loops indefinitely, subject only to epoch interruption,
                // the event loop will also be stuck. Either way, any task
                // timeouts created inside the `AsyncFnOnce` we passed to
                // `run_concurrent` won't have a chance to trigger.
                // Consequently, we need to _also_ enforce timeouts here,
                // outside the event loop.
                //
                // Therefore, we check if the oldest outstanding task has been
                // running for at least `request_timeout*2`, which is the
                // maximum time needed for any other concurrent tasks to
                // complete or time out, at which point we can safely discard
                // the instance. If that deadline has not yet arrived, we
                // schedule a wakeup to occur when it does.
                //
                // We uphold the "never kill an instance with a task which has
                // been running for less than the request timeout" invariant
                // here by noting that this timeout will only trigger if the
                // `AsyncFnOnce` we passed to `run_concurrent` has been unable
                // to run for at least the past `request_timeout` amount of
                // time, meaning it can't possibly have accepted a task newer
                // than that.
                if let Some(deadline) = task_start_times
                    .lock()
                    .unwrap()
                    .earliest()
                    .and_then(|v| v.checked_add(request_timeout.saturating_mul(2)))
                {
                    sleep.as_mut().reset(deadline.into());
                    // Note that this will schedule a wakeup for later if the
                    // deadline has not yet arrived:
                    if sleep.as_mut().poll(cx).is_ready() {
                        // Deadline has been reached; kill the instance with an
                        // error.
                        return Poll::Ready(Err(anyhow!("guest timed out")));
                    }
                }

                // Otherwise, if no timeouts have elapsed, we set
                // `accept_concurrent` to true and, if it wasn't already true
                // before, poll the future one more time so it can ask for
                // another task if appropriate.
                if !accept_concurrent.swap(true, Relaxed) {
                    return future.as_mut().poll(cx);
                }
            }

            poll
        })
        .await?
    }
}

impl<S> Drop for Worker<S>
where
    S: HandlerState,
{
    fn drop(&mut self) {
        self.set_available(false);
    }
}

/// Represents the state of a web server.
///
/// Note that this supports optional instance reuse, enabled when
/// `S::max_instance_reuse_count()` returns a number greater than one. See
/// [`Self::spawn`] for details.
pub struct ProxyHandler<S: HandlerState>(Arc<ProxyHandlerInner<S>>);

impl<S: HandlerState> Clone for ProxyHandler<S> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<S> ProxyHandler<S>
where
    S: HandlerState,
{
    /// Create a new `ProxyHandler` with the specified application state and
    /// pre-instance.
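    ///
    /// # Example
    ///
    /// A minimal sketch, assuming hypothetical `my_state` (an implementation
    /// of [`HandlerState`]), `linker`, and `component` values that have
    /// already been configured with the WASI and WASI HTTP host interfaces:
    ///
    /// ```ignore
    /// let instance_pre = linker.instantiate_pre(&component)?;
    /// let proxy_pre = ProxyPre::P2(p2::bindings::ProxyPre::new(instance_pre)?);
    /// let handler = ProxyHandler::new(my_state, proxy_pre);
    /// ```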
    pub fn new(state: S, instance_pre: ProxyPre<S::StoreData>) -> Self {
        Self(Arc::new(ProxyHandlerInner {
            state,
            instance_pre,
            next_id: AtomicU64::from(0),
            task_queue: Default::default(),
            worker_count: AtomicUsize::from(0),
        }))
    }

    /// Push a task to the task queue for this handler.
    ///
    /// This will either spawn a new background worker to run the task or
    /// deliver it to an already-running worker.
    ///
    /// The `req_id` will be passed to `<S as HandlerState>::new_store` _if_ a
    /// new worker is started for this task. It is intended to be used as a
    /// "request identifier" corresponding to that task and can be used e.g. to
    /// prefix all logging from the `Store` with that identifier. Note that a
    /// non-`None` value only makes sense when
    /// `<S as HandlerState>::max_instance_reuse_count` returns 1; otherwise
    /// the identifier will not match subsequent tasks handled by the worker.
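    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `handler` was built as in the [`Self::new`]
    /// example and `task` is a [`TaskFn`] that dispatches one incoming
    /// request:
    ///
    /// ```ignore
    /// let req_id = handler.next_req_id();
    /// handler.spawn(Some(req_id), task);
    /// ```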
    pub fn spawn(&self, req_id: Option<u64>, task: TaskFn<S::StoreData>) {
        match self.0.state.max_instance_reuse_count() {
            0 => panic!("`max_instance_reuse_count` must be at least 1"),
            _ => {
                if self.0.worker_count.load(Relaxed) == 0 {
                    // There are no available workers; skip the queue and pass
                    // the task directly to the worker, which improves
                    // performance as measured by `wasmtime-server-rps.sh` by
                    // about 15%.
                    self.start_worker(Some(task), req_id);
                } else {
                    self.0.task_queue.push(task);
                    // Start a new worker to handle the task if the last worker
                    // just went unavailable. See also `Worker::set_available`
                    // for what happens if the available worker count goes to
                    // zero right after we check it here, and note that we only
                    // check the count _after_ we've pushed the task to the
                    // queue. We use `SeqCst` here to ensure that we get an
                    // updated view of `worker_count` as it exists after the
                    // `Queue::push` above.
                    //
                    // The upshot is that at least one (or more) of the
                    // following will happen:
                    //
                    // - An existing worker will accept the task
                    // - We'll start a new worker here to accept the task
                    // - `Worker::set_available` will start a new worker to
                    //   accept the task
                    //
                    // I.e. it should not be possible for the task to be
                    // orphaned indefinitely in the queue without being
                    // accepted.
                    if self.0.worker_count.load(SeqCst) == 0 {
                        self.start_worker(None, None);
                    }
                }
            }
        }
    }

    /// Generate a unique request ID.
    pub fn next_req_id(&self) -> u64 {
        self.0.next_id.fetch_add(1, Relaxed)
    }

    /// Return a reference to the application state.
    pub fn state(&self) -> &S {
        &self.0.state
    }

    /// Return a reference to the pre-instance.
    pub fn instance_pre(&self) -> &ProxyPre<S::StoreData> {
        &self.0.instance_pre
    }

    fn start_worker(&self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) {
        tokio::spawn(
            Worker {
                handler: self.clone(),
                available: false,
            }
            .run(task, req_id),
        );
    }
}