1use crate::filesystem::sys;
2use crate::filesystem::{Descriptor, Dir, File, WasiFilesystem, WasiFilesystemCtxView};
3use crate::p3::bindings::clocks::system_clock;
4use crate::p3::bindings::filesystem::types::{
5 self, Advice, DescriptorFlags, DescriptorStat, DescriptorType, DirectoryEntry, ErrorCode,
6 Filesize, MetadataHashValue, NewTimestamp, OpenFlags, PathFlags,
7};
8use crate::p3::filesystem::{FilesystemError, FilesystemResult, preopens};
9use crate::p3::{DEFAULT_BUFFER_CAPACITY, FallibleIteratorProducer};
10use crate::{DirPerms, FilePerms};
11use bytes::BytesMut;
12use core::pin::Pin;
13use core::task::{Context, Poll, ready};
14use core::{iter, mem};
15use std::io;
16use std::sync::Arc;
17use std::time::SystemTime;
18use tokio::sync::{mpsc, oneshot};
19use tokio::task::{JoinHandle, spawn_blocking};
20use wasmtime::StoreContextMut;
21use wasmtime::component::{
22 Access, Accessor, Destination, FutureReader, Resource, ResourceTable, Source, StreamConsumer,
23 StreamProducer, StreamReader, StreamResult,
24};
25use wasmtime::error::Context as _;
26
27fn get_descriptor<'a>(
28 table: &'a ResourceTable,
29 fd: &'a Resource<Descriptor>,
30) -> FilesystemResult<&'a Descriptor> {
31 table
32 .get(fd)
33 .context("failed to get descriptor resource from table")
34 .map_err(FilesystemError::trap)
35}
36
37fn get_file<'a>(
38 table: &'a ResourceTable,
39 fd: &'a Resource<Descriptor>,
40) -> FilesystemResult<&'a File> {
41 let file = get_descriptor(table, fd).map(Descriptor::file)??;
42 Ok(file)
43}
44
45fn get_dir<'a>(
46 table: &'a ResourceTable,
47 fd: &'a Resource<Descriptor>,
48) -> FilesystemResult<&'a Dir> {
49 let dir = get_descriptor(table, fd).map(Descriptor::dir)??;
50 Ok(dir)
51}
52
53trait AccessorExt {
54 fn get_descriptor(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Descriptor>;
55 fn get_file(&self, fd: &Resource<Descriptor>) -> FilesystemResult<File>;
56 fn get_dir(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Dir>;
57 fn get_dir_pair(
58 &self,
59 a: &Resource<Descriptor>,
60 b: &Resource<Descriptor>,
61 ) -> FilesystemResult<(Dir, Dir)>;
62}
63
64impl<T> AccessorExt for Accessor<T, WasiFilesystem> {
65 fn get_descriptor(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Descriptor> {
66 self.with(|mut store| {
67 let fd = get_descriptor(store.get().table, fd)?;
68 Ok(fd.clone())
69 })
70 }
71
72 fn get_file(&self, fd: &Resource<Descriptor>) -> FilesystemResult<File> {
73 self.with(|mut store| {
74 let file = get_file(store.get().table, fd)?;
75 Ok(file.clone())
76 })
77 }
78
79 fn get_dir(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Dir> {
80 self.with(|mut store| {
81 let dir = get_dir(store.get().table, fd)?;
82 Ok(dir.clone())
83 })
84 }
85
86 fn get_dir_pair(
87 &self,
88 a: &Resource<Descriptor>,
89 b: &Resource<Descriptor>,
90 ) -> FilesystemResult<(Dir, Dir)> {
91 self.with(|mut store| {
92 let table = store.get().table;
93 let a = get_dir(table, a)?;
94 let b = get_dir(table, b)?;
95 Ok((a.clone(), b.clone()))
96 })
97 }
98}
99
100fn systemtime_from(t: system_clock::Instant) -> Result<std::time::SystemTime, ErrorCode> {
101 if let Ok(seconds) = t.seconds.try_into() {
102 std::time::SystemTime::UNIX_EPOCH
103 .checked_add(core::time::Duration::new(seconds, t.nanoseconds))
104 .ok_or(ErrorCode::Overflow)
105 } else {
106 std::time::SystemTime::UNIX_EPOCH
107 .checked_sub(core::time::Duration::new(
108 t.seconds.unsigned_abs(),
109 t.nanoseconds,
110 ))
111 .ok_or(ErrorCode::Overflow)
112 }
113}
114
115fn systemtimespec_from(t: NewTimestamp) -> Result<Option<SystemTime>, ErrorCode> {
116 match t {
117 NewTimestamp::NoChange => Ok(None),
118 NewTimestamp::Now => Ok(Some(SystemTime::now())),
119 NewTimestamp::Timestamp(st) => Ok(Some(systemtime_from(st)?)),
120 }
121}
122
123struct ReadStreamProducer {
124 file: File,
125 offset: u64,
126 result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
127 task: Option<JoinHandle<std::io::Result<BytesMut>>>,
128}
129
130impl Drop for ReadStreamProducer {
131 fn drop(&mut self) {
132 self.close(Ok(()))
133 }
134}
135
136impl ReadStreamProducer {
137 fn close(&mut self, res: Result<(), ErrorCode>) {
138 if let Some(tx) = self.result.take() {
139 _ = tx.send(res);
140 }
141 }
142
143 fn complete_read(&mut self, amt: usize) -> StreamResult {
145 let Ok(amt) = amt.try_into() else {
146 self.close(Err(ErrorCode::Overflow));
147 return StreamResult::Dropped;
148 };
149 let Some(amt) = self.offset.checked_add(amt) else {
150 self.close(Err(ErrorCode::Overflow));
151 return StreamResult::Dropped;
152 };
153 self.offset = amt;
154 StreamResult::Completed
155 }
156}
157
158impl<D> StreamProducer<D> for ReadStreamProducer {
159 type Item = u8;
160 type Buffer = BytesMut;
161
162 fn poll_produce<'a>(
163 mut self: Pin<&mut Self>,
164 cx: &mut Context<'_>,
165 store: StoreContextMut<'a, D>,
166 mut dst: Destination<'a, Self::Item, Self::Buffer>,
167 finish: bool,
168 ) -> Poll<wasmtime::Result<StreamResult>> {
169 if let Some(file) = self.file.as_blocking_file() {
170 assert!(self.task.is_none());
172 let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
173 let buf = dst.remaining();
174 if buf.is_empty() {
175 return Poll::Ready(Ok(StreamResult::Completed));
176 }
177 return match sys::read_at_cursor_unspecified(file, buf, self.offset) {
178 Ok(0) => {
179 self.close(Ok(()));
180 Poll::Ready(Ok(StreamResult::Dropped))
181 }
182 Ok(n) => {
183 dst.mark_written(n);
184 Poll::Ready(Ok(self.complete_read(n)))
185 }
186 Err(err) => {
187 self.close(Err(err.into()));
188 Poll::Ready(Ok(StreamResult::Dropped))
189 }
190 };
191 }
192
193 let me = &mut *self;
195 let task = me.task.get_or_insert_with(|| {
196 let mut buf = dst.take_buffer();
197 buf.resize(DEFAULT_BUFFER_CAPACITY, 0);
198 let file = Arc::clone(me.file.as_file());
199 let offset = me.offset;
200 spawn_blocking(move || {
201 sys::read_at_cursor_unspecified(file, &mut buf, offset).map(|n| {
202 buf.truncate(n);
203 buf
204 })
205 })
206 });
207
208 let result = match Pin::new(&mut *task).poll(cx) {
212 Poll::Pending if finish => {
216 task.abort();
217 ready!(Pin::new(task).poll(cx))
218 }
219 other => ready!(other),
220 };
221 self.task = None;
222 match result {
223 Ok(Ok(buf)) if buf.is_empty() => {
224 self.close(Ok(()));
225 Poll::Ready(Ok(StreamResult::Dropped))
226 }
227 Ok(Ok(buf)) => {
228 let n = buf.len();
229 dst.set_buffer(buf);
230 Poll::Ready(Ok(self.complete_read(n)))
231 }
232 Ok(Err(err)) => {
233 self.close(Err(err.into()));
234 Poll::Ready(Ok(StreamResult::Dropped))
235 }
236 Err(err) => {
237 if err.is_cancelled() {
238 return Poll::Ready(Ok(StreamResult::Cancelled));
239 }
240 panic!("I/O task should not panic: {err}")
241 }
242 }
243 }
244}
245
246fn map_dir_entry(
247 entry: std::io::Result<cap_std::fs::DirEntry>,
248) -> Result<Option<DirectoryEntry>, ErrorCode> {
249 match entry {
250 Ok(entry) => {
251 let meta = entry.metadata()?;
252 let Ok(name) = entry.file_name().into_string() else {
253 return Err(ErrorCode::IllegalByteSequence);
254 };
255 Ok(Some(DirectoryEntry {
256 type_: meta.file_type().into(),
257 name,
258 }))
259 }
260 Err(err) => {
261 #[cfg(windows)]
264 {
265 use windows_sys::Win32::Foundation::{
266 ERROR_ACCESS_DENIED, ERROR_SHARING_VIOLATION,
267 };
268 if err.raw_os_error() == Some(ERROR_SHARING_VIOLATION as i32)
269 || err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32)
270 {
271 return Ok(None);
272 }
273 }
274 Err(err.into())
275 }
276 }
277}
278
279struct ReadDirStream {
280 rx: mpsc::Receiver<DirectoryEntry>,
281 task: JoinHandle<Result<(), ErrorCode>>,
282 result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
283}
284
285impl ReadDirStream {
286 fn new(
287 dir: Arc<cap_std::fs::Dir>,
288 result: oneshot::Sender<Result<(), ErrorCode>>,
289 ) -> ReadDirStream {
290 let (tx, rx) = mpsc::channel(1);
291 ReadDirStream {
292 task: spawn_blocking(move || {
293 let entries = dir.entries()?;
294 for entry in entries {
295 if let Some(entry) = map_dir_entry(entry)? {
296 if let Err(_) = tx.blocking_send(entry) {
297 break;
298 }
299 }
300 }
301 Ok(())
302 }),
303 rx,
304 result: Some(result),
305 }
306 }
307
308 fn close(&mut self, res: Result<(), ErrorCode>) {
309 self.rx.close();
310 self.task.abort();
311 let _ = self.result.take().unwrap().send(res);
312 }
313}
314
315impl<D> StreamProducer<D> for ReadDirStream {
316 type Item = DirectoryEntry;
317 type Buffer = Option<DirectoryEntry>;
318
319 fn poll_produce<'a>(
320 mut self: Pin<&mut Self>,
321 cx: &mut Context<'_>,
322 mut store: StoreContextMut<'a, D>,
323 mut dst: Destination<'a, Self::Item, Self::Buffer>,
324 finish: bool,
325 ) -> Poll<wasmtime::Result<StreamResult>> {
326 if dst.remaining(&mut store) == Some(0) {
331 return Poll::Ready(Ok(StreamResult::Completed));
332 }
333
334 match self.rx.poll_recv(cx) {
335 Poll::Ready(Some(item)) => {
338 dst.set_buffer(Some(item));
339 Poll::Ready(Ok(StreamResult::Completed))
340 }
341
342 Poll::Ready(None) => {
349 let result = ready!(Pin::new(&mut self.task).poll(cx))
350 .expect("spawned task should not panic");
351 self.close(result);
352 Poll::Ready(Ok(StreamResult::Dropped))
353 }
354
355 Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
358 Poll::Pending => Poll::Pending,
359 }
360 }
361}
362
363impl Drop for ReadDirStream {
364 fn drop(&mut self) {
365 if self.result.is_some() {
366 self.close(Ok(()));
367 }
368 }
369}
370
371struct WriteStreamConsumer {
372 file: File,
373 location: WriteLocation,
374 result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
375 buffer: BytesMut,
376 task: Option<JoinHandle<std::io::Result<(BytesMut, usize)>>>,
377}
378
379#[derive(Copy, Clone)]
380enum WriteLocation {
381 End,
382 Offset(u64),
383}
384
385impl WriteStreamConsumer {
386 fn new_at(file: File, offset: u64, result: oneshot::Sender<Result<(), ErrorCode>>) -> Self {
387 Self {
388 file,
389 location: WriteLocation::Offset(offset),
390 result: Some(result),
391 buffer: BytesMut::default(),
392 task: None,
393 }
394 }
395
396 fn new_append(file: File, result: oneshot::Sender<Result<(), ErrorCode>>) -> Self {
397 Self {
398 file,
399 location: WriteLocation::End,
400 result: Some(result),
401 buffer: BytesMut::default(),
402 task: None,
403 }
404 }
405
406 fn close(&mut self, res: Result<(), ErrorCode>) {
407 _ = self.result.take().unwrap().send(res);
408 }
409
410 fn complete_write(&mut self, amt: usize) -> StreamResult {
412 match &mut self.location {
413 WriteLocation::End => StreamResult::Completed,
414 WriteLocation::Offset(offset) => {
415 let Ok(amt) = amt.try_into() else {
416 self.close(Err(ErrorCode::Overflow));
417 return StreamResult::Dropped;
418 };
419 let Some(amt) = offset.checked_add(amt) else {
420 self.close(Err(ErrorCode::Overflow));
421 return StreamResult::Dropped;
422 };
423 *offset = amt;
424 StreamResult::Completed
425 }
426 }
427 }
428}
429
430impl WriteLocation {
431 fn write(&self, file: &cap_std::fs::File, bytes: &[u8]) -> io::Result<usize> {
432 match *self {
433 WriteLocation::End => sys::append_cursor_unspecified(file, bytes),
434 WriteLocation::Offset(at) => sys::write_at_cursor_unspecified(file, bytes, at),
435 }
436 }
437}
438
439impl<D> StreamConsumer<D> for WriteStreamConsumer {
440 type Item = u8;
441
442 fn poll_consume(
443 mut self: Pin<&mut Self>,
444 cx: &mut Context<'_>,
445 store: StoreContextMut<D>,
446 src: Source<Self::Item>,
447 finish: bool,
448 ) -> Poll<wasmtime::Result<StreamResult>> {
449 let mut src = src.as_direct(store);
450 if let Some(file) = self.file.as_blocking_file() {
451 assert!(self.task.is_none());
453 return match self.location.write(file, src.remaining()) {
454 Ok(n) => {
455 src.mark_read(n);
456 Poll::Ready(Ok(self.complete_write(n)))
457 }
458 Err(err) => {
459 self.close(Err(err.into()));
460 Poll::Ready(Ok(StreamResult::Dropped))
461 }
462 };
463 }
464 let me = &mut *self;
465 let task = me.task.get_or_insert_with(|| {
466 debug_assert!(me.buffer.is_empty());
467 me.buffer.extend_from_slice(src.remaining());
468 let buf = mem::take(&mut me.buffer);
469 let file = Arc::clone(me.file.as_file());
470 let location = me.location;
471 spawn_blocking(move || location.write(&file, &buf).map(|n| (buf, n)))
472 });
473 let result = match Pin::new(&mut *task).poll(cx) {
474 Poll::Pending if finish => {
478 task.abort();
479 ready!(Pin::new(task).poll(cx))
480 }
481 other => ready!(other),
482 };
483 self.task = None;
484 match result {
485 Ok(Ok((buf, n))) => {
486 src.mark_read(n);
487 self.buffer = buf;
488 self.buffer.clear();
489 Poll::Ready(Ok(self.complete_write(n)))
490 }
491 Ok(Err(err)) => {
492 self.close(Err(err.into()));
493 Poll::Ready(Ok(StreamResult::Dropped))
494 }
495 Err(err) => {
496 if err.is_cancelled() {
497 return Poll::Ready(Ok(StreamResult::Cancelled));
498 }
499 panic!("I/O task should not panic: {err}")
500 }
501 }
502 }
503}
504
505impl Drop for WriteStreamConsumer {
506 fn drop(&mut self) {
507 if self.result.is_some() {
508 self.close(Ok(()))
509 }
510 }
511}
512
513impl types::Host for WasiFilesystemCtxView<'_> {
514 fn convert_error_code(&mut self, error: FilesystemError) -> wasmtime::Result<ErrorCode> {
515 error.downcast()
516 }
517}
518
519impl<U> types::HostDescriptorWithStore<U> for WasiFilesystem {
520 fn read_via_stream(
521 mut store: Access<U, Self>,
522 fd: Resource<Descriptor>,
523 offset: Filesize,
524 ) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
525 let file = get_file(store.get().table, &fd)?;
526 if !file.perms.contains(FilePerms::READ) {
527 return Ok((
528 StreamReader::new(&mut store, iter::empty())?,
529 FutureReader::new(&mut store, async {
530 wasmtime::error::Ok(Err(ErrorCode::NotPermitted))
531 })?,
532 ));
533 }
534
535 let file = file.clone();
536 let (result_tx, result_rx) = oneshot::channel();
537 Ok((
538 StreamReader::new(
539 &mut store,
540 ReadStreamProducer {
541 file,
542 offset,
543 result: Some(result_tx),
544 task: None,
545 },
546 )?,
547 FutureReader::new(&mut store, result_rx)?,
548 ))
549 }
550
551 fn write_via_stream(
552 mut store: Access<'_, U, Self>,
553 fd: Resource<Descriptor>,
554 mut data: StreamReader<u8>,
555 offset: Filesize,
556 ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>> {
557 let (result_tx, result_rx) = oneshot::channel();
558 match get_file(store.get().table, &fd).and_then(|file| {
559 if !file.perms.contains(FilePerms::WRITE) {
560 Err(ErrorCode::NotPermitted.into())
561 } else {
562 Ok(file.clone())
563 }
564 }) {
565 Ok(file) => {
566 data.pipe(
567 &mut store,
568 WriteStreamConsumer::new_at(file, offset, result_tx),
569 )?;
570 }
571 Err(err) => {
572 data.close(&mut store)?;
573 let _ = result_tx.send(Err(err.downcast().unwrap_or(ErrorCode::Io)));
574 }
575 }
576 FutureReader::new(&mut store, result_rx)
577 }
578
579 fn append_via_stream(
580 mut store: Access<'_, U, Self>,
581 fd: Resource<Descriptor>,
582 mut data: StreamReader<u8>,
583 ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>> {
584 let (result_tx, result_rx) = oneshot::channel();
585 match get_file(store.get().table, &fd).and_then(|file| {
586 if !file.perms.contains(FilePerms::WRITE) {
587 Err(ErrorCode::NotPermitted.into())
588 } else {
589 Ok(file.clone())
590 }
591 }) {
592 Ok(file) => {
593 data.pipe(&mut store, WriteStreamConsumer::new_append(file, result_tx))?;
594 }
595 Err(err) => {
596 data.close(&mut store)?;
597 let _ = result_tx.send(Err(err.downcast().unwrap_or(ErrorCode::Io)));
598 }
599 }
600 FutureReader::new(&mut store, result_rx)
601 }
602
603 async fn advise(
604 store: &Accessor<U, Self>,
605 fd: Resource<Descriptor>,
606 offset: Filesize,
607 length: Filesize,
608 advice: Advice,
609 ) -> FilesystemResult<()> {
610 let file = store.get_file(&fd)?;
611 file.advise(offset, length, advice.into()).await?;
612 Ok(())
613 }
614
615 async fn sync_data(
616 store: &Accessor<U, Self>,
617 fd: Resource<Descriptor>,
618 ) -> FilesystemResult<()> {
619 let fd = store.get_descriptor(&fd)?;
620 fd.sync_data().await?;
621 Ok(())
622 }
623
624 async fn get_flags(
625 store: &Accessor<U, Self>,
626 fd: Resource<Descriptor>,
627 ) -> FilesystemResult<DescriptorFlags> {
628 let fd = store.get_descriptor(&fd)?;
629 let flags = fd.get_flags().await?;
630 Ok(flags.into())
631 }
632
633 async fn get_type(
634 store: &Accessor<U, Self>,
635 fd: Resource<Descriptor>,
636 ) -> FilesystemResult<DescriptorType> {
637 let fd = store.get_descriptor(&fd)?;
638 let ty = fd.get_type().await?;
639 Ok(ty.into())
640 }
641
642 async fn set_size(
643 store: &Accessor<U, Self>,
644 fd: Resource<Descriptor>,
645 size: Filesize,
646 ) -> FilesystemResult<()> {
647 let file = store.get_file(&fd)?;
648 file.set_size(size).await?;
649 Ok(())
650 }
651
652 async fn set_times(
653 store: &Accessor<U, Self>,
654 fd: Resource<Descriptor>,
655 data_access_timestamp: NewTimestamp,
656 data_modification_timestamp: NewTimestamp,
657 ) -> FilesystemResult<()> {
658 let fd = store.get_descriptor(&fd)?;
659 let atim = systemtimespec_from(data_access_timestamp)?;
660 let mtim = systemtimespec_from(data_modification_timestamp)?;
661 fd.set_times(atim, mtim).await?;
662 Ok(())
663 }
664
665 fn read_directory(
666 mut store: Access<'_, U, Self>,
667 fd: Resource<Descriptor>,
668 ) -> wasmtime::Result<(
669 StreamReader<DirectoryEntry>,
670 FutureReader<Result<(), ErrorCode>>,
671 )> {
672 let (result_tx, result_rx) = oneshot::channel();
673 let stream = match get_dir(store.get().table, &fd).and_then(|dir| {
674 if !dir.perms.contains(DirPerms::READ) {
675 Err(ErrorCode::NotPermitted.into())
676 } else {
677 Ok(dir)
678 }
679 }) {
680 Ok(dir) => {
681 let allow_blocking_current_thread = dir.allow_blocking_current_thread;
682 let dir = Arc::clone(dir.as_dir());
683 if allow_blocking_current_thread {
684 match dir.entries() {
685 Ok(readdir) => StreamReader::new(
686 &mut store,
687 FallibleIteratorProducer::new(
688 readdir.filter_map(|e| map_dir_entry(e).transpose()),
689 result_tx,
690 ),
691 )?,
692 Err(e) => {
693 let _ = result_tx.send(Err(e.into()));
694 StreamReader::new(&mut store, iter::empty())?
695 }
696 }
697 } else {
698 StreamReader::new(&mut store, ReadDirStream::new(dir, result_tx))?
699 }
700 }
701 Err(err) => {
702 let _ = result_tx.send(Err(err.downcast().unwrap_or(ErrorCode::Io)));
703 StreamReader::new(&mut store, iter::empty())?
704 }
705 };
706 Ok((stream, FutureReader::new(&mut store, result_rx)?))
707 }
708
709 async fn sync(store: &Accessor<U, Self>, fd: Resource<Descriptor>) -> FilesystemResult<()> {
710 let fd = store.get_descriptor(&fd)?;
711 fd.sync().await?;
712 Ok(())
713 }
714
715 async fn create_directory_at(
716 store: &Accessor<U, Self>,
717 fd: Resource<Descriptor>,
718 path: String,
719 ) -> FilesystemResult<()> {
720 let dir = store.get_dir(&fd)?;
721 dir.create_directory_at(path).await?;
722 Ok(())
723 }
724
725 async fn stat(
726 store: &Accessor<U, Self>,
727 fd: Resource<Descriptor>,
728 ) -> FilesystemResult<DescriptorStat> {
729 let fd = store.get_descriptor(&fd)?;
730 let stat = fd.stat().await?;
731 Ok(stat.into())
732 }
733
734 async fn stat_at(
735 store: &Accessor<U, Self>,
736 fd: Resource<Descriptor>,
737 path_flags: PathFlags,
738 path: String,
739 ) -> FilesystemResult<DescriptorStat> {
740 let dir = store.get_dir(&fd)?;
741 let stat = dir.stat_at(path_flags.into(), path).await?;
742 Ok(stat.into())
743 }
744
745 async fn set_times_at(
746 store: &Accessor<U, Self>,
747 fd: Resource<Descriptor>,
748 path_flags: PathFlags,
749 path: String,
750 data_access_timestamp: NewTimestamp,
751 data_modification_timestamp: NewTimestamp,
752 ) -> FilesystemResult<()> {
753 let dir = store.get_dir(&fd)?;
754 let atim = systemtimespec_from(data_access_timestamp)?;
755 let mtim = systemtimespec_from(data_modification_timestamp)?;
756 dir.set_times_at(path_flags.into(), path, atim, mtim)
757 .await?;
758 Ok(())
759 }
760
761 async fn link_at(
762 store: &Accessor<U, Self>,
763 fd: Resource<Descriptor>,
764 old_path_flags: PathFlags,
765 old_path: String,
766 new_fd: Resource<Descriptor>,
767 new_path: String,
768 ) -> FilesystemResult<()> {
769 let (old_dir, new_dir) = store.get_dir_pair(&fd, &new_fd)?;
770 old_dir
771 .link_at(old_path_flags.into(), old_path, &new_dir, new_path)
772 .await?;
773 Ok(())
774 }
775
776 async fn open_at(
777 store: &Accessor<U, Self>,
778 fd: Resource<Descriptor>,
779 path_flags: PathFlags,
780 path: String,
781 open_flags: OpenFlags,
782 flags: DescriptorFlags,
783 ) -> FilesystemResult<Resource<Descriptor>> {
784 let (allow_blocking_current_thread, dir) = store.with(|mut store| {
785 let store = store.get();
786 let dir = get_dir(&store.table, &fd)?;
787 FilesystemResult::Ok((store.ctx.allow_blocking_current_thread, dir.clone()))
788 })?;
789 let fd = dir
790 .open_at(
791 path_flags.into(),
792 path,
793 open_flags.into(),
794 flags.into(),
795 allow_blocking_current_thread,
796 )
797 .await?;
798 let fd = store.with(|mut store| store.get().table.push(fd))?;
799 Ok(fd)
800 }
801
802 async fn readlink_at(
803 store: &Accessor<U, Self>,
804 fd: Resource<Descriptor>,
805 path: String,
806 ) -> FilesystemResult<String> {
807 let dir = store.get_dir(&fd)?;
808 let path = dir.readlink_at(path).await?;
809 Ok(path)
810 }
811
812 async fn remove_directory_at(
813 store: &Accessor<U, Self>,
814 fd: Resource<Descriptor>,
815 path: String,
816 ) -> FilesystemResult<()> {
817 let dir = store.get_dir(&fd)?;
818 dir.remove_directory_at(path).await?;
819 Ok(())
820 }
821
822 async fn rename_at(
823 store: &Accessor<U, Self>,
824 fd: Resource<Descriptor>,
825 old_path: String,
826 new_fd: Resource<Descriptor>,
827 new_path: String,
828 ) -> FilesystemResult<()> {
829 let (old_dir, new_dir) = store.get_dir_pair(&fd, &new_fd)?;
830 old_dir.rename_at(old_path, &new_dir, new_path).await?;
831 Ok(())
832 }
833
834 async fn symlink_at(
835 store: &Accessor<U, Self>,
836 fd: Resource<Descriptor>,
837 old_path: String,
838 new_path: String,
839 ) -> FilesystemResult<()> {
840 let dir = store.get_dir(&fd)?;
841 dir.symlink_at(old_path, new_path).await?;
842 Ok(())
843 }
844
845 async fn unlink_file_at(
846 store: &Accessor<U, Self>,
847 fd: Resource<Descriptor>,
848 path: String,
849 ) -> FilesystemResult<()> {
850 let dir = store.get_dir(&fd)?;
851 dir.unlink_file_at(path).await?;
852 Ok(())
853 }
854
855 async fn is_same_object(
856 store: &Accessor<U, Self>,
857 fd: Resource<Descriptor>,
858 other: Resource<Descriptor>,
859 ) -> wasmtime::Result<bool> {
860 let (fd, other) = store.with(|mut store| {
861 let table = store.get().table;
862 let fd = get_descriptor(table, &fd)?.clone();
863 let other = get_descriptor(table, &other)?.clone();
864 wasmtime::error::Ok((fd, other))
865 })?;
866 fd.is_same_object(&other).await
867 }
868
869 async fn metadata_hash(
870 store: &Accessor<U, Self>,
871 fd: Resource<Descriptor>,
872 ) -> FilesystemResult<MetadataHashValue> {
873 let fd = store.get_descriptor(&fd)?;
874 let meta = fd.metadata_hash().await?;
875 Ok(meta.into())
876 }
877
878 async fn metadata_hash_at(
879 store: &Accessor<U, Self>,
880 fd: Resource<Descriptor>,
881 path_flags: PathFlags,
882 path: String,
883 ) -> FilesystemResult<MetadataHashValue> {
884 let dir = store.get_dir(&fd)?;
885 let meta = dir.metadata_hash_at(path_flags.into(), path).await?;
886 Ok(meta.into())
887 }
888}
889
890impl types::HostDescriptor for WasiFilesystemCtxView<'_> {
891 fn drop(&mut self, fd: Resource<Descriptor>) -> wasmtime::Result<()> {
892 self.table
893 .delete(fd)
894 .context("failed to delete descriptor resource from table")?;
895 Ok(())
896 }
897}
898
899impl preopens::Host for WasiFilesystemCtxView<'_> {
900 fn get_directories(&mut self) -> wasmtime::Result<Vec<(Resource<Descriptor>, String)>> {
901 self.get_directories()
902 }
903}