diff options
author | Alex Orlenko <zxteam@protonmail.com> | 2024-03-22 00:30:29 +0000 |
---|---|---|
committer | Alex Orlenko <zxteam@protonmail.com> | 2024-03-22 00:35:37 +0000 |
commit | 59b14000f3c12279183747781e6f802b29d90bd8 (patch) | |
tree | 38c603f8b281e3db61a32ea19c5ffeb9d3e88347 | |
parent | 849206ef9d56b250424f34077a62a4e6ad96c21a (diff) | |
download | mlua-59b14000f3c12279183747781e6f802b29d90bd8.zip |
Update hyper examples
-rw-r--r-- | Cargo.toml | 5 | ||||
-rw-r--r-- | README.md | 15 | ||||
-rw-r--r-- | examples/async_http_client.rs | 53 | ||||
-rw-r--r-- | examples/async_http_server.rs | 137 |
4 files changed, 109 insertions, 101 deletions
@@ -63,10 +63,9 @@ libloading = { version = "0.8", optional = true } [dev-dependencies] trybuild = "1.0" futures = "0.3.5" -hyper = { version = "1", features = ["client", "server"] } -hyper-util = { version = "0.1.3", features = ["server", "client", "client-legacy", "tokio", "http1"] } +hyper = { version = "1.2", features = ["full"] } +hyper-util = { version = "0.1.3", features = ["full"] } http-body-util = "0.1.1" -bytes = "1.5.0" reqwest = { version = "0.12", features = ["json"] } tokio = { version = "1.0", features = ["macros", "rt", "time"] } serde = { version = "1.0", features = ["derive"] } @@ -85,14 +85,17 @@ This works using Lua [coroutines](https://www.lua.org/manual/5.3/manual.html#2.6 - [TCP Server](examples/async_tcp_server.rs) -**shell command example**: -``` shell -# async http client -cargo run --example async_http_client --features="async macros lua54" +**shell command examples**: +```shell +# async http client (hyper) +cargo run --example async_http_client --features=lua54,async,macros + +# async http client (reqwest) +cargo run --example async_http_reqwest --features=lua54,async,macros,serialize # async http server -cargo run --example async_http_server --features="async macros lua54" -curl http://localhost:3000 +cargo run --example async_http_server --features=lua54,async,macros +curl -v http://localhost:3000 ``` ### Serialization (serde) support diff --git a/examples/async_http_client.rs b/examples/async_http_client.rs index c0fa1ff..ca4eac9 100644 --- a/examples/async_http_client.rs +++ b/examples/async_http_client.rs @@ -1,9 +1,9 @@ -use bytes::Bytes; -use http_body_util::BodyExt; -use http_body_util::Empty; +use std::collections::HashMap; + +use http_body_util::BodyExt as _; use hyper::body::Incoming; use hyper_util::client::legacy::Client as HyperClient; -use std::collections::HashMap; +use hyper_util::rt::TokioExecutor; use mlua::{chunk, ExternalResult, Lua, Result, UserData, UserDataMethods}; @@ -11,28 +11,14 @@ struct BodyReader(Incoming); impl UserData for BodyReader { fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) { + // Every call returns a next chunk methods.add_async_method_mut("read", |lua, reader, ()| async move { - let mut summarize = Vec::new(); // Create a vector to accumulate the bytes - - loop { - match reader.0.frame().await { - Some(Ok(bytes)) => { - if let Ok(data) = bytes.into_data() { - summarize.extend(data); // Append the bytes to the summarize variable - } - } - Some(Err(_)) => break, // Break on error - None => break, // Break if no more frames + if let Some(bytes) = reader.0.frame().await { + if let Some(bytes) = bytes.into_lua_err()?.data_ref() { + return Some(lua.create_string(&bytes)).transpose(); } } - - if !summarize.is_empty() { - // If summarize has collected data, return it as a Lua string - Ok(Some(lua.create_string(&summarize)?)) - } else { - // Return None if no data was collected - Ok(None) - } + Ok(None) }); } } @@ -42,8 +28,7 @@ async fn main() -> Result<()> { let lua = Lua::new(); let fetch_url = lua.create_async_function(|lua, uri: String| async move { - let client = - HyperClient::builder(hyper_util::rt::TokioExecutor::new()).build_http::<Empty<Bytes>>(); + let client = HyperClient::builder(TokioExecutor::new()).build_http::<String>(); let uri = uri.parse().into_lua_err()?; let resp = client.get(uri).await.into_lua_err()?; @@ -66,19 +51,19 @@ async fn main() -> Result<()> { let f = lua .load(chunk! { - local res = $fetch_url(...) + local res = $fetch_url(...) print("status: "..res.status) for key, vals in pairs(res.headers) do - for _, val in ipairs(vals) do - print(key..": "..val) - end + for _, val in ipairs(vals) do + print(key..": "..val) + end end repeat - local body = res.body:read() - if body then - print(body) - end - until not body + local chunk = res.body:read() + if chunk then + print(chunk) + end + until not chunk }) .into_function()?; diff --git a/examples/async_http_server.rs b/examples/async_http_server.rs index cd94e6e..7da8490 100644 --- a/examples/async_http_server.rs +++ b/examples/async_http_server.rs @@ -1,35 +1,63 @@ -use bytes::Bytes; -use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full}; -use hyper::{body::Incoming, service::Service, Request, Response}; -use hyper_util::{rt::TokioIo, server::conn::auto}; +use std::convert::Infallible; +use std::future::Future; +use std::net::SocketAddr; +use std::rc::Rc; + +use futures::future::LocalBoxFuture; +use http_body_util::{combinators::BoxBody, BodyExt as _, Empty, Full}; +use hyper::body::{Bytes, Incoming}; +use hyper::{Request, Response}; +use hyper_util::rt::TokioIo; +use hyper_util::server::conn::auto::Builder as ServerConnBuilder; +use tokio::net::TcpListener; +use tokio::task::LocalSet; + use mlua::{ - chunk, Error as LuaError, Function, Lua, String as LuaString, Table, UserData, UserDataMethods, + chunk, Error as LuaError, Function, Lua, RegistryKey, String as LuaString, Table, UserData, + UserDataMethods, }; -use std::{future::Future, net::SocketAddr, pin::Pin, rc::Rc}; -use tokio::{net::TcpListener, task::LocalSet}; +/// Wrapper around incoming request that implements UserData struct LuaRequest(SocketAddr, Request<Incoming>); impl UserData for LuaRequest { fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) { - methods.add_method("remote_addr", |_lua, req, ()| Ok((req.0).to_string())); - methods.add_method("method", |_lua, req, ()| Ok((req.1).method().to_string())); + methods.add_method("remote_addr", |_, req, ()| Ok((req.0).to_string())); + methods.add_method("method", |_, req, ()| Ok((req.1).method().to_string())); + methods.add_method("path", |_, req, ()| Ok(req.1.uri().path().to_string())); } } -pub struct Svc(Rc<Lua>, SocketAddr); +/// Service that handles incoming requests +#[derive(Clone)] +pub struct Svc { + lua: Rc<Lua>, + handler: Rc<RegistryKey>, + peer_addr: SocketAddr, +} + +impl Svc { + pub fn new(lua: Rc<Lua>, handler: Rc<RegistryKey>, peer_addr: SocketAddr) -> Self { + Self { + lua, + handler, + peer_addr, + } + } +} -impl Service<Request<Incoming>> for Svc { - type Response = Response<BoxBody<Bytes, hyper::Error>>; +impl hyper::service::Service<Request<Incoming>> for Svc { + type Response = Response<BoxBody<Bytes, Infallible>>; type Error = LuaError; - type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>; + type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>; fn call(&self, req: Request<Incoming>) -> Self::Future { // If handler returns an error then generate 5xx response - let lua = self.0.clone(); - let lua_req = LuaRequest(self.1, req); + let lua = self.lua.clone(); + let handler_key = self.handler.clone(); + let lua_req = LuaRequest(self.peer_addr, req); Box::pin(async move { - let handler: Function = lua.named_registry_value("http_handler")?; + let handler: Function = lua.registry_value(&handler_key)?; match handler.call_async::<_, Table>(lua_req).await { Ok(lua_resp) => { let status = lua_resp.get::<_, Option<u16>>("status")?.unwrap_or(200); @@ -43,18 +71,11 @@ impl Service<Request<Incoming>> for Svc { } } + // Set body let body = lua_resp .get::<_, Option<LuaString>>("body")? - .map(|b| { - Full::new(Bytes::copy_from_slice(b.clone().as_bytes())) - .map_err(|never| match never {}) - .boxed() - }) - .unwrap_or_else(|| { - Empty::<Bytes>::new() - .map_err(|never| match never {}) - .boxed() - }); + .map(|b| Full::new(Bytes::copy_from_slice(b.as_bytes())).boxed()) + .unwrap_or_else(|| Empty::<Bytes>::new().boxed()); Ok(resp.body(body).unwrap()) } @@ -62,11 +83,7 @@ impl Service<Request<Incoming>> for Svc { eprintln!("{}", err); Ok(Response::builder() .status(500) - .body( - Full::new(Bytes::from("Internal Server Error".as_bytes())) - .map_err(|never| match never {}) - .boxed(), - ) + .body(Full::new(Bytes::from("Internal Server Error")).boxed()) .unwrap()) } } @@ -79,43 +96,47 @@ async fn main() { let lua = Rc::new(Lua::new()); // Create Lua handler function - let handler: Function = lua + let handler: RegistryKey = lua .load(chunk! { - function(req) - return { - status = 200, - headers = { - ["X-Req-Method"] = req:method(), - ["X-Remote-Addr"] = req:remote_addr(), - }, - body = "Hello from Lua!\n" - } - end + function(req) + return { + status = 200, + headers = { + ["X-Req-Method"] = req:method(), + ["X-Req-Path"] = req:path(), + ["X-Remote-Addr"] = req:remote_addr(), + }, + body = "Hello from Lua!\n" + } + end }) .eval() - .expect("cannot create Lua handler"); + .expect("Failed to create Lua handler"); + let handler = Rc::new(handler); - // Store it in the Registry - lua.set_named_registry_value("http_handler", handler) - .expect("cannot store Lua handler"); - - let addr = "127.0.0.1:3000"; + let listen_addr = "127.0.0.1:3000"; + let listener = TcpListener::bind(listen_addr).await.unwrap(); + println!("Listening on http://{listen_addr}"); let local = LocalSet::new(); - let listener = TcpListener::bind(addr).await.unwrap(); loop { - let (stream, peer_addr) = listener.accept().await.unwrap(); - let io = TokioIo::new(stream); + let (stream, peer_addr) = match listener.accept().await { + Ok(x) => x, + Err(err) => { + eprintln!("Failed to accept connection: {err}"); + continue; + } + }; - let svc = Svc(lua.clone(), peer_addr); + let svc = Svc::new(lua.clone(), handler.clone(), peer_addr); local .run_until(async move { - if let Err(err) = auto::Builder::new(LocalExec) + let result = ServerConnBuilder::new(LocalExec) .http1() - .serve_connection(io, svc) - .await - { - println!("Error serving connection: {:?}", err); + .serve_connection(TokioIo::new(stream), svc) + .await; + if let Err(err) = result { + eprintln!("Error serving connection: {err:?}"); } }) .await; @@ -127,7 +148,7 @@ struct LocalExec; impl<F> hyper::rt::Executor<F> for LocalExec where - F: std::future::Future + 'static, // not requiring `Send` + F: Future + 'static, // not requiring `Send` { fn execute(&self, fut: F) { tokio::task::spawn_local(fut); |