summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStuart Stock <stuart@int08h.com>2019-01-19 14:16:51 -0600
committerStuart Stock <stuart@int08h.com>2019-01-20 09:03:24 -0600
commita84573f053e4ab36dda3b82b8811bc20dd991844 (patch)
tree291b6a62f1d9dedb3481e76bfb36729cd17486b0
parentbdf087db2f5ebcadf31bd0968b690daa25820489 (diff)
downloadroughenough-a84573f053e4ab36dda3b82b8811bc20dd991844.zip
Clean and refactor server's inner loop
-rw-r--r--CHANGELOG.md5
-rw-r--r--Cargo.toml6
-rw-r--r--src/bin/roughenough-server.rs6
-rw-r--r--src/config/mod.rs4
-rw-r--r--src/lib.rs2
-rw-r--r--src/server.rs275
6 files changed, 151 insertions, 147 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9a98044..343aa58 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## version 1.1.2
+
+* Optional client request statistics tracking.
+* Clean-up and simplification of server inner loop.
+
## Version 1.1.1
* Provide auxiliary data to the AWS KMS decryption call. The auxiliary data _was_ provided in encrypt, but not decrypt, resulting in unconditional failure when unwrapping the long-term identity. See https://github.com/int08h/roughenough/commit/846128d08bd3fcd72f23b3123b332d0692782e41#diff-7f7c3059af30a5ded26269301caf8531R102
diff --git a/Cargo.toml b/Cargo.toml
index 165d81d..fa57435 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "roughenough"
-version = "1.1.1"
+version = "1.1.2"
repository = "https://github.com/int08h/roughenough"
authors = ["Stuart Stock <stuart@int08h.com>", "Aaron Hill <aa1ronham@gmail.com>"]
license = "Apache-2.0"
@@ -33,8 +33,8 @@ chrono = "0.4"
hex = "0.3"
base64 = "0.9"
-rusoto_core = { version = "0.34", optional = true }
-rusoto_kms = { version = "0.34", optional = true }
+rusoto_core = { version = "0.36", optional = true }
+rusoto_kms = { version = "0.36", optional = true }
# google-cloudkms1 intentionally uses an old version of Hyper. See
# https://github.com/Byron/google-apis-rs/issues/173 for more information.
diff --git a/src/bin/roughenough-server.rs b/src/bin/roughenough-server.rs
index 5f70da0..88c8e83 100644
--- a/src/bin/roughenough-server.rs
+++ b/src/bin/roughenough-server.rs
@@ -32,6 +32,8 @@ use roughenough::config::ServerConfig;
use roughenough::roughenough_version;
use roughenough::server::Server;
+use mio::Events;
+
macro_rules! check_ctrlc {
($keep_running:expr) => {
if !$keep_running.load(Ordering::Acquire) {
@@ -74,9 +76,11 @@ fn polling_loop(config: Box<ServerConfig>) {
ctrlc::set_handler(move || kr.store(false, Ordering::Release))
.expect("failed setting Ctrl-C handler");
+ let mut events = Events::with_capacity(64);
+
loop {
check_ctrlc!(kr_new);
- if server.process_events() {
+ if server.process_events(&mut events) {
return;
}
}
diff --git a/src/config/mod.rs b/src/config/mod.rs
index 65204e6..fb7854f 100644
--- a/src/config/mod.rs
+++ b/src/config/mod.rs
@@ -112,7 +112,7 @@ pub trait ServerConfig {
/// * `ENV` will return an [`EnvironmentConfig`](struct.EnvironmentConfig.html)
/// * any other value returns a [`FileConfig`](struct.FileConfig.html)
///
-pub fn make_config(arg: &str) -> Result<Box<ServerConfig>, Error> {
+pub fn make_config(arg: &str) -> Result<Box<dyn ServerConfig>, Error> {
if arg == "ENV" {
match EnvironmentConfig::new() {
Ok(cfg) => Ok(Box::new(cfg)),
@@ -129,7 +129,7 @@ pub fn make_config(arg: &str) -> Result<Box<ServerConfig>, Error> {
///
/// Validate configuration settings. Returns `true` if the config is valid, `false` otherwise.
///
-pub fn is_valid_config(cfg: &Box<ServerConfig>) -> bool {
+pub fn is_valid_config(cfg: &Box<dyn ServerConfig>) -> bool {
let mut is_valid = true;
if cfg.port() == 0 {
diff --git a/src/lib.rs b/src/lib.rs
index 4ad6390..6e7a88a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -74,7 +74,7 @@ pub use crate::message::RtMessage;
pub use crate::tag::Tag;
/// Version of Roughenough
-pub const VERSION: &str = "1.1.1";
+pub const VERSION: &str = "1.1.2";
/// Roughenough version string enriched with any compile-time optional features
pub fn roughenough_version() -> String {
diff --git a/src/server.rs b/src/server.rs
index 01bbef6..87a4a72 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -23,20 +23,20 @@ use std::process;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
+use std::io::Write;
use time;
use byteorder::{LittleEndian, WriteBytesExt};
use mio::net::{TcpListener, UdpSocket};
use mio::{Events, Poll, PollOpt, Ready, Token};
+use mio::tcp::Shutdown;
use mio_extras::timer::Timer;
use crate::config::ServerConfig;
use crate::key::{LongTermKey, OnlineKey};
use crate::kms;
use crate::merkle::MerkleTree;
-use mio::tcp::Shutdown;
-use std::io::Write;
use crate::{Error, RtMessage, Tag, MIN_REQUEST_LENGTH};
macro_rules! check_ctrlc {
@@ -67,7 +67,7 @@ const HTTP_RESPONSE: &str = "HTTP/1.1 200 OK\nContent-Length: 0\nConnection: clo
/// See [the config module](../config/index.html) for more information.
///
pub struct Server {
- config: Box<ServerConfig>,
+ config: Box<dyn ServerConfig>,
online_key: OnlineKey,
cert_bytes: Vec<u8>,
@@ -80,7 +80,6 @@ pub struct Server {
poll_duration: Option<Duration>,
timer: Timer<()>,
poll: Poll,
- events: Events,
merkle: MerkleTree,
requests: Vec<(Vec<u8>, SocketAddr)>,
buf: [u8; 65_536],
@@ -164,7 +163,6 @@ impl Server {
poll_duration,
timer,
poll,
- events: Events::with_capacity(32),
merkle,
requests,
buf: [0u8; 65_536],
@@ -176,147 +174,64 @@ impl Server {
}
}
- /// Returns a reference counted pointer the this server's `keep_running` value.
- pub fn get_keep_running(&self) -> Arc<AtomicBool> {
- self.keep_running.clone()
+ /// Returns a reference to the server's long-term public key
+ pub fn get_public_key(&self) -> &str {
+ &self.public_key
}
- // extract the client's nonce from its request
- fn nonce_from_request<'a>(&self, buf: &'a [u8], num_bytes: usize) -> Result<&'a [u8], Error> {
- if num_bytes < MIN_REQUEST_LENGTH as usize {
- return Err(Error::RequestTooShort);
- }
-
- let tag_count = &buf[..4];
- let expected_nonc = &buf[8..12];
- let expected_pad = &buf[12..16];
-
- let tag_count_is_2 = tag_count == [0x02, 0x00, 0x00, 0x00];
- let tag1_is_nonc = expected_nonc == Tag::NONC.wire_value();
- let tag2_is_pad = expected_pad == Tag::PAD.wire_value();
-
- if tag_count_is_2 && tag1_is_nonc && tag2_is_pad {
- Ok(&buf[0x10..0x50])
- } else {
- Err(Error::InvalidRequest)
- }
+ /// Returns a reference to the server's on-line (delegated) key
+ pub fn get_online_key(&self) -> &OnlineKey {
+ &self.online_key
}
- fn make_response(
- &self,
- srep: &RtMessage,
- cert_bytes: &[u8],
- path: &[u8],
- idx: u32,
- ) -> RtMessage {
- let mut index = [0; 4];
- (&mut index as &mut [u8])
- .write_u32::<LittleEndian>(idx)
- .unwrap();
-
- let sig_bytes = srep.get_field(Tag::SIG).unwrap();
- let srep_bytes = srep.get_field(Tag::SREP).unwrap();
+ /// Returns a reference to the `ServerConfig` this server was configured with
+ pub fn get_config(&self) -> &Box<dyn ServerConfig> {
+ &self.config
+ }
- let mut response = RtMessage::new(5);
- response.add_field(Tag::SIG, sig_bytes).unwrap();
- response.add_field(Tag::PATH, path).unwrap();
- response.add_field(Tag::SREP, srep_bytes).unwrap();
- response.add_field(Tag::CERT, cert_bytes).unwrap();
- response.add_field(Tag::INDX, &index).unwrap();
+ /// Returns a reference counted pointer the this server's `keep_running` value.
+ pub fn get_keep_running(&self) -> Arc<AtomicBool> {
+ self.keep_running.clone()
+ }
- response
+ #[cfg(fuzzing)]
+ pub fn send_to_self(&mut self, data: &[u8]) {
+ self.response_counter = 0;
+ self.num_bad_requests = 0;
+ let res = self
+ .fake_client_socket
+ .send_to(data, &self.socket.local_addr().unwrap());
+ info!("Sent to self: {:?}", res);
}
/// The main processing function for incoming connections. This method should be
/// called repeatedly in a loop to process requests. It returns 'true' when the
/// server has shutdown (due to keep_running being set to 'false').
///
- pub fn process_events(&mut self) -> bool {
+ pub fn process_events(&mut self, events: &mut Events) -> bool {
self.poll
- .poll(&mut self.events, self.poll_duration)
+ .poll(events, self.poll_duration)
.expect("poll failed");
- for event in self.events.iter() {
- match event.token() {
+ for msg in events.iter() {
+ match msg.token() {
MESSAGE => {
- let mut done = false;
-
- 'process_batch: loop {
+ loop {
check_ctrlc!(self.keep_running);
- let resp_start = self.response_counter;
-
- for i in 0..self.config.batch_size() {
- match self.socket.recv_from(&mut self.buf) {
- Ok((num_bytes, src_addr)) => {
- match self.nonce_from_request(&self.buf, num_bytes) {
- Ok(nonce) => {
- self.requests.push((Vec::from(nonce), src_addr));
- self.merkle.push_leaf(nonce);
- }
- Err(e) => {
- self.num_bad_requests += 1;
-
- info!(
- "Invalid request: '{:?}' ({} bytes) from {} (#{} in batch, resp #{})",
- e, num_bytes, src_addr, i, resp_start + i as u64
- );
- }
- }
- }
- Err(e) => match e.kind() {
- ErrorKind::WouldBlock => {
- done = true;
- break;
- }
- _ => {
- error!(
- "Error receiving from socket: {:?}: {:?}",
- e.kind(),
- e
- );
- break;
- }
- },
- };
- }
-
- if self.requests.is_empty() {
- break 'process_batch;
- }
-
- let merkle_root = self.merkle.compute_root();
- let srep = self.online_key.make_srep(time::get_time(), &merkle_root);
-
- for (i, &(ref nonce, ref src_addr)) in self.requests.iter().enumerate() {
- let paths = self.merkle.get_paths(i);
-
- let resp =
- self.make_response(&srep, &self.cert_bytes, &paths, i as u32);
- let resp_bytes = resp.encode().unwrap();
-
- let bytes_sent = self
- .socket
- .send_to(&resp_bytes, &src_addr)
- .expect("send_to failed");
+ self.merkle.reset();
+ self.requests.clear();
- self.response_counter += 1;
+ let socket_now_empty = self.collect_requests();
- info!(
- "Responded {} bytes to {} for '{}..' (#{} in batch, resp #{})",
- bytes_sent,
- src_addr,
- hex::encode(&nonce[0..4]),
- i,
- self.response_counter
- );
+ if self.requests.is_empty() {
+ break;
}
- self.merkle.reset();
- self.requests.clear();
+ self.send_responses();
- if done {
- break 'process_batch;
+ if socket_now_empty {
+ break;
}
}
}
@@ -362,28 +277,108 @@ impl Server {
false
}
- /// Returns a reference to the server's long-term public key
- pub fn get_public_key(&self) -> &str {
- &self.public_key
+ // Read and process client requests from socket until empty or 'batch_size' number of
+ // requests have been read.
+ fn collect_requests(&mut self) -> bool {
+ for i in 0..self.config.batch_size() {
+ match self.socket.recv_from(&mut self.buf) {
+ Ok((num_bytes, src_addr)) => {
+ match self.nonce_from_request(&self.buf, num_bytes) {
+ Ok(nonce) => {
+ self.requests.push((Vec::from(nonce), src_addr));
+ self.merkle.push_leaf(nonce);
+ }
+ Err(e) => {
+ self.num_bad_requests += 1;
+
+ info!(
+ "Invalid request: '{:?}' ({} bytes) from {} (#{} in batch, resp #{})",
+ e, num_bytes, src_addr, i, self.response_counter + i as u64
+ );
+ }
+ }
+ }
+ Err(e) => match e.kind() {
+ ErrorKind::WouldBlock => {
+ return true;
+ }
+ _ => {
+ error!("Error receiving from socket: {:?}: {:?}", e.kind(), e);
+ return false;
+ }
+ },
+ };
+ }
+
+ false
}
- /// Returns a reference to the server's on-line (delegated) key
- pub fn get_online_key(&self) -> &OnlineKey {
- &self.online_key
+ // extract the client's nonce from its request
+ fn nonce_from_request<'a>(&self, buf: &'a [u8], num_bytes: usize) -> Result<&'a [u8], Error> {
+ if num_bytes < MIN_REQUEST_LENGTH as usize {
+ return Err(Error::RequestTooShort);
+ }
+
+ let tag_count = &buf[..4];
+ let expected_nonc = &buf[8..12];
+ let expected_pad = &buf[12..16];
+
+ let tag_count_is_2 = tag_count == [0x02, 0x00, 0x00, 0x00];
+ let tag1_is_nonc = expected_nonc == Tag::NONC.wire_value();
+ let tag2_is_pad = expected_pad == Tag::PAD.wire_value();
+
+ if tag_count_is_2 && tag1_is_nonc && tag2_is_pad {
+ Ok(&buf[0x10..0x50])
+ } else {
+ Err(Error::InvalidRequest)
+ }
}
- /// Returns a reference to the `ServerConfig` this server was configured with
- pub fn get_config(&self) -> &Box<ServerConfig> {
- &self.config
+ fn send_responses(&mut self) -> () {
+ let merkle_root = self.merkle.compute_root();
+
+ // The SREP tag is identical for each response
+ let srep = self.online_key.make_srep(time::get_time(), &merkle_root);
+
+ for (i, &(ref nonce, ref src_addr)) in self.requests.iter().enumerate() {
+ let paths = self.merkle.get_paths(i);
+ let resp = self.make_response(&srep, &self.cert_bytes, &paths, i as u32);
+ let resp_bytes = resp.encode().unwrap();
+
+ let bytes_sent = self
+ .socket
+ .send_to(&resp_bytes, &src_addr)
+ .expect("send_to failed");
+
+ self.response_counter += 1;
+
+ info!(
+ "Responded {} bytes to {} for '{}..' (#{} in batch, resp #{})",
+ bytes_sent,
+ src_addr,
+ hex::encode(&nonce[0..4]),
+ i,
+ self.response_counter
+ );
+ }
}
- #[cfg(fuzzing)]
- pub fn send_to_self(&mut self, data: &[u8]) {
- self.response_counter = 0;
- self.num_bad_requests = 0;
- let res = self
- .fake_client_socket
- .send_to(data, &self.socket.local_addr().unwrap());
- info!("Sent to self: {:?}", res);
+ fn make_response(&self, srep: &RtMessage, cert_bytes: &[u8], path: &[u8], idx: u32) -> RtMessage {
+ let mut index = [0; 4];
+ (&mut index as &mut [u8])
+ .write_u32::<LittleEndian>(idx)
+ .unwrap();
+
+ let sig_bytes = srep.get_field(Tag::SIG).unwrap();
+ let srep_bytes = srep.get_field(Tag::SREP).unwrap();
+
+ let mut response = RtMessage::new(5);
+ response.add_field(Tag::SIG, sig_bytes).unwrap();
+ response.add_field(Tag::PATH, path).unwrap();
+ response.add_field(Tag::SREP, srep_bytes).unwrap();
+ response.add_field(Tag::CERT, cert_bytes).unwrap();
+ response.add_field(Tag::INDX, &index).unwrap();
+
+ response
}
}