summaryrefslogtreecommitdiff
path: root/src/server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/server.rs')
-rw-r--r--src/server.rs308
1 files changed, 160 insertions, 148 deletions
diff --git a/src/server.rs b/src/server.rs
index 01bbef6..6045191 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -23,21 +23,24 @@ use std::process;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
+use std::io::Write;
use time;
use byteorder::{LittleEndian, WriteBytesExt};
+use humansize::{FileSize, file_size_opts as fsopts};
+
use mio::net::{TcpListener, UdpSocket};
use mio::{Events, Poll, PollOpt, Ready, Token};
+use mio::tcp::Shutdown;
use mio_extras::timer::Timer;
use crate::config::ServerConfig;
use crate::key::{LongTermKey, OnlineKey};
use crate::kms;
use crate::merkle::MerkleTree;
-use mio::tcp::Shutdown;
-use std::io::Write;
use crate::{Error, RtMessage, Tag, MIN_REQUEST_LENGTH};
+use crate::stats::{ClientStats, SimpleStats};
macro_rules! check_ctrlc {
($keep_running:expr) => {
@@ -67,20 +70,16 @@ const HTTP_RESPONSE: &str = "HTTP/1.1 200 OK\nContent-Length: 0\nConnection: clo
/// See [the config module](../config/index.html) for more information.
///
pub struct Server {
- config: Box<ServerConfig>,
+ config: Box<dyn ServerConfig>,
online_key: OnlineKey,
cert_bytes: Vec<u8>,
- response_counter: u64,
- num_bad_requests: u64,
-
socket: UdpSocket,
health_listener: Option<TcpListener>,
keep_running: Arc<AtomicBool>,
poll_duration: Option<Duration>,
timer: Timer<()>,
poll: Poll,
- events: Events,
merkle: MerkleTree,
requests: Vec<(Vec<u8>, SocketAddr)>,
buf: [u8; 65_536],
@@ -90,6 +89,8 @@ pub struct Server {
// Used to send requests to ourselves in fuzzing mode
#[cfg(fuzzing)]
fake_client_socket: UdpSocket,
+
+ stats: SimpleStats,
}
impl Server {
@@ -155,8 +156,6 @@ impl Server {
online_key,
cert_bytes,
- response_counter: 0,
- num_bad_requests: 0,
socket,
health_listener,
@@ -164,7 +163,6 @@ impl Server {
poll_duration,
timer,
poll,
- events: Events::with_capacity(32),
merkle,
requests,
buf: [0u8; 65_536],
@@ -173,150 +171,69 @@ impl Server {
#[cfg(fuzzing)]
fake_client_socket: UdpSocket::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(),
+
+ stats: SimpleStats::new(),
}
}
- /// Returns a reference counted pointer the this server's `keep_running` value.
- pub fn get_keep_running(&self) -> Arc<AtomicBool> {
- self.keep_running.clone()
+ /// Returns a reference to the server's long-term public key
+ pub fn get_public_key(&self) -> &str {
+ &self.public_key
}
- // extract the client's nonce from its request
- fn nonce_from_request<'a>(&self, buf: &'a [u8], num_bytes: usize) -> Result<&'a [u8], Error> {
- if num_bytes < MIN_REQUEST_LENGTH as usize {
- return Err(Error::RequestTooShort);
- }
-
- let tag_count = &buf[..4];
- let expected_nonc = &buf[8..12];
- let expected_pad = &buf[12..16];
-
- let tag_count_is_2 = tag_count == [0x02, 0x00, 0x00, 0x00];
- let tag1_is_nonc = expected_nonc == Tag::NONC.wire_value();
- let tag2_is_pad = expected_pad == Tag::PAD.wire_value();
-
- if tag_count_is_2 && tag1_is_nonc && tag2_is_pad {
- Ok(&buf[0x10..0x50])
- } else {
- Err(Error::InvalidRequest)
- }
+ /// Returns a reference to the server's on-line (delegated) key
+ pub fn get_online_key(&self) -> &OnlineKey {
+ &self.online_key
}
- fn make_response(
- &self,
- srep: &RtMessage,
- cert_bytes: &[u8],
- path: &[u8],
- idx: u32,
- ) -> RtMessage {
- let mut index = [0; 4];
- (&mut index as &mut [u8])
- .write_u32::<LittleEndian>(idx)
- .unwrap();
-
- let sig_bytes = srep.get_field(Tag::SIG).unwrap();
- let srep_bytes = srep.get_field(Tag::SREP).unwrap();
+ /// Returns a reference to the `ServerConfig` this server was configured with
+ pub fn get_config(&self) -> &Box<dyn ServerConfig> {
+ &self.config
+ }
- let mut response = RtMessage::new(5);
- response.add_field(Tag::SIG, sig_bytes).unwrap();
- response.add_field(Tag::PATH, path).unwrap();
- response.add_field(Tag::SREP, srep_bytes).unwrap();
- response.add_field(Tag::CERT, cert_bytes).unwrap();
- response.add_field(Tag::INDX, &index).unwrap();
+ /// Returns a reference counted pointer the this server's `keep_running` value.
+ pub fn get_keep_running(&self) -> Arc<AtomicBool> {
+ self.keep_running.clone()
+ }
- response
+ #[cfg(fuzzing)]
+ pub fn send_to_self(&mut self, data: &[u8]) {
+ self.response_counter = 0;
+ self.num_bad_requests = 0;
+ let res = self
+ .fake_client_socket
+ .send_to(data, &self.socket.local_addr().unwrap());
+ info!("Sent to self: {:?}", res);
}
/// The main processing function for incoming connections. This method should be
/// called repeatedly in a loop to process requests. It returns 'true' when the
/// server has shutdown (due to keep_running being set to 'false').
///
- pub fn process_events(&mut self) -> bool {
+ pub fn process_events(&mut self, events: &mut Events) -> bool {
self.poll
- .poll(&mut self.events, self.poll_duration)
+ .poll(events, self.poll_duration)
.expect("poll failed");
- for event in self.events.iter() {
- match event.token() {
+ for msg in events.iter() {
+ match msg.token() {
MESSAGE => {
- let mut done = false;
-
- 'process_batch: loop {
+ loop {
check_ctrlc!(self.keep_running);
- let resp_start = self.response_counter;
-
- for i in 0..self.config.batch_size() {
- match self.socket.recv_from(&mut self.buf) {
- Ok((num_bytes, src_addr)) => {
- match self.nonce_from_request(&self.buf, num_bytes) {
- Ok(nonce) => {
- self.requests.push((Vec::from(nonce), src_addr));
- self.merkle.push_leaf(nonce);
- }
- Err(e) => {
- self.num_bad_requests += 1;
-
- info!(
- "Invalid request: '{:?}' ({} bytes) from {} (#{} in batch, resp #{})",
- e, num_bytes, src_addr, i, resp_start + i as u64
- );
- }
- }
- }
- Err(e) => match e.kind() {
- ErrorKind::WouldBlock => {
- done = true;
- break;
- }
- _ => {
- error!(
- "Error receiving from socket: {:?}: {:?}",
- e.kind(),
- e
- );
- break;
- }
- },
- };
- }
-
- if self.requests.is_empty() {
- break 'process_batch;
- }
-
- let merkle_root = self.merkle.compute_root();
- let srep = self.online_key.make_srep(time::get_time(), &merkle_root);
-
- for (i, &(ref nonce, ref src_addr)) in self.requests.iter().enumerate() {
- let paths = self.merkle.get_paths(i);
-
- let resp =
- self.make_response(&srep, &self.cert_bytes, &paths, i as u32);
- let resp_bytes = resp.encode().unwrap();
-
- let bytes_sent = self
- .socket
- .send_to(&resp_bytes, &src_addr)
- .expect("send_to failed");
+ self.merkle.reset();
+ self.requests.clear();
- self.response_counter += 1;
+ let socket_now_empty = self.collect_requests();
- info!(
- "Responded {} bytes to {} for '{}..' (#{} in batch, resp #{})",
- bytes_sent,
- src_addr,
- hex::encode(&nonce[0..4]),
- i,
- self.response_counter
- );
+ if self.requests.is_empty() {
+ break;
}
- self.merkle.reset();
- self.requests.clear();
+ self.send_responses();
- if done {
- break 'process_batch;
+ if socket_now_empty {
+ break;
}
}
}
@@ -327,11 +244,12 @@ impl Server {
match listener.accept() {
Ok((ref mut stream, src_addr)) => {
info!("health check from {}", src_addr);
+ self.stats.add_health_check(&src_addr.ip());
match stream.write(HTTP_RESPONSE.as_bytes()) {
Ok(_) => (),
Err(e) => warn!("error writing health check {}", e),
- }
+ };
match stream.shutdown(Shutdown::Both) {
Ok(_) => (),
@@ -348,9 +266,21 @@ impl Server {
}
STATUS => {
+ for (addr, counts) in self.stats.iter() {
+ info!(
+ "{:16}: {} valid, {} invalid requests; {} responses ({} sent)",
+ format!("{}", addr), counts.valid_requests, counts.invalid_requests,
+ counts.responses_sent,
+ counts.bytes_sent.file_size(fsopts::BINARY).unwrap()
+ );
+ }
+
info!(
- "responses {}, invalid requests {}",
- self.response_counter, self.num_bad_requests
+ "Totals: {} unique clients; {} valid, {} invalid requests; {} responses ({} sent)",
+ self.stats.total_unique_clients(),
+ self.stats.total_valid_requests(), self.stats.total_invalid_requests(),
+ self.stats.total_responses_sent(),
+ self.stats.total_bytes_sent().file_size(fsopts::BINARY).unwrap()
);
self.timer.set_timeout(self.config.status_interval(), ());
@@ -362,28 +292,110 @@ impl Server {
false
}
- /// Returns a reference to the server's long-term public key
- pub fn get_public_key(&self) -> &str {
- &self.public_key
+ // Read and process client requests from socket until empty or 'batch_size' number of
+ // requests have been read.
+ fn collect_requests(&mut self) -> bool {
+ for i in 0..self.config.batch_size() {
+ match self.socket.recv_from(&mut self.buf) {
+ Ok((num_bytes, src_addr)) => {
+ match self.nonce_from_request(&self.buf, num_bytes) {
+ Ok(nonce) => {
+ self.stats.add_valid_request(&src_addr.ip());
+ self.requests.push((Vec::from(nonce), src_addr));
+ self.merkle.push_leaf(nonce);
+ }
+ Err(e) => {
+ self.stats.add_invalid_request(&src_addr.ip());
+
+ info!(
+ "Invalid request: '{:?}' ({} bytes) from {} (#{} in batch, resp #{})",
+ e, num_bytes, src_addr, i,
+ self.stats.total_responses_sent() + u64::from(i)
+ );
+ }
+ }
+ }
+ Err(e) => match e.kind() {
+ ErrorKind::WouldBlock => {
+ return true;
+ }
+ _ => {
+ error!("Error receiving from socket: {:?}: {:?}", e.kind(), e);
+ return false;
+ }
+ },
+ };
+ }
+
+ false
}
- /// Returns a reference to the server's on-line (delegated) key
- pub fn get_online_key(&self) -> &OnlineKey {
- &self.online_key
+ // extract the client's nonce from its request
+ fn nonce_from_request<'a>(&self, buf: &'a [u8], num_bytes: usize) -> Result<&'a [u8], Error> {
+ if num_bytes < MIN_REQUEST_LENGTH as usize {
+ return Err(Error::RequestTooShort);
+ }
+
+ let tag_count = &buf[..4];
+ let expected_nonc = &buf[8..12];
+ let expected_pad = &buf[12..16];
+
+ let tag_count_is_2 = tag_count == [0x02, 0x00, 0x00, 0x00];
+ let tag1_is_nonc = expected_nonc == Tag::NONC.wire_value();
+ let tag2_is_pad = expected_pad == Tag::PAD.wire_value();
+
+ if tag_count_is_2 && tag1_is_nonc && tag2_is_pad {
+ Ok(&buf[0x10..0x50])
+ } else {
+ Err(Error::InvalidRequest)
+ }
}
- /// Returns a reference to the `ServerConfig` this server was configured with
- pub fn get_config(&self) -> &Box<ServerConfig> {
- &self.config
+ fn send_responses(&mut self) -> () {
+ let merkle_root = self.merkle.compute_root();
+
+ // The SREP tag is identical for each response
+ let srep = self.online_key.make_srep(time::get_time(), &merkle_root);
+
+ for (i, &(ref nonce, ref src_addr)) in self.requests.iter().enumerate() {
+ let paths = self.merkle.get_paths(i);
+ let resp = self.make_response(&srep, &self.cert_bytes, &paths, i as u32);
+ let resp_bytes = resp.encode().unwrap();
+
+ let bytes_sent = self
+ .socket
+ .send_to(&resp_bytes, &src_addr)
+ .expect("send_to failed");
+
+ self.stats.add_response(&src_addr.ip(), bytes_sent);
+
+ info!(
+ "Responded {} bytes to {} for '{}..' (#{} in batch, resp #{})",
+ bytes_sent,
+ src_addr,
+ hex::encode(&nonce[0..4]),
+ i,
+ self.stats.total_responses_sent()
+ );
+ }
}
- #[cfg(fuzzing)]
- pub fn send_to_self(&mut self, data: &[u8]) {
- self.response_counter = 0;
- self.num_bad_requests = 0;
- let res = self
- .fake_client_socket
- .send_to(data, &self.socket.local_addr().unwrap());
- info!("Sent to self: {:?}", res);
+ fn make_response(&self, srep: &RtMessage, cert_bytes: &[u8], path: &[u8], idx: u32) -> RtMessage {
+ let mut index = [0; 4];
+ (&mut index as &mut [u8])
+ .write_u32::<LittleEndian>(idx)
+ .unwrap();
+
+ let sig_bytes = srep.get_field(Tag::SIG).unwrap();
+ let srep_bytes = srep.get_field(Tag::SREP).unwrap();
+
+ let mut response = RtMessage::new(5);
+ response.add_field(Tag::SIG, sig_bytes).unwrap();
+ response.add_field(Tag::PATH, path).unwrap();
+ response.add_field(Tag::SREP, srep_bytes).unwrap();
+ response.add_field(Tag::CERT, cert_bytes).unwrap();
+ response.add_field(Tag::INDX, &index).unwrap();
+
+ response
}
}