Skip to main content

wasmtime_wasi/p3/filesystem/
host.rs

1use crate::filesystem::sys;
2use crate::filesystem::{Descriptor, Dir, File, WasiFilesystem, WasiFilesystemCtxView};
3use crate::p3::bindings::clocks::system_clock;
4use crate::p3::bindings::filesystem::types::{
5    self, Advice, DescriptorFlags, DescriptorStat, DescriptorType, DirectoryEntry, ErrorCode,
6    Filesize, MetadataHashValue, NewTimestamp, OpenFlags, PathFlags,
7};
8use crate::p3::filesystem::{FilesystemError, FilesystemResult, preopens};
9use crate::p3::{DEFAULT_BUFFER_CAPACITY, FallibleIteratorProducer};
10use crate::{DirPerms, FilePerms};
11use bytes::BytesMut;
12use core::pin::Pin;
13use core::task::{Context, Poll, ready};
14use core::{iter, mem};
15use std::io;
16use std::sync::Arc;
17use std::time::SystemTime;
18use tokio::sync::{mpsc, oneshot};
19use tokio::task::{JoinHandle, spawn_blocking};
20use wasmtime::StoreContextMut;
21use wasmtime::component::{
22    Access, Accessor, Destination, FutureReader, Resource, ResourceTable, Source, StreamConsumer,
23    StreamProducer, StreamReader, StreamResult,
24};
25use wasmtime::error::Context as _;
26
27fn get_descriptor<'a>(
28    table: &'a ResourceTable,
29    fd: &'a Resource<Descriptor>,
30) -> FilesystemResult<&'a Descriptor> {
31    table
32        .get(fd)
33        .context("failed to get descriptor resource from table")
34        .map_err(FilesystemError::trap)
35}
36
37fn get_file<'a>(
38    table: &'a ResourceTable,
39    fd: &'a Resource<Descriptor>,
40) -> FilesystemResult<&'a File> {
41    let file = get_descriptor(table, fd).map(Descriptor::file)??;
42    Ok(file)
43}
44
45fn get_dir<'a>(
46    table: &'a ResourceTable,
47    fd: &'a Resource<Descriptor>,
48) -> FilesystemResult<&'a Dir> {
49    let dir = get_descriptor(table, fd).map(Descriptor::dir)??;
50    Ok(dir)
51}
52
53trait AccessorExt {
54    fn get_descriptor(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Descriptor>;
55    fn get_file(&self, fd: &Resource<Descriptor>) -> FilesystemResult<File>;
56    fn get_dir(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Dir>;
57    fn get_dir_pair(
58        &self,
59        a: &Resource<Descriptor>,
60        b: &Resource<Descriptor>,
61    ) -> FilesystemResult<(Dir, Dir)>;
62}
63
64impl<T> AccessorExt for Accessor<T, WasiFilesystem> {
65    fn get_descriptor(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Descriptor> {
66        self.with(|mut store| {
67            let fd = get_descriptor(store.get().table, fd)?;
68            Ok(fd.clone())
69        })
70    }
71
72    fn get_file(&self, fd: &Resource<Descriptor>) -> FilesystemResult<File> {
73        self.with(|mut store| {
74            let file = get_file(store.get().table, fd)?;
75            Ok(file.clone())
76        })
77    }
78
79    fn get_dir(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Dir> {
80        self.with(|mut store| {
81            let dir = get_dir(store.get().table, fd)?;
82            Ok(dir.clone())
83        })
84    }
85
86    fn get_dir_pair(
87        &self,
88        a: &Resource<Descriptor>,
89        b: &Resource<Descriptor>,
90    ) -> FilesystemResult<(Dir, Dir)> {
91        self.with(|mut store| {
92            let table = store.get().table;
93            let a = get_dir(table, a)?;
94            let b = get_dir(table, b)?;
95            Ok((a.clone(), b.clone()))
96        })
97    }
98}
99
100fn systemtime_from(t: system_clock::Instant) -> Result<std::time::SystemTime, ErrorCode> {
101    if let Ok(seconds) = t.seconds.try_into() {
102        std::time::SystemTime::UNIX_EPOCH
103            .checked_add(core::time::Duration::new(seconds, t.nanoseconds))
104            .ok_or(ErrorCode::Overflow)
105    } else {
106        std::time::SystemTime::UNIX_EPOCH
107            .checked_sub(core::time::Duration::new(
108                t.seconds.unsigned_abs(),
109                t.nanoseconds,
110            ))
111            .ok_or(ErrorCode::Overflow)
112    }
113}
114
115fn systemtimespec_from(t: NewTimestamp) -> Result<Option<SystemTime>, ErrorCode> {
116    match t {
117        NewTimestamp::NoChange => Ok(None),
118        NewTimestamp::Now => Ok(Some(SystemTime::now())),
119        NewTimestamp::Timestamp(st) => Ok(Some(systemtime_from(st)?)),
120    }
121}
122
123struct ReadStreamProducer {
124    file: File,
125    offset: u64,
126    result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
127    task: Option<JoinHandle<std::io::Result<BytesMut>>>,
128}
129
130impl Drop for ReadStreamProducer {
131    fn drop(&mut self) {
132        self.close(Ok(()))
133    }
134}
135
136impl ReadStreamProducer {
137    fn close(&mut self, res: Result<(), ErrorCode>) {
138        if let Some(tx) = self.result.take() {
139            _ = tx.send(res);
140        }
141    }
142
143    /// Update the internal `offset` field after reading `amt` bytes from the file.
144    fn complete_read(&mut self, amt: usize) -> StreamResult {
145        let Ok(amt) = amt.try_into() else {
146            self.close(Err(ErrorCode::Overflow));
147            return StreamResult::Dropped;
148        };
149        let Some(amt) = self.offset.checked_add(amt) else {
150            self.close(Err(ErrorCode::Overflow));
151            return StreamResult::Dropped;
152        };
153        self.offset = amt;
154        StreamResult::Completed
155    }
156}
157
158impl<D> StreamProducer<D> for ReadStreamProducer {
159    type Item = u8;
160    type Buffer = BytesMut;
161
162    fn poll_produce<'a>(
163        mut self: Pin<&mut Self>,
164        cx: &mut Context<'_>,
165        store: StoreContextMut<'a, D>,
166        mut dst: Destination<'a, Self::Item, Self::Buffer>,
167        finish: bool,
168    ) -> Poll<wasmtime::Result<StreamResult>> {
169        if let Some(file) = self.file.as_blocking_file() {
170            // Once a blocking file, always a blocking file, so assert as such.
171            assert!(self.task.is_none());
172            let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
173            let buf = dst.remaining();
174            if buf.is_empty() {
175                return Poll::Ready(Ok(StreamResult::Completed));
176            }
177            return match sys::read_at_cursor_unspecified(file, buf, self.offset) {
178                Ok(0) => {
179                    self.close(Ok(()));
180                    Poll::Ready(Ok(StreamResult::Dropped))
181                }
182                Ok(n) => {
183                    dst.mark_written(n);
184                    Poll::Ready(Ok(self.complete_read(n)))
185                }
186                Err(err) => {
187                    self.close(Err(err.into()));
188                    Poll::Ready(Ok(StreamResult::Dropped))
189                }
190            };
191        }
192
193        // Lazily spawn a read task if one hasn't already been spawned yet.
194        let me = &mut *self;
195        let task = me.task.get_or_insert_with(|| {
196            let mut buf = dst.take_buffer();
197            buf.resize(DEFAULT_BUFFER_CAPACITY, 0);
198            let file = Arc::clone(me.file.as_file());
199            let offset = me.offset;
200            spawn_blocking(move || {
201                sys::read_at_cursor_unspecified(file, &mut buf, offset).map(|n| {
202                    buf.truncate(n);
203                    buf
204                })
205            })
206        });
207
208        // Await the completion of the read task. Note that this is not a
209        // cancellable await point because we can't cancel the other task, so
210        // the `finish` parameter is ignored.
211        let result = match Pin::new(&mut *task).poll(cx) {
212            // If cancellation is requested, then flag that to Tokio. Note that
213            // this still waits for the actual completion of the spawned task,
214            // which won't actually happen if it's already executing.
215            Poll::Pending if finish => {
216                task.abort();
217                ready!(Pin::new(task).poll(cx))
218            }
219            other => ready!(other),
220        };
221        self.task = None;
222        match result {
223            Ok(Ok(buf)) if buf.is_empty() => {
224                self.close(Ok(()));
225                Poll::Ready(Ok(StreamResult::Dropped))
226            }
227            Ok(Ok(buf)) => {
228                let n = buf.len();
229                dst.set_buffer(buf);
230                Poll::Ready(Ok(self.complete_read(n)))
231            }
232            Ok(Err(err)) => {
233                self.close(Err(err.into()));
234                Poll::Ready(Ok(StreamResult::Dropped))
235            }
236            Err(err) => {
237                if err.is_cancelled() {
238                    return Poll::Ready(Ok(StreamResult::Cancelled));
239                }
240                panic!("I/O task should not panic: {err}")
241            }
242        }
243    }
244}
245
246fn map_dir_entry(
247    entry: std::io::Result<cap_std::fs::DirEntry>,
248) -> Result<Option<DirectoryEntry>, ErrorCode> {
249    match entry {
250        Ok(entry) => {
251            let meta = entry.metadata()?;
252            let Ok(name) = entry.file_name().into_string() else {
253                return Err(ErrorCode::IllegalByteSequence);
254            };
255            Ok(Some(DirectoryEntry {
256                type_: meta.file_type().into(),
257                name,
258            }))
259        }
260        Err(err) => {
261            // On windows, filter out files like `C:\DumpStack.log.tmp` which we
262            // can't get full metadata for.
263            #[cfg(windows)]
264            {
265                use windows_sys::Win32::Foundation::{
266                    ERROR_ACCESS_DENIED, ERROR_SHARING_VIOLATION,
267                };
268                if err.raw_os_error() == Some(ERROR_SHARING_VIOLATION as i32)
269                    || err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32)
270                {
271                    return Ok(None);
272                }
273            }
274            Err(err.into())
275        }
276    }
277}
278
279struct ReadDirStream {
280    rx: mpsc::Receiver<DirectoryEntry>,
281    task: JoinHandle<Result<(), ErrorCode>>,
282    result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
283}
284
285impl ReadDirStream {
286    fn new(
287        dir: Arc<cap_std::fs::Dir>,
288        result: oneshot::Sender<Result<(), ErrorCode>>,
289    ) -> ReadDirStream {
290        let (tx, rx) = mpsc::channel(1);
291        ReadDirStream {
292            task: spawn_blocking(move || {
293                let entries = dir.entries()?;
294                for entry in entries {
295                    if let Some(entry) = map_dir_entry(entry)? {
296                        if let Err(_) = tx.blocking_send(entry) {
297                            break;
298                        }
299                    }
300                }
301                Ok(())
302            }),
303            rx,
304            result: Some(result),
305        }
306    }
307
308    fn close(&mut self, res: Result<(), ErrorCode>) {
309        self.rx.close();
310        self.task.abort();
311        let _ = self.result.take().unwrap().send(res);
312    }
313}
314
315impl<D> StreamProducer<D> for ReadDirStream {
316    type Item = DirectoryEntry;
317    type Buffer = Option<DirectoryEntry>;
318
319    fn poll_produce<'a>(
320        mut self: Pin<&mut Self>,
321        cx: &mut Context<'_>,
322        mut store: StoreContextMut<'a, D>,
323        mut dst: Destination<'a, Self::Item, Self::Buffer>,
324        finish: bool,
325    ) -> Poll<wasmtime::Result<StreamResult>> {
326        // If this is a 0-length read then `mpsc::Receiver` does not expose an
327        // API to wait for an item to be available without taking it out of the
328        // channel. In lieu of that just say that we're complete and ready for a
329        // read.
330        if dst.remaining(&mut store) == Some(0) {
331            return Poll::Ready(Ok(StreamResult::Completed));
332        }
333
334        match self.rx.poll_recv(cx) {
335            // If an item is on the channel then send that along and say that
336            // the read is now complete with one item being yielded.
337            Poll::Ready(Some(item)) => {
338                dst.set_buffer(Some(item));
339                Poll::Ready(Ok(StreamResult::Completed))
340            }
341
342            // If there's nothing left on the channel then that means that an
343            // error occurred or the iterator is done. In both cases an
344            // un-cancellable wait for the spawned task is entered and we await
345            // its completion. Upon completion there our own stream is closed
346            // with the result (sending an error code on our oneshot) and then
347            // the stream is reported as dropped.
348            Poll::Ready(None) => {
349                let result = ready!(Pin::new(&mut self.task).poll(cx))
350                    .expect("spawned task should not panic");
351                self.close(result);
352                Poll::Ready(Ok(StreamResult::Dropped))
353            }
354
355            // If an item isn't ready yet then cancel this outstanding request
356            // if `finish` is set, otherwise propagate the `Pending` status.
357            Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
358            Poll::Pending => Poll::Pending,
359        }
360    }
361}
362
363impl Drop for ReadDirStream {
364    fn drop(&mut self) {
365        if self.result.is_some() {
366            self.close(Ok(()));
367        }
368    }
369}
370
371struct WriteStreamConsumer {
372    file: File,
373    location: WriteLocation,
374    result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
375    buffer: BytesMut,
376    task: Option<JoinHandle<std::io::Result<(BytesMut, usize)>>>,
377}
378
379#[derive(Copy, Clone)]
380enum WriteLocation {
381    End,
382    Offset(u64),
383}
384
385impl WriteStreamConsumer {
386    fn new_at(file: File, offset: u64, result: oneshot::Sender<Result<(), ErrorCode>>) -> Self {
387        Self {
388            file,
389            location: WriteLocation::Offset(offset),
390            result: Some(result),
391            buffer: BytesMut::default(),
392            task: None,
393        }
394    }
395
396    fn new_append(file: File, result: oneshot::Sender<Result<(), ErrorCode>>) -> Self {
397        Self {
398            file,
399            location: WriteLocation::End,
400            result: Some(result),
401            buffer: BytesMut::default(),
402            task: None,
403        }
404    }
405
406    fn close(&mut self, res: Result<(), ErrorCode>) {
407        _ = self.result.take().unwrap().send(res);
408    }
409
410    /// Update the internal `offset` field after writing `amt` bytes from the file.
411    fn complete_write(&mut self, amt: usize) -> StreamResult {
412        match &mut self.location {
413            WriteLocation::End => StreamResult::Completed,
414            WriteLocation::Offset(offset) => {
415                let Ok(amt) = amt.try_into() else {
416                    self.close(Err(ErrorCode::Overflow));
417                    return StreamResult::Dropped;
418                };
419                let Some(amt) = offset.checked_add(amt) else {
420                    self.close(Err(ErrorCode::Overflow));
421                    return StreamResult::Dropped;
422                };
423                *offset = amt;
424                StreamResult::Completed
425            }
426        }
427    }
428}
429
430impl WriteLocation {
431    fn write(&self, file: &cap_std::fs::File, bytes: &[u8]) -> io::Result<usize> {
432        match *self {
433            WriteLocation::End => sys::append_cursor_unspecified(file, bytes),
434            WriteLocation::Offset(at) => sys::write_at_cursor_unspecified(file, bytes, at),
435        }
436    }
437}
438
439impl<D> StreamConsumer<D> for WriteStreamConsumer {
440    type Item = u8;
441
442    fn poll_consume(
443        mut self: Pin<&mut Self>,
444        cx: &mut Context<'_>,
445        store: StoreContextMut<D>,
446        src: Source<Self::Item>,
447        finish: bool,
448    ) -> Poll<wasmtime::Result<StreamResult>> {
449        let mut src = src.as_direct(store);
450        if let Some(file) = self.file.as_blocking_file() {
451            // Once a blocking file, always a blocking file, so assert as such.
452            assert!(self.task.is_none());
453            return match self.location.write(file, src.remaining()) {
454                Ok(n) => {
455                    src.mark_read(n);
456                    Poll::Ready(Ok(self.complete_write(n)))
457                }
458                Err(err) => {
459                    self.close(Err(err.into()));
460                    Poll::Ready(Ok(StreamResult::Dropped))
461                }
462            };
463        }
464        let me = &mut *self;
465        let task = me.task.get_or_insert_with(|| {
466            debug_assert!(me.buffer.is_empty());
467            me.buffer.extend_from_slice(src.remaining());
468            let buf = mem::take(&mut me.buffer);
469            let file = Arc::clone(me.file.as_file());
470            let location = me.location;
471            spawn_blocking(move || location.write(&file, &buf).map(|n| (buf, n)))
472        });
473        let result = match Pin::new(&mut *task).poll(cx) {
474            // If cancellation is requested, then flag that to Tokio. Note that
475            // this still waits for the actual completion of the spawned task,
476            // which won't actually happen if it's already executing.
477            Poll::Pending if finish => {
478                task.abort();
479                ready!(Pin::new(task).poll(cx))
480            }
481            other => ready!(other),
482        };
483        self.task = None;
484        match result {
485            Ok(Ok((buf, n))) => {
486                src.mark_read(n);
487                self.buffer = buf;
488                self.buffer.clear();
489                Poll::Ready(Ok(self.complete_write(n)))
490            }
491            Ok(Err(err)) => {
492                self.close(Err(err.into()));
493                Poll::Ready(Ok(StreamResult::Dropped))
494            }
495            Err(err) => {
496                if err.is_cancelled() {
497                    return Poll::Ready(Ok(StreamResult::Cancelled));
498                }
499                panic!("I/O task should not panic: {err}")
500            }
501        }
502    }
503}
504
505impl Drop for WriteStreamConsumer {
506    fn drop(&mut self) {
507        if self.result.is_some() {
508            self.close(Ok(()))
509        }
510    }
511}
512
513impl types::Host for WasiFilesystemCtxView<'_> {
514    fn convert_error_code(&mut self, error: FilesystemError) -> wasmtime::Result<ErrorCode> {
515        error.downcast()
516    }
517}
518
519impl<U> types::HostDescriptorWithStore<U> for WasiFilesystem {
520    fn read_via_stream(
521        mut store: Access<U, Self>,
522        fd: Resource<Descriptor>,
523        offset: Filesize,
524    ) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
525        let file = get_file(store.get().table, &fd)?;
526        if !file.perms.contains(FilePerms::READ) {
527            return Ok((
528                StreamReader::new(&mut store, iter::empty())?,
529                FutureReader::new(&mut store, async {
530                    wasmtime::error::Ok(Err(ErrorCode::NotPermitted))
531                })?,
532            ));
533        }
534
535        let file = file.clone();
536        let (result_tx, result_rx) = oneshot::channel();
537        Ok((
538            StreamReader::new(
539                &mut store,
540                ReadStreamProducer {
541                    file,
542                    offset,
543                    result: Some(result_tx),
544                    task: None,
545                },
546            )?,
547            FutureReader::new(&mut store, result_rx)?,
548        ))
549    }
550
551    fn write_via_stream(
552        mut store: Access<'_, U, Self>,
553        fd: Resource<Descriptor>,
554        mut data: StreamReader<u8>,
555        offset: Filesize,
556    ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>> {
557        let (result_tx, result_rx) = oneshot::channel();
558        match get_file(store.get().table, &fd).and_then(|file| {
559            if !file.perms.contains(FilePerms::WRITE) {
560                Err(ErrorCode::NotPermitted.into())
561            } else {
562                Ok(file.clone())
563            }
564        }) {
565            Ok(file) => {
566                data.pipe(
567                    &mut store,
568                    WriteStreamConsumer::new_at(file, offset, result_tx),
569                )?;
570            }
571            Err(err) => {
572                data.close(&mut store)?;
573                let _ = result_tx.send(Err(err.downcast().unwrap_or(ErrorCode::Io)));
574            }
575        }
576        FutureReader::new(&mut store, result_rx)
577    }
578
579    fn append_via_stream(
580        mut store: Access<'_, U, Self>,
581        fd: Resource<Descriptor>,
582        mut data: StreamReader<u8>,
583    ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>> {
584        let (result_tx, result_rx) = oneshot::channel();
585        match get_file(store.get().table, &fd).and_then(|file| {
586            if !file.perms.contains(FilePerms::WRITE) {
587                Err(ErrorCode::NotPermitted.into())
588            } else {
589                Ok(file.clone())
590            }
591        }) {
592            Ok(file) => {
593                data.pipe(&mut store, WriteStreamConsumer::new_append(file, result_tx))?;
594            }
595            Err(err) => {
596                data.close(&mut store)?;
597                let _ = result_tx.send(Err(err.downcast().unwrap_or(ErrorCode::Io)));
598            }
599        }
600        FutureReader::new(&mut store, result_rx)
601    }
602
603    async fn advise(
604        store: &Accessor<U, Self>,
605        fd: Resource<Descriptor>,
606        offset: Filesize,
607        length: Filesize,
608        advice: Advice,
609    ) -> FilesystemResult<()> {
610        let file = store.get_file(&fd)?;
611        file.advise(offset, length, advice.into()).await?;
612        Ok(())
613    }
614
615    async fn sync_data(
616        store: &Accessor<U, Self>,
617        fd: Resource<Descriptor>,
618    ) -> FilesystemResult<()> {
619        let fd = store.get_descriptor(&fd)?;
620        fd.sync_data().await?;
621        Ok(())
622    }
623
624    async fn get_flags(
625        store: &Accessor<U, Self>,
626        fd: Resource<Descriptor>,
627    ) -> FilesystemResult<DescriptorFlags> {
628        let fd = store.get_descriptor(&fd)?;
629        let flags = fd.get_flags().await?;
630        Ok(flags.into())
631    }
632
633    async fn get_type(
634        store: &Accessor<U, Self>,
635        fd: Resource<Descriptor>,
636    ) -> FilesystemResult<DescriptorType> {
637        let fd = store.get_descriptor(&fd)?;
638        let ty = fd.get_type().await?;
639        Ok(ty.into())
640    }
641
642    async fn set_size(
643        store: &Accessor<U, Self>,
644        fd: Resource<Descriptor>,
645        size: Filesize,
646    ) -> FilesystemResult<()> {
647        let file = store.get_file(&fd)?;
648        file.set_size(size).await?;
649        Ok(())
650    }
651
652    async fn set_times(
653        store: &Accessor<U, Self>,
654        fd: Resource<Descriptor>,
655        data_access_timestamp: NewTimestamp,
656        data_modification_timestamp: NewTimestamp,
657    ) -> FilesystemResult<()> {
658        let fd = store.get_descriptor(&fd)?;
659        let atim = systemtimespec_from(data_access_timestamp)?;
660        let mtim = systemtimespec_from(data_modification_timestamp)?;
661        fd.set_times(atim, mtim).await?;
662        Ok(())
663    }
664
665    fn read_directory(
666        mut store: Access<'_, U, Self>,
667        fd: Resource<Descriptor>,
668    ) -> wasmtime::Result<(
669        StreamReader<DirectoryEntry>,
670        FutureReader<Result<(), ErrorCode>>,
671    )> {
672        let (result_tx, result_rx) = oneshot::channel();
673        let stream = match get_dir(store.get().table, &fd).and_then(|dir| {
674            if !dir.perms.contains(DirPerms::READ) {
675                Err(ErrorCode::NotPermitted.into())
676            } else {
677                Ok(dir)
678            }
679        }) {
680            Ok(dir) => {
681                let allow_blocking_current_thread = dir.allow_blocking_current_thread;
682                let dir = Arc::clone(dir.as_dir());
683                if allow_blocking_current_thread {
684                    match dir.entries() {
685                        Ok(readdir) => StreamReader::new(
686                            &mut store,
687                            FallibleIteratorProducer::new(
688                                readdir.filter_map(|e| map_dir_entry(e).transpose()),
689                                result_tx,
690                            ),
691                        )?,
692                        Err(e) => {
693                            let _ = result_tx.send(Err(e.into()));
694                            StreamReader::new(&mut store, iter::empty())?
695                        }
696                    }
697                } else {
698                    StreamReader::new(&mut store, ReadDirStream::new(dir, result_tx))?
699                }
700            }
701            Err(err) => {
702                let _ = result_tx.send(Err(err.downcast().unwrap_or(ErrorCode::Io)));
703                StreamReader::new(&mut store, iter::empty())?
704            }
705        };
706        Ok((stream, FutureReader::new(&mut store, result_rx)?))
707    }
708
709    async fn sync(store: &Accessor<U, Self>, fd: Resource<Descriptor>) -> FilesystemResult<()> {
710        let fd = store.get_descriptor(&fd)?;
711        fd.sync().await?;
712        Ok(())
713    }
714
715    async fn create_directory_at(
716        store: &Accessor<U, Self>,
717        fd: Resource<Descriptor>,
718        path: String,
719    ) -> FilesystemResult<()> {
720        let dir = store.get_dir(&fd)?;
721        dir.create_directory_at(path).await?;
722        Ok(())
723    }
724
725    async fn stat(
726        store: &Accessor<U, Self>,
727        fd: Resource<Descriptor>,
728    ) -> FilesystemResult<DescriptorStat> {
729        let fd = store.get_descriptor(&fd)?;
730        let stat = fd.stat().await?;
731        Ok(stat.into())
732    }
733
734    async fn stat_at(
735        store: &Accessor<U, Self>,
736        fd: Resource<Descriptor>,
737        path_flags: PathFlags,
738        path: String,
739    ) -> FilesystemResult<DescriptorStat> {
740        let dir = store.get_dir(&fd)?;
741        let stat = dir.stat_at(path_flags.into(), path).await?;
742        Ok(stat.into())
743    }
744
745    async fn set_times_at(
746        store: &Accessor<U, Self>,
747        fd: Resource<Descriptor>,
748        path_flags: PathFlags,
749        path: String,
750        data_access_timestamp: NewTimestamp,
751        data_modification_timestamp: NewTimestamp,
752    ) -> FilesystemResult<()> {
753        let dir = store.get_dir(&fd)?;
754        let atim = systemtimespec_from(data_access_timestamp)?;
755        let mtim = systemtimespec_from(data_modification_timestamp)?;
756        dir.set_times_at(path_flags.into(), path, atim, mtim)
757            .await?;
758        Ok(())
759    }
760
761    async fn link_at(
762        store: &Accessor<U, Self>,
763        fd: Resource<Descriptor>,
764        old_path_flags: PathFlags,
765        old_path: String,
766        new_fd: Resource<Descriptor>,
767        new_path: String,
768    ) -> FilesystemResult<()> {
769        let (old_dir, new_dir) = store.get_dir_pair(&fd, &new_fd)?;
770        old_dir
771            .link_at(old_path_flags.into(), old_path, &new_dir, new_path)
772            .await?;
773        Ok(())
774    }
775
776    async fn open_at(
777        store: &Accessor<U, Self>,
778        fd: Resource<Descriptor>,
779        path_flags: PathFlags,
780        path: String,
781        open_flags: OpenFlags,
782        flags: DescriptorFlags,
783    ) -> FilesystemResult<Resource<Descriptor>> {
784        let (allow_blocking_current_thread, dir) = store.with(|mut store| {
785            let store = store.get();
786            let dir = get_dir(&store.table, &fd)?;
787            FilesystemResult::Ok((store.ctx.allow_blocking_current_thread, dir.clone()))
788        })?;
789        let fd = dir
790            .open_at(
791                path_flags.into(),
792                path,
793                open_flags.into(),
794                flags.into(),
795                allow_blocking_current_thread,
796            )
797            .await?;
798        let fd = store.with(|mut store| store.get().table.push(fd))?;
799        Ok(fd)
800    }
801
802    async fn readlink_at(
803        store: &Accessor<U, Self>,
804        fd: Resource<Descriptor>,
805        path: String,
806    ) -> FilesystemResult<String> {
807        let dir = store.get_dir(&fd)?;
808        let path = dir.readlink_at(path).await?;
809        Ok(path)
810    }
811
812    async fn remove_directory_at(
813        store: &Accessor<U, Self>,
814        fd: Resource<Descriptor>,
815        path: String,
816    ) -> FilesystemResult<()> {
817        let dir = store.get_dir(&fd)?;
818        dir.remove_directory_at(path).await?;
819        Ok(())
820    }
821
822    async fn rename_at(
823        store: &Accessor<U, Self>,
824        fd: Resource<Descriptor>,
825        old_path: String,
826        new_fd: Resource<Descriptor>,
827        new_path: String,
828    ) -> FilesystemResult<()> {
829        let (old_dir, new_dir) = store.get_dir_pair(&fd, &new_fd)?;
830        old_dir.rename_at(old_path, &new_dir, new_path).await?;
831        Ok(())
832    }
833
834    async fn symlink_at(
835        store: &Accessor<U, Self>,
836        fd: Resource<Descriptor>,
837        old_path: String,
838        new_path: String,
839    ) -> FilesystemResult<()> {
840        let dir = store.get_dir(&fd)?;
841        dir.symlink_at(old_path, new_path).await?;
842        Ok(())
843    }
844
845    async fn unlink_file_at(
846        store: &Accessor<U, Self>,
847        fd: Resource<Descriptor>,
848        path: String,
849    ) -> FilesystemResult<()> {
850        let dir = store.get_dir(&fd)?;
851        dir.unlink_file_at(path).await?;
852        Ok(())
853    }
854
855    async fn is_same_object(
856        store: &Accessor<U, Self>,
857        fd: Resource<Descriptor>,
858        other: Resource<Descriptor>,
859    ) -> wasmtime::Result<bool> {
860        let (fd, other) = store.with(|mut store| {
861            let table = store.get().table;
862            let fd = get_descriptor(table, &fd)?.clone();
863            let other = get_descriptor(table, &other)?.clone();
864            wasmtime::error::Ok((fd, other))
865        })?;
866        fd.is_same_object(&other).await
867    }
868
869    async fn metadata_hash(
870        store: &Accessor<U, Self>,
871        fd: Resource<Descriptor>,
872    ) -> FilesystemResult<MetadataHashValue> {
873        let fd = store.get_descriptor(&fd)?;
874        let meta = fd.metadata_hash().await?;
875        Ok(meta.into())
876    }
877
878    async fn metadata_hash_at(
879        store: &Accessor<U, Self>,
880        fd: Resource<Descriptor>,
881        path_flags: PathFlags,
882        path: String,
883    ) -> FilesystemResult<MetadataHashValue> {
884        let dir = store.get_dir(&fd)?;
885        let meta = dir.metadata_hash_at(path_flags.into(), path).await?;
886        Ok(meta.into())
887    }
888}
889
890impl types::HostDescriptor for WasiFilesystemCtxView<'_> {
891    fn drop(&mut self, fd: Resource<Descriptor>) -> wasmtime::Result<()> {
892        self.table
893            .delete(fd)
894            .context("failed to delete descriptor resource from table")?;
895        Ok(())
896    }
897}
898
899impl preopens::Host for WasiFilesystemCtxView<'_> {
900    fn get_directories(&mut self) -> wasmtime::Result<Vec<(Resource<Descriptor>, String)>> {
901        self.get_directories()
902    }
903}