use super::{fs_write_atomic, CacheConfig};
use log::{debug, info, trace, warn};
use serde_derive::{Deserialize, Serialize};
use std::cmp;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::fmt;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
#[cfg(test)]
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
#[cfg(not(test))]
use std::time::SystemTime;
#[cfg(test)]
use tests::system_time_stub::SystemTimeStub as SystemTime;

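/// Handle for sending cache maintenance events to the background worker thread.
///
/// A minimal usage sketch (illustrative only; assumes a `CacheConfig` and a cache file
/// path are available):
///
/// ```ignore
/// let worker = Worker::start_new(&cache_config);
/// worker.on_cache_update_async(&cache_file_path);
/// worker.on_cache_get_async(&cache_file_path);
/// ```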
#[derive(Clone)]
pub(super) struct Worker {
    sender: SyncSender<CacheEvent>,
    #[cfg(test)]
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}

struct WorkerThread {
    receiver: Receiver<CacheEvent>,
    cache_config: CacheConfig,
    #[cfg(test)]
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}

#[cfg(test)]
#[derive(Default)]
struct WorkerStats {
    dropped: u32,
    sent: u32,
    handled: u32,
}

#[derive(Debug, Clone)]
enum CacheEvent {
    OnCacheGet(PathBuf),
    OnCacheUpdate(PathBuf),
}

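// The worker is split in two: `Worker` is the cloneable sender handle used by the
// cache, while `WorkerThread` is the receiving end, spawned once in `start_new` and
// driven by `WorkerThread::run`. Events travel over a bounded `sync_channel`.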
impl Worker {
    pub(super) fn start_new(cache_config: &CacheConfig) -> Self {
        let queue_size = match cache_config.worker_event_queue_size() {
            num if num <= usize::max_value() as u64 => num as usize,
            _ => usize::max_value(),
        };
        let (tx, rx) = sync_channel(queue_size);

        #[cfg(test)]
        let stats = Arc::new((Mutex::new(WorkerStats::default()), Condvar::new()));

        let worker_thread = WorkerThread {
            receiver: rx,
            cache_config: cache_config.clone(),
            #[cfg(test)]
            stats: stats.clone(),
        };

        thread::spawn(move || worker_thread.run());

        Self {
            sender: tx,
            #[cfg(test)]
            stats,
        }
    }

    pub(super) fn on_cache_get_async(&self, path: impl AsRef<Path>) {
        let event = CacheEvent::OnCacheGet(path.as_ref().to_path_buf());
        self.send_cache_event(event);
    }

    pub(super) fn on_cache_update_async(&self, path: impl AsRef<Path>) {
        let event = CacheEvent::OnCacheUpdate(path.as_ref().to_path_buf());
        self.send_cache_event(event);
    }

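    // Sending is strictly non-blocking: `try_send` drops the event if the worker's
    // bounded queue is full, so cache users never wait on the worker thread. Sent and
    // dropped events are counted only in tests.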
    #[inline]
    fn send_cache_event(&self, event: CacheEvent) {
        let sent_event = self.sender.try_send(event.clone());

        if let Err(ref err) = sent_event {
            info!(
                "Failed to asynchronously send message to the worker thread, \
                 event: {:?}, error: {}",
                event, err
            );
        }

        #[cfg(test)]
        {
            let mut stats = self
                .stats
                .0
                .lock()
                .expect("Failed to acquire worker stats lock");

            if sent_event.is_ok() {
                stats.sent += 1;
            } else {
                stats.dropped += 1;
            }
        }
    }

    #[cfg(test)]
    pub(super) fn events_dropped(&self) -> u32 {
        let stats = self
            .stats
            .0
            .lock()
            .expect("Failed to acquire worker stats lock");
        stats.dropped
    }

    #[cfg(test)]
    pub(super) fn wait_for_all_events_handled(&self) {
        let (stats, condvar) = &*self.stats;
        let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
        while stats.handled != stats.sent {
            stats = condvar
                .wait(stats)
                .expect("Failed to reacquire worker stats lock");
        }
    }
}

impl fmt::Debug for Worker {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Worker").finish()
    }
}

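/// Per-module statistics stored in TOML next to the cache file as `<file>.stats`.
///
/// Illustrative contents (values are made up):
///
/// ```toml
/// usages = 4
/// optimized-compression = 20
/// ```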
#[derive(Serialize, Deserialize)]
struct ModuleCacheStatistics {
    pub usages: u64,
    #[serde(rename = "optimized-compression")]
    pub compression_level: i32,
}

impl ModuleCacheStatistics {
    fn default(cache_config: &CacheConfig) -> Self {
        Self {
            usages: 0,
            compression_level: cache_config.baseline_compression_level(),
        }
    }
}

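// A single entry gathered by `list_cache_contents`. `Recognized` entries are module
// cache files subject to the size/count soft limits; `Unrecognized` entries (stray
// files or directories) are always scheduled for removal once cleanup runs.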
enum CacheEntry {
    Recognized {
        path: PathBuf,
        mtime: SystemTime,
        size: u64,
    },
    Unrecognized {
        path: PathBuf,
        is_dir: bool,
    },
}

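// Unwraps a `Result`, or logs a warning with the given message and path and then
// executes `$cont` (typically `return` or `continue`).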
macro_rules! unwrap_or_warn {
    ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
        match $result {
            Ok(val) => val,
            Err(err) => {
                warn!("{}, path: {}, msg: {}", $err_msg, $path.display(), err);
                $cont
            }
        }
    };
}

impl WorkerThread {
    fn run(self) {
        debug!("Cache worker thread started.");

        Self::lower_thread_priority();

        #[cfg(test)]
        let (stats, condvar) = &*self.stats;

        for event in self.receiver.iter() {
            match event {
                CacheEvent::OnCacheGet(path) => self.handle_on_cache_get(path),
                CacheEvent::OnCacheUpdate(path) => self.handle_on_cache_update(path),
            }

            #[cfg(test)]
            {
                let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
                stats.handled += 1;
                condvar.notify_all();
            }
        }
    }

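    // Best-effort, platform-specific lowering of the worker thread priority, so that
    // background recompression and cleanup interfere as little as possible with the
    // embedding application.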
    #[cfg(target_os = "fuchsia")]
    fn lower_thread_priority() {
        warn!(
            "Lowering thread priority on Fuchsia is currently a noop. It might affect application performance."
        );
    }

    #[cfg(target_os = "windows")]
    fn lower_thread_priority() {
        use windows_sys::Win32::System::Threading::*;

        if unsafe {
            SetThreadPriority(
                GetCurrentThread(),
                THREAD_MODE_BACKGROUND_BEGIN.try_into().unwrap(),
            )
        } == 0
        {
            warn!(
                "Failed to lower worker thread priority. It might affect application performance."
            );
        }
    }

    #[cfg(not(any(target_os = "windows", target_os = "fuchsia")))]
    fn lower_thread_priority() {
        const NICE_DELTA_FOR_BACKGROUND_TASKS: i32 = 3;

        match rustix::process::nice(NICE_DELTA_FOR_BACKGROUND_TASKS) {
            Ok(current_nice) => {
                debug!("New nice value of worker thread: {}", current_nice);
            }
            Err(err) => {
                warn!(
                    "Failed to lower worker thread priority ({:?}). It might affect application performance.",
                    err
                );
            }
        };
    }

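    // Cache hit: bump the usage counter in the `.stats` file and, once the configured
    // usage threshold is reached, try to recompress the cache file with the optimized
    // zstd level. A `<file>.wip-<pid>` lock file guards against concurrent recompression.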
    fn handle_on_cache_get(&self, path: PathBuf) {
        trace!("handle_on_cache_get() for path: {}", path.display());

        let filename = path.file_name().unwrap().to_str().unwrap();
        let stats_path = path.with_file_name(format!("{filename}.stats"));

        let mut stats = read_stats_file(stats_path.as_ref())
            .unwrap_or_else(|| ModuleCacheStatistics::default(&self.cache_config));

        stats.usages += 1;
        if !write_stats_file(stats_path.as_ref(), &stats) {
            return;
        }

        let opt_compr_lvl = self.cache_config.optimized_compression_level();
        if stats.compression_level >= opt_compr_lvl
            || stats.usages
                < self
                    .cache_config
                    .optimized_compression_usage_counter_threshold()
        {
            return;
        }

        let lock_path = if let Some(p) = acquire_task_fs_lock(
            path.as_ref(),
            self.cache_config.optimizing_compression_task_timeout(),
            self.cache_config
                .allowed_clock_drift_for_files_from_future(),
        ) {
            p
        } else {
            return;
        };

        trace!("Trying to recompress file: {}", path.display());

        let compressed_cache_bytes = unwrap_or_warn!(
            fs::read(&path),
            return,
            "Failed to read old cache file",
            path
        );

        let cache_bytes = unwrap_or_warn!(
            zstd::decode_all(&compressed_cache_bytes[..]),
            return,
            "Failed to decompress cached code",
            path
        );

        let recompressed_cache_bytes = unwrap_or_warn!(
            zstd::encode_all(&cache_bytes[..], opt_compr_lvl),
            return,
            "Failed to compress cached code",
            path
        );

        unwrap_or_warn!(
            fs::write(&lock_path, &recompressed_cache_bytes),
            return,
            "Failed to write recompressed cache",
            lock_path
        );

        unwrap_or_warn!(
            fs::rename(&lock_path, &path),
            {
                if let Err(error) = fs::remove_file(&lock_path) {
                    warn!(
                        "Failed to clean up (remove) recompressed cache, path {}, err: {}",
                        lock_path.display(),
                        error
                    );
                }

                return;
            },
            "Failed to rename recompressed cache",
            lock_path
        );

        if let Some(mut new_stats) = read_stats_file(stats_path.as_ref()) {
            if new_stats.compression_level >= opt_compr_lvl {
                debug!(
                    "DETECTED task done more than once (or race with a new file): \
                     recompression of {}. Note: if the optimized compression level setting \
                     has changed in the meantime, the stats file might contain an \
                     inconsistent compression level due to a race.",
                    path.display()
                );
            } else {
                new_stats.compression_level = opt_compr_lvl;
                let _ = write_stats_file(stats_path.as_ref(), &new_stats);
            }

            if new_stats.usages < stats.usages {
                debug!(
                    "DETECTED lower usage count (new file or race with counter \
                     increasing): file {}",
                    path.display()
                );
            }
        } else {
            debug!(
                "Can't read stats file again to update compression level (it might have \
                 been cleaned up): file {}",
                stats_path.display()
            );
        }

        trace!("Task finished: recompress file: {}", path.display());
    }

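    // New or updated cache file: write a fresh `.stats` file (baseline compression
    // level, usage count 1), then opportunistically run the size-based cleanup. The
    // cleanup is rate-limited by a `.cleanup.wip-<pid>` lock file, so it runs at most
    // once per cleanup interval across processes sharing the cache directory.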
    fn handle_on_cache_update(&self, path: PathBuf) {
        trace!("handle_on_cache_update() for path: {}", path.display());

        let filename = path
            .file_name()
            .expect("Expected valid cache file name")
            .to_str()
            .expect("Expected valid cache file name");
        let stats_path = path.with_file_name(format!("{filename}.stats"));

        let mut stats = ModuleCacheStatistics::default(&self.cache_config);
        stats.usages += 1;
        write_stats_file(&stats_path, &stats);

        let cleanup_file = self.cache_config.directory().join(".cleanup");
        if acquire_task_fs_lock(
            &cleanup_file,
            self.cache_config.cleanup_interval(),
            self.cache_config
                .allowed_clock_drift_for_files_from_future(),
        )
        .is_none()
        {
            return;
        }

        trace!("Trying to clean up cache");

        let mut cache_index = self.list_cache_contents();
        let future_tolerance = SystemTime::now()
            .checked_add(
                self.cache_config
                    .allowed_clock_drift_for_files_from_future(),
            )
            .expect("Brace your cache, the next Big Bang is coming (time overflow)");
        cache_index.sort_unstable_by(|lhs, rhs| {
            use CacheEntry::*;
            match (lhs, rhs) {
                (Recognized { mtime: lhs_mt, .. }, Recognized { mtime: rhs_mt, .. }) => {
                    match (*lhs_mt > future_tolerance, *rhs_mt > future_tolerance) {
                        (false, false) => rhs_mt.cmp(lhs_mt),
                        (true, false) => cmp::Ordering::Greater,
                        (false, true) => cmp::Ordering::Less,
                        (true, true) => cmp::Ordering::Equal,
                    }
                }
                (Recognized { .. }, Unrecognized { .. }) => cmp::Ordering::Less,
                (Unrecognized { .. }, Recognized { .. }) => cmp::Ordering::Greater,
                (Unrecognized { .. }, Unrecognized { .. }) => cmp::Ordering::Equal,
            }
        });

        let mut total_size = 0u64;
        let mut start_delete_idx = None;
        let mut start_delete_idx_if_deleting_recognized_items: Option<usize> = None;

        let total_size_limit = self.cache_config.files_total_size_soft_limit();
        let file_count_limit = self.cache_config.file_count_soft_limit();
        let tsl_if_deleting = total_size_limit
            .checked_mul(
                self.cache_config
                    .files_total_size_limit_percent_if_deleting() as u64,
            )
            .unwrap()
            / 100;
        let fcl_if_deleting = file_count_limit
            .checked_mul(self.cache_config.file_count_limit_percent_if_deleting() as u64)
            .unwrap()
            / 100;

        for (idx, item) in cache_index.iter().enumerate() {
            let size = if let CacheEntry::Recognized { size, .. } = item {
                size
            } else {
                start_delete_idx = Some(idx);
                break;
            };

            total_size += size;
            if start_delete_idx_if_deleting_recognized_items.is_none()
                && (total_size > tsl_if_deleting || (idx + 1) as u64 > fcl_if_deleting)
            {
                start_delete_idx_if_deleting_recognized_items = Some(idx);
            }

            if total_size > total_size_limit || (idx + 1) as u64 > file_count_limit {
                start_delete_idx = start_delete_idx_if_deleting_recognized_items;
                break;
            }
        }

        if let Some(idx) = start_delete_idx {
            for item in &cache_index[idx..] {
                let (result, path, entity) = match item {
                    CacheEntry::Recognized { path, .. }
                    | CacheEntry::Unrecognized {
                        path,
                        is_dir: false,
                    } => (fs::remove_file(path), path, "file"),
                    CacheEntry::Unrecognized { path, is_dir: true } => {
                        (fs::remove_dir_all(path), path, "directory")
                    }
                };
                if let Err(err) = result {
                    warn!(
                        "Failed to remove {} during cleanup, path: {}, err: {}",
                        entity,
                        path.display(),
                        err
                    );
                }
            }
        }

        trace!("Task finished: clean up cache");
    }

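    // Walks the cache directory (module files live two levels below the root), pairing
    // module files with their `.stats` files. Anything that is neither a module file,
    // a stats file, nor a live `*.wip-*` lock is reported as `Unrecognized` so that the
    // cleanup pass can delete it.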
    fn list_cache_contents(&self) -> Vec<CacheEntry> {
        fn enter_dir(
            vec: &mut Vec<CacheEntry>,
            dir_path: &Path,
            level: u8,
            cache_config: &CacheConfig,
        ) {
            macro_rules! add_unrecognized {
                (file: $path:expr) => {
                    add_unrecognized!(false, $path)
                };
                (dir: $path:expr) => {
                    add_unrecognized!(true, $path)
                };
                ($is_dir:expr, $path:expr) => {
                    vec.push(CacheEntry::Unrecognized {
                        path: $path.to_path_buf(),
                        is_dir: $is_dir,
                    })
                };
            }
            macro_rules! add_unrecognized_and {
                ([ $( $ty:ident: $path:expr ),* ], $cont:stmt) => {{
                    $( add_unrecognized!($ty: $path); )*
                    $cont
                }};
            }

            macro_rules! unwrap_or {
                ($result:expr, $cont:stmt, $err_msg:expr) => {
                    unwrap_or!($result, $cont, $err_msg, dir_path)
                };
                ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
                    unwrap_or_warn!(
                        $result,
                        $cont,
                        format!("{}, level: {}", $err_msg, level),
                        $path
                    )
                };
            }

            let it = unwrap_or!(
                fs::read_dir(dir_path),
                add_unrecognized_and!([dir: dir_path], return),
                "Failed to list cache directory, deleting it"
            );

            let mut cache_files = HashMap::new();
            for entry in it {
                let entry = unwrap_or!(
                    entry,
                    continue,
                    "Failed to read a cache dir entry (NOT deleting it, it still occupies space)"
                );
                let path = entry.path();
                match (level, path.is_dir()) {
                    (0..=1, true) => enter_dir(vec, &path, level + 1, cache_config),
                    (0..=1, false) => {
                        if level == 0
                            && path.file_stem() == Some(OsStr::new(".cleanup"))
                            && path.extension().is_some()
                            && !is_fs_lock_expired(
                                Some(&entry),
                                &path,
                                cache_config.cleanup_interval(),
                                cache_config.allowed_clock_drift_for_files_from_future(),
                            )
                        {
                            continue;
                        }
                        add_unrecognized!(file: path);
                    }
                    (2, false) => {
                        match path.extension().and_then(OsStr::to_str) {
                            None | Some("stats") => {
                                cache_files.insert(path, entry);
                            }

                            Some(ext) => {
                                let recognized = ext.starts_with("wip-")
                                    && !is_fs_lock_expired(
                                        Some(&entry),
                                        &path,
                                        cache_config.optimizing_compression_task_timeout(),
                                        cache_config.allowed_clock_drift_for_files_from_future(),
                                    );

                                if !recognized {
                                    add_unrecognized!(file: path);
                                }
                            }
                        }
                    }
                    (_, is_dir) => add_unrecognized!(is_dir, path),
                }
            }

            for (path, entry) in cache_files.iter() {
                let path_buf: PathBuf;
                let (mod_, stats_, is_mod) = match path.extension() {
                    Some(_) => {
                        path_buf = path.with_extension("");
                        (
                            cache_files.get(&path_buf).map(|v| (&path_buf, v)),
                            Some((path, entry)),
                            false,
                        )
                    }
                    None => {
                        path_buf = path.with_extension("stats");
                        (
                            Some((path, entry)),
                            cache_files.get(&path_buf).map(|v| (&path_buf, v)),
                            true,
                        )
                    }
                };

                match (mod_, stats_, is_mod) {
                    (Some((mod_path, mod_entry)), Some((stats_path, stats_entry)), true) => {
                        let mod_metadata = unwrap_or!(
                            mod_entry.metadata(),
                            add_unrecognized_and!([file: stats_path, file: mod_path], continue),
                            "Failed to get metadata, deleting BOTH module cache and stats files",
                            mod_path
                        );
                        let stats_mtime = unwrap_or!(
                            stats_entry.metadata().and_then(|m| m.modified()),
                            add_unrecognized_and!(
                                [file: stats_path],
                                unwrap_or!(
                                    mod_metadata.modified(),
                                    add_unrecognized_and!(
                                        [file: stats_path, file: mod_path],
                                        continue
                                    ),
                                    "Failed to get mtime, deleting BOTH module cache and stats \
                                     files",
                                    mod_path
                                )
                            ),
                            "Failed to get metadata/mtime, deleting the file",
                            stats_path
                        );
                        vec.push(CacheEntry::Recognized {
                            path: mod_path.to_path_buf(),
                            mtime: stats_mtime.into(),
                            size: mod_metadata.len(),
                        })
                    }
                    (Some(_), Some(_), false) => (),
                    (Some((mod_path, mod_entry)), None, _) => {
                        let (mod_metadata, mod_mtime) = unwrap_or!(
                            mod_entry
                                .metadata()
                                .and_then(|md| md.modified().map(|mt| (md, mt))),
                            add_unrecognized_and!([file: mod_path], continue),
                            "Failed to get metadata/mtime, deleting the file",
                            mod_path
                        );
                        vec.push(CacheEntry::Recognized {
                            path: mod_path.to_path_buf(),
                            mtime: mod_mtime.into(),
                            size: mod_metadata.len(),
                        })
                    }
                    (None, Some((stats_path, _stats_entry)), _) => {
                        debug!("Found orphaned stats file: {}", stats_path.display());
                        add_unrecognized!(file: stats_path);
                    }
                    _ => unreachable!(),
                }
            }
        }

        let mut vec = Vec::new();
        enter_dir(
            &mut vec,
            self.cache_config.directory(),
            0,
            &self.cache_config,
        );
        vec
    }
}

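// Helpers for the `.stats` sidecar files. Reads are best-effort (a missing or corrupt
// file simply yields `None`); writes go through `fs_write_atomic` to avoid exposing
// partially written files to concurrent readers.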
fn read_stats_file(path: &Path) -> Option<ModuleCacheStatistics> {
    fs::read_to_string(path)
        .map_err(|err| {
            trace!(
                "Failed to read stats file, path: {}, err: {}",
                path.display(),
                err
            )
        })
        .and_then(|contents| {
            toml::from_str::<ModuleCacheStatistics>(&contents).map_err(|err| {
                trace!(
                    "Failed to parse stats file, path: {}, err: {}",
                    path.display(),
                    err,
                )
            })
        })
        .ok()
}

fn write_stats_file(path: &Path, stats: &ModuleCacheStatistics) -> bool {
    toml::to_string_pretty(&stats)
        .map_err(|err| {
            warn!(
                "Failed to serialize stats file, path: {}, err: {}",
                path.display(),
                err
            )
        })
        .and_then(|serialized| {
            fs_write_atomic(path, "stats", serialized.as_bytes()).map_err(|_| ())
        })
        .is_ok()
}

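// Tries to claim a filesystem-level task (recompression of a module or cache cleanup)
// by creating a `<task>.wip-<pid>` lock file next to it. Returns `Some(lock_path)` on
// success, or `None` if another unexpired lock exists (judged by its mtime and the
// given timeout) or if the directory can't be inspected.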
fn acquire_task_fs_lock(
    task_path: &Path,
    timeout: Duration,
    allowed_future_drift: Duration,
) -> Option<PathBuf> {
    assert!(task_path.extension().is_none());
    assert!(task_path.file_stem().is_some());

    let dir_path = task_path.parent()?;
    let it = fs::read_dir(dir_path)
        .map_err(|err| {
            warn!(
                "Failed to list cache directory, path: {}, err: {}",
                dir_path.display(),
                err
            )
        })
        .ok()?;

    for entry in it {
        let entry = entry
            .map_err(|err| {
                warn!(
                    "Failed to list cache directory, path: {}, err: {}",
                    dir_path.display(),
                    err
                )
            })
            .ok()?;

        let path = entry.path();
        if path.is_dir() || path.file_stem() != task_path.file_stem() {
            continue;
        }

        match path.extension() {
            None => continue,
            Some(ext) => {
                if let Some(ext_str) = ext.to_str() {
                    if ext_str.starts_with("wip-")
                        && !is_fs_lock_expired(Some(&entry), &path, timeout, allowed_future_drift)
                    {
                        return None;
                    }
                }
            }
        }
    }

    let lock_path = task_path.with_extension(format!("wip-{}", std::process::id()));
    let _file = fs::OpenOptions::new()
        .create_new(true)
        .write(true)
        .open(&lock_path)
        .map_err(|err| {
            warn!(
                "Failed to create lock file (note: it shouldn't exist): path: {}, err: {}",
                lock_path.display(),
                err
            )
        })
        .ok()?;

    Some(lock_path)
}

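// A lock file counts as expired once its mtime is at least `threshold` old. An mtime
// lying in the future is tolerated up to `allowed_future_drift` (e.g. minor clock
// drift); beyond that the lock is treated as expired as well. Metadata errors are also
// treated as an expired lock.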
fn is_fs_lock_expired(
    entry: Option<&fs::DirEntry>,
    path: &PathBuf,
    threshold: Duration,
    allowed_future_drift: Duration,
) -> bool {
    let mtime = match entry
        .map_or_else(|| path.metadata(), |e| e.metadata())
        .and_then(|metadata| metadata.modified())
    {
        Ok(mt) => mt,
        Err(err) => {
            warn!(
                "Failed to get metadata/mtime, treating as an expired lock, path: {}, err: {}",
                path.display(),
                err
            );
            return true;
        }
    };

    match SystemTime::now().duration_since(mtime) {
        Ok(elapsed) => elapsed >= threshold,
        Err(err) => {
            trace!(
                "Found mtime in the future, treating as a not expired lock, path: {}, err: {}",
                path.display(),
                err
            );
            err.duration() > allowed_future_drift
        }
    }
}

#[cfg(test)]
mod tests;