IpData rework (persists a restart)

This commit is contained in:
Mike Dilger 2024-02-21 15:25:16 +13:00
parent 72a406cea7
commit 59643c1b06
6 changed files with 144 additions and 86 deletions

View File

@ -1,10 +1,7 @@
use chorus_lib::config::Config;
use chorus_lib::ip::{Ban, IpData};
use chorus_lib::store::Store;
use dashmap::DashMap;
use hyper::server::conn::Http;
use lazy_static::lazy_static;
use std::net::IpAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::OnceLock;
use tokio::sync::broadcast::Sender as BroadcastSender;
@ -24,8 +21,6 @@ pub struct Globals {
pub num_clients: AtomicUsize,
pub shutting_down: WatchSender<bool>,
pub ip_data: DashMap<IpAddr, IpData>,
}
lazy_static! {
@ -45,20 +40,6 @@ lazy_static! {
new_events,
num_clients: AtomicUsize::new(0),
shutting_down,
ip_data: DashMap::new(),
}
};
}
impl Globals {
pub fn ban(ipaddr: std::net::IpAddr, bankind: Ban) -> u64 {
if let Some(mut ipdata) = GLOBALS.ip_data.get_mut(&ipaddr) {
ipdata.ban(bankind)
} else {
let mut ipdata = IpData::new();
let seconds = ipdata.ban(bankind);
GLOBALS.ip_data.insert(ipaddr, ipdata);
seconds
}
}
}

View File

@ -3,14 +3,14 @@ pub mod nostr;
pub mod tls;
pub mod web;
use crate::globals::{Globals, GLOBALS};
use crate::globals::GLOBALS;
use crate::tls::MaybeTlsStream;
use chorus_lib::config::{Config, FriendlyConfig};
use chorus_lib::error::{ChorusError, Error};
use chorus_lib::ip::Ban;
use chorus_lib::ip::SessionExit;
use chorus_lib::reply::NostrReply;
use chorus_lib::store::Store;
use chorus_lib::types::{OwnedFilter, Pubkey, Time};
use chorus_lib::types::{OwnedFilter, Pubkey};
use futures::{sink::SinkExt, stream::StreamExt};
use hyper::service::Service;
use hyper::upgrade::Upgraded;
@ -107,14 +107,12 @@ async fn main() -> Result<(), Error> {
let (tcp_stream, peer_addr) = v?;
let ipaddr = peer_addr.ip();
if let Some(ip_data) = GLOBALS.ip_data.get(&ipaddr) {
let now = Time::now();
if ip_data.ban_until > now {
log::debug!(target: "Client",
"{peer_addr}: Blocking reconnection until {}",
ip_data.ban_until);
continue;
}
let ip_data = GLOBALS.store.get().unwrap().get_ip_data(ipaddr)?;
if ip_data.is_banned() {
log::debug!(target: "Client",
"{peer_addr}: Blocking reconnection until {}",
ip_data.ban_until);
continue;
}
if let Some(tls_acceptor) = &maybe_tls_acceptor {
@ -300,7 +298,7 @@ async fn handle_http_request(
// Everybody gets a 4-second ban on disconnect to prevent
// rapid reconnection
let mut bankind: Ban = Ban::General;
let mut session_exit: SessionExit = SessionExit::Ok;
let mut msg = "Closed";
// Handle the websocket
@ -310,20 +308,20 @@ async fn handle_http_request(
tungstenite::error::ProtocolError::ResetWithoutClosingHandshake,
)) => {
// So they disconnected ungracefully.
// No big deal, no extra ban for that.
// No big deal, still SessionExit::Ok
msg = "Reset";
}
ChorusError::TooManyErrors => {
bankind = Ban::TooManyErrors;
msg = "Errored Out (too many)";
ChorusError::ErrorClose => {
session_exit = SessionExit::TooManyErrors;
msg = "Errored Out";
}
ChorusError::TimedOut => {
bankind = Ban::Timeout;
session_exit = SessionExit::Timeout;
msg = "Timed Out (with no subscriptions)";
}
_ => {
log::error!(target: "Client", "{}: {}", peer, e);
bankind = Ban::ErrorExit;
session_exit = SessionExit::ErrorExit;
msg = "Error Exited";
}
}
@ -332,8 +330,16 @@ async fn handle_http_request(
// Decrement count of active websockets
let old_num_websockets = GLOBALS.num_clients.fetch_sub(1, Ordering::SeqCst);
// Ban for the appropriate duration
let ban_seconds = Globals::ban(peer.ip(), bankind);
// Update ip data (including ban time)
let mut ban_seconds = 0;
if let Ok(mut ip_data) = GLOBALS.store.get().unwrap().get_ip_data(peer.ip()) {
ban_seconds = ip_data.update_on_session_close(session_exit);
let _ = GLOBALS
.store
.get()
.unwrap()
.update_ip_data(peer.ip(), &ip_data);
}
// we cheat somewhat and log these websocket open and close messages
// as server messages
@ -488,10 +494,10 @@ impl WebSocketService {
let reply = NostrReply::Notice(format!("error: {}", e));
self.websocket.send(Message::text(reply.as_json())).await?;
}
if self.error_punishment > 1.0 {
let reply = NostrReply::Notice("Too many errors".into());
if self.error_punishment >= 1.0 {
let reply = NostrReply::Notice("Closing due to error(s)".into());
self.websocket.send(Message::text(reply.as_json())).await?;
return Err(ChorusError::TooManyErrors.into());
return Err(ChorusError::ErrorClose.into());
}
}
}

View File

@ -58,6 +58,9 @@ pub enum ChorusError {
// End of Input
EndOfInput,
// Closing on error(s)
ErrorClose,
// Event is Invalid
EventIsInvalid(String),
@ -121,9 +124,6 @@ pub enum ChorusError {
// Speedy
Speedy(speedy::Error),
// Too many errors
TooManyErrors,
// Too many subscriptions
TooManySubscriptions,
@ -155,6 +155,7 @@ impl std::fmt::Display for ChorusError {
ChorusError::Deleted => write!(f, "Event was previously deleted"),
ChorusError::Duplicate => write!(f, "Duplicate event"),
ChorusError::EndOfInput => write!(f, "End of input"),
ChorusError::ErrorClose => write!(f, "Closing due to error(s)"),
ChorusError::EventIsInvalid(s) => write!(f, "Event is invalid: {s}"),
ChorusError::Http(e) => write!(f, "{e}"),
ChorusError::Hyper(e) => write!(f, "{e}"),
@ -188,7 +189,6 @@ impl std::fmt::Display for ChorusError {
ChorusError::Tungstenite(e) => write!(f, "{e}"),
ChorusError::Scraper => write!(f, "Filter is underspecified. Scrapers are not allowed"),
ChorusError::Speedy(e) => write!(f, "{e}"),
ChorusError::TooManyErrors => write!(f, "Too many errors"),
ChorusError::TooManySubscriptions => write!(f, "Too many subscriptions"),
ChorusError::UrlParse(e) => write!(f, "{e}"),
ChorusError::Utf8(e) => write!(f, "{e}"),
@ -226,7 +226,7 @@ impl ChorusError {
ChorusError::AuthFailure(_) => 0.25,
ChorusError::AuthRequired => 0.0,
ChorusError::BadEventId => 0.1,
ChorusError::BadHexInput => 0.25,
ChorusError::BadHexInput => 0.5,
ChorusError::BufferTooSmall => 0.0,
ChorusError::ChannelRecv(_) => 0.0,
ChorusError::ChannelSend(_) => 0.0,
@ -234,7 +234,8 @@ impl ChorusError {
ChorusError::Crypto(_) => 0.1,
ChorusError::Deleted => 0.1,
ChorusError::Duplicate => 0.01,
ChorusError::EndOfInput => 0.2,
ChorusError::EndOfInput => 0.5,
ChorusError::ErrorClose => 1.0,
ChorusError::EventIsInvalid(_) => 0.2,
ChorusError::Http(_) => 0.0,
ChorusError::Hyper(_) => 0.0,
@ -254,9 +255,8 @@ impl ChorusError {
ChorusError::Rustls(_) => 0.0,
ChorusError::TimedOut => 0.1,
ChorusError::Tungstenite(_) => 0.0,
ChorusError::Scraper => 0.5,
ChorusError::Scraper => 1.0,
ChorusError::Speedy(_) => 0.0,
ChorusError::TooManyErrors => 1.0,
ChorusError::TooManySubscriptions => 0.1,
ChorusError::UrlParse(_) => 0.1,
ChorusError::Utf8(_) => 0.1,

View File

@ -1,49 +1,75 @@
use crate::types::Time;
use speedy::{Readable, Writable};
// Single-session exit condition
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Ban {
General,
pub enum SessionExit {
// No problems
Ok,
// Session exited with an error
ErrorExit,
// Session exited because of too many nostr command errors
// (based on a per-session command punishment value that we don't track long term)
TooManyErrors,
// Session timed out without an active subscription
Timeout,
}
#[derive(Debug)]
pub struct IpData {
pub ban_until: Time,
pub number_of_error_exits: u64,
pub number_of_too_many_error_bans: u64,
pub number_of_timeouts: u64,
// Long term reputation of an IP address
// The values are running totals, updating with (9/10) of old and (1/10) of new.
//
// Used to determine ban time multiplier from short-term violations
#[derive(Debug, Clone, Default, Readable, Writable)]
pub struct IpReputation {
pub good: f32,
pub errored: f32,
pub too_many_errors: f32,
pub timed_out: f32,
}
impl Default for IpData {
fn default() -> Self {
Self::new()
impl IpReputation {
pub fn update(&mut self, session_exit: SessionExit) {
// Lessen the running totals
self.good *= 9.0 / 10.0;
self.errored *= 9.0 / 10.0;
self.too_many_errors *= 9.0 / 10.0;
self.timed_out *= 9.0 / 10.0;
match session_exit {
SessionExit::Ok => self.good += 1.0,
SessionExit::ErrorExit => self.errored += 1.0,
SessionExit::TooManyErrors => self.too_many_errors += 1.0,
SessionExit::Timeout => self.timed_out += 1.0,
};
}
pub fn ban_multiplier(&self) -> f32 {
let good_endings = 1.0 + self.good + (self.errored / 2.0);
let bad_endings = 1.0 + self.timed_out + self.too_many_errors + (self.errored / 2.0);
bad_endings / good_endings
}
}
// Memory-only short-term record of IP handling
#[derive(Debug, Clone, Default, Readable, Writable)]
pub struct IpData {
pub ban_until: Time,
pub reputation: IpReputation,
}
impl IpData {
pub fn new() -> IpData {
IpData {
ban_until: Time::now(),
number_of_error_exits: 0,
number_of_too_many_error_bans: 0,
number_of_timeouts: 0,
}
}
pub fn ban(&mut self, ban: Ban) -> u64 {
// Update numbers
match ban {
Ban::ErrorExit => self.number_of_error_exits += 1,
Ban::TooManyErrors => self.number_of_too_many_error_bans += 1,
Ban::Timeout => self.number_of_timeouts += 1,
_ => (),
};
pub fn update_on_session_close(&mut self, session_exit: SessionExit) -> u64 {
// Update reputation
self.reputation.update(session_exit);
// Compute ban_until
let mut until = Time::now();
let seconds = self.ban_seconds(ban);
let seconds = self.ban_seconds(session_exit);
until.0 += seconds;
self.ban_until = Time(self.ban_until.0.max(until.0));
@ -51,12 +77,18 @@ impl IpData {
seconds
}
fn ban_seconds(&self, thisban: Ban) -> u64 {
match thisban {
Ban::General => 2,
Ban::ErrorExit => 2 + self.number_of_error_exits * 10,
Ban::TooManyErrors => 2 + self.number_of_too_many_error_bans * 15,
Ban::Timeout => 2 + self.number_of_timeouts * 5,
pub fn is_banned(&self) -> bool {
self.ban_until > Time::now()
}
fn ban_seconds(&self, session_exit: SessionExit) -> u64 {
let multiplier = self.reputation.ban_multiplier();
match session_exit {
SessionExit::Ok => 2,
SessionExit::ErrorExit => 2 + (2.0 * multiplier) as u64,
SessionExit::TooManyErrors => 2 + (5.0 * multiplier) as u64,
SessionExit::Timeout => 2 + (4.0 * multiplier) as u64,
}
}
}

View File

@ -4,11 +4,14 @@ pub use event_store::EventStore;
mod migrations;
use crate::error::{ChorusError, Error};
use crate::ip::IpData;
use crate::types::{Event, Filter, Id, Kind, Pubkey, Time};
use heed::byteorder::BigEndian;
use heed::types::{OwnedType, UnalignedSlice, Unit, U64};
use heed::{Database, Env, EnvFlags, EnvOpenOptions, RwTxn};
use speedy::{Readable, Writable};
use std::fs;
use std::net::IpAddr;
use std::ops::Bound;
#[derive(Debug)]
@ -23,6 +26,7 @@ pub struct Store {
ktci: Database<UnalignedSlice<u8>, OwnedType<usize>>,
deleted_offsets: Database<U64<BigEndian>, Unit>,
deleted_events: Database<UnalignedSlice<u8>, Unit>,
ip_data: Database<UnalignedSlice<u8>, UnalignedSlice<u8>>,
allow_scraping: bool,
}
@ -88,6 +92,11 @@ impl Store {
.types::<UnalignedSlice<u8>, Unit>()
.name("deleted-events")
.create(&mut txn)?;
let ip_data = env
.database_options()
.types::<UnalignedSlice<u8>, UnalignedSlice<u8>>()
.name("ip_data")
.create(&mut txn)?;
if let Ok(count) = ids.len(&txn) {
log::info!("{count} events in storage");
@ -95,6 +104,9 @@ impl Store {
if let Ok(count) = deleted_offsets.len(&txn) {
log::info!("{count} deleted events in the map");
}
if let Ok(count) = ip_data.len(&txn) {
log::info!("{count} IP addresses reputationally tracked");
}
txn.commit()?;
@ -112,6 +124,7 @@ impl Store {
ktci,
deleted_offsets,
deleted_events,
ip_data,
allow_scraping,
};
@ -605,6 +618,25 @@ impl Store {
Ok(())
}
pub fn get_ip_data(&self, ip: IpAddr) -> Result<IpData, Error> {
let key = ip.write_to_vec()?;
let txn = self.env.read_txn()?;
let bytes = match self.ip_data.get(&txn, &key)? {
Some(b) => b,
None => return Ok(Default::default()),
};
Ok(IpData::read_from_buffer(bytes)?)
}
pub fn update_ip_data(&self, ip: IpAddr, data: &IpData) -> Result<(), Error> {
let key = ip.write_to_vec()?;
let mut txn = self.env.write_txn()?;
let bytes = data.write_to_vec()?;
self.ip_data.put(&mut txn, &key, &bytes)?;
txn.commit()?;
Ok(())
}
fn key_ci(created_at: Time, id: Id) -> Vec<u8> {
let mut key: Vec<u8> =
Vec::with_capacity(std::mem::size_of::<Time>() + std::mem::size_of::<Id>());

View File

@ -1,8 +1,15 @@
use speedy::{Readable, Writable};
use std::fmt;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Readable, Writable)]
pub struct Time(pub u64);
impl Default for Time {
fn default() -> Time {
Time::now()
}
}
impl fmt::Display for Time {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)