Skip to main content

wasmtime_wasi/p3/filesystem/
host.rs

1use crate::filesystem::{Descriptor, Dir, File, WasiFilesystem, WasiFilesystemCtxView};
2use crate::p3::bindings::clocks::system_clock;
3use crate::p3::bindings::filesystem::types::{
4    self, Advice, DescriptorFlags, DescriptorStat, DescriptorType, DirectoryEntry, ErrorCode,
5    Filesize, MetadataHashValue, NewTimestamp, OpenFlags, PathFlags,
6};
7use crate::p3::filesystem::{FilesystemError, FilesystemResult, preopens};
8use crate::p3::{DEFAULT_BUFFER_CAPACITY, FallibleIteratorProducer};
9use crate::{DirPerms, FilePerms};
10use bytes::BytesMut;
11use core::pin::Pin;
12use core::task::{Context, Poll, ready};
13use core::{iter, mem};
14use std::io::{self, Cursor};
15use std::sync::Arc;
16use system_interface::fs::FileIoExt as _;
17use tokio::sync::{mpsc, oneshot};
18use tokio::task::{JoinHandle, spawn_blocking};
19use wasmtime::StoreContextMut;
20use wasmtime::component::{
21    Access, Accessor, Destination, FutureReader, Resource, ResourceTable, Source, StreamConsumer,
22    StreamProducer, StreamReader, StreamResult,
23};
24use wasmtime::error::Context as _;
25
26fn get_descriptor<'a>(
27    table: &'a ResourceTable,
28    fd: &'a Resource<Descriptor>,
29) -> FilesystemResult<&'a Descriptor> {
30    table
31        .get(fd)
32        .context("failed to get descriptor resource from table")
33        .map_err(FilesystemError::trap)
34}
35
36fn get_file<'a>(
37    table: &'a ResourceTable,
38    fd: &'a Resource<Descriptor>,
39) -> FilesystemResult<&'a File> {
40    let file = get_descriptor(table, fd).map(Descriptor::file)??;
41    Ok(file)
42}
43
44fn get_dir<'a>(
45    table: &'a ResourceTable,
46    fd: &'a Resource<Descriptor>,
47) -> FilesystemResult<&'a Dir> {
48    let dir = get_descriptor(table, fd).map(Descriptor::dir)??;
49    Ok(dir)
50}
51
/// Descriptor lookups that return owned values rather than references.
trait AccessorExt {
    /// Returns an owned copy of the descriptor that `fd` refers to.
    fn get_descriptor(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Descriptor>;
    /// Returns an owned copy of the file that `fd` refers to.
    fn get_file(&self, fd: &Resource<Descriptor>) -> FilesystemResult<File>;
    /// Returns an owned copy of the directory that `fd` refers to.
    fn get_dir(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Dir>;
    /// Returns owned copies of the directories that `a` and `b` refer to.
    fn get_dir_pair(
        &self,
        a: &Resource<Descriptor>,
        b: &Resource<Descriptor>,
    ) -> FilesystemResult<(Dir, Dir)>;
}
62
63impl<T> AccessorExt for Accessor<T, WasiFilesystem> {
64    fn get_descriptor(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Descriptor> {
65        self.with(|mut store| {
66            let fd = get_descriptor(store.get().table, fd)?;
67            Ok(fd.clone())
68        })
69    }
70
71    fn get_file(&self, fd: &Resource<Descriptor>) -> FilesystemResult<File> {
72        self.with(|mut store| {
73            let file = get_file(store.get().table, fd)?;
74            Ok(file.clone())
75        })
76    }
77
78    fn get_dir(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Dir> {
79        self.with(|mut store| {
80            let dir = get_dir(store.get().table, fd)?;
81            Ok(dir.clone())
82        })
83    }
84
85    fn get_dir_pair(
86        &self,
87        a: &Resource<Descriptor>,
88        b: &Resource<Descriptor>,
89    ) -> FilesystemResult<(Dir, Dir)> {
90        self.with(|mut store| {
91            let table = store.get().table;
92            let a = get_dir(table, a)?;
93            let b = get_dir(table, b)?;
94            Ok((a.clone(), b.clone()))
95        })
96    }
97}
98
99fn systemtime_from(t: system_clock::Instant) -> Result<std::time::SystemTime, ErrorCode> {
100    if let Ok(seconds) = t.seconds.try_into() {
101        std::time::SystemTime::UNIX_EPOCH
102            .checked_add(core::time::Duration::new(seconds, t.nanoseconds))
103            .ok_or(ErrorCode::Overflow)
104    } else {
105        std::time::SystemTime::UNIX_EPOCH
106            .checked_sub(core::time::Duration::new(
107                t.seconds.unsigned_abs(),
108                t.nanoseconds,
109            ))
110            .ok_or(ErrorCode::Overflow)
111    }
112}
113
114fn systemtimespec_from(t: NewTimestamp) -> Result<Option<fs_set_times::SystemTimeSpec>, ErrorCode> {
115    use fs_set_times::SystemTimeSpec;
116    match t {
117        NewTimestamp::NoChange => Ok(None),
118        NewTimestamp::Now => Ok(Some(SystemTimeSpec::SymbolicNow)),
119        NewTimestamp::Timestamp(st) => {
120            let st = systemtime_from(st)?;
121            Ok(Some(SystemTimeSpec::Absolute(st)))
122        }
123    }
124}
125
/// Stream producer that yields the bytes of a file starting at a given offset.
struct ReadStreamProducer {
    /// File that is being read.
    file: File,
    /// Position in `file` at which the next read starts.
    offset: u64,
    /// One-shot channel used to report the final outcome of the stream; taken
    /// (set to `None`) once the outcome has been sent.
    result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
    /// In-flight read spawned on the blocking pool, if any.
    task: Option<JoinHandle<std::io::Result<BytesMut>>>,
}
132
133impl Drop for ReadStreamProducer {
134    fn drop(&mut self) {
135        self.close(Ok(()))
136    }
137}
138
139impl ReadStreamProducer {
140    fn close(&mut self, res: Result<(), ErrorCode>) {
141        if let Some(tx) = self.result.take() {
142            _ = tx.send(res);
143        }
144    }
145
146    /// Update the internal `offset` field after reading `amt` bytes from the file.
147    fn complete_read(&mut self, amt: usize) -> StreamResult {
148        let Ok(amt) = amt.try_into() else {
149            self.close(Err(ErrorCode::Overflow));
150            return StreamResult::Dropped;
151        };
152        let Some(amt) = self.offset.checked_add(amt) else {
153            self.close(Err(ErrorCode::Overflow));
154            return StreamResult::Dropped;
155        };
156        self.offset = amt;
157        StreamResult::Completed
158    }
159}
160
/// Produces file contents as a byte stream, either by reading inline or by
/// delegating each read to a task on the blocking pool.
impl<D> StreamProducer<D> for ReadStreamProducer {
    type Item = u8;
    type Buffer = Cursor<BytesMut>;

    /// Performs one read step.
    ///
    /// When `as_blocking_file` yields a file, the read happens inline directly
    /// into the destination's buffer. Otherwise a blocking task is spawned (at
    /// most one at a time) and polled until it finishes. In both modes a read
    /// of zero bytes closes the stream with `Ok(())`, an I/O error closes it
    /// with the converted error code, and a successful read advances `offset`.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        // Intentionally ignore this as in blocking mode everything is always
        // ready and otherwise spawned blocking work can't be cancelled.
        _finish: bool,
    ) -> Poll<wasmtime::Result<StreamResult>> {
        if let Some(file) = self.file.as_blocking_file() {
            // Once a blocking file, always a blocking file, so assert as such.
            assert!(self.task.is_none());
            let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
            let buf = dst.remaining();
            // Nothing can be written into an empty destination, so report
            // completion without touching the file.
            if buf.is_empty() {
                return Poll::Ready(Ok(StreamResult::Completed));
            }
            return match file.read_at(buf, self.offset) {
                // A zero-byte read ends the stream successfully.
                Ok(0) => {
                    self.close(Ok(()));
                    Poll::Ready(Ok(StreamResult::Dropped))
                }
                Ok(n) => {
                    dst.mark_written(n);
                    Poll::Ready(Ok(self.complete_read(n)))
                }
                Err(err) => {
                    self.close(Err(err.into()));
                    Poll::Ready(Ok(StreamResult::Dropped))
                }
            };
        }

        // Lazily spawn a read task if one hasn't already been spawned yet.
        let me = &mut *self;
        let task = me.task.get_or_insert_with(|| {
            // Reuse the destination's buffer allocation for the read and size
            // it to the default capacity.
            let mut buf = dst.take_buffer().into_inner();
            buf.resize(DEFAULT_BUFFER_CAPACITY, 0);
            let file = Arc::clone(me.file.as_file());
            let offset = me.offset;
            spawn_blocking(move || {
                file.read_at(&mut buf, offset).map(|n| {
                    // Keep only the bytes that were actually read.
                    buf.truncate(n);
                    buf
                })
            })
        });

        // Await the completion of the read task. Note that this is not a
        // cancellable await point because we can't cancel the other task, so
        // the `finish` parameter is ignored.
        let res = ready!(Pin::new(task).poll(cx)).expect("I/O task should not panic");
        // The task has finished, so the next call spawns a fresh one.
        self.task = None;
        match res {
            // An empty buffer means zero bytes were read: end of stream.
            Ok(buf) if buf.is_empty() => {
                self.close(Ok(()));
                Poll::Ready(Ok(StreamResult::Dropped))
            }
            Ok(buf) => {
                let n = buf.len();
                dst.set_buffer(Cursor::new(buf));
                Poll::Ready(Ok(self.complete_read(n)))
            }
            Err(err) => {
                self.close(Err(err.into()));
                Poll::Ready(Ok(StreamResult::Dropped))
            }
        }
    }
}
235
236fn map_dir_entry(
237    entry: std::io::Result<cap_std::fs::DirEntry>,
238) -> Result<Option<DirectoryEntry>, ErrorCode> {
239    match entry {
240        Ok(entry) => {
241            let meta = entry.metadata()?;
242            let Ok(name) = entry.file_name().into_string() else {
243                return Err(ErrorCode::IllegalByteSequence);
244            };
245            Ok(Some(DirectoryEntry {
246                type_: meta.file_type().into(),
247                name,
248            }))
249        }
250        Err(err) => {
251            // On windows, filter out files like `C:\DumpStack.log.tmp` which we
252            // can't get full metadata for.
253            #[cfg(windows)]
254            {
255                use windows_sys::Win32::Foundation::{
256                    ERROR_ACCESS_DENIED, ERROR_SHARING_VIOLATION,
257                };
258                if err.raw_os_error() == Some(ERROR_SHARING_VIOLATION as i32)
259                    || err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32)
260                {
261                    return Ok(None);
262                }
263            }
264            Err(err.into())
265        }
266    }
267}
268
/// Stream producer that yields directory entries gathered by a task running
/// on the blocking pool.
struct ReadDirStream {
    /// Receiving end of the channel that the blocking task sends entries on.
    rx: mpsc::Receiver<DirectoryEntry>,
    /// Handle to the blocking task that iterates the directory.
    task: JoinHandle<Result<(), ErrorCode>>,
    /// One-shot channel used to report the final outcome of the stream; taken
    /// (set to `None`) once the outcome has been sent.
    result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
}
274
275impl ReadDirStream {
276    fn new(
277        dir: Arc<cap_std::fs::Dir>,
278        result: oneshot::Sender<Result<(), ErrorCode>>,
279    ) -> ReadDirStream {
280        let (tx, rx) = mpsc::channel(1);
281        ReadDirStream {
282            task: spawn_blocking(move || {
283                let entries = dir.entries()?;
284                for entry in entries {
285                    if let Some(entry) = map_dir_entry(entry)? {
286                        if let Err(_) = tx.blocking_send(entry) {
287                            break;
288                        }
289                    }
290                }
291                Ok(())
292            }),
293            rx,
294            result: Some(result),
295        }
296    }
297
298    fn close(&mut self, res: Result<(), ErrorCode>) {
299        self.rx.close();
300        self.task.abort();
301        let _ = self.result.take().unwrap().send(res);
302    }
303}
304
/// Produces directory entries one at a time from the channel fed by the
/// blocking task.
impl<D> StreamProducer<D> for ReadDirStream {
    type Item = DirectoryEntry;
    type Buffer = Option<DirectoryEntry>;

    /// Yields at most one entry per call.
    ///
    /// Reports `Completed` when an entry was handed over (or for a
    /// zero-length read), `Dropped` once the channel is exhausted and the
    /// task's result has been forwarded, and `Cancelled` when nothing is ready
    /// and `finish` is set.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        mut store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        finish: bool,
    ) -> Poll<wasmtime::Result<StreamResult>> {
        // If this is a 0-length read then `mpsc::Receiver` does not expose an
        // API to wait for an item to be available without taking it out of the
        // channel. In lieu of that just say that we're complete and ready for a
        // read.
        if dst.remaining(&mut store) == Some(0) {
            return Poll::Ready(Ok(StreamResult::Completed));
        }

        match self.rx.poll_recv(cx) {
            // If an item is on the channel then send that along and say that
            // the read is now complete with one item being yielded.
            Poll::Ready(Some(item)) => {
                dst.set_buffer(Some(item));
                Poll::Ready(Ok(StreamResult::Completed))
            }

            // If there's nothing left on the channel then that means that an
            // error occurred or the iterator is done. In both cases an
            // un-cancellable wait for the spawned task is entered and we await
            // its completion. Upon completion our own stream is closed with
            // the result (sending an error code on our oneshot) and then the
            // stream is reported as dropped.
            Poll::Ready(None) => {
                let result = ready!(Pin::new(&mut self.task).poll(cx))
                    .expect("spawned task should not panic");
                self.close(result);
                Poll::Ready(Ok(StreamResult::Dropped))
            }

            // If an item isn't ready yet then cancel this outstanding request
            // if `finish` is set, otherwise propagate the `Pending` status.
            Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
            Poll::Pending => Poll::Pending,
        }
    }
}
352
353impl Drop for ReadDirStream {
354    fn drop(&mut self) {
355        if self.result.is_some() {
356            self.close(Ok(()));
357        }
358    }
359}
360
/// Stream consumer that writes incoming bytes to a file.
struct WriteStreamConsumer {
    /// File that is being written.
    file: File,
    /// Where the next write goes: the end of the file or an explicit offset.
    location: WriteLocation,
    /// One-shot channel used to report the final outcome of the stream; taken
    /// (set to `None`) once the outcome has been sent.
    result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
    /// Scratch buffer holding the bytes handed to a spawned write task.
    buffer: BytesMut,
    /// In-flight write spawned on the blocking pool, if any. It resolves to
    /// the buffer it was given and the number of bytes written.
    task: Option<JoinHandle<std::io::Result<(BytesMut, usize)>>>,
}
368
/// Position at which a [`WriteStreamConsumer`] writes.
#[derive(Copy, Clone)]
enum WriteLocation {
    /// Append each write to the file.
    End,
    /// Write at the contained offset.
    Offset(u64),
}
374
375impl WriteStreamConsumer {
376    fn new_at(file: File, offset: u64, result: oneshot::Sender<Result<(), ErrorCode>>) -> Self {
377        Self {
378            file,
379            location: WriteLocation::Offset(offset),
380            result: Some(result),
381            buffer: BytesMut::default(),
382            task: None,
383        }
384    }
385
386    fn new_append(file: File, result: oneshot::Sender<Result<(), ErrorCode>>) -> Self {
387        Self {
388            file,
389            location: WriteLocation::End,
390            result: Some(result),
391            buffer: BytesMut::default(),
392            task: None,
393        }
394    }
395
396    fn close(&mut self, res: Result<(), ErrorCode>) {
397        _ = self.result.take().unwrap().send(res);
398    }
399
400    /// Update the internal `offset` field after writing `amt` bytes from the file.
401    fn complete_write(&mut self, amt: usize) -> StreamResult {
402        match &mut self.location {
403            WriteLocation::End => StreamResult::Completed,
404            WriteLocation::Offset(offset) => {
405                let Ok(amt) = amt.try_into() else {
406                    self.close(Err(ErrorCode::Overflow));
407                    return StreamResult::Dropped;
408                };
409                let Some(amt) = offset.checked_add(amt) else {
410                    self.close(Err(ErrorCode::Overflow));
411                    return StreamResult::Dropped;
412                };
413                *offset = amt;
414                StreamResult::Completed
415            }
416        }
417    }
418}
419
420impl WriteLocation {
421    fn write(&self, file: &cap_std::fs::File, bytes: &[u8]) -> io::Result<usize> {
422        match *self {
423            WriteLocation::End => file.append(bytes),
424            WriteLocation::Offset(at) => file.write_at(bytes, at),
425        }
426    }
427}
428
/// Consumes a byte stream by writing it to the file, either inline or through
/// a task on the blocking pool.
impl<D> StreamConsumer<D> for WriteStreamConsumer {
    type Item = u8;

    /// Performs one write step.
    ///
    /// When `as_blocking_file` yields a file, the available bytes are written
    /// inline. Otherwise they are copied into `buffer` and written by a
    /// spawned blocking task (at most one at a time) that is polled until it
    /// finishes. A successful write marks the written bytes as read from the
    /// source and advances the write location; an I/O error closes the stream
    /// with the converted error code.
    fn poll_consume(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        src: Source<Self::Item>,
        // Intentionally ignore this as in blocking mode everything is always
        // ready and otherwise spawned blocking work can't be cancelled.
        _finish: bool,
    ) -> Poll<wasmtime::Result<StreamResult>> {
        let mut src = src.as_direct(store);
        if let Some(file) = self.file.as_blocking_file() {
            // Once a blocking file, always a blocking file, so assert as such.
            assert!(self.task.is_none());
            return match self.location.write(file, src.remaining()) {
                Ok(n) => {
                    src.mark_read(n);
                    Poll::Ready(Ok(self.complete_write(n)))
                }
                Err(err) => {
                    self.close(Err(err.into()));
                    Poll::Ready(Ok(StreamResult::Dropped))
                }
            };
        }
        // Lazily spawn a write task if one isn't already in flight.
        let me = &mut *self;
        let task = me.task.get_or_insert_with(|| {
            debug_assert!(me.buffer.is_empty());
            // The task needs owned data, so copy the pending bytes into the
            // scratch buffer and move that buffer into the task.
            me.buffer.extend_from_slice(src.remaining());
            let buf = mem::take(&mut me.buffer);
            let file = Arc::clone(me.file.as_file());
            let location = me.location;
            spawn_blocking(move || location.write(&file, &buf).map(|n| (buf, n)))
        });
        let res = ready!(Pin::new(task).poll(cx)).expect("I/O task should not panic");
        // The task has finished, so the next call spawns a fresh one.
        self.task = None;
        match res {
            Ok((buf, n)) => {
                src.mark_read(n);
                // Take the buffer back, emptied, so its allocation is reused
                // by the next write.
                self.buffer = buf;
                self.buffer.clear();
                Poll::Ready(Ok(self.complete_write(n)))
            }
            Err(err) => {
                self.close(Err(err.into()));
                Poll::Ready(Ok(StreamResult::Dropped))
            }
        }
    }
}
481
482impl Drop for WriteStreamConsumer {
483    fn drop(&mut self) {
484        if self.result.is_some() {
485            self.close(Ok(()))
486        }
487    }
488}
489
490impl types::Host for WasiFilesystemCtxView<'_> {
491    fn convert_error_code(&mut self, error: FilesystemError) -> wasmtime::Result<ErrorCode> {
492        error.downcast()
493    }
494}
495
496impl types::HostDescriptorWithStore for WasiFilesystem {
497    fn read_via_stream<U>(
498        mut store: Access<U, Self>,
499        fd: Resource<Descriptor>,
500        offset: Filesize,
501    ) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
502        let file = get_file(store.get().table, &fd)?;
503        if !file.perms.contains(FilePerms::READ) {
504            return Ok((
505                StreamReader::new(&mut store, iter::empty()),
506                FutureReader::new(&mut store, async {
507                    wasmtime::error::Ok(Err(ErrorCode::NotPermitted))
508                }),
509            ));
510        }
511
512        let file = file.clone();
513        let (result_tx, result_rx) = oneshot::channel();
514        Ok((
515            StreamReader::new(
516                &mut store,
517                ReadStreamProducer {
518                    file,
519                    offset,
520                    result: Some(result_tx),
521                    task: None,
522                },
523            ),
524            FutureReader::new(&mut store, result_rx),
525        ))
526    }
527
528    fn write_via_stream<U>(
529        mut store: Access<'_, U, Self>,
530        fd: Resource<Descriptor>,
531        mut data: StreamReader<u8>,
532        offset: Filesize,
533    ) -> FutureReader<Result<(), ErrorCode>> {
534        let (result_tx, result_rx) = oneshot::channel();
535        match get_file(store.get().table, &fd).and_then(|file| {
536            if !file.perms.contains(FilePerms::WRITE) {
537                Err(ErrorCode::NotPermitted.into())
538            } else {
539                Ok(file.clone())
540            }
541        }) {
542            Ok(file) => {
543                data.pipe(
544                    &mut store,
545                    WriteStreamConsumer::new_at(file, offset, result_tx),
546                );
547            }
548            Err(err) => {
549                data.close(&mut store);
550                let _ = result_tx.send(Err(err.downcast().unwrap_or(ErrorCode::Io)));
551            }
552        }
553        FutureReader::new(&mut store, result_rx)
554    }
555
556    fn append_via_stream<U>(
557        mut store: Access<'_, U, Self>,
558        fd: Resource<Descriptor>,
559        mut data: StreamReader<u8>,
560    ) -> FutureReader<Result<(), ErrorCode>> {
561        let (result_tx, result_rx) = oneshot::channel();
562        match get_file(store.get().table, &fd).and_then(|file| {
563            if !file.perms.contains(FilePerms::WRITE) {
564                Err(ErrorCode::NotPermitted.into())
565            } else {
566                Ok(file.clone())
567            }
568        }) {
569            Ok(file) => {
570                data.pipe(&mut store, WriteStreamConsumer::new_append(file, result_tx));
571            }
572            Err(err) => {
573                data.close(&mut store);
574                let _ = result_tx.send(Err(err.downcast().unwrap_or(ErrorCode::Io)));
575            }
576        }
577        FutureReader::new(&mut store, result_rx)
578    }
579
580    async fn advise<U>(
581        store: &Accessor<U, Self>,
582        fd: Resource<Descriptor>,
583        offset: Filesize,
584        length: Filesize,
585        advice: Advice,
586    ) -> FilesystemResult<()> {
587        let file = store.get_file(&fd)?;
588        file.advise(offset, length, advice.into()).await?;
589        Ok(())
590    }
591
592    async fn sync_data<U>(
593        store: &Accessor<U, Self>,
594        fd: Resource<Descriptor>,
595    ) -> FilesystemResult<()> {
596        let fd = store.get_descriptor(&fd)?;
597        fd.sync_data().await?;
598        Ok(())
599    }
600
601    async fn get_flags<U>(
602        store: &Accessor<U, Self>,
603        fd: Resource<Descriptor>,
604    ) -> FilesystemResult<DescriptorFlags> {
605        let fd = store.get_descriptor(&fd)?;
606        let flags = fd.get_flags().await?;
607        Ok(flags.into())
608    }
609
610    async fn get_type<U>(
611        store: &Accessor<U, Self>,
612        fd: Resource<Descriptor>,
613    ) -> FilesystemResult<DescriptorType> {
614        let fd = store.get_descriptor(&fd)?;
615        let ty = fd.get_type().await?;
616        Ok(ty.into())
617    }
618
619    async fn set_size<U>(
620        store: &Accessor<U, Self>,
621        fd: Resource<Descriptor>,
622        size: Filesize,
623    ) -> FilesystemResult<()> {
624        let file = store.get_file(&fd)?;
625        file.set_size(size).await?;
626        Ok(())
627    }
628
629    async fn set_times<U>(
630        store: &Accessor<U, Self>,
631        fd: Resource<Descriptor>,
632        data_access_timestamp: NewTimestamp,
633        data_modification_timestamp: NewTimestamp,
634    ) -> FilesystemResult<()> {
635        let fd = store.get_descriptor(&fd)?;
636        let atim = systemtimespec_from(data_access_timestamp)?;
637        let mtim = systemtimespec_from(data_modification_timestamp)?;
638        fd.set_times(atim, mtim).await?;
639        Ok(())
640    }
641
642    fn read_directory<U>(
643        mut store: Access<'_, U, Self>,
644        fd: Resource<Descriptor>,
645    ) -> (
646        StreamReader<DirectoryEntry>,
647        FutureReader<Result<(), ErrorCode>>,
648    ) {
649        let (result_tx, result_rx) = oneshot::channel();
650        let stream = match get_dir(store.get().table, &fd).and_then(|dir| {
651            if !dir.perms.contains(DirPerms::READ) {
652                Err(ErrorCode::NotPermitted.into())
653            } else {
654                Ok(dir)
655            }
656        }) {
657            Ok(dir) => {
658                let allow_blocking_current_thread = dir.allow_blocking_current_thread;
659                let dir = Arc::clone(dir.as_dir());
660                if allow_blocking_current_thread {
661                    match dir.entries() {
662                        Ok(readdir) => StreamReader::new(
663                            &mut store,
664                            FallibleIteratorProducer::new(
665                                readdir.filter_map(|e| map_dir_entry(e).transpose()),
666                                result_tx,
667                            ),
668                        ),
669                        Err(e) => {
670                            let _ = result_tx.send(Err(e.into()));
671                            StreamReader::new(&mut store, iter::empty())
672                        }
673                    }
674                } else {
675                    StreamReader::new(&mut store, ReadDirStream::new(dir, result_tx))
676                }
677            }
678            Err(err) => {
679                let _ = result_tx.send(Err(err.downcast().unwrap_or(ErrorCode::Io)));
680                StreamReader::new(&mut store, iter::empty())
681            }
682        };
683        (stream, FutureReader::new(&mut store, result_rx))
684    }
685
686    async fn sync<U>(store: &Accessor<U, Self>, fd: Resource<Descriptor>) -> FilesystemResult<()> {
687        let fd = store.get_descriptor(&fd)?;
688        fd.sync().await?;
689        Ok(())
690    }
691
692    async fn create_directory_at<U>(
693        store: &Accessor<U, Self>,
694        fd: Resource<Descriptor>,
695        path: String,
696    ) -> FilesystemResult<()> {
697        let dir = store.get_dir(&fd)?;
698        dir.create_directory_at(path).await?;
699        Ok(())
700    }
701
702    async fn stat<U>(
703        store: &Accessor<U, Self>,
704        fd: Resource<Descriptor>,
705    ) -> FilesystemResult<DescriptorStat> {
706        let fd = store.get_descriptor(&fd)?;
707        let stat = fd.stat().await?;
708        Ok(stat.into())
709    }
710
711    async fn stat_at<U>(
712        store: &Accessor<U, Self>,
713        fd: Resource<Descriptor>,
714        path_flags: PathFlags,
715        path: String,
716    ) -> FilesystemResult<DescriptorStat> {
717        let dir = store.get_dir(&fd)?;
718        let stat = dir.stat_at(path_flags.into(), path).await?;
719        Ok(stat.into())
720    }
721
722    async fn set_times_at<U>(
723        store: &Accessor<U, Self>,
724        fd: Resource<Descriptor>,
725        path_flags: PathFlags,
726        path: String,
727        data_access_timestamp: NewTimestamp,
728        data_modification_timestamp: NewTimestamp,
729    ) -> FilesystemResult<()> {
730        let dir = store.get_dir(&fd)?;
731        let atim = systemtimespec_from(data_access_timestamp)?;
732        let mtim = systemtimespec_from(data_modification_timestamp)?;
733        dir.set_times_at(path_flags.into(), path, atim, mtim)
734            .await?;
735        Ok(())
736    }
737
738    async fn link_at<U>(
739        store: &Accessor<U, Self>,
740        fd: Resource<Descriptor>,
741        old_path_flags: PathFlags,
742        old_path: String,
743        new_fd: Resource<Descriptor>,
744        new_path: String,
745    ) -> FilesystemResult<()> {
746        let (old_dir, new_dir) = store.get_dir_pair(&fd, &new_fd)?;
747        old_dir
748            .link_at(old_path_flags.into(), old_path, &new_dir, new_path)
749            .await?;
750        Ok(())
751    }
752
753    async fn open_at<U>(
754        store: &Accessor<U, Self>,
755        fd: Resource<Descriptor>,
756        path_flags: PathFlags,
757        path: String,
758        open_flags: OpenFlags,
759        flags: DescriptorFlags,
760    ) -> FilesystemResult<Resource<Descriptor>> {
761        let (allow_blocking_current_thread, dir) = store.with(|mut store| {
762            let store = store.get();
763            let dir = get_dir(&store.table, &fd)?;
764            FilesystemResult::Ok((store.ctx.allow_blocking_current_thread, dir.clone()))
765        })?;
766        let fd = dir
767            .open_at(
768                path_flags.into(),
769                path,
770                open_flags.into(),
771                flags.into(),
772                allow_blocking_current_thread,
773            )
774            .await?;
775        let fd = store.with(|mut store| store.get().table.push(fd))?;
776        Ok(fd)
777    }
778
779    async fn readlink_at<U>(
780        store: &Accessor<U, Self>,
781        fd: Resource<Descriptor>,
782        path: String,
783    ) -> FilesystemResult<String> {
784        let dir = store.get_dir(&fd)?;
785        let path = dir.readlink_at(path).await?;
786        Ok(path)
787    }
788
789    async fn remove_directory_at<U>(
790        store: &Accessor<U, Self>,
791        fd: Resource<Descriptor>,
792        path: String,
793    ) -> FilesystemResult<()> {
794        let dir = store.get_dir(&fd)?;
795        dir.remove_directory_at(path).await?;
796        Ok(())
797    }
798
799    async fn rename_at<U>(
800        store: &Accessor<U, Self>,
801        fd: Resource<Descriptor>,
802        old_path: String,
803        new_fd: Resource<Descriptor>,
804        new_path: String,
805    ) -> FilesystemResult<()> {
806        let (old_dir, new_dir) = store.get_dir_pair(&fd, &new_fd)?;
807        old_dir.rename_at(old_path, &new_dir, new_path).await?;
808        Ok(())
809    }
810
811    async fn symlink_at<U>(
812        store: &Accessor<U, Self>,
813        fd: Resource<Descriptor>,
814        old_path: String,
815        new_path: String,
816    ) -> FilesystemResult<()> {
817        let dir = store.get_dir(&fd)?;
818        dir.symlink_at(old_path, new_path).await?;
819        Ok(())
820    }
821
822    async fn unlink_file_at<U>(
823        store: &Accessor<U, Self>,
824        fd: Resource<Descriptor>,
825        path: String,
826    ) -> FilesystemResult<()> {
827        let dir = store.get_dir(&fd)?;
828        dir.unlink_file_at(path).await?;
829        Ok(())
830    }
831
832    async fn is_same_object<U>(
833        store: &Accessor<U, Self>,
834        fd: Resource<Descriptor>,
835        other: Resource<Descriptor>,
836    ) -> wasmtime::Result<bool> {
837        let (fd, other) = store.with(|mut store| {
838            let table = store.get().table;
839            let fd = get_descriptor(table, &fd)?.clone();
840            let other = get_descriptor(table, &other)?.clone();
841            wasmtime::error::Ok((fd, other))
842        })?;
843        fd.is_same_object(&other).await
844    }
845
846    async fn metadata_hash<U>(
847        store: &Accessor<U, Self>,
848        fd: Resource<Descriptor>,
849    ) -> FilesystemResult<MetadataHashValue> {
850        let fd = store.get_descriptor(&fd)?;
851        let meta = fd.metadata_hash().await?;
852        Ok(meta.into())
853    }
854
855    async fn metadata_hash_at<U>(
856        store: &Accessor<U, Self>,
857        fd: Resource<Descriptor>,
858        path_flags: PathFlags,
859        path: String,
860    ) -> FilesystemResult<MetadataHashValue> {
861        let dir = store.get_dir(&fd)?;
862        let meta = dir.metadata_hash_at(path_flags.into(), path).await?;
863        Ok(meta.into())
864    }
865}
866
867impl types::HostDescriptor for WasiFilesystemCtxView<'_> {
868    fn drop(&mut self, fd: Resource<Descriptor>) -> wasmtime::Result<()> {
869        self.table
870            .delete(fd)
871            .context("failed to delete descriptor resource from table")?;
872        Ok(())
873    }
874}
875
impl preopens::Host for WasiFilesystemCtxView<'_> {
    /// Returns the preopened directories as `(descriptor, path)` pairs.
    fn get_directories(&mut self) -> wasmtime::Result<Vec<(Resource<Descriptor>, String)>> {
        // NOTE(review): presumably this resolves to an inherent
        // `get_directories` on `WasiFilesystemCtxView` defined elsewhere
        // (inherent methods take precedence over trait methods); otherwise it
        // would recurse. Confirm against the type's definition.
        self.get_directories()
    }
}