Throtting by rate limiting the input (defaults are still very speculative, output not rate limited)

This commit is contained in:
Mike Dilger 2024-07-14 11:17:15 +12:00
parent e3ec1cac57
commit bf87d53e01
6 changed files with 93 additions and 1 deletions

View File

@ -61,6 +61,11 @@ announce upgrade instructions until release.
## Change Log
### version 1.5.2 (TBD)
- NEW CONFIG: `throttling_bytes_per_second` how many bytes are allowed per second
- NEW CONFIG: `throttling_burst` how many bytes can be accepted per connection in a burst
### version 1.5.1 (2024-07-14, c9c71311)
- FIX: large non-utf8 messages were attempted to be logged causing a panic

View File

@ -269,4 +269,24 @@ timeout_seconds = 60
#
# Default is 5
#
max_connections_per_ip = 5
max_connections_per_ip = 5
# The maximum rate (excluding bursts) of data that will be accepted over a websocket connection
# (per connection). Beyond this rate (in a sustained way) the connection will be closed.
#
# Default is 131072 bytes per second.
#
throttling_bytes_per_second = 131072
# The allowable bursts of data beyond the normal rate.
#
# We keep a count of how many bytes they are allowed, and that count starts at this number.
# As bytes are consumed the count goes down, but we refund throttling_bytes_per_second every
# second. If that bucket doesn't have enough, the burst won't be allowed and the connection
# will be closed.
#
# Default is 4194304 bytes.
#
throttling_burst = 4194304

View File

@ -234,3 +234,21 @@ Default is 60
Maximum number of websocket connections per IP address
Default is 5
### throttling_bytes_per_second
The maximum rate (excluding bursts) of data that will be accepted over a websocket connection
(per connection). Beyond this rate (in a sustained way) the connection will be closed.
Default is 131072 bytes per second.
### throttling_burst
The allowable bursts of data beyond the normal rate.
We keep a count of how many bytes they are allowed, and that count starts at this number.
As bytes are consumed the count goes down, but we refund throttling_bytes_per_second every
second. If that bucket doesn't have enough, the burst won't be allowed and the connection
will be closed.
Default is 4194304 bytes.

View File

@ -31,3 +31,5 @@ enable_ip_blocking = true
minimum_ban_seconds = 1
timeout_seconds = 60
max_connections_per_ip = 5
throttling_bytes_per_second = 131072
throttling_burst = 4194304

View File

@ -37,6 +37,8 @@ pub struct FriendlyConfig {
pub minimum_ban_seconds: u64,
pub timeout_seconds: u64,
pub max_connections_per_ip: usize,
pub throttling_bytes_per_second: usize,
pub throttling_burst: usize,
}
impl Default for FriendlyConfig {
@ -71,6 +73,8 @@ impl Default for FriendlyConfig {
minimum_ban_seconds: 1,
timeout_seconds: 60,
max_connections_per_ip: 5,
throttling_bytes_per_second: 1024 * 128,
throttling_burst: 1024 * 1024 * 4,
}
}
}
@ -107,6 +111,8 @@ impl FriendlyConfig {
minimum_ban_seconds,
timeout_seconds,
max_connections_per_ip,
throttling_bytes_per_second,
throttling_burst,
} = self;
let mut public_key: Option<Pubkey> = None;
@ -165,6 +171,8 @@ impl FriendlyConfig {
minimum_ban_seconds,
timeout_seconds,
max_connections_per_ip,
throttling_bytes_per_second,
throttling_burst,
})
}
}
@ -202,6 +210,8 @@ pub struct Config {
pub minimum_ban_seconds: u64,
pub timeout_seconds: u64,
pub max_connections_per_ip: usize,
pub throttling_bytes_per_second: usize,
pub throttling_burst: usize,
}
impl Default for Config {

View File

@ -185,6 +185,8 @@ async fn websocket_thread(peer: HashedPeer, websocket: HyperWebsocket, origin: S
// We start with a 1-page buffer, and grow it if needed.
buffer: vec![0; 4096],
websocket,
last_message: Instant::now(),
burst_tokens: GLOBALS.config.read().throttling_burst,
challenge: TextNonce::new().into_string(),
user: None,
error_punishment: 0.0,
@ -308,6 +310,8 @@ struct WebSocketService {
pub subscriptions: HashMap<String, Vec<OwnedFilter>>,
pub buffer: Vec<u8>,
pub websocket: WebSocketStream<TokioIo<Upgraded>>,
pub last_message: Instant,
pub burst_tokens: usize,
pub challenge: String,
pub user: Option<Pubkey>,
pub error_punishment: f32,
@ -409,6 +413,39 @@ impl WebSocketService {
}
async fn handle_websocket_message(&mut self, message: Message) -> Result<(), Error> {
// Throttling
{
let (throttling_burst, throttling_bytes_per_second) = {
let config = GLOBALS.config.read();
(config.throttling_burst, config.throttling_bytes_per_second)
};
// Get (and update) timing
let elapsed = self.last_message.elapsed();
self.last_message = Instant::now();
// Grant new tokens
let new_tokens = throttling_bytes_per_second
* elapsed.as_millis() as usize
/ 1_000;
self.burst_tokens = self.burst_tokens + new_tokens;
// Cap tokens to a maximum
if self.burst_tokens > throttling_burst {
self.burst_tokens = throttling_burst;
}
// Consume tokens, possibly closing the connection if there are not enough
if message.len() > self.burst_tokens {
let reply = NostrReply::Notice("Rate limit exceeded.".into());
self.websocket.send(Message::text(reply.as_json())).await?;
self.error_punishment += 1.0;
return Err(ChorusError::ErrorClose.into());
} else {
self.burst_tokens = self.burst_tokens - message.len();
}
}
match message {
Message::Text(msg) => {
log::trace!(target: "Client", "{}: <= {}", self.peer, msg);