Skip to main content

wasmtime_wasi/p2/
filesystem.rs

1use crate::TrappableError;
2use crate::filesystem::File;
3use crate::filesystem::sys;
4use crate::p2::bindings::filesystem::types;
5use crate::p2::{InputStream, OutputStream, Pollable, StreamError, StreamResult};
6use crate::runtime::AbortOnDropJoinHandle;
7use bytes::{Bytes, BytesMut};
8use std::io;
9use std::mem;
10use wasmtime::format_err;
11
12pub type FsResult<T> = Result<T, FsError>;
13
14pub type FsError = TrappableError<types::ErrorCode>;
15
16impl From<crate::filesystem::ErrorCode> for types::ErrorCode {
17    fn from(error: crate::filesystem::ErrorCode) -> Self {
18        match error {
19            crate::filesystem::ErrorCode::Access => Self::Access,
20            crate::filesystem::ErrorCode::Already => Self::Already,
21            crate::filesystem::ErrorCode::BadDescriptor => Self::BadDescriptor,
22            crate::filesystem::ErrorCode::Busy => Self::Busy,
23            crate::filesystem::ErrorCode::Exist => Self::Exist,
24            crate::filesystem::ErrorCode::FileTooLarge => Self::FileTooLarge,
25            crate::filesystem::ErrorCode::IllegalByteSequence => Self::IllegalByteSequence,
26            crate::filesystem::ErrorCode::InProgress => Self::InProgress,
27            crate::filesystem::ErrorCode::Interrupted => Self::Interrupted,
28            crate::filesystem::ErrorCode::Invalid => Self::Invalid,
29            crate::filesystem::ErrorCode::Io => Self::Io,
30            crate::filesystem::ErrorCode::IsDirectory => Self::IsDirectory,
31            crate::filesystem::ErrorCode::Loop => Self::Loop,
32            crate::filesystem::ErrorCode::TooManyLinks => Self::TooManyLinks,
33            crate::filesystem::ErrorCode::NameTooLong => Self::NameTooLong,
34            crate::filesystem::ErrorCode::NoEntry => Self::NoEntry,
35            crate::filesystem::ErrorCode::InsufficientMemory => Self::InsufficientMemory,
36            crate::filesystem::ErrorCode::InsufficientSpace => Self::InsufficientSpace,
37            crate::filesystem::ErrorCode::NotDirectory => Self::NotDirectory,
38            crate::filesystem::ErrorCode::NotEmpty => Self::NotEmpty,
39            crate::filesystem::ErrorCode::Unsupported => Self::Unsupported,
40            crate::filesystem::ErrorCode::Overflow => Self::Overflow,
41            crate::filesystem::ErrorCode::NotPermitted => Self::NotPermitted,
42            crate::filesystem::ErrorCode::Pipe => Self::Pipe,
43            crate::filesystem::ErrorCode::InvalidSeek => Self::InvalidSeek,
44        }
45    }
46}
47
48impl From<crate::filesystem::ErrorCode> for FsError {
49    fn from(error: crate::filesystem::ErrorCode) -> Self {
50        types::ErrorCode::from(error).into()
51    }
52}
53
54impl From<wasmtime::component::ResourceTableError> for FsError {
55    fn from(error: wasmtime::component::ResourceTableError) -> Self {
56        Self::trap(error)
57    }
58}
59
60impl From<io::Error> for FsError {
61    fn from(error: io::Error) -> Self {
62        types::ErrorCode::from(error).into()
63    }
64}
65
66pub struct FileInputStream {
67    file: File,
68    position: u64,
69    state: ReadState,
70}
71enum ReadState {
72    Idle,
73    Waiting(AbortOnDropJoinHandle<ReadState>),
74    DataAvailable(Bytes),
75    Error(io::Error),
76    Closed,
77}
78impl FileInputStream {
79    pub fn new(file: &File, position: u64) -> Self {
80        Self {
81            file: file.clone(),
82            position,
83            state: ReadState::Idle,
84        }
85    }
86
87    fn blocking_read(file: &cap_std::fs::File, offset: u64, size: usize) -> ReadState {
88        let mut buf = BytesMut::zeroed(size.min(crate::MAX_READ_SIZE_ALLOC));
89        loop {
90            match sys::read_at_cursor_unspecified(file, &mut buf, offset) {
91                Ok(0) => return ReadState::Closed,
92                Ok(n) => {
93                    buf.truncate(n);
94                    return ReadState::DataAvailable(buf.freeze());
95                }
96                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
97                    // Try again, continue looping
98                }
99                Err(e) => return ReadState::Error(e),
100            }
101        }
102    }
103
104    /// Wait for existing background task to finish, without starting any new background reads.
105    async fn wait_ready(&mut self) {
106        match &mut self.state {
107            ReadState::Waiting(task) => {
108                self.state = task.await;
109            }
110            _ => {}
111        }
112    }
113}
114#[async_trait::async_trait]
115impl InputStream for FileInputStream {
116    fn read(&mut self, size: usize) -> StreamResult<Bytes> {
117        match &mut self.state {
118            ReadState::Idle => {
119                if size == 0 {
120                    return Ok(Bytes::new());
121                }
122
123                let p = self.position;
124                self.state = ReadState::Waiting(
125                    self.file
126                        .spawn_blocking(move |f| Self::blocking_read(f, p, size)),
127                );
128                Ok(Bytes::new())
129            }
130            ReadState::DataAvailable(b) => {
131                let min_len = b.len().min(size);
132                let chunk = b.split_to(min_len);
133                if b.len() == 0 {
134                    self.state = ReadState::Idle;
135                }
136                self.position += min_len as u64;
137                Ok(chunk)
138            }
139            ReadState::Waiting(_) => Ok(Bytes::new()),
140            ReadState::Error(_) => match mem::replace(&mut self.state, ReadState::Closed) {
141                ReadState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
142                _ => unreachable!(),
143            },
144            ReadState::Closed => Err(StreamError::Closed),
145        }
146    }
147    /// Specialized blocking_* variant to bypass tokio's task spawning & joining
148    /// overhead on synchronous file I/O.
149    async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {
150        self.wait_ready().await;
151
152        // Before we defer to the regular `read`, make sure it has data ready to go:
153        if let ReadState::Idle = self.state {
154            let p = self.position;
155            self.state = self
156                .file
157                .run_blocking(move |f| Self::blocking_read(f, p, size))
158                .await;
159        }
160
161        self.read(size)
162    }
163    async fn cancel(&mut self) {
164        match mem::replace(&mut self.state, ReadState::Closed) {
165            ReadState::Waiting(task) => {
166                // The task was created using `spawn_blocking`, so unless we're
167                // lucky enough that the task hasn't started yet, the abort
168                // signal won't have any effect and we're forced to wait for it
169                // to run to completion.
170                // From the guest's point of view, `input-stream::drop` then
171                // appears to block. Certainly less than ideal, but arguably still
172                // better than letting the guest rack up an unbounded number of
173                // background tasks. Also, the guest is only blocked if
174                // the stream was dropped mid-read, which we don't expect to
175                // occur frequently.
176                task.cancel().await;
177            }
178            _ => {}
179        }
180    }
181}
182#[async_trait::async_trait]
183impl Pollable for FileInputStream {
184    async fn ready(&mut self) {
185        if let ReadState::Idle = self.state {
186            // The guest hasn't initiated any read, but is nonetheless waiting
187            // for data to be available. We'll start a read for them:
188
189            const DEFAULT_READ_SIZE: usize = 4096;
190            let p = self.position;
191            self.state = ReadState::Waiting(
192                self.file
193                    .spawn_blocking(move |f| Self::blocking_read(f, p, DEFAULT_READ_SIZE)),
194            );
195        }
196
197        self.wait_ready().await
198    }
199}
200
201#[derive(Clone, Copy)]
202pub(crate) enum FileOutputMode {
203    Position(u64),
204    Append,
205}
206
207pub(crate) struct FileOutputStream {
208    file: File,
209    mode: FileOutputMode,
210    state: OutputState,
211}
212
213enum OutputState {
214    Ready,
215    /// Allows join future to be awaited in a cancellable manner. Gone variant indicates
216    /// no task is currently outstanding.
217    Waiting(AbortOnDropJoinHandle<io::Result<usize>>),
218    /// The last I/O operation failed with this error.
219    Error(io::Error),
220    Closed,
221}
222
223impl FileOutputStream {
224    pub fn write_at(file: &File, position: u64) -> Self {
225        Self {
226            file: file.clone(),
227            mode: FileOutputMode::Position(position),
228            state: OutputState::Ready,
229        }
230    }
231
232    pub fn append(file: &File) -> Self {
233        Self {
234            file: file.clone(),
235            mode: FileOutputMode::Append,
236            state: OutputState::Ready,
237        }
238    }
239
240    fn blocking_write(
241        file: &cap_std::fs::File,
242        mut buf: Bytes,
243        mode: FileOutputMode,
244    ) -> io::Result<usize> {
245        match mode {
246            FileOutputMode::Position(mut p) => {
247                let mut total = 0;
248                loop {
249                    let nwritten = sys::write_at_cursor_unspecified(file, buf.as_ref(), p)?;
250                    // afterwards buf contains [nwritten, len):
251                    let _ = buf.split_to(nwritten);
252                    p += nwritten as u64;
253                    total += nwritten;
254                    if buf.is_empty() {
255                        break;
256                    }
257                }
258                Ok(total)
259            }
260            FileOutputMode::Append => {
261                let mut total = 0;
262                loop {
263                    let nwritten = sys::append_cursor_unspecified(file, buf.as_ref())?;
264                    let _ = buf.split_to(nwritten);
265                    total += nwritten;
266                    if buf.is_empty() {
267                        break;
268                    }
269                }
270                Ok(total)
271            }
272        }
273    }
274}
275
276// FIXME: configurable? determine from how much space left in file?
277const FILE_WRITE_CAPACITY: usize = 1024 * 1024;
278
279#[async_trait::async_trait]
280impl OutputStream for FileOutputStream {
281    fn write(&mut self, buf: Bytes) -> Result<(), StreamError> {
282        match self.state {
283            OutputState::Ready => {}
284            OutputState::Closed => return Err(StreamError::Closed),
285            OutputState::Waiting(_) | OutputState::Error(_) => {
286                // a write is pending - this call was not permitted
287                return Err(StreamError::Trap(format_err!(
288                    "write not permitted: check_write not called first"
289                )));
290            }
291        }
292
293        let m = self.mode;
294        self.state = OutputState::Waiting(
295            self.file
296                .spawn_blocking(move |f| Self::blocking_write(f, buf, m)),
297        );
298        Ok(())
299    }
300    /// Specialized blocking_* variant to bypass tokio's task spawning & joining
301    /// overhead on synchronous file I/O.
302    async fn blocking_write_and_flush(&mut self, buf: Bytes) -> StreamResult<()> {
303        self.ready().await;
304
305        match self.state {
306            OutputState::Ready => {}
307            OutputState::Closed => return Err(StreamError::Closed),
308            OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
309                OutputState::Error(e) => return Err(StreamError::LastOperationFailed(e.into())),
310                _ => unreachable!(),
311            },
312            OutputState::Waiting(_) => unreachable!("we've just waited for readiness"),
313        }
314
315        let m = self.mode;
316        match self
317            .file
318            .run_blocking(move |f| Self::blocking_write(f, buf, m))
319            .await
320        {
321            Ok(nwritten) => {
322                if let FileOutputMode::Position(p) = &mut self.mode {
323                    *p += nwritten as u64;
324                }
325                self.state = OutputState::Ready;
326                Ok(())
327            }
328            Err(e) => {
329                self.state = OutputState::Closed;
330                Err(StreamError::LastOperationFailed(e.into()))
331            }
332        }
333    }
334    fn flush(&mut self) -> Result<(), StreamError> {
335        match self.state {
336            // Only userland buffering of file writes is in the blocking task,
337            // so there's nothing extra that needs to be done to request a
338            // flush.
339            OutputState::Ready | OutputState::Waiting(_) => Ok(()),
340            OutputState::Closed => Err(StreamError::Closed),
341            OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
342                OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
343                _ => unreachable!(),
344            },
345        }
346    }
347    fn check_write(&mut self) -> Result<usize, StreamError> {
348        match self.state {
349            OutputState::Ready => Ok(FILE_WRITE_CAPACITY),
350            OutputState::Closed => Err(StreamError::Closed),
351            OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
352                OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
353                _ => unreachable!(),
354            },
355            OutputState::Waiting(_) => Ok(0),
356        }
357    }
358    async fn cancel(&mut self) {
359        match mem::replace(&mut self.state, OutputState::Closed) {
360            OutputState::Waiting(task) => {
361                // The task was created using `spawn_blocking`, so unless we're
362                // lucky enough that the task hasn't started yet, the abort
363                // signal won't have any effect and we're forced to wait for it
364                // to run to completion.
365                // From the guest's point of view, `output-stream::drop` then
366                // appears to block. Certainly less than ideal, but arguably still
367                // better than letting the guest rack up an unbounded number of
368                // background tasks. Also, the guest is only blocked if
369                // the stream was dropped mid-write, which we don't expect to
370                // occur frequently.
371                task.cancel().await;
372            }
373            _ => {}
374        }
375    }
376}
377
378#[async_trait::async_trait]
379impl Pollable for FileOutputStream {
380    async fn ready(&mut self) {
381        if let OutputState::Waiting(task) = &mut self.state {
382            self.state = match task.await {
383                Ok(nwritten) => {
384                    if let FileOutputMode::Position(p) = &mut self.mode {
385                        *p += nwritten as u64;
386                    }
387                    OutputState::Ready
388                }
389                Err(e) => OutputState::Error(e),
390            };
391        }
392    }
393}
394
395pub struct ReaddirIterator(
396    std::sync::Mutex<Box<dyn Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static>>,
397);
398
399impl ReaddirIterator {
400    pub(crate) fn new(
401        i: impl Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static,
402    ) -> Self {
403        ReaddirIterator(std::sync::Mutex::new(Box::new(i)))
404    }
405    pub(crate) fn next(&self) -> FsResult<Option<types::DirectoryEntry>> {
406        self.0.lock().unwrap().next().transpose()
407    }
408}
409
410impl IntoIterator for ReaddirIterator {
411    type Item = FsResult<types::DirectoryEntry>;
412    type IntoIter = Box<dyn Iterator<Item = Self::Item> + Send>;
413
414    fn into_iter(self) -> Self::IntoIter {
415        self.0.into_inner().unwrap()
416    }
417}