wasmtime_wasi_http/p3/
response.rs1use crate::p3::bindings::http::types::ErrorCode;
2use crate::p3::body::{Body, GuestBody};
3use crate::p3::{WasiHttpCtxView, WasiHttpView};
4use crate::{FieldMap, get_content_length};
5use bytes::Bytes;
6use http::StatusCode;
7use http_body_util::BodyExt as _;
8use http_body_util::combinators::UnsyncBoxBody;
9use wasmtime::AsContextMut;
10use wasmtime::error::Context as _;
11
12pub struct Response {
14 pub status: StatusCode,
16 pub headers: FieldMap,
18 pub(crate) body: Body,
20}
21
22impl TryFrom<Response> for http::Response<Body> {
23 type Error = http::Error;
24
25 fn try_from(
26 Response {
27 status,
28 headers,
29 body,
30 }: Response,
31 ) -> Result<Self, Self::Error> {
32 let mut res = http::Response::builder().status(status);
33 *res.headers_mut().unwrap() = headers.into();
34 res.body(body)
35 }
36}
37
38impl Response {
39 pub fn into_http<T: WasiHttpView + 'static>(
46 self,
47 store: impl AsContextMut<Data = T>,
48 fut: impl Future<Output = Result<(), ErrorCode>> + Send + 'static,
49 ) -> wasmtime::Result<http::Response<UnsyncBoxBody<Bytes, ErrorCode>>> {
50 self.into_http_with_getter(store, fut, T::http)
51 }
52
53 pub fn into_http_with_getter<T: 'static>(
56 self,
57 store: impl AsContextMut<Data = T>,
58 fut: impl Future<Output = Result<(), ErrorCode>> + Send + 'static,
59 getter: fn(&mut T) -> WasiHttpCtxView<'_>,
60 ) -> wasmtime::Result<http::Response<UnsyncBoxBody<Bytes, ErrorCode>>> {
61 let res = http::Response::try_from(self)?;
62 let (res, body) = res.into_parts();
63 let body = match body {
64 Body::Guest {
65 contents_rx,
66 trailers_rx,
67 result_tx,
68 } => {
69 let content_length =
71 get_content_length(&res.headers).context("failed to parse `content-length`")?;
72 GuestBody::new(
73 store,
74 contents_rx,
75 trailers_rx,
76 result_tx,
77 fut,
78 content_length,
79 ErrorCode::HttpResponseBodySize,
80 getter,
81 )?
82 .boxed_unsync()
83 }
84 Body::Host { body, result_tx } => {
85 _ = result_tx.send(Box::new(fut));
86 body
87 }
88 };
89 Ok(http::Response::from_parts(res, body))
90 }
91
92 pub fn from_http<T>(
94 res: http::Response<T>,
95 ) -> (
96 Self,
97 impl Future<Output = Result<(), ErrorCode>> + Send + 'static,
98 )
99 where
100 T: http_body::Body<Data = Bytes> + Send + 'static,
101 T::Error: Into<ErrorCode>,
102 {
103 let (parts, body) = res.into_parts();
104 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
105
106 let wasi_response = Response {
107 status: parts.status,
108 headers: FieldMap::new_immutable(parts.headers),
109 body: Body::Host {
110 body: body.map_err(Into::into).boxed_unsync(),
111 result_tx,
112 },
113 };
114
115 let io_future = async {
116 let Ok(fut) = result_rx.await else {
117 return Ok(());
118 };
119 Box::into_pin(fut).await
120 };
121
122 (wasi_response, io_future)
123 }
124}
125
#[cfg(test)]
mod tests {
    use super::*;
    use core::future::Future;
    use core::pin::pin;
    use core::task::{Context, Poll, Waker};
    use http_body_util::Full;

    /// `Response::from_http` must keep the status, headers and body of the
    /// source response, and the returned future must complete with `Ok` on
    /// its first poll once a future has been sent through `result_tx`.
    #[tokio::test]
    async fn test_response_from_http() {
        let source = http::Response::builder()
            .status(StatusCode::OK)
            .header("x-custom-header", "value123")
            .body(Full::new(Bytes::from_static(b"hello wasm")))
            .unwrap();

        let (wasi_resp, io_future) = Response::from_http(source);
        assert_eq!(wasi_resp.status, StatusCode::OK);
        assert_eq!(
            wasi_resp.headers.get("x-custom-header").unwrap(),
            "value123"
        );

        let Body::Host { body, result_tx } = wasi_resp.body else {
            panic!("Response body should be of type Host");
        };
        let payload = body
            .collect()
            .await
            .expect("Body stream failed unexpectedly")
            .to_bytes();
        assert_eq!(payload, &b"hello wasm"[..]);
        let _ = result_tx.send(Box::new(async { Ok(()) }));

        // Poll exactly once with a no-op waker: the future has to be ready
        // immediately, without needing a wake-up.
        let mut cx = Context::from_waker(Waker::noop());
        let mut pinned = pin!(io_future);
        let outcome = pinned.as_mut().poll(&mut cx);
        assert!(matches!(outcome, Poll::Ready(Ok(()))));
    }
}