1use crate::filesystem::{Descriptor, Dir, File, WasiFilesystem, WasiFilesystemCtxView};
2use crate::p3::bindings::clocks::wall_clock;
3use crate::p3::bindings::filesystem::types::{
4 self, Advice, DescriptorFlags, DescriptorStat, DescriptorType, DirectoryEntry, ErrorCode,
5 Filesize, MetadataHashValue, NewTimestamp, OpenFlags, PathFlags,
6};
7use crate::p3::filesystem::{FilesystemError, FilesystemResult, preopens};
8use crate::p3::{DEFAULT_BUFFER_CAPACITY, FallibleIteratorProducer};
9use crate::{DirPerms, FilePerms};
10use anyhow::Context as _;
11use bytes::BytesMut;
12use core::pin::Pin;
13use core::task::{Context, Poll, ready};
14use core::{iter, mem};
15use std::io::{self, Cursor};
16use std::sync::Arc;
17use system_interface::fs::FileIoExt as _;
18use tokio::sync::{mpsc, oneshot};
19use tokio::task::{JoinHandle, spawn_blocking};
20use wasmtime::StoreContextMut;
21use wasmtime::component::{
22 Accessor, Destination, FutureReader, Resource, ResourceTable, Source, StreamConsumer,
23 StreamProducer, StreamReader, StreamResult,
24};
25
26fn get_descriptor<'a>(
27 table: &'a ResourceTable,
28 fd: &'a Resource<Descriptor>,
29) -> FilesystemResult<&'a Descriptor> {
30 table
31 .get(fd)
32 .context("failed to get descriptor resource from table")
33 .map_err(FilesystemError::trap)
34}
35
36fn get_file<'a>(
37 table: &'a ResourceTable,
38 fd: &'a Resource<Descriptor>,
39) -> FilesystemResult<&'a File> {
40 let file = get_descriptor(table, fd).map(Descriptor::file)??;
41 Ok(file)
42}
43
44fn get_dir<'a>(
45 table: &'a ResourceTable,
46 fd: &'a Resource<Descriptor>,
47) -> FilesystemResult<&'a Dir> {
48 let dir = get_descriptor(table, fd).map(Descriptor::dir)??;
49 Ok(dir)
50}
51
52trait AccessorExt {
53 fn get_descriptor(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Descriptor>;
54 fn get_file(&self, fd: &Resource<Descriptor>) -> FilesystemResult<File>;
55 fn get_dir(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Dir>;
56 fn get_dir_pair(
57 &self,
58 a: &Resource<Descriptor>,
59 b: &Resource<Descriptor>,
60 ) -> FilesystemResult<(Dir, Dir)>;
61}
62
63impl<T> AccessorExt for Accessor<T, WasiFilesystem> {
64 fn get_descriptor(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Descriptor> {
65 self.with(|mut store| {
66 let fd = get_descriptor(store.get().table, fd)?;
67 Ok(fd.clone())
68 })
69 }
70
71 fn get_file(&self, fd: &Resource<Descriptor>) -> FilesystemResult<File> {
72 self.with(|mut store| {
73 let file = get_file(store.get().table, fd)?;
74 Ok(file.clone())
75 })
76 }
77
78 fn get_dir(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Dir> {
79 self.with(|mut store| {
80 let dir = get_dir(store.get().table, fd)?;
81 Ok(dir.clone())
82 })
83 }
84
85 fn get_dir_pair(
86 &self,
87 a: &Resource<Descriptor>,
88 b: &Resource<Descriptor>,
89 ) -> FilesystemResult<(Dir, Dir)> {
90 self.with(|mut store| {
91 let table = store.get().table;
92 let a = get_dir(table, a)?;
93 let b = get_dir(table, b)?;
94 Ok((a.clone(), b.clone()))
95 })
96 }
97}
98
99fn systemtime_from(t: wall_clock::Datetime) -> Result<std::time::SystemTime, ErrorCode> {
100 std::time::SystemTime::UNIX_EPOCH
101 .checked_add(core::time::Duration::new(t.seconds, t.nanoseconds))
102 .ok_or(ErrorCode::Overflow)
103}
104
105fn systemtimespec_from(t: NewTimestamp) -> Result<Option<fs_set_times::SystemTimeSpec>, ErrorCode> {
106 use fs_set_times::SystemTimeSpec;
107 match t {
108 NewTimestamp::NoChange => Ok(None),
109 NewTimestamp::Now => Ok(Some(SystemTimeSpec::SymbolicNow)),
110 NewTimestamp::Timestamp(st) => {
111 let st = systemtime_from(st)?;
112 Ok(Some(SystemTimeSpec::Absolute(st)))
113 }
114 }
115}
116
117struct ReadStreamProducer {
118 file: File,
119 offset: u64,
120 result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
121 task: Option<JoinHandle<std::io::Result<BytesMut>>>,
122}
123
124impl Drop for ReadStreamProducer {
125 fn drop(&mut self) {
126 self.close(Ok(()))
127 }
128}
129
130impl ReadStreamProducer {
131 fn close(&mut self, res: Result<(), ErrorCode>) {
132 if let Some(tx) = self.result.take() {
133 _ = tx.send(res);
134 }
135 }
136
137 fn complete_read(&mut self, amt: usize) -> StreamResult {
139 let Ok(amt) = amt.try_into() else {
140 self.close(Err(ErrorCode::Overflow));
141 return StreamResult::Dropped;
142 };
143 let Some(amt) = self.offset.checked_add(amt) else {
144 self.close(Err(ErrorCode::Overflow));
145 return StreamResult::Dropped;
146 };
147 self.offset = amt;
148 StreamResult::Completed
149 }
150}
151
152impl<D> StreamProducer<D> for ReadStreamProducer {
153 type Item = u8;
154 type Buffer = Cursor<BytesMut>;
155
156 fn poll_produce<'a>(
157 mut self: Pin<&mut Self>,
158 cx: &mut Context<'_>,
159 store: StoreContextMut<'a, D>,
160 mut dst: Destination<'a, Self::Item, Self::Buffer>,
161 _finish: bool,
164 ) -> Poll<wasmtime::Result<StreamResult>> {
165 if let Some(file) = self.file.as_blocking_file() {
166 assert!(self.task.is_none());
168 let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
169 let buf = dst.remaining();
170 if buf.is_empty() {
171 return Poll::Ready(Ok(StreamResult::Completed));
172 }
173 return match file.read_at(buf, self.offset) {
174 Ok(0) => {
175 self.close(Ok(()));
176 Poll::Ready(Ok(StreamResult::Dropped))
177 }
178 Ok(n) => {
179 dst.mark_written(n);
180 Poll::Ready(Ok(self.complete_read(n)))
181 }
182 Err(err) => {
183 self.close(Err(err.into()));
184 Poll::Ready(Ok(StreamResult::Dropped))
185 }
186 };
187 }
188
189 let me = &mut *self;
191 let task = me.task.get_or_insert_with(|| {
192 let mut buf = dst.take_buffer().into_inner();
193 buf.resize(DEFAULT_BUFFER_CAPACITY, 0);
194 let file = Arc::clone(me.file.as_file());
195 let offset = me.offset;
196 spawn_blocking(move || {
197 file.read_at(&mut buf, offset).map(|n| {
198 buf.truncate(n);
199 buf
200 })
201 })
202 });
203
204 let res = ready!(Pin::new(task).poll(cx)).expect("I/O task should not panic");
208 self.task = None;
209 match res {
210 Ok(buf) if buf.is_empty() => {
211 self.close(Ok(()));
212 Poll::Ready(Ok(StreamResult::Dropped))
213 }
214 Ok(buf) => {
215 let n = buf.len();
216 dst.set_buffer(Cursor::new(buf));
217 Poll::Ready(Ok(self.complete_read(n)))
218 }
219 Err(err) => {
220 self.close(Err(err.into()));
221 Poll::Ready(Ok(StreamResult::Dropped))
222 }
223 }
224 }
225}
226
227fn map_dir_entry(
228 entry: std::io::Result<cap_std::fs::DirEntry>,
229) -> Result<Option<DirectoryEntry>, ErrorCode> {
230 match entry {
231 Ok(entry) => {
232 let meta = entry.metadata()?;
233 let Ok(name) = entry.file_name().into_string() else {
234 return Err(ErrorCode::IllegalByteSequence);
235 };
236 Ok(Some(DirectoryEntry {
237 type_: meta.file_type().into(),
238 name,
239 }))
240 }
241 Err(err) => {
242 #[cfg(windows)]
245 {
246 use windows_sys::Win32::Foundation::{
247 ERROR_ACCESS_DENIED, ERROR_SHARING_VIOLATION,
248 };
249 if err.raw_os_error() == Some(ERROR_SHARING_VIOLATION as i32)
250 || err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32)
251 {
252 return Ok(None);
253 }
254 }
255 Err(err.into())
256 }
257 }
258}
259
260struct ReadDirStream {
261 rx: mpsc::Receiver<DirectoryEntry>,
262 task: JoinHandle<Result<(), ErrorCode>>,
263 result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
264}
265
266impl ReadDirStream {
267 fn new(
268 dir: Arc<cap_std::fs::Dir>,
269 result: oneshot::Sender<Result<(), ErrorCode>>,
270 ) -> ReadDirStream {
271 let (tx, rx) = mpsc::channel(1);
272 ReadDirStream {
273 task: spawn_blocking(move || {
274 let entries = dir.entries()?;
275 for entry in entries {
276 if let Some(entry) = map_dir_entry(entry)? {
277 if let Err(_) = tx.blocking_send(entry) {
278 break;
279 }
280 }
281 }
282 Ok(())
283 }),
284 rx,
285 result: Some(result),
286 }
287 }
288
289 fn close(&mut self, res: Result<(), ErrorCode>) {
290 self.rx.close();
291 self.task.abort();
292 let _ = self.result.take().unwrap().send(res);
293 }
294}
295
296impl<D> StreamProducer<D> for ReadDirStream {
297 type Item = DirectoryEntry;
298 type Buffer = Option<DirectoryEntry>;
299
300 fn poll_produce<'a>(
301 mut self: Pin<&mut Self>,
302 cx: &mut Context<'_>,
303 mut store: StoreContextMut<'a, D>,
304 mut dst: Destination<'a, Self::Item, Self::Buffer>,
305 finish: bool,
306 ) -> Poll<wasmtime::Result<StreamResult>> {
307 if dst.remaining(&mut store) == Some(0) {
312 return Poll::Ready(Ok(StreamResult::Completed));
313 }
314
315 match self.rx.poll_recv(cx) {
316 Poll::Ready(Some(item)) => {
319 dst.set_buffer(Some(item));
320 Poll::Ready(Ok(StreamResult::Completed))
321 }
322
323 Poll::Ready(None) => {
330 let result = ready!(Pin::new(&mut self.task).poll(cx))
331 .expect("spawned task should not panic");
332 self.close(result);
333 Poll::Ready(Ok(StreamResult::Dropped))
334 }
335
336 Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
339 Poll::Pending => Poll::Pending,
340 }
341 }
342}
343
344impl Drop for ReadDirStream {
345 fn drop(&mut self) {
346 if self.result.is_some() {
347 self.close(Ok(()));
348 }
349 }
350}
351
352struct WriteStreamConsumer {
353 file: File,
354 location: WriteLocation,
355 result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
356 buffer: BytesMut,
357 task: Option<JoinHandle<std::io::Result<(BytesMut, usize)>>>,
358}
359
/// Destination of a write within a file.
#[derive(Copy, Clone)]
enum WriteLocation {
    /// Write with `append`.
    End,
    /// Write with `write_at` at the contained offset.
    Offset(u64),
}
365
366impl WriteStreamConsumer {
367 fn new_at(file: File, offset: u64, result: oneshot::Sender<Result<(), ErrorCode>>) -> Self {
368 Self {
369 file,
370 location: WriteLocation::Offset(offset),
371 result: Some(result),
372 buffer: BytesMut::default(),
373 task: None,
374 }
375 }
376
377 fn new_append(file: File, result: oneshot::Sender<Result<(), ErrorCode>>) -> Self {
378 Self {
379 file,
380 location: WriteLocation::End,
381 result: Some(result),
382 buffer: BytesMut::default(),
383 task: None,
384 }
385 }
386
387 fn close(&mut self, res: Result<(), ErrorCode>) {
388 _ = self.result.take().unwrap().send(res);
389 }
390
391 fn complete_write(&mut self, amt: usize) -> StreamResult {
393 match &mut self.location {
394 WriteLocation::End => StreamResult::Completed,
395 WriteLocation::Offset(offset) => {
396 let Ok(amt) = amt.try_into() else {
397 self.close(Err(ErrorCode::Overflow));
398 return StreamResult::Dropped;
399 };
400 let Some(amt) = offset.checked_add(amt) else {
401 self.close(Err(ErrorCode::Overflow));
402 return StreamResult::Dropped;
403 };
404 *offset = amt;
405 StreamResult::Completed
406 }
407 }
408 }
409}
410
411impl WriteLocation {
412 fn write(&self, file: &cap_std::fs::File, bytes: &[u8]) -> io::Result<usize> {
413 match *self {
414 WriteLocation::End => file.append(bytes),
415 WriteLocation::Offset(at) => file.write_at(bytes, at),
416 }
417 }
418}
419
420impl<D> StreamConsumer<D> for WriteStreamConsumer {
421 type Item = u8;
422
423 fn poll_consume(
424 mut self: Pin<&mut Self>,
425 cx: &mut Context<'_>,
426 store: StoreContextMut<D>,
427 src: Source<Self::Item>,
428 _finish: bool,
431 ) -> Poll<wasmtime::Result<StreamResult>> {
432 let mut src = src.as_direct(store);
433 if let Some(file) = self.file.as_blocking_file() {
434 assert!(self.task.is_none());
436 return match self.location.write(file, src.remaining()) {
437 Ok(n) => {
438 src.mark_read(n);
439 Poll::Ready(Ok(self.complete_write(n)))
440 }
441 Err(err) => {
442 self.close(Err(err.into()));
443 Poll::Ready(Ok(StreamResult::Dropped))
444 }
445 };
446 }
447 let me = &mut *self;
448 let task = me.task.get_or_insert_with(|| {
449 debug_assert!(me.buffer.is_empty());
450 me.buffer.extend_from_slice(src.remaining());
451 let buf = mem::take(&mut me.buffer);
452 let file = Arc::clone(me.file.as_file());
453 let location = me.location;
454 spawn_blocking(move || location.write(&file, &buf).map(|n| (buf, n)))
455 });
456 let res = ready!(Pin::new(task).poll(cx)).expect("I/O task should not panic");
457 self.task = None;
458 match res {
459 Ok((buf, n)) => {
460 src.mark_read(n);
461 self.buffer = buf;
462 self.buffer.clear();
463 Poll::Ready(Ok(self.complete_write(n)))
464 }
465 Err(err) => {
466 self.close(Err(err.into()));
467 Poll::Ready(Ok(StreamResult::Dropped))
468 }
469 }
470 }
471}
472
473impl Drop for WriteStreamConsumer {
474 fn drop(&mut self) {
475 if self.result.is_some() {
476 self.close(Ok(()))
477 }
478 }
479}
480
481impl types::Host for WasiFilesystemCtxView<'_> {
482 fn convert_error_code(&mut self, error: FilesystemError) -> wasmtime::Result<ErrorCode> {
483 error.downcast()
484 }
485}
486
487impl types::HostDescriptorWithStore for WasiFilesystem {
488 async fn read_via_stream<U>(
489 store: &Accessor<U, Self>,
490 fd: Resource<Descriptor>,
491 offset: Filesize,
492 ) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
493 store.with(|mut store| {
494 let file = get_file(store.get().table, &fd)?;
495 if !file.perms.contains(FilePerms::READ) {
496 return Ok((
497 StreamReader::new(&mut store, iter::empty()),
498 FutureReader::new(&mut store, async {
499 anyhow::Ok(Err(ErrorCode::NotPermitted))
500 }),
501 ));
502 }
503
504 let file = file.clone();
505 let (result_tx, result_rx) = oneshot::channel();
506 Ok((
507 StreamReader::new(
508 &mut store,
509 ReadStreamProducer {
510 file,
511 offset,
512 result: Some(result_tx),
513 task: None,
514 },
515 ),
516 FutureReader::new(&mut store, result_rx),
517 ))
518 })
519 }
520
521 async fn write_via_stream<U>(
522 store: &Accessor<U, Self>,
523 fd: Resource<Descriptor>,
524 data: StreamReader<u8>,
525 offset: Filesize,
526 ) -> FilesystemResult<()> {
527 let (result_tx, result_rx) = oneshot::channel();
528 store.with(|mut store| {
529 let file = get_file(store.get().table, &fd)?;
530 if !file.perms.contains(FilePerms::WRITE) {
531 return Err(ErrorCode::NotPermitted.into());
532 }
533 let file = file.clone();
534 data.pipe(store, WriteStreamConsumer::new_at(file, offset, result_tx));
535 FilesystemResult::Ok(())
536 })?;
537 result_rx
538 .await
539 .context("oneshot sender dropped")
540 .map_err(FilesystemError::trap)??;
541 Ok(())
542 }
543
544 async fn append_via_stream<U>(
545 store: &Accessor<U, Self>,
546 fd: Resource<Descriptor>,
547 data: StreamReader<u8>,
548 ) -> FilesystemResult<()> {
549 let (result_tx, result_rx) = oneshot::channel();
550 store.with(|mut store| {
551 let file = get_file(store.get().table, &fd)?;
552 if !file.perms.contains(FilePerms::WRITE) {
553 return Err(ErrorCode::NotPermitted.into());
554 }
555 let file = file.clone();
556 data.pipe(store, WriteStreamConsumer::new_append(file, result_tx));
557 FilesystemResult::Ok(())
558 })?;
559 result_rx
560 .await
561 .context("oneshot sender dropped")
562 .map_err(FilesystemError::trap)??;
563 Ok(())
564 }
565
566 async fn advise<U>(
567 store: &Accessor<U, Self>,
568 fd: Resource<Descriptor>,
569 offset: Filesize,
570 length: Filesize,
571 advice: Advice,
572 ) -> FilesystemResult<()> {
573 let file = store.get_file(&fd)?;
574 file.advise(offset, length, advice.into()).await?;
575 Ok(())
576 }
577
578 async fn sync_data<U>(
579 store: &Accessor<U, Self>,
580 fd: Resource<Descriptor>,
581 ) -> FilesystemResult<()> {
582 let fd = store.get_descriptor(&fd)?;
583 fd.sync_data().await?;
584 Ok(())
585 }
586
587 async fn get_flags<U>(
588 store: &Accessor<U, Self>,
589 fd: Resource<Descriptor>,
590 ) -> FilesystemResult<DescriptorFlags> {
591 let fd = store.get_descriptor(&fd)?;
592 let flags = fd.get_flags().await?;
593 Ok(flags.into())
594 }
595
596 async fn get_type<U>(
597 store: &Accessor<U, Self>,
598 fd: Resource<Descriptor>,
599 ) -> FilesystemResult<DescriptorType> {
600 let fd = store.get_descriptor(&fd)?;
601 let ty = fd.get_type().await?;
602 Ok(ty.into())
603 }
604
605 async fn set_size<U>(
606 store: &Accessor<U, Self>,
607 fd: Resource<Descriptor>,
608 size: Filesize,
609 ) -> FilesystemResult<()> {
610 let file = store.get_file(&fd)?;
611 file.set_size(size).await?;
612 Ok(())
613 }
614
615 async fn set_times<U>(
616 store: &Accessor<U, Self>,
617 fd: Resource<Descriptor>,
618 data_access_timestamp: NewTimestamp,
619 data_modification_timestamp: NewTimestamp,
620 ) -> FilesystemResult<()> {
621 let fd = store.get_descriptor(&fd)?;
622 let atim = systemtimespec_from(data_access_timestamp)?;
623 let mtim = systemtimespec_from(data_modification_timestamp)?;
624 fd.set_times(atim, mtim).await?;
625 Ok(())
626 }
627
628 async fn read_directory<U>(
629 store: &Accessor<U, Self>,
630 fd: Resource<Descriptor>,
631 ) -> wasmtime::Result<(
632 StreamReader<DirectoryEntry>,
633 FutureReader<Result<(), ErrorCode>>,
634 )> {
635 store.with(|mut store| {
636 let dir = get_dir(store.get().table, &fd)?;
637 if !dir.perms.contains(DirPerms::READ) {
638 return Ok((
639 StreamReader::new(&mut store, iter::empty()),
640 FutureReader::new(&mut store, async {
641 anyhow::Ok(Err(ErrorCode::NotPermitted))
642 }),
643 ));
644 }
645 let allow_blocking_current_thread = dir.allow_blocking_current_thread;
646 let dir = Arc::clone(dir.as_dir());
647 let (result_tx, result_rx) = oneshot::channel();
648 let stream = if allow_blocking_current_thread {
649 match dir.entries() {
650 Ok(readdir) => StreamReader::new(
651 &mut store,
652 FallibleIteratorProducer::new(
653 readdir.filter_map(|e| map_dir_entry(e).transpose()),
654 result_tx,
655 ),
656 ),
657 Err(e) => {
658 result_tx.send(Err(e.into())).unwrap();
659 StreamReader::new(&mut store, iter::empty())
660 }
661 }
662 } else {
663 StreamReader::new(&mut store, ReadDirStream::new(dir, result_tx))
664 };
665 Ok((stream, FutureReader::new(&mut store, result_rx)))
666 })
667 }
668
669 async fn sync<U>(store: &Accessor<U, Self>, fd: Resource<Descriptor>) -> FilesystemResult<()> {
670 let fd = store.get_descriptor(&fd)?;
671 fd.sync().await?;
672 Ok(())
673 }
674
675 async fn create_directory_at<U>(
676 store: &Accessor<U, Self>,
677 fd: Resource<Descriptor>,
678 path: String,
679 ) -> FilesystemResult<()> {
680 let dir = store.get_dir(&fd)?;
681 dir.create_directory_at(path).await?;
682 Ok(())
683 }
684
685 async fn stat<U>(
686 store: &Accessor<U, Self>,
687 fd: Resource<Descriptor>,
688 ) -> FilesystemResult<DescriptorStat> {
689 let fd = store.get_descriptor(&fd)?;
690 let stat = fd.stat().await?;
691 Ok(stat.into())
692 }
693
694 async fn stat_at<U>(
695 store: &Accessor<U, Self>,
696 fd: Resource<Descriptor>,
697 path_flags: PathFlags,
698 path: String,
699 ) -> FilesystemResult<DescriptorStat> {
700 let dir = store.get_dir(&fd)?;
701 let stat = dir.stat_at(path_flags.into(), path).await?;
702 Ok(stat.into())
703 }
704
705 async fn set_times_at<U>(
706 store: &Accessor<U, Self>,
707 fd: Resource<Descriptor>,
708 path_flags: PathFlags,
709 path: String,
710 data_access_timestamp: NewTimestamp,
711 data_modification_timestamp: NewTimestamp,
712 ) -> FilesystemResult<()> {
713 let dir = store.get_dir(&fd)?;
714 let atim = systemtimespec_from(data_access_timestamp)?;
715 let mtim = systemtimespec_from(data_modification_timestamp)?;
716 dir.set_times_at(path_flags.into(), path, atim, mtim)
717 .await?;
718 Ok(())
719 }
720
721 async fn link_at<U>(
722 store: &Accessor<U, Self>,
723 fd: Resource<Descriptor>,
724 old_path_flags: PathFlags,
725 old_path: String,
726 new_fd: Resource<Descriptor>,
727 new_path: String,
728 ) -> FilesystemResult<()> {
729 let (old_dir, new_dir) = store.get_dir_pair(&fd, &new_fd)?;
730 old_dir
731 .link_at(old_path_flags.into(), old_path, &new_dir, new_path)
732 .await?;
733 Ok(())
734 }
735
736 async fn open_at<U>(
737 store: &Accessor<U, Self>,
738 fd: Resource<Descriptor>,
739 path_flags: PathFlags,
740 path: String,
741 open_flags: OpenFlags,
742 flags: DescriptorFlags,
743 ) -> FilesystemResult<Resource<Descriptor>> {
744 let (allow_blocking_current_thread, dir) = store.with(|mut store| {
745 let store = store.get();
746 let dir = get_dir(&store.table, &fd)?;
747 FilesystemResult::Ok((store.ctx.allow_blocking_current_thread, dir.clone()))
748 })?;
749 let fd = dir
750 .open_at(
751 path_flags.into(),
752 path,
753 open_flags.into(),
754 flags.into(),
755 allow_blocking_current_thread,
756 )
757 .await?;
758 let fd = store.with(|mut store| store.get().table.push(fd))?;
759 Ok(fd)
760 }
761
762 async fn readlink_at<U>(
763 store: &Accessor<U, Self>,
764 fd: Resource<Descriptor>,
765 path: String,
766 ) -> FilesystemResult<String> {
767 let dir = store.get_dir(&fd)?;
768 let path = dir.readlink_at(path).await?;
769 Ok(path)
770 }
771
772 async fn remove_directory_at<U>(
773 store: &Accessor<U, Self>,
774 fd: Resource<Descriptor>,
775 path: String,
776 ) -> FilesystemResult<()> {
777 let dir = store.get_dir(&fd)?;
778 dir.remove_directory_at(path).await?;
779 Ok(())
780 }
781
782 async fn rename_at<U>(
783 store: &Accessor<U, Self>,
784 fd: Resource<Descriptor>,
785 old_path: String,
786 new_fd: Resource<Descriptor>,
787 new_path: String,
788 ) -> FilesystemResult<()> {
789 let (old_dir, new_dir) = store.get_dir_pair(&fd, &new_fd)?;
790 old_dir.rename_at(old_path, &new_dir, new_path).await?;
791 Ok(())
792 }
793
794 async fn symlink_at<U>(
795 store: &Accessor<U, Self>,
796 fd: Resource<Descriptor>,
797 old_path: String,
798 new_path: String,
799 ) -> FilesystemResult<()> {
800 let dir = store.get_dir(&fd)?;
801 dir.symlink_at(old_path, new_path).await?;
802 Ok(())
803 }
804
805 async fn unlink_file_at<U>(
806 store: &Accessor<U, Self>,
807 fd: Resource<Descriptor>,
808 path: String,
809 ) -> FilesystemResult<()> {
810 let dir = store.get_dir(&fd)?;
811 dir.unlink_file_at(path).await?;
812 Ok(())
813 }
814
815 async fn is_same_object<U>(
816 store: &Accessor<U, Self>,
817 fd: Resource<Descriptor>,
818 other: Resource<Descriptor>,
819 ) -> wasmtime::Result<bool> {
820 let (fd, other) = store.with(|mut store| {
821 let table = store.get().table;
822 let fd = get_descriptor(table, &fd)?.clone();
823 let other = get_descriptor(table, &other)?.clone();
824 anyhow::Ok((fd, other))
825 })?;
826 fd.is_same_object(&other).await
827 }
828
829 async fn metadata_hash<U>(
830 store: &Accessor<U, Self>,
831 fd: Resource<Descriptor>,
832 ) -> FilesystemResult<MetadataHashValue> {
833 let fd = store.get_descriptor(&fd)?;
834 let meta = fd.metadata_hash().await?;
835 Ok(meta.into())
836 }
837
838 async fn metadata_hash_at<U>(
839 store: &Accessor<U, Self>,
840 fd: Resource<Descriptor>,
841 path_flags: PathFlags,
842 path: String,
843 ) -> FilesystemResult<MetadataHashValue> {
844 let dir = store.get_dir(&fd)?;
845 let meta = dir.metadata_hash_at(path_flags.into(), path).await?;
846 Ok(meta.into())
847 }
848}
849
850impl types::HostDescriptor for WasiFilesystemCtxView<'_> {
851 fn drop(&mut self, fd: Resource<Descriptor>) -> wasmtime::Result<()> {
852 self.table
853 .delete(fd)
854 .context("failed to delete descriptor resource from table")?;
855 Ok(())
856 }
857}
858
859impl preopens::Host for WasiFilesystemCtxView<'_> {
860 fn get_directories(&mut self) -> wasmtime::Result<Vec<(Resource<Descriptor>, String)>> {
861 self.get_directories()
862 }
863}