wasmtime/runtime/component/concurrent/futures_and_streams/buffers.rs

#[cfg(feature = "component-model-async-bytes")]
use bytes::{Bytes, BytesMut};
#[cfg(feature = "component-model-async-bytes")]
use std::io::Cursor;
use std::mem::{self, MaybeUninit};
use std::slice;
use std::vec::Vec;

pub use untyped::*;
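
// Helper module providing a type-erased view of a `WriteBuffer<T>` trait
// object so that buffers with different element types can be handled
// uniformly.  The fields are private to the module, which forces all access
// to go through the checked constructor and accessor below.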
mod untyped {
    use super::WriteBuffer;
    use std::any::TypeId;
    use std::marker;
    use std::mem;

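    /// A `WriteBuffer` whose element type has been erased.
    ///
    /// The concrete element type is recorded as a `TypeId` so the original
    /// trait object can be recovered with a runtime-checked cast in `get_mut`.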
    pub struct UntypedWriteBuffer<'a> {
        element_type_id: TypeId,
        buf: *mut dyn WriteBuffer<()>,
        _marker: marker::PhantomData<&'a mut dyn WriteBuffer<()>>,
    }

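    // Union used to reinterpret a fat pointer to `dyn WriteBuffer<T>` as a
    // fat pointer to `dyn WriteBuffer<()>` and back.  Only the pointer value
    // is converted; it is never dereferenced at the wrong type because
    // `get_mut` checks the stored `TypeId` first.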
    union ReinterpretWriteBuffer<T> {
        typed: *mut dyn WriteBuffer<T>,
        untyped: *mut dyn WriteBuffer<()>,
    }

    impl<'a> UntypedWriteBuffer<'a> {
        pub fn new<T: 'static>(buf: &'a mut dyn WriteBuffer<T>) -> UntypedWriteBuffer<'a> {
            UntypedWriteBuffer {
                element_type_id: TypeId::of::<T>(),
                buf: unsafe {
                    // Reinterpret the fat pointer as `*mut dyn WriteBuffer<()>`;
                    // the assertion double-checks that both pointer types have
                    // the same size.  The pointer is only ever cast back to
                    // `dyn WriteBuffer<T>` for the original `T` (see `get_mut`).
                    let r = ReinterpretWriteBuffer { typed: buf };
                    assert_eq!(mem::size_of_val(&r.typed), mem::size_of_val(&r.untyped));
                    r.untyped
                },
                _marker: marker::PhantomData,
            }
        }

        pub fn get_mut<T: 'static>(&mut self) -> &mut dyn WriteBuffer<T> {
            // Refuse to hand out a view with a different element type than
            // the one this buffer was created with.
            assert_eq!(self.element_type_id, TypeId::of::<T>());
            // SAFETY: the `TypeId` check above guarantees `T` matches the
            // original element type, and the pointer was created from a
            // `&'a mut` borrow which `_marker` keeps alive.
            unsafe { &mut *ReinterpretWriteBuffer { untyped: self.buf }.typed }
        }
    }
}

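/// A buffer of items which are to be written out, for example to the writable
/// end of a component-model `stream` or `future`, handing ownership of each
/// item to the consumer as it is taken.
///
/// # Safety
///
/// Implementations must ensure that items passed to the closure in `take` are
/// genuinely relinquished: the buffer must not drop or otherwise reuse them
/// afterward, because callers are allowed to move them out with
/// `assume_init_read`.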
pub unsafe trait WriteBuffer<T>: Send + Sync + 'static {
    /// Returns the items remaining to be written.
    fn remaining(&self) -> &[T];

    /// Drops the next `count` items without writing them.
    fn skip(&mut self, count: usize);

    /// Passes ownership of the next `count` items to `fun` and advances the
    /// buffer past them.
    fn take(&mut self, count: usize, fun: &mut dyn FnMut(&[MaybeUninit<T>]));
}

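/// A buffer into which items that have been read, for example from the
/// readable end of a component-model `stream` or `future`, are placed.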
pub trait ReadBuffer<T>: Send + Sync + 'static {
    /// Appends the items yielded by `iter` to this buffer.
    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I);

    /// Returns how many more items this buffer can accept.
    fn remaining_capacity(&self) -> usize;

    /// Moves `count` items out of `input` and into this buffer.
    fn move_from(&mut self, input: &mut dyn WriteBuffer<T>, count: usize);
}

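// Adapter exposing a `ReadBuffer` through the standard `Extend` trait so it
// can be passed to APIs that expect `impl Extend<T>`.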
pub(super) struct Extender<'a, B>(pub(super) &'a mut B);

impl<T, B: ReadBuffer<T>> Extend<T> for Extender<'_, B> {
    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
        self.0.extend(iter)
    }
}

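// An `Option<T>` serves as a buffer holding at most one item.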
unsafe impl<T: Send + Sync + 'static> WriteBuffer<T> for Option<T> {
    fn remaining(&self) -> &[T] {
        if let Some(me) = self {
            slice::from_ref(me)
        } else {
            &[]
        }
    }

    fn skip(&mut self, count: usize) {
        match count {
            0 => {}
            1 => {
                assert!(self.is_some());
                *self = None;
            }
            _ => panic!("cannot skip more than {} item(s)", self.remaining().len()),
        }
    }

    fn take(&mut self, count: usize, fun: &mut dyn FnMut(&[MaybeUninit<T>])) {
        match count {
            0 => fun(&[]),
            1 => {
                // Move the item out of the `Option`; ownership passes to
                // `fun`, so the local `MaybeUninit` is intentionally not
                // dropped here.
                let mut item = MaybeUninit::new(self.take().unwrap());
                fun(slice::from_mut(&mut item));
            }
            _ => panic!("cannot take more than {} item(s)", self.remaining().len()),
        }
    }
}

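// The `ReadBuffer` half of the single-item buffer: an empty `Option` has
// capacity for exactly one item.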
impl<T: Send + Sync + 'static> ReadBuffer<T> for Option<T> {
    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
        let mut iter = iter.into_iter();
        if self.is_none() {
            *self = iter.next();
        }
        assert!(iter.next().is_none());
    }

    fn remaining_capacity(&self) -> usize {
        if self.is_some() { 0 } else { 1 }
    }

    fn move_from(&mut self, input: &mut dyn WriteBuffer<T>, count: usize) {
        match count {
            0 => {}
            1 => {
                assert!(self.is_none());
                input.take(1, &mut |slice| {
                    // SAFETY: `take` passes ownership of the item to this
                    // closure, so it may be moved out exactly once.
                    unsafe {
                        *self = Some(slice[0].assume_init_read());
                    }
                });
            }
            _ => panic!(
                "cannot take more than {} item(s)",
                self.remaining_capacity()
            ),
        }
    }
}

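/// A buffer backed by a `Vec` from which items are written out in order.
///
/// Invariant: elements at indices `offset..buffer.len()` are initialized and
/// owned by this buffer; elements before `offset` have already been handed
/// out or dropped.
///
/// Illustrative sketch of how a `VecBuffer` pairs with a `ReadBuffer`
/// (hypothetical usage, not taken from the surrounding crate):
///
/// ```ignore
/// let mut src = VecBuffer::from(vec![1_u32, 2, 3]);
/// let mut dst = Vec::with_capacity(3);
/// dst.move_from(&mut src, 2); // moves the first two items
/// assert_eq!(dst, [1, 2]);
/// assert_eq!(src.remaining(), [3]);
/// ```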
pub struct VecBuffer<T: Send + Sync + 'static> {
    buffer: Vec<MaybeUninit<T>>,
    offset: usize,
}

impl<T: Send + Sync + 'static> VecBuffer<T> {
    /// Creates a new, empty buffer with capacity for `capacity` items.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            buffer: Vec::with_capacity(capacity),
            offset: 0,
        }
    }

    /// Drops any items not yet taken and resets the buffer to empty,
    /// keeping its allocation for reuse.
    pub fn reset(&mut self) {
        self.skip(self.remaining().len());
        self.buffer.clear();
        self.offset = 0;
    }
}

unsafe impl<T: Send + Sync + 'static> WriteBuffer<T> for VecBuffer<T> {
    fn remaining(&self) -> &[T] {
        // SAFETY: per the struct invariant, everything from `offset` onward
        // is initialized, and `MaybeUninit<T>` has the same layout as `T`.
        unsafe { mem::transmute::<&[MaybeUninit<T>], &[T]>(&self.buffer[self.offset..]) }
    }

    fn skip(&mut self, count: usize) {
        assert!(count <= self.remaining().len());
        for item in &mut self.buffer[self.offset..][..count] {
            self.offset += 1;
            // SAFETY: this item is still initialized; `offset` is advanced
            // before the drop so that, even if the drop panics, the item will
            // not be dropped a second time.
            unsafe {
                item.assume_init_drop();
            }
        }
    }

    fn take(&mut self, count: usize, fun: &mut dyn FnMut(&[MaybeUninit<T>])) {
        assert!(count <= self.remaining().len());
        self.offset += count;
        // Pass exactly `count` items; `offset` is advanced first so this
        // buffer will not drop them again, since ownership moves to `fun`.
        fun(&self.buffer[self.offset - count..][..count]);
    }
}

impl<T: Send + Sync + 'static> From<Vec<T>> for VecBuffer<T> {
    fn from(buffer: Vec<T>) -> Self {
        Self {
            // SAFETY: `MaybeUninit<T>` has the same layout as `T`, and every
            // element of the source vector is initialized.
            buffer: unsafe { mem::transmute::<Vec<T>, Vec<MaybeUninit<T>>>(buffer) },
            offset: 0,
        }
    }
}

impl<T: Send + Sync + 'static> Drop for VecBuffer<T> {
    fn drop(&mut self) {
        self.reset();
    }
}

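// A `Vec<T>` with spare capacity serves as a read buffer: `remaining_capacity`
// is the unused part of the existing allocation, so `move_from` never needs
// to reallocate.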
impl<T: Send + Sync + 'static> ReadBuffer<T> for Vec<T> {
    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
        Extend::extend(self, iter)
    }

    fn remaining_capacity(&self) -> usize {
        self.capacity().checked_sub(self.len()).unwrap()
    }

    fn move_from(&mut self, input: &mut dyn WriteBuffer<T>, count: usize) {
        assert!(count <= self.remaining_capacity());
        input.take(count, &mut |slice| {
            for item in slice {
                // SAFETY: `take` passes ownership of these items to this
                // closure, so each may be moved out exactly once.
                self.push(unsafe { item.assume_init_read() });
            }
        });
    }
}

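// With the `component-model-async-bytes` feature enabled, `bytes::Bytes` and
// `bytes::BytesMut` can be used directly as byte buffers; a `Cursor` tracks
// how many bytes have already been consumed.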
#[cfg(feature = "component-model-async-bytes")]
unsafe impl WriteBuffer<u8> for Cursor<Bytes> {
    fn remaining(&self) -> &[u8] {
        &self.get_ref()[usize::try_from(self.position()).unwrap()..]
    }

    fn skip(&mut self, count: usize) {
        assert!(
            count <= self.remaining().len(),
            "tried to skip {count} with {} remaining",
            self.remaining().len()
        );
        self.set_position(
            self.position()
                .checked_add(u64::try_from(count).unwrap())
                .unwrap(),
        );
    }

    fn take(&mut self, count: usize, fun: &mut dyn FnMut(&[MaybeUninit<u8>])) {
        assert!(count <= self.remaining().len());
        fun(unsafe_byte_slice(&self.remaining()[..count]));
        self.skip(count);
    }
}

#[cfg(feature = "component-model-async-bytes")]
unsafe impl WriteBuffer<u8> for Cursor<BytesMut> {
    fn remaining(&self) -> &[u8] {
        &self.get_ref()[usize::try_from(self.position()).unwrap()..]
    }

    fn skip(&mut self, count: usize) {
        assert!(count <= self.remaining().len());
        self.set_position(
            self.position()
                .checked_add(u64::try_from(count).unwrap())
                .unwrap(),
        );
    }

    fn take(&mut self, count: usize, fun: &mut dyn FnMut(&[MaybeUninit<u8>])) {
        assert!(count <= self.remaining().len());
        fun(unsafe_byte_slice(&self.remaining()[..count]));
        self.skip(count);
    }
}

#[cfg(feature = "component-model-async-bytes")]
impl ReadBuffer<u8> for BytesMut {
    fn extend<I: IntoIterator<Item = u8>>(&mut self, iter: I) {
        Extend::extend(self, iter)
    }

    fn remaining_capacity(&self) -> usize {
        self.capacity().checked_sub(self.len()).unwrap()
    }

    fn move_from(&mut self, input: &mut dyn WriteBuffer<u8>, count: usize) {
        assert!(count <= self.remaining_capacity());
        input.take(count, &mut |slice| {
            // SAFETY: the bytes passed by `take` are initialized, so viewing
            // them as `&[u8]` is sound.
            let slice = unsafe { mem::transmute::<&[MaybeUninit<u8>], &[u8]>(slice) };
            self.extend_from_slice(slice);
        });
    }
}

/// Views an initialized byte slice as a slice of `MaybeUninit<u8>`, which is
/// always sound since `MaybeUninit<u8>` has the same layout as `u8`.
#[cfg(feature = "component-model-async-bytes")]
fn unsafe_byte_slice(slice: &[u8]) -> &[MaybeUninit<u8>] {
    unsafe { mem::transmute::<&[u8], &[MaybeUninit<u8>]>(slice) }
}