From dd5e3d735dd5b0e1208e6ecfddf9673aa7b4c9a9 Mon Sep 17 00:00:00 2001 From: Stuart Stock Date: Sat, 20 Jun 2020 11:19:19 -0700 Subject: refactor: extract health check and status update methods out of event loop --- src/server.rs | 120 +++++++++++++++++++++++++++++----------------------------- 1 file changed, 60 insertions(+), 60 deletions(-) diff --git a/src/server.rs b/src/server.rs index 6e549b0..438311d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -52,9 +52,9 @@ macro_rules! check_ctrlc { } // mio event registrations -const MESSAGE: Token = Token(0); -const STATUS: Token = Token(1); -const HEALTH_CHECK: Token = Token(2); +const EVT_MESSAGE: Token = Token(0); +const EVT_STATUS_UPDATE: Token = Token(1); +const EVT_HEALTH_CHECK: Token = Token(2); // Canned response to health check request const HTTP_RESPONSE: &str = "HTTP/1.1 200 OK\nContent-Length: 0\nConnection: close\n\n"; @@ -128,9 +128,9 @@ impl Server { timer.set_timeout(config.status_interval(), ()); let poll = Poll::new().unwrap(); - poll.register(&socket, MESSAGE, Ready::readable(), PollOpt::edge()) + poll.register(&socket, EVT_MESSAGE, Ready::readable(), PollOpt::edge()) .unwrap(); - poll.register(&timer, STATUS, Ready::readable(), PollOpt::edge()) + poll.register(&timer, EVT_STATUS_UPDATE, Ready::readable(), PollOpt::edge()) .unwrap(); let health_listener = if let Some(hc_port) = config.health_check_port() { @@ -141,7 +141,7 @@ impl Server { let tcp_listener = TcpListener::bind(&hc_sock_addr) .expect("failed to bind TCP listener for health check"); - poll.register(&tcp_listener, HEALTH_CHECK, Ready::readable(), PollOpt::edge()) + poll.register(&tcp_listener, EVT_HEALTH_CHECK, Ready::readable(), PollOpt::edge()) .unwrap(); Some(tcp_listener) @@ -224,7 +224,7 @@ impl Server { for msg in events.iter() { match msg.token() { - MESSAGE => { + EVT_MESSAGE => { loop { check_ctrlc!(self.keep_running); @@ -243,59 +243,9 @@ impl Server { break; } } - } - - HEALTH_CHECK => { - let listener = self.health_listener.as_ref().unwrap(); - - match listener.accept() { - Ok((ref mut stream, src_addr)) => { - info!("health check from {}", src_addr); - self.stats.add_health_check(&src_addr.ip()); - - match stream.write(HTTP_RESPONSE.as_bytes()) { - Ok(_) => (), - Err(e) => warn!("error writing health check {}", e), - }; - - match stream.shutdown(Shutdown::Both) { - Ok(_) => (), - Err(e) => warn!("error in health check socket shutdown {}", e), - } - } - Err(ref e) if e.kind() == ErrorKind::WouldBlock => { - debug!("blocking in TCP health check"); - } - Err(e) => { - warn!("unexpected health check error {}", e); - } - } - } - - STATUS => { - let mut vec: Vec<(&IpAddr, &ClientStatEntry)> = self.stats.iter().collect(); - vec.sort_by(|lhs, rhs| lhs.1.valid_requests.cmp(&rhs.1.valid_requests)); - - for (addr, counts) in vec { - info!( - "{:16}: {} valid, {} invalid requests; {} responses ({} sent)", - format!("{}", addr), counts.valid_requests, counts.invalid_requests, - counts.responses_sent, - counts.bytes_sent.file_size(fsopts::BINARY).unwrap() - ); - } - - info!( - "Totals: {} unique clients; {} valid, {} invalid requests; {} responses ({} sent)", - self.stats.total_unique_clients(), - self.stats.total_valid_requests(), self.stats.total_invalid_requests(), - self.stats.total_responses_sent(), - self.stats.total_bytes_sent().file_size(fsopts::BINARY).unwrap() - ); - - self.timer.set_timeout(self.config.status_interval(), ()); - } - + }, + EVT_HEALTH_CHECK => self.handle_health_check(), + EVT_STATUS_UPDATE => self.handle_status_update(), _ => unreachable!(), } } @@ -411,4 +361,54 @@ impl Server { response } + + fn handle_health_check(&mut self) { + let listener = self.health_listener.as_ref().unwrap(); + match listener.accept() { + Ok((ref mut stream, src_addr)) => { + info!("health check from {}", src_addr); + self.stats.add_health_check(&src_addr.ip()); + + match stream.write(HTTP_RESPONSE.as_bytes()) { + Ok(_) => (), + Err(e) => warn!("error writing health check {}", e), + }; + + match stream.shutdown(Shutdown::Both) { + Ok(_) => (), + Err(e) => warn!("error in health check socket shutdown {}", e), + } + } + Err(ref e) if e.kind() == ErrorKind::WouldBlock => { + debug!("blocking in TCP health check"); + } + Err(e) => { + warn!("unexpected health check error {}", e); + } + } + } + + fn handle_status_update(&mut self) { + let mut vec: Vec<(&IpAddr, &ClientStatEntry)> = self.stats.iter().collect(); + vec.sort_by(|lhs, rhs| lhs.1.valid_requests.cmp(&rhs.1.valid_requests)); + + for (addr, counts) in vec { + info!( + "{:16}: {} valid, {} invalid requests; {} responses ({} sent)", + format!("{}", addr), counts.valid_requests, counts.invalid_requests, + counts.responses_sent, + counts.bytes_sent.file_size(fsopts::BINARY).unwrap() + ); + } + + info!( + "Totals: {} unique clients; {} valid, {} invalid requests; {} responses ({} sent)", + self.stats.total_unique_clients(), + self.stats.total_valid_requests(), self.stats.total_invalid_requests(), + self.stats.total_responses_sent(), + self.stats.total_bytes_sent().file_size(fsopts::BINARY).unwrap() + ); + + self.timer.set_timeout(self.config.status_interval(), ()); + } } -- cgit v1.2.3