diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/bin/roughenough-client.rs | 12 | ||||
-rw-r--r-- | src/bin/roughenough-kms.rs | 2 | ||||
-rw-r--r-- | src/bin/roughenough-server.rs | 249 | ||||
-rw-r--r-- | src/config/environment.rs | 11 | ||||
-rw-r--r-- | src/config/file.rs | 15 | ||||
-rw-r--r-- | src/config/memory.rs | 73 | ||||
-rw-r--r-- | src/config/mod.rs | 25 | ||||
-rw-r--r-- | src/key/mod.rs | 1 | ||||
-rw-r--r-- | src/kms/awskms.rs | 1 | ||||
-rw-r--r-- | src/kms/envelope.rs | 9 | ||||
-rw-r--r-- | src/kms/gcpkms.rs | 12 | ||||
-rw-r--r-- | src/kms/mod.rs | 2 | ||||
-rw-r--r-- | src/lib.rs | 6 | ||||
-rw-r--r-- | src/server.rs | 323 |
14 files changed, 497 insertions, 244 deletions
diff --git a/src/bin/roughenough-client.rs b/src/bin/roughenough-client.rs index 75ebe14..570dc47 100644 --- a/src/bin/roughenough-client.rs +++ b/src/bin/roughenough-client.rs @@ -28,6 +28,8 @@ use chrono::offset::Utc; use chrono::TimeZone; use std::collections::HashMap; +use std::fs::File; +use std::io::Write; use std::iter::Iterator; use std::net::{ToSocketAddrs, UdpSocket}; @@ -228,6 +230,12 @@ fn main() { .long("stress") .help("Stress-tests the server by sending the same request as fast as possible. Please only use this on your own server") ) + .arg(Arg::with_name("output") + .short("o") + .long("output") + .takes_value(true) + .help("Writes all requsts to the specified file, in addition to sending them to the server. Useful for generating fuzer inputs") + ) .get_matches(); let host = matches.value_of("host").unwrap(); @@ -238,6 +246,7 @@ fn main() { let pub_key = matches .value_of("public-key") .map(|pkey| hex::decode(pkey).expect("Error parsing public key!")); + let out = matches.value_of("output"); println!("Requesting time from: {:?}:{:?}", host, port); @@ -264,11 +273,14 @@ fn main() { } let mut requests = Vec::with_capacity(num_requests); + let mut file = out.map(|o| File::create(o).expect("Failed to create file!")); for _ in 0..num_requests { let nonce = create_nonce(); let mut socket = UdpSocket::bind("0.0.0.0:0").expect("Couldn't open UDP socket"); let request = make_request(&nonce); + file.as_mut() + .map(|f| f.write_all(&request).expect("Failed to write to file!")); requests.push((nonce, request, socket)); } diff --git a/src/bin/roughenough-kms.rs b/src/bin/roughenough-kms.rs index 6563224..1cea22e 100644 --- a/src/bin/roughenough-kms.rs +++ b/src/bin/roughenough-kms.rs @@ -30,8 +30,8 @@ use roughenough::VERSION; #[cfg(feature = "awskms")] fn aws_kms(kms_key: &str, plaintext_seed: &[u8]) { - use roughenough::kms::EnvelopeEncryption; use roughenough::kms::AwsKms; + use roughenough::kms::EnvelopeEncryption; let client = AwsKms::from_arn(kms_key).unwrap(); diff --git a/src/bin/roughenough-server.rs b/src/bin/roughenough-server.rs index 9c00d7e..6c6a118 100644 --- a/src/bin/roughenough-server.rs +++ b/src/bin/roughenough-server.rs @@ -41,200 +41,52 @@ extern crate untrusted; extern crate yaml_rust; use std::env; -use std::io::ErrorKind; use std::process; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::Arc; -use std::time::Duration; - -use mio::net::UdpSocket; -use mio::{Events, Poll, PollOpt, Ready, Token}; -use mio_extras::timer::Timer; - -use byteorder::{LittleEndian, WriteBytesExt}; +use std::sync::atomic::Ordering; use roughenough::config; use roughenough::config::ServerConfig; -use roughenough::kms; -use roughenough::key::{LongTermKey, OnlineKey}; -use roughenough::merkle::MerkleTree; -use roughenough::{Error, RtMessage, Tag}; -use roughenough::{MIN_REQUEST_LENGTH, VERSION}; - -const MESSAGE: Token = Token(0); -const STATUS: Token = Token(1); - -fn make_response(srep: &RtMessage, cert_bytes: &[u8], path: &[u8], idx: u32) -> RtMessage { - let mut index = [0; 4]; - (&mut index as &mut [u8]) - .write_u32::<LittleEndian>(idx) - .unwrap(); - - let sig_bytes = srep.get_field(Tag::SIG).unwrap(); - let srep_bytes = srep.get_field(Tag::SREP).unwrap(); - - let mut response = RtMessage::new(5); - response.add_field(Tag::SIG, sig_bytes).unwrap(); - response.add_field(Tag::PATH, path).unwrap(); - response.add_field(Tag::SREP, srep_bytes).unwrap(); - response.add_field(Tag::CERT, cert_bytes).unwrap(); - response.add_field(Tag::INDX, &index).unwrap(); - - response -} - -// extract the client's nonce from its request -fn nonce_from_request(buf: &[u8], num_bytes: usize) -> Result<&[u8], Error> { - if num_bytes < MIN_REQUEST_LENGTH as usize { - return Err(Error::RequestTooShort); - } - - let tag_count = &buf[..4]; - let expected_nonc = &buf[8..12]; - let expected_pad = &buf[12..16]; - - let tag_count_is_2 = tag_count == [0x02, 0x00, 0x00, 0x00]; - let tag1_is_nonc = expected_nonc == Tag::NONC.wire_value(); - let tag2_is_pad = expected_pad == Tag::PAD.wire_value(); - - if tag_count_is_2 && tag1_is_nonc && tag2_is_pad { - Ok(&buf[0x10..0x50]) - } else { - Err(Error::InvalidRequest) - } +use roughenough::server::Server; +use roughenough::VERSION; + +macro_rules! check_ctrlc { + ($keep_running:expr) => { + if !$keep_running.load(Ordering::Acquire) { + warn!("Ctrl-C caught, exiting..."); + return; + } + }; } -fn polling_loop(config: &Box<ServerConfig>, online_key: &mut OnlineKey, cert_bytes: &[u8]) { - let response_counter = AtomicUsize::new(0); - let keep_running = Arc::new(AtomicBool::new(true)); - let kr = keep_running.clone(); +fn polling_loop(config: Box<ServerConfig>) { + let mut server = Server::new(config); + + info!("Long-term public key : {}", server.get_public_key()); + info!("Online public key : {}", server.get_online_key()); + info!( + "Max response batch size : {}", + server.get_config().batch_size() + ); + info!( + "Status updates every : {} seconds", + server.get_config().status_interval().as_secs() + ); + info!( + "Server listening on : {}:{}", + server.get_config().interface(), + server.get_config().port() + ); + + let kr = server.get_keep_running(); + let kr_new = kr.clone(); ctrlc::set_handler(move || kr.store(false, Ordering::Release)) .expect("failed setting Ctrl-C handler"); - let sock_addr = config.socket_addr().expect(""); - let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket"); - let poll_duration = Some(Duration::from_millis(100)); - - let mut timer: Timer<()> = Timer::default(); - timer.set_timeout(config.status_interval(), ()); - - let mut buf = [0u8; 65_536]; - let mut events = Events::with_capacity(32); - let mut num_bad_requests = 0u64; - - let poll = Poll::new().unwrap(); - poll.register(&socket, MESSAGE, Ready::readable(), PollOpt::edge()) - .unwrap(); - poll.register(&timer, STATUS, Ready::readable(), PollOpt::edge()) - .unwrap(); - - let mut merkle = MerkleTree::new(); - let mut requests = Vec::with_capacity(config.batch_size() as usize); - - macro_rules! check_ctrlc { - () => { - if !keep_running.load(Ordering::Acquire) { - warn!("Ctrl-C caught, exiting..."); - return; - } - }; - } - loop { - check_ctrlc!(); - - poll.poll(&mut events, poll_duration).expect("poll failed"); - - for event in events.iter() { - match event.token() { - MESSAGE => { - let mut done = false; - - 'process_batch: loop { - check_ctrlc!(); - - merkle.reset(); - requests.clear(); - - let resp_start = response_counter.load(Ordering::SeqCst); - - for i in 0..config.batch_size() { - match socket.recv_from(&mut buf) { - Ok((num_bytes, src_addr)) => { - if let Ok(nonce) = nonce_from_request(&buf, num_bytes) { - requests.push((Vec::from(nonce), src_addr)); - merkle.push_leaf(nonce); - } else { - num_bad_requests += 1; - info!( - "Invalid request ({} bytes) from {} (#{} in batch, resp #{})", - num_bytes, src_addr, i, resp_start + i as usize - ); - } - } - Err(e) => match e.kind() { - ErrorKind::WouldBlock => { - done = true; - break; - } - _ => { - error!( - "Error receiving from socket: {:?}: {:?}", - e.kind(), - e - ); - break; - } - }, - }; - } - - if requests.is_empty() { - break 'process_batch; - } - - let merkle_root = merkle.compute_root(); - let srep = online_key.make_srep(time::get_time(), &merkle_root); - - for (i, &(ref nonce, ref src_addr)) in requests.iter().enumerate() { - let paths = merkle.get_paths(i); - - let resp = make_response(&srep, cert_bytes, &paths, i as u32); - let resp_bytes = resp.encode().unwrap(); - - let bytes_sent = socket - .send_to(&resp_bytes, &src_addr) - .expect("send_to failed"); - let num_responses = response_counter.fetch_add(1, Ordering::SeqCst); - - info!( - "Responded {} bytes to {} for '{}..' (#{} in batch, resp #{})", - bytes_sent, - src_addr, - hex::encode(&nonce[0..4]), - i, - num_responses - ); - } - if done { - break 'process_batch; - } - } - } - - STATUS => { - info!( - "responses {}, invalid requests {}", - response_counter.load(Ordering::SeqCst), - num_bad_requests - ); - - timer.set_timeout(config.status_interval(), ()); - } - - _ => unreachable!(), - } + check_ctrlc!(kr_new); + if server.process_events() { + return; } } } @@ -254,7 +106,11 @@ pub fn main() { simple_logger::init_with_level(Level::Info).unwrap(); - info!("Roughenough server v{}{} starting", VERSION, kms_support_str()); + info!( + "Roughenough server v{}{} starting", + VERSION, + kms_support_str() + ); let mut args = env::args(); if args.len() != 2 { @@ -272,30 +128,7 @@ pub fn main() { Ok(cfg) => cfg, }; - let mut online_key = OnlineKey::new(); - let public_key: String; - - let cert_bytes = { - let seed = match kms::load_seed(&config) { - Ok(seed) => seed, - Err(e) => { - error!("Failed to load seed: {:#?}", e); - process::exit(1); - } - }; - let mut long_term_key = LongTermKey::new(&seed); - public_key = hex::encode(long_term_key.public_key()); - - long_term_key.make_cert(&online_key).encode().unwrap() - }; - - info!("Long-term public key : {}", public_key); - info!("Online public key : {}", online_key); - info!("Max response batch size : {}", config.batch_size()); - info!("Status updates every : {} seconds", config.status_interval().as_secs()); - info!("Server listening on : {}:{}", config.interface(), config.port()); - - polling_loop(&config, &mut online_key, &cert_bytes); + polling_loop(config); info!("Done."); process::exit(0); diff --git a/src/config/environment.rs b/src/config/environment.rs index 2385b28..5edb6d0 100644 --- a/src/config/environment.rs +++ b/src/config/environment.rs @@ -15,13 +15,12 @@ extern crate hex; use std::env; -use std::net::SocketAddr; use std::time::Duration; use config::ServerConfig; use config::{DEFAULT_BATCH_SIZE, DEFAULT_STATUS_INTERVAL}; -use Error; use key::KeyProtection; +use Error; /// /// Obtain a Roughenough server configuration ([ServerConfig](trait.ServerConfig.html)) @@ -123,14 +122,6 @@ impl ServerConfig for EnvironmentConfig { self.status_interval } - fn socket_addr(&self) -> Result<SocketAddr, Error> { - let addr = format!("{}:{}", self.interface, self.port); - match addr.parse() { - Ok(v) => Ok(v), - Err(_) => Err(Error::InvalidConfiguration(addr)), - } - } - fn key_protection(&self) -> &KeyProtection { &self.key_protection } diff --git a/src/config/file.rs b/src/config/file.rs index 440c78c..b0f8b4d 100644 --- a/src/config/file.rs +++ b/src/config/file.rs @@ -16,14 +16,13 @@ extern crate hex; use std::fs::File; use std::io::Read; -use std::net::SocketAddr; use std::time::Duration; use yaml_rust::YamlLoader; use config::ServerConfig; use config::{DEFAULT_BATCH_SIZE, DEFAULT_STATUS_INTERVAL}; -use Error; use key::KeyProtection; +use Error; /// /// Read a Roughenough server configuration ([ServerConfig](trait.ServerConfig.html)) @@ -87,7 +86,9 @@ impl FileConfig { config.status_interval = Duration::from_secs(val as u64) } "key_protection" => { - let val = value.as_str().unwrap() + let val = value + .as_str() + .unwrap() .parse() .expect(format!("invalid key_protection value: {:?}", value).as_ref()); config.key_protection = val @@ -126,14 +127,6 @@ impl ServerConfig for FileConfig { self.status_interval } - fn socket_addr(&self) -> Result<SocketAddr, Error> { - let addr = format!("{}:{}", self.interface, self.port); - match addr.parse() { - Ok(v) => Ok(v), - Err(_) => Err(Error::InvalidConfiguration(addr)), - } - } - fn key_protection(&self) -> &KeyProtection { &self.key_protection } diff --git a/src/config/memory.rs b/src/config/memory.rs new file mode 100644 index 0000000..abca5a5 --- /dev/null +++ b/src/config/memory.rs @@ -0,0 +1,73 @@ +// Copyright 2017-2018 int08h LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +use config::ServerConfig; +use config::{DEFAULT_BATCH_SIZE, DEFAULT_STATUS_INTERVAL}; +use key::KeyProtection; +use std::time::Duration; + +use hex; + +/// A purely in-memory Roughenough config +/// This is useful for fuzzing a server without the need +/// to create additioanl files. +pub struct MemoryConfig { + pub port: u16, + pub interface: String, + pub seed: Vec<u8>, + pub batch_size: u8, + pub status_interval: Duration, + pub key_protection: KeyProtection, +} + +impl MemoryConfig { + pub fn new(port: u16) -> MemoryConfig { + MemoryConfig { + port, + interface: "127.0.0.1".to_string(), + seed: hex::decode("a32049da0ffde0ded92ce10a0230d35fe615ec8461c14986baa63fe3b3bac3db") + .unwrap(), + batch_size: DEFAULT_BATCH_SIZE, + status_interval: DEFAULT_STATUS_INTERVAL, + key_protection: KeyProtection::Plaintext, + } + } +} + +impl ServerConfig for MemoryConfig { + fn interface(&self) -> &str { + self.interface.as_ref() + } + + fn port(&self) -> u16 { + self.port + } + + fn seed(&self) -> Vec<u8> { + self.seed.clone() + } + + fn batch_size(&self) -> u8 { + self.batch_size + } + + fn status_interval(&self) -> Duration { + self.status_interval + } + + fn key_protection(&self) -> &KeyProtection { + &self.key_protection + } +} diff --git a/src/config/mod.rs b/src/config/mod.rs index f05578b..772e1ee 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -34,8 +34,11 @@ pub use self::file::FileConfig; mod environment; pub use self::environment::EnvironmentConfig; -use Error; +mod memory; +pub use self::memory::MemoryConfig; + use key::KeyProtection; +use Error; /// Maximum number of requests to process in one batch and include the the Merkle tree. pub const DEFAULT_BATCH_SIZE: u8 = 64; @@ -89,7 +92,13 @@ pub trait ServerConfig { fn key_protection(&self) -> &KeyProtection; /// Convenience function to create a `SocketAddr` from the provided `interface` and `port` - fn socket_addr(&self) -> Result<SocketAddr, Error>; + fn socket_addr(&self) -> Result<SocketAddr, Error> { + let addr = format!("{}:{}", self.interface(), self.port()); + match addr.parse() { + Ok(v) => Ok(v), + Err(_) => Err(Error::InvalidConfiguration(addr)), + } + } } /// Factory function to create a `ServerConfig` _trait object_ based on the value @@ -135,14 +144,22 @@ pub fn is_valid_config(cfg: &Box<ServerConfig>) -> bool { is_valid = false; } if cfg.batch_size() < 1 || cfg.batch_size() > 64 { - error!("batch_size {} is invalid; valid range 1-64", cfg.batch_size()); + error!( + "batch_size {} is invalid; valid range 1-64", + cfg.batch_size() + ); is_valid = false; } if is_valid { match cfg.socket_addr() { Err(e) => { - error!("failed to create socket {}:{} {:?}", cfg.interface(), cfg.port(), e); + error!( + "failed to create socket {}:{} {:?}", + cfg.interface(), + cfg.port(), + e + ); is_valid = false; } _ => (), diff --git a/src/key/mod.rs b/src/key/mod.rs index 32ca241..6bb3eb5 100644 --- a/src/key/mod.rs +++ b/src/key/mod.rs @@ -66,4 +66,3 @@ impl FromStr for KeyProtection { } } } - diff --git a/src/kms/awskms.rs b/src/kms/awskms.rs index 96d4a38..14f0804 100644 --- a/src/kms/awskms.rs +++ b/src/kms/awskms.rs @@ -121,4 +121,3 @@ pub mod inner { } } } - diff --git a/src/kms/envelope.rs b/src/kms/envelope.rs index 1f6d615..da75961 100644 --- a/src/kms/envelope.rs +++ b/src/kms/envelope.rs @@ -73,7 +73,6 @@ fn vec_zero_filled(len: usize) -> Vec<u8> { pub struct EnvelopeEncryption; impl EnvelopeEncryption { - /// Decrypt a seed previously encrypted with `encrypt_seed()` pub fn decrypt_seed(kms: &KmsProvider, ciphertext_blob: &[u8]) -> Result<Vec<u8>, KmsError> { if ciphertext_blob.len() < MIN_PAYLOAD_SIZE { @@ -107,7 +106,13 @@ impl EnvelopeEncryption { // Decrypt the seed value using the DEK let dek_open_key = OpeningKey::new(&AES_256_GCM, &dek)?; - match open_in_place(&dek_open_key, &nonce, AD, IN_PREFIX_LEN, &mut encrypted_seed) { + match open_in_place( + &dek_open_key, + &nonce, + AD, + IN_PREFIX_LEN, + &mut encrypted_seed, + ) { Ok(plaintext_seed) => Ok(plaintext_seed.to_vec()), Err(_) => Err(KmsError::OperationFailed( "failed to decrypt plaintext seed".to_string(), diff --git a/src/kms/gcpkms.rs b/src/kms/gcpkms.rs index c0fbb5d..13303db 100644 --- a/src/kms/gcpkms.rs +++ b/src/kms/gcpkms.rs @@ -19,14 +19,16 @@ extern crate log; pub mod inner { extern crate base64; + extern crate google_cloudkms1 as cloudkms1; extern crate hyper; extern crate hyper_rustls; extern crate yup_oauth2 as oauth2; - extern crate google_cloudkms1 as cloudkms1; + use std::fmt; use std::env; use std::fmt::Formatter; + use std::result::Result; use std::str::FromStr; use std::result::Result; use std::default::Default; @@ -34,13 +36,15 @@ pub mod inner { use std::path::Path; use std::time::Duration; - use self::oauth2::{service_account_key_from_file, ServiceAccountAccess, ServiceAccountKey}; use self::cloudkms1::CloudKMS; - use self::cloudkms1::{Result as CloudKmsResult, Error as CloudKmsError, EncryptRequest, DecryptRequest}; + use self::cloudkms1::{ + DecryptRequest, EncryptRequest, Error as CloudKmsError, Result as CloudKmsResult, + }; use self::hyper::net::HttpsConnector; use self::hyper::header::Headers; use self::hyper::status::StatusCode; use self::hyper_rustls::TlsClient; + use self::oauth2::{service_account_key_from_file, ServiceAccountAccess, ServiceAccountKey}; use kms::{EncryptedDEK, KmsError, KmsProvider, PlaintextDEK}; @@ -156,5 +160,3 @@ pub mod inner { panic!("Failed to load service account credential. Is GOOGLE_APPLICATION_CREDENTIALS set?"); } } - - diff --git a/src/kms/mod.rs b/src/kms/mod.rs index 810623a..56e7631 100644 --- a/src/kms/mod.rs +++ b/src/kms/mod.rs @@ -52,9 +52,9 @@ mod envelope; use base64; +use ring; use std; use std::error::Error; -use ring; use config::ServerConfig; use error; @@ -64,6 +64,11 @@ extern crate time; extern crate yaml_rust; #[macro_use] extern crate hyper; +extern crate hex; +extern crate mio; +extern crate mio_extras; +extern crate time; +extern crate yaml_rust; #[macro_use] extern crate log; @@ -77,6 +82,7 @@ pub mod config; pub mod key; pub mod kms; pub mod merkle; +pub mod server; pub mod sign; pub use error::Error; diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..0fd5f1a --- /dev/null +++ b/src/server.rs @@ -0,0 +1,323 @@ +// Copyright 2017-2018 int08h LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +use hex; +use std::io::ErrorKind; +use std::net::SocketAddr; +use std::process; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use time; + +use byteorder::{LittleEndian, WriteBytesExt}; + +use mio::net::UdpSocket; +use mio::{Events, Poll, PollOpt, Ready, Token}; +use mio_extras::timer::Timer; + +use config::ServerConfig; +use key::{LongTermKey, OnlineKey}; +use kms; +use merkle::MerkleTree; +use MIN_REQUEST_LENGTH; +use {Error, RtMessage, Tag}; + +macro_rules! check_ctrlc { + ($keep_running:expr) => { + if !$keep_running.load(Ordering::Acquire) { + warn!("Ctrl-C caught, exiting..."); + return true; + } + }; +} + +const MESSAGE: Token = Token(0); +const STATUS: Token = Token(1); + +/// The main server instance. +/// A Server is initialiezd from a Server Config +/// and processes incoming messages in +/// 'process_events' +pub struct Server { + config: Box<ServerConfig>, + online_key: OnlineKey, + cert_bytes: Vec<u8>, + + response_counter: AtomicUsize, + num_bad_requests: u64, + + socket: UdpSocket, + keep_running: Arc<AtomicBool>, + poll_duration: Option<Duration>, + timer: Timer<()>, + poll: Poll, + events: Events, + merkle: MerkleTree, + requests: Vec<(Vec<u8>, SocketAddr)>, + buf: [u8; 65_536], + + public_key: String, + + // Used to send requests to outselves in fuzing mode + #[cfg(fuzzing)] + fake_client_socket: UdpSocket, +} + +impl Server { + pub fn new(config: Box<ServerConfig>) -> Server { + let online_key = OnlineKey::new(); + let public_key: String; + + let cert_bytes = { + let seed = match kms::load_seed(&config) { + Ok(seed) => seed, + Err(e) => { + error!("Failed to load seed: {:#?}", e); + process::exit(1); + } + }; + let mut long_term_key = LongTermKey::new(&seed); + public_key = hex::encode(long_term_key.public_key()); + + long_term_key.make_cert(&online_key).encode().unwrap() + }; + + let response_counter = AtomicUsize::new(0); + let keep_running = Arc::new(AtomicBool::new(true)); + + let sock_addr = config.socket_addr().expect(""); + let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket"); + + let poll_duration = Some(Duration::from_millis(100)); + + let mut timer: Timer<()> = Timer::default(); + timer.set_timeout(config.status_interval(), ()); + + let poll = Poll::new().unwrap(); + poll.register(&socket, MESSAGE, Ready::readable(), PollOpt::edge()) + .unwrap(); + poll.register(&timer, STATUS, Ready::readable(), PollOpt::edge()) + .unwrap(); + + let merkle = MerkleTree::new(); + let requests = Vec::with_capacity(config.batch_size() as usize); + + Server { + config, + online_key, + cert_bytes, + + response_counter, + num_bad_requests: 0, + socket, + keep_running, + poll_duration, + timer, + poll, + events: Events::with_capacity(32), + merkle, + requests, + buf: [0u8; 65_536], + + public_key, + + #[cfg(fuzzing)] + fake_client_socket: UdpSocket::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(), + } + } + + pub fn get_keep_running(&self) -> Arc<AtomicBool> { + return self.keep_running.clone(); + } + + // extract the client's nonce from its request + fn nonce_from_request<'a>(&self, buf: &'a [u8], num_bytes: usize) -> Result<&'a [u8], Error> { + if num_bytes < MIN_REQUEST_LENGTH as usize { + return Err(Error::RequestTooShort); + } + + let tag_count = &buf[..4]; + let expected_nonc = &buf[8..12]; + let expected_pad = &buf[12..16]; + + let tag_count_is_2 = tag_count == [0x02, 0x00, 0x00, 0x00]; + let tag1_is_nonc = expected_nonc == Tag::NONC.wire_value(); + let tag2_is_pad = expected_pad == Tag::PAD.wire_value(); + + if tag_count_is_2 && tag1_is_nonc && tag2_is_pad { + Ok(&buf[0x10..0x50]) + } else { + Err(Error::InvalidRequest) + } + } + + fn make_response( + &self, + srep: &RtMessage, + cert_bytes: &[u8], + path: &[u8], + idx: u32, + ) -> RtMessage { + let mut index = [0; 4]; + (&mut index as &mut [u8]) + .write_u32::<LittleEndian>(idx) + .unwrap(); + + let sig_bytes = srep.get_field(Tag::SIG).unwrap(); + let srep_bytes = srep.get_field(Tag::SREP).unwrap(); + + let mut response = RtMessage::new(5); + response.add_field(Tag::SIG, sig_bytes).unwrap(); + response.add_field(Tag::PATH, path).unwrap(); + response.add_field(Tag::SREP, srep_bytes).unwrap(); + response.add_field(Tag::CERT, cert_bytes).unwrap(); + response.add_field(Tag::INDX, &index).unwrap(); + + response + } + + /// The main processing function for incoming connections. + /// This method should be called repeatedly in a loop + /// to process requests. It returns 'true' when the server + /// has shutdown (due to keep_running being set to 'false') + pub fn process_events(&mut self) -> bool { + self.poll + .poll(&mut self.events, self.poll_duration) + .expect("poll failed"); + + for event in self.events.iter() { + match event.token() { + MESSAGE => { + let mut done = false; + + 'process_batch: loop { + check_ctrlc!(self.keep_running); + + let resp_start = self.response_counter.load(Ordering::SeqCst); + + for i in 0..self.config.batch_size() { + match self.socket.recv_from(&mut self.buf) { + Ok((num_bytes, src_addr)) => { + match self.nonce_from_request(&self.buf, num_bytes) { + Ok(nonce) => { + self.requests.push((Vec::from(nonce), src_addr)); + self.merkle.push_leaf(nonce); + } + Err(e) => { + self.num_bad_requests += 1; + + info!( + "Invalid request: '{:?}' ({} bytes) from {} (#{} in batch, resp #{})", + e, num_bytes, src_addr, i, resp_start + i as usize + ); + } + } + } + Err(e) => match e.kind() { + ErrorKind::WouldBlock => { + done = true; + break; + } + _ => { + error!( + "Error receiving from socket: {:?}: {:?}", + e.kind(), + e + ); + break; + } + }, + }; + } + + if self.requests.is_empty() { + break 'process_batch; + } + + let merkle_root = self.merkle.compute_root(); + let srep = self.online_key.make_srep(time::get_time(), &merkle_root); + + for (i, &(ref nonce, ref src_addr)) in self.requests.iter().enumerate() { + let paths = self.merkle.get_paths(i); + + let resp = + self.make_response(&srep, &self.cert_bytes, &paths, i as u32); + let resp_bytes = resp.encode().unwrap(); + + let bytes_sent = self + .socket + .send_to(&resp_bytes, &src_addr) + .expect("send_to failed"); + let num_responses = + self.response_counter.fetch_add(1, Ordering::SeqCst); + + info!( + "Responded {} bytes to {} for '{}..' (#{} in batch, resp #{})", + bytes_sent, + src_addr, + hex::encode(&nonce[0..4]), + i, + num_responses + ); + } + + self.merkle.reset(); + self.requests.clear(); + + if done { + break 'process_batch; + } + } + } + + STATUS => { + info!( + "responses {}, invalid requests {}", + self.response_counter.load(Ordering::SeqCst), + self.num_bad_requests + ); + + self.timer.set_timeout(self.config.status_interval(), ()); + } + + _ => unreachable!(), + } + } + false + } + + #[cfg(fuzzing)] + pub fn send_to_self(&mut self, data: &[u8]) { + self.response_counter.store(0, Ordering::SeqCst);; + self.num_bad_requests = 0; + let res = self + .fake_client_socket + .send_to(data, &self.socket.local_addr().unwrap()); + info!("Sent to self: {:?}", res); + } + + pub fn get_public_key(&self) -> &str { + return &self.public_key; + } + + pub fn get_online_key(&self) -> &OnlineKey { + return &self.online_key; + } + + pub fn get_config(&self) -> &Box<ServerConfig> { + return &self.config; + } +} |