wasmtime/runtime/component/concurrent/futures_and_streams/
buffers.rs1#[cfg(feature = "component-model-async-bytes")]
2use bytes::{Bytes, BytesMut};
3#[cfg(feature = "component-model-async-bytes")]
4use std::io::Cursor;
5use std::mem::{self, MaybeUninit};
6use std::slice;
7use std::vec::Vec;
8
9pub use untyped::*;
12mod untyped {
13    use super::WriteBuffer;
14    use crate::vm::SendSyncPtr;
15    use std::any::TypeId;
16    use std::marker;
17    use std::mem;
18    use std::ptr::NonNull;
19
20    pub struct UntypedWriteBuffer<'a> {
29        element_type_id: TypeId,
30        buf: SendSyncPtr<dyn WriteBuffer<()>>,
31        _marker: marker::PhantomData<&'a mut dyn WriteBuffer<()>>,
32    }
33
34    union ReinterpretWriteBuffer<T> {
37        typed: *mut dyn WriteBuffer<T>,
38        untyped: *mut dyn WriteBuffer<()>,
39    }
40
41    impl<'a> UntypedWriteBuffer<'a> {
42        pub fn new<T: 'static>(buf: &'a mut dyn WriteBuffer<T>) -> UntypedWriteBuffer<'a> {
47            UntypedWriteBuffer {
48                element_type_id: TypeId::of::<T>(),
49                buf: SendSyncPtr::new(
54                    NonNull::new(unsafe {
55                        let r = ReinterpretWriteBuffer { typed: buf };
56                        assert_eq!(mem::size_of_val(&r.typed), mem::size_of_val(&r.untyped));
57                        r.untyped
58                    })
59                    .unwrap(),
60                ),
61                _marker: marker::PhantomData,
62            }
63        }
64
65        pub fn get_mut<T: 'static>(&mut self) -> &mut dyn WriteBuffer<T> {
71            assert_eq!(self.element_type_id, TypeId::of::<T>());
72            unsafe {
77                &mut *ReinterpretWriteBuffer {
78                    untyped: self.buf.as_ptr(),
79                }
80                .typed
81            }
82        }
83    }
84}
85
/// A source of items of type `T` that can be handed off to a consumer.
///
/// # Safety
///
/// Consumers in this file read the items passed to the `take` callback with
/// `MaybeUninit::assume_init_read`, so implementations must only ever pass
/// initialized items to that callback and must not treat those items as
/// still owned afterwards.
pub unsafe trait WriteBuffer<T>: Send + Sync + 'static {
    /// Returns the items that have not yet been skipped or taken.
    fn remaining(&self) -> &[T];

    /// Discards the next `count` items.
    ///
    /// The implementations in this file panic when `count` exceeds
    /// `self.remaining().len()`.
    fn skip(&mut self, count: usize);

    /// Removes the next `count` items from this buffer and passes them to
    /// `fun`.
    ///
    /// The items are exposed as `MaybeUninit<T>` so that `fun` can move them
    /// out by reading them. The implementations in this file panic when
    /// `count` exceeds `self.remaining().len()`.
    fn take(&mut self, count: usize, fun: &mut dyn FnMut(&[MaybeUninit<T>]));
}
117
118pub trait ReadBuffer<T>: Send + Sync + 'static {
122    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I);
124
125    fn remaining_capacity(&self) -> usize;
127
128    fn move_from(&mut self, input: &mut dyn WriteBuffer<T>, count: usize);
138}
139
140pub(super) struct Extender<'a, B>(pub(super) &'a mut B);
141
142impl<T, B: ReadBuffer<T>> Extend<T> for Extender<'_, B> {
143    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
144        self.0.extend(iter)
145    }
146}
147
148unsafe impl<T: Send + Sync + 'static> WriteBuffer<T> for Option<T> {
151    fn remaining(&self) -> &[T] {
152        if let Some(me) = self {
153            slice::from_ref(me)
154        } else {
155            &[]
156        }
157    }
158
159    fn skip(&mut self, count: usize) {
160        match count {
161            0 => {}
162            1 => {
163                assert!(self.is_some());
164                *self = None;
165            }
166            _ => panic!("cannot skip more than {} item(s)", self.remaining().len()),
167        }
168    }
169
170    fn take(&mut self, count: usize, fun: &mut dyn FnMut(&[MaybeUninit<T>])) {
171        match count {
172            0 => fun(&mut []),
173            1 => {
174                let mut item = MaybeUninit::new(self.take().unwrap());
175                fun(slice::from_mut(&mut item));
176            }
177            _ => panic!("cannot forget more than {} item(s)", self.remaining().len()),
178        }
179    }
180}
181
182impl<T: Send + Sync + 'static> ReadBuffer<T> for Option<T> {
183    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
184        let mut iter = iter.into_iter();
185        if self.is_none() {
186            *self = iter.next();
187        }
188        assert!(iter.next().is_none());
189    }
190
191    fn remaining_capacity(&self) -> usize {
192        if self.is_some() { 0 } else { 1 }
193    }
194
195    fn move_from(&mut self, input: &mut dyn WriteBuffer<T>, count: usize) {
196        match count {
197            0 => {}
198            1 => {
199                assert!(self.is_none());
200                input.take(1, &mut |slice| {
201                    unsafe {
205                        *self = Some(slice[0].assume_init_read());
206                    }
207                });
208            }
209            _ => panic!(
210                "cannot take more than {} item(s)",
211                self.remaining_capacity()
212            ),
213        }
214    }
215}
216
/// A byte buffer exposing the window `buffer[offset..limit]`.
pub struct SliceBuffer {
    /// Backing storage.
    buffer: Vec<u8>,
    /// Index of the first byte not yet consumed.
    offset: usize,
    /// Index one past the last byte of the window.
    limit: usize,
}

impl SliceBuffer {
    /// Creates a buffer over `buffer[offset..limit]`.
    ///
    /// # Panics
    ///
    /// Panics if `limit` exceeds `buffer.len()`.
    pub fn new(buffer: Vec<u8>, offset: usize, limit: usize) -> Self {
        assert!(limit <= buffer.len());
        SliceBuffer {
            buffer,
            offset,
            limit,
        }
    }

    /// Consumes `self`, returning the backing vector together with the
    /// current offset and limit.
    pub fn into_parts(self) -> (Vec<u8>, usize, usize) {
        let SliceBuffer {
            buffer,
            offset,
            limit,
        } = self;
        (buffer, offset, limit)
    }
}
238
239unsafe impl WriteBuffer<u8> for SliceBuffer {
243    fn remaining(&self) -> &[u8] {
244        &self.buffer[self.offset..self.limit]
245    }
246
247    fn skip(&mut self, count: usize) {
248        assert!(self.offset + count <= self.limit);
249        self.offset += count;
250    }
251
252    fn take(&mut self, count: usize, fun: &mut dyn FnMut(&[MaybeUninit<u8>])) {
253        assert!(count <= self.remaining().len());
254        self.offset += count;
255        fun(unsafe {
258            mem::transmute::<&[u8], &[MaybeUninit<u8>]>(
259                &self.buffer[self.offset - count..self.limit],
260            )
261        });
262    }
263}
264
/// A vector-backed buffer of items that are consumed from the front.
///
/// Items at indices `offset..` are initialized and owned by the buffer;
/// items before `offset` have already been dropped or moved out.
pub struct VecBuffer<T> {
    /// Backing storage; only `buffer[offset..]` holds live items.
    buffer: Vec<MaybeUninit<T>>,
    /// Index of the first live item.
    offset: usize,
}

impl<T> Default for VecBuffer<T> {
    /// An empty buffer with no preallocated capacity.
    fn default() -> Self {
        VecBuffer::with_capacity(0)
    }
}

impl<T> VecBuffer<T> {
    /// Creates an empty buffer whose backing vector has room for at least
    /// `capacity` items.
    pub fn with_capacity(capacity: usize) -> Self {
        let buffer = Vec::with_capacity(capacity);
        Self { buffer, offset: 0 }
    }

    /// Drops every live item and returns the buffer to its empty state,
    /// keeping the allocation.
    pub fn reset(&mut self) {
        let live = self.remaining_().len();
        self.skip_(live);
        self.buffer.clear();
        self.offset = 0;
    }

    /// The live items, viewed as initialized `T`s.
    fn remaining_(&self) -> &[T] {
        let live: &[MaybeUninit<T>] = &self.buffer[self.offset..];
        // SAFETY: every item at `offset..` is initialized (see the struct
        // docs), and `MaybeUninit<T>` has the same layout as `T`.
        unsafe { &*(live as *const [MaybeUninit<T>] as *const [T]) }
    }

    /// Drops the next `count` live items.
    ///
    /// # Panics
    ///
    /// Panics if `count` exceeds the number of live items.
    fn skip_(&mut self, count: usize) {
        assert!(count <= self.remaining_().len());
        let end = self.offset + count;
        while self.offset < end {
            let index = self.offset;
            // Advance the offset before running the destructor so that an
            // item whose `Drop` panics is never dropped a second time.
            self.offset = index + 1;
            // SAFETY: `index` was below `end`, so it refers to a live,
            // initialized item that has not been dropped yet.
            unsafe {
                self.buffer[index].assume_init_drop();
            }
        }
    }
}
314
315unsafe impl<T: Send + Sync + 'static> WriteBuffer<T> for VecBuffer<T> {
319    fn remaining(&self) -> &[T] {
320        self.remaining_()
321    }
322
323    fn skip(&mut self, count: usize) {
324        self.skip_(count)
325    }
326
327    fn take(&mut self, count: usize, fun: &mut dyn FnMut(&[MaybeUninit<T>])) {
328        assert!(count <= self.remaining().len());
329        self.offset += count;
333        fun(&self.buffer[self.offset - count..]);
334    }
335}
336
337impl<T> From<Vec<T>> for VecBuffer<T> {
338    fn from(buffer: Vec<T>) -> Self {
339        Self {
340            buffer: unsafe { mem::transmute::<Vec<T>, Vec<MaybeUninit<T>>>(buffer) },
343            offset: 0,
344        }
345    }
346}
347
348impl<T> Drop for VecBuffer<T> {
349    fn drop(&mut self) {
350        self.reset();
351    }
352}
353
354impl<T: Send + Sync + 'static> ReadBuffer<T> for Vec<T> {
355    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
356        Extend::extend(self, iter)
357    }
358
359    fn remaining_capacity(&self) -> usize {
360        self.capacity().checked_sub(self.len()).unwrap()
361    }
362
363    fn move_from(&mut self, input: &mut dyn WriteBuffer<T>, count: usize) {
364        assert!(count <= self.remaining_capacity());
365        input.take(count, &mut |slice| {
366            for item in slice {
367                self.push(unsafe { item.assume_init_read() });
371            }
372        });
373    }
374}
375
#[cfg(feature = "component-model-async-bytes")]
// SAFETY: `take` only ever passes `fun` a sub-slice of the cursor's bytes,
// which are all initialized `u8`s.
unsafe impl WriteBuffer<u8> for Cursor<Bytes> {
    /// The bytes from the cursor's current position to the end.
    fn remaining(&self) -> &[u8] {
        &self.get_ref()[usize::try_from(self.position()).unwrap()..]
    }

    /// Advances the cursor position by `count` bytes.
    ///
    /// # Panics
    ///
    /// Panics if `count` exceeds `self.remaining().len()`.
    fn skip(&mut self, count: usize) {
        assert!(
            count <= self.remaining().len(),
            "tried to skip {count} with {} remaining",
            self.remaining().len()
        );
        self.set_position(
            self.position()
                .checked_add(u64::try_from(count).unwrap())
                .unwrap(),
        );
    }

    /// Passes exactly the next `count` bytes to `fun`, then advances past
    /// them.
    ///
    /// # Panics
    ///
    /// Panics if `count` exceeds `self.remaining().len()`.
    fn take(&mut self, count: usize, fun: &mut dyn FnMut(&[MaybeUninit<u8>])) {
        assert!(count <= self.remaining().len());
        // Only `count` bytes are being transferred; consumers copy the whole
        // slice they receive, so do not expose the bytes that follow.
        fun(unsafe_byte_slice(&self.remaining()[..count]));
        self.skip(count);
    }
}
403
#[cfg(feature = "component-model-async-bytes")]
// SAFETY: `take` only ever passes `fun` a sub-slice of the cursor's bytes,
// which are all initialized `u8`s.
unsafe impl WriteBuffer<u8> for Cursor<BytesMut> {
    /// The bytes from the cursor's current position to the end.
    fn remaining(&self) -> &[u8] {
        &self.get_ref()[usize::try_from(self.position()).unwrap()..]
    }

    /// Advances the cursor position by `count` bytes.
    ///
    /// # Panics
    ///
    /// Panics if `count` exceeds `self.remaining().len()`.
    fn skip(&mut self, count: usize) {
        // Same diagnostic as the `Cursor<Bytes>` implementation.
        assert!(
            count <= self.remaining().len(),
            "tried to skip {count} with {} remaining",
            self.remaining().len()
        );
        self.set_position(
            self.position()
                .checked_add(u64::try_from(count).unwrap())
                .unwrap(),
        );
    }

    /// Passes exactly the next `count` bytes to `fun`, then advances past
    /// them.
    ///
    /// # Panics
    ///
    /// Panics if `count` exceeds `self.remaining().len()`.
    fn take(&mut self, count: usize, fun: &mut dyn FnMut(&[MaybeUninit<u8>])) {
        assert!(count <= self.remaining().len());
        // Only `count` bytes are being transferred; consumers copy the whole
        // slice they receive, so do not expose the bytes that follow.
        fun(unsafe_byte_slice(&self.remaining()[..count]));
        self.skip(count);
    }
}
427
#[cfg(feature = "component-model-async-bytes")]
impl ReadBuffer<u8> for BytesMut {
    /// Appends every byte of `iter` via the `Extend` impl of `BytesMut`.
    fn extend<I: IntoIterator<Item = u8>>(&mut self, iter: I) {
        Extend::extend(self, iter)
    }

    /// Spare capacity of the buffer: `capacity() - len()`.
    fn remaining_capacity(&self) -> usize {
        self.capacity().checked_sub(self.len()).unwrap()
    }

    /// Moves `count` bytes out of `input` and appends them to `self`.
    ///
    /// # Panics
    ///
    /// Panics if `count` exceeds `self.remaining_capacity()`.
    fn move_from(&mut self, input: &mut dyn WriteBuffer<u8>, count: usize) {
        assert!(count <= self.remaining_capacity());
        input.take(count, &mut |slice| {
            // Never copy more than the `count` bytes that were requested:
            // only those are covered by the capacity check above.
            let taken = &slice[..count.min(slice.len())];
            // SAFETY: relies on the `WriteBuffer` contract that the items
            // passed to this callback are initialized; `MaybeUninit<u8>` has
            // the same layout as `u8`.
            let bytes = unsafe { mem::transmute::<&[MaybeUninit<u8>], &[u8]>(taken) };
            self.extend_from_slice(bytes);
        });
    }
}
449
#[cfg(feature = "component-model-async-bytes")]
/// Views a slice of initialized bytes as a slice of `MaybeUninit<u8>`.
fn unsafe_byte_slice(slice: &[u8]) -> &[MaybeUninit<u8>] {
    let bytes: *const [u8] = slice;
    // SAFETY: `MaybeUninit<u8>` has the same layout as `u8`, and the returned
    // shared reference cannot be used to de-initialize the bytes.
    unsafe { &*(bytes as *const [MaybeUninit<u8>]) }
}