wasmtime_cache/worker.rs

//! Background worker that watches over the cache.
//!
//! It cleans up old cache entries, updates statistics, and optimizes the cache.
//! We allow losing some messages (it doesn't hurt) and some races,
//! but we guarantee eventual consistency and fault tolerance.
//! Background tasks can be CPU intensive, but the worker thread has low priority.
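//!
//! A rough sketch of how the parent cache module is expected to drive this
//! worker (illustrative only; the real call sites live in the parent module):
//!
//! ```ignore
//! let worker = Worker::start_new(&cache_config);
//! // on a cache hit: bump the usage counter and, if needed, recompress in the background
//! worker.on_cache_get_async(&cache_file_path);
//! // on a cache write: create the .stats file and possibly run a cleanup pass
//! worker.on_cache_update_async(&cache_file_path);
//! ```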

use super::{fs_write_atomic, CacheConfig};
use log::{debug, info, trace, warn};
use serde_derive::{Deserialize, Serialize};
use std::cmp;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::fmt;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
#[cfg(test)]
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
#[cfg(not(test))]
use std::time::SystemTime;
#[cfg(test)]
use tests::system_time_stub::SystemTimeStub as SystemTime;

#[derive(Clone)]
pub(super) struct Worker {
    sender: SyncSender<CacheEvent>,
    #[cfg(test)]
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}

struct WorkerThread {
    receiver: Receiver<CacheEvent>,
    cache_config: CacheConfig,
    #[cfg(test)]
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}

#[cfg(test)]
#[derive(Default)]
struct WorkerStats {
    dropped: u32,
    sent: u32,
    handled: u32,
}

#[derive(Debug, Clone)]
enum CacheEvent {
    OnCacheGet(PathBuf),
    OnCacheUpdate(PathBuf),
}

impl Worker {
    pub(super) fn start_new(cache_config: &CacheConfig) -> Self {
        let queue_size = match cache_config.worker_event_queue_size() {
            num if num <= usize::max_value() as u64 => num as usize,
            _ => usize::max_value(),
        };
        let (tx, rx) = sync_channel(queue_size);

        #[cfg(test)]
        let stats = Arc::new((Mutex::new(WorkerStats::default()), Condvar::new()));

        let worker_thread = WorkerThread {
            receiver: rx,
            cache_config: cache_config.clone(),
            #[cfg(test)]
            stats: stats.clone(),
        };

        // When `self` is dropped, the sender is dropped too, which disconnects the
        // channel and makes the worker thread exit -- this happens in the tests.
        // The non-test binary has only a static worker, so Rust never drops it.
        thread::spawn(move || worker_thread.run());

        Self {
            sender: tx,
            #[cfg(test)]
            stats,
        }
    }

    pub(super) fn on_cache_get_async(&self, path: impl AsRef<Path>) {
        let event = CacheEvent::OnCacheGet(path.as_ref().to_path_buf());
        self.send_cache_event(event);
    }

    pub(super) fn on_cache_update_async(&self, path: impl AsRef<Path>) {
        let event = CacheEvent::OnCacheUpdate(path.as_ref().to_path_buf());
        self.send_cache_event(event);
    }

    #[inline]
    fn send_cache_event(&self, event: CacheEvent) {
        let sent_event = self.sender.try_send(event.clone());

        if let Err(ref err) = sent_event {
            info!(
                "Failed to asynchronously send a message to the worker thread, \
                 event: {:?}, error: {}",
                event, err
            );
        }

        #[cfg(test)]
        {
            let mut stats = self
                .stats
                .0
                .lock()
                .expect("Failed to acquire worker stats lock");

            if sent_event.is_ok() {
                stats.sent += 1;
            } else {
                stats.dropped += 1;
            }
        }
    }

    #[cfg(test)]
    pub(super) fn events_dropped(&self) -> u32 {
        let stats = self
            .stats
            .0
            .lock()
            .expect("Failed to acquire worker stats lock");
        stats.dropped
    }

    #[cfg(test)]
    pub(super) fn wait_for_all_events_handled(&self) {
        let (stats, condvar) = &*self.stats;
        let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
        while stats.handled != stats.sent {
            stats = condvar
                .wait(stats)
                .expect("Failed to reacquire worker stats lock");
        }
    }
}

impl fmt::Debug for Worker {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Worker").finish()
    }
}

#[derive(Serialize, Deserialize)]
struct ModuleCacheStatistics {
    pub usages: u64,
    #[serde(rename = "optimized-compression")]
    pub compression_level: i32,
}
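
// An on-disk `.stats` file serialized from this struct looks roughly like this
// (illustrative values; the keys come from the field name and the serde rename above):
//
//     usages = 3
//     optimized-compression = 20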

impl ModuleCacheStatistics {
    fn default(cache_config: &CacheConfig) -> Self {
        Self {
            usages: 0,
            compression_level: cache_config.baseline_compression_level(),
        }
    }
}

enum CacheEntry {
    Recognized {
        path: PathBuf,
        mtime: SystemTime,
        size: u64,
    },
    Unrecognized {
        path: PathBuf,
        is_dir: bool,
    },
}

macro_rules! unwrap_or_warn {
    ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
        match $result {
            Ok(val) => val,
            Err(err) => {
                warn!("{}, path: {}, msg: {}", $err_msg, $path.display(), err);
                $cont
            }
        }
    };
}

impl WorkerThread {
    fn run(self) {
        debug!("Cache worker thread started.");

        Self::lower_thread_priority();

        #[cfg(test)]
        let (stats, condvar) = &*self.stats;

        for event in self.receiver.iter() {
            match event {
                CacheEvent::OnCacheGet(path) => self.handle_on_cache_get(path),
                CacheEvent::OnCacheUpdate(path) => self.handle_on_cache_update(path),
            }

            #[cfg(test)]
            {
                let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
                stats.handled += 1;
                condvar.notify_all();
            }
        }
    }

    #[cfg(target_os = "fuchsia")]
    fn lower_thread_priority() {
        // TODO This needs to use Fuchsia thread profiles
        // https://fuchsia.dev/fuchsia-src/reference/kernel_objects/profile
        warn!(
            "Lowering thread priority on Fuchsia is currently a noop. It might affect application performance."
        );
    }

    #[cfg(target_os = "windows")]
    fn lower_thread_priority() {
        use windows_sys::Win32::System::Threading::*;

        // https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadpriority
        // https://docs.microsoft.com/en-us/windows/win32/procthread/scheduling-priorities

        if unsafe {
            SetThreadPriority(
                GetCurrentThread(),
                THREAD_MODE_BACKGROUND_BEGIN.try_into().unwrap(),
            )
        } == 0
        {
            warn!(
                "Failed to lower worker thread priority. It might affect application performance."
            );
        }
    }

    #[cfg(not(any(target_os = "windows", target_os = "fuchsia")))]
    fn lower_thread_priority() {
        // http://man7.org/linux/man-pages/man7/sched.7.html

        const NICE_DELTA_FOR_BACKGROUND_TASKS: i32 = 3;

        match rustix::process::nice(NICE_DELTA_FOR_BACKGROUND_TASKS) {
            Ok(current_nice) => {
                debug!("New nice value of worker thread: {}", current_nice);
            }
            Err(err) => {
                warn!(
                    "Failed to lower worker thread priority ({:?}). It might affect application performance.", err);
            }
        };
    }

    /// Increases the usage counter and recompresses the file
    /// if the usage counter has reached a configurable threshold.
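    ///
    /// For example (illustrative values, not necessarily the configured defaults):
    /// with `optimized_compression_usage_counter_threshold` = 256 and
    /// `optimized_compression_level` = 20, a cache file is recompressed at
    /// level 20 in the background once it has been used 256 times, unless it
    /// is already compressed at that level or higher.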
    fn handle_on_cache_get(&self, path: PathBuf) {
        trace!("handle_on_cache_get() for path: {}", path.display());

        // construct .stats file path
        let filename = path.file_name().unwrap().to_str().unwrap();
        let stats_path = path.with_file_name(format!("{filename}.stats"));

        // load .stats file (default if none or error)
        let mut stats = read_stats_file(stats_path.as_ref())
            .unwrap_or_else(|| ModuleCacheStatistics::default(&self.cache_config));

        // step 1: update the usage counter & write to the disk
        //         it's racy, but that's fine (the counter will just be smaller,
        //         which sometimes retriggers recompression)
        stats.usages += 1;
        if !write_stats_file(stats_path.as_ref(), &stats) {
            return;
        }

        // step 2: recompress if there's a need
        let opt_compr_lvl = self.cache_config.optimized_compression_level();
        if stats.compression_level >= opt_compr_lvl
            || stats.usages
                < self
                    .cache_config
                    .optimized_compression_usage_counter_threshold()
        {
            return;
        }

        let lock_path = if let Some(p) = acquire_task_fs_lock(
            path.as_ref(),
            self.cache_config.optimizing_compression_task_timeout(),
            self.cache_config
                .allowed_clock_drift_for_files_from_future(),
        ) {
            p
        } else {
            return;
        };

        trace!("Trying to recompress file: {}", path.display());

        // recompress, write to another file, rename (an atomic file content exchange),
        // and update the stats file
        let compressed_cache_bytes = unwrap_or_warn!(
            fs::read(&path),
            return,
            "Failed to read old cache file",
            path
        );

        let cache_bytes = unwrap_or_warn!(
            zstd::decode_all(&compressed_cache_bytes[..]),
            return,
            "Failed to decompress cached code",
            path
        );

        let recompressed_cache_bytes = unwrap_or_warn!(
            zstd::encode_all(&cache_bytes[..], opt_compr_lvl),
            return,
            "Failed to compress cached code",
            path
        );

        unwrap_or_warn!(
            fs::write(&lock_path, &recompressed_cache_bytes),
            return,
            "Failed to write recompressed cache",
            lock_path
        );

        unwrap_or_warn!(
            fs::rename(&lock_path, &path),
            {
                if let Err(error) = fs::remove_file(&lock_path) {
                    warn!(
                        "Failed to clean up (remove) recompressed cache, path {}, err: {}",
                        lock_path.display(),
                        error
                    );
                }

                return;
            },
            "Failed to rename recompressed cache",
            lock_path
        );

        // update stats file (reload it! recompression can take some time)
        if let Some(mut new_stats) = read_stats_file(stats_path.as_ref()) {
            if new_stats.compression_level >= opt_compr_lvl {
                // Rare race:
                //    two instances with different opt_compr_lvl; we don't know in which order they updated
                //    the cache file and the stats file (they are not updated together atomically)
                // A possible solution is to use a directory per cache entry, but it complicates the system
                // and is not worth it.
                debug!(
                    "DETECTED task done more than once (or a race with a new file): \
                     recompression of {}. Note: if the optimized compression level setting \
                     has changed in the meantime, the stats file might contain an \
                     inconsistent compression level due to the race.",
                    path.display()
                );
            } else {
                new_stats.compression_level = opt_compr_lvl;
                let _ = write_stats_file(stats_path.as_ref(), &new_stats);
            }

            if new_stats.usages < stats.usages {
                debug!(
                    "DETECTED lower usage count (new file or race with counter \
                     increasing): file {}",
                    path.display()
                );
            }
        } else {
            debug!(
                "Can't read the stats file again to update the compression level (it might have \
                 been cleaned up): file {}",
                stats_path.display()
            );
        }

        trace!("Task finished: recompress file: {}", path.display());
    }

    fn handle_on_cache_update(&self, path: PathBuf) {
        trace!("handle_on_cache_update() for path: {}", path.display());

        // ---------------------- step 1: create .stats file

        // construct .stats file path
        let filename = path
            .file_name()
            .expect("Expected valid cache file name")
            .to_str()
            .expect("Expected valid cache file name");
        let stats_path = path.with_file_name(format!("{filename}.stats"));

        // create and write stats file
        let mut stats = ModuleCacheStatistics::default(&self.cache_config);
        stats.usages += 1;
        write_stats_file(&stats_path, &stats);

        // ---------------------- step 2: perform cleanup task if needed

        // acquire lock for cleanup task
        // The lock is proof of a recent cleanup task, so we don't want to delete it.
        // Expired locks will be deleted by the cleanup task.
        let cleanup_file = self.cache_config.directory().join(".cleanup"); // a non-existent marker file
        if acquire_task_fs_lock(
            &cleanup_file,
            self.cache_config.cleanup_interval(),
            self.cache_config
                .allowed_clock_drift_for_files_from_future(),
        )
        .is_none()
        {
            return;
        }

        trace!("Trying to clean up cache");

        let mut cache_index = self.list_cache_contents();
        let future_tolerance = SystemTime::now()
            .checked_add(
                self.cache_config
                    .allowed_clock_drift_for_files_from_future(),
            )
            .expect("Brace your cache, the next Big Bang is coming (time overflow)");
        cache_index.sort_unstable_by(|lhs, rhs| {
            // sort by age
            use CacheEntry::*;
            match (lhs, rhs) {
                (Recognized { mtime: lhs_mt, .. }, Recognized { mtime: rhs_mt, .. }) => {
                    match (*lhs_mt > future_tolerance, *rhs_mt > future_tolerance) {
                        // later == younger
                        (false, false) => rhs_mt.cmp(lhs_mt),
                        // files from the far future are treated as the oldest recognized files --
                        // we want to delete them so that the cache keeps track of recent files;
                        // however, we don't delete them unconditionally,
                        // because the .stats file can be overwritten with a meaningful mtime
                        (true, false) => cmp::Ordering::Greater,
                        (false, true) => cmp::Ordering::Less,
                        (true, true) => cmp::Ordering::Equal,
                    }
                }
                // unrecognized entries are treated as infinitely old
                (Recognized { .. }, Unrecognized { .. }) => cmp::Ordering::Less,
                (Unrecognized { .. }, Recognized { .. }) => cmp::Ordering::Greater,
                (Unrecognized { .. }, Unrecognized { .. }) => cmp::Ordering::Equal,
            }
        });
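        // Resulting order: the newest recognized entries first, then progressively
        // older ones, then entries with an mtime from the far future, and finally
        // all unrecognized entries -- so the deletion below, which removes everything
        // from a cut index to the end, always drops the least valuable entries.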

        // find the "cut" boundary:
        // - remove unrecognized files unconditionally,
        // - remove some cache files if a quota has been exceeded
        let mut total_size = 0u64;
        let mut start_delete_idx = None;
        let mut start_delete_idx_if_deleting_recognized_items: Option<usize> = None;

        let total_size_limit = self.cache_config.files_total_size_soft_limit();
        let file_count_limit = self.cache_config.file_count_soft_limit();
        let tsl_if_deleting = total_size_limit
            .checked_mul(
                self.cache_config
                    .files_total_size_limit_percent_if_deleting() as u64,
            )
            .unwrap()
            / 100;
        let fcl_if_deleting = file_count_limit
            .checked_mul(self.cache_config.file_count_limit_percent_if_deleting() as u64)
            .unwrap()
            / 100;
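        // Illustrative example (hypothetical config values, not necessarily the
        // defaults): with files_total_size_soft_limit = 1024 MB and
        // files_total_size_limit_percent_if_deleting = 70, recognized entries are
        // deleted only once their total size exceeds 1024 MB, and then everything
        // past the point where the running total first exceeded
        // tsl_if_deleting = 1024 * 70 / 100 = 716 MB (integer division) is removed,
        // so the very next write doesn't immediately retrigger another cleanup.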

        for (idx, item) in cache_index.iter().enumerate() {
            let size = if let CacheEntry::Recognized { size, .. } = item {
                size
            } else {
                start_delete_idx = Some(idx);
                break;
            };

            total_size += size;
            if start_delete_idx_if_deleting_recognized_items.is_none()
                && (total_size > tsl_if_deleting || (idx + 1) as u64 > fcl_if_deleting)
            {
                start_delete_idx_if_deleting_recognized_items = Some(idx);
            }

            if total_size > total_size_limit || (idx + 1) as u64 > file_count_limit {
                start_delete_idx = start_delete_idx_if_deleting_recognized_items;
                break;
            }
        }

        if let Some(idx) = start_delete_idx {
            for item in &cache_index[idx..] {
                let (result, path, entity) = match item {
                    CacheEntry::Recognized { path, .. }
                    | CacheEntry::Unrecognized {
                        path,
                        is_dir: false,
                    } => (fs::remove_file(path), path, "file"),
                    CacheEntry::Unrecognized { path, is_dir: true } => {
                        (fs::remove_dir_all(path), path, "directory")
                    }
                };
                if let Err(err) = result {
                    warn!(
                        "Failed to remove {} during cleanup, path: {}, err: {}",
                        entity,
                        path.display(),
                        err
                    );
                }
            }
        }

        trace!("Task finished: clean up cache");
    }

    // Be fault tolerant: list as much as you can, and ignore the rest
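    //
    // The directory layout this function expects is roughly the following
    // (illustrative; the actual directory and file names are chosen by the
    // parent cache module):
    //
    //     <cache-dir>/.cleanup.wip-<pid>              -- cleanup lock (level 0)
    //     <cache-dir>/<dir>/<dir>/<hash>              -- module cache file (level 2)
    //     <cache-dir>/<dir>/<dir>/<hash>.stats        -- its usage statistics
    //     <cache-dir>/<dir>/<dir>/<hash>.wip-<pid>    -- recompression lock
    //
    // Anything else is reported as `Unrecognized` and becomes a cleanup candidate.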
    fn list_cache_contents(&self) -> Vec<CacheEntry> {
        fn enter_dir(
            vec: &mut Vec<CacheEntry>,
            dir_path: &Path,
            level: u8,
            cache_config: &CacheConfig,
        ) {
            macro_rules! add_unrecognized {
                (file: $path:expr) => {
                    add_unrecognized!(false, $path)
                };
                (dir: $path:expr) => {
                    add_unrecognized!(true, $path)
                };
                ($is_dir:expr, $path:expr) => {
                    vec.push(CacheEntry::Unrecognized {
                        path: $path.to_path_buf(),
                        is_dir: $is_dir,
                    })
                };
            }
            macro_rules! add_unrecognized_and {
                ([ $( $ty:ident: $path:expr ),* ], $cont:stmt) => {{
                    $( add_unrecognized!($ty: $path); )*
                    $cont
                }};
            }

            macro_rules! unwrap_or {
                ($result:expr, $cont:stmt, $err_msg:expr) => {
                    unwrap_or!($result, $cont, $err_msg, dir_path)
                };
                ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
                    unwrap_or_warn!(
                        $result,
                        $cont,
                        format!("{}, level: {}", $err_msg, level),
                        $path
                    )
                };
            }

            // If we fail to list a directory, something bad is happening anyway
            // (something is touching our cache, or we have a disk failure).
            // Try to delete it so that we stay within the soft limits of the cache size.
            // This comment applies later in this function, too.
            let it = unwrap_or!(
                fs::read_dir(dir_path),
                add_unrecognized_and!([dir: dir_path], return),
                "Failed to list cache directory, deleting it"
            );

            let mut cache_files = HashMap::new();
            for entry in it {
                // read_dir() returns an iterator over results; if some of them are errors,
                // we don't know their names, so we can't delete them. We don't want to delete
                // the whole directory with the good entries either, so we just ignore the erroneous ones.
                let entry = unwrap_or!(
                    entry,
                    continue,
                    "Failed to read a cache dir entry (NOT deleting it, it still occupies space)"
                );
                let path = entry.path();
                match (level, path.is_dir()) {
                    (0..=1, true) => enter_dir(vec, &path, level + 1, cache_config),
                    (0..=1, false) => {
                        if level == 0
                            && path.file_stem() == Some(OsStr::new(".cleanup"))
                            && path.extension().is_some()
                            // assume it's cleanup lock
                            && !is_fs_lock_expired(
                                Some(&entry),
                                &path,
                                cache_config.cleanup_interval(),
                                cache_config.allowed_clock_drift_for_files_from_future(),
                            )
                        {
                            continue; // skip active lock
                        }
                        add_unrecognized!(file: path);
                    }
                    (2, false) => {
                        match path.extension().and_then(OsStr::to_str) {
                            // mod or stats file
                            None | Some("stats") => {
                                cache_files.insert(path, entry);
                            }

                            Some(ext) => {
                                // check if valid lock
                                let recognized = ext.starts_with("wip-")
                                    && !is_fs_lock_expired(
                                        Some(&entry),
                                        &path,
                                        cache_config.optimizing_compression_task_timeout(),
                                        cache_config.allowed_clock_drift_for_files_from_future(),
                                    );

                                if !recognized {
                                    add_unrecognized!(file: path);
                                }
                            }
                        }
                    }
                    (_, is_dir) => add_unrecognized!(is_dir, path),
                }
            }

            // associate each module with its stats file & handle them together
            // assumption: only module and stats files are present here
            for (path, entry) in cache_files.iter() {
                let path_buf: PathBuf;
                let (mod_, stats_, is_mod) = match path.extension() {
                    Some(_) => {
                        path_buf = path.with_extension("");
                        (
                            cache_files.get(&path_buf).map(|v| (&path_buf, v)),
                            Some((path, entry)),
                            false,
                        )
                    }
                    None => {
                        path_buf = path.with_extension("stats");
                        (
                            Some((path, entry)),
                            cache_files.get(&path_buf).map(|v| (&path_buf, v)),
                            true,
                        )
                    }
                };

                // construct a cache entry
                match (mod_, stats_, is_mod) {
                    (Some((mod_path, mod_entry)), Some((stats_path, stats_entry)), true) => {
                        let mod_metadata = unwrap_or!(
                            mod_entry.metadata(),
                            add_unrecognized_and!([file: stats_path, file: mod_path], continue),
                            "Failed to get metadata, deleting BOTH module cache and stats files",
                            mod_path
                        );
                        let stats_mtime = unwrap_or!(
                            stats_entry.metadata().and_then(|m| m.modified()),
                            add_unrecognized_and!(
                                [file: stats_path],
                                unwrap_or!(
                                    mod_metadata.modified(),
                                    add_unrecognized_and!(
                                        [file: stats_path, file: mod_path],
                                        continue
                                    ),
                                    "Failed to get mtime, deleting BOTH module cache and stats \
                                     files",
                                    mod_path
                                )
                            ),
                            "Failed to get metadata/mtime, deleting the file",
                            stats_path
                        );
                        // .into() called for the SystemTimeStub if cfg(test)
                        vec.push(CacheEntry::Recognized {
                            path: mod_path.to_path_buf(),
                            mtime: stats_mtime.into(),
                            size: mod_metadata.len(),
                        })
                    }
                    (Some(_), Some(_), false) => (), // was or will be handled by previous branch
                    (Some((mod_path, mod_entry)), None, _) => {
                        let (mod_metadata, mod_mtime) = unwrap_or!(
                            mod_entry
                                .metadata()
                                .and_then(|md| md.modified().map(|mt| (md, mt))),
                            add_unrecognized_and!([file: mod_path], continue),
                            "Failed to get metadata/mtime, deleting the file",
                            mod_path
                        );
                        // .into() called for the SystemTimeStub if cfg(test)
                        vec.push(CacheEntry::Recognized {
                            path: mod_path.to_path_buf(),
                            mtime: mod_mtime.into(),
                            size: mod_metadata.len(),
                        })
                    }
                    (None, Some((stats_path, _stats_entry)), _) => {
                        debug!("Found orphaned stats file: {}", stats_path.display());
                        add_unrecognized!(file: stats_path);
                    }
                    _ => unreachable!(),
                }
            }
        }

        let mut vec = Vec::new();
        enter_dir(
            &mut vec,
            self.cache_config.directory(),
            0,
            &self.cache_config,
        );
        vec
    }
}

fn read_stats_file(path: &Path) -> Option<ModuleCacheStatistics> {
    fs::read_to_string(path)
        .map_err(|err| {
            trace!(
                "Failed to read stats file, path: {}, err: {}",
                path.display(),
                err
            )
        })
        .and_then(|contents| {
            toml::from_str::<ModuleCacheStatistics>(&contents).map_err(|err| {
                trace!(
                    "Failed to parse stats file, path: {}, err: {}",
                    path.display(),
                    err,
                )
            })
        })
        .ok()
}

fn write_stats_file(path: &Path, stats: &ModuleCacheStatistics) -> bool {
    toml::to_string_pretty(&stats)
        .map_err(|err| {
            warn!(
                "Failed to serialize stats file, path: {}, err: {}",
                path.display(),
                err
            )
        })
        .and_then(|serialized| {
            fs_write_atomic(path, "stats", serialized.as_bytes()).map_err(|_| ())
        })
        .is_ok()
}

/// Tries to acquire a lock for a specific task.
///
/// Returns Some(path) to the lock if it succeeds. The task path must not
/// have an extension and must have a file stem.
///
/// To release a lock, either rename or remove it manually,
/// or wait until it expires and the cleanup task removes it.
///
/// Note: this function is racy. The main idea is: be fault tolerant and
///       never block a task. The price is that occasionally a task may be
///       done more than once.
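///
/// For example (hypothetical paths): for a task path `<cache-dir>/a/b/<hash>`,
/// a successful call from a process with PID 1234 creates and returns the lock
/// path `<cache-dir>/a/b/<hash>.wip-1234`; it returns `None` if another
/// non-expired `<hash>.wip-*` lock already exists in that directory.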
fn acquire_task_fs_lock(
    task_path: &Path,
    timeout: Duration,
    allowed_future_drift: Duration,
) -> Option<PathBuf> {
    assert!(task_path.extension().is_none());
    assert!(task_path.file_stem().is_some());

    // list directory
    let dir_path = task_path.parent()?;
    let it = fs::read_dir(dir_path)
        .map_err(|err| {
            warn!(
                "Failed to list cache directory, path: {}, err: {}",
                dir_path.display(),
                err
            )
        })
        .ok()?;

    // look for existing locks
    for entry in it {
        let entry = entry
            .map_err(|err| {
                warn!(
                    "Failed to list cache directory, path: {}, err: {}",
                    dir_path.display(),
                    err
                )
            })
            .ok()?;

        let path = entry.path();
        if path.is_dir() || path.file_stem() != task_path.file_stem() {
            continue;
        }

        // check extension and mtime
        match path.extension() {
            None => continue,
            Some(ext) => {
                if let Some(ext_str) = ext.to_str() {
                    // if it's None, i.e. not a valid UTF-8 string, then it's definitely not our lock
                    if ext_str.starts_with("wip-")
                        && !is_fs_lock_expired(Some(&entry), &path, timeout, allowed_future_drift)
                    {
                        return None;
                    }
                }
            }
        }
    }

    // create the lock
    let lock_path = task_path.with_extension(format!("wip-{}", std::process::id()));
    let _file = fs::OpenOptions::new()
        .create_new(true)
        .write(true)
        .open(&lock_path)
        .map_err(|err| {
            warn!(
                "Failed to create lock file (note: it shouldn't exist): path: {}, err: {}",
                lock_path.display(),
                err
            )
        })
        .ok()?;

    Some(lock_path)
}

// We take either both or just the path; the dir entry is preferred, since on some
// platforms we can get the metadata without extra syscalls.
// Furthermore, it's better to reuse a path we already have instead of allocating
// a new one from the dir entry.
fn is_fs_lock_expired(
    entry: Option<&fs::DirEntry>,
    path: &PathBuf,
    threshold: Duration,
    allowed_future_drift: Duration,
) -> bool {
    let mtime = match entry
        .map_or_else(|| path.metadata(), |e| e.metadata())
        .and_then(|metadata| metadata.modified())
    {
        Ok(mt) => mt,
        Err(err) => {
            warn!(
                "Failed to get metadata/mtime, treating as an expired lock, path: {}, err: {}",
                path.display(),
                err
            );
            return true; // can't read mtime, treat as expired, so this task will not be starved
        }
    };

    // DON'T use: mtime.elapsed() -- we must call SystemTime directly for the tests to be deterministic
    match SystemTime::now().duration_since(mtime) {
        Ok(elapsed) => elapsed >= threshold,
        Err(err) => {
            trace!(
                "Found mtime in the future, treating as a non-expired lock, path: {}, err: {}",
                path.display(),
                err
            );
            // the lock is expired if the time is too far in the future
            // it is fine to have a network share and unsynchronized clocks,
            // but it's not good when the user changes their system clock
            err.duration() > allowed_future_drift
        }
    }
}

#[cfg(test)]
mod tests;