summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAaron Hill <aa1ronham@gmail.com>2018-10-13 20:04:50 -0400
committerAaron Hill <aa1ronham@gmail.com>2018-10-17 21:19:08 -0400
commitceea631423c9faeee3cb6ef23478cc28a14250bc (patch)
tree8c23be0d5414df9a58446362b4ea4349764fe855 /src
parent978eb3b256f59fd024e9822cfa1d05b98548b033 (diff)
downloadroughenough-ceea631423c9faeee3cb6ef23478cc28a14250bc.zip
Move Server to its own module
Diffstat (limited to 'src')
-rw-r--r--src/bin/roughenough-server.rs208
-rw-r--r--src/lib.rs4
-rw-r--r--src/server.rs245
3 files changed, 250 insertions, 207 deletions
diff --git a/src/bin/roughenough-server.rs b/src/bin/roughenough-server.rs
index ad173e9..d00709d 100644
--- a/src/bin/roughenough-server.rs
+++ b/src/bin/roughenough-server.rs
@@ -61,6 +61,7 @@ use roughenough::key::{LongTermKey, OnlineKey};
use roughenough::merkle::MerkleTree;
use roughenough::{Error, RtMessage, Tag};
use roughenough::{MIN_REQUEST_LENGTH, VERSION};
+use roughenough::server::Server;
macro_rules! check_ctrlc {
($keep_running:expr) => {
@@ -71,218 +72,11 @@ macro_rules! check_ctrlc {
}
}
-const MESSAGE: Token = Token(0);
-const STATUS: Token = Token(1);
-
-fn make_response(srep: &RtMessage, cert_bytes: &[u8], path: &[u8], idx: u32) -> RtMessage {
- let mut index = [0; 4];
- (&mut index as &mut [u8])
- .write_u32::<LittleEndian>(idx)
- .unwrap();
-
- let sig_bytes = srep.get_field(Tag::SIG).unwrap();
- let srep_bytes = srep.get_field(Tag::SREP).unwrap();
-
- let mut response = RtMessage::new(5);
- response.add_field(Tag::SIG, sig_bytes).unwrap();
- response.add_field(Tag::PATH, path).unwrap();
- response.add_field(Tag::SREP, srep_bytes).unwrap();
- response.add_field(Tag::CERT, cert_bytes).unwrap();
- response.add_field(Tag::INDX, &index).unwrap();
-
- response
-}
-
-// extract the client's nonce from its request
-fn nonce_from_request(buf: &[u8], num_bytes: usize) -> Result<&[u8], Error> {
- if num_bytes < MIN_REQUEST_LENGTH as usize {
- return Err(Error::RequestTooShort);
- }
-
- let tag_count = &buf[..4];
- let expected_nonc = &buf[8..12];
- let expected_pad = &buf[12..16];
-
- let tag_count_is_2 = tag_count == [0x02, 0x00, 0x00, 0x00];
- let tag1_is_nonc = expected_nonc == Tag::NONC.wire_value();
- let tag2_is_pad = expected_pad == Tag::PAD.wire_value();
-
- if tag_count_is_2 && tag1_is_nonc && tag2_is_pad {
- Ok(&buf[0x10..0x50])
- } else {
- Err(Error::InvalidRequest)
- }
-}
-
-struct Server {
- config: Box<ServerConfig>,
- online_key: OnlineKey,
- cert_bytes: Vec<u8>,
- response_counter: AtomicUsize,
- num_bad_requests: u64,
- socket: UdpSocket,
- keep_running: Arc<AtomicBool>,
- poll_duration: Option<Duration>,
- timer: Timer<()>,
- poll: Poll,
- events: Events,
- merkle: MerkleTree,
- requests: Vec<(Vec<u8>, SocketAddr)>,
- buf: [u8; 65_536],
-
-}
-
-impl Server {
- pub fn new(config: Box<ServerConfig>, online_key: OnlineKey, cert_bytes: Vec<u8>) -> Server {
- let response_counter = AtomicUsize::new(0);
- let keep_running = Arc::new(AtomicBool::new(true));
-
- let sock_addr = config.socket_addr().expect("");
- let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket");
- let poll_duration = Some(Duration::from_millis(100));
-
- let mut timer: Timer<()> = Timer::default();
- timer.set_timeout(config.status_interval(), ());
-
- let poll = Poll::new().unwrap();
- poll.register(&socket, MESSAGE, Ready::readable(), PollOpt::edge())
- .unwrap();
- poll.register(&timer, STATUS, Ready::readable(), PollOpt::edge())
- .unwrap();
-
- let mut merkle = MerkleTree::new();
- let mut requests = Vec::with_capacity(config.batch_size() as usize);
-
-
- Server {
- config,
- online_key,
- cert_bytes,
-
- response_counter,
- num_bad_requests: 0,
- socket,
- keep_running,
- poll_duration,
- timer,
- poll,
- events: Events::with_capacity(32),
- merkle,
- requests,
- buf: [0u8; 65_536]
- }
-
- }
-
- fn get_keep_running(&self) -> Arc<AtomicBool> {
- return self.keep_running.clone()
- }
-
- fn process_events(&mut self) -> bool {
- self.poll.poll(&mut self.events, self.poll_duration).expect("poll failed");
-
- for event in self.events.iter() {
- match event.token() {
- MESSAGE => {
- let mut done = false;
-
- 'process_batch: loop {
- check_ctrlc!(self.keep_running);
-
- self.merkle.reset();
- self.requests.clear();
-
- let resp_start = self.response_counter.load(Ordering::SeqCst);
-
- for i in 0..self.config.batch_size() {
- match self.socket.recv_from(&mut self.buf) {
- Ok((num_bytes, src_addr)) => {
- if let Ok(nonce) = nonce_from_request(&self.buf, num_bytes) {
- self.requests.push((Vec::from(nonce), src_addr));
- self.merkle.push_leaf(nonce);
- } else {
- self.num_bad_requests += 1;
- info!(
- "Invalid request ({} bytes) from {} (#{} in batch, resp #{})",
- num_bytes, src_addr, i, resp_start + i as usize
- );
- }
- }
- Err(e) => match e.kind() {
- ErrorKind::WouldBlock => {
- done = true;
- break;
- }
- _ => {
- error!(
- "Error receiving from socket: {:?}: {:?}",
- e.kind(),
- e
- );
- break;
- }
- },
- };
- }
-
- if self.requests.is_empty() {
- break 'process_batch;
- }
-
- let merkle_root = self.merkle.compute_root();
- let srep = self.online_key.make_srep(time::get_time(), &merkle_root);
-
- for (i, &(ref nonce, ref src_addr)) in self.requests.iter().enumerate() {
- let paths = self.merkle.get_paths(i);
-
- let resp = make_response(&srep, &self.cert_bytes, &paths, i as u32);
- let resp_bytes = resp.encode().unwrap();
-
- let bytes_sent = self.socket
- .send_to(&resp_bytes, &src_addr)
- .expect("send_to failed");
- let num_responses = self.response_counter.fetch_add(1, Ordering::SeqCst);
-
- info!(
- "Responded {} bytes to {} for '{}..' (#{} in batch, resp #{})",
- bytes_sent,
- src_addr,
- hex::encode(&nonce[0..4]),
- i,
- num_responses
- );
- }
- if done {
- break 'process_batch;
- }
- }
- }
-
- STATUS => {
- info!(
- "responses {}, invalid requests {}",
- self.response_counter.load(Ordering::SeqCst),
- self.num_bad_requests
- );
-
- self.timer.set_timeout(self.config.status_interval(), ());
- }
-
- _ => unreachable!(),
- }
- }
- false
- }
-
- fn send_to_self(&mut self, data: &[u8]) {
- self.socket.send_to(data, &self.socket.local_addr().unwrap());
- }
-}
fn polling_loop(config: Box<ServerConfig>, online_key: OnlineKey, cert_bytes: Vec<u8>) -> bool {
/* let response_counter = AtomicUsize::new(0);
diff --git a/src/lib.rs b/src/lib.rs
index 70a3fff..0f9c96b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -59,11 +59,14 @@
extern crate base64;
extern crate byteorder;
+extern crate hex;
extern crate core;
extern crate time;
extern crate yaml_rust;
#[macro_use]
extern crate hyper;
+extern crate mio;
+extern crate mio_extras;
#[macro_use]
extern crate log;
@@ -77,6 +80,7 @@ pub mod config;
pub mod key;
pub mod kms;
pub mod merkle;
+pub mod server;
pub mod sign;
pub use error::Error;
diff --git a/src/server.rs b/src/server.rs
new file mode 100644
index 0000000..ff302d9
--- /dev/null
+++ b/src/server.rs
@@ -0,0 +1,245 @@
+use std::env;
+use std::io::ErrorKind;
+use std::process;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
+use std::net::SocketAddr;
+use hex;
+use time;
+
+use byteorder::{LittleEndian, WriteBytesExt};
+
+
+use mio::net::UdpSocket;
+use mio::{Events, Poll, PollOpt, Ready, Token};
+use mio_extras::timer::Timer;
+
+use config;
+use config::ServerConfig;
+use kms;
+use key::{LongTermKey, OnlineKey};
+use merkle::MerkleTree;
+use {Error, RtMessage, Tag};
+use {MIN_REQUEST_LENGTH, VERSION};
+
+
+macro_rules! check_ctrlc {
+ ($keep_running:expr) => {
+ if !$keep_running.load(Ordering::Acquire) {
+ warn!("Ctrl-C caught, exiting...");
+ return true;
+ }
+ }
+}
+
+const MESSAGE: Token = Token(0);
+const STATUS: Token = Token(1);
+
+fn make_response(srep: &RtMessage, cert_bytes: &[u8], path: &[u8], idx: u32) -> RtMessage {
+ let mut index = [0; 4];
+ (&mut index as &mut [u8])
+ .write_u32::<LittleEndian>(idx)
+ .unwrap();
+
+ let sig_bytes = srep.get_field(Tag::SIG).unwrap();
+ let srep_bytes = srep.get_field(Tag::SREP).unwrap();
+
+ let mut response = RtMessage::new(5);
+ response.add_field(Tag::SIG, sig_bytes).unwrap();
+ response.add_field(Tag::PATH, path).unwrap();
+ response.add_field(Tag::SREP, srep_bytes).unwrap();
+ response.add_field(Tag::CERT, cert_bytes).unwrap();
+ response.add_field(Tag::INDX, &index).unwrap();
+
+ response
+}
+
+// extract the client's nonce from its request
+fn nonce_from_request(buf: &[u8], num_bytes: usize) -> Result<&[u8], Error> {
+ if num_bytes < MIN_REQUEST_LENGTH as usize {
+ return Err(Error::RequestTooShort);
+ }
+
+ let tag_count = &buf[..4];
+ let expected_nonc = &buf[8..12];
+ let expected_pad = &buf[12..16];
+
+ let tag_count_is_2 = tag_count == [0x02, 0x00, 0x00, 0x00];
+ let tag1_is_nonc = expected_nonc == Tag::NONC.wire_value();
+ let tag2_is_pad = expected_pad == Tag::PAD.wire_value();
+
+ if tag_count_is_2 && tag1_is_nonc && tag2_is_pad {
+ Ok(&buf[0x10..0x50])
+ } else {
+ Err(Error::InvalidRequest)
+ }
+}
+
+pub struct Server {
+ config: Box<ServerConfig>,
+ online_key: OnlineKey,
+ cert_bytes: Vec<u8>,
+
+ response_counter: AtomicUsize,
+ num_bad_requests: u64,
+
+ socket: UdpSocket,
+ keep_running: Arc<AtomicBool>,
+ poll_duration: Option<Duration>,
+ timer: Timer<()>,
+ poll: Poll,
+ events: Events,
+ merkle: MerkleTree,
+ requests: Vec<(Vec<u8>, SocketAddr)>,
+ buf: [u8; 65_536],
+
+
+}
+
+impl Server {
+ pub fn new(config: Box<ServerConfig>, online_key: OnlineKey, cert_bytes: Vec<u8>) -> Server {
+ let response_counter = AtomicUsize::new(0);
+ let keep_running = Arc::new(AtomicBool::new(true));
+
+ let sock_addr = config.socket_addr().expect("");
+ let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket");
+ let poll_duration = Some(Duration::from_millis(100));
+
+ let mut timer: Timer<()> = Timer::default();
+ timer.set_timeout(config.status_interval(), ());
+
+ let poll = Poll::new().unwrap();
+ poll.register(&socket, MESSAGE, Ready::readable(), PollOpt::edge())
+ .unwrap();
+ poll.register(&timer, STATUS, Ready::readable(), PollOpt::edge())
+ .unwrap();
+
+ let mut merkle = MerkleTree::new();
+ let mut requests = Vec::with_capacity(config.batch_size() as usize);
+
+
+ Server {
+ config,
+ online_key,
+ cert_bytes,
+
+ response_counter,
+ num_bad_requests: 0,
+ socket,
+ keep_running,
+ poll_duration,
+ timer,
+ poll,
+ events: Events::with_capacity(32),
+ merkle,
+ requests,
+ buf: [0u8; 65_536]
+ }
+
+ }
+
+ pub fn get_keep_running(&self) -> Arc<AtomicBool> {
+ return self.keep_running.clone()
+ }
+
+ pub fn process_events(&mut self) -> bool {
+ self.poll.poll(&mut self.events, self.poll_duration).expect("poll failed");
+
+ for event in self.events.iter() {
+ match event.token() {
+ MESSAGE => {
+ let mut done = false;
+
+ 'process_batch: loop {
+ check_ctrlc!(self.keep_running);
+
+ self.merkle.reset();
+ self.requests.clear();
+
+ let resp_start = self.response_counter.load(Ordering::SeqCst);
+
+ for i in 0..self.config.batch_size() {
+ match self.socket.recv_from(&mut self.buf) {
+ Ok((num_bytes, src_addr)) => {
+ if let Ok(nonce) = nonce_from_request(&self.buf, num_bytes) {
+ self.requests.push((Vec::from(nonce), src_addr));
+ self.merkle.push_leaf(nonce);
+ } else {
+ self.num_bad_requests += 1;
+ info!(
+ "Invalid request ({} bytes) from {} (#{} in batch, resp #{})",
+ num_bytes, src_addr, i, resp_start + i as usize
+ );
+ }
+ }
+ Err(e) => match e.kind() {
+ ErrorKind::WouldBlock => {
+ done = true;
+ break;
+ }
+ _ => {
+ error!(
+ "Error receiving from socket: {:?}: {:?}",
+ e.kind(),
+ e
+ );
+ break;
+ }
+ },
+ };
+ }
+
+ if self.requests.is_empty() {
+ break 'process_batch;
+ }
+
+ let merkle_root = self.merkle.compute_root();
+ let srep = self.online_key.make_srep(time::get_time(), &merkle_root);
+
+ for (i, &(ref nonce, ref src_addr)) in self.requests.iter().enumerate() {
+ let paths = self.merkle.get_paths(i);
+
+ let resp = make_response(&srep, &self.cert_bytes, &paths, i as u32);
+ let resp_bytes = resp.encode().unwrap();
+
+ let bytes_sent = self.socket
+ .send_to(&resp_bytes, &src_addr)
+ .expect("send_to failed");
+ let num_responses = self.response_counter.fetch_add(1, Ordering::SeqCst);
+
+ info!(
+ "Responded {} bytes to {} for '{}..' (#{} in batch, resp #{})",
+ bytes_sent,
+ src_addr,
+ hex::encode(&nonce[0..4]),
+ i,
+ num_responses
+ );
+ }
+ if done {
+ break 'process_batch;
+ }
+ }
+ }
+
+ STATUS => {
+ info!(
+ "responses {}, invalid requests {}",
+ self.response_counter.load(Ordering::SeqCst),
+ self.num_bad_requests
+ );
+
+ self.timer.set_timeout(self.config.status_interval(), ());
+ }
+
+ _ => unreachable!(),
+ }
+ }
+ false
+ }
+
+ fn send_to_self(&mut self, data: &[u8]) {
+ self.socket.send_to(data, &self.socket.local_addr().unwrap());
+ }
+}