// wasmtime/runtime/vm/instance/allocator/pooling.rs
1//! Implements the pooling instance allocator.
2//!
3//! The pooling instance allocator maps memory in advance and allocates
4//! instances, memories, tables, and stacks from a pool of available resources.
5//! Using the pooling instance allocator can speed up module instantiation when
6//! modules can be constrained based on configurable limits
7//! ([`InstanceLimits`]). Each new instance is stored in a "slot"; as instances
8//! are allocated and freed, these slots are either filled or emptied:
9//!
10//! ```text
11//! ┌──────┬──────┬──────┬──────┬──────┐
12//! │Slot 0│Slot 1│Slot 2│Slot 3│......│
13//! └──────┴──────┴──────┴──────┴──────┘
14//! ```
15//!
16//! Each slot has a "slot ID"--an index into the pool. Slot IDs are handed out
17//! by the [`index_allocator`] module. Note that each kind of pool-allocated
18//! item is stored in its own separate pool: [`memory_pool`], [`table_pool`],
19//! [`stack_pool`]. See those modules for more details.
20
21mod decommit_queue;
22mod index_allocator;
23mod memory_pool;
24mod metrics;
25mod table_pool;
26
27#[cfg(feature = "gc")]
28mod gc_heap_pool;
29
30#[cfg(all(feature = "async"))]
31mod generic_stack_pool;
32#[cfg(all(feature = "async", unix, not(miri)))]
33mod unix_stack_pool;
34
35#[cfg(all(feature = "async"))]
36cfg_if::cfg_if! {
37    if #[cfg(all(unix, not(miri), not(asan)))] {
38        use unix_stack_pool as stack_pool;
39    } else {
40        use generic_stack_pool as stack_pool;
41    }
42}
43
44use self::decommit_queue::DecommitQueue;
45use self::memory_pool::MemoryPool;
46use self::table_pool::TablePool;
47use super::{
48    InstanceAllocationRequest, InstanceAllocator, MemoryAllocationIndex, TableAllocationIndex,
49};
50use crate::Enabled;
51use crate::prelude::*;
52use crate::runtime::vm::{
53    CompiledModuleId, Memory, Table,
54    instance::Instance,
55    mpk::{self, ProtectionKey, ProtectionMask},
56    sys::vm::PageMap,
57};
58use core::future::Future;
59use core::pin::Pin;
60use core::sync::atomic::AtomicUsize;
61use std::borrow::Cow;
62use std::fmt::Display;
63use std::sync::{Mutex, MutexGuard};
64use std::{
65    mem,
66    sync::atomic::{AtomicU64, Ordering},
67};
68use wasmtime_environ::{
69    DefinedMemoryIndex, DefinedTableIndex, HostPtr, MemoryKind, Module, Tunables, VMOffsets,
70};
71
72pub use self::metrics::PoolingAllocatorMetrics;
73
74#[cfg(feature = "gc")]
75use super::GcHeapAllocationIndex;
76#[cfg(feature = "gc")]
77use crate::runtime::vm::{GcHeap, GcRuntime};
78#[cfg(feature = "gc")]
79use gc_heap_pool::GcHeapPool;
80
81#[cfg(feature = "async")]
82use stack_pool::StackPool;
83
84#[cfg(feature = "component-model")]
85use wasmtime_environ::{
86    StaticModuleIndex,
87    component::{Component, VMComponentOffsets},
88};
89
/// Rounds `n` up to the nearest multiple of `to`, which must be a non-zero
/// power of two.
fn round_up_to_pow2(n: usize, to: usize) -> usize {
    debug_assert!(to > 0);
    debug_assert!(to.is_power_of_two());
    // Standard power-of-two rounding: add the mask, then clear the low bits.
    let mask = to - 1;
    (n + mask) & !mask
}
95
/// Instance-related limit configuration for pooling.
///
/// More docs on this can be found at `wasmtime::PoolingAllocationConfig`.
///
/// `total_*` fields bound the number of concurrently-live entities across
/// the entire pool, while `max_*_per_*` fields bound what a single module or
/// component may contain.
#[derive(Debug, Copy, Clone)]
pub struct InstanceLimits {
    /// The maximum number of component instances that may be allocated
    /// concurrently.
    pub total_component_instances: u32,

    /// The maximum size of a component's `VMComponentContext`, including
    /// the aggregate size of all its inner core modules' `VMContext` sizes.
    pub component_instance_size: usize,

    /// The maximum number of core module instances that may be allocated
    /// concurrently.
    pub total_core_instances: u32,

    /// The maximum number of core module instances that a single component may
    /// transitively contain.
    pub max_core_instances_per_component: u32,

    /// The maximum number of Wasm linear memories that a component may
    /// transitively contain.
    pub max_memories_per_component: u32,

    /// The maximum number of tables that a component may transitively contain.
    pub max_tables_per_component: u32,

    /// The total number of linear memories in the pool, across all instances.
    pub total_memories: u32,

    /// The total number of tables in the pool, across all instances.
    pub total_tables: u32,

    /// The total number of async stacks in the pool, across all instances.
    #[cfg(feature = "async")]
    pub total_stacks: u32,

    /// Maximum size of a core instance's `VMContext`.
    pub core_instance_size: usize,

    /// Maximum number of tables per instance.
    pub max_tables_per_module: u32,

    /// Maximum number of word-size elements per table.
    ///
    /// Note that tables for element types such as continuations
    /// that use more than one word of storage may store fewer
    /// elements.
    pub table_elements: usize,

    /// Maximum number of linear memories per instance.
    pub max_memories_per_module: u32,

    /// Maximum byte size of a linear memory, must be smaller than
    /// `memory_reservation` in `Tunables`.
    pub max_memory_size: usize,

    /// The total number of GC heaps in the pool, across all instances.
    #[cfg(feature = "gc")]
    pub total_gc_heaps: u32,
}
158
159impl Default for InstanceLimits {
160    fn default() -> Self {
161        let total = if cfg!(target_pointer_width = "32") {
162            100
163        } else {
164            1000
165        };
166        // See doc comments for `wasmtime::PoolingAllocationConfig` for these
167        // default values
168        Self {
169            total_component_instances: total,
170            component_instance_size: 1 << 20, // 1 MiB
171            total_core_instances: total,
172            max_core_instances_per_component: u32::MAX,
173            max_memories_per_component: u32::MAX,
174            max_tables_per_component: u32::MAX,
175            total_memories: total,
176            total_tables: total,
177            #[cfg(feature = "async")]
178            total_stacks: total,
179            core_instance_size: 1 << 20, // 1 MiB
180            max_tables_per_module: 1,
181            // NB: in #8504 it was seen that a C# module in debug module can
182            // have 10k+ elements.
183            table_elements: 20_000,
184            max_memories_per_module: 1,
185            #[cfg(target_pointer_width = "64")]
186            max_memory_size: 1 << 32, // 4G,
187            #[cfg(target_pointer_width = "32")]
188            max_memory_size: 10 << 20, // 10 MiB
189            #[cfg(feature = "gc")]
190            total_gc_heaps: total,
191        }
192    }
193}
194
/// Configuration options for the pooling instance allocator supplied at
/// construction.
///
/// See `wasmtime::PoolingAllocationConfig` for the user-facing counterpart
/// and the documented defaults of each knob.
#[derive(Copy, Clone, Debug)]
pub struct PoolingInstanceAllocatorConfig {
    /// See `PoolingAllocatorConfig::max_unused_warm_slots` in `wasmtime`
    pub max_unused_warm_slots: u32,
    /// The target number of decommits to do per batch. This is not precise, as
    /// we can queue up decommits at times when we aren't prepared to
    /// immediately flush them, and so we may go over this target size
    /// occasionally.
    pub decommit_batch_size: usize,
    /// The size, in bytes, of async stacks to allocate (not including the guard
    /// page).
    pub stack_size: usize,
    /// The limits to apply to instances allocated within this allocator.
    pub limits: InstanceLimits,
    /// Whether or not async stacks are zeroed after use.
    pub async_stack_zeroing: bool,
    /// If async stack zeroing is enabled and the host platform is Linux this is
    /// how much memory to zero out with `memset`.
    ///
    /// The rest of memory will be zeroed out with `madvise`.
    #[cfg(feature = "async")]
    pub async_stack_keep_resident: usize,
    /// How much linear memory, in bytes, to keep resident after resetting for
    /// use with the next instance. This much memory will be `memset` to zero
    /// when a linear memory is deallocated.
    ///
    /// Memory exceeding this amount in the wasm linear memory will be released
    /// with `madvise` back to the kernel.
    ///
    /// Only applicable on Linux.
    pub linear_memory_keep_resident: usize,
    /// Same as `linear_memory_keep_resident` but for tables.
    pub table_keep_resident: usize,
    /// Whether to enable memory protection keys.
    pub memory_protection_keys: Enabled,
    /// How many memory protection keys to allocate.
    pub max_memory_protection_keys: usize,
    /// Whether to enable PAGEMAP_SCAN on Linux.
    ///
    /// Availability on the current system can be probed with
    /// `PoolingInstanceAllocatorConfig::is_pagemap_scan_available`.
    pub pagemap_scan: Enabled,
}
237
238impl Default for PoolingInstanceAllocatorConfig {
239    fn default() -> PoolingInstanceAllocatorConfig {
240        PoolingInstanceAllocatorConfig {
241            max_unused_warm_slots: 100,
242            decommit_batch_size: 1,
243            stack_size: 2 << 20,
244            limits: InstanceLimits::default(),
245            async_stack_zeroing: false,
246            #[cfg(feature = "async")]
247            async_stack_keep_resident: 0,
248            linear_memory_keep_resident: 0,
249            table_keep_resident: 0,
250            memory_protection_keys: Enabled::No,
251            max_memory_protection_keys: 16,
252            pagemap_scan: Enabled::No,
253        }
254    }
255}
256
impl PoolingInstanceAllocatorConfig {
    /// Returns whether the current system supports the `PAGEMAP_SCAN`
    /// mechanism, i.e. whether configuring `pagemap_scan: Enabled::Yes` can
    /// succeed.
    pub fn is_pagemap_scan_available() -> bool {
        PageMap::new().is_some()
    }
}
262
/// An error returned when the pooling allocator cannot allocate a table,
/// memory, etc... because the maximum number of concurrent allocations for that
/// entity has been reached.
#[derive(Debug)]
pub struct PoolConcurrencyLimitError {
    // The configured maximum number of concurrent allocations for `kind`.
    limit: usize,
    // Human-readable name of the limited entity, e.g. "core instances".
    kind: Cow<'static, str>,
}

// No source error or extra state; the `Display` impl carries all information.
impl core::error::Error for PoolConcurrencyLimitError {}
273
274impl Display for PoolConcurrencyLimitError {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        let limit = self.limit;
277        let kind = &self.kind;
278        write!(f, "maximum concurrent limit of {limit} for {kind} reached")
279    }
280}
281
impl PoolConcurrencyLimitError {
    /// Creates a new limit error for the given `limit` and entity `kind`
    /// (e.g. "core instances" or "component instances").
    fn new(limit: usize, kind: impl Into<Cow<'static, str>>) -> Self {
        Self {
            limit,
            kind: kind.into(),
        }
    }
}
290
/// Implements the pooling instance allocator.
///
/// This allocator internally maintains pools of instances, memories, tables,
/// and stacks.
///
/// Note: the resource pools are manually dropped so that the fault handler
/// terminates correctly.
#[derive(Debug)]
pub struct PoolingInstanceAllocator {
    // Target number of enqueued decommit regions before the shared
    // `decommit_queue` is flushed; see `merge_or_flush`.
    decommit_batch_size: usize,
    limits: InstanceLimits,

    // The number of live core module and component instances at any given
    // time. Note that this can temporarily go over the configured limit. This
    // doesn't mean we have actually overshot, but that we attempted to allocate
    // a new instance and incremented the counter, we've seen (or are about to
    // see) that the counter is beyond the configured threshold, and are going
    // to decrement the counter and return an error but haven't done so yet. See
    // the increment trait methods for more details.
    live_core_instances: AtomicU64,
    live_component_instances: AtomicU64,

    // Regions of memory awaiting decommit, shared across threads and flushed
    // in batches of roughly `decommit_batch_size` regions.
    decommit_queue: Mutex<DecommitQueue>,

    // Pool of linear memory slots plus a count of how many are live.
    memories: MemoryPool,
    live_memories: AtomicUsize,

    // Pool of table slots plus a count of how many are live.
    tables: TablePool,
    live_tables: AtomicUsize,

    // Pool of GC heaps; `None` when no collector is configured (see `new`).
    #[cfg(feature = "gc")]
    gc_heaps: Option<GcHeapPool>,
    #[cfg(feature = "gc")]
    live_gc_heaps: AtomicUsize,

    // Pool of async stacks plus a count of how many are live.
    #[cfg(feature = "async")]
    stacks: StackPool,
    #[cfg(feature = "async")]
    live_stacks: AtomicUsize,

    // Handle to Linux's PAGEMAP_SCAN if enabled/available; passed to memory
    // slot resets (see `deallocate_memory`).
    pagemap: Option<PageMap>,
}
333
impl Drop for PoolingInstanceAllocator {
    fn drop(&mut self) {
        // All of the work below is debug-only sanity checking; release builds
        // do nothing extra on drop.
        if !cfg!(debug_assertions) {
            return;
        }

        // NB: when cfg(not(debug_assertions)) it is okay that we don't flush
        // the queue, as the sub-pools will unmap those ranges anyways, so
        // there's no point in decommitting them. But we do need to flush the
        // queue when debug assertions are enabled to make sure that all
        // entities get returned to their associated sub-pools and we can
        // differentiate between a leaking slot and an enqueued-for-decommit
        // slot.
        let queue = self.decommit_queue.lock().unwrap();
        self.flush_decommit_queue(queue);

        // With the queue flushed, every slot should have been returned to its
        // pool and every live-entity counter should be back at zero.
        debug_assert_eq!(self.live_component_instances.load(Ordering::Acquire), 0);
        debug_assert_eq!(self.live_core_instances.load(Ordering::Acquire), 0);
        debug_assert_eq!(self.live_memories.load(Ordering::Acquire), 0);
        debug_assert_eq!(self.live_tables.load(Ordering::Acquire), 0);

        debug_assert!(self.memories.is_empty());
        debug_assert!(self.tables.is_empty());

        #[cfg(feature = "gc")]
        if let Some(gc_heaps) = &self.gc_heaps {
            debug_assert!(gc_heaps.is_empty());
            debug_assert_eq!(self.live_gc_heaps.load(Ordering::Acquire), 0);
        }

        #[cfg(feature = "async")]
        {
            debug_assert!(self.stacks.is_empty());
            debug_assert_eq!(self.live_stacks.load(Ordering::Acquire), 0);
        }
    }
}
371
372impl PoolingInstanceAllocator {
    /// Creates a new pooling instance allocator with the given strategy and limits.
    ///
    /// # Errors
    ///
    /// Returns an error if constructing any of the sub-pools fails, or if
    /// `config.pagemap_scan` is `Enabled::Yes` on a system without
    /// PAGEMAP_SCAN support.
    pub fn new(config: &PoolingInstanceAllocatorConfig, tunables: &Tunables) -> Result<Self> {
        Ok(Self {
            decommit_batch_size: config.decommit_batch_size,
            limits: config.limits,
            live_component_instances: AtomicU64::new(0),
            live_core_instances: AtomicU64::new(0),
            decommit_queue: Mutex::new(DecommitQueue::default()),
            memories: MemoryPool::new(config, tunables)?,
            live_memories: AtomicUsize::new(0),
            tables: TablePool::new(config)?,
            live_tables: AtomicUsize::new(0),
            // Only allocate a GC heap pool when a collector is configured.
            #[cfg(feature = "gc")]
            gc_heaps: if tunables.collector.is_some() {
                Some(GcHeapPool::new(config, tunables)?)
            } else {
                None
            },
            #[cfg(feature = "gc")]
            live_gc_heaps: AtomicUsize::new(0),
            #[cfg(feature = "async")]
            stacks: StackPool::new(config)?,
            #[cfg(feature = "async")]
            live_stacks: AtomicUsize::new(0),
            // `Auto` silently degrades to `None` when PAGEMAP_SCAN is
            // unavailable; `Yes` makes that an error.
            pagemap: match config.pagemap_scan {
                Enabled::Auto => PageMap::new(),
                Enabled::Yes => Some(PageMap::new().ok_or_else(|| {
                    format_err!(
                        "required to enable PAGEMAP_SCAN but this system \
                         does not support it"
                    )
                })?),
                Enabled::No => None,
            },
        })
    }
409
    /// Returns the size, in bytes, of each core-instance slot: the configured
    /// `core_instance_size` limit rounded up to `Instance`'s alignment.
    fn core_instance_size(&self) -> usize {
        round_up_to_pow2(self.limits.core_instance_size, mem::align_of::<Instance>())
    }
413
    /// Validates that `module`'s tables fit within the table pool's
    /// configured limits.
    fn validate_table_plans(&self, module: &Module) -> Result<()> {
        self.tables.validate(module)
    }
417
    /// Validates that `module`'s linear memories fit within the memory pool's
    /// configured limits.
    fn validate_memory_plans(&self, module: &Module) -> Result<()> {
        self.memories.validate_memories(module)
    }
421
    /// Checks that an instance described by `offsets` fits within the
    /// configured maximum core instance size, producing an error with a
    /// per-region byte breakdown when it does not.
    fn validate_core_instance_size(&self, offsets: &VMOffsets<HostPtr>) -> Result<()> {
        let layout = Instance::alloc_layout(offsets);
        if layout.size() <= self.core_instance_size() {
            return Ok(());
        }

        // If this `module` exceeds the allocation size allotted to it then an
        // error will be reported here. The error of "required N bytes but
        // cannot allocate that" is pretty opaque, however, because it's not
        // clear what the breakdown of the N bytes are and what to optimize
        // next. To help provide a better error message here some fancy-ish
        // logic is done here to report the breakdown of the byte request into
        // the largest portions and where it's coming from.
        let mut message = format!(
            "instance allocation for this module \
             requires {} bytes which exceeds the configured maximum \
             of {} bytes; breakdown of allocation requirement:\n\n",
            layout.size(),
            self.core_instance_size(),
        );

        let mut remaining = layout.size();
        let mut push = |name: &str, bytes: usize| {
            assert!(remaining >= bytes);
            remaining -= bytes;

            // If the `name` region is more than 5% of the allocation request
            // then report it here, otherwise ignore it. We have less than 20
            // fields so we're guaranteed that something should be reported, and
            // otherwise it's not particularly interesting to learn about 5
            // different fields that are all 8 or 0 bytes. Only try to report
            // the "major" sources of bytes here.
            if bytes > layout.size() / 20 {
                message.push_str(&format!(
                    " * {:.02}% - {} bytes - {}\n",
                    ((bytes as f32) / (layout.size() as f32)) * 100.0,
                    bytes,
                    name,
                ));
            }
        };

        // The `Instance` itself requires some size allocated to it.
        push("instance state management", mem::size_of::<Instance>());

        // Afterwards the `VMContext`'s regions are why we're requesting bytes,
        // so ask it for descriptions on each region's byte size.
        for (desc, size) in offsets.region_sizes() {
            push(desc, size as usize);
        }

        // double-check we accounted for all the bytes
        assert_eq!(remaining, 0);

        bail!("{message}")
    }
478
    /// Checks that a component whose `VMComponentContext` is described by
    /// `offsets`, plus the aggregate runtime size of its core instances, fits
    /// within the configured maximum component instance size.
    #[cfg(feature = "component-model")]
    fn validate_component_instance_size(
        &self,
        offsets: &VMComponentOffsets<HostPtr>,
        core_instances_aggregate_size: usize,
    ) -> Result<()> {
        let vmcomponentctx_size = usize::try_from(offsets.size_of_vmctx()).unwrap();
        let total_instance_size = core_instances_aggregate_size.saturating_add(vmcomponentctx_size);
        if total_instance_size <= self.limits.component_instance_size {
            return Ok(());
        }

        // TODO: Add context with detailed accounting of what makes up all the
        // `VMComponentContext`'s space like we do for module instances.
        bail!(
            "instance allocation for this component requires {total_instance_size} bytes of `VMComponentContext` \
             and aggregated core instance runtime space which exceeds the configured maximum of {} bytes. \
             `VMComponentContext` used {vmcomponentctx_size} bytes, `core module instances` used \
             {core_instances_aggregate_size} bytes.",
            self.limits.component_instance_size
        )
    }
501
    /// Empties the shared decommit queue, returning the result of
    /// `DecommitQueue::flush` which callers use to decide whether a retry of
    /// a failed allocation is worthwhile.
    fn flush_decommit_queue(&self, mut locked_queue: MutexGuard<'_, DecommitQueue>) -> bool {
        // Take the queue out of the mutex and drop the lock, to minimize
        // contention.
        let queue = mem::take(&mut *locked_queue);
        drop(locked_queue);
        queue.flush(self)
    }
509
510    /// Execute `f` and if it returns `Err(PoolConcurrencyLimitError)`, then try
511    /// flushing the decommit queue. If flushing the queue freed up slots, then
512    /// try running `f` again.
513    #[cfg(feature = "async")]
514    fn with_flush_and_retry<T>(&self, mut f: impl FnMut() -> Result<T>) -> Result<T> {
515        f().or_else(|e| {
516            if e.is::<PoolConcurrencyLimitError>() {
517                let queue = self.decommit_queue.lock().unwrap();
518                if self.flush_decommit_queue(queue) {
519                    return f();
520                }
521            }
522
523            Err(e)
524        })
525    }
526
527    fn merge_or_flush(&self, mut local_queue: DecommitQueue) {
528        match local_queue.raw_len() {
529            // If we didn't enqueue any regions for decommit, then we must have
530            // either memset the whole entity or eagerly remapped it to zero
531            // because we don't have linux's `madvise(DONTNEED)` semantics. In
532            // either case, the entity slot is ready for reuse immediately.
533            0 => {
534                local_queue.flush(self);
535            }
536
537            // We enqueued at least our batch size of regions for decommit, so
538            // flush the local queue immediately. Don't bother inspecting (or
539            // locking!) the shared queue.
540            n if n >= self.decommit_batch_size => {
541                local_queue.flush(self);
542            }
543
544            // If we enqueued some regions for decommit, but did not reach our
545            // batch size, so we don't want to flush it yet, then merge the
546            // local queue into the shared queue.
547            n => {
548                debug_assert!(n < self.decommit_batch_size);
549                let mut shared_queue = self.decommit_queue.lock().unwrap();
550                shared_queue.append(&mut local_queue);
551                // And if the shared queue now has at least as many regions
552                // enqueued for decommit as our batch size, then we can flush
553                // it.
554                if shared_queue.raw_len() >= self.decommit_batch_size {
555                    self.flush_decommit_queue(shared_queue);
556                }
557            }
558        }
559    }
560}
561
562unsafe impl InstanceAllocator for PoolingInstanceAllocator {
    #[cfg(feature = "component-model")]
    fn validate_component<'a>(
        &self,
        component: &Component,
        offsets: &VMComponentOffsets<HostPtr>,
        get_module: &'a dyn Fn(StaticModuleIndex) -> &'a Module,
    ) -> Result<()> {
        // Walk the component's initializers, tallying how many core
        // instances, memories, and tables it will transitively instantiate
        // and how much aggregate instance space those need.
        let mut num_core_instances = 0;
        let mut num_memories = 0;
        let mut num_tables = 0;
        let mut core_instances_aggregate_size: usize = 0;
        for init in &component.initializers {
            use wasmtime_environ::component::GlobalInitializer::*;
            use wasmtime_environ::component::InstantiateModule;
            match init {
                InstantiateModule(InstantiateModule::Import(_, _), _) => {
                    num_core_instances += 1;
                    // Can't statically account for the total vmctx size, number
                    // of memories, and number of tables in this component.
                }
                InstantiateModule(InstantiateModule::Static(static_module_index, _), _) => {
                    let module = get_module(*static_module_index);
                    let offsets = VMOffsets::new(HostPtr, &module);
                    let layout = Instance::alloc_layout(&offsets);
                    // Each statically-known module must also individually
                    // satisfy the per-module pooling limits.
                    self.validate_module(module, &offsets)?;
                    num_core_instances += 1;
                    num_memories += module.num_defined_memories();
                    num_tables += module.num_defined_tables();
                    core_instances_aggregate_size += layout.size();
                }
                LowerImport { .. }
                | ExtractMemory(_)
                | ExtractTable(_)
                | ExtractRealloc(_)
                | ExtractCallback(_)
                | ExtractPostReturn(_)
                | Resource(_) => {}
            }
        }

        // Compare the tallies against the configured per-component limits.
        if num_core_instances
            > usize::try_from(self.limits.max_core_instances_per_component).unwrap()
        {
            bail!(
                "The component transitively contains {num_core_instances} core module instances, \
                 which exceeds the configured maximum of {} in the pooling allocator",
                self.limits.max_core_instances_per_component
            );
        }

        if num_memories > usize::try_from(self.limits.max_memories_per_component).unwrap() {
            bail!(
                "The component transitively contains {num_memories} Wasm linear memories, which \
                 exceeds the configured maximum of {} in the pooling allocator",
                self.limits.max_memories_per_component
            );
        }

        if num_tables > usize::try_from(self.limits.max_tables_per_component).unwrap() {
            bail!(
                "The component transitively contains {num_tables} tables, which exceeds the \
                 configured maximum of {} in the pooling allocator",
                self.limits.max_tables_per_component
            );
        }

        self.validate_component_instance_size(offsets, core_instances_aggregate_size)
            .context("component instance size does not fit in pooling allocator requirements")?;

        Ok(())
    }
634
    fn validate_module(&self, module: &Module, offsets: &VMOffsets<HostPtr>) -> Result<()> {
        // Validate each resource dimension separately so the returned error
        // identifies which limit was exceeded.
        self.validate_memory_plans(module)
            .context("module memory does not fit in pooling allocator requirements")?;
        self.validate_table_plans(module)
            .context("module table does not fit in pooling allocator requirements")?;
        self.validate_core_instance_size(offsets)
            .context("module instance size does not fit in pooling allocator requirements")?;
        Ok(())
    }
644
    // Validation of a single memory type delegates to the memory pool.
    #[cfg(feature = "gc")]
    fn validate_memory(&self, memory: &wasmtime_environ::Memory) -> Result<()> {
        self.memories.validate_memory(memory)
    }
649
    #[cfg(feature = "component-model")]
    fn increment_component_instance_count(&self) -> Result<()> {
        // Optimistically increment; on overshoot the counter is decremented
        // again before returning an error. See the comment on
        // `live_component_instances` for why the counter may transiently
        // exceed the configured limit.
        let old_count = self.live_component_instances.fetch_add(1, Ordering::AcqRel);
        if old_count >= u64::from(self.limits.total_component_instances) {
            self.decrement_component_instance_count();
            return Err(PoolConcurrencyLimitError::new(
                usize::try_from(self.limits.total_component_instances).unwrap(),
                "component instances",
            )
            .into());
        }
        Ok(())
    }
663
    #[cfg(feature = "component-model")]
    fn decrement_component_instance_count(&self) {
        // Pairs with the `fetch_add` in `increment_component_instance_count`.
        self.live_component_instances.fetch_sub(1, Ordering::AcqRel);
    }
668
    fn increment_core_instance_count(&self) -> Result<()> {
        // Same optimistic increment-then-check scheme as
        // `increment_component_instance_count`; see the comment on
        // `live_core_instances`.
        let old_count = self.live_core_instances.fetch_add(1, Ordering::AcqRel);
        if old_count >= u64::from(self.limits.total_core_instances) {
            self.decrement_core_instance_count();
            return Err(PoolConcurrencyLimitError::new(
                usize::try_from(self.limits.total_core_instances).unwrap(),
                "core instances",
            )
            .into());
        }
        Ok(())
    }
681
    fn decrement_core_instance_count(&self) {
        // Pairs with the `fetch_add` in `increment_core_instance_count`.
        self.live_core_instances.fetch_sub(1, Ordering::AcqRel);
    }
685
    fn allocate_memory<'a, 'b: 'a, 'c: 'a>(
        &'a self,
        request: &'a mut InstanceAllocationRequest<'b, 'c>,
        ty: &'a wasmtime_environ::Memory,
        memory_index: Option<DefinedMemoryIndex>,
        _memory_kind: MemoryKind,
    ) -> Pin<Box<dyn Future<Output = Result<(MemoryAllocationIndex, Memory)>> + Send + 'a>> {
        crate::runtime::box_future(async move {
            async {
                // FIXME(rust-lang/rust#145127) this should ideally use a version of
                // `with_flush_and_retry` but adapted for async closures instead of only
                // sync closures. Right now that won't compile though so this is the
                // manually expanded version of the method.
                let e = match self.memories.allocate(request, ty, memory_index).await {
                    Ok(result) => return Ok(result),
                    Err(e) => e,
                };

                // If the pool was merely full (as opposed to some other
                // failure), flushing the decommit queue may return slots to
                // the pool, in which case the allocation is retried once.
                if e.is::<PoolConcurrencyLimitError>() {
                    let queue = self.decommit_queue.lock().unwrap();
                    if self.flush_decommit_queue(queue) {
                        return self.memories.allocate(request, ty, memory_index).await;
                    }
                }

                Err(e)
            }
            .await
            .inspect(|_| {
                // Only count the memory as live once allocation succeeded.
                self.live_memories.fetch_add(1, Ordering::Relaxed);
            })
        })
    }
719
    unsafe fn deallocate_memory(
        &self,
        _memory_index: Option<DefinedMemoryIndex>,
        allocation_index: MemoryAllocationIndex,
        memory: Memory,
    ) {
        // Keep the live-memory statistic in sync with `allocate_memory`; a
        // zero `prev` would mean a memory is deallocated that was never
        // counted as allocated.
        let prev = self.live_memories.fetch_sub(1, Ordering::Relaxed);
        debug_assert!(prev > 0);

        // Reset the image slot. Depending on whether this is successful or not
        // the `image` is preserved for future use. On success it's queued up to
        // get deallocated later, and on failure the slot is deallocated
        // immediately without preserving the image.
        let mut image = memory.unwrap_static_image();
        let mut queue = DecommitQueue::default();
        let bytes_resident = image.clear_and_remain_ready(
            self.pagemap.as_ref(),
            self.memories.keep_resident,
            |ptr, len| {
                // SAFETY: the memory in `image` won't be used until this
                // decommit queue is flushed, and by definition the memory is
                // not in use when calling this function.
                unsafe {
                    queue.push_raw(ptr, len);
                }
            },
        );

        match bytes_resident {
            Ok(bytes_resident) => {
                // SAFETY: this image is not in use and its memory regions were enqueued
                // with `push_raw` above.
                unsafe {
                    queue.push_memory(allocation_index, image, bytes_resident);
                }
                self.merge_or_flush(queue);
            }
            Err(e) => {
                log::warn!("ignoring clear_and_remain_ready error {e}");
                // SAFETY: `allocation_index` comes from this pool, as an unsafe
                // contract of this function itself, and it's guaranteed to be no
                // longer in use so safe to deallocate. The slot couldn't be
                // preserved so it's dropped here.
                //
                // Note that at this point it's not clear how many bytes are
                // resident in memory, so it's inevitably going to leave statistics
                // a little off. Also note though that non-Linux platforms don't
                // keep track of resident bytes anyway, and this path is only
                // reachable on non-Linux platforms because Linux can't return an
                // error.
                unsafe {
                    self.memories.deallocate(allocation_index, None, 0);
                }
            }
        }
    }
776
777    fn allocate_table<'a, 'b: 'a, 'c: 'a>(
778        &'a self,
779        request: &'a mut InstanceAllocationRequest<'b, 'c>,
780        ty: &'a wasmtime_environ::Table,
781        _table_index: DefinedTableIndex,
782    ) -> Pin<Box<dyn Future<Output = Result<(super::TableAllocationIndex, Table)>> + Send + 'a>>
783    {
784        crate::runtime::box_future(async move {
785            async {
786                // FIXME: see `allocate_memory` above for comments about duplication
787                // with `with_flush_and_retry`.
788                let e = match self.tables.allocate(request, ty).await {
789                    Ok(result) => return Ok(result),
790                    Err(e) => e,
791                };
792
793                if e.is::<PoolConcurrencyLimitError>() {
794                    let queue = self.decommit_queue.lock().unwrap();
795                    if self.flush_decommit_queue(queue) {
796                        return self.tables.allocate(request, ty).await;
797                    }
798                }
799
800                Err(e)
801            }
802            .await
803            .inspect(|_| {
804                self.live_tables.fetch_add(1, Ordering::Relaxed);
805            })
806        })
807    }
808
    /// Returns a table slot to the pool.
    ///
    /// The table's pages are reset to zero and each freed range is pushed
    /// onto a local decommit queue, which is then either merged into the
    /// allocator-wide queue or flushed immediately via `merge_or_flush`.
    ///
    /// # Safety
    ///
    /// The table must no longer be in use, and `allocation_index` must refer
    /// to a slot previously handed out by this allocator's table pool.
    unsafe fn deallocate_table(
        &self,
        _table_index: DefinedTableIndex,
        allocation_index: TableAllocationIndex,
        mut table: Table,
    ) {
        // Keep the live-table metric in sync; an underflow here would mean
        // an unbalanced allocate/deallocate pair.
        let prev = self.live_tables.fetch_sub(1, Ordering::Relaxed);
        debug_assert!(prev > 0);

        let mut queue = DecommitQueue::default();
        // SAFETY: This table is no longer in use by the allocator when this
        // method is called and additionally all image ranges are pushed with
        // the understanding that the memory won't get used until the whole
        // queue is flushed.
        let bytes_resident = unsafe {
            self.tables.reset_table_pages_to_zero(
                self.pagemap.as_ref(),
                allocation_index,
                &mut table,
                |ptr, len| {
                    queue.push_raw(ptr, len);
                },
            )
        };

        // SAFETY: the table has had all its memory regions enqueued above.
        unsafe {
            queue.push_table(allocation_index, table, bytes_resident);
        }
        self.merge_or_flush(queue);
    }
840
841    #[cfg(feature = "async")]
842    fn allocate_fiber_stack(&self) -> Result<wasmtime_fiber::FiberStack> {
843        let ret = self.with_flush_and_retry(|| self.stacks.allocate())?;
844        self.live_stacks.fetch_add(1, Ordering::Relaxed);
845        Ok(ret)
846    }
847
    /// Returns a fiber stack to the pool.
    ///
    /// The stack's memory is zeroed (behavior may depend on the pool's
    /// stack-zeroing configuration — see `zero_stack`) and its ranges are
    /// queued for decommit rather than being released eagerly.
    ///
    /// # Safety
    ///
    /// The stack must no longer be in use and must have been allocated from
    /// this allocator's stack pool.
    #[cfg(feature = "async")]
    unsafe fn deallocate_fiber_stack(&self, mut stack: wasmtime_fiber::FiberStack) {
        // Metrics bookkeeping for the number of live stacks.
        self.live_stacks.fetch_sub(1, Ordering::Relaxed);
        let mut queue = DecommitQueue::default();
        // SAFETY: the stack is no longer in use by definition when this
        // function is called and memory ranges pushed here are otherwise no
        // longer in use.
        let bytes_resident = unsafe {
            self.stacks
                .zero_stack(&mut stack, |ptr, len| queue.push_raw(ptr, len))
        };
        // SAFETY: this stack's memory regions were enqueued above.
        unsafe {
            queue.push_stack(stack, bytes_resident);
        }
        self.merge_or_flush(queue);
    }
865
    fn purge_module(&self, module: CompiledModuleId) {
        // Only the memory pool is notified here — presumably it is the only
        // pool holding module-keyed cached state (memory images); confirm
        // against `memory_pool::purge_module` if extending other pools.
        self.memories.purge_module(module);
    }
869
    fn next_available_pkey(&self) -> Option<ProtectionKey> {
        // Protection-key bookkeeping lives in the memory pool; delegate the
        // query there.
        self.memories.next_available_pkey()
    }
873
874    fn restrict_to_pkey(&self, pkey: ProtectionKey) {
875        mpk::allow(ProtectionMask::zero().or(pkey));
876    }
877
878    fn allow_all_pkeys(&self) {
879        mpk::allow(ProtectionMask::all());
880    }
881
882    #[cfg(feature = "gc")]
883    fn allocate_gc_heap(
884        &self,
885        engine: &crate::Engine,
886        gc_runtime: &dyn GcRuntime,
887        memory_alloc_index: MemoryAllocationIndex,
888        memory: Memory,
889    ) -> Result<(GcHeapAllocationIndex, Box<dyn GcHeap>)> {
890        let ret = self.gc_heaps.as_ref().unwrap().allocate(
891            engine,
892            gc_runtime,
893            memory_alloc_index,
894            memory,
895        )?;
896        self.live_gc_heaps.fetch_add(1, Ordering::Relaxed);
897        Ok(ret)
898    }
899
900    #[cfg(feature = "gc")]
901    fn deallocate_gc_heap(
902        &self,
903        allocation_index: GcHeapAllocationIndex,
904        gc_heap: Box<dyn GcHeap>,
905    ) -> (MemoryAllocationIndex, Memory) {
906        let gc_heaps = self.gc_heaps.as_ref().unwrap();
907        self.live_gc_heaps.fetch_sub(1, Ordering::Relaxed);
908        gc_heaps.deallocate(allocation_index, gc_heap)
909    }
910
    fn as_pooling(&self) -> Option<&PoolingInstanceAllocator> {
        // Downcast hook: this *is* the pooling allocator, so always `Some`.
        Some(self)
    }
914}
915
#[cfg(test)]
#[cfg(target_pointer_width = "64")]
mod test {
    use super::*;

    /// A pool whose `max_memory_size` exceeds the engine's configured
    /// `memory_reservation` can never be satisfied, so constructing the
    /// allocator must fail with a descriptive error message.
    #[test]
    fn test_pooling_allocator_with_memory_pages_exceeded() {
        let config = PoolingInstanceAllocatorConfig {
            limits: InstanceLimits {
                total_memories: 1,
                max_memory_size: 0x100010000,
                ..Default::default()
            },
            ..PoolingInstanceAllocatorConfig::default()
        };
        assert_eq!(
            PoolingInstanceAllocator::new(
                &config,
                &Tunables {
                    memory_reservation: 0x10000,
                    ..Tunables::default_host()
                },
            )
            .map_err(|e| e.to_string())
            .expect_err("expected a failure constructing instance allocator"),
            "maximum memory size of 0x100010000 bytes exceeds the configured \
             memory reservation of 0x10000 bytes"
        );
    }

    /// With `async_stack_zeroing` enabled and a single stack slot
    /// (`total_stacks: 1`, no warm-slot reuse), a byte written by one fiber
    /// stack must read back as zero after the slot is recycled.
    #[cfg(all(
        unix,
        target_pointer_width = "64",
        feature = "async",
        not(miri),
        not(asan)
    ))]
    #[test]
    fn test_stack_zeroed() -> Result<()> {
        let config = PoolingInstanceAllocatorConfig {
            max_unused_warm_slots: 0,
            limits: InstanceLimits {
                total_stacks: 1,
                total_memories: 0,
                total_tables: 0,
                ..Default::default()
            },
            stack_size: 128,
            async_stack_zeroing: true,
            ..PoolingInstanceAllocatorConfig::default()
        };
        let allocator = PoolingInstanceAllocator::new(&config, &Tunables::default_host())?;

        unsafe {
            for _ in 0..255 {
                let stack = allocator.allocate_fiber_stack()?;

                // The stack pointer is at the top, so decrement it first
                let addr = stack.top().unwrap().sub(1);

                // A fresh (zeroed) slot must read 0; write a nonzero byte so
                // the next iteration would observe it if zeroing ever broke.
                assert_eq!(*addr, 0);
                *addr = 1;

                allocator.deallocate_fiber_stack(stack);
            }
        }

        Ok(())
    }

    /// With `async_stack_zeroing` disabled, the single stack slot keeps its
    /// contents across recycling: each iteration observes exactly the byte
    /// value written by the previous iteration.
    #[cfg(all(
        unix,
        target_pointer_width = "64",
        feature = "async",
        not(miri),
        not(asan)
    ))]
    #[test]
    fn test_stack_unzeroed() -> Result<()> {
        let config = PoolingInstanceAllocatorConfig {
            max_unused_warm_slots: 0,
            limits: InstanceLimits {
                total_stacks: 1,
                total_memories: 0,
                total_tables: 0,
                ..Default::default()
            },
            stack_size: 128,
            async_stack_zeroing: false,
            ..PoolingInstanceAllocatorConfig::default()
        };
        let allocator = PoolingInstanceAllocator::new(&config, &Tunables::default_host())?;

        unsafe {
            // `i` is inferred as u8 from the `*addr` comparison; it tops out
            // at 254 so `i + 1` never overflows.
            for i in 0..255 {
                let stack = allocator.allocate_fiber_stack()?;

                // The stack pointer is at the top, so decrement it first
                let addr = stack.top().unwrap().sub(1);

                assert_eq!(*addr, i);
                *addr = i + 1;

                allocator.deallocate_fiber_stack(stack);
            }
        }

        Ok(())
    }
}