diff options
author | Aaron Hill <aa1ronham@gmail.com> | 2018-10-13 19:51:01 -0400 |
---|---|---|
committer | Aaron Hill <aa1ronham@gmail.com> | 2018-10-17 21:18:33 -0400 |
commit | c50c8ef27467616645ddc1aa22fa56fc0439ab6b (patch) | |
tree | bbe4b01bac4759ebdc92d791864f2aa941742a15 /src/bin/roughenough-server.rs | |
parent | 54955e4126a6da3aa233ccf837d7ba03f40be6d3 (diff) | |
download | roughenough-c50c8ef27467616645ddc1aa22fa56fc0439ab6b.zip |
Exact code into Server struct
Diffstat (limited to 'src/bin/roughenough-server.rs')
-rw-r--r-- | src/bin/roughenough-server.rs | 196 |
1 files changed, 142 insertions, 54 deletions
diff --git a/src/bin/roughenough-server.rs b/src/bin/roughenough-server.rs index 9c00d7e..5b8b65d 100644 --- a/src/bin/roughenough-server.rs +++ b/src/bin/roughenough-server.rs @@ -46,6 +46,7 @@ use std::process; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; +use std::net::SocketAddr; use mio::net::UdpSocket; use mio::{Events, Poll, PollOpt, Ready, Token}; @@ -61,6 +62,15 @@ use roughenough::merkle::MerkleTree; use roughenough::{Error, RtMessage, Tag}; use roughenough::{MIN_REQUEST_LENGTH, VERSION}; +macro_rules! check_ctrlc { + ($keep_running:expr) => { + if !$keep_running.load(Ordering::Acquire) { + warn!("Ctrl-C caught, exiting..."); + return true; + } + } +} + const MESSAGE: Token = Token(0); const STATUS: Token = Token(1); @@ -104,69 +114,99 @@ fn nonce_from_request(buf: &[u8], num_bytes: usize) -> Result<&[u8], Error> { } } -fn polling_loop(config: &Box<ServerConfig>, online_key: &mut OnlineKey, cert_bytes: &[u8]) { - let response_counter = AtomicUsize::new(0); - let keep_running = Arc::new(AtomicBool::new(true)); - let kr = keep_running.clone(); - ctrlc::set_handler(move || kr.store(false, Ordering::Release)) - .expect("failed setting Ctrl-C handler"); - let sock_addr = config.socket_addr().expect(""); - let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket"); - let poll_duration = Some(Duration::from_millis(100)); +struct Server { + config: Box<ServerConfig>, + online_key: OnlineKey, + cert_bytes: Vec<u8>, - let mut timer: Timer<()> = Timer::default(); - timer.set_timeout(config.status_interval(), ()); + response_counter: AtomicUsize, + num_bad_requests: u64, - let mut buf = [0u8; 65_536]; - let mut events = Events::with_capacity(32); - let mut num_bad_requests = 0u64; + socket: UdpSocket, + keep_running: Arc<AtomicBool>, + poll_duration: Option<Duration>, + timer: Timer<()>, + poll: Poll, + events: Events, + merkle: MerkleTree, + requests: Vec<(Vec<u8>, SocketAddr)>, + buf: [u8; 65_536], - let poll = Poll::new().unwrap(); - poll.register(&socket, MESSAGE, Ready::readable(), PollOpt::edge()) - .unwrap(); - poll.register(&timer, STATUS, Ready::readable(), PollOpt::edge()) - .unwrap(); + +} - let mut merkle = MerkleTree::new(); - let mut requests = Vec::with_capacity(config.batch_size() as usize); +impl Server { + pub fn new(config: Box<ServerConfig>, online_key: OnlineKey, cert_bytes: Vec<u8>) -> Server { + let response_counter = AtomicUsize::new(0); + let keep_running = Arc::new(AtomicBool::new(true)); + + let sock_addr = config.socket_addr().expect(""); + let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket"); + let poll_duration = Some(Duration::from_millis(100)); + + let mut timer: Timer<()> = Timer::default(); + timer.set_timeout(config.status_interval(), ()); + + let poll = Poll::new().unwrap(); + poll.register(&socket, MESSAGE, Ready::readable(), PollOpt::edge()) + .unwrap(); + poll.register(&timer, STATUS, Ready::readable(), PollOpt::edge()) + .unwrap(); + + let mut merkle = MerkleTree::new(); + let mut requests = Vec::with_capacity(config.batch_size() as usize); + + + Server { + config, + online_key, + cert_bytes, + + response_counter, + num_bad_requests: 0, + socket, + keep_running, + poll_duration, + timer, + poll, + events: Events::with_capacity(32), + merkle, + requests, + buf: [0u8; 65_536] + } - macro_rules! check_ctrlc { - () => { - if !keep_running.load(Ordering::Acquire) { - warn!("Ctrl-C caught, exiting..."); - return; - } - }; } - loop { - check_ctrlc!(); + fn get_keep_running(&self) -> Arc<AtomicBool> { + return self.keep_running.clone() + } - poll.poll(&mut events, poll_duration).expect("poll failed"); + fn process_events(&mut self) -> bool { + self.poll.poll(&mut self.events, self.poll_duration).expect("poll failed"); - for event in events.iter() { + for event in self.events.iter() { match event.token() { MESSAGE => { let mut done = false; 'process_batch: loop { - check_ctrlc!(); + check_ctrlc!(self.keep_running); - merkle.reset(); - requests.clear(); + self.merkle.reset(); + self.requests.clear(); - let resp_start = response_counter.load(Ordering::SeqCst); + let resp_start = self.response_counter.load(Ordering::SeqCst); - for i in 0..config.batch_size() { - match socket.recv_from(&mut buf) { + for i in 0..self.config.batch_size() { + match self.socket.recv_from(&mut self.buf) { Ok((num_bytes, src_addr)) => { - if let Ok(nonce) = nonce_from_request(&buf, num_bytes) { - requests.push((Vec::from(nonce), src_addr)); - merkle.push_leaf(nonce); + if let Ok(nonce) = nonce_from_request(&self.buf, num_bytes) { + self.requests.push((Vec::from(nonce), src_addr)); + self.merkle.push_leaf(nonce); } else { - num_bad_requests += 1; + self.num_bad_requests += 1; info!( "Invalid request ({} bytes) from {} (#{} in batch, resp #{})", num_bytes, src_addr, i, resp_start + i as usize @@ -190,23 +230,23 @@ fn polling_loop(config: &Box<ServerConfig>, online_key: &mut OnlineKey, cert_byt }; } - if requests.is_empty() { + if self.requests.is_empty() { break 'process_batch; } - let merkle_root = merkle.compute_root(); - let srep = online_key.make_srep(time::get_time(), &merkle_root); + let merkle_root = self.merkle.compute_root(); + let srep = self.online_key.make_srep(time::get_time(), &merkle_root); - for (i, &(ref nonce, ref src_addr)) in requests.iter().enumerate() { - let paths = merkle.get_paths(i); + for (i, &(ref nonce, ref src_addr)) in self.requests.iter().enumerate() { + let paths = self.merkle.get_paths(i); - let resp = make_response(&srep, cert_bytes, &paths, i as u32); + let resp = make_response(&srep, &self.cert_bytes, &paths, i as u32); let resp_bytes = resp.encode().unwrap(); - let bytes_sent = socket + let bytes_sent = self.socket .send_to(&resp_bytes, &src_addr) .expect("send_to failed"); - let num_responses = response_counter.fetch_add(1, Ordering::SeqCst); + let num_responses = self.response_counter.fetch_add(1, Ordering::SeqCst); info!( "Responded {} bytes to {} for '{}..' (#{} in batch, resp #{})", @@ -226,16 +266,64 @@ fn polling_loop(config: &Box<ServerConfig>, online_key: &mut OnlineKey, cert_byt STATUS => { info!( "responses {}, invalid requests {}", - response_counter.load(Ordering::SeqCst), - num_bad_requests + self.response_counter.load(Ordering::SeqCst), + self.num_bad_requests ); - timer.set_timeout(config.status_interval(), ()); + self.timer.set_timeout(self.config.status_interval(), ()); } _ => unreachable!(), } } + false + } +} + +fn polling_loop(config: Box<ServerConfig>, online_key: OnlineKey, cert_bytes: Vec<u8>) -> bool { +/* let response_counter = AtomicUsize::new(0); + let keep_running = Arc::new(AtomicBool::new(true)); + let kr = keep_running.clone(); + + ctrlc::set_handler(move || kr.store(false, Ordering::Release)) + .expect("failed setting Ctrl-C handler"); + + let sock_addr = config.socket_addr().expect(""); + let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket"); + let poll_duration = Some(Duration::from_millis(100)); + + let mut timer: Timer<()> = Timer::default(); + timer.set_timeout(config.status_interval(), ()); + + let mut buf = [0u8; 65_536]; + let mut events = Events::with_capacity(32); + let mut num_bad_requests = 0u64; + + let poll = Poll::new().unwrap(); + poll.register(&socket, MESSAGE, Ready::readable(), PollOpt::edge()) + .unwrap(); + poll.register(&timer, STATUS, Ready::readable(), PollOpt::edge()) + .unwrap(); + + let mut merkle = MerkleTree::new(); + let mut requests = Vec::with_capacity(config.batch_size() as usize);*/ + + + let mut server = Server::new(config, online_key, cert_bytes); + let kr = server.get_keep_running(); + let kr_new = kr.clone(); + + ctrlc::set_handler(move || kr.store(false, Ordering::Release)) + .expect("failed setting Ctrl-C handler"); + + + + loop { + check_ctrlc!(kr_new); + if server.process_events() { + return true; + } + } } @@ -295,7 +383,7 @@ pub fn main() { info!("Status updates every : {} seconds", config.status_interval().as_secs()); info!("Server listening on : {}:{}", config.interface(), config.port()); - polling_loop(&config, &mut online_key, &cert_bytes); + polling_loop(config, online_key, cert_bytes); info!("Done."); process::exit(0); |