diff options
Diffstat (limited to 'src/bin/roughenough-server.rs')
-rw-r--r-- | src/bin/roughenough-server.rs | 249 |
1 files changed, 41 insertions, 208 deletions
diff --git a/src/bin/roughenough-server.rs b/src/bin/roughenough-server.rs index 9c00d7e..6c6a118 100644 --- a/src/bin/roughenough-server.rs +++ b/src/bin/roughenough-server.rs @@ -41,200 +41,52 @@ extern crate untrusted; extern crate yaml_rust; use std::env; -use std::io::ErrorKind; use std::process; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::Arc; -use std::time::Duration; - -use mio::net::UdpSocket; -use mio::{Events, Poll, PollOpt, Ready, Token}; -use mio_extras::timer::Timer; - -use byteorder::{LittleEndian, WriteBytesExt}; +use std::sync::atomic::Ordering; use roughenough::config; use roughenough::config::ServerConfig; -use roughenough::kms; -use roughenough::key::{LongTermKey, OnlineKey}; -use roughenough::merkle::MerkleTree; -use roughenough::{Error, RtMessage, Tag}; -use roughenough::{MIN_REQUEST_LENGTH, VERSION}; - -const MESSAGE: Token = Token(0); -const STATUS: Token = Token(1); - -fn make_response(srep: &RtMessage, cert_bytes: &[u8], path: &[u8], idx: u32) -> RtMessage { - let mut index = [0; 4]; - (&mut index as &mut [u8]) - .write_u32::<LittleEndian>(idx) - .unwrap(); - - let sig_bytes = srep.get_field(Tag::SIG).unwrap(); - let srep_bytes = srep.get_field(Tag::SREP).unwrap(); - - let mut response = RtMessage::new(5); - response.add_field(Tag::SIG, sig_bytes).unwrap(); - response.add_field(Tag::PATH, path).unwrap(); - response.add_field(Tag::SREP, srep_bytes).unwrap(); - response.add_field(Tag::CERT, cert_bytes).unwrap(); - response.add_field(Tag::INDX, &index).unwrap(); - - response -} - -// extract the client's nonce from its request -fn nonce_from_request(buf: &[u8], num_bytes: usize) -> Result<&[u8], Error> { - if num_bytes < MIN_REQUEST_LENGTH as usize { - return Err(Error::RequestTooShort); - } - - let tag_count = &buf[..4]; - let expected_nonc = &buf[8..12]; - let expected_pad = &buf[12..16]; - - let tag_count_is_2 = tag_count == [0x02, 0x00, 0x00, 0x00]; - let tag1_is_nonc = expected_nonc == Tag::NONC.wire_value(); - let tag2_is_pad = expected_pad == Tag::PAD.wire_value(); - - if tag_count_is_2 && tag1_is_nonc && tag2_is_pad { - Ok(&buf[0x10..0x50]) - } else { - Err(Error::InvalidRequest) - } +use roughenough::server::Server; +use roughenough::VERSION; + +macro_rules! check_ctrlc { + ($keep_running:expr) => { + if !$keep_running.load(Ordering::Acquire) { + warn!("Ctrl-C caught, exiting..."); + return; + } + }; } -fn polling_loop(config: &Box<ServerConfig>, online_key: &mut OnlineKey, cert_bytes: &[u8]) { - let response_counter = AtomicUsize::new(0); - let keep_running = Arc::new(AtomicBool::new(true)); - let kr = keep_running.clone(); +fn polling_loop(config: Box<ServerConfig>) { + let mut server = Server::new(config); + + info!("Long-term public key : {}", server.get_public_key()); + info!("Online public key : {}", server.get_online_key()); + info!( + "Max response batch size : {}", + server.get_config().batch_size() + ); + info!( + "Status updates every : {} seconds", + server.get_config().status_interval().as_secs() + ); + info!( + "Server listening on : {}:{}", + server.get_config().interface(), + server.get_config().port() + ); + + let kr = server.get_keep_running(); + let kr_new = kr.clone(); ctrlc::set_handler(move || kr.store(false, Ordering::Release)) .expect("failed setting Ctrl-C handler"); - let sock_addr = config.socket_addr().expect(""); - let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket"); - let poll_duration = Some(Duration::from_millis(100)); - - let mut timer: Timer<()> = Timer::default(); - timer.set_timeout(config.status_interval(), ()); - - let mut buf = [0u8; 65_536]; - let mut events = Events::with_capacity(32); - let mut num_bad_requests = 0u64; - - let poll = Poll::new().unwrap(); - poll.register(&socket, MESSAGE, Ready::readable(), PollOpt::edge()) - .unwrap(); - poll.register(&timer, STATUS, Ready::readable(), PollOpt::edge()) - .unwrap(); - - let mut merkle = MerkleTree::new(); - let mut requests = Vec::with_capacity(config.batch_size() as usize); - - macro_rules! check_ctrlc { - () => { - if !keep_running.load(Ordering::Acquire) { - warn!("Ctrl-C caught, exiting..."); - return; - } - }; - } - loop { - check_ctrlc!(); - - poll.poll(&mut events, poll_duration).expect("poll failed"); - - for event in events.iter() { - match event.token() { - MESSAGE => { - let mut done = false; - - 'process_batch: loop { - check_ctrlc!(); - - merkle.reset(); - requests.clear(); - - let resp_start = response_counter.load(Ordering::SeqCst); - - for i in 0..config.batch_size() { - match socket.recv_from(&mut buf) { - Ok((num_bytes, src_addr)) => { - if let Ok(nonce) = nonce_from_request(&buf, num_bytes) { - requests.push((Vec::from(nonce), src_addr)); - merkle.push_leaf(nonce); - } else { - num_bad_requests += 1; - info!( - "Invalid request ({} bytes) from {} (#{} in batch, resp #{})", - num_bytes, src_addr, i, resp_start + i as usize - ); - } - } - Err(e) => match e.kind() { - ErrorKind::WouldBlock => { - done = true; - break; - } - _ => { - error!( - "Error receiving from socket: {:?}: {:?}", - e.kind(), - e - ); - break; - } - }, - }; - } - - if requests.is_empty() { - break 'process_batch; - } - - let merkle_root = merkle.compute_root(); - let srep = online_key.make_srep(time::get_time(), &merkle_root); - - for (i, &(ref nonce, ref src_addr)) in requests.iter().enumerate() { - let paths = merkle.get_paths(i); - - let resp = make_response(&srep, cert_bytes, &paths, i as u32); - let resp_bytes = resp.encode().unwrap(); - - let bytes_sent = socket - .send_to(&resp_bytes, &src_addr) - .expect("send_to failed"); - let num_responses = response_counter.fetch_add(1, Ordering::SeqCst); - - info!( - "Responded {} bytes to {} for '{}..' (#{} in batch, resp #{})", - bytes_sent, - src_addr, - hex::encode(&nonce[0..4]), - i, - num_responses - ); - } - if done { - break 'process_batch; - } - } - } - - STATUS => { - info!( - "responses {}, invalid requests {}", - response_counter.load(Ordering::SeqCst), - num_bad_requests - ); - - timer.set_timeout(config.status_interval(), ()); - } - - _ => unreachable!(), - } + check_ctrlc!(kr_new); + if server.process_events() { + return; } } } @@ -254,7 +106,11 @@ pub fn main() { simple_logger::init_with_level(Level::Info).unwrap(); - info!("Roughenough server v{}{} starting", VERSION, kms_support_str()); + info!( + "Roughenough server v{}{} starting", + VERSION, + kms_support_str() + ); let mut args = env::args(); if args.len() != 2 { @@ -272,30 +128,7 @@ pub fn main() { Ok(cfg) => cfg, }; - let mut online_key = OnlineKey::new(); - let public_key: String; - - let cert_bytes = { - let seed = match kms::load_seed(&config) { - Ok(seed) => seed, - Err(e) => { - error!("Failed to load seed: {:#?}", e); - process::exit(1); - } - }; - let mut long_term_key = LongTermKey::new(&seed); - public_key = hex::encode(long_term_key.public_key()); - - long_term_key.make_cert(&online_key).encode().unwrap() - }; - - info!("Long-term public key : {}", public_key); - info!("Online public key : {}", online_key); - info!("Max response batch size : {}", config.batch_size()); - info!("Status updates every : {} seconds", config.status_interval().as_secs()); - info!("Server listening on : {}:{}", config.interface(), config.port()); - - polling_loop(&config, &mut online_key, &cert_bytes); + polling_loop(config); info!("Done."); process::exit(0); |