diff --git a/chorus-bin/src/globals.rs b/chorus-bin/src/globals.rs index e2f7fda..ad43e69 100644 --- a/chorus-bin/src/globals.rs +++ b/chorus-bin/src/globals.rs @@ -1,10 +1,7 @@ use chorus_lib::config::Config; -use chorus_lib::ip::{Ban, IpData}; use chorus_lib::store::Store; -use dashmap::DashMap; use hyper::server::conn::Http; use lazy_static::lazy_static; -use std::net::IpAddr; use std::sync::atomic::AtomicUsize; use std::sync::OnceLock; use tokio::sync::broadcast::Sender as BroadcastSender; @@ -24,8 +21,6 @@ pub struct Globals { pub num_clients: AtomicUsize, pub shutting_down: WatchSender, - - pub ip_data: DashMap, } lazy_static! { @@ -45,20 +40,6 @@ lazy_static! { new_events, num_clients: AtomicUsize::new(0), shutting_down, - ip_data: DashMap::new(), } }; } - -impl Globals { - pub fn ban(ipaddr: std::net::IpAddr, bankind: Ban) -> u64 { - if let Some(mut ipdata) = GLOBALS.ip_data.get_mut(&ipaddr) { - ipdata.ban(bankind) - } else { - let mut ipdata = IpData::new(); - let seconds = ipdata.ban(bankind); - GLOBALS.ip_data.insert(ipaddr, ipdata); - seconds - } - } -} diff --git a/chorus-bin/src/main.rs b/chorus-bin/src/main.rs index 1e5707f..42e46a2 100644 --- a/chorus-bin/src/main.rs +++ b/chorus-bin/src/main.rs @@ -3,14 +3,14 @@ pub mod nostr; pub mod tls; pub mod web; -use crate::globals::{Globals, GLOBALS}; +use crate::globals::GLOBALS; use crate::tls::MaybeTlsStream; use chorus_lib::config::{Config, FriendlyConfig}; use chorus_lib::error::{ChorusError, Error}; -use chorus_lib::ip::Ban; +use chorus_lib::ip::SessionExit; use chorus_lib::reply::NostrReply; use chorus_lib::store::Store; -use chorus_lib::types::{OwnedFilter, Pubkey, Time}; +use chorus_lib::types::{OwnedFilter, Pubkey}; use futures::{sink::SinkExt, stream::StreamExt}; use hyper::service::Service; use hyper::upgrade::Upgraded; @@ -107,14 +107,12 @@ async fn main() -> Result<(), Error> { let (tcp_stream, peer_addr) = v?; let ipaddr = peer_addr.ip(); - if let Some(ip_data) = GLOBALS.ip_data.get(&ipaddr) { - let now = Time::now(); - if ip_data.ban_until > now { - log::debug!(target: "Client", - "{peer_addr}: Blocking reconnection until {}", - ip_data.ban_until); - continue; - } + let ip_data = GLOBALS.store.get().unwrap().get_ip_data(ipaddr)?; + if ip_data.is_banned() { + log::debug!(target: "Client", + "{peer_addr}: Blocking reconnection until {}", + ip_data.ban_until); + continue; } if let Some(tls_acceptor) = &maybe_tls_acceptor { @@ -300,7 +298,7 @@ async fn handle_http_request( // Everybody gets a 4-second ban on disconnect to prevent // rapid reconnection - let mut bankind: Ban = Ban::General; + let mut session_exit: SessionExit = SessionExit::Ok; let mut msg = "Closed"; // Handle the websocket @@ -310,20 +308,20 @@ async fn handle_http_request( tungstenite::error::ProtocolError::ResetWithoutClosingHandshake, )) => { // So they disconnected ungracefully. - // No big deal, no extra ban for that. + // No big deal, still SessionExit::Ok msg = "Reset"; } - ChorusError::TooManyErrors => { - bankind = Ban::TooManyErrors; - msg = "Errored Out (too many)"; + ChorusError::ErrorClose => { + session_exit = SessionExit::TooManyErrors; + msg = "Errored Out"; } ChorusError::TimedOut => { - bankind = Ban::Timeout; + session_exit = SessionExit::Timeout; msg = "Timed Out (with no subscriptions)"; } _ => { log::error!(target: "Client", "{}: {}", peer, e); - bankind = Ban::ErrorExit; + session_exit = SessionExit::ErrorExit; msg = "Error Exited"; } } @@ -332,8 +330,16 @@ async fn handle_http_request( // Decrement count of active websockets let old_num_websockets = GLOBALS.num_clients.fetch_sub(1, Ordering::SeqCst); - // Ban for the appropriate duration - let ban_seconds = Globals::ban(peer.ip(), bankind); + // Update ip data (including ban time) + let mut ban_seconds = 0; + if let Ok(mut ip_data) = GLOBALS.store.get().unwrap().get_ip_data(peer.ip()) { + ban_seconds = ip_data.update_on_session_close(session_exit); + let _ = GLOBALS + .store + .get() + .unwrap() + .update_ip_data(peer.ip(), &ip_data); + } // we cheat somewhat and log these websocket open and close messages // as server messages @@ -488,10 +494,10 @@ impl WebSocketService { let reply = NostrReply::Notice(format!("error: {}", e)); self.websocket.send(Message::text(reply.as_json())).await?; } - if self.error_punishment > 1.0 { - let reply = NostrReply::Notice("Too many errors".into()); + if self.error_punishment >= 1.0 { + let reply = NostrReply::Notice("Closing due to error(s)".into()); self.websocket.send(Message::text(reply.as_json())).await?; - return Err(ChorusError::TooManyErrors.into()); + return Err(ChorusError::ErrorClose.into()); } } } diff --git a/chorus-lib/src/error.rs b/chorus-lib/src/error.rs index 9746101..e6faf82 100644 --- a/chorus-lib/src/error.rs +++ b/chorus-lib/src/error.rs @@ -58,6 +58,9 @@ pub enum ChorusError { // End of Input EndOfInput, + // Closing on error(s) + ErrorClose, + // Event is Invalid EventIsInvalid(String), @@ -121,9 +124,6 @@ pub enum ChorusError { // Speedy Speedy(speedy::Error), - // Too many errors - TooManyErrors, - // Too many subscriptions TooManySubscriptions, @@ -155,6 +155,7 @@ impl std::fmt::Display for ChorusError { ChorusError::Deleted => write!(f, "Event was previously deleted"), ChorusError::Duplicate => write!(f, "Duplicate event"), ChorusError::EndOfInput => write!(f, "End of input"), + ChorusError::ErrorClose => write!(f, "Closing due to error(s)"), ChorusError::EventIsInvalid(s) => write!(f, "Event is invalid: {s}"), ChorusError::Http(e) => write!(f, "{e}"), ChorusError::Hyper(e) => write!(f, "{e}"), @@ -188,7 +189,6 @@ impl std::fmt::Display for ChorusError { ChorusError::Tungstenite(e) => write!(f, "{e}"), ChorusError::Scraper => write!(f, "Filter is underspecified. Scrapers are not allowed"), ChorusError::Speedy(e) => write!(f, "{e}"), - ChorusError::TooManyErrors => write!(f, "Too many errors"), ChorusError::TooManySubscriptions => write!(f, "Too many subscriptions"), ChorusError::UrlParse(e) => write!(f, "{e}"), ChorusError::Utf8(e) => write!(f, "{e}"), @@ -226,7 +226,7 @@ impl ChorusError { ChorusError::AuthFailure(_) => 0.25, ChorusError::AuthRequired => 0.0, ChorusError::BadEventId => 0.1, - ChorusError::BadHexInput => 0.25, + ChorusError::BadHexInput => 0.5, ChorusError::BufferTooSmall => 0.0, ChorusError::ChannelRecv(_) => 0.0, ChorusError::ChannelSend(_) => 0.0, @@ -234,7 +234,8 @@ impl ChorusError { ChorusError::Crypto(_) => 0.1, ChorusError::Deleted => 0.1, ChorusError::Duplicate => 0.01, - ChorusError::EndOfInput => 0.2, + ChorusError::EndOfInput => 0.5, + ChorusError::ErrorClose => 1.0, ChorusError::EventIsInvalid(_) => 0.2, ChorusError::Http(_) => 0.0, ChorusError::Hyper(_) => 0.0, @@ -254,9 +255,8 @@ impl ChorusError { ChorusError::Rustls(_) => 0.0, ChorusError::TimedOut => 0.1, ChorusError::Tungstenite(_) => 0.0, - ChorusError::Scraper => 0.5, + ChorusError::Scraper => 1.0, ChorusError::Speedy(_) => 0.0, - ChorusError::TooManyErrors => 1.0, ChorusError::TooManySubscriptions => 0.1, ChorusError::UrlParse(_) => 0.1, ChorusError::Utf8(_) => 0.1, diff --git a/chorus-lib/src/ip.rs b/chorus-lib/src/ip.rs index 4e7ad4e..769d453 100644 --- a/chorus-lib/src/ip.rs +++ b/chorus-lib/src/ip.rs @@ -1,49 +1,75 @@ use crate::types::Time; +use speedy::{Readable, Writable}; +// Single-session exit condition #[derive(Debug, Clone, Copy, PartialEq)] -pub enum Ban { - General, +pub enum SessionExit { + // No problems + Ok, + + // Session exited with an error ErrorExit, + + // Session exited because of too many nostr command errors + // (based on a per-session command punishment value that we don't track long term) TooManyErrors, + + // Session timed out without an active subscription Timeout, } -#[derive(Debug)] -pub struct IpData { - pub ban_until: Time, - pub number_of_error_exits: u64, - pub number_of_too_many_error_bans: u64, - pub number_of_timeouts: u64, +// Long term reputation of an IP address +// The values are running totals, updating with (9/10) of old and (1/10) of new. +// +// Used to determine ban time multiplier from short-term violations +#[derive(Debug, Clone, Default, Readable, Writable)] +pub struct IpReputation { + pub good: f32, + pub errored: f32, + pub too_many_errors: f32, + pub timed_out: f32, } -impl Default for IpData { - fn default() -> Self { - Self::new() +impl IpReputation { + pub fn update(&mut self, session_exit: SessionExit) { + // Lessen the running totals + self.good *= 9.0 / 10.0; + self.errored *= 9.0 / 10.0; + self.too_many_errors *= 9.0 / 10.0; + self.timed_out *= 9.0 / 10.0; + + match session_exit { + SessionExit::Ok => self.good += 1.0, + SessionExit::ErrorExit => self.errored += 1.0, + SessionExit::TooManyErrors => self.too_many_errors += 1.0, + SessionExit::Timeout => self.timed_out += 1.0, + }; } + + pub fn ban_multiplier(&self) -> f32 { + let good_endings = 1.0 + self.good + (self.errored / 2.0); + + let bad_endings = 1.0 + self.timed_out + self.too_many_errors + (self.errored / 2.0); + + bad_endings / good_endings + } +} + +// Memory-only short-term record of IP handling +#[derive(Debug, Clone, Default, Readable, Writable)] +pub struct IpData { + pub ban_until: Time, + pub reputation: IpReputation, } impl IpData { - pub fn new() -> IpData { - IpData { - ban_until: Time::now(), - number_of_error_exits: 0, - number_of_too_many_error_bans: 0, - number_of_timeouts: 0, - } - } - - pub fn ban(&mut self, ban: Ban) -> u64 { - // Update numbers - match ban { - Ban::ErrorExit => self.number_of_error_exits += 1, - Ban::TooManyErrors => self.number_of_too_many_error_bans += 1, - Ban::Timeout => self.number_of_timeouts += 1, - _ => (), - }; + pub fn update_on_session_close(&mut self, session_exit: SessionExit) -> u64 { + // Update reputation + self.reputation.update(session_exit); // Compute ban_until let mut until = Time::now(); - let seconds = self.ban_seconds(ban); + let seconds = self.ban_seconds(session_exit); until.0 += seconds; self.ban_until = Time(self.ban_until.0.max(until.0)); @@ -51,12 +77,18 @@ impl IpData { seconds } - fn ban_seconds(&self, thisban: Ban) -> u64 { - match thisban { - Ban::General => 2, - Ban::ErrorExit => 2 + self.number_of_error_exits * 10, - Ban::TooManyErrors => 2 + self.number_of_too_many_error_bans * 15, - Ban::Timeout => 2 + self.number_of_timeouts * 5, + pub fn is_banned(&self) -> bool { + self.ban_until > Time::now() + } + + fn ban_seconds(&self, session_exit: SessionExit) -> u64 { + let multiplier = self.reputation.ban_multiplier(); + + match session_exit { + SessionExit::Ok => 2, + SessionExit::ErrorExit => 2 + (2.0 * multiplier) as u64, + SessionExit::TooManyErrors => 2 + (5.0 * multiplier) as u64, + SessionExit::Timeout => 2 + (4.0 * multiplier) as u64, } } } diff --git a/chorus-lib/src/store/mod.rs b/chorus-lib/src/store/mod.rs index 349c352..5442efd 100644 --- a/chorus-lib/src/store/mod.rs +++ b/chorus-lib/src/store/mod.rs @@ -4,11 +4,14 @@ pub use event_store::EventStore; mod migrations; use crate::error::{ChorusError, Error}; +use crate::ip::IpData; use crate::types::{Event, Filter, Id, Kind, Pubkey, Time}; use heed::byteorder::BigEndian; use heed::types::{OwnedType, UnalignedSlice, Unit, U64}; use heed::{Database, Env, EnvFlags, EnvOpenOptions, RwTxn}; +use speedy::{Readable, Writable}; use std::fs; +use std::net::IpAddr; use std::ops::Bound; #[derive(Debug)] @@ -23,6 +26,7 @@ pub struct Store { ktci: Database, OwnedType>, deleted_offsets: Database, Unit>, deleted_events: Database, Unit>, + ip_data: Database, UnalignedSlice>, allow_scraping: bool, } @@ -88,6 +92,11 @@ impl Store { .types::, Unit>() .name("deleted-events") .create(&mut txn)?; + let ip_data = env + .database_options() + .types::, UnalignedSlice>() + .name("ip_data") + .create(&mut txn)?; if let Ok(count) = ids.len(&txn) { log::info!("{count} events in storage"); @@ -95,6 +104,9 @@ impl Store { if let Ok(count) = deleted_offsets.len(&txn) { log::info!("{count} deleted events in the map"); } + if let Ok(count) = ip_data.len(&txn) { + log::info!("{count} IP addresses reputationally tracked"); + } txn.commit()?; @@ -112,6 +124,7 @@ impl Store { ktci, deleted_offsets, deleted_events, + ip_data, allow_scraping, }; @@ -605,6 +618,25 @@ impl Store { Ok(()) } + pub fn get_ip_data(&self, ip: IpAddr) -> Result { + let key = ip.write_to_vec()?; + let txn = self.env.read_txn()?; + let bytes = match self.ip_data.get(&txn, &key)? { + Some(b) => b, + None => return Ok(Default::default()), + }; + Ok(IpData::read_from_buffer(bytes)?) + } + + pub fn update_ip_data(&self, ip: IpAddr, data: &IpData) -> Result<(), Error> { + let key = ip.write_to_vec()?; + let mut txn = self.env.write_txn()?; + let bytes = data.write_to_vec()?; + self.ip_data.put(&mut txn, &key, &bytes)?; + txn.commit()?; + Ok(()) + } + fn key_ci(created_at: Time, id: Id) -> Vec { let mut key: Vec = Vec::with_capacity(std::mem::size_of::