1use crate::FieldMap;
5use crate::p2::{
6 WasiHttpCtxView, WasiHttpHooks,
7 bindings::http::types::{self, ErrorCode, Method, Scheme},
8 body::{HostIncomingBody, HyperIncomingBody, HyperOutgoingBody},
9};
10use bytes::Bytes;
11use http_body_util::BodyExt;
12use hyper::body::Body;
13use std::time::Duration;
14use wasmtime::component::Resource;
15use wasmtime::{Result, bail};
16use wasmtime_wasi::p2::Pollable;
17use wasmtime_wasi::runtime::AbortOnDropJoinHandle;
18
19pub(crate) fn remove_forbidden_headers(
21 hooks: &mut dyn WasiHttpHooks,
22 headers: &mut http::HeaderMap,
23) {
24 let forbidden_keys = Vec::from_iter(headers.keys().filter_map(|name| {
25 if hooks.is_forbidden_header(name) {
26 Some(name.clone())
27 } else {
28 None
29 }
30 }));
31
32 for name in forbidden_keys {
33 headers.remove(&name);
34 }
35}
36
/// Settings that accompany an outgoing HTTP request.
///
/// This is a plain data carrier; the code that consumes these values is not
/// part of this section of the file.
pub struct OutgoingRequestConfig {
    /// Whether TLS should be used for the request.
    pub use_tls: bool,
    /// Timeout applied while connecting.
    pub connect_timeout: Duration,
    /// Timeout applied while waiting for the first byte.
    pub first_byte_timeout: Duration,
    /// Timeout applied between successive bytes.
    pub between_bytes_timeout: Duration,
}
48
49impl From<http::Method> for types::Method {
50 fn from(method: http::Method) -> Self {
51 if method == http::Method::GET {
52 types::Method::Get
53 } else if method == hyper::Method::HEAD {
54 types::Method::Head
55 } else if method == hyper::Method::POST {
56 types::Method::Post
57 } else if method == hyper::Method::PUT {
58 types::Method::Put
59 } else if method == hyper::Method::DELETE {
60 types::Method::Delete
61 } else if method == hyper::Method::CONNECT {
62 types::Method::Connect
63 } else if method == hyper::Method::OPTIONS {
64 types::Method::Options
65 } else if method == hyper::Method::TRACE {
66 types::Method::Trace
67 } else if method == hyper::Method::PATCH {
68 types::Method::Patch
69 } else {
70 types::Method::Other(method.to_string())
71 }
72 }
73}
74
75impl TryInto<http::Method> for types::Method {
76 type Error = http::method::InvalidMethod;
77
78 fn try_into(self) -> Result<http::Method, Self::Error> {
79 match self {
80 Method::Get => Ok(http::Method::GET),
81 Method::Head => Ok(http::Method::HEAD),
82 Method::Post => Ok(http::Method::POST),
83 Method::Put => Ok(http::Method::PUT),
84 Method::Delete => Ok(http::Method::DELETE),
85 Method::Connect => Ok(http::Method::CONNECT),
86 Method::Options => Ok(http::Method::OPTIONS),
87 Method::Trace => Ok(http::Method::TRACE),
88 Method::Patch => Ok(http::Method::PATCH),
89 Method::Other(s) => http::Method::from_bytes(s.as_bytes()),
90 }
91 }
92}
93
94#[derive(Debug)]
96pub struct HostIncomingRequest {
97 pub(crate) method: http::method::Method,
98 pub(crate) uri: http::uri::Uri,
99 pub(crate) headers: FieldMap,
100 pub(crate) scheme: Scheme,
101 pub(crate) authority: String,
102 pub body: Option<HostIncomingBody>,
104}
105
106impl WasiHttpCtxView<'_> {
107 pub fn new_incoming_request<B>(
109 &mut self,
110 scheme: Scheme,
111 req: hyper::Request<B>,
112 ) -> wasmtime::Result<Resource<HostIncomingRequest>>
113 where
114 B: Body<Data = Bytes> + Send + 'static,
115 B::Error: Into<ErrorCode>,
116 {
117 let (mut parts, body) = req.into_parts();
118 let body = body.map_err(Into::into).boxed_unsync();
119 let body = HostIncomingBody::new(
120 body,
121 std::time::Duration::from_millis(600 * 1000),
123 );
124 let authority = match parts.uri.authority() {
125 Some(authority) => authority.to_string(),
126 None => match parts.headers.get(http::header::HOST) {
127 Some(host) => host.to_str()?.to_string(),
128 None => bail!("invalid HTTP request missing authority in URI and host header"),
129 },
130 };
131
132 remove_forbidden_headers(self.hooks, &mut parts.headers);
133 let headers = FieldMap::new_immutable(parts.headers);
134
135 let req = HostIncomingRequest {
136 method: parts.method,
137 uri: parts.uri,
138 headers,
139 authority,
140 scheme,
141 body: Some(body),
142 };
143 Ok(self.table.push(req)?)
144 }
145}
146
147pub struct HostResponseOutparam {
149 pub send:
151 Box<dyn FnOnce(Result<hyper::Response<HyperOutgoingBody>, types::ErrorCode>) + Send + Sync>,
152}
153
154impl WasiHttpCtxView<'_> {
155 pub fn new_response_outparam(
157 &mut self,
158 result: tokio::sync::oneshot::Sender<
159 Result<hyper::Response<HyperOutgoingBody>, types::ErrorCode>,
160 >,
161 ) -> wasmtime::Result<Resource<HostResponseOutparam>> {
162 let id = self.table.push(HostResponseOutparam {
163 send: Box::new(move |value| {
164 _ = result.send(value)
169 }),
170 })?;
171 Ok(id)
172 }
173
174 pub fn new_response_outparam_from_callback(
176 &mut self,
177 callback: impl FnOnce(Result<hyper::Response<HyperOutgoingBody>, types::ErrorCode>)
178 + Send
179 + Sync
180 + 'static,
181 ) -> wasmtime::Result<Resource<HostResponseOutparam>> {
182 let id = self.table.push(HostResponseOutparam {
183 send: Box::new(callback),
184 })?;
185 Ok(id)
186 }
187}
188
189pub struct HostOutgoingResponse {
191 pub status: http::StatusCode,
193 pub headers: FieldMap,
195 pub body: Option<HyperOutgoingBody>,
197}
198
199impl TryFrom<HostOutgoingResponse> for hyper::Response<HyperOutgoingBody> {
200 type Error = http::Error;
201
202 fn try_from(
203 resp: HostOutgoingResponse,
204 ) -> Result<hyper::Response<HyperOutgoingBody>, Self::Error> {
205 use http_body_util::Empty;
206
207 let mut builder = hyper::Response::builder().status(resp.status);
208
209 *builder.headers_mut().unwrap() = resp.headers.into();
210
211 match resp.body {
212 Some(body) => builder.body(body),
213 None => builder.body(
214 Empty::<bytes::Bytes>::new()
215 .map_err(|_| unreachable!("Infallible error"))
216 .boxed_unsync(),
217 ),
218 }
219 }
220}
221
222#[derive(Debug)]
224pub struct HostOutgoingRequest {
225 pub method: Method,
227 pub scheme: Option<Scheme>,
229 pub authority: Option<String>,
231 pub path_with_query: Option<String>,
233 pub headers: FieldMap,
235 pub body: Option<HyperOutgoingBody>,
237}
238
/// Optional per-request timeouts.
///
/// Every field defaults to `None` through the derived `Default`.
#[derive(Debug, Default)]
pub struct HostRequestOptions {
    /// Timeout applied while connecting, when set.
    pub connect_timeout: Option<Duration>,
    /// Timeout applied while waiting for the first byte, when set.
    pub first_byte_timeout: Option<Duration>,
    /// Timeout applied between successive bytes, when set.
    pub between_bytes_timeout: Option<Duration>,
}
249
250#[derive(Debug)]
252pub struct HostIncomingResponse {
253 pub status: u16,
255 pub headers: FieldMap,
257 pub body: Option<HostIncomingBody>,
259}
260
261pub type FutureIncomingResponseHandle =
263 AbortOnDropJoinHandle<wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>>;
264
265#[derive(Debug)]
267pub struct IncomingResponse {
268 pub resp: hyper::Response<HyperIncomingBody>,
270 pub worker: Option<AbortOnDropJoinHandle<()>>,
272 pub between_bytes_timeout: std::time::Duration,
274}
275
276#[derive(Debug)]
278pub enum HostFutureIncomingResponse {
279 Pending(FutureIncomingResponseHandle),
281 Ready(wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>),
285 Consumed,
287}
288
289impl HostFutureIncomingResponse {
290 pub fn pending(handle: FutureIncomingResponseHandle) -> Self {
292 Self::Pending(handle)
293 }
294
295 pub fn ready(result: wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>) -> Self {
297 Self::Ready(result)
298 }
299
300 pub fn is_ready(&self) -> bool {
302 matches!(self, Self::Ready(_))
303 }
304
305 pub fn unwrap_ready(self) -> wasmtime::Result<Result<IncomingResponse, types::ErrorCode>> {
307 match self {
308 Self::Ready(res) => res,
309 Self::Pending(_) | Self::Consumed => {
310 panic!("unwrap_ready called on a pending HostFutureIncomingResponse")
311 }
312 }
313 }
314}
315
316#[async_trait::async_trait]
317impl Pollable for HostFutureIncomingResponse {
318 async fn ready(&mut self) {
319 if let Self::Pending(handle) = self {
320 *self = Self::Ready(handle.await);
321 }
322 }
323}