wasmtime_wasi_http/handler.rs
1//! Provides utilities useful for dispatching incoming HTTP requests
2//! `wasi:http/handler` guest instances.
3
4#[cfg(feature = "p3")]
5use crate::p3;
6use futures::stream::{FuturesUnordered, Stream};
7use std::collections::VecDeque;
8use std::collections::btree_map::{BTreeMap, Entry};
9use std::future;
10use std::pin::{Pin, pin};
11use std::sync::{
12 Arc, Mutex,
13 atomic::{
14 AtomicBool, AtomicU64, AtomicUsize,
15 Ordering::{Relaxed, SeqCst},
16 },
17};
18use std::task::Poll;
19use std::time::{Duration, Instant};
20use tokio::sync::Notify;
21use wasmtime::AsContextMut;
22use wasmtime::component::Accessor;
23use wasmtime::{Result, Store, StoreContextMut, format_err};
24
25/// Alternative p2 bindings generated with `exports: { default: async | store }`
26/// so we can use `TypedFunc::call_concurrent` with both p2 and p3 instances.
27#[cfg(feature = "p2")]
28pub mod p2 {
29 #[expect(missing_docs, reason = "bindgen-generated code")]
30 pub mod bindings {
31 wasmtime::component::bindgen!({
32 path: "wit",
33 world: "wasi:http/proxy",
34 imports: { default: tracing },
35 exports: { default: async | store },
36 require_store_data_send: true,
37 with: {
38 // http is in this crate
39 "wasi:http": crate::p2::bindings::http,
40 // Upstream package dependencies
41 "wasi:io": wasmtime_wasi::p2::bindings::io,
42 }
43 });
44
45 pub use wasi::*;
46 }
47}
48
49/// Represents either a `wasi:http/incoming-handler@0.2.x` or
50/// `wasi:http/handler@0.3.x` pre-instance.
51pub enum ProxyPre<T: 'static> {
52 /// A `wasi:http/incoming-handler@0.2.x` pre-instance.
53 #[cfg(feature = "p2")]
54 P2(p2::bindings::ProxyPre<T>),
55 /// A `wasi:http/handler@0.3.x` pre-instance.
56 #[cfg(feature = "p3")]
57 P3(p3::bindings::ServicePre<T>),
58}
59
60impl<T: 'static> ProxyPre<T> {
61 async fn instantiate_async(&self, store: impl AsContextMut<Data = T>) -> Result<Proxy>
62 where
63 T: Send,
64 {
65 Ok(match self {
66 #[cfg(feature = "p2")]
67 Self::P2(pre) => Proxy::P2(pre.instantiate_async(store).await?),
68 #[cfg(feature = "p3")]
69 Self::P3(pre) => Proxy::P3(pre.instantiate_async(store).await?),
70 })
71 }
72}
73
74/// Represents either a `wasi:http/incoming-handler@0.2.x` or
75/// `wasi:http/handler@0.3.x` instance.
76pub enum Proxy {
77 /// A `wasi:http/incoming-handler@0.2.x` instance.
78 #[cfg(feature = "p2")]
79 P2(p2::bindings::Proxy),
80 /// A `wasi:http/handler@0.3.x` instance.
81 #[cfg(feature = "p3")]
82 P3(p3::bindings::Service),
83}
84
85/// Represents a task to run using a `wasi:http/incoming-handler@0.2.x` or
86/// `wasi:http/handler@0.3.x` instance.
87pub type TaskFn<T> = Box<
88 dyn for<'a> FnOnce(&'a Accessor<T>, &'a Proxy) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
89 + Send,
90>;
91
92/// Async MPMC channel where each item is delivered to at most one consumer.
93struct Queue<T> {
94 queue: Mutex<VecDeque<T>>,
95 notify: Notify,
96}
97
98impl<T> Default for Queue<T> {
99 fn default() -> Self {
100 Self {
101 queue: Default::default(),
102 notify: Default::default(),
103 }
104 }
105}
106
107impl<T> Queue<T> {
108 fn is_empty(&self) -> bool {
109 self.queue.lock().unwrap().is_empty()
110 }
111
112 fn push(&self, item: T) {
113 self.queue.lock().unwrap().push_back(item);
114 self.notify.notify_one();
115 }
116
117 fn try_pop(&self) -> Option<T> {
118 self.queue.lock().unwrap().pop_front()
119 }
120
121 async fn pop(&self) -> T {
122 // This code comes from the Unbound MPMC Channel example in [the
123 // `tokio::sync::Notify`
124 // docs](https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html).
125
126 let mut notified = pin!(self.notify.notified());
127
128 loop {
129 notified.as_mut().enable();
130 if let Some(item) = self.try_pop() {
131 return item;
132 }
133 notified.as_mut().await;
134 notified.set(self.notify.notified());
135 }
136 }
137}
138
139/// Bundles a [`Store`] with a callback to write a profile (if configured).
140pub struct StoreBundle<T: 'static> {
141 /// The [`Store`] to use to handle requests.
142 pub store: Store<T>,
143 /// Callback to write a profile (if enabled) once all requests have been
144 /// handled.
145 pub write_profile: Box<dyn FnOnce(StoreContextMut<T>) + Send>,
146}
147
148/// Represents the application-specific state of a web server.
149pub trait HandlerState: 'static + Sync + Send {
150 /// The type of the associated data for [`Store`]s created using
151 /// [`Self::new_store`].
152 type StoreData: Send;
153
154 /// Create a new [`Store`] for handling one or more requests.
155 ///
156 /// The `req_id` parameter is the value passed in the call to
157 /// [`ProxyHandler::spawn`] that created the worker to which the new `Store`
158 /// will belong. See that function's documentation for details.
159 fn new_store(&self, req_id: Option<u64>) -> Result<StoreBundle<Self::StoreData>>;
160
161 /// Maximum time allowed to handle a request.
162 ///
163 /// In practice, a guest may be allowed to run up to 2x this time in the
164 /// case of instance reuse to avoid penalizing concurrent requests being
165 /// handled by the same instance.
166 fn request_timeout(&self) -> Duration;
167
168 /// Maximum time to keep an idle instance around before dropping it.
169 fn idle_instance_timeout(&self) -> Duration;
170
171 /// Maximum number of requests to handle using a single instance before
172 /// dropping it.
173 fn max_instance_reuse_count(&self) -> usize;
174
175 /// Maximum number of requests to handle concurrently using a single
176 /// instance.
177 fn max_instance_concurrent_reuse_count(&self) -> usize;
178
179 /// Called when a worker exits with an error.
180 fn handle_worker_error(&self, error: wasmtime::Error);
181}
182
183struct ProxyHandlerInner<S: HandlerState> {
184 state: S,
185 instance_pre: ProxyPre<S::StoreData>,
186 next_id: AtomicU64,
187 task_queue: Queue<TaskFn<S::StoreData>>,
188 worker_count: AtomicUsize,
189}
190
191/// Helper utility to track the start times of tasks accepted by a worker.
192///
193/// This is used to ensure that timeouts are enforced even when the
194/// `StoreContextMut::run_concurrent` event loop is unable to make progress due
195/// to the guest either busy looping or being blocked on a synchronous call to a
196/// host function which has exclusive access to the `Store`.
197#[derive(Default)]
198struct StartTimes(BTreeMap<Instant, usize>);
199
200impl StartTimes {
201 fn add(&mut self, time: Instant) {
202 *self.0.entry(time).or_insert(0) += 1;
203 }
204
205 fn remove(&mut self, time: Instant) {
206 let Entry::Occupied(mut entry) = self.0.entry(time) else {
207 unreachable!()
208 };
209 match *entry.get() {
210 0 => unreachable!(),
211 1 => {
212 entry.remove();
213 }
214 _ => {
215 *entry.get_mut() -= 1;
216 }
217 }
218 }
219
220 fn earliest(&self) -> Option<Instant> {
221 self.0.first_key_value().map(|(&k, _)| k)
222 }
223}
224
225struct Worker<S>
226where
227 S: HandlerState,
228{
229 handler: ProxyHandler<S>,
230 available: bool,
231}
232
233impl<S> Worker<S>
234where
235 S: HandlerState,
236{
237 fn set_available(&mut self, available: bool) {
238 if available != self.available {
239 self.available = available;
240 if available {
241 self.handler.0.worker_count.fetch_add(1, Relaxed);
242 } else {
243 // Here we use `SeqCst` to ensure the load/store is ordered
244 // correctly with respect to the `Queue::is_empty` check we do
245 // below.
246 let count = self.handler.0.worker_count.fetch_sub(1, SeqCst);
247 // This addresses what would otherwise be a race condition in
248 // `ProxyHandler::spawn` where it only starts a worker if the
249 // available worker count is zero. If we decrement the count to
250 // zero right after `ProxyHandler::spawn` checks it, then no
251 // worker will be started; thus it becomes our responsibility to
252 // start a worker here instead.
253 if count == 1 && !self.handler.0.task_queue.is_empty() {
254 self.handler.start_worker(None, None);
255 }
256 }
257 }
258 }
259
260 async fn run(mut self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) {
261 if let Err(error) = self.run_(task, req_id).await {
262 self.handler.0.state.handle_worker_error(error);
263 }
264 }
265
266 async fn run_(
267 &mut self,
268 task: Option<TaskFn<S::StoreData>>,
269 req_id: Option<u64>,
270 ) -> Result<()> {
271 // NB: The code the follows is rather subtle in that it is structured
272 // carefully to provide a few key invariants related to how instance
273 // reuse and request timeouts interact:
274 //
275 // - A task must never be allowed to run for more than 2x the request
276 // timeout, if any.
277 //
278 // - Every task we accept here must be allowed to run for at least 1x
279 // the request timeout, if any.
280 //
281 // - When more than one task is run concurrently in the same instance,
282 // we must stop accepting new tasks as soon as any existing task reaches
283 // the request timeout. This serves to cap the amount of time we need
284 // to keep the instance alive before _all_ tasks have either completed
285 // or timed out.
286 //
287 // As of this writing, there's an additional wrinkle that makes
288 // guaranteeing those invariants particularly tricky: per #11869 and
289 // #11870, busy guest loops, epoch interruption, and host functions
290 // registered using `Linker::func_{wrap,new}_async` all require
291 // blocking, exclusive access to the `Store`, which effectively prevents
292 // the `StoreContextMut::run_concurrent` event loop from making
293 // progress. That, in turn, prevents any concurrent tasks from
294 // executing, and also prevents the `AsyncFnOnce` passed to
295 // `run_concurrent` from being polled. Consequently, we must rely on a
296 // "second line of defense" to ensure tasks are timed out promptly,
297 // which is to check for timeouts _outside_ the `run_concurrent` future.
298 // Once the aforementioned issues have been addressed, we'll be able to
299 // remove that check and its associated baggage.
300
301 let handler = &self.handler.0;
302
303 let StoreBundle {
304 mut store,
305 write_profile,
306 } = handler.state.new_store(req_id)?;
307
308 let request_timeout = handler.state.request_timeout();
309 let idle_instance_timeout = handler.state.idle_instance_timeout();
310 let max_instance_reuse_count = handler.state.max_instance_reuse_count();
311 let max_instance_concurrent_reuse_count =
312 handler.state.max_instance_concurrent_reuse_count();
313
314 let proxy = &handler.instance_pre.instantiate_async(&mut store).await?;
315 let accept_concurrent = AtomicBool::new(true);
316 let task_start_times = Mutex::new(StartTimes::default());
317
318 let mut future = pin!(store.run_concurrent(async |accessor| {
319 let mut reuse_count = 0;
320 let mut timed_out = false;
321 let mut futures = FuturesUnordered::new();
322
323 let accept_task = |task: TaskFn<S::StoreData>,
324 futures: &mut FuturesUnordered<_>,
325 reuse_count: &mut usize| {
326 // Set `accept_concurrent` to false, conservatively assuming
327 // that the new task will be CPU-bound, at least to begin with.
328 // Only once the `StoreContextMut::run_concurrent` event loop
329 // returns `Pending` will we set `accept_concurrent` back to
330 // true and consider accepting more tasks.
331 //
332 // This approach avoids taking on more than one CPU-bound task
333 // at a time, which would hurt throughput vs. leaving the
334 // additional tasks for other workers to handle.
335 accept_concurrent.store(false, Relaxed);
336 *reuse_count += 1;
337
338 let start_time = Instant::now().checked_add(request_timeout);
339 if let Some(start_time) = start_time {
340 task_start_times.lock().unwrap().add(start_time);
341 }
342
343 futures.push(tokio::time::timeout(request_timeout, async move {
344 (task)(accessor, proxy).await;
345 start_time
346 }));
347 };
348
349 if let Some(task) = task {
350 accept_task(task, &mut futures, &mut reuse_count);
351 }
352
353 // This is the main driver loop for this worker. This is modeled as
354 // a `poll_fn` which internally loops around the possible events.
355 // Events are sourced from the locals here, pinned outside of the
356 // `poll_fn` closure.
357 let mut futures = pin!(futures);
358 let mut idle_timeout_set = false;
359 let mut idle_timeout = pin!(tokio::time::sleep(Duration::MAX));
360 let handler = self.handler.clone();
361 let mut incoming_tasks = pin!(futures::stream::unfold(
362 &handler.0.task_queue,
363 |queue| async move {
364 let task = queue.pop().await;
365 Some((task, queue))
366 }
367 ));
368 future::poll_fn(|cx| {
369 // See docs about the idle timeout handling at the very bottom
370 // for what this is doing.
371 let prev_idle_timeout_set = idle_timeout_set;
372 idle_timeout_set = false;
373
374 loop {
375 // First, and crucially first , poll `futures` first. This
376 // way we'll discover any tasks that may have timed out, at
377 // which point we'll stop accepting new tasks altogether
378 // (see below for details). This is especially important in
379 // the case where the task was blocked on a synchronous call
380 // to a host function which has exclusive access to the
381 // `Store`; once that call finishes, the first thing we need
382 // to do is time out the task. If we were to poll for a new
383 // task first, then we'd have to wait for _that_ task to
384 // finish or time out before we could kill the instance.
385 match futures.as_mut().poll_next(cx) {
386 // Task completed; carry on!
387 Poll::Ready(Some(Ok(start_time))) => {
388 if let Some(start_time) = start_time {
389 task_start_times.lock().unwrap().remove(start_time);
390 }
391 }
392
393 // Task timed out; stop accepting new tasks, but
394 // continue polling until any other, in-progress tasks
395 // until they have either finished or timed out. This
396 // effectively kicks off a "graceful shutdown" of the
397 // worker, allowing any other concurrent tasks time to
398 // finish before we drop the instance.
399 //
400 // TODO: We should also send a cancel request to the
401 // timed-out task to give it a chance to shut down
402 // gracefully (and delay dropping the instance for a
403 // reasonable amount of time), but as of this writing
404 // Wasmtime does not yet provide an API for doing that.
405 // See issue #11833.
406 Poll::Ready(Some(Err(_))) => {
407 timed_out = true;
408 reuse_count = max_instance_reuse_count;
409 }
410
411 Poll::Ready(None) | Poll::Pending => {}
412 }
413
414 // At this point `futures` is either empty or it's `Pending`
415 // meaning nothing is ready. Note that `Pending` here
416 // doesn't necessarily mean all tasks are blocked on I/O.
417 // They might simply be waiting for some deferred work to be
418 // done by the next turn of the
419 // `StoreContextMut::run_concurrent` event loop. Therefore,
420 // we check `accept_concurrent` here and only advertise we
421 // have capacity for another task if either we have no tasks
422 // at all or all our tasks really are blocked on I/O.
423 self.set_available(
424 reuse_count < max_instance_reuse_count
425 && futures.len() < max_instance_concurrent_reuse_count
426 && (futures.is_empty() || accept_concurrent.load(Relaxed)),
427 );
428
429 // If we're available for accepting more requests after the
430 // deduction above, then try to accept a new task. If that's
431 // successful then push it into `futures` and turn this loop
432 // again to see where we're at next time around.
433 if self.available
434 && let Poll::Ready(Some(task)) = incoming_tasks.as_mut().poll_next(cx)
435 {
436 accept_task(task, &mut futures, &mut reuse_count);
437 continue;
438 }
439
440 // If, at this point, we still have some requests that are
441 // being processed then go ahead and bail out of this
442 // singular call to `poll` by saying we're not ready yet.
443 // This means we unconditionally wait for events within
444 // `futures` and we're also registered, optionally, for
445 // listening for incoming connections. That's all the events
446 // we're interested in, so this iteration of `poll` is complete.
447 if !futures.is_empty() {
448 break Poll::Pending;
449 }
450
451 // At this point `futures` is empty, and we haven't gotten
452 // any incoming tasks. Check the store we're using to see if
453 // there are any "interesting" tasks around. These are tasks
454 // which act as effectively strong references to this worker
455 // to keep it running. If there are still interesting tasks,
456 // then we're done with this iteration of `poll`. We'll get
457 // woken up when anything changes, but otherwise it's time
458 // to let something else happen.
459 //
460 // This is all skipped if something has timed out though. In
461 // that situation we're basically no longer interested in
462 // this store so we're no longer cooperatively trying to let
463 // it keep going.
464 if !timed_out && !accessor.poll_no_interesting_tasks(cx).is_ready() {
465 break Poll::Pending;
466 }
467
468 // And now at this point we (a) have no `futures`, (b) no
469 // new connections came in, and (c) the store is completely
470 // devoid of interesting work. In this situation if we're
471 // not actually capable of accepting any more work, then
472 // we're completely done and it's time to exit this worker.
473 if !self.available {
474 break Poll::Ready(());
475 }
476
477 // And now, finally, we wait for a timeout. Here we're just
478 // like above except that we're candidate for accepting more
479 // work in the future. If this is our first time here then
480 // reset the idle timeout to `idle_instance_timeout` from
481 // now, but othrewise just go take a look at `idle_timeout`
482 // and see if it's elapsed yet.
483 //
484 // Note that the way that this entire loop is structured is
485 // that we've already polled all the interesting sources of
486 // events we're interested in at this point, for example
487 // `futures`, `accessor`, and `incoming_tasks`. Here we add
488 // `idle_timeout` to that set and once anything is ready and
489 // fires then this entire loop will restart and we'll check
490 // everything again.
491 //
492 // Also note that the idle timeout is supposed to start when
493 // the store is itself entirely idle. The way this loop is
494 // structured is that when we entire this `poll` closure the
495 // `idle_timeout_set` variable is unconditionally set to
496 // `false`. That way if we exit out for some other reason,
497 // such as getting work, then the idle timeout will get
498 // reset next time we fall down here. Otherwise though if we
499 // fell down this far we actually want to preserve
500 // `idle_timeout_set` from when we first started, so that's
501 // restored here.
502 idle_timeout_set = prev_idle_timeout_set;
503 if !idle_timeout_set {
504 idle_timeout
505 .as_mut()
506 .reset(tokio::time::Instant::now() + idle_instance_timeout);
507 idle_timeout_set = true;
508 }
509 break idle_timeout.as_mut().poll(cx);
510 }
511 })
512 .await;
513
514 accessor.with(|mut access| write_profile(access.as_context_mut()));
515
516 if timed_out {
517 Err(format_err!("guest timed out"))
518 } else {
519 wasmtime::error::Ok(())
520 }
521 }));
522
523 let mut sleep = pin!(tokio::time::sleep(Duration::MAX));
524
525 future::poll_fn(|cx| {
526 let poll = future.as_mut().poll(cx);
527 if poll.is_pending() {
528 // If the future returns `Pending`, that's either because it's
529 // idle (in which case it can definitely accept a new task) or
530 // because all its tasks are awaiting I/O, in which case it may
531 // have capacity for additional tasks to run concurrently.
532 //
533 // However, if one of the tasks is blocked on a sync call to a
534 // host function which has exclusive access to the `Store`, the
535 // `StoreContextMut::run_concurrent` event loop will be unable
536 // to make progress until that call finishes. Similarly, if the
537 // task loops indefinitely, subject only to epoch interruption,
538 // the event loop will also be stuck. Either way, any task
539 // timeouts created inside the `AsyncFnOnce` we passed to
540 // `run_concurrent` won't have a chance to trigger.
541 // Consequently, we need to _also_ enforce timeouts here,
542 // outside the event loop.
543 //
544 // Therefore, we check if the oldest outstanding task has been
545 // running for at least `request_timeout*2`, which is the
546 // maximum time needed for any other concurrent tasks to
547 // complete or time out, at which point we can safely discard
548 // the instance. If that deadline has not yet arrived, we
549 // schedule a wakeup to occur when it does.
550 //
551 // We uphold the "never kill an instance with a task which has
552 // been running for less than the request timeout" invariant
553 // here by noting that this timeout will only trigger if the
554 // `AsyncFnOnce` we passed to `run_concurrent` has been unable
555 // to run for at least the past `request_timeout` amount of
556 // time, meaning it can't possibly have accepted a task newer
557 // than that.
558 if let Some(deadline) = task_start_times
559 .lock()
560 .unwrap()
561 .earliest()
562 .and_then(|v| v.checked_add(request_timeout.saturating_mul(2)))
563 {
564 sleep.as_mut().reset(deadline.into());
565 // Note that this will schedule a wakeup for later if the
566 // deadline has not yet arrived:
567 if sleep.as_mut().poll(cx).is_ready() {
568 // Deadline has been reached; kill the instance with an
569 // error.
570 return Poll::Ready(Err(format_err!("guest timed out")));
571 }
572 }
573
574 // Otherwise, if no timeouts have elapsed, we set
575 // `accept_concurrent` to true and, if it wasn't already true
576 // before, poll the future one more time so it can ask for
577 // another task if appropriate.
578 if !accept_concurrent.swap(true, Relaxed) {
579 return future.as_mut().poll(cx);
580 }
581 }
582
583 poll
584 })
585 .await?
586 }
587}
588
589impl<S> Drop for Worker<S>
590where
591 S: HandlerState,
592{
593 fn drop(&mut self) {
594 self.set_available(false);
595 }
596}
597
598/// Represents the state of a web server.
599///
600/// Note that this supports optional instance reuse, enabled when
601/// `S::max_instance_reuse_count()` returns a number greater than one. See
602/// [`Self::spawn`] for details.
603pub struct ProxyHandler<S: HandlerState>(Arc<ProxyHandlerInner<S>>);
604
605impl<S: HandlerState> Clone for ProxyHandler<S> {
606 fn clone(&self) -> Self {
607 Self(self.0.clone())
608 }
609}
610
611impl<S> ProxyHandler<S>
612where
613 S: HandlerState,
614{
615 /// Create a new `ProxyHandler` with the specified application state and
616 /// pre-instance.
617 pub fn new(state: S, instance_pre: ProxyPre<S::StoreData>) -> Self {
618 Self(Arc::new(ProxyHandlerInner {
619 state,
620 instance_pre,
621 next_id: AtomicU64::from(0),
622 task_queue: Default::default(),
623 worker_count: AtomicUsize::from(0),
624 }))
625 }
626
627 /// Push a task to the task queue for this handler.
628 ///
629 /// This will either spawn a new background worker to run the task or
630 /// deliver it to an already-running worker.
631 ///
632 /// The `req_id` will be passed to `<S as HandlerState>::new_store` _if_ a
633 /// new worker is started for this task. It is intended to be used as a
634 /// "request identifier" corresponding to that task and can be used e.g. to
635 /// prefix all logging from the `Store` with that identifier. Note that a
636 /// non-`None` value only makes sense when `<S as
637 /// HandlerState>::max_instance_reuse_count == 1`; otherwise the identifier
638 /// will not match subsequent tasks handled by the worker.
639 pub fn spawn(&self, req_id: Option<u64>, task: TaskFn<S::StoreData>) {
640 match self.0.state.max_instance_reuse_count() {
641 0 => panic!("`max_instance_reuse_count` must be at least 1"),
642 _ => {
643 if self.0.worker_count.load(Relaxed) == 0 {
644 // There are no available workers; skip the queue and pass
645 // the task directly to the worker, which improves
646 // performance as measured by `wasmtime-server-rps.sh` by
647 // about 15%.
648 self.start_worker(Some(task), req_id);
649 } else {
650 self.0.task_queue.push(task);
651 // Start a new worker to handle the task if the last worker
652 // just went unavailable. See also `Worker::set_available`
653 // for what happens if the available worker count goes to
654 // zero right after we check it here, and note that we only
655 // check the count _after_ we've pushed the task to the
656 // queue. We use `SeqCst` here to ensure that we get an
657 // updated view of `worker_count` as it exists after the
658 // `Queue::push` above.
659 //
660 // The upshot is that at least one (or more) of the
661 // following will happen:
662 //
663 // - An existing worker will accept the task
664 // - We'll start a new worker here to accept the task
665 // - `Worker::set_available` will start a new worker to accept the task
666 //
667 // I.e. it should not be possible for the task to be
668 // orphaned indefinitely in the queue without being
669 // accepted.
670 if self.0.worker_count.load(SeqCst) == 0 {
671 self.start_worker(None, None);
672 }
673 }
674 }
675 }
676 }
677
678 /// Generate a unique request ID.
679 pub fn next_req_id(&self) -> u64 {
680 self.0.next_id.fetch_add(1, Relaxed)
681 }
682
683 /// Return a reference to the application state.
684 pub fn state(&self) -> &S {
685 &self.0.state
686 }
687
688 /// Return a reference to the pre-instance.
689 pub fn instance_pre(&self) -> &ProxyPre<S::StoreData> {
690 &self.0.instance_pre
691 }
692
693 fn start_worker(&self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) {
694 tokio::spawn(
695 Worker {
696 handler: self.clone(),
697 available: false,
698 }
699 .run(task, req_id),
700 );
701 }
702}