Skip to main content

wasmtime/runtime/vm/instance/allocator/
pooling.rs

1//! Implements the pooling instance allocator.
2//!
3//! The pooling instance allocator maps memory in advance and allocates
4//! instances, memories, tables, and stacks from a pool of available resources.
5//! Using the pooling instance allocator can speed up module instantiation when
6//! modules can be constrained based on configurable limits
7//! ([`InstanceLimits`]). Each new instance is stored in a "slot"; as instances
8//! are allocated and freed, these slots are either filled or emptied:
9//!
10//! ```text
11//! ┌──────┬──────┬──────┬──────┬──────┐
12//! │Slot 0│Slot 1│Slot 2│Slot 3│......│
13//! └──────┴──────┴──────┴──────┴──────┘
14//! ```
15//!
16//! Each slot has a "slot ID"--an index into the pool. Slot IDs are handed out
17//! by the [`index_allocator`] module. Note that each kind of pool-allocated
18//! item is stored in its own separate pool: [`memory_pool`], [`table_pool`],
19//! [`stack_pool`]. See those modules for more details.
20
21mod decommit_queue;
22mod index_allocator;
23mod memory_pool;
24mod metrics;
25mod table_pool;
26
27#[cfg(feature = "gc")]
28mod gc_heap_pool;
29
30#[cfg(all(feature = "async"))]
31mod generic_stack_pool;
32#[cfg(all(feature = "async", unix, not(miri)))]
33mod unix_stack_pool;
34
35#[cfg(all(feature = "async"))]
36cfg_if::cfg_if! {
37    if #[cfg(all(unix, not(miri), not(asan)))] {
38        use unix_stack_pool as stack_pool;
39    } else {
40        use generic_stack_pool as stack_pool;
41    }
42}
43
44use self::decommit_queue::DecommitQueue;
45use self::memory_pool::MemoryPool;
46use self::table_pool::TablePool;
47use super::{
48    InstanceAllocationRequest, InstanceAllocator, MemoryAllocationIndex, TableAllocationIndex,
49};
50use crate::Enabled;
51use crate::prelude::*;
52use crate::runtime::vm::{
53    CompiledModuleId, Memory, Table,
54    instance::Instance,
55    mpk::{self, ProtectionKey, ProtectionMask},
56    sys::vm::PageMap,
57};
58use core::sync::atomic::AtomicUsize;
59use std::borrow::Cow;
60use std::fmt::Display;
61use std::sync::{Mutex, MutexGuard};
62use std::{
63    mem,
64    sync::atomic::{AtomicU64, Ordering},
65};
66use wasmtime_environ::{
67    DefinedMemoryIndex, DefinedTableIndex, HostPtr, Module, Tunables, VMOffsets,
68};
69
70pub use self::metrics::PoolingAllocatorMetrics;
71
72#[cfg(feature = "gc")]
73use super::GcHeapAllocationIndex;
74#[cfg(feature = "gc")]
75use crate::runtime::vm::{GcHeap, GcRuntime};
76#[cfg(feature = "gc")]
77use gc_heap_pool::GcHeapPool;
78
79#[cfg(feature = "async")]
80use stack_pool::StackPool;
81
82#[cfg(feature = "component-model")]
83use wasmtime_environ::{
84    StaticModuleIndex,
85    component::{Component, VMComponentOffsets},
86};
87
/// Rounds `n` up to the nearest multiple of `to`.
///
/// `to` must be a non-zero power of two; that precondition is only checked in
/// debug builds.
fn round_up_to_pow2(n: usize, to: usize) -> usize {
    debug_assert!(to > 0);
    debug_assert!(to.is_power_of_two());
    // Add the alignment-minus-one, then clear the low bits with the mask.
    let mask = to - 1;
    (n + mask) & !mask
}
93
/// Instance-related limit configuration for pooling.
///
/// More docs on this can be found at `wasmtime::PoolingAllocationConfig`.
#[derive(Debug, Copy, Clone)]
pub struct InstanceLimits {
    /// How many component instances may be live in the pool at once.
    pub total_component_instances: u32,

    /// Upper bound, in bytes, on a component's `VMComponentContext` together
    /// with the aggregate `VMContext` sizes of all of its inner core modules.
    pub component_instance_size: usize,

    /// How many core module instances may be live in the pool at once.
    pub total_core_instances: u32,

    /// Cap on the number of core module instances that a single component may
    /// transitively contain.
    pub max_core_instances_per_component: u32,

    /// Cap on the number of Wasm linear memories that a single component may
    /// transitively contain.
    pub max_memories_per_component: u32,

    /// Cap on the number of tables that a single component may transitively
    /// contain.
    pub max_tables_per_component: u32,

    /// Pool-wide total of linear memories, across all instances.
    pub total_memories: u32,

    /// Pool-wide total of tables, across all instances.
    pub total_tables: u32,

    /// Pool-wide total of async stacks, across all instances.
    #[cfg(feature = "async")]
    pub total_stacks: u32,

    /// Upper bound, in bytes, of a single core instance's `VMContext`.
    pub core_instance_size: usize,

    /// Cap on the number of tables in a single module instance.
    pub max_tables_per_module: u32,

    /// Cap on the number of word-size elements per table.
    ///
    /// Note that tables for element types such as continuations
    /// that use more than one word of storage may store fewer
    /// elements.
    pub table_elements: usize,

    /// Cap on the number of linear memories in a single module instance.
    pub max_memories_per_module: u32,

    /// Cap, in bytes, on a single linear memory's size; must be smaller than
    /// `memory_reservation` in `Tunables`.
    pub max_memory_size: usize,

    /// Pool-wide total of GC heaps, across all instances.
    #[cfg(feature = "gc")]
    pub total_gc_heaps: u32,
}

impl Default for InstanceLimits {
    fn default() -> Self {
        // 32-bit hosts have far less address space to reserve from, so the
        // default pool sizes are scaled down there. See the doc comments for
        // `wasmtime::PoolingAllocationConfig` for where each default value
        // comes from.
        let total: u32 = if cfg!(target_pointer_width = "32") {
            100
        } else {
            1000
        };
        Self {
            total_component_instances: total,
            total_core_instances: total,
            total_memories: total,
            total_tables: total,
            #[cfg(feature = "async")]
            total_stacks: total,
            #[cfg(feature = "gc")]
            total_gc_heaps: total,
            component_instance_size: 1 << 20, // 1 MiB
            core_instance_size: 1 << 20,      // 1 MiB
            max_core_instances_per_component: u32::MAX,
            max_memories_per_component: u32::MAX,
            max_tables_per_component: u32::MAX,
            max_tables_per_module: 1,
            max_memories_per_module: 1,
            // NB: in #8504 it was seen that a C# module in debug module can
            // have 10k+ elements.
            table_elements: 20_000,
            #[cfg(target_pointer_width = "64")]
            max_memory_size: 1 << 32, // 4G,
            #[cfg(target_pointer_width = "32")]
            max_memory_size: 10 << 20, // 10 MiB
        }
    }
}
192
/// Configuration options for the pooling instance allocator supplied at
/// construction.
///
/// The user-facing documentation for each of these knobs lives on
/// `wasmtime::PoolingAllocationConfig`; this struct is the internal mirror of
/// those settings consumed by [`PoolingInstanceAllocator::new`].
#[derive(Copy, Clone, Debug)]
pub struct PoolingInstanceAllocatorConfig {
    /// See `PoolingAllocatorConfig::max_unused_warm_slots` in `wasmtime`
    pub max_unused_warm_slots: u32,
    /// The target number of decommits to do per batch. This is not precise, as
    /// we can queue up decommits at times when we aren't prepared to
    /// immediately flush them, and so we may go over this target size
    /// occasionally.
    pub decommit_batch_size: usize,
    /// The size, in bytes, of async stacks to allocate (not including the guard
    /// page).
    pub stack_size: usize,
    /// The limits to apply to instances allocated within this allocator.
    pub limits: InstanceLimits,
    /// Whether or not async stacks are zeroed after use.
    pub async_stack_zeroing: bool,
    /// If async stack zeroing is enabled and the host platform is Linux this is
    /// how much memory to zero out with `memset`.
    ///
    /// The rest of memory will be zeroed out with `madvise`.
    #[cfg(feature = "async")]
    pub async_stack_keep_resident: usize,
    /// How much linear memory, in bytes, to keep resident after resetting for
    /// use with the next instance. This much memory will be `memset` to zero
    /// when a linear memory is deallocated.
    ///
    /// Memory exceeding this amount in the wasm linear memory will be released
    /// with `madvise` back to the kernel.
    ///
    /// Only applicable on Linux.
    pub linear_memory_keep_resident: usize,
    /// Same as `linear_memory_keep_resident` but for tables.
    pub table_keep_resident: usize,
    /// Whether to enable memory protection keys.
    pub memory_protection_keys: Enabled,
    /// How many memory protection keys to allocate.
    pub max_memory_protection_keys: usize,
    /// Whether to enable PAGEMAP_SCAN on Linux.
    ///
    /// `Enabled::Auto` probes for support at construction time; `Enabled::Yes`
    /// turns a missing feature into a construction error (see
    /// [`PoolingInstanceAllocator::new`]).
    pub pagemap_scan: Enabled,
}
235
236impl Default for PoolingInstanceAllocatorConfig {
237    fn default() -> PoolingInstanceAllocatorConfig {
238        PoolingInstanceAllocatorConfig {
239            max_unused_warm_slots: 100,
240            decommit_batch_size: 1,
241            stack_size: 2 << 20,
242            limits: InstanceLimits::default(),
243            async_stack_zeroing: false,
244            #[cfg(feature = "async")]
245            async_stack_keep_resident: 0,
246            linear_memory_keep_resident: 0,
247            table_keep_resident: 0,
248            memory_protection_keys: Enabled::No,
249            max_memory_protection_keys: 16,
250            pagemap_scan: Enabled::No,
251        }
252    }
253}
254
impl PoolingInstanceAllocatorConfig {
    /// Returns whether the platform's `PAGEMAP_SCAN` support is available,
    /// i.e. whether configuring `pagemap_scan: Enabled::Yes` can succeed.
    pub fn is_pagemap_scan_available() -> bool {
        PageMap::new().is_some()
    }
}
260
/// An error returned when the pooling allocator cannot allocate a table,
/// memory, etc... because the maximum number of concurrent allocations for that
/// entity has been reached.
#[derive(Debug)]
pub struct PoolConcurrencyLimitError {
    limit: usize,
    kind: Cow<'static, str>,
}

impl PoolConcurrencyLimitError {
    /// Creates a new limit error for `limit` concurrent entities of `kind`.
    fn new(limit: usize, kind: impl Into<Cow<'static, str>>) -> Self {
        Self {
            limit,
            kind: kind.into(),
        }
    }
}

impl Display for PoolConcurrencyLimitError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "maximum concurrent limit of {} for {} reached",
            self.limit, self.kind
        )
    }
}

impl core::error::Error for PoolConcurrencyLimitError {}
288
/// Implements the pooling instance allocator.
///
/// This allocator internally maintains pools of instances, memories, tables,
/// and stacks.
///
/// Note: the resource pools are manually dropped so that the fault handler
/// terminates correctly.
#[derive(Debug)]
pub struct PoolingInstanceAllocator {
    // Target number of queued decommit regions to accumulate before the
    // decommit queue is flushed; see `merge_or_flush`.
    decommit_batch_size: usize,
    limits: InstanceLimits,

    // The number of live core module and component instances at any given
    // time. Note that this can temporarily go over the configured limit. This
    // doesn't mean we have actually overshot, but that we attempted to allocate
    // a new instance and incremented the counter, we've seen (or are about to
    // see) that the counter is beyond the configured threshold, and are going
    // to decrement the counter and return an error but haven't done so yet. See
    // the increment trait methods for more details.
    live_core_instances: AtomicU64,
    live_component_instances: AtomicU64,

    // Shared queue of memory regions awaiting decommit and of entity slots
    // awaiting return to their sub-pools.
    decommit_queue: Mutex<DecommitQueue>,

    // Each kind of pooled entity has its own sub-pool plus a counter of how
    // many of that entity are currently live.
    memories: MemoryPool,
    live_memories: AtomicUsize,

    tables: TablePool,
    live_tables: AtomicUsize,

    #[cfg(feature = "gc")]
    gc_heaps: GcHeapPool,
    #[cfg(feature = "gc")]
    live_gc_heaps: AtomicUsize,

    #[cfg(feature = "async")]
    stacks: StackPool,
    #[cfg(feature = "async")]
    live_stacks: AtomicUsize,

    // PAGEMAP_SCAN handle used when resetting memories/tables/stacks;
    // `None` when the feature is disabled or unsupported on this host.
    pagemap: Option<PageMap>,
}
331
332impl Drop for PoolingInstanceAllocator {
333    fn drop(&mut self) {
334        if !cfg!(debug_assertions) {
335            return;
336        }
337
338        // NB: when cfg(not(debug_assertions)) it is okay that we don't flush
339        // the queue, as the sub-pools will unmap those ranges anyways, so
340        // there's no point in decommitting them. But we do need to flush the
341        // queue when debug assertions are enabled to make sure that all
342        // entities get returned to their associated sub-pools and we can
343        // differentiate between a leaking slot and an enqueued-for-decommit
344        // slot.
345        let queue = self.decommit_queue.lock().unwrap();
346        self.flush_decommit_queue(queue);
347
348        debug_assert_eq!(self.live_component_instances.load(Ordering::Acquire), 0);
349        debug_assert_eq!(self.live_core_instances.load(Ordering::Acquire), 0);
350        debug_assert_eq!(self.live_memories.load(Ordering::Acquire), 0);
351        debug_assert_eq!(self.live_tables.load(Ordering::Acquire), 0);
352
353        debug_assert!(self.memories.is_empty());
354        debug_assert!(self.tables.is_empty());
355
356        #[cfg(feature = "gc")]
357        {
358            debug_assert!(self.gc_heaps.is_empty());
359            debug_assert_eq!(self.live_gc_heaps.load(Ordering::Acquire), 0);
360        }
361
362        #[cfg(feature = "async")]
363        {
364            debug_assert!(self.stacks.is_empty());
365            debug_assert_eq!(self.live_stacks.load(Ordering::Acquire), 0);
366        }
367    }
368}
369
impl PoolingInstanceAllocator {
    /// Creates a new pooling instance allocator with the given strategy and limits.
    ///
    /// All sub-pools (memories, tables, and, when the corresponding features
    /// are enabled, GC heaps and async stacks) are created eagerly here from
    /// `config` and `tunables`; any failure to reserve a pool is returned as
    /// an error.
    pub fn new(config: &PoolingInstanceAllocatorConfig, tunables: &Tunables) -> Result<Self> {
        Ok(Self {
            decommit_batch_size: config.decommit_batch_size,
            limits: config.limits,
            // Live-entity counters all start at zero and are maintained by
            // the allocate/deallocate and increment/decrement paths below.
            live_component_instances: AtomicU64::new(0),
            live_core_instances: AtomicU64::new(0),
            decommit_queue: Mutex::new(DecommitQueue::default()),
            memories: MemoryPool::new(config, tunables)?,
            live_memories: AtomicUsize::new(0),
            tables: TablePool::new(config)?,
            live_tables: AtomicUsize::new(0),
            #[cfg(feature = "gc")]
            gc_heaps: GcHeapPool::new(config)?,
            #[cfg(feature = "gc")]
            live_gc_heaps: AtomicUsize::new(0),
            #[cfg(feature = "async")]
            stacks: StackPool::new(config)?,
            #[cfg(feature = "async")]
            live_stacks: AtomicUsize::new(0),
            // `Auto` silently falls back to `None` when PAGEMAP_SCAN is
            // unsupported, while `Yes` turns missing support into an error.
            pagemap: match config.pagemap_scan {
                Enabled::Auto => PageMap::new(),
                Enabled::Yes => Some(PageMap::new().ok_or_else(|| {
                    format_err!(
                        "required to enable PAGEMAP_SCAN but this system \
                         does not support it"
                    )
                })?),
                Enabled::No => None,
            },
        })
    }

    /// Returns the configured maximum core instance size rounded up to the
    /// alignment of `Instance`.
    fn core_instance_size(&self) -> usize {
        round_up_to_pow2(self.limits.core_instance_size, mem::align_of::<Instance>())
    }

    /// Validates that `module`'s tables can be satisfied by the table pool.
    fn validate_table_plans(&self, module: &Module) -> Result<()> {
        self.tables.validate(module)
    }

    /// Validates that `module`'s linear memories can be satisfied by the
    /// memory pool.
    fn validate_memory_plans(&self, module: &Module) -> Result<()> {
        self.memories.validate_memories(module)
    }

    /// Validates that an instance described by `offsets` fits within the
    /// configured maximum core instance size, producing a size-breakdown
    /// error message when it does not.
    fn validate_core_instance_size(&self, offsets: &VMOffsets<HostPtr>) -> Result<()> {
        let layout = Instance::alloc_layout(offsets);
        if layout.size() <= self.core_instance_size() {
            return Ok(());
        }

        // If this `module` exceeds the allocation size allotted to it then an
        // error will be reported here. The error of "required N bytes but
        // cannot allocate that" is pretty opaque, however, because it's not
        // clear what the breakdown of the N bytes are and what to optimize
        // next. To help provide a better error message here some fancy-ish
        // logic is done here to report the breakdown of the byte request into
        // the largest portions and where it's coming from.
        let mut message = format!(
            "instance allocation for this module \
             requires {} bytes which exceeds the configured maximum \
             of {} bytes; breakdown of allocation requirement:\n\n",
            layout.size(),
            self.core_instance_size(),
        );

        let mut remaining = layout.size();
        let mut push = |name: &str, bytes: usize| {
            assert!(remaining >= bytes);
            remaining -= bytes;

            // If the `name` region is more than 5% of the allocation request
            // then report it here, otherwise ignore it. We have less than 20
            // fields so we're guaranteed that something should be reported, and
            // otherwise it's not particularly interesting to learn about 5
            // different fields that are all 8 or 0 bytes. Only try to report
            // the "major" sources of bytes here.
            if bytes > layout.size() / 20 {
                message.push_str(&format!(
                    " * {:.02}% - {} bytes - {}\n",
                    ((bytes as f32) / (layout.size() as f32)) * 100.0,
                    bytes,
                    name,
                ));
            }
        };

        // The `Instance` itself requires some size allocated to it.
        push("instance state management", mem::size_of::<Instance>());

        // Afterwards the `VMContext`'s regions are why we're requesting bytes,
        // so ask it for descriptions on each region's byte size.
        for (desc, size) in offsets.region_sizes() {
            push(desc, size as usize);
        }

        // double-check we accounted for all the bytes
        assert_eq!(remaining, 0);

        bail!("{message}")
    }

    /// Validates that a component whose transitively-contained core instances
    /// require `core_instances_aggregate_size` bytes, plus its own
    /// `VMComponentContext` described by `offsets`, fits within the
    /// configured maximum component instance size.
    #[cfg(feature = "component-model")]
    fn validate_component_instance_size(
        &self,
        offsets: &VMComponentOffsets<HostPtr>,
        core_instances_aggregate_size: usize,
    ) -> Result<()> {
        let vmcomponentctx_size = usize::try_from(offsets.size_of_vmctx()).unwrap();
        // Saturate rather than overflow: an absurdly large request should
        // fail the comparison below instead of panicking.
        let total_instance_size = core_instances_aggregate_size.saturating_add(vmcomponentctx_size);
        if total_instance_size <= self.limits.component_instance_size {
            return Ok(());
        }

        // TODO: Add context with detailed accounting of what makes up all the
        // `VMComponentContext`'s space like we do for module instances.
        bail!(
            "instance allocation for this component requires {total_instance_size} bytes of `VMComponentContext` \
             and aggregated core instance runtime space which exceeds the configured maximum of {} bytes. \
             `VMComponentContext` used {vmcomponentctx_size} bytes, `core module instances` used \
             {core_instances_aggregate_size} bytes.",
            self.limits.component_instance_size
        )
    }

    /// Flushes the given locked decommit queue, returning whether the flush
    /// freed up any slots (the value returned by `DecommitQueue::flush`).
    fn flush_decommit_queue(&self, mut locked_queue: MutexGuard<'_, DecommitQueue>) -> bool {
        // Take the queue out of the mutex and drop the lock, to minimize
        // contention.
        let queue = mem::take(&mut *locked_queue);
        drop(locked_queue);
        queue.flush(self)
    }

    /// Execute `f` and if it returns `Err(PoolConcurrencyLimitError)`, then try
    /// flushing the decommit queue. If flushing the queue freed up slots, then
    /// try running `f` again.
    #[cfg(feature = "async")]
    fn with_flush_and_retry<T>(&self, mut f: impl FnMut() -> Result<T>) -> Result<T> {
        f().or_else(|e| {
            if e.is::<PoolConcurrencyLimitError>() {
                let queue = self.decommit_queue.lock().unwrap();
                if self.flush_decommit_queue(queue) {
                    return f();
                }
            }

            // Either a different error, or the flush freed nothing: report
            // the original failure.
            Err(e)
        })
    }

    /// Disposes of a per-deallocation `local_queue`: flush it immediately when
    /// it is empty or already at the batch size, otherwise merge it into the
    /// shared queue (flushing that if the merge pushes it past the batch
    /// size).
    fn merge_or_flush(&self, mut local_queue: DecommitQueue) {
        match local_queue.raw_len() {
            // If we didn't enqueue any regions for decommit, then we must have
            // either memset the whole entity or eagerly remapped it to zero
            // because we don't have linux's `madvise(DONTNEED)` semantics. In
            // either case, the entity slot is ready for reuse immediately.
            0 => {
                local_queue.flush(self);
            }

            // We enqueued at least our batch size of regions for decommit, so
            // flush the local queue immediately. Don't bother inspecting (or
            // locking!) the shared queue.
            n if n >= self.decommit_batch_size => {
                local_queue.flush(self);
            }

            // If we enqueued some regions for decommit, but did not reach our
            // batch size, so we don't want to flush it yet, then merge the
            // local queue into the shared queue.
            n => {
                debug_assert!(n < self.decommit_batch_size);
                let mut shared_queue = self.decommit_queue.lock().unwrap();
                shared_queue.append(&mut local_queue);
                // And if the shared queue now has at least as many regions
                // enqueued for decommit as our batch size, then we can flush
                // it.
                if shared_queue.raw_len() >= self.decommit_batch_size {
                    self.flush_decommit_queue(shared_queue);
                }
            }
        }
    }
}
555
556#[async_trait::async_trait]
557unsafe impl InstanceAllocator for PoolingInstanceAllocator {
    #[cfg(feature = "component-model")]
    fn validate_component<'a>(
        &self,
        component: &Component,
        offsets: &VMComponentOffsets<HostPtr>,
        get_module: &'a dyn Fn(StaticModuleIndex) -> &'a Module,
    ) -> Result<()> {
        // Walk the component's initializers, validating each statically-known
        // core module and tallying how many core instances, memories, and
        // tables the component transitively contains.
        let mut num_core_instances = 0;
        let mut num_memories = 0;
        let mut num_tables = 0;
        let mut core_instances_aggregate_size: usize = 0;
        for init in &component.initializers {
            use wasmtime_environ::component::GlobalInitializer::*;
            use wasmtime_environ::component::InstantiateModule;
            match init {
                InstantiateModule(InstantiateModule::Import(_, _), _) => {
                    num_core_instances += 1;
                    // Can't statically account for the total vmctx size, number
                    // of memories, and number of tables in this component.
                }
                InstantiateModule(InstantiateModule::Static(static_module_index, _), _) => {
                    let module = get_module(*static_module_index);
                    let offsets = VMOffsets::new(HostPtr, &module);
                    let layout = Instance::alloc_layout(&offsets);
                    self.validate_module(module, &offsets)?;
                    num_core_instances += 1;
                    num_memories += module.num_defined_memories();
                    num_tables += module.num_defined_tables();
                    core_instances_aggregate_size += layout.size();
                }
                // These initializers don't instantiate core modules, so they
                // don't affect any of the tallies being validated here.
                LowerImport { .. }
                | ExtractMemory(_)
                | ExtractTable(_)
                | ExtractRealloc(_)
                | ExtractCallback(_)
                | ExtractPostReturn(_)
                | Resource(_) => {}
            }
        }

        // Enforce each configured per-component limit against the tallies
        // computed above.
        if num_core_instances
            > usize::try_from(self.limits.max_core_instances_per_component).unwrap()
        {
            bail!(
                "The component transitively contains {num_core_instances} core module instances, \
                 which exceeds the configured maximum of {} in the pooling allocator",
                self.limits.max_core_instances_per_component
            );
        }

        if num_memories > usize::try_from(self.limits.max_memories_per_component).unwrap() {
            bail!(
                "The component transitively contains {num_memories} Wasm linear memories, which \
                 exceeds the configured maximum of {} in the pooling allocator",
                self.limits.max_memories_per_component
            );
        }

        if num_tables > usize::try_from(self.limits.max_tables_per_component).unwrap() {
            bail!(
                "The component transitively contains {num_tables} tables, which exceeds the \
                 configured maximum of {} in the pooling allocator",
                self.limits.max_tables_per_component
            );
        }

        // Finally check the aggregate instance size against the component
        // instance size limit.
        self.validate_component_instance_size(offsets, core_instances_aggregate_size)
            .context("component instance size does not fit in pooling allocator requirements")?;

        Ok(())
    }
629
    fn validate_module(&self, module: &Module, offsets: &VMOffsets<HostPtr>) -> Result<()> {
        // Each check adds its own context so callers can tell which resource
        // (memories, tables, or the instance allocation itself) failed to
        // fit. The order here determines which failure is reported first.
        self.validate_memory_plans(module)
            .context("module memory does not fit in pooling allocator requirements")?;
        self.validate_table_plans(module)
            .context("module table does not fit in pooling allocator requirements")?;
        self.validate_core_instance_size(offsets)
            .context("module instance size does not fit in pooling allocator requirements")?;
        Ok(())
    }
639
    #[cfg(feature = "gc")]
    fn validate_memory(&self, memory: &wasmtime_environ::Memory) -> Result<()> {
        // Defer to the memory pool's per-memory validation.
        self.memories.validate_memory(memory)
    }
644
645    #[cfg(feature = "component-model")]
646    fn increment_component_instance_count(&self) -> Result<()> {
647        let old_count = self.live_component_instances.fetch_add(1, Ordering::AcqRel);
648        if old_count >= u64::from(self.limits.total_component_instances) {
649            self.decrement_component_instance_count();
650            return Err(PoolConcurrencyLimitError::new(
651                usize::try_from(self.limits.total_component_instances).unwrap(),
652                "component instances",
653            )
654            .into());
655        }
656        Ok(())
657    }
658
    #[cfg(feature = "component-model")]
    fn decrement_component_instance_count(&self) {
        // Release the slot reserved by `increment_component_instance_count`.
        self.live_component_instances.fetch_sub(1, Ordering::AcqRel);
    }
663
664    fn increment_core_instance_count(&self) -> Result<()> {
665        let old_count = self.live_core_instances.fetch_add(1, Ordering::AcqRel);
666        if old_count >= u64::from(self.limits.total_core_instances) {
667            self.decrement_core_instance_count();
668            return Err(PoolConcurrencyLimitError::new(
669                usize::try_from(self.limits.total_core_instances).unwrap(),
670                "core instances",
671            )
672            .into());
673        }
674        Ok(())
675    }
676
    fn decrement_core_instance_count(&self) {
        // Release the slot reserved by `increment_core_instance_count`.
        self.live_core_instances.fetch_sub(1, Ordering::AcqRel);
    }
680
    async fn allocate_memory(
        &self,
        request: &mut InstanceAllocationRequest<'_, '_>,
        ty: &wasmtime_environ::Memory,
        memory_index: Option<DefinedMemoryIndex>,
    ) -> Result<(MemoryAllocationIndex, Memory)> {
        async {
            // FIXME(rust-lang/rust#145127) this should ideally use a version of
            // `with_flush_and_retry` but adapted for async closures instead of only
            // sync closures. Right now that won't compile though so this is the
            // manually expanded version of the method.
            let e = match self.memories.allocate(request, ty, memory_index).await {
                Ok(result) => return Ok(result),
                Err(e) => e,
            };

            // A concurrency-limit failure may be recoverable: flushing the
            // decommit queue can return memory slots to the pool, in which
            // case the allocation is retried once.
            if e.is::<PoolConcurrencyLimitError>() {
                let queue = self.decommit_queue.lock().unwrap();
                if self.flush_decommit_queue(queue) {
                    return self.memories.allocate(request, ty, memory_index).await;
                }
            }

            Err(e)
        }
        .await
        // Only count the memory as live once allocation actually succeeded.
        .inspect(|_| {
            self.live_memories.fetch_add(1, Ordering::Relaxed);
        })
    }
711
    unsafe fn deallocate_memory(
        &self,
        _memory_index: Option<DefinedMemoryIndex>,
        allocation_index: MemoryAllocationIndex,
        memory: Memory,
    ) {
        // This memory was counted as live when allocated, so the counter must
        // still be positive here.
        let prev = self.live_memories.fetch_sub(1, Ordering::Relaxed);
        debug_assert!(prev > 0);

        // Reset the image slot. If there is any error clearing the
        // image, just drop it here, and let the drop handler for the
        // slot unmap in a way that retains the address space
        // reservation.
        let mut image = memory.unwrap_static_image();
        let mut queue = DecommitQueue::default();
        let bytes_resident = image
            .clear_and_remain_ready(
                self.pagemap.as_ref(),
                self.memories.keep_resident,
                |ptr, len| {
                    // SAFETY: the memory in `image` won't be used until this
                    // decommit queue is flushed, and by definition the memory is
                    // not in use when calling this function.
                    unsafe {
                        queue.push_raw(ptr, len);
                    }
                },
            )
            .expect("failed to reset memory image");

        // SAFETY: this image is not in use and its memory regions were enqueued
        // with `push_raw` above.
        unsafe {
            queue.push_memory(allocation_index, image, bytes_resident);
        }
        // The slot only becomes reusable once the queue holding it is
        // flushed; `merge_or_flush` decides whether that happens now or is
        // deferred to a later batch.
        self.merge_or_flush(queue);
    }
749
    async fn allocate_table(
        &self,
        request: &mut InstanceAllocationRequest<'_, '_>,
        ty: &wasmtime_environ::Table,
        _table_index: DefinedTableIndex,
    ) -> Result<(super::TableAllocationIndex, Table)> {
        async {
            // FIXME: see `allocate_memory` above for comments about duplication
            // with `with_flush_and_retry`.
            let e = match self.tables.allocate(request, ty).await {
                Ok(result) => return Ok(result),
                Err(e) => e,
            };

            // A concurrency-limit failure may be recoverable: flushing the
            // decommit queue can return table slots to the pool, in which
            // case the allocation is retried once.
            if e.is::<PoolConcurrencyLimitError>() {
                let queue = self.decommit_queue.lock().unwrap();
                if self.flush_decommit_queue(queue) {
                    return self.tables.allocate(request, ty).await;
                }
            }

            Err(e)
        }
        .await
        // Only count the table as live once allocation actually succeeded.
        .inspect(|_| {
            self.live_tables.fetch_add(1, Ordering::Relaxed);
        })
    }
778
    unsafe fn deallocate_table(
        &self,
        _table_index: DefinedTableIndex,
        allocation_index: TableAllocationIndex,
        mut table: Table,
    ) {
        // This table was counted as live when allocated, so the counter must
        // still be positive here.
        let prev = self.live_tables.fetch_sub(1, Ordering::Relaxed);
        debug_assert!(prev > 0);

        let mut queue = DecommitQueue::default();
        // SAFETY: This table is no longer in use by the allocator when this
        // method is called and additionally all image ranges are pushed with
        // the understanding that the memory won't get used until the whole
        // queue is flushed.
        let bytes_resident = unsafe {
            self.tables.reset_table_pages_to_zero(
                self.pagemap.as_ref(),
                allocation_index,
                &mut table,
                |ptr, len| {
                    queue.push_raw(ptr, len);
                },
            )
        };

        // SAFETY: the table has had all its memory regions enqueued above.
        unsafe {
            queue.push_table(allocation_index, table, bytes_resident);
        }
        // The table slot becomes reusable once this queue is flushed, either
        // now or in a later batch, as decided by `merge_or_flush`.
        self.merge_or_flush(queue);
    }
810
811    #[cfg(feature = "async")]
812    fn allocate_fiber_stack(&self) -> Result<wasmtime_fiber::FiberStack> {
813        let ret = self.with_flush_and_retry(|| self.stacks.allocate())?;
814        self.live_stacks.fetch_add(1, Ordering::Relaxed);
815        Ok(ret)
816    }
817
    // Returns a fiber stack to the pool, queueing its pages for lazy decommit.
    //
    // The stack is zeroed (when configured) and each decommitted region is
    // pushed onto a local queue which is then merged into the shared queue or
    // flushed immediately.
    #[cfg(feature = "async")]
    unsafe fn deallocate_fiber_stack(&self, mut stack: wasmtime_fiber::FiberStack) {
        // Update the live-stack metric before handing the stack back.
        self.live_stacks.fetch_sub(1, Ordering::Relaxed);
        let mut queue = DecommitQueue::default();
        // SAFETY: the stack is no longer in use by definition when this
        // function is called and memory ranges pushed here are otherwise no
        // longer in use.
        let bytes_resident = unsafe {
            self.stacks
                .zero_stack(&mut stack, |ptr, len| queue.push_raw(ptr, len))
        };
        // SAFETY: this stack's memory regions were enqueued above.
        unsafe {
            queue.push_stack(stack, bytes_resident);
        }
        self.merge_or_flush(queue);
    }
835
836    fn purge_module(&self, module: CompiledModuleId) {
837        self.memories.purge_module(module);
838    }
839
840    fn next_available_pkey(&self) -> Option<ProtectionKey> {
841        self.memories.next_available_pkey()
842    }
843
844    fn restrict_to_pkey(&self, pkey: ProtectionKey) {
845        mpk::allow(ProtectionMask::zero().or(pkey));
846    }
847
848    fn allow_all_pkeys(&self) {
849        mpk::allow(ProtectionMask::all());
850    }
851
852    #[cfg(feature = "gc")]
853    fn allocate_gc_heap(
854        &self,
855        engine: &crate::Engine,
856        gc_runtime: &dyn GcRuntime,
857        memory_alloc_index: MemoryAllocationIndex,
858        memory: Memory,
859    ) -> Result<(GcHeapAllocationIndex, Box<dyn GcHeap>)> {
860        let ret = self
861            .gc_heaps
862            .allocate(engine, gc_runtime, memory_alloc_index, memory)?;
863        self.live_gc_heaps.fetch_add(1, Ordering::Relaxed);
864        Ok(ret)
865    }
866
867    #[cfg(feature = "gc")]
868    fn deallocate_gc_heap(
869        &self,
870        allocation_index: GcHeapAllocationIndex,
871        gc_heap: Box<dyn GcHeap>,
872    ) -> (MemoryAllocationIndex, Memory) {
873        self.live_gc_heaps.fetch_sub(1, Ordering::Relaxed);
874        self.gc_heaps.deallocate(allocation_index, gc_heap)
875    }
876
877    fn as_pooling(&self) -> Option<&PoolingInstanceAllocator> {
878        Some(self)
879    }
880}
881
#[cfg(test)]
#[cfg(target_pointer_width = "64")]
mod test {
    use super::*;

    // Constructing a pooling allocator whose configured maximum memory size
    // exceeds the memory reservation must fail with a descriptive error.
    #[test]
    fn test_pooling_allocator_with_memory_pages_exceeded() {
        let config = PoolingInstanceAllocatorConfig {
            limits: InstanceLimits {
                total_memories: 1,
                // Deliberately larger than the 0x10000-byte reservation below.
                max_memory_size: 0x100010000,
                ..Default::default()
            },
            ..PoolingInstanceAllocatorConfig::default()
        };
        assert_eq!(
            PoolingInstanceAllocator::new(
                &config,
                &Tunables {
                    memory_reservation: 0x10000,
                    ..Tunables::default_host()
                },
            )
            .map_err(|e| e.to_string())
            .expect_err("expected a failure constructing instance allocator"),
            "maximum memory size of 0x100010000 bytes exceeds the configured \
             memory reservation of 0x10000 bytes"
        );
    }

    // With `async_stack_zeroing` enabled, a recycled fiber stack slot must
    // read back as zero on every allocation, even after the previous user
    // wrote a nonzero byte into it.
    #[cfg(all(
        unix,
        target_pointer_width = "64",
        feature = "async",
        not(miri),
        not(asan)
    ))]
    #[test]
    fn test_stack_zeroed() -> Result<()> {
        let config = PoolingInstanceAllocatorConfig {
            // Force every allocation to reuse the single cold slot.
            max_unused_warm_slots: 0,
            limits: InstanceLimits {
                total_stacks: 1,
                total_memories: 0,
                total_tables: 0,
                ..Default::default()
            },
            stack_size: 128,
            async_stack_zeroing: true,
            ..PoolingInstanceAllocatorConfig::default()
        };
        let allocator = PoolingInstanceAllocator::new(&config, &Tunables::default_host())?;

        unsafe {
            for _ in 0..255 {
                let stack = allocator.allocate_fiber_stack()?;

                // The stack pointer is at the top, so decrement it first
                let addr = stack.top().unwrap().sub(1);

                // Zeroing on deallocation means each fresh allocation sees 0
                // here despite the write below on the previous iteration.
                assert_eq!(*addr, 0);
                *addr = 1;

                allocator.deallocate_fiber_stack(stack);
            }
        }

        Ok(())
    }

    // With `async_stack_zeroing` disabled, a recycled fiber stack slot must
    // retain whatever the previous user wrote, which this test observes by
    // incrementing a byte across successive allocations of the same slot.
    #[cfg(all(
        unix,
        target_pointer_width = "64",
        feature = "async",
        not(miri),
        not(asan)
    ))]
    #[test]
    fn test_stack_unzeroed() -> Result<()> {
        let config = PoolingInstanceAllocatorConfig {
            // Force every allocation to reuse the single cold slot.
            max_unused_warm_slots: 0,
            limits: InstanceLimits {
                total_stacks: 1,
                total_memories: 0,
                total_tables: 0,
                ..Default::default()
            },
            stack_size: 128,
            async_stack_zeroing: false,
            ..PoolingInstanceAllocatorConfig::default()
        };
        let allocator = PoolingInstanceAllocator::new(&config, &Tunables::default_host())?;

        unsafe {
            for i in 0..255 {
                let stack = allocator.allocate_fiber_stack()?;

                // The stack pointer is at the top, so decrement it first
                let addr = stack.top().unwrap().sub(1);

                // The byte written on the previous iteration must still be
                // present since zeroing is disabled.
                assert_eq!(*addr, i);
                *addr = i + 1;

                allocator.deallocate_fiber_stack(stack);
            }
        }

        Ok(())
    }
}