1use crate::p2::bindings::filesystem::types;
2use crate::p2::{InputStream, OutputStream, Pollable, StreamError, StreamResult};
3use crate::runtime::{AbortOnDropJoinHandle, spawn_blocking};
4use crate::{DirPerms, FilePerms, OpenMode, TrappableError};
5use anyhow::anyhow;
6use bytes::{Bytes, BytesMut};
7use std::io;
8use std::mem;
9use std::sync::Arc;
10
/// Result alias for filesystem operations whose error type is [`FsError`].
pub type FsResult<T> = Result<T, FsError>;

/// Filesystem error type: a [`TrappableError`] parameterized over the
/// filesystem [`types::ErrorCode`].
pub type FsError = TrappableError<types::ErrorCode>;
14
15impl From<wasmtime::component::ResourceTableError> for FsError {
16 fn from(error: wasmtime::component::ResourceTableError) -> Self {
17 Self::trap(error)
18 }
19}
20
21impl From<io::Error> for FsError {
22 fn from(error: io::Error) -> Self {
23 types::ErrorCode::from(error).into()
24 }
25}
26
/// A filesystem descriptor, which is either a [`File`] or a [`Dir`].
pub enum Descriptor {
    /// The descriptor wraps a file.
    File(File),
    /// The descriptor wraps a directory.
    Dir(Dir),
}
31
32impl Descriptor {
33 pub fn file(&self) -> Result<&File, types::ErrorCode> {
34 match self {
35 Descriptor::File(f) => Ok(f),
36 Descriptor::Dir(_) => Err(types::ErrorCode::BadDescriptor),
37 }
38 }
39
40 pub fn dir(&self) -> Result<&Dir, types::ErrorCode> {
41 match self {
42 Descriptor::Dir(d) => Ok(d),
43 Descriptor::File(_) => Err(types::ErrorCode::NotDirectory),
44 }
45 }
46
47 pub fn is_file(&self) -> bool {
48 match self {
49 Descriptor::File(_) => true,
50 Descriptor::Dir(_) => false,
51 }
52 }
53
54 pub fn is_dir(&self) -> bool {
55 match self {
56 Descriptor::File(_) => false,
57 Descriptor::Dir(_) => true,
58 }
59 }
60}
61
/// A file handle together with the permissions and mode recorded for it.
///
/// Cloning is cheap with respect to the underlying file: the `cap_std` handle
/// sits behind an [`Arc`] and is shared between clones.
#[derive(Clone)]
pub struct File {
    /// Shared handle to the underlying `cap_std` file. `spawn_blocking`
    /// clones this `Arc` to move the handle into a background task.
    pub file: Arc<cap_std::fs::File>,
    /// Permissions recorded for this file.
    // NOTE(review): where these are enforced is not visible in this part of
    // the file — confirm against the callers.
    pub perms: FilePerms,
    /// Open mode recorded for this file.
    // NOTE(review): presumably the mode the file was opened with — confirm.
    pub open_mode: OpenMode,

    /// When `true`, `run_blocking` invokes its closure directly on the
    /// calling thread instead of handing it to a background blocking task.
    allow_blocking_current_thread: bool,
}
84
85impl File {
86 pub fn new(
87 file: cap_std::fs::File,
88 perms: FilePerms,
89 open_mode: OpenMode,
90 allow_blocking_current_thread: bool,
91 ) -> Self {
92 Self {
93 file: Arc::new(file),
94 perms,
95 open_mode,
96 allow_blocking_current_thread,
97 }
98 }
99
100 pub(crate) async fn run_blocking<F, R>(&self, body: F) -> R
115 where
116 F: FnOnce(&cap_std::fs::File) -> R + Send + 'static,
117 R: Send + 'static,
118 {
119 match self.as_blocking_file() {
120 Some(file) => body(file),
121 None => self.spawn_blocking(body).await,
122 }
123 }
124
125 pub(crate) fn spawn_blocking<F, R>(&self, body: F) -> AbortOnDropJoinHandle<R>
126 where
127 F: FnOnce(&cap_std::fs::File) -> R + Send + 'static,
128 R: Send + 'static,
129 {
130 let f = self.file.clone();
131 spawn_blocking(move || body(&f))
132 }
133
134 pub(crate) fn as_blocking_file(&self) -> Option<&cap_std::fs::File> {
138 if self.allow_blocking_current_thread {
139 Some(&self.file)
140 } else {
141 None
142 }
143 }
144}
145
/// A directory handle together with the permissions and mode recorded for it.
///
/// The `cap_std` handle sits behind an [`Arc`], so clones share the same
/// underlying directory.
#[derive(Clone)]
pub struct Dir {
    /// Shared handle to the underlying `cap_std` directory.
    pub dir: Arc<cap_std::fs::Dir>,
    /// Permissions recorded for the directory itself.
    // NOTE(review): enforcement is not visible in this part of the file.
    pub perms: DirPerms,
    /// File permissions recorded alongside the directory.
    // NOTE(review): presumably applied to files opened through this
    // directory — confirm against the callers.
    pub file_perms: FilePerms,
    /// Open mode recorded for this directory.
    // NOTE(review): presumably the mode the directory was opened with — confirm.
    pub open_mode: OpenMode,

    /// When `true`, `run_blocking` invokes its closure directly on the
    /// calling thread instead of handing it to a background blocking task.
    allow_blocking_current_thread: bool,
}
172
173impl Dir {
174 pub fn new(
175 dir: cap_std::fs::Dir,
176 perms: DirPerms,
177 file_perms: FilePerms,
178 open_mode: OpenMode,
179 allow_blocking_current_thread: bool,
180 ) -> Self {
181 Dir {
182 dir: Arc::new(dir),
183 perms,
184 file_perms,
185 open_mode,
186 allow_blocking_current_thread,
187 }
188 }
189
190 pub(crate) async fn run_blocking<F, R>(&self, body: F) -> R
205 where
206 F: FnOnce(&cap_std::fs::Dir) -> R + Send + 'static,
207 R: Send + 'static,
208 {
209 if self.allow_blocking_current_thread {
210 body(&self.dir)
211 } else {
212 let d = self.dir.clone();
213 spawn_blocking(move || body(&d)).await
214 }
215 }
216}
217
/// An input stream that reads from a [`File`] starting at a given offset.
pub struct FileInputStream {
    /// The file being read.
    file: File,
    /// Offset of the next byte to hand to the caller; advanced by `read` as
    /// buffered bytes are returned.
    position: u64,
    /// Current state of the stream's read state machine.
    state: ReadState,
}
/// State machine driving [`FileInputStream`].
enum ReadState {
    /// No read is in flight and no data is buffered.
    Idle,
    /// A background read is in flight; the task resolves to the next state.
    Waiting(AbortOnDropJoinHandle<ReadState>),
    /// Bytes have been read from the file but not yet returned to the caller.
    DataAvailable(Bytes),
    /// A read failed. The error is reported by the next `read`, after which
    /// the stream becomes `Closed`.
    Error(io::Error),
    /// The stream is finished: a read returned zero bytes, an error has
    /// already been reported, or the stream was cancelled.
    Closed,
}
230impl FileInputStream {
231 pub fn new(file: &File, position: u64) -> Self {
232 Self {
233 file: file.clone(),
234 position,
235 state: ReadState::Idle,
236 }
237 }
238
239 fn blocking_read(file: &cap_std::fs::File, offset: u64, size: usize) -> ReadState {
240 use system_interface::fs::FileIoExt;
241
242 let mut buf = BytesMut::zeroed(size);
243 loop {
244 match file.read_at(&mut buf, offset) {
245 Ok(0) => return ReadState::Closed,
246 Ok(n) => {
247 buf.truncate(n);
248 return ReadState::DataAvailable(buf.freeze());
249 }
250 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
251 }
253 Err(e) => return ReadState::Error(e),
254 }
255 }
256 }
257
258 async fn wait_ready(&mut self) {
260 match &mut self.state {
261 ReadState::Waiting(task) => {
262 self.state = task.await;
263 }
264 _ => {}
265 }
266 }
267}
268#[async_trait::async_trait]
269impl InputStream for FileInputStream {
270 fn read(&mut self, size: usize) -> StreamResult<Bytes> {
271 match &mut self.state {
272 ReadState::Idle => {
273 if size == 0 {
274 return Ok(Bytes::new());
275 }
276
277 let p = self.position;
278 self.state = ReadState::Waiting(
279 self.file
280 .spawn_blocking(move |f| Self::blocking_read(f, p, size)),
281 );
282 Ok(Bytes::new())
283 }
284 ReadState::DataAvailable(b) => {
285 let min_len = b.len().min(size);
286 let chunk = b.split_to(min_len);
287 if b.len() == 0 {
288 self.state = ReadState::Idle;
289 }
290 self.position += min_len as u64;
291 Ok(chunk)
292 }
293 ReadState::Waiting(_) => Ok(Bytes::new()),
294 ReadState::Error(_) => match mem::replace(&mut self.state, ReadState::Closed) {
295 ReadState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
296 _ => unreachable!(),
297 },
298 ReadState::Closed => Err(StreamError::Closed),
299 }
300 }
301 async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {
304 self.wait_ready().await;
305
306 if let ReadState::Idle = self.state {
308 let p = self.position;
309 self.state = self
310 .file
311 .run_blocking(move |f| Self::blocking_read(f, p, size))
312 .await;
313 }
314
315 self.read(size)
316 }
317 async fn cancel(&mut self) {
318 match mem::replace(&mut self.state, ReadState::Closed) {
319 ReadState::Waiting(task) => {
320 task.cancel().await;
331 }
332 _ => {}
333 }
334 }
335}
336#[async_trait::async_trait]
337impl Pollable for FileInputStream {
338 async fn ready(&mut self) {
339 if let ReadState::Idle = self.state {
340 const DEFAULT_READ_SIZE: usize = 4096;
344 let p = self.position;
345 self.state = ReadState::Waiting(
346 self.file
347 .spawn_blocking(move |f| Self::blocking_read(f, p, DEFAULT_READ_SIZE)),
348 );
349 }
350
351 self.wait_ready().await
352 }
353}
354
/// Where a [`FileOutputStream`] places the bytes it writes.
#[derive(Clone, Copy)]
pub(crate) enum FileOutputMode {
    /// Write at the given byte offset; the offset is advanced after each
    /// successful write.
    Position(u64),
    /// Write using the file's `append` operation.
    Append,
}
360
/// An output stream that writes to a [`File`], either at a tracked position
/// or in append mode.
pub(crate) struct FileOutputStream {
    /// The file being written.
    file: File,
    /// Write placement: a tracked offset or append.
    mode: FileOutputMode,
    /// Current state of the stream's write state machine.
    state: OutputState,
}
366
/// State machine driving [`FileOutputStream`].
enum OutputState {
    /// No write is in flight; a new write may be started.
    Ready,
    /// A background write is in flight. The task resolves to the number of
    /// bytes written or to the I/O error that stopped it.
    Waiting(AbortOnDropJoinHandle<io::Result<usize>>),
    /// A background write failed. The error is reported once by
    /// `check_write`, `flush` or `blocking_write_and_flush`, after which the
    /// stream becomes `Closed`.
    Error(io::Error),
    /// The stream is closed; operations fail with `StreamError::Closed`.
    Closed,
}
376
377impl FileOutputStream {
378 pub fn write_at(file: &File, position: u64) -> Self {
379 Self {
380 file: file.clone(),
381 mode: FileOutputMode::Position(position),
382 state: OutputState::Ready,
383 }
384 }
385
386 pub fn append(file: &File) -> Self {
387 Self {
388 file: file.clone(),
389 mode: FileOutputMode::Append,
390 state: OutputState::Ready,
391 }
392 }
393
394 fn blocking_write(
395 file: &cap_std::fs::File,
396 mut buf: Bytes,
397 mode: FileOutputMode,
398 ) -> io::Result<usize> {
399 use system_interface::fs::FileIoExt;
400
401 match mode {
402 FileOutputMode::Position(mut p) => {
403 let mut total = 0;
404 loop {
405 let nwritten = file.write_at(buf.as_ref(), p)?;
406 let _ = buf.split_to(nwritten);
408 p += nwritten as u64;
409 total += nwritten;
410 if buf.is_empty() {
411 break;
412 }
413 }
414 Ok(total)
415 }
416 FileOutputMode::Append => {
417 let mut total = 0;
418 loop {
419 let nwritten = file.append(buf.as_ref())?;
420 let _ = buf.split_to(nwritten);
421 total += nwritten;
422 if buf.is_empty() {
423 break;
424 }
425 }
426 Ok(total)
427 }
428 }
429 }
430}
431
/// Number of bytes `check_write` reports as writable while the stream is
/// ready: 1 MiB.
const FILE_WRITE_CAPACITY: usize = 1024 * 1024;
434
435#[async_trait::async_trait]
436impl OutputStream for FileOutputStream {
437 fn write(&mut self, buf: Bytes) -> Result<(), StreamError> {
438 match self.state {
439 OutputState::Ready => {}
440 OutputState::Closed => return Err(StreamError::Closed),
441 OutputState::Waiting(_) | OutputState::Error(_) => {
442 return Err(StreamError::Trap(anyhow!(
444 "write not permitted: check_write not called first"
445 )));
446 }
447 }
448
449 let m = self.mode;
450 self.state = OutputState::Waiting(
451 self.file
452 .spawn_blocking(move |f| Self::blocking_write(f, buf, m)),
453 );
454 Ok(())
455 }
456 async fn blocking_write_and_flush(&mut self, buf: Bytes) -> StreamResult<()> {
459 self.ready().await;
460
461 match self.state {
462 OutputState::Ready => {}
463 OutputState::Closed => return Err(StreamError::Closed),
464 OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
465 OutputState::Error(e) => return Err(StreamError::LastOperationFailed(e.into())),
466 _ => unreachable!(),
467 },
468 OutputState::Waiting(_) => unreachable!("we've just waited for readiness"),
469 }
470
471 let m = self.mode;
472 match self
473 .file
474 .run_blocking(move |f| Self::blocking_write(f, buf, m))
475 .await
476 {
477 Ok(nwritten) => {
478 if let FileOutputMode::Position(p) = &mut self.mode {
479 *p += nwritten as u64;
480 }
481 self.state = OutputState::Ready;
482 Ok(())
483 }
484 Err(e) => {
485 self.state = OutputState::Closed;
486 Err(StreamError::LastOperationFailed(e.into()))
487 }
488 }
489 }
490 fn flush(&mut self) -> Result<(), StreamError> {
491 match self.state {
492 OutputState::Ready | OutputState::Waiting(_) => Ok(()),
496 OutputState::Closed => Err(StreamError::Closed),
497 OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
498 OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
499 _ => unreachable!(),
500 },
501 }
502 }
503 fn check_write(&mut self) -> Result<usize, StreamError> {
504 match self.state {
505 OutputState::Ready => Ok(FILE_WRITE_CAPACITY),
506 OutputState::Closed => Err(StreamError::Closed),
507 OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
508 OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
509 _ => unreachable!(),
510 },
511 OutputState::Waiting(_) => Ok(0),
512 }
513 }
514 async fn cancel(&mut self) {
515 match mem::replace(&mut self.state, OutputState::Closed) {
516 OutputState::Waiting(task) => {
517 task.cancel().await;
528 }
529 _ => {}
530 }
531 }
532}
533
534#[async_trait::async_trait]
535impl Pollable for FileOutputStream {
536 async fn ready(&mut self) {
537 if let OutputState::Waiting(task) = &mut self.state {
538 self.state = match task.await {
539 Ok(nwritten) => {
540 if let FileOutputMode::Position(p) = &mut self.mode {
541 *p += nwritten as u64;
542 }
543 OutputState::Ready
544 }
545 Err(e) => OutputState::Error(e),
546 };
547 }
548 }
549}
550
/// A boxed iterator over directory entries, wrapped in a mutex so that it can
/// be advanced through a shared reference.
pub struct ReaddirIterator(
    std::sync::Mutex<Box<dyn Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static>>,
);
554
555impl ReaddirIterator {
556 pub(crate) fn new(
557 i: impl Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static,
558 ) -> Self {
559 ReaddirIterator(std::sync::Mutex::new(Box::new(i)))
560 }
561 pub(crate) fn next(&self) -> FsResult<Option<types::DirectoryEntry>> {
562 self.0.lock().unwrap().next().transpose()
563 }
564}
565
566impl IntoIterator for ReaddirIterator {
567 type Item = FsResult<types::DirectoryEntry>;
568 type IntoIter = Box<dyn Iterator<Item = Self::Item> + Send>;
569
570 fn into_iter(self) -> Self::IntoIter {
571 self.0.into_inner().unwrap()
572 }
573}