Skip to main content

wasmtime_wasi_http/p2/
types.rs

1//! Implements the base structure that will provide the implementation of the
2//! wasi-http API.
3
4use crate::FieldMap;
5use crate::p2::{
6    WasiHttpCtxView, WasiHttpHooks,
7    bindings::http::types::{self, ErrorCode, Method, Scheme},
8    body::{HostIncomingBody, HyperIncomingBody, HyperOutgoingBody},
9};
10use bytes::Bytes;
11use http_body_util::BodyExt;
12use hyper::body::Body;
13use std::time::Duration;
14use wasmtime::component::Resource;
15use wasmtime::{Result, bail};
16use wasmtime_wasi::p2::Pollable;
17use wasmtime_wasi::runtime::AbortOnDropJoinHandle;
18
19/// Removes forbidden headers from a [`FieldMap`].
20pub(crate) fn remove_forbidden_headers(
21    hooks: &mut dyn WasiHttpHooks,
22    headers: &mut http::HeaderMap,
23) {
24    let forbidden_keys = Vec::from_iter(headers.keys().filter_map(|name| {
25        if hooks.is_forbidden_header(name) {
26            Some(name.clone())
27        } else {
28            None
29        }
30    }));
31
32    for name in forbidden_keys {
33        headers.remove(&name);
34    }
35}
36
37/// Configuration for an outgoing request.
38pub struct OutgoingRequestConfig {
39    /// Whether to use TLS for the request.
40    pub use_tls: bool,
41    /// The timeout for connecting.
42    pub connect_timeout: Duration,
43    /// The timeout until the first byte.
44    pub first_byte_timeout: Duration,
45    /// The timeout between chunks of a streaming body
46    pub between_bytes_timeout: Duration,
47}
48
49impl From<http::Method> for types::Method {
50    fn from(method: http::Method) -> Self {
51        if method == http::Method::GET {
52            types::Method::Get
53        } else if method == hyper::Method::HEAD {
54            types::Method::Head
55        } else if method == hyper::Method::POST {
56            types::Method::Post
57        } else if method == hyper::Method::PUT {
58            types::Method::Put
59        } else if method == hyper::Method::DELETE {
60            types::Method::Delete
61        } else if method == hyper::Method::CONNECT {
62            types::Method::Connect
63        } else if method == hyper::Method::OPTIONS {
64            types::Method::Options
65        } else if method == hyper::Method::TRACE {
66            types::Method::Trace
67        } else if method == hyper::Method::PATCH {
68            types::Method::Patch
69        } else {
70            types::Method::Other(method.to_string())
71        }
72    }
73}
74
75impl TryInto<http::Method> for types::Method {
76    type Error = http::method::InvalidMethod;
77
78    fn try_into(self) -> Result<http::Method, Self::Error> {
79        match self {
80            Method::Get => Ok(http::Method::GET),
81            Method::Head => Ok(http::Method::HEAD),
82            Method::Post => Ok(http::Method::POST),
83            Method::Put => Ok(http::Method::PUT),
84            Method::Delete => Ok(http::Method::DELETE),
85            Method::Connect => Ok(http::Method::CONNECT),
86            Method::Options => Ok(http::Method::OPTIONS),
87            Method::Trace => Ok(http::Method::TRACE),
88            Method::Patch => Ok(http::Method::PATCH),
89            Method::Other(s) => http::Method::from_bytes(s.as_bytes()),
90        }
91    }
92}
93
94/// The concrete type behind a `wasi:http/types.incoming-request` resource.
95#[derive(Debug)]
96pub struct HostIncomingRequest {
97    pub(crate) method: http::method::Method,
98    pub(crate) uri: http::uri::Uri,
99    pub(crate) headers: FieldMap,
100    pub(crate) scheme: Scheme,
101    pub(crate) authority: String,
102    /// The body of the incoming request.
103    pub body: Option<HostIncomingBody>,
104}
105
106impl WasiHttpCtxView<'_> {
107    /// Create a new incoming request resource.
108    pub fn new_incoming_request<B>(
109        &mut self,
110        scheme: Scheme,
111        req: hyper::Request<B>,
112    ) -> wasmtime::Result<Resource<HostIncomingRequest>>
113    where
114        B: Body<Data = Bytes> + Send + 'static,
115        B::Error: Into<ErrorCode>,
116    {
117        let (mut parts, body) = req.into_parts();
118        let body = body.map_err(Into::into).boxed_unsync();
119        let body = HostIncomingBody::new(
120            body,
121            // TODO: this needs to be plumbed through
122            std::time::Duration::from_millis(600 * 1000),
123        );
124        let authority = match parts.uri.authority() {
125            Some(authority) => authority.to_string(),
126            None => match parts.headers.get(http::header::HOST) {
127                Some(host) => host.to_str()?.to_string(),
128                None => bail!("invalid HTTP request missing authority in URI and host header"),
129            },
130        };
131
132        remove_forbidden_headers(self.hooks, &mut parts.headers);
133        let headers = FieldMap::new_immutable(parts.headers);
134
135        let req = HostIncomingRequest {
136            method: parts.method,
137            uri: parts.uri,
138            headers,
139            authority,
140            scheme,
141            body: Some(body),
142        };
143        Ok(self.table.push(req)?)
144    }
145}
146
147/// The concrete type behind a `wasi:http/types.response-outparam` resource.
148pub struct HostResponseOutparam {
149    /// The callback sending a response.
150    pub send:
151        Box<dyn FnOnce(Result<hyper::Response<HyperOutgoingBody>, types::ErrorCode>) + Send + Sync>,
152}
153
154impl WasiHttpCtxView<'_> {
155    /// Create a new outgoing response resource.
156    pub fn new_response_outparam(
157        &mut self,
158        result: tokio::sync::oneshot::Sender<
159            Result<hyper::Response<HyperOutgoingBody>, types::ErrorCode>,
160        >,
161    ) -> wasmtime::Result<Resource<HostResponseOutparam>> {
162        let id = self.table.push(HostResponseOutparam {
163            send: Box::new(move |value| {
164                // Giving the API doesn't return any error, it's probably
165                // better to ignore the error than trap the guest, in case of
166                // host timeout and dropped the receiver side of the channel.
167                // See also: #10784
168                _ = result.send(value)
169            }),
170        })?;
171        Ok(id)
172    }
173
174    /// Create a new outgoing response from an `FnOnce`.
175    pub fn new_response_outparam_from_callback(
176        &mut self,
177        callback: impl FnOnce(Result<hyper::Response<HyperOutgoingBody>, types::ErrorCode>)
178        + Send
179        + Sync
180        + 'static,
181    ) -> wasmtime::Result<Resource<HostResponseOutparam>> {
182        let id = self.table.push(HostResponseOutparam {
183            send: Box::new(callback),
184        })?;
185        Ok(id)
186    }
187}
188
189/// The concrete type behind a `wasi:http/types.outgoing-response` resource.
190pub struct HostOutgoingResponse {
191    /// The status of the response.
192    pub status: http::StatusCode,
193    /// The headers of the response.
194    pub headers: FieldMap,
195    /// The body of the response.
196    pub body: Option<HyperOutgoingBody>,
197}
198
199impl TryFrom<HostOutgoingResponse> for hyper::Response<HyperOutgoingBody> {
200    type Error = http::Error;
201
202    fn try_from(
203        resp: HostOutgoingResponse,
204    ) -> Result<hyper::Response<HyperOutgoingBody>, Self::Error> {
205        use http_body_util::Empty;
206
207        let mut builder = hyper::Response::builder().status(resp.status);
208
209        *builder.headers_mut().unwrap() = resp.headers.into();
210
211        match resp.body {
212            Some(body) => builder.body(body),
213            None => builder.body(
214                Empty::<bytes::Bytes>::new()
215                    .map_err(|_| unreachable!("Infallible error"))
216                    .boxed_unsync(),
217            ),
218        }
219    }
220}
221
222/// The concrete type behind a `wasi:http/types.outgoing-request` resource.
223#[derive(Debug)]
224pub struct HostOutgoingRequest {
225    /// The method of the request.
226    pub method: Method,
227    /// The scheme of the request.
228    pub scheme: Option<Scheme>,
229    /// The authority of the request.
230    pub authority: Option<String>,
231    /// The path and query of the request.
232    pub path_with_query: Option<String>,
233    /// The request headers.
234    pub headers: FieldMap,
235    /// The request body.
236    pub body: Option<HyperOutgoingBody>,
237}
238
239/// The concrete type behind a `wasi:http/types.request-options` resource.
240#[derive(Debug, Default)]
241pub struct HostRequestOptions {
242    /// How long to wait for a connection to be established.
243    pub connect_timeout: Option<std::time::Duration>,
244    /// How long to wait for the first byte of the response body.
245    pub first_byte_timeout: Option<std::time::Duration>,
246    /// How long to wait between frames of the response body.
247    pub between_bytes_timeout: Option<std::time::Duration>,
248}
249
250/// The concrete type behind a `wasi:http/types.incoming-response` resource.
251#[derive(Debug)]
252pub struct HostIncomingResponse {
253    /// The response status
254    pub status: u16,
255    /// The response headers
256    pub headers: FieldMap,
257    /// The response body
258    pub body: Option<HostIncomingBody>,
259}
260
261/// A handle to a future incoming response.
262pub type FutureIncomingResponseHandle =
263    AbortOnDropJoinHandle<wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>>;
264
265/// A response that is in the process of being received.
266#[derive(Debug)]
267pub struct IncomingResponse {
268    /// The response itself.
269    pub resp: hyper::Response<HyperIncomingBody>,
270    /// Optional worker task that continues to process the response.
271    pub worker: Option<AbortOnDropJoinHandle<()>>,
272    /// The timeout between chunks of the response.
273    pub between_bytes_timeout: std::time::Duration,
274}
275
276/// The concrete type behind a `wasi:http/types.future-incoming-response` resource.
277#[derive(Debug)]
278pub enum HostFutureIncomingResponse {
279    /// A pending response
280    Pending(FutureIncomingResponseHandle),
281    /// The response is ready.
282    ///
283    /// An outer error will trap while the inner error gets returned to the guest.
284    Ready(wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>),
285    /// The response has been consumed.
286    Consumed,
287}
288
289impl HostFutureIncomingResponse {
290    /// Create a new `HostFutureIncomingResponse` that is pending on the provided task handle.
291    pub fn pending(handle: FutureIncomingResponseHandle) -> Self {
292        Self::Pending(handle)
293    }
294
295    /// Create a new `HostFutureIncomingResponse` that is ready.
296    pub fn ready(result: wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>) -> Self {
297        Self::Ready(result)
298    }
299
300    /// Returns `true` if the response is ready.
301    pub fn is_ready(&self) -> bool {
302        matches!(self, Self::Ready(_))
303    }
304
305    /// Unwrap the response, panicking if it is not ready.
306    pub fn unwrap_ready(self) -> wasmtime::Result<Result<IncomingResponse, types::ErrorCode>> {
307        match self {
308            Self::Ready(res) => res,
309            Self::Pending(_) | Self::Consumed => {
310                panic!("unwrap_ready called on a pending HostFutureIncomingResponse")
311            }
312        }
313    }
314}
315
316#[async_trait::async_trait]
317impl Pollable for HostFutureIncomingResponse {
318    async fn ready(&mut self) {
319        if let Self::Pending(handle) = self {
320            *self = Self::Ready(handle.await);
321        }
322    }
323}