1use crate::filesystem::sys;
2use crate::filesystem::{Descriptor, Dir, File, WasiFilesystem, WasiFilesystemCtxView};
3use crate::p3::bindings::clocks::system_clock;
4use crate::p3::bindings::filesystem::types::{
5 self, Advice, DescriptorFlags, DescriptorStat, DescriptorType, DirectoryEntry, ErrorCode,
6 Filesize, MetadataHashValue, NewTimestamp, OpenFlags, PathFlags,
7};
8use crate::p3::filesystem::{FilesystemError, FilesystemResult, preopens};
9use crate::p3::{DEFAULT_BUFFER_CAPACITY, FallibleIteratorProducer};
10use crate::{DirPerms, FilePerms};
11use bytes::BytesMut;
12use core::pin::Pin;
13use core::task::{Context, Poll, ready};
14use core::{iter, mem};
15use std::io::{self, Cursor};
16use std::sync::Arc;
17use tokio::sync::{mpsc, oneshot};
18use tokio::task::{JoinHandle, spawn_blocking};
19use wasmtime::StoreContextMut;
20use wasmtime::component::{
21 Access, Accessor, Destination, FutureReader, Resource, ResourceTable, Source, StreamConsumer,
22 StreamProducer, StreamReader, StreamResult,
23};
24use wasmtime::error::Context as _;
25
26fn get_descriptor<'a>(
27 table: &'a ResourceTable,
28 fd: &'a Resource<Descriptor>,
29) -> FilesystemResult<&'a Descriptor> {
30 table
31 .get(fd)
32 .context("failed to get descriptor resource from table")
33 .map_err(FilesystemError::trap)
34}
35
36fn get_file<'a>(
37 table: &'a ResourceTable,
38 fd: &'a Resource<Descriptor>,
39) -> FilesystemResult<&'a File> {
40 let file = get_descriptor(table, fd).map(Descriptor::file)??;
41 Ok(file)
42}
43
44fn get_dir<'a>(
45 table: &'a ResourceTable,
46 fd: &'a Resource<Descriptor>,
47) -> FilesystemResult<&'a Dir> {
48 let dir = get_descriptor(table, fd).map(Descriptor::dir)??;
49 Ok(dir)
50}
51
/// Convenience lookups that return owned clones of table entries, so the
/// caller does not keep a borrow of the resource table.
trait AccessorExt {
    /// Returns a clone of the descriptor behind `fd`.
    fn get_descriptor(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Descriptor>;
    /// Returns a clone of the file behind `fd`.
    fn get_file(&self, fd: &Resource<Descriptor>) -> FilesystemResult<File>;
    /// Returns a clone of the directory behind `fd`.
    fn get_dir(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Dir>;
    /// Returns clones of the directories behind `a` and `b`, in that order.
    fn get_dir_pair(
        &self,
        a: &Resource<Descriptor>,
        b: &Resource<Descriptor>,
    ) -> FilesystemResult<(Dir, Dir)>;
}
62
63impl<T> AccessorExt for Accessor<T, WasiFilesystem> {
64 fn get_descriptor(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Descriptor> {
65 self.with(|mut store| {
66 let fd = get_descriptor(store.get().table, fd)?;
67 Ok(fd.clone())
68 })
69 }
70
71 fn get_file(&self, fd: &Resource<Descriptor>) -> FilesystemResult<File> {
72 self.with(|mut store| {
73 let file = get_file(store.get().table, fd)?;
74 Ok(file.clone())
75 })
76 }
77
78 fn get_dir(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Dir> {
79 self.with(|mut store| {
80 let dir = get_dir(store.get().table, fd)?;
81 Ok(dir.clone())
82 })
83 }
84
85 fn get_dir_pair(
86 &self,
87 a: &Resource<Descriptor>,
88 b: &Resource<Descriptor>,
89 ) -> FilesystemResult<(Dir, Dir)> {
90 self.with(|mut store| {
91 let table = store.get().table;
92 let a = get_dir(table, a)?;
93 let b = get_dir(table, b)?;
94 Ok((a.clone(), b.clone()))
95 })
96 }
97}
98
99fn systemtime_from(t: system_clock::Instant) -> Result<std::time::SystemTime, ErrorCode> {
100 if let Ok(seconds) = t.seconds.try_into() {
101 std::time::SystemTime::UNIX_EPOCH
102 .checked_add(core::time::Duration::new(seconds, t.nanoseconds))
103 .ok_or(ErrorCode::Overflow)
104 } else {
105 std::time::SystemTime::UNIX_EPOCH
106 .checked_sub(core::time::Duration::new(
107 t.seconds.unsigned_abs(),
108 t.nanoseconds,
109 ))
110 .ok_or(ErrorCode::Overflow)
111 }
112}
113
114fn systemtimespec_from(t: NewTimestamp) -> Result<Option<fs_set_times::SystemTimeSpec>, ErrorCode> {
115 use fs_set_times::SystemTimeSpec;
116 match t {
117 NewTimestamp::NoChange => Ok(None),
118 NewTimestamp::Now => Ok(Some(SystemTimeSpec::SymbolicNow)),
119 NewTimestamp::Timestamp(st) => {
120 let st = systemtime_from(st)?;
121 Ok(Some(SystemTimeSpec::Absolute(st)))
122 }
123 }
124}
125
/// Stream producer that reads a file's contents starting at a given offset.
struct ReadStreamProducer {
    /// File being read.
    file: File,
    /// Offset of the next read; advanced after every successful read.
    offset: u64,
    /// Sender for the stream's final result; `None` once it has been sent.
    result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
    /// In-flight background read, if one was spawned and has not finished.
    task: Option<JoinHandle<std::io::Result<BytesMut>>>,
}
132
133impl Drop for ReadStreamProducer {
134 fn drop(&mut self) {
135 self.close(Ok(()))
136 }
137}
138
139impl ReadStreamProducer {
140 fn close(&mut self, res: Result<(), ErrorCode>) {
141 if let Some(tx) = self.result.take() {
142 _ = tx.send(res);
143 }
144 }
145
146 fn complete_read(&mut self, amt: usize) -> StreamResult {
148 let Ok(amt) = amt.try_into() else {
149 self.close(Err(ErrorCode::Overflow));
150 return StreamResult::Dropped;
151 };
152 let Some(amt) = self.offset.checked_add(amt) else {
153 self.close(Err(ErrorCode::Overflow));
154 return StreamResult::Dropped;
155 };
156 self.offset = amt;
157 StreamResult::Completed
158 }
159}
160
/// Produces the file's bytes for the guest, starting at `self.offset`.
impl<D> StreamProducer<D> for ReadStreamProducer {
    type Item = u8;
    type Buffer = Cursor<BytesMut>;

    /// Produces the next chunk of file data.
    ///
    /// Two paths exist:
    /// * If `as_blocking_file()` yields a file, the read happens
    ///   synchronously, straight into the destination's buffer.
    /// * Otherwise a `spawn_blocking` task performs the read into an owned
    ///   buffer, and the task is polled here until it finishes.
    ///
    /// A zero-length read closes the stream with `Ok(())`, and an I/O error
    /// closes it with the converted error code; both return `Dropped`.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        finish: bool,
    ) -> Poll<wasmtime::Result<StreamResult>> {
        if let Some(file) = self.file.as_blocking_file() {
            // The synchronous path never leaves a background task behind.
            assert!(self.task.is_none());
            let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
            let buf = dst.remaining();
            if buf.is_empty() {
                // Nowhere to put data: complete without touching the file.
                return Poll::Ready(Ok(StreamResult::Completed));
            }
            return match sys::read_at_cursor_unspecified(file, buf, self.offset) {
                // Zero bytes read: finish the stream successfully.
                Ok(0) => {
                    self.close(Ok(()));
                    Poll::Ready(Ok(StreamResult::Dropped))
                }
                Ok(n) => {
                    dst.mark_written(n);
                    Poll::Ready(Ok(self.complete_read(n)))
                }
                Err(err) => {
                    self.close(Err(err.into()));
                    Poll::Ready(Ok(StreamResult::Dropped))
                }
            };
        }

        let me = &mut *self;
        // Reuse the read still in flight from an earlier poll, or start one.
        let task = me.task.get_or_insert_with(|| {
            // Take the destination's buffer and size it for one read.
            let mut buf = dst.take_buffer().into_inner();
            buf.resize(DEFAULT_BUFFER_CAPACITY, 0);
            let file = Arc::clone(me.file.as_file());
            let offset = me.offset;
            spawn_blocking(move || {
                sys::read_at_cursor_unspecified(file, &mut buf, offset).map(|n| {
                    // Keep only the bytes that were actually read.
                    buf.truncate(n);
                    buf
                })
            })
        });

        let result = match Pin::new(&mut *task).poll(cx) {
            // `finish` with the read still pending: request an abort and poll
            // once more. If the task is still not done, `ready!` returns
            // `Pending` and the task stays stored for the next poll.
            Poll::Pending if finish => {
                task.abort();
                ready!(Pin::new(task).poll(cx))
            }
            other => ready!(other),
        };
        // The task has completed; clear it so the next poll starts a new read.
        self.task = None;
        match result {
            // Zero bytes read: finish the stream successfully.
            Ok(Ok(buf)) if buf.is_empty() => {
                self.close(Ok(()));
                Poll::Ready(Ok(StreamResult::Dropped))
            }
            Ok(Ok(buf)) => {
                let n = buf.len();
                // Hand the filled buffer over to the destination.
                dst.set_buffer(Cursor::new(buf));
                Poll::Ready(Ok(self.complete_read(n)))
            }
            Ok(Err(err)) => {
                self.close(Err(err.into()));
                Poll::Ready(Ok(StreamResult::Dropped))
            }
            Err(err) => {
                // A join error is either the abort requested above or a panic
                // inside the blocking task.
                if err.is_cancelled() {
                    return Poll::Ready(Ok(StreamResult::Cancelled));
                }
                panic!("I/O task should not panic: {err}")
            }
        }
    }
}
248
249fn map_dir_entry(
250 entry: std::io::Result<cap_std::fs::DirEntry>,
251) -> Result<Option<DirectoryEntry>, ErrorCode> {
252 match entry {
253 Ok(entry) => {
254 let meta = entry.metadata()?;
255 let Ok(name) = entry.file_name().into_string() else {
256 return Err(ErrorCode::IllegalByteSequence);
257 };
258 Ok(Some(DirectoryEntry {
259 type_: meta.file_type().into(),
260 name,
261 }))
262 }
263 Err(err) => {
264 #[cfg(windows)]
267 {
268 use windows_sys::Win32::Foundation::{
269 ERROR_ACCESS_DENIED, ERROR_SHARING_VIOLATION,
270 };
271 if err.raw_os_error() == Some(ERROR_SHARING_VIOLATION as i32)
272 || err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32)
273 {
274 return Ok(None);
275 }
276 }
277 Err(err.into())
278 }
279 }
280}
281
/// Stream producer that lists a directory from a background blocking task.
struct ReadDirStream {
    /// Receives entries sent by the background task.
    rx: mpsc::Receiver<DirectoryEntry>,
    /// Background task iterating the directory; its output is the final result.
    task: JoinHandle<Result<(), ErrorCode>>,
    /// Sender for the stream's final result; `None` once it has been sent.
    result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
}
287
288impl ReadDirStream {
289 fn new(
290 dir: Arc<cap_std::fs::Dir>,
291 result: oneshot::Sender<Result<(), ErrorCode>>,
292 ) -> ReadDirStream {
293 let (tx, rx) = mpsc::channel(1);
294 ReadDirStream {
295 task: spawn_blocking(move || {
296 let entries = dir.entries()?;
297 for entry in entries {
298 if let Some(entry) = map_dir_entry(entry)? {
299 if let Err(_) = tx.blocking_send(entry) {
300 break;
301 }
302 }
303 }
304 Ok(())
305 }),
306 rx,
307 result: Some(result),
308 }
309 }
310
311 fn close(&mut self, res: Result<(), ErrorCode>) {
312 self.rx.close();
313 self.task.abort();
314 let _ = self.result.take().unwrap().send(res);
315 }
316}
317
/// Produces directory entries one at a time as the background task sends them.
impl<D> StreamProducer<D> for ReadDirStream {
    type Item = DirectoryEntry;
    type Buffer = Option<DirectoryEntry>;

    /// Hands the next directory entry to `dst`.
    ///
    /// Once the channel is exhausted, the background task's result is
    /// awaited and forwarded through `close`, and the stream reports
    /// `Dropped`.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        mut store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        finish: bool,
    ) -> Poll<wasmtime::Result<StreamResult>> {
        // The destination has no capacity: complete without taking an entry
        // off the channel.
        if dst.remaining(&mut store) == Some(0) {
            return Poll::Ready(Ok(StreamResult::Completed));
        }

        match self.rx.poll_recv(cx) {
            Poll::Ready(Some(item)) => {
                dst.set_buffer(Some(item));
                Poll::Ready(Ok(StreamResult::Completed))
            }

            // No more entries will arrive: wait for the task to finish and
            // report its result. A panic in the task is re-raised here.
            Poll::Ready(None) => {
                let result = ready!(Pin::new(&mut self.task).poll(cx))
                    .expect("spawned task should not panic");
                self.close(result);
                Poll::Ready(Ok(StreamResult::Dropped))
            }

            // Nothing available yet and the caller asked to finish: cancel
            // this production; the task and channel are left untouched.
            Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
            Poll::Pending => Poll::Pending,
        }
    }
}
365
366impl Drop for ReadDirStream {
367 fn drop(&mut self) {
368 if self.result.is_some() {
369 self.close(Ok(()));
370 }
371 }
372}
373
/// Stream consumer that writes incoming bytes to a file.
struct WriteStreamConsumer {
    /// File being written.
    file: File,
    /// Where the next write lands: a tracked offset, or the end of the file.
    location: WriteLocation,
    /// Sender for the stream's final result; `None` once it has been sent.
    result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
    /// Scratch buffer that carries bytes into the background write task and
    /// is reclaimed when the task finishes.
    buffer: BytesMut,
    /// In-flight background write, if one was spawned and has not finished.
    task: Option<JoinHandle<std::io::Result<(BytesMut, usize)>>>,
}
381
/// Destination of a write within a file.
#[derive(Copy, Clone)]
enum WriteLocation {
    /// Append to the end of the file.
    End,
    /// Write at this absolute byte offset.
    Offset(u64),
}
387
388impl WriteStreamConsumer {
389 fn new_at(file: File, offset: u64, result: oneshot::Sender<Result<(), ErrorCode>>) -> Self {
390 Self {
391 file,
392 location: WriteLocation::Offset(offset),
393 result: Some(result),
394 buffer: BytesMut::default(),
395 task: None,
396 }
397 }
398
399 fn new_append(file: File, result: oneshot::Sender<Result<(), ErrorCode>>) -> Self {
400 Self {
401 file,
402 location: WriteLocation::End,
403 result: Some(result),
404 buffer: BytesMut::default(),
405 task: None,
406 }
407 }
408
409 fn close(&mut self, res: Result<(), ErrorCode>) {
410 _ = self.result.take().unwrap().send(res);
411 }
412
413 fn complete_write(&mut self, amt: usize) -> StreamResult {
415 match &mut self.location {
416 WriteLocation::End => StreamResult::Completed,
417 WriteLocation::Offset(offset) => {
418 let Ok(amt) = amt.try_into() else {
419 self.close(Err(ErrorCode::Overflow));
420 return StreamResult::Dropped;
421 };
422 let Some(amt) = offset.checked_add(amt) else {
423 self.close(Err(ErrorCode::Overflow));
424 return StreamResult::Dropped;
425 };
426 *offset = amt;
427 StreamResult::Completed
428 }
429 }
430 }
431}
432
433impl WriteLocation {
434 fn write(&self, file: &cap_std::fs::File, bytes: &[u8]) -> io::Result<usize> {
435 match *self {
436 WriteLocation::End => sys::append_cursor_unspecified(file, bytes),
437 WriteLocation::Offset(at) => sys::write_at_cursor_unspecified(file, bytes, at),
438 }
439 }
440}
441
/// Consumes bytes from the guest and writes them to the file.
impl<D> StreamConsumer<D> for WriteStreamConsumer {
    type Item = u8;

    /// Writes the bytes currently available in `src`.
    ///
    /// Two paths exist:
    /// * If `as_blocking_file()` yields a file, the write happens
    ///   synchronously from the source's own buffer.
    /// * Otherwise the bytes are copied into `self.buffer` and written by a
    ///   `spawn_blocking` task, which is polled here until it finishes.
    ///
    /// An I/O error closes the stream with the converted error code and
    /// returns `Dropped`.
    fn poll_consume(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        src: Source<Self::Item>,
        finish: bool,
    ) -> Poll<wasmtime::Result<StreamResult>> {
        let mut src = src.as_direct(store);
        if let Some(file) = self.file.as_blocking_file() {
            // The synchronous path never leaves a background task behind.
            assert!(self.task.is_none());
            return match self.location.write(file, src.remaining()) {
                Ok(n) => {
                    // Only `n` bytes are marked consumed; the rest stays in `src`.
                    src.mark_read(n);
                    Poll::Ready(Ok(self.complete_write(n)))
                }
                Err(err) => {
                    self.close(Err(err.into()));
                    Poll::Ready(Ok(StreamResult::Dropped))
                }
            };
        }
        let me = &mut *self;
        // Reuse the write still in flight from an earlier poll, or start one.
        let task = me.task.get_or_insert_with(|| {
            debug_assert!(me.buffer.is_empty());
            // Copy the pending bytes so the task owns its data.
            me.buffer.extend_from_slice(src.remaining());
            let buf = mem::take(&mut me.buffer);
            let file = Arc::clone(me.file.as_file());
            let location = me.location;
            // The buffer is returned with the byte count so it can be reused.
            spawn_blocking(move || location.write(&file, &buf).map(|n| (buf, n)))
        });
        let result = match Pin::new(&mut *task).poll(cx) {
            // `finish` with the write still pending: request an abort and
            // poll once more. If the task is still not done, `ready!` returns
            // `Pending` and the task stays stored for the next poll.
            Poll::Pending if finish => {
                task.abort();
                ready!(Pin::new(task).poll(cx))
            }
            other => ready!(other),
        };
        // The task has completed; clear it so the next poll starts a new write.
        self.task = None;
        match result {
            Ok(Ok((buf, n))) => {
                src.mark_read(n);
                // Reclaim the allocation for the next write.
                self.buffer = buf;
                self.buffer.clear();
                Poll::Ready(Ok(self.complete_write(n)))
            }
            Ok(Err(err)) => {
                self.close(Err(err.into()));
                Poll::Ready(Ok(StreamResult::Dropped))
            }
            Err(err) => {
                // A join error is either the abort requested above or a panic
                // inside the blocking task.
                if err.is_cancelled() {
                    return Poll::Ready(Ok(StreamResult::Cancelled));
                }
                panic!("I/O task should not panic: {err}")
            }
        }
    }
}
507
508impl Drop for WriteStreamConsumer {
509 fn drop(&mut self) {
510 if self.result.is_some() {
511 self.close(Ok(()))
512 }
513 }
514}
515
impl types::Host for WasiFilesystemCtxView<'_> {
    /// Converts a `FilesystemError` into a guest-visible `ErrorCode` by
    /// delegating to `FilesystemError::downcast`; if the downcast fails, its
    /// error is returned to the caller.
    fn convert_error_code(&mut self, error: FilesystemError) -> wasmtime::Result<ErrorCode> {
        error.downcast()
    }
}
521
522impl types::HostDescriptorWithStore for WasiFilesystem {
523 fn read_via_stream<U>(
524 mut store: Access<U, Self>,
525 fd: Resource<Descriptor>,
526 offset: Filesize,
527 ) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
528 let file = get_file(store.get().table, &fd)?;
529 if !file.perms.contains(FilePerms::READ) {
530 return Ok((
531 StreamReader::new(&mut store, iter::empty())?,
532 FutureReader::new(&mut store, async {
533 wasmtime::error::Ok(Err(ErrorCode::NotPermitted))
534 })?,
535 ));
536 }
537
538 let file = file.clone();
539 let (result_tx, result_rx) = oneshot::channel();
540 Ok((
541 StreamReader::new(
542 &mut store,
543 ReadStreamProducer {
544 file,
545 offset,
546 result: Some(result_tx),
547 task: None,
548 },
549 )?,
550 FutureReader::new(&mut store, result_rx)?,
551 ))
552 }
553
554 fn write_via_stream<U>(
555 mut store: Access<'_, U, Self>,
556 fd: Resource<Descriptor>,
557 mut data: StreamReader<u8>,
558 offset: Filesize,
559 ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>> {
560 let (result_tx, result_rx) = oneshot::channel();
561 match get_file(store.get().table, &fd).and_then(|file| {
562 if !file.perms.contains(FilePerms::WRITE) {
563 Err(ErrorCode::NotPermitted.into())
564 } else {
565 Ok(file.clone())
566 }
567 }) {
568 Ok(file) => {
569 data.pipe(
570 &mut store,
571 WriteStreamConsumer::new_at(file, offset, result_tx),
572 )?;
573 }
574 Err(err) => {
575 data.close(&mut store)?;
576 let _ = result_tx.send(Err(err.downcast().unwrap_or(ErrorCode::Io)));
577 }
578 }
579 FutureReader::new(&mut store, result_rx)
580 }
581
582 fn append_via_stream<U>(
583 mut store: Access<'_, U, Self>,
584 fd: Resource<Descriptor>,
585 mut data: StreamReader<u8>,
586 ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>> {
587 let (result_tx, result_rx) = oneshot::channel();
588 match get_file(store.get().table, &fd).and_then(|file| {
589 if !file.perms.contains(FilePerms::WRITE) {
590 Err(ErrorCode::NotPermitted.into())
591 } else {
592 Ok(file.clone())
593 }
594 }) {
595 Ok(file) => {
596 data.pipe(&mut store, WriteStreamConsumer::new_append(file, result_tx))?;
597 }
598 Err(err) => {
599 data.close(&mut store)?;
600 let _ = result_tx.send(Err(err.downcast().unwrap_or(ErrorCode::Io)));
601 }
602 }
603 FutureReader::new(&mut store, result_rx)
604 }
605
606 async fn advise<U>(
607 store: &Accessor<U, Self>,
608 fd: Resource<Descriptor>,
609 offset: Filesize,
610 length: Filesize,
611 advice: Advice,
612 ) -> FilesystemResult<()> {
613 let file = store.get_file(&fd)?;
614 file.advise(offset, length, advice.into()).await?;
615 Ok(())
616 }
617
618 async fn sync_data<U>(
619 store: &Accessor<U, Self>,
620 fd: Resource<Descriptor>,
621 ) -> FilesystemResult<()> {
622 let fd = store.get_descriptor(&fd)?;
623 fd.sync_data().await?;
624 Ok(())
625 }
626
627 async fn get_flags<U>(
628 store: &Accessor<U, Self>,
629 fd: Resource<Descriptor>,
630 ) -> FilesystemResult<DescriptorFlags> {
631 let fd = store.get_descriptor(&fd)?;
632 let flags = fd.get_flags().await?;
633 Ok(flags.into())
634 }
635
636 async fn get_type<U>(
637 store: &Accessor<U, Self>,
638 fd: Resource<Descriptor>,
639 ) -> FilesystemResult<DescriptorType> {
640 let fd = store.get_descriptor(&fd)?;
641 let ty = fd.get_type().await?;
642 Ok(ty.into())
643 }
644
645 async fn set_size<U>(
646 store: &Accessor<U, Self>,
647 fd: Resource<Descriptor>,
648 size: Filesize,
649 ) -> FilesystemResult<()> {
650 let file = store.get_file(&fd)?;
651 file.set_size(size).await?;
652 Ok(())
653 }
654
655 async fn set_times<U>(
656 store: &Accessor<U, Self>,
657 fd: Resource<Descriptor>,
658 data_access_timestamp: NewTimestamp,
659 data_modification_timestamp: NewTimestamp,
660 ) -> FilesystemResult<()> {
661 let fd = store.get_descriptor(&fd)?;
662 let atim = systemtimespec_from(data_access_timestamp)?;
663 let mtim = systemtimespec_from(data_modification_timestamp)?;
664 fd.set_times(atim, mtim).await?;
665 Ok(())
666 }
667
668 fn read_directory<U>(
669 mut store: Access<'_, U, Self>,
670 fd: Resource<Descriptor>,
671 ) -> wasmtime::Result<(
672 StreamReader<DirectoryEntry>,
673 FutureReader<Result<(), ErrorCode>>,
674 )> {
675 let (result_tx, result_rx) = oneshot::channel();
676 let stream = match get_dir(store.get().table, &fd).and_then(|dir| {
677 if !dir.perms.contains(DirPerms::READ) {
678 Err(ErrorCode::NotPermitted.into())
679 } else {
680 Ok(dir)
681 }
682 }) {
683 Ok(dir) => {
684 let allow_blocking_current_thread = dir.allow_blocking_current_thread;
685 let dir = Arc::clone(dir.as_dir());
686 if allow_blocking_current_thread {
687 match dir.entries() {
688 Ok(readdir) => StreamReader::new(
689 &mut store,
690 FallibleIteratorProducer::new(
691 readdir.filter_map(|e| map_dir_entry(e).transpose()),
692 result_tx,
693 ),
694 )?,
695 Err(e) => {
696 let _ = result_tx.send(Err(e.into()));
697 StreamReader::new(&mut store, iter::empty())?
698 }
699 }
700 } else {
701 StreamReader::new(&mut store, ReadDirStream::new(dir, result_tx))?
702 }
703 }
704 Err(err) => {
705 let _ = result_tx.send(Err(err.downcast().unwrap_or(ErrorCode::Io)));
706 StreamReader::new(&mut store, iter::empty())?
707 }
708 };
709 Ok((stream, FutureReader::new(&mut store, result_rx)?))
710 }
711
712 async fn sync<U>(store: &Accessor<U, Self>, fd: Resource<Descriptor>) -> FilesystemResult<()> {
713 let fd = store.get_descriptor(&fd)?;
714 fd.sync().await?;
715 Ok(())
716 }
717
718 async fn create_directory_at<U>(
719 store: &Accessor<U, Self>,
720 fd: Resource<Descriptor>,
721 path: String,
722 ) -> FilesystemResult<()> {
723 let dir = store.get_dir(&fd)?;
724 dir.create_directory_at(path).await?;
725 Ok(())
726 }
727
728 async fn stat<U>(
729 store: &Accessor<U, Self>,
730 fd: Resource<Descriptor>,
731 ) -> FilesystemResult<DescriptorStat> {
732 let fd = store.get_descriptor(&fd)?;
733 let stat = fd.stat().await?;
734 Ok(stat.into())
735 }
736
737 async fn stat_at<U>(
738 store: &Accessor<U, Self>,
739 fd: Resource<Descriptor>,
740 path_flags: PathFlags,
741 path: String,
742 ) -> FilesystemResult<DescriptorStat> {
743 let dir = store.get_dir(&fd)?;
744 let stat = dir.stat_at(path_flags.into(), path).await?;
745 Ok(stat.into())
746 }
747
748 async fn set_times_at<U>(
749 store: &Accessor<U, Self>,
750 fd: Resource<Descriptor>,
751 path_flags: PathFlags,
752 path: String,
753 data_access_timestamp: NewTimestamp,
754 data_modification_timestamp: NewTimestamp,
755 ) -> FilesystemResult<()> {
756 let dir = store.get_dir(&fd)?;
757 let atim = systemtimespec_from(data_access_timestamp)?;
758 let mtim = systemtimespec_from(data_modification_timestamp)?;
759 dir.set_times_at(path_flags.into(), path, atim, mtim)
760 .await?;
761 Ok(())
762 }
763
764 async fn link_at<U>(
765 store: &Accessor<U, Self>,
766 fd: Resource<Descriptor>,
767 old_path_flags: PathFlags,
768 old_path: String,
769 new_fd: Resource<Descriptor>,
770 new_path: String,
771 ) -> FilesystemResult<()> {
772 let (old_dir, new_dir) = store.get_dir_pair(&fd, &new_fd)?;
773 old_dir
774 .link_at(old_path_flags.into(), old_path, &new_dir, new_path)
775 .await?;
776 Ok(())
777 }
778
779 async fn open_at<U>(
780 store: &Accessor<U, Self>,
781 fd: Resource<Descriptor>,
782 path_flags: PathFlags,
783 path: String,
784 open_flags: OpenFlags,
785 flags: DescriptorFlags,
786 ) -> FilesystemResult<Resource<Descriptor>> {
787 let (allow_blocking_current_thread, dir) = store.with(|mut store| {
788 let store = store.get();
789 let dir = get_dir(&store.table, &fd)?;
790 FilesystemResult::Ok((store.ctx.allow_blocking_current_thread, dir.clone()))
791 })?;
792 let fd = dir
793 .open_at(
794 path_flags.into(),
795 path,
796 open_flags.into(),
797 flags.into(),
798 allow_blocking_current_thread,
799 )
800 .await?;
801 let fd = store.with(|mut store| store.get().table.push(fd))?;
802 Ok(fd)
803 }
804
805 async fn readlink_at<U>(
806 store: &Accessor<U, Self>,
807 fd: Resource<Descriptor>,
808 path: String,
809 ) -> FilesystemResult<String> {
810 let dir = store.get_dir(&fd)?;
811 let path = dir.readlink_at(path).await?;
812 Ok(path)
813 }
814
815 async fn remove_directory_at<U>(
816 store: &Accessor<U, Self>,
817 fd: Resource<Descriptor>,
818 path: String,
819 ) -> FilesystemResult<()> {
820 let dir = store.get_dir(&fd)?;
821 dir.remove_directory_at(path).await?;
822 Ok(())
823 }
824
825 async fn rename_at<U>(
826 store: &Accessor<U, Self>,
827 fd: Resource<Descriptor>,
828 old_path: String,
829 new_fd: Resource<Descriptor>,
830 new_path: String,
831 ) -> FilesystemResult<()> {
832 let (old_dir, new_dir) = store.get_dir_pair(&fd, &new_fd)?;
833 old_dir.rename_at(old_path, &new_dir, new_path).await?;
834 Ok(())
835 }
836
837 async fn symlink_at<U>(
838 store: &Accessor<U, Self>,
839 fd: Resource<Descriptor>,
840 old_path: String,
841 new_path: String,
842 ) -> FilesystemResult<()> {
843 let dir = store.get_dir(&fd)?;
844 dir.symlink_at(old_path, new_path).await?;
845 Ok(())
846 }
847
848 async fn unlink_file_at<U>(
849 store: &Accessor<U, Self>,
850 fd: Resource<Descriptor>,
851 path: String,
852 ) -> FilesystemResult<()> {
853 let dir = store.get_dir(&fd)?;
854 dir.unlink_file_at(path).await?;
855 Ok(())
856 }
857
858 async fn is_same_object<U>(
859 store: &Accessor<U, Self>,
860 fd: Resource<Descriptor>,
861 other: Resource<Descriptor>,
862 ) -> wasmtime::Result<bool> {
863 let (fd, other) = store.with(|mut store| {
864 let table = store.get().table;
865 let fd = get_descriptor(table, &fd)?.clone();
866 let other = get_descriptor(table, &other)?.clone();
867 wasmtime::error::Ok((fd, other))
868 })?;
869 fd.is_same_object(&other).await
870 }
871
872 async fn metadata_hash<U>(
873 store: &Accessor<U, Self>,
874 fd: Resource<Descriptor>,
875 ) -> FilesystemResult<MetadataHashValue> {
876 let fd = store.get_descriptor(&fd)?;
877 let meta = fd.metadata_hash().await?;
878 Ok(meta.into())
879 }
880
881 async fn metadata_hash_at<U>(
882 store: &Accessor<U, Self>,
883 fd: Resource<Descriptor>,
884 path_flags: PathFlags,
885 path: String,
886 ) -> FilesystemResult<MetadataHashValue> {
887 let dir = store.get_dir(&fd)?;
888 let meta = dir.metadata_hash_at(path_flags.into(), path).await?;
889 Ok(meta.into())
890 }
891}
892
893impl types::HostDescriptor for WasiFilesystemCtxView<'_> {
894 fn drop(&mut self, fd: Resource<Descriptor>) -> wasmtime::Result<()> {
895 self.table
896 .delete(fd)
897 .context("failed to delete descriptor resource from table")?;
898 Ok(())
899 }
900}
901
impl preopens::Host for WasiFilesystemCtxView<'_> {
    /// Returns the preopened directories as `(descriptor, name)` pairs.
    fn get_directories(&mut self) -> wasmtime::Result<Vec<(Resource<Descriptor>, String)>> {
        // NOTE(review): presumably this resolves to an inherent
        // `WasiFilesystemCtxView::get_directories` defined elsewhere, since
        // inherent methods take precedence over trait methods; otherwise the
        // call would recurse. Confirm against the type's definition.
        self.get_directories()
    }
}