1use bytes::Bytes;
11use std::pin::{Pin, pin};
12use std::sync::{Arc, Mutex};
13use std::task::{Context, Poll};
14use tokio::io::{self, AsyncRead, AsyncWrite};
15use tokio::sync::mpsc;
16use wasmtime::format_err;
17use wasmtime_wasi_io::{
18 poll::Pollable,
19 streams::{InputStream, OutputStream, StreamError},
20};
21
22pub use crate::p2::write_stream::AsyncWriteStream;
23
24#[derive(Debug, Clone)]
25pub struct MemoryInputPipe {
26 buffer: Arc<Mutex<Bytes>>,
27}
28
29impl MemoryInputPipe {
30 pub fn new(bytes: impl Into<Bytes>) -> Self {
31 Self {
32 buffer: Arc::new(Mutex::new(bytes.into())),
33 }
34 }
35
36 pub fn is_empty(&self) -> bool {
37 self.buffer.lock().unwrap().is_empty()
38 }
39}
40
41#[async_trait::async_trait]
42impl InputStream for MemoryInputPipe {
43 fn read(&mut self, size: usize) -> Result<Bytes, StreamError> {
44 let mut buffer = self.buffer.lock().unwrap();
45 if buffer.is_empty() {
46 return Err(StreamError::Closed);
47 }
48
49 let size = size.min(buffer.len());
50 let read = buffer.split_to(size);
51 Ok(read)
52 }
53}
54
55#[async_trait::async_trait]
56impl Pollable for MemoryInputPipe {
57 async fn ready(&mut self) {}
58}
59
60impl AsyncRead for MemoryInputPipe {
61 fn poll_read(
62 self: Pin<&mut Self>,
63 _cx: &mut Context<'_>,
64 buf: &mut io::ReadBuf<'_>,
65 ) -> Poll<io::Result<()>> {
66 let mut buffer = self.buffer.lock().unwrap();
67 let size = buf.remaining().min(buffer.len());
68 let read = buffer.split_to(size);
69 buf.put_slice(&read);
70 Poll::Ready(Ok(()))
71 }
72}
73
74#[derive(Debug, Clone)]
75pub struct MemoryOutputPipe {
76 capacity: usize,
77 buffer: Arc<Mutex<bytes::BytesMut>>,
78}
79
80impl MemoryOutputPipe {
81 pub fn new(capacity: usize) -> Self {
82 MemoryOutputPipe {
83 capacity,
84 buffer: std::sync::Arc::new(std::sync::Mutex::new(bytes::BytesMut::new())),
85 }
86 }
87
88 pub fn contents(&self) -> bytes::Bytes {
89 self.buffer.lock().unwrap().clone().freeze()
90 }
91
92 pub fn try_into_inner(self) -> Option<bytes::BytesMut> {
93 std::sync::Arc::into_inner(self.buffer).map(|m| m.into_inner().unwrap())
94 }
95}
96
97#[async_trait::async_trait]
98impl OutputStream for MemoryOutputPipe {
99 fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> {
100 let mut buf = self.buffer.lock().unwrap();
101 if bytes.len() > self.capacity - buf.len() {
102 return Err(StreamError::Trap(format_err!(
103 "write beyond capacity of MemoryOutputPipe"
104 )));
105 }
106 buf.extend_from_slice(bytes.as_ref());
107 Ok(())
109 }
110 fn flush(&mut self) -> Result<(), StreamError> {
111 Ok(())
113 }
114 fn check_write(&mut self) -> Result<usize, StreamError> {
115 let consumed = self.buffer.lock().unwrap().len();
116 if consumed < self.capacity {
117 Ok(self.capacity - consumed)
118 } else {
119 Err(StreamError::Closed)
121 }
122 }
123}
124
125#[async_trait::async_trait]
126impl Pollable for MemoryOutputPipe {
127 async fn ready(&mut self) {}
128}
129
130impl AsyncWrite for MemoryOutputPipe {
131 fn poll_write(
132 self: Pin<&mut Self>,
133 _cx: &mut Context<'_>,
134 buf: &[u8],
135 ) -> Poll<io::Result<usize>> {
136 let mut buffer = self.buffer.lock().unwrap();
137 let amt = buf.len().min(self.capacity - buffer.len());
138 buffer.extend_from_slice(&buf[..amt]);
139 Poll::Ready(Ok(amt))
140 }
141 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
142 Poll::Ready(Ok(()))
143 }
144 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
145 Poll::Ready(Ok(()))
146 }
147}
148
149pub struct AsyncReadStream {
151 closed: bool,
152 buffer: Option<Result<Bytes, StreamError>>,
153 receiver: mpsc::Receiver<Result<Bytes, StreamError>>,
154 join_handle: Option<crate::runtime::AbortOnDropJoinHandle<()>>,
155}
156
157impl AsyncReadStream {
158 pub fn new<T: AsyncRead + Send + 'static>(reader: T) -> Self {
161 let (sender, receiver) = mpsc::channel(1);
162 let join_handle = crate::runtime::spawn(async move {
163 let mut reader = pin!(reader);
164 loop {
165 use tokio::io::AsyncReadExt;
166 let mut buf = bytes::BytesMut::with_capacity(crate::MAX_READ_SIZE_ALLOC);
167 let sent = match reader.read_buf(&mut buf).await {
168 Ok(nbytes) if nbytes == 0 => sender.send(Err(StreamError::Closed)).await,
169 Ok(_) => sender.send(Ok(buf.freeze())).await,
170 Err(e) => {
171 sender
172 .send(Err(StreamError::LastOperationFailed(e.into())))
173 .await
174 }
175 };
176 if sent.is_err() {
177 break;
179 }
180 }
181 });
182 AsyncReadStream {
183 closed: false,
184 buffer: None,
185 receiver,
186 join_handle: Some(join_handle),
187 }
188 }
189 pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
190 if self.buffer.is_some() || self.closed {
191 return Poll::Ready(());
192 }
193 match self.receiver.poll_recv(cx) {
194 Poll::Ready(Some(res)) => {
195 self.buffer = Some(res);
196 Poll::Ready(())
197 }
198 Poll::Ready(None) => {
199 panic!("no more sender for an open AsyncReadStream - should be impossible")
200 }
201 Poll::Pending => Poll::Pending,
202 }
203 }
204}
205
206#[async_trait::async_trait]
207impl InputStream for AsyncReadStream {
208 fn read(&mut self, size: usize) -> Result<Bytes, StreamError> {
209 use mpsc::error::TryRecvError;
210
211 match self.buffer.take() {
212 Some(Ok(mut bytes)) => {
213 let len = bytes.len().min(size);
215 let rest = bytes.split_off(len);
216 if !rest.is_empty() {
217 self.buffer = Some(Ok(rest));
218 }
219 return Ok(bytes);
220 }
221 Some(Err(e)) => {
222 self.closed = true;
223 return Err(e);
224 }
225 None => {}
226 }
227
228 match self.receiver.try_recv() {
229 Ok(Ok(mut bytes)) => {
230 let len = bytes.len().min(size);
231 let rest = bytes.split_off(len);
232 if !rest.is_empty() {
233 self.buffer = Some(Ok(rest));
234 }
235
236 Ok(bytes)
237 }
238 Ok(Err(e)) => {
239 self.closed = true;
240 Err(e)
241 }
242 Err(TryRecvError::Empty) => {
243 if self.closed {
244 Err(StreamError::Closed)
247 } else {
248 Ok(Bytes::new())
249 }
250 }
251 Err(TryRecvError::Disconnected) => Err(StreamError::Trap(format_err!(
252 "AsyncReadStream sender died - should be impossible"
253 ))),
254 }
255 }
256
257 async fn cancel(&mut self) {
258 match self.join_handle.take() {
259 Some(task) => _ = task.cancel().await,
260 None => {}
261 }
262 }
263}
264
265#[async_trait::async_trait]
266impl Pollable for AsyncReadStream {
267 async fn ready(&mut self) {
268 std::future::poll_fn(|cx| self.poll_ready(cx)).await
269 }
270}
271
272#[derive(Copy, Clone)]
274pub struct SinkOutputStream;
275
276#[async_trait::async_trait]
277impl OutputStream for SinkOutputStream {
278 fn write(&mut self, _buf: Bytes) -> Result<(), StreamError> {
279 Ok(())
280 }
281 fn flush(&mut self) -> Result<(), StreamError> {
282 Ok(())
284 }
285
286 fn check_write(&mut self) -> Result<usize, StreamError> {
287 Ok(usize::MAX)
289 }
290}
291
292#[async_trait::async_trait]
293impl Pollable for SinkOutputStream {
294 async fn ready(&mut self) {}
295}
296
297#[derive(Copy, Clone)]
299pub struct ClosedInputStream;
300
301#[async_trait::async_trait]
302impl InputStream for ClosedInputStream {
303 fn read(&mut self, _size: usize) -> Result<Bytes, StreamError> {
304 Err(StreamError::Closed)
305 }
306}
307
308#[async_trait::async_trait]
309impl Pollable for ClosedInputStream {
310 async fn ready(&mut self) {}
311}
312
313#[derive(Copy, Clone)]
315pub struct ClosedOutputStream;
316
317#[async_trait::async_trait]
318impl OutputStream for ClosedOutputStream {
319 fn write(&mut self, _: Bytes) -> Result<(), StreamError> {
320 Err(StreamError::Closed)
321 }
322 fn flush(&mut self) -> Result<(), StreamError> {
323 Err(StreamError::Closed)
324 }
325
326 fn check_write(&mut self) -> Result<usize, StreamError> {
327 Err(StreamError::Closed)
328 }
329}
330
331#[async_trait::async_trait]
332impl Pollable for ClosedOutputStream {
333 async fn ready(&mut self) {}
334}
335
336#[cfg(test)]
337mod test {
338 use super::*;
339 use std::time::Duration;
340 use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
341
/// How many times the repeated stream tests are run: 100 on `x86_64`
/// targets and 10 on every other architecture.
const TEST_ITERATIONS: usize = if cfg!(target_arch = "x86_64") {
    100
} else {
    10
};
348
349 async fn resolves_immediately<F, O>(fut: F) -> O
350 where
351 F: futures::Future<Output = O>,
352 {
353 tokio::time::timeout(Duration::from_secs(2), fut)
357 .await
358 .expect("operation timed out")
359 }
360
361 async fn never_resolves<F: futures::Future>(fut: F) {
362 tokio::time::timeout(Duration::from_millis(10), fut)
366 .await
367 .err()
368 .expect("operation should time out");
369 }
370
371 pub fn simplex(size: usize) -> (impl AsyncRead, impl AsyncWrite) {
372 let (a, b) = tokio::io::duplex(size);
373 let (_read_half, write_half) = tokio::io::split(a);
374 let (read_half, _write_half) = tokio::io::split(b);
375 (read_half, write_half)
376 }
377
378 #[test_log::test(tokio::test(flavor = "multi_thread"))]
379 async fn empty_read_stream() {
380 let mut reader = AsyncReadStream::new(tokio::io::empty());
381
382 match reader.read(10) {
385 Err(StreamError::Closed) => {}
387
388 Ok(bs) => {
390 assert!(bs.is_empty());
391 resolves_immediately(reader.ready()).await;
392 assert!(matches!(reader.read(0), Err(StreamError::Closed)));
393
394 resolves_immediately(reader.ready()).await;
396 assert!(matches!(reader.read(0), Err(StreamError::Closed)));
397 assert!(matches!(reader.read(0), Err(StreamError::Closed)));
398 }
399 res => panic!("unexpected: {res:?}"),
400 }
401 }
402
403 #[test_log::test(tokio::test(flavor = "multi_thread"))]
404 async fn infinite_read_stream() {
405 let mut reader = AsyncReadStream::new(tokio::io::repeat(0));
406
407 let bs = reader.read(10).unwrap();
408 if bs.is_empty() {
409 resolves_immediately(reader.ready()).await;
411 let bs = reader.read(10).unwrap();
413 assert_eq!(bs.len(), 10);
414 } else {
415 assert_eq!(bs.len(), 10);
416 }
417
418 let bs = reader.read(10).unwrap();
420 assert_eq!(bs.len(), 10);
421
422 let bs = reader.read(0).unwrap();
424 assert_eq!(bs.len(), 0);
425 }
426
427 async fn finite_async_reader(contents: &[u8]) -> impl AsyncRead + Send + 'static + use<> {
428 let (r, mut w) = simplex(contents.len());
429 w.write_all(contents).await.unwrap();
430 r
431 }
432
433 #[test_log::test(tokio::test(flavor = "multi_thread"))]
434 async fn finite_read_stream() {
435 let mut reader = AsyncReadStream::new(finite_async_reader(&[1; 123]).await);
436
437 let bs = reader.read(123).unwrap();
438 if bs.is_empty() {
439 resolves_immediately(reader.ready()).await;
441 let bs = reader.read(123).unwrap();
443 assert_eq!(bs.len(), 123);
444 } else {
445 assert_eq!(bs.len(), 123);
446 }
447
448 match reader.read(0) {
451 Err(StreamError::Closed) => {} Ok(bs) => {
453 assert!(bs.is_empty());
454 resolves_immediately(reader.ready()).await;
456 assert!(matches!(reader.read(0), Err(StreamError::Closed)));
458 }
459 res => panic!("unexpected: {res:?}"),
460 }
461
462 resolves_immediately(reader.ready()).await;
464 assert!(matches!(reader.read(0), Err(StreamError::Closed)));
465 }
466
467 #[test_log::test(tokio::test(flavor = "multi_thread"))]
468 async fn multiple_chunks_read_stream() {
471 let (r, mut w) = simplex(1024);
472 let mut reader = AsyncReadStream::new(r);
473
474 w.write_all(&[123]).await.unwrap();
475
476 let bs = reader.read(1).unwrap();
477 if bs.is_empty() {
478 resolves_immediately(reader.ready()).await;
480 let bs = reader.read(1).unwrap();
482 assert_eq!(*bs, [123u8]);
483 } else {
484 assert_eq!(*bs, [123u8]);
485 }
486
487 let bs = reader.read(1).unwrap();
489 assert!(bs.is_empty());
490
491 never_resolves(reader.ready()).await;
493
494 let bs = reader.read(1).unwrap();
496 assert!(bs.is_empty());
497
498 w.write_all(&[45]).await.unwrap();
500
501 resolves_immediately(reader.ready()).await;
504
505 let bs = reader.read(1).unwrap();
507 assert_eq!(*bs, [45u8]);
508
509 let bs = reader.read(1).unwrap();
511 assert!(bs.is_empty());
512
513 never_resolves(reader.ready()).await;
515
516 let bs = reader.read(1).unwrap();
518 assert!(bs.is_empty());
519
520 drop(w);
522
523 resolves_immediately(reader.ready()).await;
526
527 assert!(matches!(reader.read(1), Err(StreamError::Closed)));
529 }
530
531 #[test_log::test(tokio::test(flavor = "multi_thread"))]
532 async fn backpressure_read_stream() {
536 let (r, mut w) = simplex(4 * crate::MAX_READ_SIZE_ALLOC); let mut reader = AsyncReadStream::new(r);
538
539 let writer_task = tokio::task::spawn(async move {
540 w.write_all(&[123; 2 * crate::MAX_READ_SIZE_ALLOC])
542 .await
543 .unwrap();
544 w
545 });
546
547 resolves_immediately(reader.ready()).await;
548
549 let bs = reader.read(crate::MAX_READ_SIZE_ALLOC + 1).unwrap();
552 assert_eq!(bs.len(), crate::MAX_READ_SIZE_ALLOC);
553
554 resolves_immediately(reader.ready()).await;
556
557 let bs = reader.read(crate::MAX_READ_SIZE_ALLOC + 1).unwrap();
560 assert_eq!(bs.len(), crate::MAX_READ_SIZE_ALLOC);
561
562 let w = resolves_immediately(writer_task).await;
564
565 drop(w);
567
568 resolves_immediately(reader.ready()).await;
570
571 assert!(matches!(
573 reader.read(crate::MAX_READ_SIZE_ALLOC + 1),
574 Err(StreamError::Closed)
575 ));
576 }
577
578 #[test_log::test(test_log::test(tokio::test(flavor = "multi_thread")))]
579 async fn sink_write_stream() {
580 let mut writer = AsyncWriteStream::new(2048, tokio::io::sink());
581 let chunk = Bytes::from_static(&[0; 1024]);
582
583 let readiness = resolves_immediately(writer.write_ready())
584 .await
585 .expect("write_ready does not trap");
586 assert_eq!(readiness, 2048);
587 writer.write(chunk.clone()).expect("write does not error");
589
590 let readiness = resolves_immediately(writer.write_ready())
592 .await
593 .expect("write_ready does not trap");
594 assert!(
595 readiness == 1024 || readiness == 2048,
596 "readiness should be 1024 or 2048, got {readiness}"
597 );
598
599 if readiness == 1024 {
600 writer.write(chunk.clone()).expect("write does not error");
601
602 let readiness = resolves_immediately(writer.write_ready())
603 .await
604 .expect("write_ready does not trap");
605 assert!(
606 readiness == 1024 || readiness == 2048,
607 "readiness should be 1024 or 2048, got {readiness}"
608 );
609 }
610 }
611
612 #[test_log::test(tokio::test(flavor = "multi_thread"))]
613 async fn closed_write_stream() {
614 for n in 0..TEST_ITERATIONS {
616 closed_write_stream_(n).await
617 }
618 }
619 #[tracing::instrument]
620 async fn closed_write_stream_(n: usize) {
621 let (reader, writer) = simplex(1);
622 let mut writer = AsyncWriteStream::new(1024, writer);
623
624 drop(reader);
626
627 let mut should_be_closed = false;
630
631 let chunk = Bytes::from_static(&[0; 1]);
633 writer
634 .write(chunk.clone())
635 .expect("first write should succeed");
636
637 let mut write_ready_res = None;
639 if n % 2 == 0 {
640 let r = resolves_immediately(writer.write_ready()).await;
641 match r {
643 Ok(1023) => {}
645 Err(StreamError::LastOperationFailed(_)) => {
647 tracing::debug!("discovered stream failure in first write_ready");
648 should_be_closed = true;
649 }
650 r => panic!("unexpected write_ready: {r:?}"),
651 }
652 write_ready_res = Some(r);
653 }
654
655 let flush_res = writer.flush();
658 match flush_res {
659 Err(StreamError::LastOperationFailed(_)) => {
661 tracing::debug!("discovered stream failure trying to flush");
662 assert!(!should_be_closed);
663 should_be_closed = true;
664 }
665 Err(StreamError::Closed) => {
667 assert!(
668 should_be_closed,
669 "expected a LastOperationFailed before we see Closed. {write_ready_res:?}"
670 );
671 }
672 Ok(()) => {}
674 Err(e) => panic!("unexpected flush error: {e:?} {write_ready_res:?}"),
675 }
676
677 match resolves_immediately(writer.write_ready()).await {
680 Err(StreamError::LastOperationFailed(_)) => {
682 tracing::debug!("discovered stream failure trying to flush");
683 assert!(!should_be_closed);
684 }
685 Err(StreamError::Closed) => {
687 assert!(should_be_closed);
688 }
689 r => {
690 panic!(
691 "stream should be reported closed by the end of write_ready after flush, got {r:?}. {write_ready_res:?} {flush_res:?}"
692 )
693 }
694 }
695 }
696
697 #[test_log::test(tokio::test(flavor = "multi_thread"))]
698 async fn multiple_chunks_write_stream() {
699 for n in 0..TEST_ITERATIONS {
701 multiple_chunks_write_stream_aux(n).await
702 }
703 }
704 #[tracing::instrument]
705 async fn multiple_chunks_write_stream_aux(_: usize) {
706 use std::ops::Deref;
707
708 let (mut reader, writer) = simplex(1024);
709 let mut writer = AsyncWriteStream::new(1024, writer);
710
711 let chunk = Bytes::from_static(&[123; 1]);
713
714 let permit = resolves_immediately(writer.write_ready())
715 .await
716 .expect("write should be ready");
717 assert_eq!(permit, 1024);
718
719 writer.write(chunk.clone()).expect("write does not trap");
720
721 let permit = resolves_immediately(writer.write_ready())
724 .await
725 .expect("write should be ready");
726 assert!(matches!(permit, 1023 | 1024));
727
728 let mut read_buf = vec![0; chunk.len()];
729 let read_len = reader.read_exact(&mut read_buf).await.unwrap();
730 assert_eq!(read_len, chunk.len());
731 assert_eq!(read_buf.as_slice(), chunk.deref());
732
733 let chunk2 = Bytes::from_static(&[45; 1]);
735
736 writer.flush().expect("channel is still alive");
738
739 let permit = resolves_immediately(writer.write_ready())
740 .await
741 .expect("write should be ready");
742 assert_eq!(permit, 1024);
743
744 writer.write(chunk2.clone()).expect("write does not trap");
745
746 let permit = resolves_immediately(writer.write_ready())
749 .await
750 .expect("write should be ready");
751 assert!(matches!(permit, 1023 | 1024));
752
753 let mut read2_buf = vec![0; chunk2.len()];
754 let read2_len = reader.read_exact(&mut read2_buf).await.unwrap();
755 assert_eq!(read2_len, chunk2.len());
756 assert_eq!(read2_buf.as_slice(), chunk2.deref());
757
758 writer.flush().expect("channel is still alive");
760
761 let permit = resolves_immediately(writer.write_ready())
762 .await
763 .expect("write should be ready");
764 assert_eq!(permit, 1024);
765 }
766
767 #[test_log::test(tokio::test(flavor = "multi_thread"))]
768 async fn backpressure_write_stream() {
769 for n in 0..TEST_ITERATIONS {
771 backpressure_write_stream_aux(n).await
772 }
773 }
774 #[tracing::instrument]
775 async fn backpressure_write_stream_aux(_: usize) {
776 use futures::future::poll_immediate;
777
778 let (mut reader, writer) = simplex(1024);
781 let mut writer = AsyncWriteStream::new(1024, writer);
782
783 let chunk = Bytes::from_static(&[0; 1024]);
784
785 let permit = resolves_immediately(writer.write_ready())
786 .await
787 .expect("write should be ready");
788 assert_eq!(permit, 1024);
789
790 writer.write(chunk.clone()).expect("write succeeds");
791
792 let permit = poll_immediate(writer.write_ready()).await;
795 assert!(matches!(permit, None | Some(Ok(1024))));
796
797 let permit = resolves_immediately(writer.write_ready())
800 .await
801 .expect("write should be ready");
802 assert_eq!(permit, 1024);
803
804 writer.write(chunk.clone()).expect("write does not trap");
807
808 writer
810 .write(chunk.clone())
811 .err()
812 .expect("unpermitted write does trap");
813
814 never_resolves(writer.write_ready()).await;
817
818 let mut buf = [0; 2048];
821 reader.read_exact(&mut buf).await.unwrap();
822
823 never_resolves(reader.read(&mut buf)).await;
825
826 let permit = resolves_immediately(writer.write_ready())
828 .await
829 .expect("ready is ok");
830 assert_eq!(permit, 1024);
831
832 writer.write(chunk.clone()).expect("write does not trap");
834 }
835
836 #[test_log::test(tokio::test(flavor = "multi_thread"))]
837 async fn backpressure_write_stream_with_flush() {
838 for n in 0..TEST_ITERATIONS {
839 backpressure_write_stream_with_flush_aux(n).await;
840 }
841 }
842
843 async fn backpressure_write_stream_with_flush_aux(_: usize) {
844 let (mut reader, writer) = simplex(1024);
847 let mut writer = AsyncWriteStream::new(1024, writer);
848
849 let chunk = Bytes::from_static(&[0; 1024]);
850
851 let permit = resolves_immediately(writer.write_ready())
852 .await
853 .expect("write should be ready");
854 assert_eq!(permit, 1024);
855
856 writer.write(chunk.clone()).expect("write succeeds");
857
858 writer.flush().expect("flush succeeds");
859
860 let permit = resolves_immediately(writer.write_ready())
863 .await
864 .expect("write_ready succeeds");
865 assert_eq!(permit, 1024);
866
867 writer.write(chunk.clone()).expect("write does not trap");
869
870 writer.flush().expect("flush succeeds");
872
873 writer
875 .write(chunk.clone())
876 .err()
877 .expect("unpermitted write does trap");
878
879 never_resolves(writer.write_ready()).await;
882
883 let mut buf = [0; 2048];
886 reader.read_exact(&mut buf).await.unwrap();
887
888 never_resolves(reader.read(&mut buf)).await;
890
891 let permit = resolves_immediately(writer.write_ready())
893 .await
894 .expect("ready is ok");
895 assert_eq!(permit, 1024);
896
897 writer.write(chunk.clone()).expect("write does not trap");
899
900 writer.flush().expect("flush succeeds");
901
902 let permit = resolves_immediately(writer.write_ready())
903 .await
904 .expect("ready is ok");
905 assert_eq!(permit, 1024);
906 }
907}