From 798079a3f2bc4a49aec85d93e8d185aca703e82a Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Wed, 21 Feb 2024 10:55:36 +1300 Subject: [PATCH] Restructure to propogate errors up further for counting and logging, while not over-replying --- src/error.rs | 4 +++ src/main.rs | 9 +++++-- src/nostr.rs | 75 ++++++++++++++++++++++++++++------------------------ 3 files changed, 52 insertions(+), 36 deletions(-) diff --git a/src/error.rs b/src/error.rs index 4ba7304..4dd667f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -97,6 +97,9 @@ pub enum ChorusError { // No private key NoPrivateKey, + // No such subscription + NoSuchSubscription, + // Restricted Restricted, @@ -171,6 +174,7 @@ impl std::fmt::Display for ChorusError { ), ChorusError::Lmdb(e) => write!(f, "{e}"), ChorusError::NoPrivateKey => write!(f, "Private Key Not Found"), + ChorusError::NoSuchSubscription => write!(f, "No such subscription"), ChorusError::Restricted => write!(f, "Restricted"), ChorusError::Rustls(e) => write!(f, "{e}"), ChorusError::TimedOut => write!(f, "Timed out"), diff --git a/src/main.rs b/src/main.rs index 7c4892a..111a9ec 100644 --- a/src/main.rs +++ b/src/main.rs @@ -279,6 +279,7 @@ async fn handle_http_request( challenge: TextNonce::new().into_string(), user: None, errcount: 0, + replied: false, }; // Increment count of active websockets @@ -364,6 +365,7 @@ struct WebSocketService { pub challenge: String, pub user: Option, pub errcount: usize, + pub replied: bool, } impl WebSocketService { @@ -463,6 +465,7 @@ impl WebSocketService { match message { Message::Text(msg) => { log::trace!("{}: <= {}", self.peer, msg); + self.replied = false; // This is defined in nostr.rs if let Err(e) = self.handle_nostr_message(&msg).await { self.errcount += 1; @@ -472,8 +475,10 @@ impl WebSocketService { } else { log::error!("{}: truncated msg was {} ...", self.peer, &msg[..2048]); } - let reply = NostrReply::Notice(format!("error: {}", e)); - self.websocket.send(Message::text(reply.as_json())).await?; + if !self.replied { + let reply = NostrReply::Notice(format!("error: {}", e)); + self.websocket.send(Message::text(reply.as_json())).await?; + } if self.errcount >= 3 { let reply = NostrReply::Notice( "Too many errors (3). Banned for 60 seconds.".into(), diff --git a/src/nostr.rs b/src/nostr.rs index e656ae1..4f23dfb 100644 --- a/src/nostr.rs +++ b/src/nostr.rs @@ -36,6 +36,7 @@ impl WebSocketService { log::warn!("{}: Received unhandled text message: {}", self.peer, msg); let reply = NostrReply::Notice("Command unrecognized".to_owned()); self.websocket.send(Message::text(reply.as_json())).await?; + self.replied = true; } Ok(()) @@ -79,12 +80,6 @@ impl WebSocketService { } if let Err(e) = self.req_inner(&subid, filters).await { - log::error!("{}: {e}", self.peer); - if msg.len() < 2048 { - log::error!("{}: msg was {}", self.peer, msg); - } else { - log::error!("{}: truncated msg was {} ...", self.peer, &msg[..2048]); - } let reply = match e.inner { ChorusError::TooManySubscriptions => { let max_subscriptions = GLOBALS.config.get().unwrap().max_subscriptions; @@ -101,11 +96,12 @@ impl WebSocketService { } _ => NostrReply::Closed(&subid, NostrReplyPrefix::Error, format!("{e}")), }; - self.websocket.send(Message::text(reply.as_json())).await?; + self.replied = true; + Err(e) + } else { + Ok(()) } - - Ok(()) } async fn req_inner(&mut self, subid: &String, filters: Vec) -> Result<(), Error> { @@ -161,6 +157,8 @@ impl WebSocketService { // Store subscription self.subscriptions.insert(subid.to_owned(), filters); + self.replied = true; + log::debug!( "{}: new subscription \"{subid}\", {} total", self.peer, @@ -183,9 +181,8 @@ impl WebSocketService { let (_incount, event) = Event::from_json(&input[inpos..], &mut self.buffer)?; let id = event.id(); - let reply = match self.event_inner().await { - Ok(()) => NostrReply::Ok(id, true, NostrReplyPrefix::None, "".to_string()), - Err(e) => match e.inner { + if let Err(e) = self.event_inner().await { + let reply = match e.inner { ChorusError::AuthRequired => NostrReply::Ok( id, false, @@ -215,11 +212,16 @@ impl WebSocketService { ) } _ => NostrReply::Ok(id, false, NostrReplyPrefix::Error, format!("{}", e)), - }, - }; - self.websocket.send(Message::text(reply.as_json())).await?; - - Ok(()) + }; + self.websocket.send(Message::text(reply.as_json())).await?; + self.replied = true; + Err(e) + } else { + let reply = NostrReply::Ok(id, true, NostrReplyPrefix::None, "".to_string()); + self.websocket.send(Message::text(reply.as_json())).await?; + self.replied = true; + Ok(()) + } } async fn event_inner(&mut self) -> Result<(), Error> { @@ -272,15 +274,16 @@ impl WebSocketService { let subid = unsafe { std::str::from_utf8_unchecked(&self.buffer[..outlen]) }; // If we have that subscription - let reply = if self.subscriptions.contains_key(subid) { + if self.subscriptions.contains_key(subid) { // Remove it, and let them know self.subscriptions.remove(subid); - NostrReply::Closed(subid, NostrReplyPrefix::None, "".to_owned()) + let reply = NostrReply::Closed(subid, NostrReplyPrefix::None, "".to_owned()); + self.websocket.send(Message::text(reply.as_json())).await?; + self.replied = true; + Ok(()) } else { - NostrReply::Notice(format!("no such subscription id: {}", subid)) - }; - self.websocket.send(Message::text(reply.as_json())).await?; - Ok(()) + Err(ChorusError::NoSuchSubscription.into()) + } } pub async fn auth(&mut self, msg: &str, mut inpos: usize) -> Result<(), Error> { @@ -295,18 +298,22 @@ impl WebSocketService { let id = event.id(); // Always return an OK message, based on the results of our auth_inner - let reply = match self.auth_inner().await { - Ok(()) => NostrReply::Ok(id, true, NostrReplyPrefix::None, "".to_string()), - Err(e) => match e.inner { - ChorusError::AuthFailure(s) => { - NostrReply::Ok(id, false, NostrReplyPrefix::Invalid, s) + if let Err(e) = self.auth_inner().await { + let reply = match e.inner { + ChorusError::AuthFailure(_) => { + NostrReply::Ok(id, false, NostrReplyPrefix::Invalid, format!("{e}")) } - _ => NostrReply::Ok(id, false, NostrReplyPrefix::Error, format!("{}", e)), - }, - }; - self.websocket.send(Message::text(reply.as_json())).await?; - - Ok(()) + _ => NostrReply::Ok(id, false, NostrReplyPrefix::Error, format!("{e}")), + }; + self.websocket.send(Message::text(reply.as_json())).await?; + self.replied = true; + Err(e) + } else { + let reply = NostrReply::Ok(id, true, NostrReplyPrefix::None, "".to_string()); + self.websocket.send(Message::text(reply.as_json())).await?; + self.replied = true; + Ok(()) + } } async fn auth_inner(&mut self) -> Result<(), Error> {