summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStuart Stock <stuart@int08h.com>2019-01-21 20:45:46 -0600
committerStuart Stock <stuart@int08h.com>2019-01-21 20:45:46 -0600
commita3394f33f0dcc723a4ef3e2fe4667eaecdede653 (patch)
tree6df07e37a7716acd804c716cc435d28ee520e9b9 /src
parenta84573f053e4ab36dda3b82b8811bc20dd991844 (diff)
downloadroughenough-a3394f33f0dcc723a4ef3e2fe4667eaecdede653.zip
Initial add of simple stats collector
Diffstat (limited to 'src')
-rw-r--r--src/lib.rs1
-rw-r--r--src/server.rs25
-rw-r--r--src/stats/mod.rs282
3 files changed, 297 insertions, 11 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 6e7a88a..211f182 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -68,6 +68,7 @@ pub mod kms;
pub mod merkle;
pub mod server;
pub mod sign;
+pub mod stats;
pub use crate::error::Error;
pub use crate::message::RtMessage;
diff --git a/src/server.rs b/src/server.rs
index 87a4a72..b23404d 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -38,6 +38,7 @@ use crate::key::{LongTermKey, OnlineKey};
use crate::kms;
use crate::merkle::MerkleTree;
use crate::{Error, RtMessage, Tag, MIN_REQUEST_LENGTH};
+use crate::stats::{ClientStats, SimpleStats};
macro_rules! check_ctrlc {
($keep_running:expr) => {
@@ -71,9 +72,6 @@ pub struct Server {
online_key: OnlineKey,
cert_bytes: Vec<u8>,
- response_counter: u64,
- num_bad_requests: u64,
-
socket: UdpSocket,
health_listener: Option<TcpListener>,
keep_running: Arc<AtomicBool>,
@@ -89,6 +87,8 @@ pub struct Server {
// Used to send requests to ourselves in fuzzing mode
#[cfg(fuzzing)]
fake_client_socket: UdpSocket,
+
+ stats: SimpleStats,
}
impl Server {
@@ -154,8 +154,6 @@ impl Server {
online_key,
cert_bytes,
- response_counter: 0,
- num_bad_requests: 0,
socket,
health_listener,
@@ -171,6 +169,8 @@ impl Server {
#[cfg(fuzzing)]
fake_client_socket: UdpSocket::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(),
+
+ stats: SimpleStats::new(),
}
}
@@ -242,11 +242,12 @@ impl Server {
match listener.accept() {
Ok((ref mut stream, src_addr)) => {
info!("health check from {}", src_addr);
+ self.stats.add_health_check(&src_addr.ip());
match stream.write(HTTP_RESPONSE.as_bytes()) {
Ok(_) => (),
Err(e) => warn!("error writing health check {}", e),
- }
+ };
match stream.shutdown(Shutdown::Both) {
Ok(_) => (),
@@ -265,7 +266,7 @@ impl Server {
STATUS => {
info!(
"responses {}, invalid requests {}",
- self.response_counter, self.num_bad_requests
+ self.stats.total_responses_sent(), self.stats.total_invalid_requests()
);
self.timer.set_timeout(self.config.status_interval(), ());
@@ -285,15 +286,17 @@ impl Server {
Ok((num_bytes, src_addr)) => {
match self.nonce_from_request(&self.buf, num_bytes) {
Ok(nonce) => {
+ self.stats.add_valid_request(&src_addr.ip());
self.requests.push((Vec::from(nonce), src_addr));
self.merkle.push_leaf(nonce);
}
Err(e) => {
- self.num_bad_requests += 1;
+ self.stats.add_invalid_request(&src_addr.ip());
info!(
"Invalid request: '{:?}' ({} bytes) from {} (#{} in batch, resp #{})",
- e, num_bytes, src_addr, i, self.response_counter + i as u64
+ e, num_bytes, src_addr, i,
+ self.stats.total_responses_sent() + u64::from(i)
);
}
}
@@ -350,7 +353,7 @@ impl Server {
.send_to(&resp_bytes, &src_addr)
.expect("send_to failed");
- self.response_counter += 1;
+ self.stats.add_response(&src_addr.ip(), bytes_sent);
info!(
"Responded {} bytes to {} for '{}..' (#{} in batch, resp #{})",
@@ -358,7 +361,7 @@ impl Server {
src_addr,
hex::encode(&nonce[0..4]),
i,
- self.response_counter
+ self.stats.total_responses_sent()
);
}
}
diff --git a/src/stats/mod.rs b/src/stats/mod.rs
new file mode 100644
index 0000000..126b88b
--- /dev/null
+++ b/src/stats/mod.rs
@@ -0,0 +1,282 @@
+// Copyright 2017-2019 int08h LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use hashbrown::HashMap;
+use std::net::IpAddr;
+
+/// Maximum number of stats entries to maintain.
+const MAX_CLIENTS: usize = 1_000_000;
+
+/// Track number and sizes of requests and responses per client.
+pub trait ClientStats {
+ fn add_valid_request(&mut self, addr: &IpAddr);
+
+ fn add_invalid_request(&mut self, addr: &IpAddr);
+
+ fn add_health_check(&mut self, addr: &IpAddr);
+
+ fn add_response(&mut self, addr: &IpAddr, bytes_sent: usize);
+
+ fn total_valid_requests(&self) -> u64;
+
+ fn total_invalid_requests(&self) -> u64;
+
+ fn total_health_checks(&self) -> u64;
+
+ fn total_responses_sent(&self) -> u64;
+
+ fn total_bytes_sent(&self) -> usize;
+
+ fn total_unique_clients(&self) -> u64;
+
+ fn get_stats(&self, addr: &IpAddr) -> Option<&StatEntry>;
+
+ fn clear(&mut self);
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct StatEntry {
+ pub valid_requests: u64,
+ pub invalid_requests: u64,
+ pub health_checks: u64,
+ pub responses_sent: u64,
+ pub bytes_sent: usize,
+}
+
+impl StatEntry {
+ fn new() -> Self {
+ StatEntry {
+ valid_requests: 0,
+ invalid_requests: 0,
+ health_checks: 0,
+ responses_sent: 0,
+ bytes_sent: 0,
+ }
+ }
+}
+
+///
+/// Straight forward implementation of `ClientStats` backed by a hashmap.
+///
+/// Maintains a maximum of `MAX_CLIENTS` unique entries to bound memory use. Excess entries
+/// beyond `MAX_CLIENTS` are ignored and `num_overflows` is incremented.
+///
+pub struct SimpleStats {
+ clients: HashMap<IpAddr, StatEntry>,
+ num_overflows: u64,
+ max_clients: usize,
+}
+
+impl SimpleStats {
+ pub fn new() -> Self {
+ SimpleStats {
+ clients: HashMap::with_capacity(128),
+ num_overflows: 0,
+ max_clients: MAX_CLIENTS
+ }
+ }
+
+ // visible for testing
+ #[cfg(test)]
+ fn with_limits(limit: usize) -> Self {
+ SimpleStats {
+ clients: HashMap::with_capacity(128),
+ num_overflows: 0,
+ max_clients: limit
+ }
+ }
+
+ #[inline]
+ fn too_many_entries(&mut self) -> bool {
+ let too_big = self.clients.len() >= self.max_clients;
+
+ if too_big {
+ self.num_overflows += 1;
+ }
+
+ return too_big;
+ }
+
+ pub fn num_overflows(&self) -> u64 {
+ self.num_overflows
+ }
+}
+
+impl ClientStats for SimpleStats {
+ fn add_valid_request(&mut self, addr: &IpAddr) {
+ if self.too_many_entries() {
+ return;
+ }
+ self.clients
+ .entry(*addr)
+ .or_insert_with(StatEntry::new)
+ .valid_requests += 1;
+ }
+
+ fn add_invalid_request(&mut self, addr: &IpAddr) {
+ if self.too_many_entries() {
+ return;
+ }
+ self.clients
+ .entry(*addr)
+ .or_insert_with(StatEntry::new)
+ .invalid_requests += 1;
+ }
+
+ fn add_health_check(&mut self, addr: &IpAddr) {
+ if self.too_many_entries() {
+ return;
+ }
+ self.clients
+ .entry(*addr)
+ .or_insert_with(StatEntry::new)
+ .health_checks += 1;
+ }
+
+ fn add_response(&mut self, addr: &IpAddr, bytes_sent: usize) {
+ if self.too_many_entries() {
+ return;
+ }
+ let entry = self.clients
+ .entry(*addr)
+ .or_insert_with(StatEntry::new);
+
+ entry.responses_sent += 1;
+ entry.bytes_sent += bytes_sent;
+ }
+
+ fn total_valid_requests(&self) -> u64 {
+ self.clients
+ .values()
+ .map(|&v| v.valid_requests)
+ .sum()
+ }
+
+ fn total_invalid_requests(&self) -> u64 {
+ self.clients
+ .values()
+ .map(|&v| v.invalid_requests)
+ .sum()
+ }
+
+ fn total_health_checks(&self) -> u64 {
+ self.clients
+ .values()
+ .map(|&v| v.health_checks)
+ .sum()
+ }
+
+ fn total_responses_sent(&self) -> u64 {
+ self.clients
+ .values()
+ .map(|&v| v.responses_sent)
+ .sum()
+ }
+
+ fn total_bytes_sent(&self) -> usize {
+ self.clients
+ .values()
+ .map(|&v| v.bytes_sent)
+ .sum()
+ }
+
+ fn total_unique_clients(&self) -> u64 {
+ self.clients.len() as u64
+ }
+
+ fn get_stats(&self, addr: &IpAddr) -> Option<&StatEntry> {
+ self.clients.iter()
+
+ self.clients.get(addr)
+ }
+
+ fn clear(&mut self) {
+ self.clients.clear();
+ self.num_overflows = 0;
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use crate::stats::{ClientStats, SimpleStats};
+ use std::net::{IpAddr, Ipv4Addr};
+
+ #[test]
+ fn simple_stats_starts_empty() {
+ let stats = SimpleStats::new();
+
+ assert_eq!(stats.total_valid_requests(), 0);
+ assert_eq!(stats.total_invalid_requests(), 0);
+ assert_eq!(stats.total_health_checks(), 0);
+ assert_eq!(stats.total_responses_sent(), 0);
+ assert_eq!(stats.total_bytes_sent(), 0);
+ assert_eq!(stats.total_unique_clients(), 0);
+ assert_eq!(stats.num_overflows(), 0);
+ }
+
+ #[test]
+ fn client_requests_are_tracked() {
+ let mut stats = SimpleStats::new();
+
+ let ip1 = "127.0.0.1".parse().unwrap();
+ let ip2 = "127.0.0.2".parse().unwrap();
+ let ip3 = "127.0.0.3".parse().unwrap();
+
+ stats.add_valid_request(&ip1);
+ stats.add_valid_request(&ip2);
+ stats.add_valid_request(&ip3);
+ assert_eq!(stats.total_valid_requests(), 3);
+
+ stats.add_invalid_request(&ip2);
+ assert_eq!(stats.total_invalid_requests(), 1);
+
+ stats.add_response(&ip2, 8192);
+ assert_eq!(stats.total_bytes_sent(), 8192);
+
+ assert_eq!(stats.total_unique_clients(), 3);
+ }
+
+ #[test]
+ fn per_client_stats() {
+ let mut stats = SimpleStats::new();
+ let ip = "127.0.0.3".parse().unwrap();
+
+ stats.add_valid_request(&ip);
+ stats.add_response(&ip, 2048);
+ stats.add_response(&ip, 1024);
+
+ let entry = stats.get_stats(&ip).unwrap();
+ assert_eq!(entry.valid_requests, 1);
+ assert_eq!(entry.invalid_requests, 0);
+ assert_eq!(entry.responses_sent, 2);
+ assert_eq!(entry.bytes_sent, 3072);
+ }
+
+ #[test]
+ fn overflow_max_entries() {
+ let mut stats = SimpleStats::with_limits(100);
+
+ for i in 0..201 {
+ let ipv4 = Ipv4Addr::from(i as u32);
+ let addr = IpAddr::from(ipv4);
+
+ stats.add_valid_request(&addr);
+ };
+
+ assert_eq!(stats.total_unique_clients(), 100);
+ assert_eq!(stats.num_overflows(), 101);
+ }
+}
+
+