summaryrefslogtreecommitdiff
path: root/src/server.rs
diff options
context:
space:
mode:
authorStuart Stock <stuart@int08h.com>2018-10-25 21:56:29 -0500
committerStuart Stock <stuart@int08h.com>2018-10-25 21:56:29 -0500
commiteb06e63faa8cbb98533b408e39bd3fd27e5d14ae (patch)
tree6b1241a1154dff5d766c4a39c6562b440f0cb5be /src/server.rs
parentbed4ed8e8db78434a77bb112a59c360f8f9eb803 (diff)
downloadroughenough-eb06e63faa8cbb98533b408e39bd3fd27e5d14ae.zip
implement server TCP health check, #8
Diffstat (limited to 'src/server.rs')
-rw-r--r--src/server.rs77
1 files changed, 63 insertions, 14 deletions
diff --git a/src/server.rs b/src/server.rs
index 66e3715..f851270 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -20,14 +20,14 @@ use hex;
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::process;
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use time;
use byteorder::{LittleEndian, WriteBytesExt};
-use mio::net::UdpSocket;
+use mio::net::{TcpListener, UdpSocket};
use mio::{Events, Poll, PollOpt, Ready, Token};
use mio_extras::timer::Timer;
@@ -35,8 +35,9 @@ use config::ServerConfig;
use key::{LongTermKey, OnlineKey};
use kms;
use merkle::MerkleTree;
-use MIN_REQUEST_LENGTH;
-use {Error, RtMessage, Tag};
+use {Error, RtMessage, Tag, MIN_REQUEST_LENGTH};
+use std::io::Write;
+use mio::tcp::Shutdown;
macro_rules! check_ctrlc {
($keep_running:expr) => {
@@ -47,8 +48,13 @@ macro_rules! check_ctrlc {
};
}
+// mio event registrations
const MESSAGE: Token = Token(0);
const STATUS: Token = Token(1);
+const HEALTH_CHECK: Token = Token(2);
+
+// Canned response to health check request
+const HTTP_RESPONSE: &str = "HTTP/1.1 200 OK\nContent-Length: 0\nConnection: close\n\n";
/// The main Roughenough server instance.
///
@@ -65,10 +71,11 @@ pub struct Server {
online_key: OnlineKey,
cert_bytes: Vec<u8>,
- response_counter: AtomicUsize,
+ response_counter: u64,
num_bad_requests: u64,
socket: UdpSocket,
+ health_listener: Option<TcpListener>,
keep_running: Arc<AtomicBool>,
poll_duration: Option<Duration>,
timer: Timer<()>,
@@ -109,10 +116,9 @@ impl Server {
long_term_key.make_cert(&online_key).encode().unwrap()
};
- let response_counter = AtomicUsize::new(0);
let keep_running = Arc::new(AtomicBool::new(true));
- let sock_addr = config.udp_socket_addr().expect("");
+ let sock_addr = config.udp_socket_addr().expect("udp sock addr");
let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket");
let poll_duration = Some(Duration::from_millis(100));
@@ -126,6 +132,21 @@ impl Server {
poll.register(&timer, STATUS, Ready::readable(), PollOpt::edge())
.unwrap();
+ let health_listener = if let Some(hc_port) = config.health_check_port() {
+ let hc_sock_addr: SocketAddr = format!("{}:{}", config.interface(), hc_port).parse()
+ .unwrap();
+
+ let tcp_listener = TcpListener::bind(&hc_sock_addr)
+ .expect("failed to bind TCP listener for health check");
+
+ poll.register(&tcp_listener, HEALTH_CHECK, Ready::readable(), PollOpt::edge())
+ .unwrap();
+
+ Some(tcp_listener)
+ } else {
+ None
+ };
+
let merkle = MerkleTree::new();
let requests = Vec::with_capacity(config.batch_size() as usize);
@@ -134,9 +155,11 @@ impl Server {
online_key,
cert_bytes,
- response_counter,
+ response_counter: 0,
num_bad_requests: 0,
socket,
+ health_listener,
+
keep_running,
poll_duration,
timer,
@@ -221,7 +244,7 @@ impl Server {
'process_batch: loop {
check_ctrlc!(self.keep_running);
- let resp_start = self.response_counter.load(Ordering::SeqCst);
+ let resp_start = self.response_counter;
for i in 0..self.config.batch_size() {
match self.socket.recv_from(&mut self.buf) {
@@ -236,7 +259,7 @@ impl Server {
info!(
"Invalid request: '{:?}' ({} bytes) from {} (#{} in batch, resp #{})",
- e, num_bytes, src_addr, i, resp_start + i as usize
+ e, num_bytes, src_addr, i, resp_start + i as u64
);
}
}
@@ -276,8 +299,8 @@ impl Server {
.socket
.send_to(&resp_bytes, &src_addr)
.expect("send_to failed");
- let num_responses =
- self.response_counter.fetch_add(1, Ordering::SeqCst);
+
+ self.response_counter += 1;
info!(
"Responded {} bytes to {} for '{}..' (#{} in batch, resp #{})",
@@ -285,7 +308,7 @@ impl Server {
src_addr,
hex::encode(&nonce[0..4]),
i,
- num_responses
+ self.response_counter
);
}
@@ -298,10 +321,36 @@ impl Server {
}
}
+ HEALTH_CHECK => {
+ let listener = self.health_listener.as_ref().unwrap();
+
+ match listener.accept() {
+ Ok((ref mut stream, src_addr)) => {
+ info!("health check from {}", src_addr);
+
+ match stream.write(HTTP_RESPONSE.as_bytes()) {
+ Ok(_) => (),
+ Err(e) => warn!("error writing health check {}", e)
+ }
+
+ match stream.shutdown(Shutdown::Both) {
+ Ok(_) => (),
+ Err(e) => warn!("error in health check socket shutdown {}", e)
+ }
+ }
+ Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
+ debug!("blocking in TCP health check");
+ },
+ Err(e) => {
+ warn!("unexpected health check error {}", e);
+ }
+ }
+ }
+
STATUS => {
info!(
"responses {}, invalid requests {}",
- self.response_counter.load(Ordering::SeqCst),
+ self.response_counter,
self.num_bad_requests
);