summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Orlenko <zxteam@protonmail.com>2024-03-22 00:30:29 +0000
committerAlex Orlenko <zxteam@protonmail.com>2024-03-22 00:35:37 +0000
commit59b14000f3c12279183747781e6f802b29d90bd8 (patch)
tree38c603f8b281e3db61a32ea19c5ffeb9d3e88347
parent849206ef9d56b250424f34077a62a4e6ad96c21a (diff)
downloadmlua-59b14000f3c12279183747781e6f802b29d90bd8.zip
Update hyper examples
-rw-r--r--Cargo.toml5
-rw-r--r--README.md15
-rw-r--r--examples/async_http_client.rs53
-rw-r--r--examples/async_http_server.rs137
4 files changed, 109 insertions, 101 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 1352d52..c42075d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -63,10 +63,9 @@ libloading = { version = "0.8", optional = true }
[dev-dependencies]
trybuild = "1.0"
futures = "0.3.5"
-hyper = { version = "1", features = ["client", "server"] }
-hyper-util = { version = "0.1.3", features = ["server", "client", "client-legacy", "tokio", "http1"] }
+hyper = { version = "1.2", features = ["full"] }
+hyper-util = { version = "0.1.3", features = ["full"] }
http-body-util = "0.1.1"
-bytes = "1.5.0"
reqwest = { version = "0.12", features = ["json"] }
tokio = { version = "1.0", features = ["macros", "rt", "time"] }
serde = { version = "1.0", features = ["derive"] }
diff --git a/README.md b/README.md
index 434a2ba..3d362d7 100644
--- a/README.md
+++ b/README.md
@@ -85,14 +85,17 @@ This works using Lua [coroutines](https://www.lua.org/manual/5.3/manual.html#2.6
- [TCP Server](examples/async_tcp_server.rs)
-**shell command example**:
-``` shell
-# async http client
-cargo run --example async_http_client --features="async macros lua54"
+**shell command examples**:
+```shell
+# async http client (hyper)
+cargo run --example async_http_client --features=lua54,async,macros
+
+# async http client (reqwest)
+cargo run --example async_http_reqwest --features=lua54,async,macros,serialize
# async http server
-cargo run --example async_http_server --features="async macros lua54"
-curl http://localhost:3000
+cargo run --example async_http_server --features=lua54,async,macros
+curl -v http://localhost:3000
```
### Serialization (serde) support
diff --git a/examples/async_http_client.rs b/examples/async_http_client.rs
index c0fa1ff..ca4eac9 100644
--- a/examples/async_http_client.rs
+++ b/examples/async_http_client.rs
@@ -1,9 +1,9 @@
-use bytes::Bytes;
-use http_body_util::BodyExt;
-use http_body_util::Empty;
+use std::collections::HashMap;
+
+use http_body_util::BodyExt as _;
use hyper::body::Incoming;
use hyper_util::client::legacy::Client as HyperClient;
-use std::collections::HashMap;
+use hyper_util::rt::TokioExecutor;
use mlua::{chunk, ExternalResult, Lua, Result, UserData, UserDataMethods};
@@ -11,28 +11,14 @@ struct BodyReader(Incoming);
impl UserData for BodyReader {
fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) {
+ // Every call returns a next chunk
methods.add_async_method_mut("read", |lua, reader, ()| async move {
- let mut summarize = Vec::new(); // Create a vector to accumulate the bytes
-
- loop {
- match reader.0.frame().await {
- Some(Ok(bytes)) => {
- if let Ok(data) = bytes.into_data() {
- summarize.extend(data); // Append the bytes to the summarize variable
- }
- }
- Some(Err(_)) => break, // Break on error
- None => break, // Break if no more frames
+ if let Some(bytes) = reader.0.frame().await {
+ if let Some(bytes) = bytes.into_lua_err()?.data_ref() {
+ return Some(lua.create_string(&bytes)).transpose();
}
}
-
- if !summarize.is_empty() {
- // If summarize has collected data, return it as a Lua string
- Ok(Some(lua.create_string(&summarize)?))
- } else {
- // Return None if no data was collected
- Ok(None)
- }
+ Ok(None)
});
}
}
@@ -42,8 +28,7 @@ async fn main() -> Result<()> {
let lua = Lua::new();
let fetch_url = lua.create_async_function(|lua, uri: String| async move {
- let client =
- HyperClient::builder(hyper_util::rt::TokioExecutor::new()).build_http::<Empty<Bytes>>();
+ let client = HyperClient::builder(TokioExecutor::new()).build_http::<String>();
let uri = uri.parse().into_lua_err()?;
let resp = client.get(uri).await.into_lua_err()?;
@@ -66,19 +51,19 @@ async fn main() -> Result<()> {
let f = lua
.load(chunk! {
- local res = $fetch_url(...)
+ local res = $fetch_url(...)
print("status: "..res.status)
for key, vals in pairs(res.headers) do
- for _, val in ipairs(vals) do
- print(key..": "..val)
- end
+ for _, val in ipairs(vals) do
+ print(key..": "..val)
+ end
end
repeat
- local body = res.body:read()
- if body then
- print(body)
- end
- until not body
+ local chunk = res.body:read()
+ if chunk then
+ print(chunk)
+ end
+ until not chunk
})
.into_function()?;
diff --git a/examples/async_http_server.rs b/examples/async_http_server.rs
index cd94e6e..7da8490 100644
--- a/examples/async_http_server.rs
+++ b/examples/async_http_server.rs
@@ -1,35 +1,63 @@
-use bytes::Bytes;
-use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
-use hyper::{body::Incoming, service::Service, Request, Response};
-use hyper_util::{rt::TokioIo, server::conn::auto};
+use std::convert::Infallible;
+use std::future::Future;
+use std::net::SocketAddr;
+use std::rc::Rc;
+
+use futures::future::LocalBoxFuture;
+use http_body_util::{combinators::BoxBody, BodyExt as _, Empty, Full};
+use hyper::body::{Bytes, Incoming};
+use hyper::{Request, Response};
+use hyper_util::rt::TokioIo;
+use hyper_util::server::conn::auto::Builder as ServerConnBuilder;
+use tokio::net::TcpListener;
+use tokio::task::LocalSet;
+
use mlua::{
- chunk, Error as LuaError, Function, Lua, String as LuaString, Table, UserData, UserDataMethods,
+ chunk, Error as LuaError, Function, Lua, RegistryKey, String as LuaString, Table, UserData,
+ UserDataMethods,
};
-use std::{future::Future, net::SocketAddr, pin::Pin, rc::Rc};
-use tokio::{net::TcpListener, task::LocalSet};
+/// Wrapper around incoming request that implements UserData
struct LuaRequest(SocketAddr, Request<Incoming>);
impl UserData for LuaRequest {
fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) {
- methods.add_method("remote_addr", |_lua, req, ()| Ok((req.0).to_string()));
- methods.add_method("method", |_lua, req, ()| Ok((req.1).method().to_string()));
+ methods.add_method("remote_addr", |_, req, ()| Ok((req.0).to_string()));
+ methods.add_method("method", |_, req, ()| Ok((req.1).method().to_string()));
+ methods.add_method("path", |_, req, ()| Ok(req.1.uri().path().to_string()));
}
}
-pub struct Svc(Rc<Lua>, SocketAddr);
+/// Service that handles incoming requests
+#[derive(Clone)]
+pub struct Svc {
+ lua: Rc<Lua>,
+ handler: Rc<RegistryKey>,
+ peer_addr: SocketAddr,
+}
+
+impl Svc {
+ pub fn new(lua: Rc<Lua>, handler: Rc<RegistryKey>, peer_addr: SocketAddr) -> Self {
+ Self {
+ lua,
+ handler,
+ peer_addr,
+ }
+ }
+}
-impl Service<Request<Incoming>> for Svc {
- type Response = Response<BoxBody<Bytes, hyper::Error>>;
+impl hyper::service::Service<Request<Incoming>> for Svc {
+ type Response = Response<BoxBody<Bytes, Infallible>>;
type Error = LuaError;
- type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
+ type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn call(&self, req: Request<Incoming>) -> Self::Future {
// If handler returns an error then generate 5xx response
- let lua = self.0.clone();
- let lua_req = LuaRequest(self.1, req);
+ let lua = self.lua.clone();
+ let handler_key = self.handler.clone();
+ let lua_req = LuaRequest(self.peer_addr, req);
Box::pin(async move {
- let handler: Function = lua.named_registry_value("http_handler")?;
+ let handler: Function = lua.registry_value(&handler_key)?;
match handler.call_async::<_, Table>(lua_req).await {
Ok(lua_resp) => {
let status = lua_resp.get::<_, Option<u16>>("status")?.unwrap_or(200);
@@ -43,18 +71,11 @@ impl Service<Request<Incoming>> for Svc {
}
}
+ // Set body
let body = lua_resp
.get::<_, Option<LuaString>>("body")?
- .map(|b| {
- Full::new(Bytes::copy_from_slice(b.clone().as_bytes()))
- .map_err(|never| match never {})
- .boxed()
- })
- .unwrap_or_else(|| {
- Empty::<Bytes>::new()
- .map_err(|never| match never {})
- .boxed()
- });
+ .map(|b| Full::new(Bytes::copy_from_slice(b.as_bytes())).boxed())
+ .unwrap_or_else(|| Empty::<Bytes>::new().boxed());
Ok(resp.body(body).unwrap())
}
@@ -62,11 +83,7 @@ impl Service<Request<Incoming>> for Svc {
eprintln!("{}", err);
Ok(Response::builder()
.status(500)
- .body(
- Full::new(Bytes::from("Internal Server Error".as_bytes()))
- .map_err(|never| match never {})
- .boxed(),
- )
+ .body(Full::new(Bytes::from("Internal Server Error")).boxed())
.unwrap())
}
}
@@ -79,43 +96,47 @@ async fn main() {
let lua = Rc::new(Lua::new());
// Create Lua handler function
- let handler: Function = lua
+ let handler: RegistryKey = lua
.load(chunk! {
- function(req)
- return {
- status = 200,
- headers = {
- ["X-Req-Method"] = req:method(),
- ["X-Remote-Addr"] = req:remote_addr(),
- },
- body = "Hello from Lua!\n"
- }
- end
+ function(req)
+ return {
+ status = 200,
+ headers = {
+ ["X-Req-Method"] = req:method(),
+ ["X-Req-Path"] = req:path(),
+ ["X-Remote-Addr"] = req:remote_addr(),
+ },
+ body = "Hello from Lua!\n"
+ }
+ end
})
.eval()
- .expect("cannot create Lua handler");
+ .expect("Failed to create Lua handler");
+ let handler = Rc::new(handler);
- // Store it in the Registry
- lua.set_named_registry_value("http_handler", handler)
- .expect("cannot store Lua handler");
-
- let addr = "127.0.0.1:3000";
+ let listen_addr = "127.0.0.1:3000";
+ let listener = TcpListener::bind(listen_addr).await.unwrap();
+ println!("Listening on http://{listen_addr}");
let local = LocalSet::new();
- let listener = TcpListener::bind(addr).await.unwrap();
loop {
- let (stream, peer_addr) = listener.accept().await.unwrap();
- let io = TokioIo::new(stream);
+ let (stream, peer_addr) = match listener.accept().await {
+ Ok(x) => x,
+ Err(err) => {
+ eprintln!("Failed to accept connection: {err}");
+ continue;
+ }
+ };
- let svc = Svc(lua.clone(), peer_addr);
+ let svc = Svc::new(lua.clone(), handler.clone(), peer_addr);
local
.run_until(async move {
- if let Err(err) = auto::Builder::new(LocalExec)
+ let result = ServerConnBuilder::new(LocalExec)
.http1()
- .serve_connection(io, svc)
- .await
- {
- println!("Error serving connection: {:?}", err);
+ .serve_connection(TokioIo::new(stream), svc)
+ .await;
+ if let Err(err) = result {
+ eprintln!("Error serving connection: {err:?}");
}
})
.await;
@@ -127,7 +148,7 @@ struct LocalExec;
impl<F> hyper::rt::Executor<F> for LocalExec
where
- F: std::future::Future + 'static, // not requiring `Send`
+ F: Future + 'static, // not requiring `Send`
{
fn execute(&self, fut: F) {
tokio::task::spawn_local(fut);