wasmtime_wasi/p3/filesystem/
host.rs

1use crate::filesystem::{Descriptor, Dir, File, WasiFilesystem, WasiFilesystemCtxView};
2use crate::p3::bindings::clocks::wall_clock;
3use crate::p3::bindings::filesystem::types::{
4    self, Advice, DescriptorFlags, DescriptorStat, DescriptorType, DirectoryEntry, ErrorCode,
5    Filesize, MetadataHashValue, NewTimestamp, OpenFlags, PathFlags,
6};
7use crate::p3::filesystem::{FilesystemError, FilesystemResult, preopens};
8use crate::p3::{DEFAULT_BUFFER_CAPACITY, FallibleIteratorProducer};
9use crate::{DirPerms, FilePerms};
10use anyhow::Context as _;
11use bytes::BytesMut;
12use core::pin::Pin;
13use core::task::{Context, Poll, ready};
14use core::{iter, mem};
15use std::io::{self, Cursor};
16use std::sync::Arc;
17use system_interface::fs::FileIoExt as _;
18use tokio::sync::{mpsc, oneshot};
19use tokio::task::{JoinHandle, spawn_blocking};
20use wasmtime::StoreContextMut;
21use wasmtime::component::{
22    Accessor, Destination, FutureReader, Resource, ResourceTable, Source, StreamConsumer,
23    StreamProducer, StreamReader, StreamResult,
24};
25
26fn get_descriptor<'a>(
27    table: &'a ResourceTable,
28    fd: &'a Resource<Descriptor>,
29) -> FilesystemResult<&'a Descriptor> {
30    table
31        .get(fd)
32        .context("failed to get descriptor resource from table")
33        .map_err(FilesystemError::trap)
34}
35
36fn get_file<'a>(
37    table: &'a ResourceTable,
38    fd: &'a Resource<Descriptor>,
39) -> FilesystemResult<&'a File> {
40    let file = get_descriptor(table, fd).map(Descriptor::file)??;
41    Ok(file)
42}
43
44fn get_dir<'a>(
45    table: &'a ResourceTable,
46    fd: &'a Resource<Descriptor>,
47) -> FilesystemResult<&'a Dir> {
48    let dir = get_descriptor(table, fd).map(Descriptor::dir)??;
49    Ok(dir)
50}
51
52trait AccessorExt {
53    fn get_descriptor(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Descriptor>;
54    fn get_file(&self, fd: &Resource<Descriptor>) -> FilesystemResult<File>;
55    fn get_dir(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Dir>;
56    fn get_dir_pair(
57        &self,
58        a: &Resource<Descriptor>,
59        b: &Resource<Descriptor>,
60    ) -> FilesystemResult<(Dir, Dir)>;
61}
62
63impl<T> AccessorExt for Accessor<T, WasiFilesystem> {
64    fn get_descriptor(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Descriptor> {
65        self.with(|mut store| {
66            let fd = get_descriptor(store.get().table, fd)?;
67            Ok(fd.clone())
68        })
69    }
70
71    fn get_file(&self, fd: &Resource<Descriptor>) -> FilesystemResult<File> {
72        self.with(|mut store| {
73            let file = get_file(store.get().table, fd)?;
74            Ok(file.clone())
75        })
76    }
77
78    fn get_dir(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Dir> {
79        self.with(|mut store| {
80            let dir = get_dir(store.get().table, fd)?;
81            Ok(dir.clone())
82        })
83    }
84
85    fn get_dir_pair(
86        &self,
87        a: &Resource<Descriptor>,
88        b: &Resource<Descriptor>,
89    ) -> FilesystemResult<(Dir, Dir)> {
90        self.with(|mut store| {
91            let table = store.get().table;
92            let a = get_dir(table, a)?;
93            let b = get_dir(table, b)?;
94            Ok((a.clone(), b.clone()))
95        })
96    }
97}
98
99fn systemtime_from(t: wall_clock::Datetime) -> Result<std::time::SystemTime, ErrorCode> {
100    std::time::SystemTime::UNIX_EPOCH
101        .checked_add(core::time::Duration::new(t.seconds, t.nanoseconds))
102        .ok_or(ErrorCode::Overflow)
103}
104
105fn systemtimespec_from(t: NewTimestamp) -> Result<Option<fs_set_times::SystemTimeSpec>, ErrorCode> {
106    use fs_set_times::SystemTimeSpec;
107    match t {
108        NewTimestamp::NoChange => Ok(None),
109        NewTimestamp::Now => Ok(Some(SystemTimeSpec::SymbolicNow)),
110        NewTimestamp::Timestamp(st) => {
111            let st = systemtime_from(st)?;
112            Ok(Some(SystemTimeSpec::Absolute(st)))
113        }
114    }
115}
116
117struct ReadStreamProducer {
118    file: File,
119    offset: u64,
120    result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
121    task: Option<JoinHandle<std::io::Result<BytesMut>>>,
122}
123
124impl Drop for ReadStreamProducer {
125    fn drop(&mut self) {
126        self.close(Ok(()))
127    }
128}
129
130impl ReadStreamProducer {
131    fn close(&mut self, res: Result<(), ErrorCode>) {
132        if let Some(tx) = self.result.take() {
133            _ = tx.send(res);
134        }
135    }
136
137    /// Update the internal `offset` field after reading `amt` bytes from the file.
138    fn complete_read(&mut self, amt: usize) -> StreamResult {
139        let Ok(amt) = amt.try_into() else {
140            self.close(Err(ErrorCode::Overflow));
141            return StreamResult::Dropped;
142        };
143        let Some(amt) = self.offset.checked_add(amt) else {
144            self.close(Err(ErrorCode::Overflow));
145            return StreamResult::Dropped;
146        };
147        self.offset = amt;
148        StreamResult::Completed
149    }
150}
151
152impl<D> StreamProducer<D> for ReadStreamProducer {
153    type Item = u8;
154    type Buffer = Cursor<BytesMut>;
155
156    fn poll_produce<'a>(
157        mut self: Pin<&mut Self>,
158        cx: &mut Context<'_>,
159        store: StoreContextMut<'a, D>,
160        mut dst: Destination<'a, Self::Item, Self::Buffer>,
161        // Intentionally ignore this as in blocking mode everything is always
162        // ready and otherwise spawned blocking work can't be cancelled.
163        _finish: bool,
164    ) -> Poll<wasmtime::Result<StreamResult>> {
165        if let Some(file) = self.file.as_blocking_file() {
166            // Once a blocking file, always a blocking file, so assert as such.
167            assert!(self.task.is_none());
168            let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
169            let buf = dst.remaining();
170            if buf.is_empty() {
171                return Poll::Ready(Ok(StreamResult::Completed));
172            }
173            return match file.read_at(buf, self.offset) {
174                Ok(0) => {
175                    self.close(Ok(()));
176                    Poll::Ready(Ok(StreamResult::Dropped))
177                }
178                Ok(n) => {
179                    dst.mark_written(n);
180                    Poll::Ready(Ok(self.complete_read(n)))
181                }
182                Err(err) => {
183                    self.close(Err(err.into()));
184                    Poll::Ready(Ok(StreamResult::Dropped))
185                }
186            };
187        }
188
189        // Lazily spawn a read task if one hasn't already been spawned yet.
190        let me = &mut *self;
191        let task = me.task.get_or_insert_with(|| {
192            let mut buf = dst.take_buffer().into_inner();
193            buf.resize(DEFAULT_BUFFER_CAPACITY, 0);
194            let file = Arc::clone(me.file.as_file());
195            let offset = me.offset;
196            spawn_blocking(move || {
197                file.read_at(&mut buf, offset).map(|n| {
198                    buf.truncate(n);
199                    buf
200                })
201            })
202        });
203
204        // Await the completion of the read task. Note that this is not a
205        // cancellable await point because we can't cancel the other task, so
206        // the `finish` parameter is ignored.
207        let res = ready!(Pin::new(task).poll(cx)).expect("I/O task should not panic");
208        self.task = None;
209        match res {
210            Ok(buf) if buf.is_empty() => {
211                self.close(Ok(()));
212                Poll::Ready(Ok(StreamResult::Dropped))
213            }
214            Ok(buf) => {
215                let n = buf.len();
216                dst.set_buffer(Cursor::new(buf));
217                Poll::Ready(Ok(self.complete_read(n)))
218            }
219            Err(err) => {
220                self.close(Err(err.into()));
221                Poll::Ready(Ok(StreamResult::Dropped))
222            }
223        }
224    }
225}
226
227fn map_dir_entry(
228    entry: std::io::Result<cap_std::fs::DirEntry>,
229) -> Result<Option<DirectoryEntry>, ErrorCode> {
230    match entry {
231        Ok(entry) => {
232            let meta = entry.metadata()?;
233            let Ok(name) = entry.file_name().into_string() else {
234                return Err(ErrorCode::IllegalByteSequence);
235            };
236            Ok(Some(DirectoryEntry {
237                type_: meta.file_type().into(),
238                name,
239            }))
240        }
241        Err(err) => {
242            // On windows, filter out files like `C:\DumpStack.log.tmp` which we
243            // can't get full metadata for.
244            #[cfg(windows)]
245            {
246                use windows_sys::Win32::Foundation::{
247                    ERROR_ACCESS_DENIED, ERROR_SHARING_VIOLATION,
248                };
249                if err.raw_os_error() == Some(ERROR_SHARING_VIOLATION as i32)
250                    || err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32)
251                {
252                    return Ok(None);
253                }
254            }
255            Err(err.into())
256        }
257    }
258}
259
260struct ReadDirStream {
261    rx: mpsc::Receiver<DirectoryEntry>,
262    task: JoinHandle<Result<(), ErrorCode>>,
263    result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
264}
265
266impl ReadDirStream {
267    fn new(
268        dir: Arc<cap_std::fs::Dir>,
269        result: oneshot::Sender<Result<(), ErrorCode>>,
270    ) -> ReadDirStream {
271        let (tx, rx) = mpsc::channel(1);
272        ReadDirStream {
273            task: spawn_blocking(move || {
274                let entries = dir.entries()?;
275                for entry in entries {
276                    if let Some(entry) = map_dir_entry(entry)? {
277                        if let Err(_) = tx.blocking_send(entry) {
278                            break;
279                        }
280                    }
281                }
282                Ok(())
283            }),
284            rx,
285            result: Some(result),
286        }
287    }
288
289    fn close(&mut self, res: Result<(), ErrorCode>) {
290        self.rx.close();
291        self.task.abort();
292        let _ = self.result.take().unwrap().send(res);
293    }
294}
295
296impl<D> StreamProducer<D> for ReadDirStream {
297    type Item = DirectoryEntry;
298    type Buffer = Option<DirectoryEntry>;
299
300    fn poll_produce<'a>(
301        mut self: Pin<&mut Self>,
302        cx: &mut Context<'_>,
303        mut store: StoreContextMut<'a, D>,
304        mut dst: Destination<'a, Self::Item, Self::Buffer>,
305        finish: bool,
306    ) -> Poll<wasmtime::Result<StreamResult>> {
307        // If this is a 0-length read then `mpsc::Receiver` does not expose an
308        // API to wait for an item to be available without taking it out of the
309        // channel. In lieu of that just say that we're complete and ready for a
310        // read.
311        if dst.remaining(&mut store) == Some(0) {
312            return Poll::Ready(Ok(StreamResult::Completed));
313        }
314
315        match self.rx.poll_recv(cx) {
316            // If an item is on the channel then send that along and say that
317            // the read is now complete with one item being yielded.
318            Poll::Ready(Some(item)) => {
319                dst.set_buffer(Some(item));
320                Poll::Ready(Ok(StreamResult::Completed))
321            }
322
323            // If there's nothing left on the channel then that means that an
324            // error occurred or the iterator is done. In both cases an
325            // un-cancellable wait for the spawned task is entered and we await
326            // its completion. Upon completion there our own stream is closed
327            // with the result (sending an error code on our oneshot) and then
328            // the stream is reported as dropped.
329            Poll::Ready(None) => {
330                let result = ready!(Pin::new(&mut self.task).poll(cx))
331                    .expect("spawned task should not panic");
332                self.close(result);
333                Poll::Ready(Ok(StreamResult::Dropped))
334            }
335
336            // If an item isn't ready yet then cancel this outstanding request
337            // if `finish` is set, otherwise propagate the `Pending` status.
338            Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
339            Poll::Pending => Poll::Pending,
340        }
341    }
342}
343
344impl Drop for ReadDirStream {
345    fn drop(&mut self) {
346        if self.result.is_some() {
347            self.close(Ok(()));
348        }
349    }
350}
351
352struct WriteStreamConsumer {
353    file: File,
354    location: WriteLocation,
355    result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
356    buffer: BytesMut,
357    task: Option<JoinHandle<std::io::Result<(BytesMut, usize)>>>,
358}
359
360#[derive(Copy, Clone)]
361enum WriteLocation {
362    End,
363    Offset(u64),
364}
365
366impl WriteStreamConsumer {
367    fn new_at(file: File, offset: u64, result: oneshot::Sender<Result<(), ErrorCode>>) -> Self {
368        Self {
369            file,
370            location: WriteLocation::Offset(offset),
371            result: Some(result),
372            buffer: BytesMut::default(),
373            task: None,
374        }
375    }
376
377    fn new_append(file: File, result: oneshot::Sender<Result<(), ErrorCode>>) -> Self {
378        Self {
379            file,
380            location: WriteLocation::End,
381            result: Some(result),
382            buffer: BytesMut::default(),
383            task: None,
384        }
385    }
386
387    fn close(&mut self, res: Result<(), ErrorCode>) {
388        _ = self.result.take().unwrap().send(res);
389    }
390
391    /// Update the internal `offset` field after writing `amt` bytes from the file.
392    fn complete_write(&mut self, amt: usize) -> StreamResult {
393        match &mut self.location {
394            WriteLocation::End => StreamResult::Completed,
395            WriteLocation::Offset(offset) => {
396                let Ok(amt) = amt.try_into() else {
397                    self.close(Err(ErrorCode::Overflow));
398                    return StreamResult::Dropped;
399                };
400                let Some(amt) = offset.checked_add(amt) else {
401                    self.close(Err(ErrorCode::Overflow));
402                    return StreamResult::Dropped;
403                };
404                *offset = amt;
405                StreamResult::Completed
406            }
407        }
408    }
409}
410
411impl WriteLocation {
412    fn write(&self, file: &cap_std::fs::File, bytes: &[u8]) -> io::Result<usize> {
413        match *self {
414            WriteLocation::End => file.append(bytes),
415            WriteLocation::Offset(at) => file.write_at(bytes, at),
416        }
417    }
418}
419
420impl<D> StreamConsumer<D> for WriteStreamConsumer {
421    type Item = u8;
422
423    fn poll_consume(
424        mut self: Pin<&mut Self>,
425        cx: &mut Context<'_>,
426        store: StoreContextMut<D>,
427        src: Source<Self::Item>,
428        // Intentionally ignore this as in blocking mode everything is always
429        // ready and otherwise spawned blocking work can't be cancelled.
430        _finish: bool,
431    ) -> Poll<wasmtime::Result<StreamResult>> {
432        let mut src = src.as_direct(store);
433        if let Some(file) = self.file.as_blocking_file() {
434            // Once a blocking file, always a blocking file, so assert as such.
435            assert!(self.task.is_none());
436            return match self.location.write(file, src.remaining()) {
437                Ok(n) => {
438                    src.mark_read(n);
439                    Poll::Ready(Ok(self.complete_write(n)))
440                }
441                Err(err) => {
442                    self.close(Err(err.into()));
443                    Poll::Ready(Ok(StreamResult::Dropped))
444                }
445            };
446        }
447        let me = &mut *self;
448        let task = me.task.get_or_insert_with(|| {
449            debug_assert!(me.buffer.is_empty());
450            me.buffer.extend_from_slice(src.remaining());
451            let buf = mem::take(&mut me.buffer);
452            let file = Arc::clone(me.file.as_file());
453            let location = me.location;
454            spawn_blocking(move || location.write(&file, &buf).map(|n| (buf, n)))
455        });
456        let res = ready!(Pin::new(task).poll(cx)).expect("I/O task should not panic");
457        self.task = None;
458        match res {
459            Ok((buf, n)) => {
460                src.mark_read(n);
461                self.buffer = buf;
462                self.buffer.clear();
463                Poll::Ready(Ok(self.complete_write(n)))
464            }
465            Err(err) => {
466                self.close(Err(err.into()));
467                Poll::Ready(Ok(StreamResult::Dropped))
468            }
469        }
470    }
471}
472
473impl Drop for WriteStreamConsumer {
474    fn drop(&mut self) {
475        if self.result.is_some() {
476            self.close(Ok(()))
477        }
478    }
479}
480
481impl types::Host for WasiFilesystemCtxView<'_> {
482    fn convert_error_code(&mut self, error: FilesystemError) -> wasmtime::Result<ErrorCode> {
483        error.downcast()
484    }
485}
486
487impl types::HostDescriptorWithStore for WasiFilesystem {
488    async fn read_via_stream<U>(
489        store: &Accessor<U, Self>,
490        fd: Resource<Descriptor>,
491        offset: Filesize,
492    ) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
493        store.with(|mut store| {
494            let file = get_file(store.get().table, &fd)?;
495            if !file.perms.contains(FilePerms::READ) {
496                return Ok((
497                    StreamReader::new(&mut store, iter::empty()),
498                    FutureReader::new(&mut store, async {
499                        anyhow::Ok(Err(ErrorCode::NotPermitted))
500                    }),
501                ));
502            }
503
504            let file = file.clone();
505            let (result_tx, result_rx) = oneshot::channel();
506            Ok((
507                StreamReader::new(
508                    &mut store,
509                    ReadStreamProducer {
510                        file,
511                        offset,
512                        result: Some(result_tx),
513                        task: None,
514                    },
515                ),
516                FutureReader::new(&mut store, result_rx),
517            ))
518        })
519    }
520
521    async fn write_via_stream<U>(
522        store: &Accessor<U, Self>,
523        fd: Resource<Descriptor>,
524        data: StreamReader<u8>,
525        offset: Filesize,
526    ) -> FilesystemResult<()> {
527        let (result_tx, result_rx) = oneshot::channel();
528        store.with(|mut store| {
529            let file = get_file(store.get().table, &fd)?;
530            if !file.perms.contains(FilePerms::WRITE) {
531                return Err(ErrorCode::NotPermitted.into());
532            }
533            let file = file.clone();
534            data.pipe(store, WriteStreamConsumer::new_at(file, offset, result_tx));
535            FilesystemResult::Ok(())
536        })?;
537        result_rx
538            .await
539            .context("oneshot sender dropped")
540            .map_err(FilesystemError::trap)??;
541        Ok(())
542    }
543
544    async fn append_via_stream<U>(
545        store: &Accessor<U, Self>,
546        fd: Resource<Descriptor>,
547        data: StreamReader<u8>,
548    ) -> FilesystemResult<()> {
549        let (result_tx, result_rx) = oneshot::channel();
550        store.with(|mut store| {
551            let file = get_file(store.get().table, &fd)?;
552            if !file.perms.contains(FilePerms::WRITE) {
553                return Err(ErrorCode::NotPermitted.into());
554            }
555            let file = file.clone();
556            data.pipe(store, WriteStreamConsumer::new_append(file, result_tx));
557            FilesystemResult::Ok(())
558        })?;
559        result_rx
560            .await
561            .context("oneshot sender dropped")
562            .map_err(FilesystemError::trap)??;
563        Ok(())
564    }
565
566    async fn advise<U>(
567        store: &Accessor<U, Self>,
568        fd: Resource<Descriptor>,
569        offset: Filesize,
570        length: Filesize,
571        advice: Advice,
572    ) -> FilesystemResult<()> {
573        let file = store.get_file(&fd)?;
574        file.advise(offset, length, advice.into()).await?;
575        Ok(())
576    }
577
578    async fn sync_data<U>(
579        store: &Accessor<U, Self>,
580        fd: Resource<Descriptor>,
581    ) -> FilesystemResult<()> {
582        let fd = store.get_descriptor(&fd)?;
583        fd.sync_data().await?;
584        Ok(())
585    }
586
587    async fn get_flags<U>(
588        store: &Accessor<U, Self>,
589        fd: Resource<Descriptor>,
590    ) -> FilesystemResult<DescriptorFlags> {
591        let fd = store.get_descriptor(&fd)?;
592        let flags = fd.get_flags().await?;
593        Ok(flags.into())
594    }
595
596    async fn get_type<U>(
597        store: &Accessor<U, Self>,
598        fd: Resource<Descriptor>,
599    ) -> FilesystemResult<DescriptorType> {
600        let fd = store.get_descriptor(&fd)?;
601        let ty = fd.get_type().await?;
602        Ok(ty.into())
603    }
604
605    async fn set_size<U>(
606        store: &Accessor<U, Self>,
607        fd: Resource<Descriptor>,
608        size: Filesize,
609    ) -> FilesystemResult<()> {
610        let file = store.get_file(&fd)?;
611        file.set_size(size).await?;
612        Ok(())
613    }
614
615    async fn set_times<U>(
616        store: &Accessor<U, Self>,
617        fd: Resource<Descriptor>,
618        data_access_timestamp: NewTimestamp,
619        data_modification_timestamp: NewTimestamp,
620    ) -> FilesystemResult<()> {
621        let fd = store.get_descriptor(&fd)?;
622        let atim = systemtimespec_from(data_access_timestamp)?;
623        let mtim = systemtimespec_from(data_modification_timestamp)?;
624        fd.set_times(atim, mtim).await?;
625        Ok(())
626    }
627
628    async fn read_directory<U>(
629        store: &Accessor<U, Self>,
630        fd: Resource<Descriptor>,
631    ) -> wasmtime::Result<(
632        StreamReader<DirectoryEntry>,
633        FutureReader<Result<(), ErrorCode>>,
634    )> {
635        store.with(|mut store| {
636            let dir = get_dir(store.get().table, &fd)?;
637            if !dir.perms.contains(DirPerms::READ) {
638                return Ok((
639                    StreamReader::new(&mut store, iter::empty()),
640                    FutureReader::new(&mut store, async {
641                        anyhow::Ok(Err(ErrorCode::NotPermitted))
642                    }),
643                ));
644            }
645            let allow_blocking_current_thread = dir.allow_blocking_current_thread;
646            let dir = Arc::clone(dir.as_dir());
647            let (result_tx, result_rx) = oneshot::channel();
648            let stream = if allow_blocking_current_thread {
649                match dir.entries() {
650                    Ok(readdir) => StreamReader::new(
651                        &mut store,
652                        FallibleIteratorProducer::new(
653                            readdir.filter_map(|e| map_dir_entry(e).transpose()),
654                            result_tx,
655                        ),
656                    ),
657                    Err(e) => {
658                        result_tx.send(Err(e.into())).unwrap();
659                        StreamReader::new(&mut store, iter::empty())
660                    }
661                }
662            } else {
663                StreamReader::new(&mut store, ReadDirStream::new(dir, result_tx))
664            };
665            Ok((stream, FutureReader::new(&mut store, result_rx)))
666        })
667    }
668
669    async fn sync<U>(store: &Accessor<U, Self>, fd: Resource<Descriptor>) -> FilesystemResult<()> {
670        let fd = store.get_descriptor(&fd)?;
671        fd.sync().await?;
672        Ok(())
673    }
674
675    async fn create_directory_at<U>(
676        store: &Accessor<U, Self>,
677        fd: Resource<Descriptor>,
678        path: String,
679    ) -> FilesystemResult<()> {
680        let dir = store.get_dir(&fd)?;
681        dir.create_directory_at(path).await?;
682        Ok(())
683    }
684
685    async fn stat<U>(
686        store: &Accessor<U, Self>,
687        fd: Resource<Descriptor>,
688    ) -> FilesystemResult<DescriptorStat> {
689        let fd = store.get_descriptor(&fd)?;
690        let stat = fd.stat().await?;
691        Ok(stat.into())
692    }
693
694    async fn stat_at<U>(
695        store: &Accessor<U, Self>,
696        fd: Resource<Descriptor>,
697        path_flags: PathFlags,
698        path: String,
699    ) -> FilesystemResult<DescriptorStat> {
700        let dir = store.get_dir(&fd)?;
701        let stat = dir.stat_at(path_flags.into(), path).await?;
702        Ok(stat.into())
703    }
704
705    async fn set_times_at<U>(
706        store: &Accessor<U, Self>,
707        fd: Resource<Descriptor>,
708        path_flags: PathFlags,
709        path: String,
710        data_access_timestamp: NewTimestamp,
711        data_modification_timestamp: NewTimestamp,
712    ) -> FilesystemResult<()> {
713        let dir = store.get_dir(&fd)?;
714        let atim = systemtimespec_from(data_access_timestamp)?;
715        let mtim = systemtimespec_from(data_modification_timestamp)?;
716        dir.set_times_at(path_flags.into(), path, atim, mtim)
717            .await?;
718        Ok(())
719    }
720
721    async fn link_at<U>(
722        store: &Accessor<U, Self>,
723        fd: Resource<Descriptor>,
724        old_path_flags: PathFlags,
725        old_path: String,
726        new_fd: Resource<Descriptor>,
727        new_path: String,
728    ) -> FilesystemResult<()> {
729        let (old_dir, new_dir) = store.get_dir_pair(&fd, &new_fd)?;
730        old_dir
731            .link_at(old_path_flags.into(), old_path, &new_dir, new_path)
732            .await?;
733        Ok(())
734    }
735
736    async fn open_at<U>(
737        store: &Accessor<U, Self>,
738        fd: Resource<Descriptor>,
739        path_flags: PathFlags,
740        path: String,
741        open_flags: OpenFlags,
742        flags: DescriptorFlags,
743    ) -> FilesystemResult<Resource<Descriptor>> {
744        let (allow_blocking_current_thread, dir) = store.with(|mut store| {
745            let store = store.get();
746            let dir = get_dir(&store.table, &fd)?;
747            FilesystemResult::Ok((store.ctx.allow_blocking_current_thread, dir.clone()))
748        })?;
749        let fd = dir
750            .open_at(
751                path_flags.into(),
752                path,
753                open_flags.into(),
754                flags.into(),
755                allow_blocking_current_thread,
756            )
757            .await?;
758        let fd = store.with(|mut store| store.get().table.push(fd))?;
759        Ok(fd)
760    }
761
762    async fn readlink_at<U>(
763        store: &Accessor<U, Self>,
764        fd: Resource<Descriptor>,
765        path: String,
766    ) -> FilesystemResult<String> {
767        let dir = store.get_dir(&fd)?;
768        let path = dir.readlink_at(path).await?;
769        Ok(path)
770    }
771
772    async fn remove_directory_at<U>(
773        store: &Accessor<U, Self>,
774        fd: Resource<Descriptor>,
775        path: String,
776    ) -> FilesystemResult<()> {
777        let dir = store.get_dir(&fd)?;
778        dir.remove_directory_at(path).await?;
779        Ok(())
780    }
781
782    async fn rename_at<U>(
783        store: &Accessor<U, Self>,
784        fd: Resource<Descriptor>,
785        old_path: String,
786        new_fd: Resource<Descriptor>,
787        new_path: String,
788    ) -> FilesystemResult<()> {
789        let (old_dir, new_dir) = store.get_dir_pair(&fd, &new_fd)?;
790        old_dir.rename_at(old_path, &new_dir, new_path).await?;
791        Ok(())
792    }
793
794    async fn symlink_at<U>(
795        store: &Accessor<U, Self>,
796        fd: Resource<Descriptor>,
797        old_path: String,
798        new_path: String,
799    ) -> FilesystemResult<()> {
800        let dir = store.get_dir(&fd)?;
801        dir.symlink_at(old_path, new_path).await?;
802        Ok(())
803    }
804
805    async fn unlink_file_at<U>(
806        store: &Accessor<U, Self>,
807        fd: Resource<Descriptor>,
808        path: String,
809    ) -> FilesystemResult<()> {
810        let dir = store.get_dir(&fd)?;
811        dir.unlink_file_at(path).await?;
812        Ok(())
813    }
814
815    async fn is_same_object<U>(
816        store: &Accessor<U, Self>,
817        fd: Resource<Descriptor>,
818        other: Resource<Descriptor>,
819    ) -> wasmtime::Result<bool> {
820        let (fd, other) = store.with(|mut store| {
821            let table = store.get().table;
822            let fd = get_descriptor(table, &fd)?.clone();
823            let other = get_descriptor(table, &other)?.clone();
824            anyhow::Ok((fd, other))
825        })?;
826        fd.is_same_object(&other).await
827    }
828
829    async fn metadata_hash<U>(
830        store: &Accessor<U, Self>,
831        fd: Resource<Descriptor>,
832    ) -> FilesystemResult<MetadataHashValue> {
833        let fd = store.get_descriptor(&fd)?;
834        let meta = fd.metadata_hash().await?;
835        Ok(meta.into())
836    }
837
838    async fn metadata_hash_at<U>(
839        store: &Accessor<U, Self>,
840        fd: Resource<Descriptor>,
841        path_flags: PathFlags,
842        path: String,
843    ) -> FilesystemResult<MetadataHashValue> {
844        let dir = store.get_dir(&fd)?;
845        let meta = dir.metadata_hash_at(path_flags.into(), path).await?;
846        Ok(meta.into())
847    }
848}
849
850impl types::HostDescriptor for WasiFilesystemCtxView<'_> {
851    fn drop(&mut self, fd: Resource<Descriptor>) -> wasmtime::Result<()> {
852        self.table
853            .delete(fd)
854            .context("failed to delete descriptor resource from table")?;
855        Ok(())
856    }
857}
858
859impl preopens::Host for WasiFilesystemCtxView<'_> {
860    fn get_directories(&mut self) -> wasmtime::Result<Vec<(Resource<Descriptor>, String)>> {
861        self.get_directories()
862    }
863}