// wasmtime_wasi/p2/pipe.rs
//! Virtual pipes.
//!
//! These types provide easy implementations of `WasiFile` that mimic much of the behavior of Unix
//! pipes. These are particularly helpful for redirecting WASI stdio handles to destinations other
//! than OS files.
//!
//! Some convenience constructors are included for common backing types like `Vec<u8>` and `String`,
//! but the virtual pipes can be instantiated with any `Read` or `Write` type.
//!
10use bytes::Bytes;
11use std::pin::{Pin, pin};
12use std::sync::{Arc, Mutex};
13use std::task::{Context, Poll};
14use tokio::io::{self, AsyncRead, AsyncWrite};
15use tokio::sync::mpsc;
16use wasmtime::format_err;
17use wasmtime_wasi_io::{
18    poll::Pollable,
19    streams::{InputStream, OutputStream, StreamError},
20};
21
22pub use crate::p2::write_stream::AsyncWriteStream;
23
/// An in-memory, read-only pipe backed by a fixed byte buffer.
///
/// Cloning shares the same underlying buffer (via `Arc`), so a read through
/// any clone consumes those bytes for every clone.
#[derive(Debug, Clone)]
pub struct MemoryInputPipe {
    // Shared backing storage; bytes are consumed from the front as they are read.
    buffer: Arc<Mutex<Bytes>>,
}
28
29impl MemoryInputPipe {
30    pub fn new(bytes: impl Into<Bytes>) -> Self {
31        Self {
32            buffer: Arc::new(Mutex::new(bytes.into())),
33        }
34    }
35
36    pub fn is_empty(&self) -> bool {
37        self.buffer.lock().unwrap().is_empty()
38    }
39}
40
41#[async_trait::async_trait]
42impl InputStream for MemoryInputPipe {
43    fn read(&mut self, size: usize) -> Result<Bytes, StreamError> {
44        let mut buffer = self.buffer.lock().unwrap();
45        if buffer.is_empty() {
46            return Err(StreamError::Closed);
47        }
48
49        let size = size.min(buffer.len());
50        let read = buffer.split_to(size);
51        Ok(read)
52    }
53}
54
#[async_trait::async_trait]
impl Pollable for MemoryInputPipe {
    // An in-memory pipe is always immediately ready: either bytes are
    // available or `read` reports closed, so there is never anything to wait for.
    async fn ready(&mut self) {}
}
59
60impl AsyncRead for MemoryInputPipe {
61    fn poll_read(
62        self: Pin<&mut Self>,
63        _cx: &mut Context<'_>,
64        buf: &mut io::ReadBuf<'_>,
65    ) -> Poll<io::Result<()>> {
66        let mut buffer = self.buffer.lock().unwrap();
67        let size = buf.remaining().min(buffer.len());
68        let read = buffer.split_to(size);
69        buf.put_slice(&read);
70        Poll::Ready(Ok(()))
71    }
72}
73
/// An in-memory pipe that collects written bytes, up to a fixed capacity.
///
/// Cloning shares the same underlying buffer (via `Arc`), so writes through
/// any clone are visible to all clones.
#[derive(Debug, Clone)]
pub struct MemoryOutputPipe {
    // Maximum total number of bytes this pipe will ever accept.
    capacity: usize,
    // Shared backing storage that written bytes are appended to.
    buffer: Arc<Mutex<bytes::BytesMut>>,
}
79
80impl MemoryOutputPipe {
81    pub fn new(capacity: usize) -> Self {
82        MemoryOutputPipe {
83            capacity,
84            buffer: std::sync::Arc::new(std::sync::Mutex::new(bytes::BytesMut::new())),
85        }
86    }
87
88    pub fn contents(&self) -> bytes::Bytes {
89        self.buffer.lock().unwrap().clone().freeze()
90    }
91
92    pub fn try_into_inner(self) -> Option<bytes::BytesMut> {
93        std::sync::Arc::into_inner(self.buffer).map(|m| m.into_inner().unwrap())
94    }
95}
96
97#[async_trait::async_trait]
98impl OutputStream for MemoryOutputPipe {
99    fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> {
100        let mut buf = self.buffer.lock().unwrap();
101        if bytes.len() > self.capacity - buf.len() {
102            return Err(StreamError::Trap(format_err!(
103                "write beyond capacity of MemoryOutputPipe"
104            )));
105        }
106        buf.extend_from_slice(bytes.as_ref());
107        // Always ready for writing
108        Ok(())
109    }
110    fn flush(&mut self) -> Result<(), StreamError> {
111        // This stream is always flushed
112        Ok(())
113    }
114    fn check_write(&mut self) -> Result<usize, StreamError> {
115        let consumed = self.buffer.lock().unwrap().len();
116        if consumed < self.capacity {
117            Ok(self.capacity - consumed)
118        } else {
119            // Since the buffer is full, no more bytes will ever be written
120            Err(StreamError::Closed)
121        }
122    }
123}
124
#[async_trait::async_trait]
impl Pollable for MemoryOutputPipe {
    // Writes either succeed or fail immediately, so this pipe is always ready.
    async fn ready(&mut self) {}
}
129
130impl AsyncWrite for MemoryOutputPipe {
131    fn poll_write(
132        self: Pin<&mut Self>,
133        _cx: &mut Context<'_>,
134        buf: &[u8],
135    ) -> Poll<io::Result<usize>> {
136        let mut buffer = self.buffer.lock().unwrap();
137        let amt = buf.len().min(self.capacity - buffer.len());
138        buffer.extend_from_slice(&buf[..amt]);
139        Poll::Ready(Ok(amt))
140    }
141    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
142        Poll::Ready(Ok(()))
143    }
144    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
145        Poll::Ready(Ok(()))
146    }
147}
148
/// Provides a [`InputStream`] impl from a [`tokio::io::AsyncRead`] impl
pub struct AsyncReadStream {
    // Latched once an error (including EOF/Closed) from the worker has been
    // surfaced to a caller; subsequent reads report `StreamError::Closed`.
    closed: bool,
    // One chunk (or error) received from the worker but not yet handed out
    // by `read`.
    buffer: Option<Result<Bytes, StreamError>>,
    // Receives chunks produced by the background reader task.
    receiver: mpsc::Receiver<Result<Bytes, StreamError>>,
    // Handle to the background task; taken by `cancel`, aborted on drop.
    join_handle: Option<crate::runtime::AbortOnDropJoinHandle<()>>,
}
156
impl AsyncReadStream {
    /// Create a [`AsyncReadStream`]. In order to use the [`InputStream`] impl
    /// provided by this struct, the argument must impl [`tokio::io::AsyncRead`].
    pub fn new<T: AsyncRead + Send + 'static>(reader: T) -> Self {
        // Capacity 1: the worker reads at most one chunk ahead of the
        // consumer, which bounds how much this stream buffers.
        let (sender, receiver) = mpsc::channel(1);
        let join_handle = crate::runtime::spawn(async move {
            let mut reader = pin!(reader);
            loop {
                use tokio::io::AsyncReadExt;
                let mut buf = bytes::BytesMut::with_capacity(crate::MAX_READ_SIZE_ALLOC);
                let sent = match reader.read_buf(&mut buf).await {
                    // A 0-byte read from `read_buf` signals end-of-stream.
                    Ok(nbytes) if nbytes == 0 => sender.send(Err(StreamError::Closed)).await,
                    Ok(_) => sender.send(Ok(buf.freeze())).await,
                    Err(e) => {
                        sender
                            .send(Err(StreamError::LastOperationFailed(e.into())))
                            .await
                    }
                };
                if sent.is_err() {
                    // no more receiver - stop trying to read
                    break;
                }
            }
        });
        AsyncReadStream {
            closed: false,
            buffer: None,
            receiver,
            join_handle: Some(join_handle),
        }
    }

    /// Resolves once `read` can make progress: a chunk (or error) is
    /// buffered, or the stream has already been marked closed.
    pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        if self.buffer.is_some() || self.closed {
            return Poll::Ready(());
        }
        match self.receiver.poll_recv(cx) {
            Poll::Ready(Some(res)) => {
                // Stash the result for the next call to `read`.
                self.buffer = Some(res);
                Poll::Ready(())
            }
            Poll::Ready(None) => {
                // The worker loop only stops once sends fail (receiver gone),
                // and we still hold the receiver here.
                panic!("no more sender for an open AsyncReadStream - should be impossible")
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
205
206#[async_trait::async_trait]
207impl InputStream for AsyncReadStream {
208    fn read(&mut self, size: usize) -> Result<Bytes, StreamError> {
209        use mpsc::error::TryRecvError;
210
211        match self.buffer.take() {
212            Some(Ok(mut bytes)) => {
213                // TODO: de-duplicate the buffer management with the case below
214                let len = bytes.len().min(size);
215                let rest = bytes.split_off(len);
216                if !rest.is_empty() {
217                    self.buffer = Some(Ok(rest));
218                }
219                return Ok(bytes);
220            }
221            Some(Err(e)) => {
222                self.closed = true;
223                return Err(e);
224            }
225            None => {}
226        }
227
228        match self.receiver.try_recv() {
229            Ok(Ok(mut bytes)) => {
230                let len = bytes.len().min(size);
231                let rest = bytes.split_off(len);
232                if !rest.is_empty() {
233                    self.buffer = Some(Ok(rest));
234                }
235
236                Ok(bytes)
237            }
238            Ok(Err(e)) => {
239                self.closed = true;
240                Err(e)
241            }
242            Err(TryRecvError::Empty) => {
243                if self.closed {
244                    // Note: if the stream is already closed it should return an error,
245                    //       returning empty list would break the wasi contract (returning 0 and ready)
246                    Err(StreamError::Closed)
247                } else {
248                    Ok(Bytes::new())
249                }
250            }
251            Err(TryRecvError::Disconnected) => Err(StreamError::Trap(format_err!(
252                "AsyncReadStream sender died - should be impossible"
253            ))),
254        }
255    }
256
257    async fn cancel(&mut self) {
258        match self.join_handle.take() {
259            Some(task) => _ = task.cancel().await,
260            None => {}
261        }
262    }
263}
264
#[async_trait::async_trait]
impl Pollable for AsyncReadStream {
    // Delegate to `poll_ready`: resolves once a chunk or error is buffered,
    // or the stream is closed.
    async fn ready(&mut self) {
        std::future::poll_fn(|cx| self.poll_ready(cx)).await
    }
}
271
/// An output stream that consumes all input written to it, and is always ready.
#[derive(Copy, Clone)]
pub struct SinkOutputStream;
275
276#[async_trait::async_trait]
277impl OutputStream for SinkOutputStream {
278    fn write(&mut self, _buf: Bytes) -> Result<(), StreamError> {
279        Ok(())
280    }
281    fn flush(&mut self) -> Result<(), StreamError> {
282        // This stream is always flushed
283        Ok(())
284    }
285
286    fn check_write(&mut self) -> Result<usize, StreamError> {
287        // This stream is always ready for writing.
288        Ok(usize::MAX)
289    }
290}
291
#[async_trait::async_trait]
impl Pollable for SinkOutputStream {
    // A sink accepts writes unconditionally, so it is always ready.
    async fn ready(&mut self) {}
}
296
/// A stream that is ready immediately, but will always report that it's closed.
#[derive(Copy, Clone)]
pub struct ClosedInputStream;
300
#[async_trait::async_trait]
impl InputStream for ClosedInputStream {
    // Every read observes end-of-stream.
    fn read(&mut self, _size: usize) -> Result<Bytes, StreamError> {
        Err(StreamError::Closed)
    }
}
307
#[async_trait::async_trait]
impl Pollable for ClosedInputStream {
    // Always ready: `read` resolves immediately (with Closed).
    async fn ready(&mut self) {}
}
312
/// An output stream that is always closed.
#[derive(Copy, Clone)]
pub struct ClosedOutputStream;
316
317#[async_trait::async_trait]
318impl OutputStream for ClosedOutputStream {
319    fn write(&mut self, _: Bytes) -> Result<(), StreamError> {
320        Err(StreamError::Closed)
321    }
322    fn flush(&mut self) -> Result<(), StreamError> {
323        Err(StreamError::Closed)
324    }
325
326    fn check_write(&mut self) -> Result<usize, StreamError> {
327        Err(StreamError::Closed)
328    }
329}
330
#[async_trait::async_trait]
impl Pollable for ClosedOutputStream {
    // Always ready: every operation resolves immediately (with Closed).
    async fn ready(&mut self) {}
}
335
#[cfg(test)]
mod test {
    use super::*;
    use std::time::Duration;
    use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

    // This is a gross way to handle CI running under qemu for non-x86 architectures.
    #[cfg(not(target_arch = "x86_64"))]
    const TEST_ITERATIONS: usize = 10;

    #[cfg(target_arch = "x86_64")]
    const TEST_ITERATIONS: usize = 100;

    /// Await `fut`, panicking if it doesn't resolve within a generous timeout.
    async fn resolves_immediately<F, O>(fut: F) -> O
    where
        F: futures::Future<Output = O>,
    {
        // The input `fut` should resolve immediately, but in case it
        // accidentally doesn't, don't hang the test indefinitely. Provide a
        // generous timeout to account for CI sensitivity and various systems.
        tokio::time::timeout(Duration::from_secs(2), fut)
            .await
            .expect("operation timed out")
    }

    /// Assert that `fut` does not resolve within a small time budget.
    async fn never_resolves<F: futures::Future>(fut: F) {
        // The input `fut` should never resolve, so only give it a small window
        // of budget before we time out. If `fut` is actually resolved this
        // should show up as a flaky test.
        tokio::time::timeout(Duration::from_millis(10), fut)
            .await
            .err()
            .expect("operation should time out");
    }

    /// A unidirectional in-memory pipe with `size` bytes of internal buffer.
    pub fn simplex(size: usize) -> (impl AsyncRead, impl AsyncWrite) {
        let (a, b) = tokio::io::duplex(size);
        let (_read_half, write_half) = tokio::io::split(a);
        let (read_half, _write_half) = tokio::io::split(b);
        (read_half, write_half)
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn empty_read_stream() {
        let mut reader = AsyncReadStream::new(tokio::io::empty());

        // In a multi-threaded context, the value of state is not deterministic -- the spawned
        // reader task may run on a different thread.
        match reader.read(10) {
            // The reader task ran before we tried to read, and noticed that the input was empty.
            Err(StreamError::Closed) => {}

            // The reader task hasn't run yet. Call `ready` to await and fill the buffer.
            Ok(bs) => {
                assert!(bs.is_empty());
                resolves_immediately(reader.ready()).await;
                assert!(matches!(reader.read(0), Err(StreamError::Closed)));

                // Try again to make sure it keeps returning `StreamError::Closed`
                resolves_immediately(reader.ready()).await;
                assert!(matches!(reader.read(0), Err(StreamError::Closed)));
                assert!(matches!(reader.read(0), Err(StreamError::Closed)));
            }
            res => panic!("unexpected: {res:?}"),
        }
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn infinite_read_stream() {
        let mut reader = AsyncReadStream::new(tokio::io::repeat(0));

        let bs = reader.read(10).unwrap();
        if bs.is_empty() {
            // Reader task hasn't run yet. Call `ready` to await and fill the buffer.
            resolves_immediately(reader.ready()).await;
            // Now a read should succeed
            let bs = reader.read(10).unwrap();
            assert_eq!(bs.len(), 10);
        } else {
            assert_eq!(bs.len(), 10);
        }

        // Subsequent reads should succeed
        let bs = reader.read(10).unwrap();
        assert_eq!(bs.len(), 10);

        // Even 0-length reads should succeed and show it's open
        let bs = reader.read(0).unwrap();
        assert_eq!(bs.len(), 0);
    }

    /// Produce an `AsyncRead` whose entire contents are already buffered.
    async fn finite_async_reader(contents: &[u8]) -> impl AsyncRead + Send + 'static + use<> {
        let (r, mut w) = simplex(contents.len());
        w.write_all(contents).await.unwrap();
        r
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn finite_read_stream() {
        let mut reader = AsyncReadStream::new(finite_async_reader(&[1; 123]).await);

        let bs = reader.read(123).unwrap();
        if bs.is_empty() {
            // Reader task hasn't run yet. Call `ready` to await and fill the buffer.
            resolves_immediately(reader.ready()).await;
            // Now a read should succeed
            let bs = reader.read(123).unwrap();
            assert_eq!(bs.len(), 123);
        } else {
            assert_eq!(bs.len(), 123);
        }

        // The AsyncRead's should be empty now, but we have a race where the reader task hasn't
        // yet sent that to the AsyncReadStream.
        match reader.read(0) {
            Err(StreamError::Closed) => {} // Correct!
            Ok(bs) => {
                assert!(bs.is_empty());
                // Need to await to give this side time to catch up
                resolves_immediately(reader.ready()).await;
                // Now a read should show closed
                assert!(matches!(reader.read(0), Err(StreamError::Closed)));
            }
            res => panic!("unexpected: {res:?}"),
        }

        // Make sure it stays closed
        resolves_immediately(reader.ready()).await;
        assert!(matches!(reader.read(0), Err(StreamError::Closed)));
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    // Test that you can write items into the stream, and they get read out in the order they were
    // written, with the proper indications of readiness for reading:
    async fn multiple_chunks_read_stream() {
        let (r, mut w) = simplex(1024);
        let mut reader = AsyncReadStream::new(r);

        w.write_all(&[123]).await.unwrap();

        let bs = reader.read(1).unwrap();
        if bs.is_empty() {
            // Reader task hasn't run yet. Call `ready` to await and fill the buffer.
            resolves_immediately(reader.ready()).await;
            // Now a read should succeed
            let bs = reader.read(1).unwrap();
            assert_eq!(*bs, [123u8]);
        } else {
            assert_eq!(*bs, [123u8]);
        }

        // The stream should be empty and open now:
        let bs = reader.read(1).unwrap();
        assert!(bs.is_empty());

        // We can wait on readiness and it will time out:
        never_resolves(reader.ready()).await;

        // Still open and empty:
        let bs = reader.read(1).unwrap();
        assert!(bs.is_empty());

        // Put something else in the stream:
        w.write_all(&[45]).await.unwrap();

        // Wait readiness (yes we could possibly win the race and read it out faster, leaving that
        // out of the test for simplicity)
        resolves_immediately(reader.ready()).await;

        // read the something else back out:
        let bs = reader.read(1).unwrap();
        assert_eq!(*bs, [45u8]);

        // nothing else in there:
        let bs = reader.read(1).unwrap();
        assert!(bs.is_empty());

        // We can wait on readiness and it will time out:
        never_resolves(reader.ready()).await;

        // nothing else in there:
        let bs = reader.read(1).unwrap();
        assert!(bs.is_empty());

        // Now close the pipe:
        drop(w);

        // Wait readiness (yes we could possibly win the race and read it out faster, leaving that
        // out of the test for simplicity)
        resolves_immediately(reader.ready()).await;

        // empty and now closed:
        assert!(matches!(reader.read(1), Err(StreamError::Closed)));
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    // At the moment we are restricting AsyncReadStream from buffering more than 4k. This isn't a
    // suitable design for all applications, and we will probably make a knob or change the
    // behavior at some point, but this test shows the behavior as it is implemented:
    async fn backpressure_read_stream() {
        let (r, mut w) = simplex(4 * crate::MAX_READ_SIZE_ALLOC); // Make sure this buffer isn't a bottleneck
        let mut reader = AsyncReadStream::new(r);

        let writer_task = tokio::task::spawn(async move {
            // Write twice as much as we can buffer up in an AsyncReadStream:
            w.write_all(&[123; 2 * crate::MAX_READ_SIZE_ALLOC])
                .await
                .unwrap();
            w
        });

        resolves_immediately(reader.ready()).await;

        // Now we expect the reader task has sent 4k from the stream to the reader.
        // Try to read out one bigger than the buffer available:
        let bs = reader.read(crate::MAX_READ_SIZE_ALLOC + 1).unwrap();
        assert_eq!(bs.len(), crate::MAX_READ_SIZE_ALLOC);

        // Allow the crank to turn more:
        resolves_immediately(reader.ready()).await;

        // Again we expect the reader task has sent 4k from the stream to the reader.
        // Try to read out one bigger than the buffer available:
        let bs = reader.read(crate::MAX_READ_SIZE_ALLOC + 1).unwrap();
        assert_eq!(bs.len(), crate::MAX_READ_SIZE_ALLOC);

        // The writer task is now finished - join with it:
        let w = resolves_immediately(writer_task).await;

        // And close the pipe:
        drop(w);

        // Allow the crank to turn more:
        resolves_immediately(reader.ready()).await;

        // Now we expect the reader to be empty, and the stream closed (the
        // writer was dropped):
        assert!(matches!(
            reader.read(crate::MAX_READ_SIZE_ALLOC + 1),
            Err(StreamError::Closed)
        ));
    }

    // NOTE: this attribute was previously a doubled
    // `#[test_log::test(test_log::test(...))]`; normalized to match the
    // other tests in this module.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn sink_write_stream() {
        let mut writer = AsyncWriteStream::new(2048, tokio::io::sink());
        let chunk = Bytes::from_static(&[0; 1024]);

        let readiness = resolves_immediately(writer.write_ready())
            .await
            .expect("write_ready does not trap");
        assert_eq!(readiness, 2048);
        // I can write whatever:
        writer.write(chunk.clone()).expect("write does not error");

        // This may consume 1k of the buffer:
        let readiness = resolves_immediately(writer.write_ready())
            .await
            .expect("write_ready does not trap");
        assert!(
            readiness == 1024 || readiness == 2048,
            "readiness should be 1024 or 2048, got {readiness}"
        );

        if readiness == 1024 {
            writer.write(chunk.clone()).expect("write does not error");

            let readiness = resolves_immediately(writer.write_ready())
                .await
                .expect("write_ready does not trap");
            assert!(
                readiness == 1024 || readiness == 2048,
                "readiness should be 1024 or 2048, got {readiness}"
            );
        }
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn closed_write_stream() {
        // Run many times because the test is nondeterministic:
        for n in 0..TEST_ITERATIONS {
            closed_write_stream_(n).await
        }
    }

    #[tracing::instrument]
    async fn closed_write_stream_(n: usize) {
        let (reader, writer) = simplex(1);
        let mut writer = AsyncWriteStream::new(1024, writer);

        // Drop the reader to allow the worker to transition to the closed state eventually.
        drop(reader);

        // First the api is going to report the last operation failed, then subsequently
        // it will be reported as closed. We set this flag once we see LastOperationFailed.
        let mut should_be_closed = false;

        // Write some data to the stream to ensure we have data that cannot be flushed.
        let chunk = Bytes::from_static(&[0; 1]);
        writer
            .write(chunk.clone())
            .expect("first write should succeed");

        // The rest of this test should be valid whether or not we check write readiness:
        let mut write_ready_res = None;
        if n % 2 == 0 {
            let r = resolves_immediately(writer.write_ready()).await;
            // Check write readiness:
            match r {
                // worker hasn't processed write yet:
                Ok(1023) => {}
                // worker reports failure:
                Err(StreamError::LastOperationFailed(_)) => {
                    tracing::debug!("discovered stream failure in first write_ready");
                    should_be_closed = true;
                }
                r => panic!("unexpected write_ready: {r:?}"),
            }
            write_ready_res = Some(r);
        }

        // When we drop the simplex reader, that causes the simplex writer to return BrokenPipe on
        // its write. Now that the buffering crank has turned, our next write will give BrokenPipe.
        let flush_res = writer.flush();
        match flush_res {
            // worker reports failure:
            Err(StreamError::LastOperationFailed(_)) => {
                tracing::debug!("discovered stream failure trying to flush");
                assert!(!should_be_closed);
                should_be_closed = true;
            }
            // Already reported failure, now closed
            Err(StreamError::Closed) => {
                assert!(
                    should_be_closed,
                    "expected a LastOperationFailed before we see Closed. {write_ready_res:?}"
                );
            }
            // Also possible the worker hasn't processed write yet:
            Ok(()) => {}
            Err(e) => panic!("unexpected flush error: {e:?} {write_ready_res:?}"),
        }

        // Waiting for the flush to complete should always indicate that the channel has been
        // closed.
        match resolves_immediately(writer.write_ready()).await {
            // worker reports failure:
            Err(StreamError::LastOperationFailed(_)) => {
                tracing::debug!("discovered stream failure trying to flush");
                assert!(!should_be_closed);
            }
            // Already reported failure, now closed
            Err(StreamError::Closed) => {
                assert!(should_be_closed);
            }
            r => {
                panic!(
                    "stream should be reported closed by the end of write_ready after flush, got {r:?}. {write_ready_res:?} {flush_res:?}"
                )
            }
        }
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn multiple_chunks_write_stream() {
        // Run many times because the test is nondeterministic:
        for n in 0..TEST_ITERATIONS {
            multiple_chunks_write_stream_aux(n).await
        }
    }

    #[tracing::instrument]
    async fn multiple_chunks_write_stream_aux(_: usize) {
        use std::ops::Deref;

        let (mut reader, writer) = simplex(1024);
        let mut writer = AsyncWriteStream::new(1024, writer);

        // Write a chunk:
        let chunk = Bytes::from_static(&[123; 1]);

        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("write should be ready");
        assert_eq!(permit, 1024);

        writer.write(chunk.clone()).expect("write does not trap");

        // At this point the message will either be waiting for the worker to process the write, or
        // it will be buffered in the simplex channel.
        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("write should be ready");
        assert!(matches!(permit, 1023 | 1024));

        let mut read_buf = vec![0; chunk.len()];
        let read_len = reader.read_exact(&mut read_buf).await.unwrap();
        assert_eq!(read_len, chunk.len());
        assert_eq!(read_buf.as_slice(), chunk.deref());

        // Write a second, different chunk:
        let chunk2 = Bytes::from_static(&[45; 1]);

        // We're only guaranteed to see a consistent write budget if we flush.
        writer.flush().expect("channel is still alive");

        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("write should be ready");
        assert_eq!(permit, 1024);

        writer.write(chunk2.clone()).expect("write does not trap");

        // At this point the message will either be waiting for the worker to process the write, or
        // it will be buffered in the simplex channel.
        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("write should be ready");
        assert!(matches!(permit, 1023 | 1024));

        let mut read2_buf = vec![0; chunk2.len()];
        let read2_len = reader.read_exact(&mut read2_buf).await.unwrap();
        assert_eq!(read2_len, chunk2.len());
        assert_eq!(read2_buf.as_slice(), chunk2.deref());

        // We're only guaranteed to see a consistent write budget if we flush.
        writer.flush().expect("channel is still alive");

        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("write should be ready");
        assert_eq!(permit, 1024);
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn backpressure_write_stream() {
        // Run many times because the test is nondeterministic:
        for n in 0..TEST_ITERATIONS {
            backpressure_write_stream_aux(n).await
        }
    }

    #[tracing::instrument]
    async fn backpressure_write_stream_aux(_: usize) {
        use futures::future::poll_immediate;

        // The channel can buffer up to 1k, plus another 1k in the stream, before not
        // accepting more input:
        let (mut reader, writer) = simplex(1024);
        let mut writer = AsyncWriteStream::new(1024, writer);

        let chunk = Bytes::from_static(&[0; 1024]);

        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("write should be ready");
        assert_eq!(permit, 1024);

        writer.write(chunk.clone()).expect("write succeeds");

        // We might still be waiting for the worker to process the message, or the worker may have
        // processed it and released all the budget back to us.
        let permit = poll_immediate(writer.write_ready()).await;
        assert!(matches!(permit, None | Some(Ok(1024))));

        // Given a little time, the worker will process the message and release all the budget
        // back.
        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("write should be ready");
        assert_eq!(permit, 1024);

        // Now fill the buffer between here and the writer task. This should always indicate
        // back-pressure because now both buffers (simplex and worker) are full.
        writer.write(chunk.clone()).expect("write does not trap");

        // Try shoving even more down there, and it shouldn't accept more input:
        writer
            .write(chunk.clone())
            .err()
            .expect("unpermitted write does trap");

        // No amount of waiting will resolve the situation, as nothing is emptying the simplex
        // buffer.
        never_resolves(writer.write_ready()).await;

        // There is 2k buffered between the simplex and worker buffers. I should be able to read
        // all of it out:
        let mut buf = [0; 2048];
        reader.read_exact(&mut buf).await.unwrap();

        // and no more:
        never_resolves(reader.read(&mut buf)).await;

        // Now the backpressure should be cleared, and an additional write should be accepted.
        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("ready is ok");
        assert_eq!(permit, 1024);

        // and the write succeeds:
        writer.write(chunk.clone()).expect("write does not trap");
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn backpressure_write_stream_with_flush() {
        for n in 0..TEST_ITERATIONS {
            backpressure_write_stream_with_flush_aux(n).await;
        }
    }

    async fn backpressure_write_stream_with_flush_aux(_: usize) {
        // The channel can buffer up to 1k, plus another 1k in the stream, before not
        // accepting more input:
        let (mut reader, writer) = simplex(1024);
        let mut writer = AsyncWriteStream::new(1024, writer);

        let chunk = Bytes::from_static(&[0; 1024]);

        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("write should be ready");
        assert_eq!(permit, 1024);

        writer.write(chunk.clone()).expect("write succeeds");

        writer.flush().expect("flush succeeds");

        // Waiting for write_ready to resolve after a flush should always show that we have the
        // full budget available, as the message will have flushed to the simplex channel.
        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("write_ready succeeds");
        assert_eq!(permit, 1024);

        // Write enough to fill the simplex buffer:
        writer.write(chunk.clone()).expect("write does not trap");

        // Writes should be refused until this flush succeeds.
        writer.flush().expect("flush succeeds");

        // Try shoving even more down there, and it shouldn't accept more input:
        writer
            .write(chunk.clone())
            .err()
            .expect("unpermitted write does trap");

        // No amount of waiting will resolve the situation, as nothing is emptying the simplex
        // buffer.
        never_resolves(writer.write_ready()).await;

        // There is 2k buffered between the simplex and worker buffers. I should be able to read
        // all of it out:
        let mut buf = [0; 2048];
        reader.read_exact(&mut buf).await.unwrap();

        // and no more:
        never_resolves(reader.read(&mut buf)).await;

        // Now the backpressure should be cleared, and an additional write should be accepted.
        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("ready is ok");
        assert_eq!(permit, 1024);

        // and the write succeeds:
        writer.write(chunk.clone()).expect("write does not trap");

        writer.flush().expect("flush succeeds");

        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("ready is ok");
        assert_eq!(permit, 1024);
    }
}