diff options
author | Stuart Stock <stuart@int08h.com> | 2018-10-25 21:56:29 -0500 |
---|---|---|
committer | Stuart Stock <stuart@int08h.com> | 2018-10-25 21:56:29 -0500 |
commit | eb06e63faa8cbb98533b408e39bd3fd27e5d14ae (patch) | |
tree | 6b1241a1154dff5d766c4a39c6562b440f0cb5be /src | |
parent | bed4ed8e8db78434a77bb112a59c360f8f9eb803 (diff) | |
download | roughenough-eb06e63faa8cbb98533b408e39bd3fd27e5d14ae.zip |
implement server TCP health check, #8
Diffstat (limited to 'src')
-rw-r--r-- | src/bin/roughenough-server.rs | 8 | ||||
-rw-r--r-- | src/server.rs | 77 |
2 files changed, 71 insertions, 14 deletions
diff --git a/src/bin/roughenough-server.rs b/src/bin/roughenough-server.rs index c908133..9778998 100644 --- a/src/bin/roughenough-server.rs +++ b/src/bin/roughenough-server.rs @@ -71,6 +71,14 @@ fn polling_loop(config: Box<ServerConfig>) { server.get_config().port() ); + if let Some(hc_port) = server.get_config().health_check_port() { + info!( + "TCP health check : {}:{}", + server.get_config().interface(), + hc_port + ); + } + let kr = server.get_keep_running(); let kr_new = kr.clone(); diff --git a/src/server.rs b/src/server.rs index 66e3715..f851270 100644 --- a/src/server.rs +++ b/src/server.rs @@ -20,14 +20,14 @@ use hex; use std::io::ErrorKind; use std::net::SocketAddr; use std::process; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use time; use byteorder::{LittleEndian, WriteBytesExt}; -use mio::net::UdpSocket; +use mio::net::{TcpListener, UdpSocket}; use mio::{Events, Poll, PollOpt, Ready, Token}; use mio_extras::timer::Timer; @@ -35,8 +35,9 @@ use config::ServerConfig; use key::{LongTermKey, OnlineKey}; use kms; use merkle::MerkleTree; -use MIN_REQUEST_LENGTH; -use {Error, RtMessage, Tag}; +use {Error, RtMessage, Tag, MIN_REQUEST_LENGTH}; +use std::io::Write; +use mio::tcp::Shutdown; macro_rules! check_ctrlc { ($keep_running:expr) => { @@ -47,8 +48,13 @@ macro_rules! check_ctrlc { }; } +// mio event registrations const MESSAGE: Token = Token(0); const STATUS: Token = Token(1); +const HEALTH_CHECK: Token = Token(2); + +// Canned response to health check request +const HTTP_RESPONSE: &str = "HTTP/1.1 200 OK\nContent-Length: 0\nConnection: close\n\n"; /// The main Roughenough server instance. /// @@ -65,10 +71,11 @@ pub struct Server { online_key: OnlineKey, cert_bytes: Vec<u8>, - response_counter: AtomicUsize, + response_counter: u64, num_bad_requests: u64, socket: UdpSocket, + health_listener: Option<TcpListener>, keep_running: Arc<AtomicBool>, poll_duration: Option<Duration>, timer: Timer<()>, @@ -109,10 +116,9 @@ impl Server { long_term_key.make_cert(&online_key).encode().unwrap() }; - let response_counter = AtomicUsize::new(0); let keep_running = Arc::new(AtomicBool::new(true)); - let sock_addr = config.udp_socket_addr().expect(""); + let sock_addr = config.udp_socket_addr().expect("udp sock addr"); let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket"); let poll_duration = Some(Duration::from_millis(100)); @@ -126,6 +132,21 @@ impl Server { poll.register(&timer, STATUS, Ready::readable(), PollOpt::edge()) .unwrap(); + let health_listener = if let Some(hc_port) = config.health_check_port() { + let hc_sock_addr: SocketAddr = format!("{}:{}", config.interface(), hc_port).parse() + .unwrap(); + + let tcp_listener = TcpListener::bind(&hc_sock_addr) + .expect("failed to bind TCP listener for health check"); + + poll.register(&tcp_listener, HEALTH_CHECK, Ready::readable(), PollOpt::edge()) + .unwrap(); + + Some(tcp_listener) + } else { + None + }; + let merkle = MerkleTree::new(); let requests = Vec::with_capacity(config.batch_size() as usize); @@ -134,9 +155,11 @@ impl Server { online_key, cert_bytes, - response_counter, + response_counter: 0, num_bad_requests: 0, socket, + health_listener, + keep_running, poll_duration, timer, @@ -221,7 +244,7 @@ impl Server { 'process_batch: loop { check_ctrlc!(self.keep_running); - let resp_start = self.response_counter.load(Ordering::SeqCst); + let resp_start = self.response_counter; for i in 0..self.config.batch_size() { match self.socket.recv_from(&mut self.buf) { @@ -236,7 +259,7 @@ impl Server { info!( "Invalid request: '{:?}' ({} bytes) from {} (#{} in batch, resp #{})", - e, num_bytes, src_addr, i, resp_start + i as usize + e, num_bytes, src_addr, i, resp_start + i as u64 ); } } @@ -276,8 +299,8 @@ impl Server { .socket .send_to(&resp_bytes, &src_addr) .expect("send_to failed"); - let num_responses = - self.response_counter.fetch_add(1, Ordering::SeqCst); + + self.response_counter += 1; info!( "Responded {} bytes to {} for '{}..' (#{} in batch, resp #{})", @@ -285,7 +308,7 @@ impl Server { src_addr, hex::encode(&nonce[0..4]), i, - num_responses + self.response_counter ); } @@ -298,10 +321,36 @@ impl Server { } } + HEALTH_CHECK => { + let listener = self.health_listener.as_ref().unwrap(); + + match listener.accept() { + Ok((ref mut stream, src_addr)) => { + info!("health check from {}", src_addr); + + match stream.write(HTTP_RESPONSE.as_bytes()) { + Ok(_) => (), + Err(e) => warn!("error writing health check {}", e) + } + + match stream.shutdown(Shutdown::Both) { + Ok(_) => (), + Err(e) => warn!("error in health check socket shutdown {}", e) + } + } + Err(ref e) if e.kind() == ErrorKind::WouldBlock => { + debug!("blocking in TCP health check"); + }, + Err(e) => { + warn!("unexpected health check error {}", e); + } + } + } + STATUS => { info!( "responses {}, invalid requests {}", - self.response_counter.load(Ordering::SeqCst), + self.response_counter, self.num_bad_requests ); |