wasmtime_wasi/p2/
filesystem.rs

1use crate::p2::bindings::filesystem::types;
2use crate::p2::{InputStream, OutputStream, Pollable, StreamError, StreamResult};
3use crate::runtime::{AbortOnDropJoinHandle, spawn_blocking};
4use crate::{DirPerms, FilePerms, OpenMode, TrappableError};
5use anyhow::anyhow;
6use bytes::{Bytes, BytesMut};
7use std::io;
8use std::mem;
9use std::sync::Arc;
10
11pub type FsResult<T> = Result<T, FsError>;
12
13pub type FsError = TrappableError<types::ErrorCode>;
14
15impl From<wasmtime::component::ResourceTableError> for FsError {
16    fn from(error: wasmtime::component::ResourceTableError) -> Self {
17        Self::trap(error)
18    }
19}
20
21impl From<io::Error> for FsError {
22    fn from(error: io::Error) -> Self {
23        types::ErrorCode::from(error).into()
24    }
25}
26
27pub enum Descriptor {
28    File(File),
29    Dir(Dir),
30}
31
32impl Descriptor {
33    pub fn file(&self) -> Result<&File, types::ErrorCode> {
34        match self {
35            Descriptor::File(f) => Ok(f),
36            Descriptor::Dir(_) => Err(types::ErrorCode::BadDescriptor),
37        }
38    }
39
40    pub fn dir(&self) -> Result<&Dir, types::ErrorCode> {
41        match self {
42            Descriptor::Dir(d) => Ok(d),
43            Descriptor::File(_) => Err(types::ErrorCode::NotDirectory),
44        }
45    }
46
47    pub fn is_file(&self) -> bool {
48        match self {
49            Descriptor::File(_) => true,
50            Descriptor::Dir(_) => false,
51        }
52    }
53
54    pub fn is_dir(&self) -> bool {
55        match self {
56            Descriptor::File(_) => false,
57            Descriptor::Dir(_) => true,
58        }
59    }
60}
61
62#[derive(Clone)]
63pub struct File {
64    /// The operating system File this struct is mediating access to.
65    ///
66    /// Wrapped in an Arc because the same underlying file is used for
67    /// implementing the stream types. A copy is also needed for
68    /// [`spawn_blocking`].
69    ///
70    /// [`spawn_blocking`]: Self::spawn_blocking
71    pub file: Arc<cap_std::fs::File>,
72    /// Permissions to enforce on access to the file. These permissions are
73    /// specified by a user of the `crate::p2::WasiCtxBuilder`, and are
74    /// enforced prior to any enforced by the underlying operating system.
75    pub perms: FilePerms,
76    /// The mode the file was opened under: bits for reading, and writing.
77    /// Required to correctly report the DescriptorFlags, because cap-std
78    /// doesn't presently provide a cross-platform equivalent of reading the
79    /// oflags back out using fcntl.
80    pub open_mode: OpenMode,
81
82    allow_blocking_current_thread: bool,
83}
84
85impl File {
86    pub fn new(
87        file: cap_std::fs::File,
88        perms: FilePerms,
89        open_mode: OpenMode,
90        allow_blocking_current_thread: bool,
91    ) -> Self {
92        Self {
93            file: Arc::new(file),
94            perms,
95            open_mode,
96            allow_blocking_current_thread,
97        }
98    }
99
100    /// Execute the blocking `body` function.
101    ///
102    /// Depending on how the WasiCtx was configured, the body may either be:
103    /// - Executed directly on the current thread. In this case the `async`
104    ///   signature of this method is effectively a lie and the returned
105    ///   Future will always be immediately Ready. Or:
106    /// - Spawned on a background thread using [`tokio::task::spawn_blocking`]
107    ///   and immediately awaited.
108    ///
109    /// Intentionally blocking the executor thread might seem unorthodox, but is
110    /// not actually a problem for specific workloads. See:
111    /// - [`crate::p2::WasiCtxBuilder::allow_blocking_current_thread`]
112    /// - [Poor performance of wasmtime file I/O maybe because tokio](https://github.com/bytecodealliance/wasmtime/issues/7973)
113    /// - [Implement opt-in for enabling WASI to block the current thread](https://github.com/bytecodealliance/wasmtime/pull/8190)
114    pub(crate) async fn run_blocking<F, R>(&self, body: F) -> R
115    where
116        F: FnOnce(&cap_std::fs::File) -> R + Send + 'static,
117        R: Send + 'static,
118    {
119        match self.as_blocking_file() {
120            Some(file) => body(file),
121            None => self.spawn_blocking(body).await,
122        }
123    }
124
125    pub(crate) fn spawn_blocking<F, R>(&self, body: F) -> AbortOnDropJoinHandle<R>
126    where
127        F: FnOnce(&cap_std::fs::File) -> R + Send + 'static,
128        R: Send + 'static,
129    {
130        let f = self.file.clone();
131        spawn_blocking(move || body(&f))
132    }
133
134    /// Returns `Some` when the current thread is allowed to block in filesystem
135    /// operations, and otherwise returns `None` to indicate that
136    /// `spawn_blocking` must be used.
137    pub(crate) fn as_blocking_file(&self) -> Option<&cap_std::fs::File> {
138        if self.allow_blocking_current_thread {
139            Some(&self.file)
140        } else {
141            None
142        }
143    }
144}
145
146#[derive(Clone)]
147pub struct Dir {
148    /// The operating system file descriptor this struct is mediating access
149    /// to.
150    ///
151    /// Wrapped in an Arc because a copy is needed for [`spawn_blocking`].
152    ///
153    /// [`spawn_blocking`]: Self::spawn_blocking
154    pub dir: Arc<cap_std::fs::Dir>,
155    /// Permissions to enforce on access to this directory. These permissions
156    /// are specified by a user of the `crate::p2::WasiCtxBuilder`, and
157    /// are enforced prior to any enforced by the underlying operating system.
158    ///
159    /// These permissions are also enforced on any directories opened under
160    /// this directory.
161    pub perms: DirPerms,
162    /// Permissions to enforce on any files opened under this directory.
163    pub file_perms: FilePerms,
164    /// The mode the directory was opened under: bits for reading, and writing.
165    /// Required to correctly report the DescriptorFlags, because cap-std
166    /// doesn't presently provide a cross-platform equivalent of reading the
167    /// oflags back out using fcntl.
168    pub open_mode: OpenMode,
169
170    allow_blocking_current_thread: bool,
171}
172
173impl Dir {
174    pub fn new(
175        dir: cap_std::fs::Dir,
176        perms: DirPerms,
177        file_perms: FilePerms,
178        open_mode: OpenMode,
179        allow_blocking_current_thread: bool,
180    ) -> Self {
181        Dir {
182            dir: Arc::new(dir),
183            perms,
184            file_perms,
185            open_mode,
186            allow_blocking_current_thread,
187        }
188    }
189
190    /// Execute the blocking `body` function.
191    ///
192    /// Depending on how the WasiCtx was configured, the body may either be:
193    /// - Executed directly on the current thread. In this case the `async`
194    ///   signature of this method is effectively a lie and the returned
195    ///   Future will always be immediately Ready. Or:
196    /// - Spawned on a background thread using [`tokio::task::spawn_blocking`]
197    ///   and immediately awaited.
198    ///
199    /// Intentionally blocking the executor thread might seem unorthodox, but is
200    /// not actually a problem for specific workloads. See:
201    /// - [`crate::p2::WasiCtxBuilder::allow_blocking_current_thread`]
202    /// - [Poor performance of wasmtime file I/O maybe because tokio](https://github.com/bytecodealliance/wasmtime/issues/7973)
203    /// - [Implement opt-in for enabling WASI to block the current thread](https://github.com/bytecodealliance/wasmtime/pull/8190)
204    pub(crate) async fn run_blocking<F, R>(&self, body: F) -> R
205    where
206        F: FnOnce(&cap_std::fs::Dir) -> R + Send + 'static,
207        R: Send + 'static,
208    {
209        if self.allow_blocking_current_thread {
210            body(&self.dir)
211        } else {
212            let d = self.dir.clone();
213            spawn_blocking(move || body(&d)).await
214        }
215    }
216}
217
218pub struct FileInputStream {
219    file: File,
220    position: u64,
221    state: ReadState,
222}
223enum ReadState {
224    Idle,
225    Waiting(AbortOnDropJoinHandle<ReadState>),
226    DataAvailable(Bytes),
227    Error(io::Error),
228    Closed,
229}
230impl FileInputStream {
231    pub fn new(file: &File, position: u64) -> Self {
232        Self {
233            file: file.clone(),
234            position,
235            state: ReadState::Idle,
236        }
237    }
238
239    fn blocking_read(file: &cap_std::fs::File, offset: u64, size: usize) -> ReadState {
240        use system_interface::fs::FileIoExt;
241
242        let mut buf = BytesMut::zeroed(size);
243        loop {
244            match file.read_at(&mut buf, offset) {
245                Ok(0) => return ReadState::Closed,
246                Ok(n) => {
247                    buf.truncate(n);
248                    return ReadState::DataAvailable(buf.freeze());
249                }
250                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
251                    // Try again, continue looping
252                }
253                Err(e) => return ReadState::Error(e),
254            }
255        }
256    }
257
258    /// Wait for existing background task to finish, without starting any new background reads.
259    async fn wait_ready(&mut self) {
260        match &mut self.state {
261            ReadState::Waiting(task) => {
262                self.state = task.await;
263            }
264            _ => {}
265        }
266    }
267}
268#[async_trait::async_trait]
269impl InputStream for FileInputStream {
270    fn read(&mut self, size: usize) -> StreamResult<Bytes> {
271        match &mut self.state {
272            ReadState::Idle => {
273                if size == 0 {
274                    return Ok(Bytes::new());
275                }
276
277                let p = self.position;
278                self.state = ReadState::Waiting(
279                    self.file
280                        .spawn_blocking(move |f| Self::blocking_read(f, p, size)),
281                );
282                Ok(Bytes::new())
283            }
284            ReadState::DataAvailable(b) => {
285                let min_len = b.len().min(size);
286                let chunk = b.split_to(min_len);
287                if b.len() == 0 {
288                    self.state = ReadState::Idle;
289                }
290                self.position += min_len as u64;
291                Ok(chunk)
292            }
293            ReadState::Waiting(_) => Ok(Bytes::new()),
294            ReadState::Error(_) => match mem::replace(&mut self.state, ReadState::Closed) {
295                ReadState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
296                _ => unreachable!(),
297            },
298            ReadState::Closed => Err(StreamError::Closed),
299        }
300    }
301    /// Specialized blocking_* variant to bypass tokio's task spawning & joining
302    /// overhead on synchronous file I/O.
303    async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {
304        self.wait_ready().await;
305
306        // Before we defer to the regular `read`, make sure it has data ready to go:
307        if let ReadState::Idle = self.state {
308            let p = self.position;
309            self.state = self
310                .file
311                .run_blocking(move |f| Self::blocking_read(f, p, size))
312                .await;
313        }
314
315        self.read(size)
316    }
317    async fn cancel(&mut self) {
318        match mem::replace(&mut self.state, ReadState::Closed) {
319            ReadState::Waiting(task) => {
320                // The task was created using `spawn_blocking`, so unless we're
321                // lucky enough that the task hasn't started yet, the abort
322                // signal won't have any effect and we're forced to wait for it
323                // to run to completion.
324                // From the guest's point of view, `input-stream::drop` then
325                // appears to block. Certainly less than ideal, but arguably still
326                // better than letting the guest rack up an unbounded number of
327                // background tasks. Also, the guest is only blocked if
328                // the stream was dropped mid-read, which we don't expect to
329                // occur frequently.
330                task.cancel().await;
331            }
332            _ => {}
333        }
334    }
335}
336#[async_trait::async_trait]
337impl Pollable for FileInputStream {
338    async fn ready(&mut self) {
339        if let ReadState::Idle = self.state {
340            // The guest hasn't initiated any read, but is nonetheless waiting
341            // for data to be available. We'll start a read for them:
342
343            const DEFAULT_READ_SIZE: usize = 4096;
344            let p = self.position;
345            self.state = ReadState::Waiting(
346                self.file
347                    .spawn_blocking(move |f| Self::blocking_read(f, p, DEFAULT_READ_SIZE)),
348            );
349        }
350
351        self.wait_ready().await
352    }
353}
354
/// Where a [`FileOutputStream`] places the bytes it writes.
#[derive(Clone, Copy)]
pub(crate) enum FileOutputMode {
    /// Write at this explicit offset into the file.
    Position(u64),
    /// Write using the file's `append` operation.
    Append,
}
360
361pub(crate) struct FileOutputStream {
362    file: File,
363    mode: FileOutputMode,
364    state: OutputState,
365}
366
367enum OutputState {
368    Ready,
369    /// Allows join future to be awaited in a cancellable manner. Gone variant indicates
370    /// no task is currently outstanding.
371    Waiting(AbortOnDropJoinHandle<io::Result<usize>>),
372    /// The last I/O operation failed with this error.
373    Error(io::Error),
374    Closed,
375}
376
377impl FileOutputStream {
378    pub fn write_at(file: &File, position: u64) -> Self {
379        Self {
380            file: file.clone(),
381            mode: FileOutputMode::Position(position),
382            state: OutputState::Ready,
383        }
384    }
385
386    pub fn append(file: &File) -> Self {
387        Self {
388            file: file.clone(),
389            mode: FileOutputMode::Append,
390            state: OutputState::Ready,
391        }
392    }
393
394    fn blocking_write(
395        file: &cap_std::fs::File,
396        mut buf: Bytes,
397        mode: FileOutputMode,
398    ) -> io::Result<usize> {
399        use system_interface::fs::FileIoExt;
400
401        match mode {
402            FileOutputMode::Position(mut p) => {
403                let mut total = 0;
404                loop {
405                    let nwritten = file.write_at(buf.as_ref(), p)?;
406                    // afterwards buf contains [nwritten, len):
407                    let _ = buf.split_to(nwritten);
408                    p += nwritten as u64;
409                    total += nwritten;
410                    if buf.is_empty() {
411                        break;
412                    }
413                }
414                Ok(total)
415            }
416            FileOutputMode::Append => {
417                let mut total = 0;
418                loop {
419                    let nwritten = file.append(buf.as_ref())?;
420                    let _ = buf.split_to(nwritten);
421                    total += nwritten;
422                    if buf.is_empty() {
423                        break;
424                    }
425                }
426                Ok(total)
427            }
428        }
429    }
430}
431
// Number of bytes (1 MiB) that `check_write` reports as writable when the
// stream is ready.
// FIXME: configurable? determine from how much space left in file?
const FILE_WRITE_CAPACITY: usize = 1 << 20;
434
435#[async_trait::async_trait]
436impl OutputStream for FileOutputStream {
437    fn write(&mut self, buf: Bytes) -> Result<(), StreamError> {
438        match self.state {
439            OutputState::Ready => {}
440            OutputState::Closed => return Err(StreamError::Closed),
441            OutputState::Waiting(_) | OutputState::Error(_) => {
442                // a write is pending - this call was not permitted
443                return Err(StreamError::Trap(anyhow!(
444                    "write not permitted: check_write not called first"
445                )));
446            }
447        }
448
449        let m = self.mode;
450        self.state = OutputState::Waiting(
451            self.file
452                .spawn_blocking(move |f| Self::blocking_write(f, buf, m)),
453        );
454        Ok(())
455    }
456    /// Specialized blocking_* variant to bypass tokio's task spawning & joining
457    /// overhead on synchronous file I/O.
458    async fn blocking_write_and_flush(&mut self, buf: Bytes) -> StreamResult<()> {
459        self.ready().await;
460
461        match self.state {
462            OutputState::Ready => {}
463            OutputState::Closed => return Err(StreamError::Closed),
464            OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
465                OutputState::Error(e) => return Err(StreamError::LastOperationFailed(e.into())),
466                _ => unreachable!(),
467            },
468            OutputState::Waiting(_) => unreachable!("we've just waited for readiness"),
469        }
470
471        let m = self.mode;
472        match self
473            .file
474            .run_blocking(move |f| Self::blocking_write(f, buf, m))
475            .await
476        {
477            Ok(nwritten) => {
478                if let FileOutputMode::Position(p) = &mut self.mode {
479                    *p += nwritten as u64;
480                }
481                self.state = OutputState::Ready;
482                Ok(())
483            }
484            Err(e) => {
485                self.state = OutputState::Closed;
486                Err(StreamError::LastOperationFailed(e.into()))
487            }
488        }
489    }
490    fn flush(&mut self) -> Result<(), StreamError> {
491        match self.state {
492            // Only userland buffering of file writes is in the blocking task,
493            // so there's nothing extra that needs to be done to request a
494            // flush.
495            OutputState::Ready | OutputState::Waiting(_) => Ok(()),
496            OutputState::Closed => Err(StreamError::Closed),
497            OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
498                OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
499                _ => unreachable!(),
500            },
501        }
502    }
503    fn check_write(&mut self) -> Result<usize, StreamError> {
504        match self.state {
505            OutputState::Ready => Ok(FILE_WRITE_CAPACITY),
506            OutputState::Closed => Err(StreamError::Closed),
507            OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
508                OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
509                _ => unreachable!(),
510            },
511            OutputState::Waiting(_) => Ok(0),
512        }
513    }
514    async fn cancel(&mut self) {
515        match mem::replace(&mut self.state, OutputState::Closed) {
516            OutputState::Waiting(task) => {
517                // The task was created using `spawn_blocking`, so unless we're
518                // lucky enough that the task hasn't started yet, the abort
519                // signal won't have any effect and we're forced to wait for it
520                // to run to completion.
521                // From the guest's point of view, `output-stream::drop` then
522                // appears to block. Certainly less than ideal, but arguably still
523                // better than letting the guest rack up an unbounded number of
524                // background tasks. Also, the guest is only blocked if
525                // the stream was dropped mid-write, which we don't expect to
526                // occur frequently.
527                task.cancel().await;
528            }
529            _ => {}
530        }
531    }
532}
533
534#[async_trait::async_trait]
535impl Pollable for FileOutputStream {
536    async fn ready(&mut self) {
537        if let OutputState::Waiting(task) = &mut self.state {
538            self.state = match task.await {
539                Ok(nwritten) => {
540                    if let FileOutputMode::Position(p) = &mut self.mode {
541                        *p += nwritten as u64;
542                    }
543                    OutputState::Ready
544                }
545                Err(e) => OutputState::Error(e),
546            };
547        }
548    }
549}
550
551pub struct ReaddirIterator(
552    std::sync::Mutex<Box<dyn Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static>>,
553);
554
555impl ReaddirIterator {
556    pub(crate) fn new(
557        i: impl Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static,
558    ) -> Self {
559        ReaddirIterator(std::sync::Mutex::new(Box::new(i)))
560    }
561    pub(crate) fn next(&self) -> FsResult<Option<types::DirectoryEntry>> {
562        self.0.lock().unwrap().next().transpose()
563    }
564}
565
566impl IntoIterator for ReaddirIterator {
567    type Item = FsResult<types::DirectoryEntry>;
568    type IntoIter = Box<dyn Iterator<Item = Self::Item> + Send>;
569
570    fn into_iter(self) -> Self::IntoIter {
571        self.0.into_inner().unwrap()
572    }
573}