summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAaron Hill <aa1ronham@gmail.com>2018-10-13 19:51:01 -0400
committerAaron Hill <aa1ronham@gmail.com>2018-10-17 21:18:33 -0400
commitc50c8ef27467616645ddc1aa22fa56fc0439ab6b (patch)
treebbe4b01bac4759ebdc92d791864f2aa941742a15 /src
parent54955e4126a6da3aa233ccf837d7ba03f40be6d3 (diff)
downloadroughenough-c50c8ef27467616645ddc1aa22fa56fc0439ab6b.zip
Exact code into Server struct
Diffstat (limited to 'src')
-rw-r--r--src/bin/roughenough-server.rs196
1 files changed, 142 insertions, 54 deletions
diff --git a/src/bin/roughenough-server.rs b/src/bin/roughenough-server.rs
index 9c00d7e..5b8b65d 100644
--- a/src/bin/roughenough-server.rs
+++ b/src/bin/roughenough-server.rs
@@ -46,6 +46,7 @@ use std::process;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
+use std::net::SocketAddr;
use mio::net::UdpSocket;
use mio::{Events, Poll, PollOpt, Ready, Token};
@@ -61,6 +62,15 @@ use roughenough::merkle::MerkleTree;
use roughenough::{Error, RtMessage, Tag};
use roughenough::{MIN_REQUEST_LENGTH, VERSION};
+macro_rules! check_ctrlc {
+ ($keep_running:expr) => {
+ if !$keep_running.load(Ordering::Acquire) {
+ warn!("Ctrl-C caught, exiting...");
+ return true;
+ }
+ }
+}
+
const MESSAGE: Token = Token(0);
const STATUS: Token = Token(1);
@@ -104,69 +114,99 @@ fn nonce_from_request(buf: &[u8], num_bytes: usize) -> Result<&[u8], Error> {
}
}
-fn polling_loop(config: &Box<ServerConfig>, online_key: &mut OnlineKey, cert_bytes: &[u8]) {
- let response_counter = AtomicUsize::new(0);
- let keep_running = Arc::new(AtomicBool::new(true));
- let kr = keep_running.clone();
- ctrlc::set_handler(move || kr.store(false, Ordering::Release))
- .expect("failed setting Ctrl-C handler");
- let sock_addr = config.socket_addr().expect("");
- let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket");
- let poll_duration = Some(Duration::from_millis(100));
+struct Server {
+ config: Box<ServerConfig>,
+ online_key: OnlineKey,
+ cert_bytes: Vec<u8>,
- let mut timer: Timer<()> = Timer::default();
- timer.set_timeout(config.status_interval(), ());
+ response_counter: AtomicUsize,
+ num_bad_requests: u64,
- let mut buf = [0u8; 65_536];
- let mut events = Events::with_capacity(32);
- let mut num_bad_requests = 0u64;
+ socket: UdpSocket,
+ keep_running: Arc<AtomicBool>,
+ poll_duration: Option<Duration>,
+ timer: Timer<()>,
+ poll: Poll,
+ events: Events,
+ merkle: MerkleTree,
+ requests: Vec<(Vec<u8>, SocketAddr)>,
+ buf: [u8; 65_536],
- let poll = Poll::new().unwrap();
- poll.register(&socket, MESSAGE, Ready::readable(), PollOpt::edge())
- .unwrap();
- poll.register(&timer, STATUS, Ready::readable(), PollOpt::edge())
- .unwrap();
+
+}
- let mut merkle = MerkleTree::new();
- let mut requests = Vec::with_capacity(config.batch_size() as usize);
+impl Server {
+ pub fn new(config: Box<ServerConfig>, online_key: OnlineKey, cert_bytes: Vec<u8>) -> Server {
+ let response_counter = AtomicUsize::new(0);
+ let keep_running = Arc::new(AtomicBool::new(true));
+
+ let sock_addr = config.socket_addr().expect("");
+ let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket");
+ let poll_duration = Some(Duration::from_millis(100));
+
+ let mut timer: Timer<()> = Timer::default();
+ timer.set_timeout(config.status_interval(), ());
+
+ let poll = Poll::new().unwrap();
+ poll.register(&socket, MESSAGE, Ready::readable(), PollOpt::edge())
+ .unwrap();
+ poll.register(&timer, STATUS, Ready::readable(), PollOpt::edge())
+ .unwrap();
+
+ let mut merkle = MerkleTree::new();
+ let mut requests = Vec::with_capacity(config.batch_size() as usize);
+
+
+ Server {
+ config,
+ online_key,
+ cert_bytes,
+
+ response_counter,
+ num_bad_requests: 0,
+ socket,
+ keep_running,
+ poll_duration,
+ timer,
+ poll,
+ events: Events::with_capacity(32),
+ merkle,
+ requests,
+ buf: [0u8; 65_536]
+ }
- macro_rules! check_ctrlc {
- () => {
- if !keep_running.load(Ordering::Acquire) {
- warn!("Ctrl-C caught, exiting...");
- return;
- }
- };
}
- loop {
- check_ctrlc!();
+ fn get_keep_running(&self) -> Arc<AtomicBool> {
+ return self.keep_running.clone()
+ }
- poll.poll(&mut events, poll_duration).expect("poll failed");
+ fn process_events(&mut self) -> bool {
+ self.poll.poll(&mut self.events, self.poll_duration).expect("poll failed");
- for event in events.iter() {
+ for event in self.events.iter() {
match event.token() {
MESSAGE => {
let mut done = false;
'process_batch: loop {
- check_ctrlc!();
+ check_ctrlc!(self.keep_running);
- merkle.reset();
- requests.clear();
+ self.merkle.reset();
+ self.requests.clear();
- let resp_start = response_counter.load(Ordering::SeqCst);
+ let resp_start = self.response_counter.load(Ordering::SeqCst);
- for i in 0..config.batch_size() {
- match socket.recv_from(&mut buf) {
+ for i in 0..self.config.batch_size() {
+ match self.socket.recv_from(&mut self.buf) {
Ok((num_bytes, src_addr)) => {
- if let Ok(nonce) = nonce_from_request(&buf, num_bytes) {
- requests.push((Vec::from(nonce), src_addr));
- merkle.push_leaf(nonce);
+ if let Ok(nonce) = nonce_from_request(&self.buf, num_bytes) {
+ self.requests.push((Vec::from(nonce), src_addr));
+ self.merkle.push_leaf(nonce);
} else {
- num_bad_requests += 1;
+ self.num_bad_requests += 1;
info!(
"Invalid request ({} bytes) from {} (#{} in batch, resp #{})",
num_bytes, src_addr, i, resp_start + i as usize
@@ -190,23 +230,23 @@ fn polling_loop(config: &Box<ServerConfig>, online_key: &mut OnlineKey, cert_byt
};
}
- if requests.is_empty() {
+ if self.requests.is_empty() {
break 'process_batch;
}
- let merkle_root = merkle.compute_root();
- let srep = online_key.make_srep(time::get_time(), &merkle_root);
+ let merkle_root = self.merkle.compute_root();
+ let srep = self.online_key.make_srep(time::get_time(), &merkle_root);
- for (i, &(ref nonce, ref src_addr)) in requests.iter().enumerate() {
- let paths = merkle.get_paths(i);
+ for (i, &(ref nonce, ref src_addr)) in self.requests.iter().enumerate() {
+ let paths = self.merkle.get_paths(i);
- let resp = make_response(&srep, cert_bytes, &paths, i as u32);
+ let resp = make_response(&srep, &self.cert_bytes, &paths, i as u32);
let resp_bytes = resp.encode().unwrap();
- let bytes_sent = socket
+ let bytes_sent = self.socket
.send_to(&resp_bytes, &src_addr)
.expect("send_to failed");
- let num_responses = response_counter.fetch_add(1, Ordering::SeqCst);
+ let num_responses = self.response_counter.fetch_add(1, Ordering::SeqCst);
info!(
"Responded {} bytes to {} for '{}..' (#{} in batch, resp #{})",
@@ -226,16 +266,64 @@ fn polling_loop(config: &Box<ServerConfig>, online_key: &mut OnlineKey, cert_byt
STATUS => {
info!(
"responses {}, invalid requests {}",
- response_counter.load(Ordering::SeqCst),
- num_bad_requests
+ self.response_counter.load(Ordering::SeqCst),
+ self.num_bad_requests
);
- timer.set_timeout(config.status_interval(), ());
+ self.timer.set_timeout(self.config.status_interval(), ());
}
_ => unreachable!(),
}
}
+ false
+ }
+}
+
+fn polling_loop(config: Box<ServerConfig>, online_key: OnlineKey, cert_bytes: Vec<u8>) -> bool {
+/* let response_counter = AtomicUsize::new(0);
+ let keep_running = Arc::new(AtomicBool::new(true));
+ let kr = keep_running.clone();
+
+ ctrlc::set_handler(move || kr.store(false, Ordering::Release))
+ .expect("failed setting Ctrl-C handler");
+
+ let sock_addr = config.socket_addr().expect("");
+ let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket");
+ let poll_duration = Some(Duration::from_millis(100));
+
+ let mut timer: Timer<()> = Timer::default();
+ timer.set_timeout(config.status_interval(), ());
+
+ let mut buf = [0u8; 65_536];
+ let mut events = Events::with_capacity(32);
+ let mut num_bad_requests = 0u64;
+
+ let poll = Poll::new().unwrap();
+ poll.register(&socket, MESSAGE, Ready::readable(), PollOpt::edge())
+ .unwrap();
+ poll.register(&timer, STATUS, Ready::readable(), PollOpt::edge())
+ .unwrap();
+
+ let mut merkle = MerkleTree::new();
+ let mut requests = Vec::with_capacity(config.batch_size() as usize);*/
+
+
+ let mut server = Server::new(config, online_key, cert_bytes);
+ let kr = server.get_keep_running();
+ let kr_new = kr.clone();
+
+ ctrlc::set_handler(move || kr.store(false, Ordering::Release))
+ .expect("failed setting Ctrl-C handler");
+
+
+
+ loop {
+ check_ctrlc!(kr_new);
+ if server.process_events() {
+ return true;
+ }
+
}
}
@@ -295,7 +383,7 @@ pub fn main() {
info!("Status updates every : {} seconds", config.status_interval().as_secs());
info!("Server listening on : {}:{}", config.interface(), config.port());
- polling_loop(&config, &mut online_key, &cert_bytes);
+ polling_loop(config, online_key, cert_bytes);
info!("Done.");
process::exit(0);