Skip to main content

wasmtime_wasi/p3/filesystem/
host.rs

1use crate::filesystem::sys;
2use crate::filesystem::{Descriptor, Dir, File, WasiFilesystem, WasiFilesystemCtxView};
3use crate::p3::bindings::clocks::system_clock;
4use crate::p3::bindings::filesystem::types::{
5    self, Advice, DescriptorFlags, DescriptorStat, DescriptorType, DirectoryEntry, ErrorCode,
6    Filesize, MetadataHashValue, NewTimestamp, OpenFlags, PathFlags,
7};
8use crate::p3::filesystem::{FilesystemError, FilesystemResult, preopens};
9use crate::p3::{DEFAULT_BUFFER_CAPACITY, FallibleIteratorProducer};
10use crate::{DirPerms, FilePerms};
11use bytes::BytesMut;
12use core::pin::Pin;
13use core::task::{Context, Poll, ready};
14use core::{iter, mem};
15use std::io::{self, Cursor};
16use std::sync::Arc;
17use tokio::sync::{mpsc, oneshot};
18use tokio::task::{JoinHandle, spawn_blocking};
19use wasmtime::StoreContextMut;
20use wasmtime::component::{
21    Access, Accessor, Destination, FutureReader, Resource, ResourceTable, Source, StreamConsumer,
22    StreamProducer, StreamReader, StreamResult,
23};
24use wasmtime::error::Context as _;
25
26fn get_descriptor<'a>(
27    table: &'a ResourceTable,
28    fd: &'a Resource<Descriptor>,
29) -> FilesystemResult<&'a Descriptor> {
30    table
31        .get(fd)
32        .context("failed to get descriptor resource from table")
33        .map_err(FilesystemError::trap)
34}
35
36fn get_file<'a>(
37    table: &'a ResourceTable,
38    fd: &'a Resource<Descriptor>,
39) -> FilesystemResult<&'a File> {
40    let file = get_descriptor(table, fd).map(Descriptor::file)??;
41    Ok(file)
42}
43
44fn get_dir<'a>(
45    table: &'a ResourceTable,
46    fd: &'a Resource<Descriptor>,
47) -> FilesystemResult<&'a Dir> {
48    let dir = get_descriptor(table, fd).map(Descriptor::dir)??;
49    Ok(dir)
50}
51
52trait AccessorExt {
53    fn get_descriptor(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Descriptor>;
54    fn get_file(&self, fd: &Resource<Descriptor>) -> FilesystemResult<File>;
55    fn get_dir(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Dir>;
56    fn get_dir_pair(
57        &self,
58        a: &Resource<Descriptor>,
59        b: &Resource<Descriptor>,
60    ) -> FilesystemResult<(Dir, Dir)>;
61}
62
63impl<T> AccessorExt for Accessor<T, WasiFilesystem> {
64    fn get_descriptor(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Descriptor> {
65        self.with(|mut store| {
66            let fd = get_descriptor(store.get().table, fd)?;
67            Ok(fd.clone())
68        })
69    }
70
71    fn get_file(&self, fd: &Resource<Descriptor>) -> FilesystemResult<File> {
72        self.with(|mut store| {
73            let file = get_file(store.get().table, fd)?;
74            Ok(file.clone())
75        })
76    }
77
78    fn get_dir(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Dir> {
79        self.with(|mut store| {
80            let dir = get_dir(store.get().table, fd)?;
81            Ok(dir.clone())
82        })
83    }
84
85    fn get_dir_pair(
86        &self,
87        a: &Resource<Descriptor>,
88        b: &Resource<Descriptor>,
89    ) -> FilesystemResult<(Dir, Dir)> {
90        self.with(|mut store| {
91            let table = store.get().table;
92            let a = get_dir(table, a)?;
93            let b = get_dir(table, b)?;
94            Ok((a.clone(), b.clone()))
95        })
96    }
97}
98
99fn systemtime_from(t: system_clock::Instant) -> Result<std::time::SystemTime, ErrorCode> {
100    if let Ok(seconds) = t.seconds.try_into() {
101        std::time::SystemTime::UNIX_EPOCH
102            .checked_add(core::time::Duration::new(seconds, t.nanoseconds))
103            .ok_or(ErrorCode::Overflow)
104    } else {
105        std::time::SystemTime::UNIX_EPOCH
106            .checked_sub(core::time::Duration::new(
107                t.seconds.unsigned_abs(),
108                t.nanoseconds,
109            ))
110            .ok_or(ErrorCode::Overflow)
111    }
112}
113
114fn systemtimespec_from(t: NewTimestamp) -> Result<Option<fs_set_times::SystemTimeSpec>, ErrorCode> {
115    use fs_set_times::SystemTimeSpec;
116    match t {
117        NewTimestamp::NoChange => Ok(None),
118        NewTimestamp::Now => Ok(Some(SystemTimeSpec::SymbolicNow)),
119        NewTimestamp::Timestamp(st) => {
120            let st = systemtime_from(st)?;
121            Ok(Some(SystemTimeSpec::Absolute(st)))
122        }
123    }
124}
125
126struct ReadStreamProducer {
127    file: File,
128    offset: u64,
129    result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
130    task: Option<JoinHandle<std::io::Result<BytesMut>>>,
131}
132
133impl Drop for ReadStreamProducer {
134    fn drop(&mut self) {
135        self.close(Ok(()))
136    }
137}
138
139impl ReadStreamProducer {
140    fn close(&mut self, res: Result<(), ErrorCode>) {
141        if let Some(tx) = self.result.take() {
142            _ = tx.send(res);
143        }
144    }
145
146    /// Update the internal `offset` field after reading `amt` bytes from the file.
147    fn complete_read(&mut self, amt: usize) -> StreamResult {
148        let Ok(amt) = amt.try_into() else {
149            self.close(Err(ErrorCode::Overflow));
150            return StreamResult::Dropped;
151        };
152        let Some(amt) = self.offset.checked_add(amt) else {
153            self.close(Err(ErrorCode::Overflow));
154            return StreamResult::Dropped;
155        };
156        self.offset = amt;
157        StreamResult::Completed
158    }
159}
160
161impl<D> StreamProducer<D> for ReadStreamProducer {
162    type Item = u8;
163    type Buffer = Cursor<BytesMut>;
164
165    fn poll_produce<'a>(
166        mut self: Pin<&mut Self>,
167        cx: &mut Context<'_>,
168        store: StoreContextMut<'a, D>,
169        mut dst: Destination<'a, Self::Item, Self::Buffer>,
170        finish: bool,
171    ) -> Poll<wasmtime::Result<StreamResult>> {
172        if let Some(file) = self.file.as_blocking_file() {
173            // Once a blocking file, always a blocking file, so assert as such.
174            assert!(self.task.is_none());
175            let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
176            let buf = dst.remaining();
177            if buf.is_empty() {
178                return Poll::Ready(Ok(StreamResult::Completed));
179            }
180            return match sys::read_at_cursor_unspecified(file, buf, self.offset) {
181                Ok(0) => {
182                    self.close(Ok(()));
183                    Poll::Ready(Ok(StreamResult::Dropped))
184                }
185                Ok(n) => {
186                    dst.mark_written(n);
187                    Poll::Ready(Ok(self.complete_read(n)))
188                }
189                Err(err) => {
190                    self.close(Err(err.into()));
191                    Poll::Ready(Ok(StreamResult::Dropped))
192                }
193            };
194        }
195
196        // Lazily spawn a read task if one hasn't already been spawned yet.
197        let me = &mut *self;
198        let task = me.task.get_or_insert_with(|| {
199            let mut buf = dst.take_buffer().into_inner();
200            buf.resize(DEFAULT_BUFFER_CAPACITY, 0);
201            let file = Arc::clone(me.file.as_file());
202            let offset = me.offset;
203            spawn_blocking(move || {
204                sys::read_at_cursor_unspecified(file, &mut buf, offset).map(|n| {
205                    buf.truncate(n);
206                    buf
207                })
208            })
209        });
210
211        // Await the completion of the read task. Note that this is not a
212        // cancellable await point because we can't cancel the other task, so
213        // the `finish` parameter is ignored.
214        let result = match Pin::new(&mut *task).poll(cx) {
215            // If cancellation is requested, then flag that to Tokio. Note that
216            // this still waits for the actual completion of the spawned task,
217            // which won't actually happen if it's already executing.
218            Poll::Pending if finish => {
219                task.abort();
220                ready!(Pin::new(task).poll(cx))
221            }
222            other => ready!(other),
223        };
224        self.task = None;
225        match result {
226            Ok(Ok(buf)) if buf.is_empty() => {
227                self.close(Ok(()));
228                Poll::Ready(Ok(StreamResult::Dropped))
229            }
230            Ok(Ok(buf)) => {
231                let n = buf.len();
232                dst.set_buffer(Cursor::new(buf));
233                Poll::Ready(Ok(self.complete_read(n)))
234            }
235            Ok(Err(err)) => {
236                self.close(Err(err.into()));
237                Poll::Ready(Ok(StreamResult::Dropped))
238            }
239            Err(err) => {
240                if err.is_cancelled() {
241                    return Poll::Ready(Ok(StreamResult::Cancelled));
242                }
243                panic!("I/O task should not panic: {err}")
244            }
245        }
246    }
247}
248
249fn map_dir_entry(
250    entry: std::io::Result<cap_std::fs::DirEntry>,
251) -> Result<Option<DirectoryEntry>, ErrorCode> {
252    match entry {
253        Ok(entry) => {
254            let meta = entry.metadata()?;
255            let Ok(name) = entry.file_name().into_string() else {
256                return Err(ErrorCode::IllegalByteSequence);
257            };
258            Ok(Some(DirectoryEntry {
259                type_: meta.file_type().into(),
260                name,
261            }))
262        }
263        Err(err) => {
264            // On windows, filter out files like `C:\DumpStack.log.tmp` which we
265            // can't get full metadata for.
266            #[cfg(windows)]
267            {
268                use windows_sys::Win32::Foundation::{
269                    ERROR_ACCESS_DENIED, ERROR_SHARING_VIOLATION,
270                };
271                if err.raw_os_error() == Some(ERROR_SHARING_VIOLATION as i32)
272                    || err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32)
273                {
274                    return Ok(None);
275                }
276            }
277            Err(err.into())
278        }
279    }
280}
281
282struct ReadDirStream {
283    rx: mpsc::Receiver<DirectoryEntry>,
284    task: JoinHandle<Result<(), ErrorCode>>,
285    result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
286}
287
288impl ReadDirStream {
289    fn new(
290        dir: Arc<cap_std::fs::Dir>,
291        result: oneshot::Sender<Result<(), ErrorCode>>,
292    ) -> ReadDirStream {
293        let (tx, rx) = mpsc::channel(1);
294        ReadDirStream {
295            task: spawn_blocking(move || {
296                let entries = dir.entries()?;
297                for entry in entries {
298                    if let Some(entry) = map_dir_entry(entry)? {
299                        if let Err(_) = tx.blocking_send(entry) {
300                            break;
301                        }
302                    }
303                }
304                Ok(())
305            }),
306            rx,
307            result: Some(result),
308        }
309    }
310
311    fn close(&mut self, res: Result<(), ErrorCode>) {
312        self.rx.close();
313        self.task.abort();
314        let _ = self.result.take().unwrap().send(res);
315    }
316}
317
318impl<D> StreamProducer<D> for ReadDirStream {
319    type Item = DirectoryEntry;
320    type Buffer = Option<DirectoryEntry>;
321
322    fn poll_produce<'a>(
323        mut self: Pin<&mut Self>,
324        cx: &mut Context<'_>,
325        mut store: StoreContextMut<'a, D>,
326        mut dst: Destination<'a, Self::Item, Self::Buffer>,
327        finish: bool,
328    ) -> Poll<wasmtime::Result<StreamResult>> {
329        // If this is a 0-length read then `mpsc::Receiver` does not expose an
330        // API to wait for an item to be available without taking it out of the
331        // channel. In lieu of that just say that we're complete and ready for a
332        // read.
333        if dst.remaining(&mut store) == Some(0) {
334            return Poll::Ready(Ok(StreamResult::Completed));
335        }
336
337        match self.rx.poll_recv(cx) {
338            // If an item is on the channel then send that along and say that
339            // the read is now complete with one item being yielded.
340            Poll::Ready(Some(item)) => {
341                dst.set_buffer(Some(item));
342                Poll::Ready(Ok(StreamResult::Completed))
343            }
344
345            // If there's nothing left on the channel then that means that an
346            // error occurred or the iterator is done. In both cases an
347            // un-cancellable wait for the spawned task is entered and we await
348            // its completion. Upon completion there our own stream is closed
349            // with the result (sending an error code on our oneshot) and then
350            // the stream is reported as dropped.
351            Poll::Ready(None) => {
352                let result = ready!(Pin::new(&mut self.task).poll(cx))
353                    .expect("spawned task should not panic");
354                self.close(result);
355                Poll::Ready(Ok(StreamResult::Dropped))
356            }
357
358            // If an item isn't ready yet then cancel this outstanding request
359            // if `finish` is set, otherwise propagate the `Pending` status.
360            Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
361            Poll::Pending => Poll::Pending,
362        }
363    }
364}
365
366impl Drop for ReadDirStream {
367    fn drop(&mut self) {
368        if self.result.is_some() {
369            self.close(Ok(()));
370        }
371    }
372}
373
374struct WriteStreamConsumer {
375    file: File,
376    location: WriteLocation,
377    result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
378    buffer: BytesMut,
379    task: Option<JoinHandle<std::io::Result<(BytesMut, usize)>>>,
380}
381
382#[derive(Copy, Clone)]
383enum WriteLocation {
384    End,
385    Offset(u64),
386}
387
388impl WriteStreamConsumer {
389    fn new_at(file: File, offset: u64, result: oneshot::Sender<Result<(), ErrorCode>>) -> Self {
390        Self {
391            file,
392            location: WriteLocation::Offset(offset),
393            result: Some(result),
394            buffer: BytesMut::default(),
395            task: None,
396        }
397    }
398
399    fn new_append(file: File, result: oneshot::Sender<Result<(), ErrorCode>>) -> Self {
400        Self {
401            file,
402            location: WriteLocation::End,
403            result: Some(result),
404            buffer: BytesMut::default(),
405            task: None,
406        }
407    }
408
409    fn close(&mut self, res: Result<(), ErrorCode>) {
410        _ = self.result.take().unwrap().send(res);
411    }
412
413    /// Update the internal `offset` field after writing `amt` bytes from the file.
414    fn complete_write(&mut self, amt: usize) -> StreamResult {
415        match &mut self.location {
416            WriteLocation::End => StreamResult::Completed,
417            WriteLocation::Offset(offset) => {
418                let Ok(amt) = amt.try_into() else {
419                    self.close(Err(ErrorCode::Overflow));
420                    return StreamResult::Dropped;
421                };
422                let Some(amt) = offset.checked_add(amt) else {
423                    self.close(Err(ErrorCode::Overflow));
424                    return StreamResult::Dropped;
425                };
426                *offset = amt;
427                StreamResult::Completed
428            }
429        }
430    }
431}
432
433impl WriteLocation {
434    fn write(&self, file: &cap_std::fs::File, bytes: &[u8]) -> io::Result<usize> {
435        match *self {
436            WriteLocation::End => sys::append_cursor_unspecified(file, bytes),
437            WriteLocation::Offset(at) => sys::write_at_cursor_unspecified(file, bytes, at),
438        }
439    }
440}
441
442impl<D> StreamConsumer<D> for WriteStreamConsumer {
443    type Item = u8;
444
445    fn poll_consume(
446        mut self: Pin<&mut Self>,
447        cx: &mut Context<'_>,
448        store: StoreContextMut<D>,
449        src: Source<Self::Item>,
450        finish: bool,
451    ) -> Poll<wasmtime::Result<StreamResult>> {
452        let mut src = src.as_direct(store);
453        if let Some(file) = self.file.as_blocking_file() {
454            // Once a blocking file, always a blocking file, so assert as such.
455            assert!(self.task.is_none());
456            return match self.location.write(file, src.remaining()) {
457                Ok(n) => {
458                    src.mark_read(n);
459                    Poll::Ready(Ok(self.complete_write(n)))
460                }
461                Err(err) => {
462                    self.close(Err(err.into()));
463                    Poll::Ready(Ok(StreamResult::Dropped))
464                }
465            };
466        }
467        let me = &mut *self;
468        let task = me.task.get_or_insert_with(|| {
469            debug_assert!(me.buffer.is_empty());
470            me.buffer.extend_from_slice(src.remaining());
471            let buf = mem::take(&mut me.buffer);
472            let file = Arc::clone(me.file.as_file());
473            let location = me.location;
474            spawn_blocking(move || location.write(&file, &buf).map(|n| (buf, n)))
475        });
476        let result = match Pin::new(&mut *task).poll(cx) {
477            // If cancellation is requested, then flag that to Tokio. Note that
478            // this still waits for the actual completion of the spawned task,
479            // which won't actually happen if it's already executing.
480            Poll::Pending if finish => {
481                task.abort();
482                ready!(Pin::new(task).poll(cx))
483            }
484            other => ready!(other),
485        };
486        self.task = None;
487        match result {
488            Ok(Ok((buf, n))) => {
489                src.mark_read(n);
490                self.buffer = buf;
491                self.buffer.clear();
492                Poll::Ready(Ok(self.complete_write(n)))
493            }
494            Ok(Err(err)) => {
495                self.close(Err(err.into()));
496                Poll::Ready(Ok(StreamResult::Dropped))
497            }
498            Err(err) => {
499                if err.is_cancelled() {
500                    return Poll::Ready(Ok(StreamResult::Cancelled));
501                }
502                panic!("I/O task should not panic: {err}")
503            }
504        }
505    }
506}
507
508impl Drop for WriteStreamConsumer {
509    fn drop(&mut self) {
510        if self.result.is_some() {
511            self.close(Ok(()))
512        }
513    }
514}
515
516impl types::Host for WasiFilesystemCtxView<'_> {
517    fn convert_error_code(&mut self, error: FilesystemError) -> wasmtime::Result<ErrorCode> {
518        error.downcast()
519    }
520}
521
522impl types::HostDescriptorWithStore for WasiFilesystem {
523    fn read_via_stream<U>(
524        mut store: Access<U, Self>,
525        fd: Resource<Descriptor>,
526        offset: Filesize,
527    ) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
528        let file = get_file(store.get().table, &fd)?;
529        if !file.perms.contains(FilePerms::READ) {
530            return Ok((
531                StreamReader::new(&mut store, iter::empty())?,
532                FutureReader::new(&mut store, async {
533                    wasmtime::error::Ok(Err(ErrorCode::NotPermitted))
534                })?,
535            ));
536        }
537
538        let file = file.clone();
539        let (result_tx, result_rx) = oneshot::channel();
540        Ok((
541            StreamReader::new(
542                &mut store,
543                ReadStreamProducer {
544                    file,
545                    offset,
546                    result: Some(result_tx),
547                    task: None,
548                },
549            )?,
550            FutureReader::new(&mut store, result_rx)?,
551        ))
552    }
553
554    fn write_via_stream<U>(
555        mut store: Access<'_, U, Self>,
556        fd: Resource<Descriptor>,
557        mut data: StreamReader<u8>,
558        offset: Filesize,
559    ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>> {
560        let (result_tx, result_rx) = oneshot::channel();
561        match get_file(store.get().table, &fd).and_then(|file| {
562            if !file.perms.contains(FilePerms::WRITE) {
563                Err(ErrorCode::NotPermitted.into())
564            } else {
565                Ok(file.clone())
566            }
567        }) {
568            Ok(file) => {
569                data.pipe(
570                    &mut store,
571                    WriteStreamConsumer::new_at(file, offset, result_tx),
572                )?;
573            }
574            Err(err) => {
575                data.close(&mut store)?;
576                let _ = result_tx.send(Err(err.downcast().unwrap_or(ErrorCode::Io)));
577            }
578        }
579        FutureReader::new(&mut store, result_rx)
580    }
581
582    fn append_via_stream<U>(
583        mut store: Access<'_, U, Self>,
584        fd: Resource<Descriptor>,
585        mut data: StreamReader<u8>,
586    ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>> {
587        let (result_tx, result_rx) = oneshot::channel();
588        match get_file(store.get().table, &fd).and_then(|file| {
589            if !file.perms.contains(FilePerms::WRITE) {
590                Err(ErrorCode::NotPermitted.into())
591            } else {
592                Ok(file.clone())
593            }
594        }) {
595            Ok(file) => {
596                data.pipe(&mut store, WriteStreamConsumer::new_append(file, result_tx))?;
597            }
598            Err(err) => {
599                data.close(&mut store)?;
600                let _ = result_tx.send(Err(err.downcast().unwrap_or(ErrorCode::Io)));
601            }
602        }
603        FutureReader::new(&mut store, result_rx)
604    }
605
606    async fn advise<U>(
607        store: &Accessor<U, Self>,
608        fd: Resource<Descriptor>,
609        offset: Filesize,
610        length: Filesize,
611        advice: Advice,
612    ) -> FilesystemResult<()> {
613        let file = store.get_file(&fd)?;
614        file.advise(offset, length, advice.into()).await?;
615        Ok(())
616    }
617
618    async fn sync_data<U>(
619        store: &Accessor<U, Self>,
620        fd: Resource<Descriptor>,
621    ) -> FilesystemResult<()> {
622        let fd = store.get_descriptor(&fd)?;
623        fd.sync_data().await?;
624        Ok(())
625    }
626
627    async fn get_flags<U>(
628        store: &Accessor<U, Self>,
629        fd: Resource<Descriptor>,
630    ) -> FilesystemResult<DescriptorFlags> {
631        let fd = store.get_descriptor(&fd)?;
632        let flags = fd.get_flags().await?;
633        Ok(flags.into())
634    }
635
636    async fn get_type<U>(
637        store: &Accessor<U, Self>,
638        fd: Resource<Descriptor>,
639    ) -> FilesystemResult<DescriptorType> {
640        let fd = store.get_descriptor(&fd)?;
641        let ty = fd.get_type().await?;
642        Ok(ty.into())
643    }
644
645    async fn set_size<U>(
646        store: &Accessor<U, Self>,
647        fd: Resource<Descriptor>,
648        size: Filesize,
649    ) -> FilesystemResult<()> {
650        let file = store.get_file(&fd)?;
651        file.set_size(size).await?;
652        Ok(())
653    }
654
655    async fn set_times<U>(
656        store: &Accessor<U, Self>,
657        fd: Resource<Descriptor>,
658        data_access_timestamp: NewTimestamp,
659        data_modification_timestamp: NewTimestamp,
660    ) -> FilesystemResult<()> {
661        let fd = store.get_descriptor(&fd)?;
662        let atim = systemtimespec_from(data_access_timestamp)?;
663        let mtim = systemtimespec_from(data_modification_timestamp)?;
664        fd.set_times(atim, mtim).await?;
665        Ok(())
666    }
667
668    fn read_directory<U>(
669        mut store: Access<'_, U, Self>,
670        fd: Resource<Descriptor>,
671    ) -> wasmtime::Result<(
672        StreamReader<DirectoryEntry>,
673        FutureReader<Result<(), ErrorCode>>,
674    )> {
675        let (result_tx, result_rx) = oneshot::channel();
676        let stream = match get_dir(store.get().table, &fd).and_then(|dir| {
677            if !dir.perms.contains(DirPerms::READ) {
678                Err(ErrorCode::NotPermitted.into())
679            } else {
680                Ok(dir)
681            }
682        }) {
683            Ok(dir) => {
684                let allow_blocking_current_thread = dir.allow_blocking_current_thread;
685                let dir = Arc::clone(dir.as_dir());
686                if allow_blocking_current_thread {
687                    match dir.entries() {
688                        Ok(readdir) => StreamReader::new(
689                            &mut store,
690                            FallibleIteratorProducer::new(
691                                readdir.filter_map(|e| map_dir_entry(e).transpose()),
692                                result_tx,
693                            ),
694                        )?,
695                        Err(e) => {
696                            let _ = result_tx.send(Err(e.into()));
697                            StreamReader::new(&mut store, iter::empty())?
698                        }
699                    }
700                } else {
701                    StreamReader::new(&mut store, ReadDirStream::new(dir, result_tx))?
702                }
703            }
704            Err(err) => {
705                let _ = result_tx.send(Err(err.downcast().unwrap_or(ErrorCode::Io)));
706                StreamReader::new(&mut store, iter::empty())?
707            }
708        };
709        Ok((stream, FutureReader::new(&mut store, result_rx)?))
710    }
711
712    async fn sync<U>(store: &Accessor<U, Self>, fd: Resource<Descriptor>) -> FilesystemResult<()> {
713        let fd = store.get_descriptor(&fd)?;
714        fd.sync().await?;
715        Ok(())
716    }
717
718    async fn create_directory_at<U>(
719        store: &Accessor<U, Self>,
720        fd: Resource<Descriptor>,
721        path: String,
722    ) -> FilesystemResult<()> {
723        let dir = store.get_dir(&fd)?;
724        dir.create_directory_at(path).await?;
725        Ok(())
726    }
727
728    async fn stat<U>(
729        store: &Accessor<U, Self>,
730        fd: Resource<Descriptor>,
731    ) -> FilesystemResult<DescriptorStat> {
732        let fd = store.get_descriptor(&fd)?;
733        let stat = fd.stat().await?;
734        Ok(stat.into())
735    }
736
737    async fn stat_at<U>(
738        store: &Accessor<U, Self>,
739        fd: Resource<Descriptor>,
740        path_flags: PathFlags,
741        path: String,
742    ) -> FilesystemResult<DescriptorStat> {
743        let dir = store.get_dir(&fd)?;
744        let stat = dir.stat_at(path_flags.into(), path).await?;
745        Ok(stat.into())
746    }
747
748    async fn set_times_at<U>(
749        store: &Accessor<U, Self>,
750        fd: Resource<Descriptor>,
751        path_flags: PathFlags,
752        path: String,
753        data_access_timestamp: NewTimestamp,
754        data_modification_timestamp: NewTimestamp,
755    ) -> FilesystemResult<()> {
756        let dir = store.get_dir(&fd)?;
757        let atim = systemtimespec_from(data_access_timestamp)?;
758        let mtim = systemtimespec_from(data_modification_timestamp)?;
759        dir.set_times_at(path_flags.into(), path, atim, mtim)
760            .await?;
761        Ok(())
762    }
763
764    async fn link_at<U>(
765        store: &Accessor<U, Self>,
766        fd: Resource<Descriptor>,
767        old_path_flags: PathFlags,
768        old_path: String,
769        new_fd: Resource<Descriptor>,
770        new_path: String,
771    ) -> FilesystemResult<()> {
772        let (old_dir, new_dir) = store.get_dir_pair(&fd, &new_fd)?;
773        old_dir
774            .link_at(old_path_flags.into(), old_path, &new_dir, new_path)
775            .await?;
776        Ok(())
777    }
778
779    async fn open_at<U>(
780        store: &Accessor<U, Self>,
781        fd: Resource<Descriptor>,
782        path_flags: PathFlags,
783        path: String,
784        open_flags: OpenFlags,
785        flags: DescriptorFlags,
786    ) -> FilesystemResult<Resource<Descriptor>> {
787        let (allow_blocking_current_thread, dir) = store.with(|mut store| {
788            let store = store.get();
789            let dir = get_dir(&store.table, &fd)?;
790            FilesystemResult::Ok((store.ctx.allow_blocking_current_thread, dir.clone()))
791        })?;
792        let fd = dir
793            .open_at(
794                path_flags.into(),
795                path,
796                open_flags.into(),
797                flags.into(),
798                allow_blocking_current_thread,
799            )
800            .await?;
801        let fd = store.with(|mut store| store.get().table.push(fd))?;
802        Ok(fd)
803    }
804
805    async fn readlink_at<U>(
806        store: &Accessor<U, Self>,
807        fd: Resource<Descriptor>,
808        path: String,
809    ) -> FilesystemResult<String> {
810        let dir = store.get_dir(&fd)?;
811        let path = dir.readlink_at(path).await?;
812        Ok(path)
813    }
814
815    async fn remove_directory_at<U>(
816        store: &Accessor<U, Self>,
817        fd: Resource<Descriptor>,
818        path: String,
819    ) -> FilesystemResult<()> {
820        let dir = store.get_dir(&fd)?;
821        dir.remove_directory_at(path).await?;
822        Ok(())
823    }
824
825    async fn rename_at<U>(
826        store: &Accessor<U, Self>,
827        fd: Resource<Descriptor>,
828        old_path: String,
829        new_fd: Resource<Descriptor>,
830        new_path: String,
831    ) -> FilesystemResult<()> {
832        let (old_dir, new_dir) = store.get_dir_pair(&fd, &new_fd)?;
833        old_dir.rename_at(old_path, &new_dir, new_path).await?;
834        Ok(())
835    }
836
837    async fn symlink_at<U>(
838        store: &Accessor<U, Self>,
839        fd: Resource<Descriptor>,
840        old_path: String,
841        new_path: String,
842    ) -> FilesystemResult<()> {
843        let dir = store.get_dir(&fd)?;
844        dir.symlink_at(old_path, new_path).await?;
845        Ok(())
846    }
847
848    async fn unlink_file_at<U>(
849        store: &Accessor<U, Self>,
850        fd: Resource<Descriptor>,
851        path: String,
852    ) -> FilesystemResult<()> {
853        let dir = store.get_dir(&fd)?;
854        dir.unlink_file_at(path).await?;
855        Ok(())
856    }
857
858    async fn is_same_object<U>(
859        store: &Accessor<U, Self>,
860        fd: Resource<Descriptor>,
861        other: Resource<Descriptor>,
862    ) -> wasmtime::Result<bool> {
863        let (fd, other) = store.with(|mut store| {
864            let table = store.get().table;
865            let fd = get_descriptor(table, &fd)?.clone();
866            let other = get_descriptor(table, &other)?.clone();
867            wasmtime::error::Ok((fd, other))
868        })?;
869        fd.is_same_object(&other).await
870    }
871
872    async fn metadata_hash<U>(
873        store: &Accessor<U, Self>,
874        fd: Resource<Descriptor>,
875    ) -> FilesystemResult<MetadataHashValue> {
876        let fd = store.get_descriptor(&fd)?;
877        let meta = fd.metadata_hash().await?;
878        Ok(meta.into())
879    }
880
881    async fn metadata_hash_at<U>(
882        store: &Accessor<U, Self>,
883        fd: Resource<Descriptor>,
884        path_flags: PathFlags,
885        path: String,
886    ) -> FilesystemResult<MetadataHashValue> {
887        let dir = store.get_dir(&fd)?;
888        let meta = dir.metadata_hash_at(path_flags.into(), path).await?;
889        Ok(meta.into())
890    }
891}
892
893impl types::HostDescriptor for WasiFilesystemCtxView<'_> {
894    fn drop(&mut self, fd: Resource<Descriptor>) -> wasmtime::Result<()> {
895        self.table
896            .delete(fd)
897            .context("failed to delete descriptor resource from table")?;
898        Ok(())
899    }
900}
901
902impl preopens::Host for WasiFilesystemCtxView<'_> {
903    fn get_directories(&mut self) -> wasmtime::Result<Vec<(Resource<Descriptor>, String)>> {
904        self.get_directories()
905    }
906}