summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/bin/roughenough-client.rs12
-rw-r--r--src/bin/roughenough-kms.rs2
-rw-r--r--src/bin/roughenough-server.rs249
-rw-r--r--src/config/environment.rs11
-rw-r--r--src/config/file.rs15
-rw-r--r--src/config/memory.rs73
-rw-r--r--src/config/mod.rs25
-rw-r--r--src/key/mod.rs1
-rw-r--r--src/kms/awskms.rs1
-rw-r--r--src/kms/envelope.rs9
-rw-r--r--src/kms/gcpkms.rs12
-rw-r--r--src/kms/mod.rs2
-rw-r--r--src/lib.rs6
-rw-r--r--src/server.rs323
14 files changed, 497 insertions, 244 deletions
diff --git a/src/bin/roughenough-client.rs b/src/bin/roughenough-client.rs
index 75ebe14..570dc47 100644
--- a/src/bin/roughenough-client.rs
+++ b/src/bin/roughenough-client.rs
@@ -28,6 +28,8 @@ use chrono::offset::Utc;
use chrono::TimeZone;
use std::collections::HashMap;
+use std::fs::File;
+use std::io::Write;
use std::iter::Iterator;
use std::net::{ToSocketAddrs, UdpSocket};
@@ -228,6 +230,12 @@ fn main() {
.long("stress")
.help("Stress-tests the server by sending the same request as fast as possible. Please only use this on your own server")
)
+ .arg(Arg::with_name("output")
+ .short("o")
+ .long("output")
+ .takes_value(true)
+ .help("Writes all requsts to the specified file, in addition to sending them to the server. Useful for generating fuzer inputs")
+ )
.get_matches();
let host = matches.value_of("host").unwrap();
@@ -238,6 +246,7 @@ fn main() {
let pub_key = matches
.value_of("public-key")
.map(|pkey| hex::decode(pkey).expect("Error parsing public key!"));
+ let out = matches.value_of("output");
println!("Requesting time from: {:?}:{:?}", host, port);
@@ -264,11 +273,14 @@ fn main() {
}
let mut requests = Vec::with_capacity(num_requests);
+ let mut file = out.map(|o| File::create(o).expect("Failed to create file!"));
for _ in 0..num_requests {
let nonce = create_nonce();
let mut socket = UdpSocket::bind("0.0.0.0:0").expect("Couldn't open UDP socket");
let request = make_request(&nonce);
+ file.as_mut()
+ .map(|f| f.write_all(&request).expect("Failed to write to file!"));
requests.push((nonce, request, socket));
}
diff --git a/src/bin/roughenough-kms.rs b/src/bin/roughenough-kms.rs
index 6563224..1cea22e 100644
--- a/src/bin/roughenough-kms.rs
+++ b/src/bin/roughenough-kms.rs
@@ -30,8 +30,8 @@ use roughenough::VERSION;
#[cfg(feature = "awskms")]
fn aws_kms(kms_key: &str, plaintext_seed: &[u8]) {
- use roughenough::kms::EnvelopeEncryption;
use roughenough::kms::AwsKms;
+ use roughenough::kms::EnvelopeEncryption;
let client = AwsKms::from_arn(kms_key).unwrap();
diff --git a/src/bin/roughenough-server.rs b/src/bin/roughenough-server.rs
index 9c00d7e..6c6a118 100644
--- a/src/bin/roughenough-server.rs
+++ b/src/bin/roughenough-server.rs
@@ -41,200 +41,52 @@ extern crate untrusted;
extern crate yaml_rust;
use std::env;
-use std::io::ErrorKind;
use std::process;
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use std::sync::Arc;
-use std::time::Duration;
-
-use mio::net::UdpSocket;
-use mio::{Events, Poll, PollOpt, Ready, Token};
-use mio_extras::timer::Timer;
-
-use byteorder::{LittleEndian, WriteBytesExt};
+use std::sync::atomic::Ordering;
use roughenough::config;
use roughenough::config::ServerConfig;
-use roughenough::kms;
-use roughenough::key::{LongTermKey, OnlineKey};
-use roughenough::merkle::MerkleTree;
-use roughenough::{Error, RtMessage, Tag};
-use roughenough::{MIN_REQUEST_LENGTH, VERSION};
-
-const MESSAGE: Token = Token(0);
-const STATUS: Token = Token(1);
-
-fn make_response(srep: &RtMessage, cert_bytes: &[u8], path: &[u8], idx: u32) -> RtMessage {
- let mut index = [0; 4];
- (&mut index as &mut [u8])
- .write_u32::<LittleEndian>(idx)
- .unwrap();
-
- let sig_bytes = srep.get_field(Tag::SIG).unwrap();
- let srep_bytes = srep.get_field(Tag::SREP).unwrap();
-
- let mut response = RtMessage::new(5);
- response.add_field(Tag::SIG, sig_bytes).unwrap();
- response.add_field(Tag::PATH, path).unwrap();
- response.add_field(Tag::SREP, srep_bytes).unwrap();
- response.add_field(Tag::CERT, cert_bytes).unwrap();
- response.add_field(Tag::INDX, &index).unwrap();
-
- response
-}
-
-// extract the client's nonce from its request
-fn nonce_from_request(buf: &[u8], num_bytes: usize) -> Result<&[u8], Error> {
- if num_bytes < MIN_REQUEST_LENGTH as usize {
- return Err(Error::RequestTooShort);
- }
-
- let tag_count = &buf[..4];
- let expected_nonc = &buf[8..12];
- let expected_pad = &buf[12..16];
-
- let tag_count_is_2 = tag_count == [0x02, 0x00, 0x00, 0x00];
- let tag1_is_nonc = expected_nonc == Tag::NONC.wire_value();
- let tag2_is_pad = expected_pad == Tag::PAD.wire_value();
-
- if tag_count_is_2 && tag1_is_nonc && tag2_is_pad {
- Ok(&buf[0x10..0x50])
- } else {
- Err(Error::InvalidRequest)
- }
+use roughenough::server::Server;
+use roughenough::VERSION;
+
+macro_rules! check_ctrlc {
+ ($keep_running:expr) => {
+ if !$keep_running.load(Ordering::Acquire) {
+ warn!("Ctrl-C caught, exiting...");
+ return;
+ }
+ };
}
-fn polling_loop(config: &Box<ServerConfig>, online_key: &mut OnlineKey, cert_bytes: &[u8]) {
- let response_counter = AtomicUsize::new(0);
- let keep_running = Arc::new(AtomicBool::new(true));
- let kr = keep_running.clone();
+fn polling_loop(config: Box<ServerConfig>) {
+ let mut server = Server::new(config);
+
+ info!("Long-term public key : {}", server.get_public_key());
+ info!("Online public key : {}", server.get_online_key());
+ info!(
+ "Max response batch size : {}",
+ server.get_config().batch_size()
+ );
+ info!(
+ "Status updates every : {} seconds",
+ server.get_config().status_interval().as_secs()
+ );
+ info!(
+ "Server listening on : {}:{}",
+ server.get_config().interface(),
+ server.get_config().port()
+ );
+
+ let kr = server.get_keep_running();
+ let kr_new = kr.clone();
ctrlc::set_handler(move || kr.store(false, Ordering::Release))
.expect("failed setting Ctrl-C handler");
- let sock_addr = config.socket_addr().expect("");
- let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket");
- let poll_duration = Some(Duration::from_millis(100));
-
- let mut timer: Timer<()> = Timer::default();
- timer.set_timeout(config.status_interval(), ());
-
- let mut buf = [0u8; 65_536];
- let mut events = Events::with_capacity(32);
- let mut num_bad_requests = 0u64;
-
- let poll = Poll::new().unwrap();
- poll.register(&socket, MESSAGE, Ready::readable(), PollOpt::edge())
- .unwrap();
- poll.register(&timer, STATUS, Ready::readable(), PollOpt::edge())
- .unwrap();
-
- let mut merkle = MerkleTree::new();
- let mut requests = Vec::with_capacity(config.batch_size() as usize);
-
- macro_rules! check_ctrlc {
- () => {
- if !keep_running.load(Ordering::Acquire) {
- warn!("Ctrl-C caught, exiting...");
- return;
- }
- };
- }
-
loop {
- check_ctrlc!();
-
- poll.poll(&mut events, poll_duration).expect("poll failed");
-
- for event in events.iter() {
- match event.token() {
- MESSAGE => {
- let mut done = false;
-
- 'process_batch: loop {
- check_ctrlc!();
-
- merkle.reset();
- requests.clear();
-
- let resp_start = response_counter.load(Ordering::SeqCst);
-
- for i in 0..config.batch_size() {
- match socket.recv_from(&mut buf) {
- Ok((num_bytes, src_addr)) => {
- if let Ok(nonce) = nonce_from_request(&buf, num_bytes) {
- requests.push((Vec::from(nonce), src_addr));
- merkle.push_leaf(nonce);
- } else {
- num_bad_requests += 1;
- info!(
- "Invalid request ({} bytes) from {} (#{} in batch, resp #{})",
- num_bytes, src_addr, i, resp_start + i as usize
- );
- }
- }
- Err(e) => match e.kind() {
- ErrorKind::WouldBlock => {
- done = true;
- break;
- }
- _ => {
- error!(
- "Error receiving from socket: {:?}: {:?}",
- e.kind(),
- e
- );
- break;
- }
- },
- };
- }
-
- if requests.is_empty() {
- break 'process_batch;
- }
-
- let merkle_root = merkle.compute_root();
- let srep = online_key.make_srep(time::get_time(), &merkle_root);
-
- for (i, &(ref nonce, ref src_addr)) in requests.iter().enumerate() {
- let paths = merkle.get_paths(i);
-
- let resp = make_response(&srep, cert_bytes, &paths, i as u32);
- let resp_bytes = resp.encode().unwrap();
-
- let bytes_sent = socket
- .send_to(&resp_bytes, &src_addr)
- .expect("send_to failed");
- let num_responses = response_counter.fetch_add(1, Ordering::SeqCst);
-
- info!(
- "Responded {} bytes to {} for '{}..' (#{} in batch, resp #{})",
- bytes_sent,
- src_addr,
- hex::encode(&nonce[0..4]),
- i,
- num_responses
- );
- }
- if done {
- break 'process_batch;
- }
- }
- }
-
- STATUS => {
- info!(
- "responses {}, invalid requests {}",
- response_counter.load(Ordering::SeqCst),
- num_bad_requests
- );
-
- timer.set_timeout(config.status_interval(), ());
- }
-
- _ => unreachable!(),
- }
+ check_ctrlc!(kr_new);
+ if server.process_events() {
+ return;
}
}
}
@@ -254,7 +106,11 @@ pub fn main() {
simple_logger::init_with_level(Level::Info).unwrap();
- info!("Roughenough server v{}{} starting", VERSION, kms_support_str());
+ info!(
+ "Roughenough server v{}{} starting",
+ VERSION,
+ kms_support_str()
+ );
let mut args = env::args();
if args.len() != 2 {
@@ -272,30 +128,7 @@ pub fn main() {
Ok(cfg) => cfg,
};
- let mut online_key = OnlineKey::new();
- let public_key: String;
-
- let cert_bytes = {
- let seed = match kms::load_seed(&config) {
- Ok(seed) => seed,
- Err(e) => {
- error!("Failed to load seed: {:#?}", e);
- process::exit(1);
- }
- };
- let mut long_term_key = LongTermKey::new(&seed);
- public_key = hex::encode(long_term_key.public_key());
-
- long_term_key.make_cert(&online_key).encode().unwrap()
- };
-
- info!("Long-term public key : {}", public_key);
- info!("Online public key : {}", online_key);
- info!("Max response batch size : {}", config.batch_size());
- info!("Status updates every : {} seconds", config.status_interval().as_secs());
- info!("Server listening on : {}:{}", config.interface(), config.port());
-
- polling_loop(&config, &mut online_key, &cert_bytes);
+ polling_loop(config);
info!("Done.");
process::exit(0);
diff --git a/src/config/environment.rs b/src/config/environment.rs
index 2385b28..5edb6d0 100644
--- a/src/config/environment.rs
+++ b/src/config/environment.rs
@@ -15,13 +15,12 @@
extern crate hex;
use std::env;
-use std::net::SocketAddr;
use std::time::Duration;
use config::ServerConfig;
use config::{DEFAULT_BATCH_SIZE, DEFAULT_STATUS_INTERVAL};
-use Error;
use key::KeyProtection;
+use Error;
///
/// Obtain a Roughenough server configuration ([ServerConfig](trait.ServerConfig.html))
@@ -123,14 +122,6 @@ impl ServerConfig for EnvironmentConfig {
self.status_interval
}
- fn socket_addr(&self) -> Result<SocketAddr, Error> {
- let addr = format!("{}:{}", self.interface, self.port);
- match addr.parse() {
- Ok(v) => Ok(v),
- Err(_) => Err(Error::InvalidConfiguration(addr)),
- }
- }
-
fn key_protection(&self) -> &KeyProtection {
&self.key_protection
}
diff --git a/src/config/file.rs b/src/config/file.rs
index 440c78c..b0f8b4d 100644
--- a/src/config/file.rs
+++ b/src/config/file.rs
@@ -16,14 +16,13 @@ extern crate hex;
use std::fs::File;
use std::io::Read;
-use std::net::SocketAddr;
use std::time::Duration;
use yaml_rust::YamlLoader;
use config::ServerConfig;
use config::{DEFAULT_BATCH_SIZE, DEFAULT_STATUS_INTERVAL};
-use Error;
use key::KeyProtection;
+use Error;
///
/// Read a Roughenough server configuration ([ServerConfig](trait.ServerConfig.html))
@@ -87,7 +86,9 @@ impl FileConfig {
config.status_interval = Duration::from_secs(val as u64)
}
"key_protection" => {
- let val = value.as_str().unwrap()
+ let val = value
+ .as_str()
+ .unwrap()
.parse()
.expect(format!("invalid key_protection value: {:?}", value).as_ref());
config.key_protection = val
@@ -126,14 +127,6 @@ impl ServerConfig for FileConfig {
self.status_interval
}
- fn socket_addr(&self) -> Result<SocketAddr, Error> {
- let addr = format!("{}:{}", self.interface, self.port);
- match addr.parse() {
- Ok(v) => Ok(v),
- Err(_) => Err(Error::InvalidConfiguration(addr)),
- }
- }
-
fn key_protection(&self) -> &KeyProtection {
&self.key_protection
}
diff --git a/src/config/memory.rs b/src/config/memory.rs
new file mode 100644
index 0000000..abca5a5
--- /dev/null
+++ b/src/config/memory.rs
@@ -0,0 +1,73 @@
+// Copyright 2017-2018 int08h LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+
+use config::ServerConfig;
+use config::{DEFAULT_BATCH_SIZE, DEFAULT_STATUS_INTERVAL};
+use key::KeyProtection;
+use std::time::Duration;
+
+use hex;
+
+/// A purely in-memory Roughenough config
+/// This is useful for fuzzing a server without the need
+/// to create additioanl files.
+pub struct MemoryConfig {
+ pub port: u16,
+ pub interface: String,
+ pub seed: Vec<u8>,
+ pub batch_size: u8,
+ pub status_interval: Duration,
+ pub key_protection: KeyProtection,
+}
+
+impl MemoryConfig {
+ pub fn new(port: u16) -> MemoryConfig {
+ MemoryConfig {
+ port,
+ interface: "127.0.0.1".to_string(),
+ seed: hex::decode("a32049da0ffde0ded92ce10a0230d35fe615ec8461c14986baa63fe3b3bac3db")
+ .unwrap(),
+ batch_size: DEFAULT_BATCH_SIZE,
+ status_interval: DEFAULT_STATUS_INTERVAL,
+ key_protection: KeyProtection::Plaintext,
+ }
+ }
+}
+
+impl ServerConfig for MemoryConfig {
+ fn interface(&self) -> &str {
+ self.interface.as_ref()
+ }
+
+ fn port(&self) -> u16 {
+ self.port
+ }
+
+ fn seed(&self) -> Vec<u8> {
+ self.seed.clone()
+ }
+
+ fn batch_size(&self) -> u8 {
+ self.batch_size
+ }
+
+ fn status_interval(&self) -> Duration {
+ self.status_interval
+ }
+
+ fn key_protection(&self) -> &KeyProtection {
+ &self.key_protection
+ }
+}
diff --git a/src/config/mod.rs b/src/config/mod.rs
index f05578b..772e1ee 100644
--- a/src/config/mod.rs
+++ b/src/config/mod.rs
@@ -34,8 +34,11 @@ pub use self::file::FileConfig;
mod environment;
pub use self::environment::EnvironmentConfig;
-use Error;
+mod memory;
+pub use self::memory::MemoryConfig;
+
use key::KeyProtection;
+use Error;
/// Maximum number of requests to process in one batch and include the the Merkle tree.
pub const DEFAULT_BATCH_SIZE: u8 = 64;
@@ -89,7 +92,13 @@ pub trait ServerConfig {
fn key_protection(&self) -> &KeyProtection;
/// Convenience function to create a `SocketAddr` from the provided `interface` and `port`
- fn socket_addr(&self) -> Result<SocketAddr, Error>;
+ fn socket_addr(&self) -> Result<SocketAddr, Error> {
+ let addr = format!("{}:{}", self.interface(), self.port());
+ match addr.parse() {
+ Ok(v) => Ok(v),
+ Err(_) => Err(Error::InvalidConfiguration(addr)),
+ }
+ }
}
/// Factory function to create a `ServerConfig` _trait object_ based on the value
@@ -135,14 +144,22 @@ pub fn is_valid_config(cfg: &Box<ServerConfig>) -> bool {
is_valid = false;
}
if cfg.batch_size() < 1 || cfg.batch_size() > 64 {
- error!("batch_size {} is invalid; valid range 1-64", cfg.batch_size());
+ error!(
+ "batch_size {} is invalid; valid range 1-64",
+ cfg.batch_size()
+ );
is_valid = false;
}
if is_valid {
match cfg.socket_addr() {
Err(e) => {
- error!("failed to create socket {}:{} {:?}", cfg.interface(), cfg.port(), e);
+ error!(
+ "failed to create socket {}:{} {:?}",
+ cfg.interface(),
+ cfg.port(),
+ e
+ );
is_valid = false;
}
_ => (),
diff --git a/src/key/mod.rs b/src/key/mod.rs
index 32ca241..6bb3eb5 100644
--- a/src/key/mod.rs
+++ b/src/key/mod.rs
@@ -66,4 +66,3 @@ impl FromStr for KeyProtection {
}
}
}
-
diff --git a/src/kms/awskms.rs b/src/kms/awskms.rs
index 96d4a38..14f0804 100644
--- a/src/kms/awskms.rs
+++ b/src/kms/awskms.rs
@@ -121,4 +121,3 @@ pub mod inner {
}
}
}
-
diff --git a/src/kms/envelope.rs b/src/kms/envelope.rs
index 1f6d615..da75961 100644
--- a/src/kms/envelope.rs
+++ b/src/kms/envelope.rs
@@ -73,7 +73,6 @@ fn vec_zero_filled(len: usize) -> Vec<u8> {
pub struct EnvelopeEncryption;
impl EnvelopeEncryption {
-
/// Decrypt a seed previously encrypted with `encrypt_seed()`
pub fn decrypt_seed(kms: &KmsProvider, ciphertext_blob: &[u8]) -> Result<Vec<u8>, KmsError> {
if ciphertext_blob.len() < MIN_PAYLOAD_SIZE {
@@ -107,7 +106,13 @@ impl EnvelopeEncryption {
// Decrypt the seed value using the DEK
let dek_open_key = OpeningKey::new(&AES_256_GCM, &dek)?;
- match open_in_place(&dek_open_key, &nonce, AD, IN_PREFIX_LEN, &mut encrypted_seed) {
+ match open_in_place(
+ &dek_open_key,
+ &nonce,
+ AD,
+ IN_PREFIX_LEN,
+ &mut encrypted_seed,
+ ) {
Ok(plaintext_seed) => Ok(plaintext_seed.to_vec()),
Err(_) => Err(KmsError::OperationFailed(
"failed to decrypt plaintext seed".to_string(),
diff --git a/src/kms/gcpkms.rs b/src/kms/gcpkms.rs
index c0fbb5d..13303db 100644
--- a/src/kms/gcpkms.rs
+++ b/src/kms/gcpkms.rs
@@ -19,14 +19,16 @@ extern crate log;
pub mod inner {
extern crate base64;
+ extern crate google_cloudkms1 as cloudkms1;
extern crate hyper;
extern crate hyper_rustls;
extern crate yup_oauth2 as oauth2;
- extern crate google_cloudkms1 as cloudkms1;
+
use std::fmt;
use std::env;
use std::fmt::Formatter;
+ use std::result::Result;
use std::str::FromStr;
use std::result::Result;
use std::default::Default;
@@ -34,13 +36,15 @@ pub mod inner {
use std::path::Path;
use std::time::Duration;
- use self::oauth2::{service_account_key_from_file, ServiceAccountAccess, ServiceAccountKey};
use self::cloudkms1::CloudKMS;
- use self::cloudkms1::{Result as CloudKmsResult, Error as CloudKmsError, EncryptRequest, DecryptRequest};
+ use self::cloudkms1::{
+ DecryptRequest, EncryptRequest, Error as CloudKmsError, Result as CloudKmsResult,
+ };
use self::hyper::net::HttpsConnector;
use self::hyper::header::Headers;
use self::hyper::status::StatusCode;
use self::hyper_rustls::TlsClient;
+ use self::oauth2::{service_account_key_from_file, ServiceAccountAccess, ServiceAccountKey};
use kms::{EncryptedDEK, KmsError, KmsProvider, PlaintextDEK};
@@ -156,5 +160,3 @@ pub mod inner {
panic!("Failed to load service account credential. Is GOOGLE_APPLICATION_CREDENTIALS set?");
}
}
-
-
diff --git a/src/kms/mod.rs b/src/kms/mod.rs
index 810623a..56e7631 100644
--- a/src/kms/mod.rs
+++ b/src/kms/mod.rs
@@ -52,9 +52,9 @@
mod envelope;
use base64;
+use ring;
use std;
use std::error::Error;
-use ring;
use config::ServerConfig;
use error;
diff --git a/src/lib.rs b/src/lib.rs
index 70a3fff..4156747 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -64,6 +64,11 @@ extern crate time;
extern crate yaml_rust;
#[macro_use]
extern crate hyper;
+extern crate hex;
+extern crate mio;
+extern crate mio_extras;
+extern crate time;
+extern crate yaml_rust;
#[macro_use]
extern crate log;
@@ -77,6 +82,7 @@ pub mod config;
pub mod key;
pub mod kms;
pub mod merkle;
+pub mod server;
pub mod sign;
pub use error::Error;
diff --git a/src/server.rs b/src/server.rs
new file mode 100644
index 0000000..0fd5f1a
--- /dev/null
+++ b/src/server.rs
@@ -0,0 +1,323 @@
+// Copyright 2017-2018 int08h LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+
+use hex;
+use std::io::ErrorKind;
+use std::net::SocketAddr;
+use std::process;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
+use time;
+
+use byteorder::{LittleEndian, WriteBytesExt};
+
+use mio::net::UdpSocket;
+use mio::{Events, Poll, PollOpt, Ready, Token};
+use mio_extras::timer::Timer;
+
+use config::ServerConfig;
+use key::{LongTermKey, OnlineKey};
+use kms;
+use merkle::MerkleTree;
+use MIN_REQUEST_LENGTH;
+use {Error, RtMessage, Tag};
+
+macro_rules! check_ctrlc {
+ ($keep_running:expr) => {
+ if !$keep_running.load(Ordering::Acquire) {
+ warn!("Ctrl-C caught, exiting...");
+ return true;
+ }
+ };
+}
+
+const MESSAGE: Token = Token(0);
+const STATUS: Token = Token(1);
+
+/// The main server instance.
+/// A Server is initialiezd from a Server Config
+/// and processes incoming messages in
+/// 'process_events'
+pub struct Server {
+ config: Box<ServerConfig>,
+ online_key: OnlineKey,
+ cert_bytes: Vec<u8>,
+
+ response_counter: AtomicUsize,
+ num_bad_requests: u64,
+
+ socket: UdpSocket,
+ keep_running: Arc<AtomicBool>,
+ poll_duration: Option<Duration>,
+ timer: Timer<()>,
+ poll: Poll,
+ events: Events,
+ merkle: MerkleTree,
+ requests: Vec<(Vec<u8>, SocketAddr)>,
+ buf: [u8; 65_536],
+
+ public_key: String,
+
+ // Used to send requests to outselves in fuzing mode
+ #[cfg(fuzzing)]
+ fake_client_socket: UdpSocket,
+}
+
+impl Server {
+ pub fn new(config: Box<ServerConfig>) -> Server {
+ let online_key = OnlineKey::new();
+ let public_key: String;
+
+ let cert_bytes = {
+ let seed = match kms::load_seed(&config) {
+ Ok(seed) => seed,
+ Err(e) => {
+ error!("Failed to load seed: {:#?}", e);
+ process::exit(1);
+ }
+ };
+ let mut long_term_key = LongTermKey::new(&seed);
+ public_key = hex::encode(long_term_key.public_key());
+
+ long_term_key.make_cert(&online_key).encode().unwrap()
+ };
+
+ let response_counter = AtomicUsize::new(0);
+ let keep_running = Arc::new(AtomicBool::new(true));
+
+ let sock_addr = config.socket_addr().expect("");
+ let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket");
+
+ let poll_duration = Some(Duration::from_millis(100));
+
+ let mut timer: Timer<()> = Timer::default();
+ timer.set_timeout(config.status_interval(), ());
+
+ let poll = Poll::new().unwrap();
+ poll.register(&socket, MESSAGE, Ready::readable(), PollOpt::edge())
+ .unwrap();
+ poll.register(&timer, STATUS, Ready::readable(), PollOpt::edge())
+ .unwrap();
+
+ let merkle = MerkleTree::new();
+ let requests = Vec::with_capacity(config.batch_size() as usize);
+
+ Server {
+ config,
+ online_key,
+ cert_bytes,
+
+ response_counter,
+ num_bad_requests: 0,
+ socket,
+ keep_running,
+ poll_duration,
+ timer,
+ poll,
+ events: Events::with_capacity(32),
+ merkle,
+ requests,
+ buf: [0u8; 65_536],
+
+ public_key,
+
+ #[cfg(fuzzing)]
+ fake_client_socket: UdpSocket::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(),
+ }
+ }
+
+ pub fn get_keep_running(&self) -> Arc<AtomicBool> {
+ return self.keep_running.clone();
+ }
+
+ // extract the client's nonce from its request
+ fn nonce_from_request<'a>(&self, buf: &'a [u8], num_bytes: usize) -> Result<&'a [u8], Error> {
+ if num_bytes < MIN_REQUEST_LENGTH as usize {
+ return Err(Error::RequestTooShort);
+ }
+
+ let tag_count = &buf[..4];
+ let expected_nonc = &buf[8..12];
+ let expected_pad = &buf[12..16];
+
+ let tag_count_is_2 = tag_count == [0x02, 0x00, 0x00, 0x00];
+ let tag1_is_nonc = expected_nonc == Tag::NONC.wire_value();
+ let tag2_is_pad = expected_pad == Tag::PAD.wire_value();
+
+ if tag_count_is_2 && tag1_is_nonc && tag2_is_pad {
+ Ok(&buf[0x10..0x50])
+ } else {
+ Err(Error::InvalidRequest)
+ }
+ }
+
+ fn make_response(
+ &self,
+ srep: &RtMessage,
+ cert_bytes: &[u8],
+ path: &[u8],
+ idx: u32,
+ ) -> RtMessage {
+ let mut index = [0; 4];
+ (&mut index as &mut [u8])
+ .write_u32::<LittleEndian>(idx)
+ .unwrap();
+
+ let sig_bytes = srep.get_field(Tag::SIG).unwrap();
+ let srep_bytes = srep.get_field(Tag::SREP).unwrap();
+
+ let mut response = RtMessage::new(5);
+ response.add_field(Tag::SIG, sig_bytes).unwrap();
+ response.add_field(Tag::PATH, path).unwrap();
+ response.add_field(Tag::SREP, srep_bytes).unwrap();
+ response.add_field(Tag::CERT, cert_bytes).unwrap();
+ response.add_field(Tag::INDX, &index).unwrap();
+
+ response
+ }
+
+ /// The main processing function for incoming connections.
+ /// This method should be called repeatedly in a loop
+ /// to process requests. It returns 'true' when the server
+ /// has shutdown (due to keep_running being set to 'false')
+ pub fn process_events(&mut self) -> bool {
+ self.poll
+ .poll(&mut self.events, self.poll_duration)
+ .expect("poll failed");
+
+ for event in self.events.iter() {
+ match event.token() {
+ MESSAGE => {
+ let mut done = false;
+
+ 'process_batch: loop {
+ check_ctrlc!(self.keep_running);
+
+ let resp_start = self.response_counter.load(Ordering::SeqCst);
+
+ for i in 0..self.config.batch_size() {
+ match self.socket.recv_from(&mut self.buf) {
+ Ok((num_bytes, src_addr)) => {
+ match self.nonce_from_request(&self.buf, num_bytes) {
+ Ok(nonce) => {
+ self.requests.push((Vec::from(nonce), src_addr));
+ self.merkle.push_leaf(nonce);
+ }
+ Err(e) => {
+ self.num_bad_requests += 1;
+
+ info!(
+ "Invalid request: '{:?}' ({} bytes) from {} (#{} in batch, resp #{})",
+ e, num_bytes, src_addr, i, resp_start + i as usize
+ );
+ }
+ }
+ }
+ Err(e) => match e.kind() {
+ ErrorKind::WouldBlock => {
+ done = true;
+ break;
+ }
+ _ => {
+ error!(
+ "Error receiving from socket: {:?}: {:?}",
+ e.kind(),
+ e
+ );
+ break;
+ }
+ },
+ };
+ }
+
+ if self.requests.is_empty() {
+ break 'process_batch;
+ }
+
+ let merkle_root = self.merkle.compute_root();
+ let srep = self.online_key.make_srep(time::get_time(), &merkle_root);
+
+ for (i, &(ref nonce, ref src_addr)) in self.requests.iter().enumerate() {
+ let paths = self.merkle.get_paths(i);
+
+ let resp =
+ self.make_response(&srep, &self.cert_bytes, &paths, i as u32);
+ let resp_bytes = resp.encode().unwrap();
+
+ let bytes_sent = self
+ .socket
+ .send_to(&resp_bytes, &src_addr)
+ .expect("send_to failed");
+ let num_responses =
+ self.response_counter.fetch_add(1, Ordering::SeqCst);
+
+ info!(
+ "Responded {} bytes to {} for '{}..' (#{} in batch, resp #{})",
+ bytes_sent,
+ src_addr,
+ hex::encode(&nonce[0..4]),
+ i,
+ num_responses
+ );
+ }
+
+ self.merkle.reset();
+ self.requests.clear();
+
+ if done {
+ break 'process_batch;
+ }
+ }
+ }
+
+ STATUS => {
+ info!(
+ "responses {}, invalid requests {}",
+ self.response_counter.load(Ordering::SeqCst),
+ self.num_bad_requests
+ );
+
+ self.timer.set_timeout(self.config.status_interval(), ());
+ }
+
+ _ => unreachable!(),
+ }
+ }
+ false
+ }
+
+ #[cfg(fuzzing)]
+ pub fn send_to_self(&mut self, data: &[u8]) {
+ self.response_counter.store(0, Ordering::SeqCst);;
+ self.num_bad_requests = 0;
+ let res = self
+ .fake_client_socket
+ .send_to(data, &self.socket.local_addr().unwrap());
+ info!("Sent to self: {:?}", res);
+ }
+
+ pub fn get_public_key(&self) -> &str {
+ return &self.public_key;
+ }
+
+ pub fn get_online_key(&self) -> &OnlineKey {
+ return &self.online_key;
+ }
+
+ pub fn get_config(&self) -> &Box<ServerConfig> {
+ return &self.config;
+ }
+}