1use crate::FieldMap;
5use crate::p2::{
6 WasiHttpCtxView, WasiHttpHooks,
7 bindings::http::types::{self, ErrorCode, Method, Scheme},
8 body::{HostIncomingBody, HyperIncomingBody, HyperOutgoingBody},
9};
10use bytes::Bytes;
11use http_body_util::BodyExt;
12use hyper::body::Body;
13use std::time::Duration;
14use wasmtime::component::Resource;
15use wasmtime::{Result, bail};
16use wasmtime_wasi::p2::Pollable;
17use wasmtime_wasi::runtime::AbortOnDropJoinHandle;
18
19pub(crate) fn remove_forbidden_headers(
21 hooks: &mut dyn WasiHttpHooks,
22 headers: &mut http::HeaderMap,
23) {
24 let forbidden_keys = Vec::from_iter(headers.keys().filter_map(|name| {
25 if hooks.is_forbidden_header(name) {
26 Some(name.clone())
27 } else {
28 None
29 }
30 }));
31
32 for name in forbidden_keys {
33 headers.remove(&name);
34 }
35}
36
/// Settings that accompany an outgoing HTTP request.
///
/// NOTE(review): the code that consumes this struct is outside this section of
/// the file; the per-field notes below are read off the field names and types
/// and should be confirmed against that consumer.
pub struct OutgoingRequestConfig {
    /// Presumably whether the connection should be established over TLS.
    pub use_tls: bool,
    /// Presumably the time limit for establishing the connection.
    pub connect_timeout: Duration,
    /// Presumably the time limit for receiving the first byte of the response.
    pub first_byte_timeout: Duration,
    /// Presumably the time limit between successive chunks of the response body.
    pub between_bytes_timeout: Duration,
}
48
49impl From<http::Method> for types::Method {
50 fn from(method: http::Method) -> Self {
51 if method == http::Method::GET {
52 types::Method::Get
53 } else if method == hyper::Method::HEAD {
54 types::Method::Head
55 } else if method == hyper::Method::POST {
56 types::Method::Post
57 } else if method == hyper::Method::PUT {
58 types::Method::Put
59 } else if method == hyper::Method::DELETE {
60 types::Method::Delete
61 } else if method == hyper::Method::CONNECT {
62 types::Method::Connect
63 } else if method == hyper::Method::OPTIONS {
64 types::Method::Options
65 } else if method == hyper::Method::TRACE {
66 types::Method::Trace
67 } else if method == hyper::Method::PATCH {
68 types::Method::Patch
69 } else {
70 types::Method::Other(method.to_string())
71 }
72 }
73}
74
75impl TryInto<http::Method> for types::Method {
76 type Error = http::method::InvalidMethod;
77
78 fn try_into(self) -> Result<http::Method, Self::Error> {
79 match self {
80 Method::Get => Ok(http::Method::GET),
81 Method::Head => Ok(http::Method::HEAD),
82 Method::Post => Ok(http::Method::POST),
83 Method::Put => Ok(http::Method::PUT),
84 Method::Delete => Ok(http::Method::DELETE),
85 Method::Connect => Ok(http::Method::CONNECT),
86 Method::Options => Ok(http::Method::OPTIONS),
87 Method::Trace => Ok(http::Method::TRACE),
88 Method::Patch => Ok(http::Method::PATCH),
89 Method::Other(s) => http::Method::from_bytes(s.as_bytes()),
90 }
91 }
92}
93
/// Host-side representation of an incoming HTTP request.
///
/// Instances are built by `WasiHttpCtxView::new_incoming_request` from the
/// parts of a `hyper::Request`.
#[derive(Debug)]
pub struct HostIncomingRequest {
    /// Request method, taken from the hyper request parts.
    pub(crate) method: http::method::Method,
    /// Request URI, taken from the hyper request parts.
    pub(crate) uri: http::uri::Uri,
    /// Request headers after forbidden headers have been removed, wrapped via
    /// `FieldMap::new_immutable`.
    pub(crate) headers: FieldMap,
    /// Scheme supplied by the caller of `new_incoming_request`.
    pub(crate) scheme: Scheme,
    /// Authority taken from the URI, or from the `Host` header when the URI
    /// carries none.
    pub(crate) authority: String,
    /// Request body; set to `Some` on construction.
    /// NOTE(review): presumably set to `None` once the body has been taken —
    /// confirm against the code that consumes it.
    pub body: Option<HostIncomingBody>,
}
105
106impl WasiHttpCtxView<'_> {
107 pub fn new_incoming_request<B>(
109 &mut self,
110 scheme: Scheme,
111 req: hyper::Request<B>,
112 ) -> wasmtime::Result<Resource<HostIncomingRequest>>
113 where
114 B: Body<Data = Bytes> + Send + 'static,
115 B::Error: Into<ErrorCode>,
116 {
117 let (mut parts, body) = req.into_parts();
118 let body = body.map_err(Into::into).boxed_unsync();
119 let body = HostIncomingBody::new(
120 body,
121 std::time::Duration::from_millis(600 * 1000),
123 );
124 let authority = match parts.uri.authority() {
125 Some(authority) => authority.to_string(),
126 None => match parts.headers.get(http::header::HOST) {
127 Some(host) => host.to_str()?.to_string(),
128 None => bail!("invalid HTTP request missing authority in URI and host header"),
129 },
130 };
131
132 remove_forbidden_headers(self.hooks, &mut parts.headers);
133 let headers = FieldMap::new_immutable(parts.headers);
134
135 let req = HostIncomingRequest {
136 method: parts.method,
137 uri: parts.uri,
138 headers,
139 authority,
140 scheme,
141 body: Some(body),
142 };
143 Ok(self.table.push(req)?)
144 }
145}
146
/// Host-side state behind a response out-parameter resource.
pub struct HostResponseOutparam {
    /// Sending half of a one-shot channel that carries either the outgoing
    /// `hyper::Response` or an `ErrorCode`.
    pub result:
        tokio::sync::oneshot::Sender<Result<hyper::Response<HyperOutgoingBody>, types::ErrorCode>>,
}
153
154impl WasiHttpCtxView<'_> {
155 pub fn new_response_outparam(
157 &mut self,
158 result: tokio::sync::oneshot::Sender<
159 Result<hyper::Response<HyperOutgoingBody>, types::ErrorCode>,
160 >,
161 ) -> wasmtime::Result<Resource<HostResponseOutparam>> {
162 let id = self.table.push(HostResponseOutparam { result })?;
163 Ok(id)
164 }
165}
166
/// Host-side representation of an outgoing HTTP response.
///
/// Convertible into a `hyper::Response<HyperOutgoingBody>` through the
/// `TryFrom` impl in this file.
pub struct HostOutgoingResponse {
    /// Status code of the response.
    pub status: http::StatusCode,
    /// Headers of the response.
    pub headers: FieldMap,
    /// Response body; when `None`, the conversion to `hyper::Response`
    /// substitutes an empty body.
    pub body: Option<HyperOutgoingBody>,
}
176
177impl TryFrom<HostOutgoingResponse> for hyper::Response<HyperOutgoingBody> {
178 type Error = http::Error;
179
180 fn try_from(
181 resp: HostOutgoingResponse,
182 ) -> Result<hyper::Response<HyperOutgoingBody>, Self::Error> {
183 use http_body_util::Empty;
184
185 let mut builder = hyper::Response::builder().status(resp.status);
186
187 *builder.headers_mut().unwrap() = resp.headers.into();
188
189 match resp.body {
190 Some(body) => builder.body(body),
191 None => builder.body(
192 Empty::<bytes::Bytes>::new()
193 .map_err(|_| unreachable!("Infallible error"))
194 .boxed_unsync(),
195 ),
196 }
197 }
198}
199
/// Host-side representation of an outgoing HTTP request.
///
/// NOTE(review): the code that fills in and sends this request is outside
/// this section; the optional fields are presumably unset until the guest
/// provides them — confirm against that code.
#[derive(Debug)]
pub struct HostOutgoingRequest {
    /// Request method.
    pub method: Method,
    /// Request scheme, if one has been set.
    pub scheme: Option<Scheme>,
    /// Request authority, if one has been set.
    pub authority: Option<String>,
    /// Request path together with its query string, if one has been set.
    pub path_with_query: Option<String>,
    /// Request headers.
    pub headers: FieldMap,
    /// Request body, if any.
    pub body: Option<HyperOutgoingBody>,
}
216
/// Optional per-request timeout settings.
///
/// The derived `Default` leaves every field as `None`.
/// NOTE(review): how a `None` value is interpreted (e.g. falling back to a
/// default timeout) is decided outside this section — confirm there.
#[derive(Debug, Default)]
pub struct HostRequestOptions {
    /// Presumably the time limit for establishing the connection.
    pub connect_timeout: Option<std::time::Duration>,
    /// Presumably the time limit for receiving the first byte of the response.
    pub first_byte_timeout: Option<std::time::Duration>,
    /// Presumably the time limit between successive chunks of the response body.
    pub between_bytes_timeout: Option<std::time::Duration>,
}
227
/// Host-side representation of an incoming HTTP response.
#[derive(Debug)]
pub struct HostIncomingResponse {
    /// Status code of the response as a raw number.
    pub status: u16,
    /// Headers of the response.
    pub headers: FieldMap,
    /// Response body, if any.
    /// NOTE(review): presumably becomes `None` once the body has been taken —
    /// confirm against the code that consumes it.
    pub body: Option<HostIncomingBody>,
}
238
/// Join handle for the task that produces an incoming response.
///
/// The task's output is two-layered: the outer `wasmtime::Result` and, inside
/// it, a `Result` whose error is a `types::ErrorCode`.
/// NOTE(review): presumably the outer layer signals host-level failures and
/// the inner one HTTP-level errors reported to the guest — confirm with the
/// code that spawns the task.
pub type FutureIncomingResponseHandle =
    AbortOnDropJoinHandle<wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>>;
242
/// A received response together with the state needed to keep reading it.
#[derive(Debug)]
pub struct IncomingResponse {
    /// The hyper response, including its incoming body.
    pub resp: hyper::Response<HyperIncomingBody>,
    /// Optional handle to a background task associated with the response.
    /// NOTE(review): presumably the task driving the underlying connection —
    /// confirm against the code that constructs this struct.
    pub worker: Option<AbortOnDropJoinHandle<()>>,
    /// Presumably the time limit between successive chunks of the body.
    pub between_bytes_timeout: std::time::Duration,
}
253
/// State of a future that resolves to an incoming response.
#[derive(Debug)]
pub enum HostFutureIncomingResponse {
    /// The producing task has not been awaited to completion yet; holds its
    /// join handle. `Pollable::ready` awaits the handle and moves the value
    /// to `Ready`.
    Pending(FutureIncomingResponseHandle),
    /// The result is available.
    Ready(wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>),
    /// NOTE(review): presumably marks that the ready result has already been
    /// taken out — nothing in this section constructs it; confirm at the use
    /// site.
    Consumed,
}
266
267impl HostFutureIncomingResponse {
268 pub fn pending(handle: FutureIncomingResponseHandle) -> Self {
270 Self::Pending(handle)
271 }
272
273 pub fn ready(result: wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>) -> Self {
275 Self::Ready(result)
276 }
277
278 pub fn is_ready(&self) -> bool {
280 matches!(self, Self::Ready(_))
281 }
282
283 pub fn unwrap_ready(self) -> wasmtime::Result<Result<IncomingResponse, types::ErrorCode>> {
285 match self {
286 Self::Ready(res) => res,
287 Self::Pending(_) | Self::Consumed => {
288 panic!("unwrap_ready called on a pending HostFutureIncomingResponse")
289 }
290 }
291 }
292}
293
294#[async_trait::async_trait]
295impl Pollable for HostFutureIncomingResponse {
296 async fn ready(&mut self) {
297 if let Self::Pending(handle) = self {
298 *self = Self::Ready(handle.await);
299 }
300 }
301}