From 5ee9dee6f166354b5120e365705cd3a87d1ac5c4 Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Fri, 5 Apr 2024 15:23:13 +1300 Subject: [PATCH] Count runtime and inbound/outbound traffic bytes --- chorus-bin/src/globals.rs | 9 +++++++- chorus-bin/src/main.rs | 19 ++++++++++++++-- chorus-bin/src/tls.rs | 46 +++++++++++++++++++++++++++++++++++---- 3 files changed, 67 insertions(+), 7 deletions(-) diff --git a/chorus-bin/src/globals.rs b/chorus-bin/src/globals.rs index 22b22b2..357586e 100644 --- a/chorus-bin/src/globals.rs +++ b/chorus-bin/src/globals.rs @@ -3,12 +3,16 @@ use chorus_lib::store::Store; use hyper::server::conn::Http; use lazy_static::lazy_static; use parking_lot::RwLock; -use std::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicU64, AtomicUsize}; use std::sync::OnceLock; +use std::time::Instant; use tokio::sync::broadcast::Sender as BroadcastSender; use tokio::sync::watch::Sender as WatchSender; pub struct Globals { + pub start_time: Instant, + pub bytes_inbound: AtomicU64, + pub bytes_outbound: AtomicU64, pub config: RwLock, pub store: OnceLock, pub http_server: Http, @@ -34,6 +38,9 @@ lazy_static! { let (shutting_down, _) = tokio::sync::watch::channel(false); Globals { + start_time: Instant::now(), + bytes_inbound: AtomicU64::new(0), + bytes_outbound: AtomicU64::new(0), config: RwLock::new(Default::default()), store: OnceLock::new(), http_server, diff --git a/chorus-bin/src/main.rs b/chorus-bin/src/main.rs index 950a98b..3b8f9e8 100644 --- a/chorus-bin/src/main.rs +++ b/chorus-bin/src/main.rs @@ -197,6 +197,22 @@ async fn main() -> Result<(), Error> { log::info!(target: "Server", "Syncing and shutting down."); let _ = GLOBALS.store.get().unwrap().sync(); + let mut runtime: u64 = GLOBALS.start_time.elapsed().as_secs(); + if runtime < 1 { + runtime = 1; + } + log::info!("Runtime: {} seconds", runtime); + log::info!( + "Inbound: {} bytes ({} B/s)", + GLOBALS.bytes_inbound.load(Ordering::Relaxed), + (GLOBALS.bytes_inbound.load(Ordering::Relaxed) as f32) / (runtime as f32) + ); + log::info!( + "Outbound: {} bytes ({} B/s)", + GLOBALS.bytes_outbound.load(Ordering::Relaxed), + (GLOBALS.bytes_outbound.load(Ordering::Relaxed) as f32) / (runtime as f32) + ); + Ok(()) } @@ -316,8 +332,7 @@ async fn handle_http_request( ua ); - // Everybody gets a 4-second ban on disconnect to prevent - // rapid reconnection + // Everybody gets a ban on disconnect to prevent rapid reconnection let mut session_exit: SessionExit = SessionExit::Ok; let mut msg = "Closed"; diff --git a/chorus-bin/src/tls.rs b/chorus-bin/src/tls.rs index 15d3d7c..b170268 100644 --- a/chorus-bin/src/tls.rs +++ b/chorus-bin/src/tls.rs @@ -1,9 +1,11 @@ +use crate::globals::GLOBALS; use chorus_lib::config::Config; use chorus_lib::error::{ChorusError, Error}; use rustls::{Certificate, PrivateKey}; use std::fs::File; use std::io::BufReader; use std::pin::Pin; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; @@ -53,8 +55,32 @@ impl AsyncRead for MaybeTlsStream { buf: &mut ReadBuf<'_>, ) -> Poll> { match self.get_mut() { - MaybeTlsStream::Plain(ref mut s) => Pin::new(s).poll_read(cx, buf), - MaybeTlsStream::Rustls(s) => Pin::new(s).poll_read(cx, buf), + MaybeTlsStream::Plain(ref mut s) => { + // Count bytes for statistics + let pre = buf.filled().len(); + let result = Pin::new(s).poll_read(cx, buf); + let post = buf.filled().len(); + let count = post - pre; + if count > 0 { + let _ = GLOBALS + .bytes_inbound + .fetch_add(count as u64, Ordering::SeqCst); + } + result + } + MaybeTlsStream::Rustls(s) => { + // Count bytes for statistics + let pre = buf.filled().len(); + let result = Pin::new(s).poll_read(cx, buf); + let post = buf.filled().len(); + let count = post - pre; + if count > 0 { + let _ = GLOBALS + .bytes_inbound + .fetch_add(count as u64, Ordering::SeqCst); + } + result + } } } } @@ -66,8 +92,20 @@ impl AsyncWrite for MaybeTlsStream { buf: &[u8], ) -> Poll> { match self.get_mut() { - MaybeTlsStream::Plain(ref mut s) => Pin::new(s).poll_write(cx, buf), - MaybeTlsStream::Rustls(s) => Pin::new(s).poll_write(cx, buf), + MaybeTlsStream::Plain(ref mut s) => { + // Count bytes for statistics + let _ = GLOBALS + .bytes_outbound + .fetch_add(buf.len() as u64, Ordering::SeqCst); + Pin::new(s).poll_write(cx, buf) + } + MaybeTlsStream::Rustls(s) => { + // Count bytes for statistics + let _ = GLOBALS + .bytes_outbound + .fetch_add(buf.len() as u64, Ordering::SeqCst); + Pin::new(s).poll_write(cx, buf) + } } }