Count runtime and inbound/outbound traffic bytes

This commit is contained in:
Mike Dilger 2024-04-05 15:23:13 +13:00
parent 5daff6fac5
commit 5ee9dee6f1
3 changed files with 67 additions and 7 deletions

View File

@ -3,12 +3,16 @@ use chorus_lib::store::Store;
use hyper::server::conn::Http; use hyper::server::conn::Http;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use parking_lot::RwLock; use parking_lot::RwLock;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::{AtomicU64, AtomicUsize};
use std::sync::OnceLock; use std::sync::OnceLock;
use std::time::Instant;
use tokio::sync::broadcast::Sender as BroadcastSender; use tokio::sync::broadcast::Sender as BroadcastSender;
use tokio::sync::watch::Sender as WatchSender; use tokio::sync::watch::Sender as WatchSender;
pub struct Globals { pub struct Globals {
pub start_time: Instant,
pub bytes_inbound: AtomicU64,
pub bytes_outbound: AtomicU64,
pub config: RwLock<Config>, pub config: RwLock<Config>,
pub store: OnceLock<Store>, pub store: OnceLock<Store>,
pub http_server: Http, pub http_server: Http,
@ -34,6 +38,9 @@ lazy_static! {
let (shutting_down, _) = tokio::sync::watch::channel(false); let (shutting_down, _) = tokio::sync::watch::channel(false);
Globals { Globals {
start_time: Instant::now(),
bytes_inbound: AtomicU64::new(0),
bytes_outbound: AtomicU64::new(0),
config: RwLock::new(Default::default()), config: RwLock::new(Default::default()),
store: OnceLock::new(), store: OnceLock::new(),
http_server, http_server,

View File

@ -197,6 +197,22 @@ async fn main() -> Result<(), Error> {
log::info!(target: "Server", "Syncing and shutting down."); log::info!(target: "Server", "Syncing and shutting down.");
let _ = GLOBALS.store.get().unwrap().sync(); let _ = GLOBALS.store.get().unwrap().sync();
let mut runtime: u64 = GLOBALS.start_time.elapsed().as_secs();
if runtime < 1 {
runtime = 1;
}
log::info!("Runtime: {} seconds", runtime);
log::info!(
"Inbound: {} bytes ({} B/s)",
GLOBALS.bytes_inbound.load(Ordering::Relaxed),
(GLOBALS.bytes_inbound.load(Ordering::Relaxed) as f32) / (runtime as f32)
);
log::info!(
"Outbound: {} bytes ({} B/s)",
GLOBALS.bytes_outbound.load(Ordering::Relaxed),
(GLOBALS.bytes_outbound.load(Ordering::Relaxed) as f32) / (runtime as f32)
);
Ok(()) Ok(())
} }
@ -316,8 +332,7 @@ async fn handle_http_request(
ua ua
); );
// Everybody gets a 4-second ban on disconnect to prevent // Everybody gets a ban on disconnect to prevent rapid reconnection
// rapid reconnection
let mut session_exit: SessionExit = SessionExit::Ok; let mut session_exit: SessionExit = SessionExit::Ok;
let mut msg = "Closed"; let mut msg = "Closed";

View File

@ -1,9 +1,11 @@
use crate::globals::GLOBALS;
use chorus_lib::config::Config; use chorus_lib::config::Config;
use chorus_lib::error::{ChorusError, Error}; use chorus_lib::error::{ChorusError, Error};
use rustls::{Certificate, PrivateKey}; use rustls::{Certificate, PrivateKey};
use std::fs::File; use std::fs::File;
use std::io::BufReader; use std::io::BufReader;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
@ -53,8 +55,32 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for MaybeTlsStream<S> {
buf: &mut ReadBuf<'_>, buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> { ) -> Poll<std::io::Result<()>> {
match self.get_mut() { match self.get_mut() {
MaybeTlsStream::Plain(ref mut s) => Pin::new(s).poll_read(cx, buf), MaybeTlsStream::Plain(ref mut s) => {
MaybeTlsStream::Rustls(s) => Pin::new(s).poll_read(cx, buf), // Count bytes for statistics
let pre = buf.filled().len();
let result = Pin::new(s).poll_read(cx, buf);
let post = buf.filled().len();
let count = post - pre;
if count > 0 {
let _ = GLOBALS
.bytes_inbound
.fetch_add(count as u64, Ordering::SeqCst);
}
result
}
MaybeTlsStream::Rustls(s) => {
// Count bytes for statistics
let pre = buf.filled().len();
let result = Pin::new(s).poll_read(cx, buf);
let post = buf.filled().len();
let count = post - pre;
if count > 0 {
let _ = GLOBALS
.bytes_inbound
.fetch_add(count as u64, Ordering::SeqCst);
}
result
}
} }
} }
} }
@ -66,8 +92,20 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for MaybeTlsStream<S> {
buf: &[u8], buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> { ) -> Poll<Result<usize, std::io::Error>> {
match self.get_mut() { match self.get_mut() {
MaybeTlsStream::Plain(ref mut s) => Pin::new(s).poll_write(cx, buf), MaybeTlsStream::Plain(ref mut s) => {
MaybeTlsStream::Rustls(s) => Pin::new(s).poll_write(cx, buf), // Count bytes for statistics
let _ = GLOBALS
.bytes_outbound
.fetch_add(buf.len() as u64, Ordering::SeqCst);
Pin::new(s).poll_write(cx, buf)
}
MaybeTlsStream::Rustls(s) => {
// Count bytes for statistics
let _ = GLOBALS
.bytes_outbound
.fetch_add(buf.len() as u64, Ordering::SeqCst);
Pin::new(s).poll_write(cx, buf)
}
} }
} }