1use crate::TrappableError;
2use crate::filesystem::File;
3use crate::filesystem::sys;
4use crate::p2::bindings::filesystem::types;
5use crate::p2::{InputStream, OutputStream, Pollable, StreamError, StreamResult};
6use crate::runtime::AbortOnDropJoinHandle;
7use bytes::{Bytes, BytesMut};
8use std::io;
9use std::mem;
10use wasmtime::format_err;
11
/// Result alias used by the filesystem code in this module: any `T` paired
/// with an [`FsError`].
pub type FsResult<T> = Result<T, FsError>;
13
/// Error type for filesystem operations: a [`TrappableError`] specialised to
/// the filesystem `ErrorCode` from the p2 bindings.
///
/// In this file it is built either from an error code (via `into()`) or as a
/// trap (via `trap`).
pub type FsError = TrappableError<types::ErrorCode>;
15
16impl From<crate::filesystem::ErrorCode> for types::ErrorCode {
17 fn from(error: crate::filesystem::ErrorCode) -> Self {
18 match error {
19 crate::filesystem::ErrorCode::Access => Self::Access,
20 crate::filesystem::ErrorCode::Already => Self::Already,
21 crate::filesystem::ErrorCode::BadDescriptor => Self::BadDescriptor,
22 crate::filesystem::ErrorCode::Busy => Self::Busy,
23 crate::filesystem::ErrorCode::Exist => Self::Exist,
24 crate::filesystem::ErrorCode::FileTooLarge => Self::FileTooLarge,
25 crate::filesystem::ErrorCode::IllegalByteSequence => Self::IllegalByteSequence,
26 crate::filesystem::ErrorCode::InProgress => Self::InProgress,
27 crate::filesystem::ErrorCode::Interrupted => Self::Interrupted,
28 crate::filesystem::ErrorCode::Invalid => Self::Invalid,
29 crate::filesystem::ErrorCode::Io => Self::Io,
30 crate::filesystem::ErrorCode::IsDirectory => Self::IsDirectory,
31 crate::filesystem::ErrorCode::Loop => Self::Loop,
32 crate::filesystem::ErrorCode::TooManyLinks => Self::TooManyLinks,
33 crate::filesystem::ErrorCode::NameTooLong => Self::NameTooLong,
34 crate::filesystem::ErrorCode::NoEntry => Self::NoEntry,
35 crate::filesystem::ErrorCode::InsufficientMemory => Self::InsufficientMemory,
36 crate::filesystem::ErrorCode::InsufficientSpace => Self::InsufficientSpace,
37 crate::filesystem::ErrorCode::NotDirectory => Self::NotDirectory,
38 crate::filesystem::ErrorCode::NotEmpty => Self::NotEmpty,
39 crate::filesystem::ErrorCode::Unsupported => Self::Unsupported,
40 crate::filesystem::ErrorCode::Overflow => Self::Overflow,
41 crate::filesystem::ErrorCode::NotPermitted => Self::NotPermitted,
42 crate::filesystem::ErrorCode::Pipe => Self::Pipe,
43 crate::filesystem::ErrorCode::InvalidSeek => Self::InvalidSeek,
44 }
45 }
46}
47
48impl From<crate::filesystem::ErrorCode> for FsError {
49 fn from(error: crate::filesystem::ErrorCode) -> Self {
50 types::ErrorCode::from(error).into()
51 }
52}
53
54impl From<wasmtime::component::ResourceTableError> for FsError {
55 fn from(error: wasmtime::component::ResourceTableError) -> Self {
56 Self::trap(error)
57 }
58}
59
60impl From<io::Error> for FsError {
61 fn from(error: io::Error) -> Self {
62 types::ErrorCode::from(error).into()
63 }
64}
65
/// Input stream that reads a file starting from a given byte offset.
pub struct FileInputStream {
    /// Handle to the file being read (cloned from the caller's handle in `new`).
    file: File,
    /// Byte offset passed to the next file read; advanced by the number of
    /// bytes handed out by `read`.
    position: u64,
    /// Current state of the read state machine.
    state: ReadState,
}
/// State machine driving [`FileInputStream`].
enum ReadState {
    /// No read is in flight and no buffered data is held.
    Idle,
    /// A background read task is in flight; awaiting it yields the next state.
    Waiting(AbortOnDropJoinHandle<ReadState>),
    /// Bytes that were read from the file but not yet fully handed out.
    DataAvailable(Bytes),
    /// A file read failed; the error is reported by the next `read` call.
    Error(io::Error),
    /// The stream is finished: a file read returned zero bytes, an error was
    /// already reported, or the stream was cancelled.
    Closed,
}
78impl FileInputStream {
79 pub fn new(file: &File, position: u64) -> Self {
80 Self {
81 file: file.clone(),
82 position,
83 state: ReadState::Idle,
84 }
85 }
86
87 fn blocking_read(file: &cap_std::fs::File, offset: u64, size: usize) -> ReadState {
88 let mut buf = BytesMut::zeroed(size.min(crate::MAX_READ_SIZE_ALLOC));
89 loop {
90 match sys::read_at_cursor_unspecified(file, &mut buf, offset) {
91 Ok(0) => return ReadState::Closed,
92 Ok(n) => {
93 buf.truncate(n);
94 return ReadState::DataAvailable(buf.freeze());
95 }
96 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
97 }
99 Err(e) => return ReadState::Error(e),
100 }
101 }
102 }
103
104 async fn wait_ready(&mut self) {
106 match &mut self.state {
107 ReadState::Waiting(task) => {
108 self.state = task.await;
109 }
110 _ => {}
111 }
112 }
113}
114#[async_trait::async_trait]
115impl InputStream for FileInputStream {
116 fn read(&mut self, size: usize) -> StreamResult<Bytes> {
117 match &mut self.state {
118 ReadState::Idle => {
119 if size == 0 {
120 return Ok(Bytes::new());
121 }
122
123 let p = self.position;
124 self.state = ReadState::Waiting(
125 self.file
126 .spawn_blocking(move |f| Self::blocking_read(f, p, size)),
127 );
128 Ok(Bytes::new())
129 }
130 ReadState::DataAvailable(b) => {
131 let min_len = b.len().min(size);
132 let chunk = b.split_to(min_len);
133 if b.len() == 0 {
134 self.state = ReadState::Idle;
135 }
136 self.position += min_len as u64;
137 Ok(chunk)
138 }
139 ReadState::Waiting(_) => Ok(Bytes::new()),
140 ReadState::Error(_) => match mem::replace(&mut self.state, ReadState::Closed) {
141 ReadState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
142 _ => unreachable!(),
143 },
144 ReadState::Closed => Err(StreamError::Closed),
145 }
146 }
147 async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {
150 self.wait_ready().await;
151
152 if let ReadState::Idle = self.state {
154 let p = self.position;
155 self.state = self
156 .file
157 .run_blocking(move |f| Self::blocking_read(f, p, size))
158 .await;
159 }
160
161 self.read(size)
162 }
163 async fn cancel(&mut self) {
164 match mem::replace(&mut self.state, ReadState::Closed) {
165 ReadState::Waiting(task) => {
166 task.cancel().await;
177 }
178 _ => {}
179 }
180 }
181}
182#[async_trait::async_trait]
183impl Pollable for FileInputStream {
184 async fn ready(&mut self) {
185 if let ReadState::Idle = self.state {
186 const DEFAULT_READ_SIZE: usize = 4096;
190 let p = self.position;
191 self.state = ReadState::Waiting(
192 self.file
193 .spawn_blocking(move |f| Self::blocking_read(f, p, DEFAULT_READ_SIZE)),
194 );
195 }
196
197 self.wait_ready().await
198 }
199}
200
/// Where a [`FileOutputStream`] places the bytes it writes.
#[derive(Clone, Copy)]
pub(crate) enum FileOutputMode {
    /// Write at this byte offset; the stream advances it by the number of
    /// bytes written after each completed write.
    Position(u64),
    /// Write using the append operation instead of an explicit offset.
    Append,
}
206
/// Output stream that writes to a file, either at a tracked offset or in
/// append mode.
pub(crate) struct FileOutputStream {
    /// Handle to the file being written (cloned from the caller's handle).
    file: File,
    /// Positioned or append mode; carries the current offset when positioned.
    mode: FileOutputMode,
    /// Current state of the write state machine.
    state: OutputState,
}
212
/// State machine driving [`FileOutputStream`].
enum OutputState {
    /// No write is in flight; a new write may be started.
    Ready,
    /// A background write task is in flight; awaiting it yields the number of
    /// bytes written or the I/O error.
    Waiting(AbortOnDropJoinHandle<io::Result<usize>>),
    /// A background write failed; the error is reported by the next call that
    /// inspects the state.
    Error(io::Error),
    /// The stream is finished: an error was already reported or the stream
    /// was cancelled.
    Closed,
}
222
223impl FileOutputStream {
224 pub fn write_at(file: &File, position: u64) -> Self {
225 Self {
226 file: file.clone(),
227 mode: FileOutputMode::Position(position),
228 state: OutputState::Ready,
229 }
230 }
231
232 pub fn append(file: &File) -> Self {
233 Self {
234 file: file.clone(),
235 mode: FileOutputMode::Append,
236 state: OutputState::Ready,
237 }
238 }
239
240 fn blocking_write(
241 file: &cap_std::fs::File,
242 mut buf: Bytes,
243 mode: FileOutputMode,
244 ) -> io::Result<usize> {
245 match mode {
246 FileOutputMode::Position(mut p) => {
247 let mut total = 0;
248 loop {
249 let nwritten = sys::write_at_cursor_unspecified(file, buf.as_ref(), p)?;
250 let _ = buf.split_to(nwritten);
252 p += nwritten as u64;
253 total += nwritten;
254 if buf.is_empty() {
255 break;
256 }
257 }
258 Ok(total)
259 }
260 FileOutputMode::Append => {
261 let mut total = 0;
262 loop {
263 let nwritten = sys::append_cursor_unspecified(file, buf.as_ref())?;
264 let _ = buf.split_to(nwritten);
265 total += nwritten;
266 if buf.is_empty() {
267 break;
268 }
269 }
270 Ok(total)
271 }
272 }
273 }
274}
275
/// Number of bytes (1 MiB) that `check_write` reports as writable when a
/// [`FileOutputStream`] is ready.
const FILE_WRITE_CAPACITY: usize = 1024 * 1024;
278
279#[async_trait::async_trait]
280impl OutputStream for FileOutputStream {
281 fn write(&mut self, buf: Bytes) -> Result<(), StreamError> {
282 match self.state {
283 OutputState::Ready => {}
284 OutputState::Closed => return Err(StreamError::Closed),
285 OutputState::Waiting(_) | OutputState::Error(_) => {
286 return Err(StreamError::Trap(format_err!(
288 "write not permitted: check_write not called first"
289 )));
290 }
291 }
292
293 let m = self.mode;
294 self.state = OutputState::Waiting(
295 self.file
296 .spawn_blocking(move |f| Self::blocking_write(f, buf, m)),
297 );
298 Ok(())
299 }
300 async fn blocking_write_and_flush(&mut self, buf: Bytes) -> StreamResult<()> {
303 self.ready().await;
304
305 match self.state {
306 OutputState::Ready => {}
307 OutputState::Closed => return Err(StreamError::Closed),
308 OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
309 OutputState::Error(e) => return Err(StreamError::LastOperationFailed(e.into())),
310 _ => unreachable!(),
311 },
312 OutputState::Waiting(_) => unreachable!("we've just waited for readiness"),
313 }
314
315 let m = self.mode;
316 match self
317 .file
318 .run_blocking(move |f| Self::blocking_write(f, buf, m))
319 .await
320 {
321 Ok(nwritten) => {
322 if let FileOutputMode::Position(p) = &mut self.mode {
323 *p += nwritten as u64;
324 }
325 self.state = OutputState::Ready;
326 Ok(())
327 }
328 Err(e) => {
329 self.state = OutputState::Closed;
330 Err(StreamError::LastOperationFailed(e.into()))
331 }
332 }
333 }
334 fn flush(&mut self) -> Result<(), StreamError> {
335 match self.state {
336 OutputState::Ready | OutputState::Waiting(_) => Ok(()),
340 OutputState::Closed => Err(StreamError::Closed),
341 OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
342 OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
343 _ => unreachable!(),
344 },
345 }
346 }
347 fn check_write(&mut self) -> Result<usize, StreamError> {
348 match self.state {
349 OutputState::Ready => Ok(FILE_WRITE_CAPACITY),
350 OutputState::Closed => Err(StreamError::Closed),
351 OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
352 OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
353 _ => unreachable!(),
354 },
355 OutputState::Waiting(_) => Ok(0),
356 }
357 }
358 async fn cancel(&mut self) {
359 match mem::replace(&mut self.state, OutputState::Closed) {
360 OutputState::Waiting(task) => {
361 task.cancel().await;
372 }
373 _ => {}
374 }
375 }
376}
377
378#[async_trait::async_trait]
379impl Pollable for FileOutputStream {
380 async fn ready(&mut self) {
381 if let OutputState::Waiting(task) = &mut self.state {
382 self.state = match task.await {
383 Ok(nwritten) => {
384 if let FileOutputMode::Position(p) = &mut self.mode {
385 *p += nwritten as u64;
386 }
387 OutputState::Ready
388 }
389 Err(e) => OutputState::Error(e),
390 };
391 }
392 }
393}
394
/// A boxed iterator over directory entries (each an `FsResult`), kept behind
/// a mutex so that `next` can advance it through a shared reference.
pub struct ReaddirIterator(
    std::sync::Mutex<Box<dyn Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static>>,
);
398
399impl ReaddirIterator {
400 pub(crate) fn new(
401 i: impl Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static,
402 ) -> Self {
403 ReaddirIterator(std::sync::Mutex::new(Box::new(i)))
404 }
405 pub(crate) fn next(&self) -> FsResult<Option<types::DirectoryEntry>> {
406 self.0.lock().unwrap().next().transpose()
407 }
408}
409
410impl IntoIterator for ReaddirIterator {
411 type Item = FsResult<types::DirectoryEntry>;
412 type IntoIter = Box<dyn Iterator<Item = Self::Item> + Send>;
413
414 fn into_iter(self) -> Self::IntoIter {
415 self.0.into_inner().unwrap()
416 }
417}