wasmtime_wasi/p2/
filesystem.rs

1use crate::TrappableError;
2use crate::filesystem::File;
3use crate::p2::bindings::filesystem::types;
4use crate::p2::{InputStream, OutputStream, Pollable, StreamError, StreamResult};
5use crate::runtime::AbortOnDropJoinHandle;
6use anyhow::anyhow;
7use bytes::{Bytes, BytesMut};
8use std::io;
9use std::mem;
10
/// Result alias used by the filesystem code in this module; the error side is
/// always [`FsError`].
pub type FsResult<T> = Result<T, FsError>;

/// Error type for filesystem operations: a [`TrappableError`] parameterized
/// over the p2 bindings' `types::ErrorCode`.
pub type FsError = TrappableError<types::ErrorCode>;
14
15impl From<crate::filesystem::ErrorCode> for types::ErrorCode {
16    fn from(error: crate::filesystem::ErrorCode) -> Self {
17        match error {
18            crate::filesystem::ErrorCode::Access => Self::Access,
19            crate::filesystem::ErrorCode::Already => Self::Already,
20            crate::filesystem::ErrorCode::BadDescriptor => Self::BadDescriptor,
21            crate::filesystem::ErrorCode::Busy => Self::Busy,
22            crate::filesystem::ErrorCode::Exist => Self::Exist,
23            crate::filesystem::ErrorCode::FileTooLarge => Self::FileTooLarge,
24            crate::filesystem::ErrorCode::IllegalByteSequence => Self::IllegalByteSequence,
25            crate::filesystem::ErrorCode::InProgress => Self::InProgress,
26            crate::filesystem::ErrorCode::Interrupted => Self::Interrupted,
27            crate::filesystem::ErrorCode::Invalid => Self::Invalid,
28            crate::filesystem::ErrorCode::Io => Self::Io,
29            crate::filesystem::ErrorCode::IsDirectory => Self::IsDirectory,
30            crate::filesystem::ErrorCode::Loop => Self::Loop,
31            crate::filesystem::ErrorCode::TooManyLinks => Self::TooManyLinks,
32            crate::filesystem::ErrorCode::NameTooLong => Self::NameTooLong,
33            crate::filesystem::ErrorCode::NoEntry => Self::NoEntry,
34            crate::filesystem::ErrorCode::InsufficientMemory => Self::InsufficientMemory,
35            crate::filesystem::ErrorCode::InsufficientSpace => Self::InsufficientSpace,
36            crate::filesystem::ErrorCode::NotDirectory => Self::NotDirectory,
37            crate::filesystem::ErrorCode::NotEmpty => Self::NotEmpty,
38            crate::filesystem::ErrorCode::Unsupported => Self::Unsupported,
39            crate::filesystem::ErrorCode::Overflow => Self::Overflow,
40            crate::filesystem::ErrorCode::NotPermitted => Self::NotPermitted,
41            crate::filesystem::ErrorCode::Pipe => Self::Pipe,
42            crate::filesystem::ErrorCode::InvalidSeek => Self::InvalidSeek,
43        }
44    }
45}
46
47impl From<crate::filesystem::ErrorCode> for FsError {
48    fn from(error: crate::filesystem::ErrorCode) -> Self {
49        types::ErrorCode::from(error).into()
50    }
51}
52
53impl From<wasmtime::component::ResourceTableError> for FsError {
54    fn from(error: wasmtime::component::ResourceTableError) -> Self {
55        Self::trap(error)
56    }
57}
58
59impl From<io::Error> for FsError {
60    fn from(error: io::Error) -> Self {
61        types::ErrorCode::from(error).into()
62    }
63}
64
/// Input stream over a [`File`] that reads from a tracked byte offset.
pub struct FileInputStream {
    /// Handle to the file being read (a clone of the handle given to `new`).
    file: File,
    /// Offset at which the next file read starts; advanced by `read` as bytes
    /// are handed out to the caller.
    position: u64,
    /// Current state of the read state machine.
    state: ReadState,
}
/// State machine driving a [`FileInputStream`].
enum ReadState {
    /// No read is in flight and no data is buffered.
    Idle,
    /// A background read task is outstanding; it resolves to the next state.
    Waiting(AbortOnDropJoinHandle<ReadState>),
    /// Bytes that were read from the file but not yet handed out by `read`.
    DataAvailable(Bytes),
    /// The last file read failed. The error is reported by the next `read`,
    /// which also moves the stream to `Closed`.
    Error(io::Error),
    /// Terminal state: a file read returned zero bytes, an error was already
    /// reported, or the stream was cancelled.
    Closed,
}
77impl FileInputStream {
78    pub fn new(file: &File, position: u64) -> Self {
79        Self {
80            file: file.clone(),
81            position,
82            state: ReadState::Idle,
83        }
84    }
85
86    fn blocking_read(file: &cap_std::fs::File, offset: u64, size: usize) -> ReadState {
87        use system_interface::fs::FileIoExt;
88
89        let mut buf = BytesMut::zeroed(size);
90        loop {
91            match file.read_at(&mut buf, offset) {
92                Ok(0) => return ReadState::Closed,
93                Ok(n) => {
94                    buf.truncate(n);
95                    return ReadState::DataAvailable(buf.freeze());
96                }
97                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
98                    // Try again, continue looping
99                }
100                Err(e) => return ReadState::Error(e),
101            }
102        }
103    }
104
105    /// Wait for existing background task to finish, without starting any new background reads.
106    async fn wait_ready(&mut self) {
107        match &mut self.state {
108            ReadState::Waiting(task) => {
109                self.state = task.await;
110            }
111            _ => {}
112        }
113    }
114}
115#[async_trait::async_trait]
116impl InputStream for FileInputStream {
117    fn read(&mut self, size: usize) -> StreamResult<Bytes> {
118        match &mut self.state {
119            ReadState::Idle => {
120                if size == 0 {
121                    return Ok(Bytes::new());
122                }
123
124                let p = self.position;
125                self.state = ReadState::Waiting(
126                    self.file
127                        .spawn_blocking(move |f| Self::blocking_read(f, p, size)),
128                );
129                Ok(Bytes::new())
130            }
131            ReadState::DataAvailable(b) => {
132                let min_len = b.len().min(size);
133                let chunk = b.split_to(min_len);
134                if b.len() == 0 {
135                    self.state = ReadState::Idle;
136                }
137                self.position += min_len as u64;
138                Ok(chunk)
139            }
140            ReadState::Waiting(_) => Ok(Bytes::new()),
141            ReadState::Error(_) => match mem::replace(&mut self.state, ReadState::Closed) {
142                ReadState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
143                _ => unreachable!(),
144            },
145            ReadState::Closed => Err(StreamError::Closed),
146        }
147    }
148    /// Specialized blocking_* variant to bypass tokio's task spawning & joining
149    /// overhead on synchronous file I/O.
150    async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {
151        self.wait_ready().await;
152
153        // Before we defer to the regular `read`, make sure it has data ready to go:
154        if let ReadState::Idle = self.state {
155            let p = self.position;
156            self.state = self
157                .file
158                .run_blocking(move |f| Self::blocking_read(f, p, size))
159                .await;
160        }
161
162        self.read(size)
163    }
164    async fn cancel(&mut self) {
165        match mem::replace(&mut self.state, ReadState::Closed) {
166            ReadState::Waiting(task) => {
167                // The task was created using `spawn_blocking`, so unless we're
168                // lucky enough that the task hasn't started yet, the abort
169                // signal won't have any effect and we're forced to wait for it
170                // to run to completion.
171                // From the guest's point of view, `input-stream::drop` then
172                // appears to block. Certainly less than ideal, but arguably still
173                // better than letting the guest rack up an unbounded number of
174                // background tasks. Also, the guest is only blocked if
175                // the stream was dropped mid-read, which we don't expect to
176                // occur frequently.
177                task.cancel().await;
178            }
179            _ => {}
180        }
181    }
182}
183#[async_trait::async_trait]
184impl Pollable for FileInputStream {
185    async fn ready(&mut self) {
186        if let ReadState::Idle = self.state {
187            // The guest hasn't initiated any read, but is nonetheless waiting
188            // for data to be available. We'll start a read for them:
189
190            const DEFAULT_READ_SIZE: usize = 4096;
191            let p = self.position;
192            self.state = ReadState::Waiting(
193                self.file
194                    .spawn_blocking(move |f| Self::blocking_read(f, p, DEFAULT_READ_SIZE)),
195            );
196        }
197
198        self.wait_ready().await
199    }
200}
201
/// Selects where the writes of a [`FileOutputStream`] land in the file.
#[derive(Clone, Copy)]
pub(crate) enum FileOutputMode {
    /// Write at this byte offset; the offset is advanced by the number of
    /// bytes written once a write completes.
    Position(u64),
    /// Write using the file's `append` operation instead of an explicit offset.
    Append,
}
207
/// Output stream over a [`File`], writing either at a tracked offset or in
/// append mode.
pub(crate) struct FileOutputStream {
    /// Handle to the file being written (a clone of the handle given to the
    /// constructor).
    file: File,
    /// Write placement; for `Position` this also tracks the current offset.
    mode: FileOutputMode,
    /// Current state of the write state machine.
    state: OutputState,
}
213
/// State machine driving a [`FileOutputStream`].
enum OutputState {
    /// No write is in flight; a new write may be started.
    Ready,
    /// A background write task is outstanding. The handle resolves to the
    /// number of bytes written, or to the I/O error that stopped the write.
    Waiting(AbortOnDropJoinHandle<io::Result<usize>>),
    /// The last I/O operation failed with this error.
    Error(io::Error),
    /// Terminal state: an error was already reported, or the stream was
    /// cancelled.
    Closed,
}
223
224impl FileOutputStream {
225    pub fn write_at(file: &File, position: u64) -> Self {
226        Self {
227            file: file.clone(),
228            mode: FileOutputMode::Position(position),
229            state: OutputState::Ready,
230        }
231    }
232
233    pub fn append(file: &File) -> Self {
234        Self {
235            file: file.clone(),
236            mode: FileOutputMode::Append,
237            state: OutputState::Ready,
238        }
239    }
240
241    fn blocking_write(
242        file: &cap_std::fs::File,
243        mut buf: Bytes,
244        mode: FileOutputMode,
245    ) -> io::Result<usize> {
246        use system_interface::fs::FileIoExt;
247
248        match mode {
249            FileOutputMode::Position(mut p) => {
250                let mut total = 0;
251                loop {
252                    let nwritten = file.write_at(buf.as_ref(), p)?;
253                    // afterwards buf contains [nwritten, len):
254                    let _ = buf.split_to(nwritten);
255                    p += nwritten as u64;
256                    total += nwritten;
257                    if buf.is_empty() {
258                        break;
259                    }
260                }
261                Ok(total)
262            }
263            FileOutputMode::Append => {
264                let mut total = 0;
265                loop {
266                    let nwritten = file.append(buf.as_ref())?;
267                    let _ = buf.split_to(nwritten);
268                    total += nwritten;
269                    if buf.is_empty() {
270                        break;
271                    }
272                }
273                Ok(total)
274            }
275        }
276    }
277}
278
// FIXME: configurable? determine from how much space left in file?
/// Number of bytes (1 MiB) that `check_write` reports as writable while the
/// stream is in the ready state.
const FILE_WRITE_CAPACITY: usize = 1024 * 1024;
281
282#[async_trait::async_trait]
283impl OutputStream for FileOutputStream {
284    fn write(&mut self, buf: Bytes) -> Result<(), StreamError> {
285        match self.state {
286            OutputState::Ready => {}
287            OutputState::Closed => return Err(StreamError::Closed),
288            OutputState::Waiting(_) | OutputState::Error(_) => {
289                // a write is pending - this call was not permitted
290                return Err(StreamError::Trap(anyhow!(
291                    "write not permitted: check_write not called first"
292                )));
293            }
294        }
295
296        let m = self.mode;
297        self.state = OutputState::Waiting(
298            self.file
299                .spawn_blocking(move |f| Self::blocking_write(f, buf, m)),
300        );
301        Ok(())
302    }
303    /// Specialized blocking_* variant to bypass tokio's task spawning & joining
304    /// overhead on synchronous file I/O.
305    async fn blocking_write_and_flush(&mut self, buf: Bytes) -> StreamResult<()> {
306        self.ready().await;
307
308        match self.state {
309            OutputState::Ready => {}
310            OutputState::Closed => return Err(StreamError::Closed),
311            OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
312                OutputState::Error(e) => return Err(StreamError::LastOperationFailed(e.into())),
313                _ => unreachable!(),
314            },
315            OutputState::Waiting(_) => unreachable!("we've just waited for readiness"),
316        }
317
318        let m = self.mode;
319        match self
320            .file
321            .run_blocking(move |f| Self::blocking_write(f, buf, m))
322            .await
323        {
324            Ok(nwritten) => {
325                if let FileOutputMode::Position(p) = &mut self.mode {
326                    *p += nwritten as u64;
327                }
328                self.state = OutputState::Ready;
329                Ok(())
330            }
331            Err(e) => {
332                self.state = OutputState::Closed;
333                Err(StreamError::LastOperationFailed(e.into()))
334            }
335        }
336    }
337    fn flush(&mut self) -> Result<(), StreamError> {
338        match self.state {
339            // Only userland buffering of file writes is in the blocking task,
340            // so there's nothing extra that needs to be done to request a
341            // flush.
342            OutputState::Ready | OutputState::Waiting(_) => Ok(()),
343            OutputState::Closed => Err(StreamError::Closed),
344            OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
345                OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
346                _ => unreachable!(),
347            },
348        }
349    }
350    fn check_write(&mut self) -> Result<usize, StreamError> {
351        match self.state {
352            OutputState::Ready => Ok(FILE_WRITE_CAPACITY),
353            OutputState::Closed => Err(StreamError::Closed),
354            OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
355                OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
356                _ => unreachable!(),
357            },
358            OutputState::Waiting(_) => Ok(0),
359        }
360    }
361    async fn cancel(&mut self) {
362        match mem::replace(&mut self.state, OutputState::Closed) {
363            OutputState::Waiting(task) => {
364                // The task was created using `spawn_blocking`, so unless we're
365                // lucky enough that the task hasn't started yet, the abort
366                // signal won't have any effect and we're forced to wait for it
367                // to run to completion.
368                // From the guest's point of view, `output-stream::drop` then
369                // appears to block. Certainly less than ideal, but arguably still
370                // better than letting the guest rack up an unbounded number of
371                // background tasks. Also, the guest is only blocked if
372                // the stream was dropped mid-write, which we don't expect to
373                // occur frequently.
374                task.cancel().await;
375            }
376            _ => {}
377        }
378    }
379}
380
381#[async_trait::async_trait]
382impl Pollable for FileOutputStream {
383    async fn ready(&mut self) {
384        if let OutputState::Waiting(task) = &mut self.state {
385            self.state = match task.await {
386                Ok(nwritten) => {
387                    if let FileOutputMode::Position(p) = &mut self.mode {
388                        *p += nwritten as u64;
389                    }
390                    OutputState::Ready
391                }
392                Err(e) => OutputState::Error(e),
393            };
394        }
395    }
396}
397
/// A boxed iterator over directory entries (each item is an
/// `FsResult<types::DirectoryEntry>`), wrapped in a mutex so it can be
/// advanced through a shared reference.
pub struct ReaddirIterator(
    std::sync::Mutex<Box<dyn Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static>>,
);
401
402impl ReaddirIterator {
403    pub(crate) fn new(
404        i: impl Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static,
405    ) -> Self {
406        ReaddirIterator(std::sync::Mutex::new(Box::new(i)))
407    }
408    pub(crate) fn next(&self) -> FsResult<Option<types::DirectoryEntry>> {
409        self.0.lock().unwrap().next().transpose()
410    }
411}
412
413impl IntoIterator for ReaddirIterator {
414    type Item = FsResult<types::DirectoryEntry>;
415    type IntoIter = Box<dyn Iterator<Item = Self::Item> + Send>;
416
417    fn into_iter(self) -> Self::IntoIter {
418        self.0.into_inner().unwrap()
419    }
420}