1use crate::TrappableError;
2use crate::filesystem::File;
3use crate::p2::bindings::filesystem::types;
4use crate::p2::{InputStream, OutputStream, Pollable, StreamError, StreamResult};
5use crate::runtime::AbortOnDropJoinHandle;
6use anyhow::anyhow;
7use bytes::{Bytes, BytesMut};
8use std::io;
9use std::mem;
10
11pub type FsResult<T> = Result<T, FsError>;
12
13pub type FsError = TrappableError<types::ErrorCode>;
14
15impl From<crate::filesystem::ErrorCode> for types::ErrorCode {
16 fn from(error: crate::filesystem::ErrorCode) -> Self {
17 match error {
18 crate::filesystem::ErrorCode::Access => Self::Access,
19 crate::filesystem::ErrorCode::Already => Self::Already,
20 crate::filesystem::ErrorCode::BadDescriptor => Self::BadDescriptor,
21 crate::filesystem::ErrorCode::Busy => Self::Busy,
22 crate::filesystem::ErrorCode::Exist => Self::Exist,
23 crate::filesystem::ErrorCode::FileTooLarge => Self::FileTooLarge,
24 crate::filesystem::ErrorCode::IllegalByteSequence => Self::IllegalByteSequence,
25 crate::filesystem::ErrorCode::InProgress => Self::InProgress,
26 crate::filesystem::ErrorCode::Interrupted => Self::Interrupted,
27 crate::filesystem::ErrorCode::Invalid => Self::Invalid,
28 crate::filesystem::ErrorCode::Io => Self::Io,
29 crate::filesystem::ErrorCode::IsDirectory => Self::IsDirectory,
30 crate::filesystem::ErrorCode::Loop => Self::Loop,
31 crate::filesystem::ErrorCode::TooManyLinks => Self::TooManyLinks,
32 crate::filesystem::ErrorCode::NameTooLong => Self::NameTooLong,
33 crate::filesystem::ErrorCode::NoEntry => Self::NoEntry,
34 crate::filesystem::ErrorCode::InsufficientMemory => Self::InsufficientMemory,
35 crate::filesystem::ErrorCode::InsufficientSpace => Self::InsufficientSpace,
36 crate::filesystem::ErrorCode::NotDirectory => Self::NotDirectory,
37 crate::filesystem::ErrorCode::NotEmpty => Self::NotEmpty,
38 crate::filesystem::ErrorCode::Unsupported => Self::Unsupported,
39 crate::filesystem::ErrorCode::Overflow => Self::Overflow,
40 crate::filesystem::ErrorCode::NotPermitted => Self::NotPermitted,
41 crate::filesystem::ErrorCode::Pipe => Self::Pipe,
42 crate::filesystem::ErrorCode::InvalidSeek => Self::InvalidSeek,
43 }
44 }
45}
46
47impl From<crate::filesystem::ErrorCode> for FsError {
48 fn from(error: crate::filesystem::ErrorCode) -> Self {
49 types::ErrorCode::from(error).into()
50 }
51}
52
53impl From<wasmtime::component::ResourceTableError> for FsError {
54 fn from(error: wasmtime::component::ResourceTableError) -> Self {
55 Self::trap(error)
56 }
57}
58
59impl From<io::Error> for FsError {
60 fn from(error: io::Error) -> Self {
61 types::ErrorCode::from(error).into()
62 }
63}
64
65pub struct FileInputStream {
66 file: File,
67 position: u64,
68 state: ReadState,
69}
70enum ReadState {
71 Idle,
72 Waiting(AbortOnDropJoinHandle<ReadState>),
73 DataAvailable(Bytes),
74 Error(io::Error),
75 Closed,
76}
77impl FileInputStream {
78 pub fn new(file: &File, position: u64) -> Self {
79 Self {
80 file: file.clone(),
81 position,
82 state: ReadState::Idle,
83 }
84 }
85
86 fn blocking_read(file: &cap_std::fs::File, offset: u64, size: usize) -> ReadState {
87 use system_interface::fs::FileIoExt;
88
89 let mut buf = BytesMut::zeroed(size);
90 loop {
91 match file.read_at(&mut buf, offset) {
92 Ok(0) => return ReadState::Closed,
93 Ok(n) => {
94 buf.truncate(n);
95 return ReadState::DataAvailable(buf.freeze());
96 }
97 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
98 }
100 Err(e) => return ReadState::Error(e),
101 }
102 }
103 }
104
105 async fn wait_ready(&mut self) {
107 match &mut self.state {
108 ReadState::Waiting(task) => {
109 self.state = task.await;
110 }
111 _ => {}
112 }
113 }
114}
115#[async_trait::async_trait]
116impl InputStream for FileInputStream {
117 fn read(&mut self, size: usize) -> StreamResult<Bytes> {
118 match &mut self.state {
119 ReadState::Idle => {
120 if size == 0 {
121 return Ok(Bytes::new());
122 }
123
124 let p = self.position;
125 self.state = ReadState::Waiting(
126 self.file
127 .spawn_blocking(move |f| Self::blocking_read(f, p, size)),
128 );
129 Ok(Bytes::new())
130 }
131 ReadState::DataAvailable(b) => {
132 let min_len = b.len().min(size);
133 let chunk = b.split_to(min_len);
134 if b.len() == 0 {
135 self.state = ReadState::Idle;
136 }
137 self.position += min_len as u64;
138 Ok(chunk)
139 }
140 ReadState::Waiting(_) => Ok(Bytes::new()),
141 ReadState::Error(_) => match mem::replace(&mut self.state, ReadState::Closed) {
142 ReadState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
143 _ => unreachable!(),
144 },
145 ReadState::Closed => Err(StreamError::Closed),
146 }
147 }
148 async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {
151 self.wait_ready().await;
152
153 if let ReadState::Idle = self.state {
155 let p = self.position;
156 self.state = self
157 .file
158 .run_blocking(move |f| Self::blocking_read(f, p, size))
159 .await;
160 }
161
162 self.read(size)
163 }
164 async fn cancel(&mut self) {
165 match mem::replace(&mut self.state, ReadState::Closed) {
166 ReadState::Waiting(task) => {
167 task.cancel().await;
178 }
179 _ => {}
180 }
181 }
182}
183#[async_trait::async_trait]
184impl Pollable for FileInputStream {
185 async fn ready(&mut self) {
186 if let ReadState::Idle = self.state {
187 const DEFAULT_READ_SIZE: usize = 4096;
191 let p = self.position;
192 self.state = ReadState::Waiting(
193 self.file
194 .spawn_blocking(move |f| Self::blocking_read(f, p, DEFAULT_READ_SIZE)),
195 );
196 }
197
198 self.wait_ready().await
199 }
200}
201
202#[derive(Clone, Copy)]
203pub(crate) enum FileOutputMode {
204 Position(u64),
205 Append,
206}
207
208pub(crate) struct FileOutputStream {
209 file: File,
210 mode: FileOutputMode,
211 state: OutputState,
212}
213
214enum OutputState {
215 Ready,
216 Waiting(AbortOnDropJoinHandle<io::Result<usize>>),
219 Error(io::Error),
221 Closed,
222}
223
224impl FileOutputStream {
225 pub fn write_at(file: &File, position: u64) -> Self {
226 Self {
227 file: file.clone(),
228 mode: FileOutputMode::Position(position),
229 state: OutputState::Ready,
230 }
231 }
232
233 pub fn append(file: &File) -> Self {
234 Self {
235 file: file.clone(),
236 mode: FileOutputMode::Append,
237 state: OutputState::Ready,
238 }
239 }
240
241 fn blocking_write(
242 file: &cap_std::fs::File,
243 mut buf: Bytes,
244 mode: FileOutputMode,
245 ) -> io::Result<usize> {
246 use system_interface::fs::FileIoExt;
247
248 match mode {
249 FileOutputMode::Position(mut p) => {
250 let mut total = 0;
251 loop {
252 let nwritten = file.write_at(buf.as_ref(), p)?;
253 let _ = buf.split_to(nwritten);
255 p += nwritten as u64;
256 total += nwritten;
257 if buf.is_empty() {
258 break;
259 }
260 }
261 Ok(total)
262 }
263 FileOutputMode::Append => {
264 let mut total = 0;
265 loop {
266 let nwritten = file.append(buf.as_ref())?;
267 let _ = buf.split_to(nwritten);
268 total += nwritten;
269 if buf.is_empty() {
270 break;
271 }
272 }
273 Ok(total)
274 }
275 }
276 }
277}
278
279const FILE_WRITE_CAPACITY: usize = 1024 * 1024;
281
282#[async_trait::async_trait]
283impl OutputStream for FileOutputStream {
284 fn write(&mut self, buf: Bytes) -> Result<(), StreamError> {
285 match self.state {
286 OutputState::Ready => {}
287 OutputState::Closed => return Err(StreamError::Closed),
288 OutputState::Waiting(_) | OutputState::Error(_) => {
289 return Err(StreamError::Trap(anyhow!(
291 "write not permitted: check_write not called first"
292 )));
293 }
294 }
295
296 let m = self.mode;
297 self.state = OutputState::Waiting(
298 self.file
299 .spawn_blocking(move |f| Self::blocking_write(f, buf, m)),
300 );
301 Ok(())
302 }
303 async fn blocking_write_and_flush(&mut self, buf: Bytes) -> StreamResult<()> {
306 self.ready().await;
307
308 match self.state {
309 OutputState::Ready => {}
310 OutputState::Closed => return Err(StreamError::Closed),
311 OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
312 OutputState::Error(e) => return Err(StreamError::LastOperationFailed(e.into())),
313 _ => unreachable!(),
314 },
315 OutputState::Waiting(_) => unreachable!("we've just waited for readiness"),
316 }
317
318 let m = self.mode;
319 match self
320 .file
321 .run_blocking(move |f| Self::blocking_write(f, buf, m))
322 .await
323 {
324 Ok(nwritten) => {
325 if let FileOutputMode::Position(p) = &mut self.mode {
326 *p += nwritten as u64;
327 }
328 self.state = OutputState::Ready;
329 Ok(())
330 }
331 Err(e) => {
332 self.state = OutputState::Closed;
333 Err(StreamError::LastOperationFailed(e.into()))
334 }
335 }
336 }
337 fn flush(&mut self) -> Result<(), StreamError> {
338 match self.state {
339 OutputState::Ready | OutputState::Waiting(_) => Ok(()),
343 OutputState::Closed => Err(StreamError::Closed),
344 OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
345 OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
346 _ => unreachable!(),
347 },
348 }
349 }
350 fn check_write(&mut self) -> Result<usize, StreamError> {
351 match self.state {
352 OutputState::Ready => Ok(FILE_WRITE_CAPACITY),
353 OutputState::Closed => Err(StreamError::Closed),
354 OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
355 OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
356 _ => unreachable!(),
357 },
358 OutputState::Waiting(_) => Ok(0),
359 }
360 }
361 async fn cancel(&mut self) {
362 match mem::replace(&mut self.state, OutputState::Closed) {
363 OutputState::Waiting(task) => {
364 task.cancel().await;
375 }
376 _ => {}
377 }
378 }
379}
380
381#[async_trait::async_trait]
382impl Pollable for FileOutputStream {
383 async fn ready(&mut self) {
384 if let OutputState::Waiting(task) = &mut self.state {
385 self.state = match task.await {
386 Ok(nwritten) => {
387 if let FileOutputMode::Position(p) = &mut self.mode {
388 *p += nwritten as u64;
389 }
390 OutputState::Ready
391 }
392 Err(e) => OutputState::Error(e),
393 };
394 }
395 }
396}
397
398pub struct ReaddirIterator(
399 std::sync::Mutex<Box<dyn Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static>>,
400);
401
402impl ReaddirIterator {
403 pub(crate) fn new(
404 i: impl Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static,
405 ) -> Self {
406 ReaddirIterator(std::sync::Mutex::new(Box::new(i)))
407 }
408 pub(crate) fn next(&self) -> FsResult<Option<types::DirectoryEntry>> {
409 self.0.lock().unwrap().next().transpose()
410 }
411}
412
413impl IntoIterator for ReaddirIterator {
414 type Item = FsResult<types::DirectoryEntry>;
415 type IntoIter = Box<dyn Iterator<Item = Self::Item> + Send>;
416
417 fn into_iter(self) -> Self::IntoIter {
418 self.0.into_inner().unwrap()
419 }
420}