From bf87d53e01fe0629add9ecf9c93df2458495e770 Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Sun, 14 Jul 2024 11:17:15 +1200 Subject: [PATCH] Throtting by rate limiting the input (defaults are still very speculative, output not rate limited) --- README.md | 5 +++++ contrib/chorus.toml | 22 +++++++++++++++++++++- docs/CONFIG.md | 18 ++++++++++++++++++ sample/sample.config.toml | 2 ++ src/config.rs | 10 ++++++++++ src/lib.rs | 37 +++++++++++++++++++++++++++++++++++++ 6 files changed, 93 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 93efd11..3154ab7 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,11 @@ announce upgrade instructions until release. ## Change Log +### version 1.5.2 (TBD) + +- NEW CONFIG: `throttling_bytes_per_second` how many bytes are allowed per second +- NEW CONFIG: `throttling_burst` how many bytes can be accepted per connection in a burst + ### version 1.5.1 (2024-07-14, c9c71311) - FIX: large non-utf8 messages were attempted to be logged causing a panic diff --git a/contrib/chorus.toml b/contrib/chorus.toml index 5bf2721..9b9b017 100644 --- a/contrib/chorus.toml +++ b/contrib/chorus.toml @@ -269,4 +269,24 @@ timeout_seconds = 60 # # Default is 5 # -max_connections_per_ip = 5 \ No newline at end of file +max_connections_per_ip = 5 + + +# The maximum rate (excluding bursts) of data that will be accepted over a websocket connection +# (per connection). Beyond this rate (in a sustained way) the connection will be closed. +# +# Default is 131072 bytes per second. +# +throttling_bytes_per_second = 131072 + + +# The allowable bursts of data beyond the normal rate. +# +# We keep a count of how many bytes they are allowed, and that count starts at this number. +# As bytes are consumed the count goes down, but we refund throttling_bytes_per_second every +# second. If that bucket doesn't have enough, the burst won't be allowed and the connection +# will be closed. +# +# Default is 4194304 bytes. +# +throttling_burst = 4194304 diff --git a/docs/CONFIG.md b/docs/CONFIG.md index e3ea2a4..1924693 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -234,3 +234,21 @@ Default is 60 Maximum number of websocket connections per IP address Default is 5 + +### throttling_bytes_per_second + +The maximum rate (excluding bursts) of data that will be accepted over a websocket connection +(per connection). Beyond this rate (in a sustained way) the connection will be closed. + +Default is 131072 bytes per second. + +### throttling_burst + +The allowable bursts of data beyond the normal rate. + +We keep a count of how many bytes they are allowed, and that count starts at this number. +As bytes are consumed the count goes down, but we refund throttling_bytes_per_second every +second. If that bucket doesn't have enough, the burst won't be allowed and the connection +will be closed. + +Default is 4194304 bytes. diff --git a/sample/sample.config.toml b/sample/sample.config.toml index 1f339bd..828b1bb 100644 --- a/sample/sample.config.toml +++ b/sample/sample.config.toml @@ -31,3 +31,5 @@ enable_ip_blocking = true minimum_ban_seconds = 1 timeout_seconds = 60 max_connections_per_ip = 5 +throttling_bytes_per_second = 131072 +throttling_burst = 4194304 \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index 11206b4..24b1ad0 100644 --- a/src/config.rs +++ b/src/config.rs @@ -37,6 +37,8 @@ pub struct FriendlyConfig { pub minimum_ban_seconds: u64, pub timeout_seconds: u64, pub max_connections_per_ip: usize, + pub throttling_bytes_per_second: usize, + pub throttling_burst: usize, } impl Default for FriendlyConfig { @@ -71,6 +73,8 @@ impl Default for FriendlyConfig { minimum_ban_seconds: 1, timeout_seconds: 60, max_connections_per_ip: 5, + throttling_bytes_per_second: 1024 * 128, + throttling_burst: 1024 * 1024 * 4, } } } @@ -107,6 +111,8 @@ impl FriendlyConfig { minimum_ban_seconds, timeout_seconds, max_connections_per_ip, + throttling_bytes_per_second, + throttling_burst, } = self; let mut public_key: Option = None; @@ -165,6 +171,8 @@ impl FriendlyConfig { minimum_ban_seconds, timeout_seconds, max_connections_per_ip, + throttling_bytes_per_second, + throttling_burst, }) } } @@ -202,6 +210,8 @@ pub struct Config { pub minimum_ban_seconds: u64, pub timeout_seconds: u64, pub max_connections_per_ip: usize, + pub throttling_bytes_per_second: usize, + pub throttling_burst: usize, } impl Default for Config { diff --git a/src/lib.rs b/src/lib.rs index 1a408a7..eb62bdc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -185,6 +185,8 @@ async fn websocket_thread(peer: HashedPeer, websocket: HyperWebsocket, origin: S // We start with a 1-page buffer, and grow it if needed. buffer: vec![0; 4096], websocket, + last_message: Instant::now(), + burst_tokens: GLOBALS.config.read().throttling_burst, challenge: TextNonce::new().into_string(), user: None, error_punishment: 0.0, @@ -308,6 +310,8 @@ struct WebSocketService { pub subscriptions: HashMap>, pub buffer: Vec, pub websocket: WebSocketStream>, + pub last_message: Instant, + pub burst_tokens: usize, pub challenge: String, pub user: Option, pub error_punishment: f32, @@ -409,6 +413,39 @@ impl WebSocketService { } async fn handle_websocket_message(&mut self, message: Message) -> Result<(), Error> { + // Throttling + { + let (throttling_burst, throttling_bytes_per_second) = { + let config = GLOBALS.config.read(); + (config.throttling_burst, config.throttling_bytes_per_second) + }; + + // Get (and update) timing + let elapsed = self.last_message.elapsed(); + self.last_message = Instant::now(); + + // Grant new tokens + let new_tokens = throttling_bytes_per_second + * elapsed.as_millis() as usize + / 1_000; + self.burst_tokens = self.burst_tokens + new_tokens; + + // Cap tokens to a maximum + if self.burst_tokens > throttling_burst { + self.burst_tokens = throttling_burst; + } + + // Consume tokens, possibly closing the connection if there are not enough + if message.len() > self.burst_tokens { + let reply = NostrReply::Notice("Rate limit exceeded.".into()); + self.websocket.send(Message::text(reply.as_json())).await?; + self.error_punishment += 1.0; + return Err(ChorusError::ErrorClose.into()); + } else { + self.burst_tokens = self.burst_tokens - message.len(); + } + } + match message { Message::Text(msg) => { log::trace!(target: "Client", "{}: <= {}", self.peer, msg);