Restructure to propogate errors up further for counting and logging, while not over-replying

This commit is contained in:
Mike Dilger 2024-02-21 10:55:36 +13:00
parent 4d875aa87f
commit 798079a3f2
3 changed files with 52 additions and 36 deletions

View File

@ -97,6 +97,9 @@ pub enum ChorusError {
// No private key
NoPrivateKey,
// No such subscription
NoSuchSubscription,
// Restricted
Restricted,
@ -171,6 +174,7 @@ impl std::fmt::Display for ChorusError {
),
ChorusError::Lmdb(e) => write!(f, "{e}"),
ChorusError::NoPrivateKey => write!(f, "Private Key Not Found"),
ChorusError::NoSuchSubscription => write!(f, "No such subscription"),
ChorusError::Restricted => write!(f, "Restricted"),
ChorusError::Rustls(e) => write!(f, "{e}"),
ChorusError::TimedOut => write!(f, "Timed out"),

View File

@ -279,6 +279,7 @@ async fn handle_http_request(
challenge: TextNonce::new().into_string(),
user: None,
errcount: 0,
replied: false,
};
// Increment count of active websockets
@ -364,6 +365,7 @@ struct WebSocketService {
pub challenge: String,
pub user: Option<Pubkey>,
pub errcount: usize,
pub replied: bool,
}
impl WebSocketService {
@ -463,6 +465,7 @@ impl WebSocketService {
match message {
Message::Text(msg) => {
log::trace!("{}: <= {}", self.peer, msg);
self.replied = false;
// This is defined in nostr.rs
if let Err(e) = self.handle_nostr_message(&msg).await {
self.errcount += 1;
@ -472,8 +475,10 @@ impl WebSocketService {
} else {
log::error!("{}: truncated msg was {} ...", self.peer, &msg[..2048]);
}
let reply = NostrReply::Notice(format!("error: {}", e));
self.websocket.send(Message::text(reply.as_json())).await?;
if !self.replied {
let reply = NostrReply::Notice(format!("error: {}", e));
self.websocket.send(Message::text(reply.as_json())).await?;
}
if self.errcount >= 3 {
let reply = NostrReply::Notice(
"Too many errors (3). Banned for 60 seconds.".into(),

View File

@ -36,6 +36,7 @@ impl WebSocketService {
log::warn!("{}: Received unhandled text message: {}", self.peer, msg);
let reply = NostrReply::Notice("Command unrecognized".to_owned());
self.websocket.send(Message::text(reply.as_json())).await?;
self.replied = true;
}
Ok(())
@ -79,12 +80,6 @@ impl WebSocketService {
}
if let Err(e) = self.req_inner(&subid, filters).await {
log::error!("{}: {e}", self.peer);
if msg.len() < 2048 {
log::error!("{}: msg was {}", self.peer, msg);
} else {
log::error!("{}: truncated msg was {} ...", self.peer, &msg[..2048]);
}
let reply = match e.inner {
ChorusError::TooManySubscriptions => {
let max_subscriptions = GLOBALS.config.get().unwrap().max_subscriptions;
@ -101,11 +96,12 @@ impl WebSocketService {
}
_ => NostrReply::Closed(&subid, NostrReplyPrefix::Error, format!("{e}")),
};
self.websocket.send(Message::text(reply.as_json())).await?;
self.replied = true;
Err(e)
} else {
Ok(())
}
Ok(())
}
async fn req_inner(&mut self, subid: &String, filters: Vec<OwnedFilter>) -> Result<(), Error> {
@ -161,6 +157,8 @@ impl WebSocketService {
// Store subscription
self.subscriptions.insert(subid.to_owned(), filters);
self.replied = true;
log::debug!(
"{}: new subscription \"{subid}\", {} total",
self.peer,
@ -183,9 +181,8 @@ impl WebSocketService {
let (_incount, event) = Event::from_json(&input[inpos..], &mut self.buffer)?;
let id = event.id();
let reply = match self.event_inner().await {
Ok(()) => NostrReply::Ok(id, true, NostrReplyPrefix::None, "".to_string()),
Err(e) => match e.inner {
if let Err(e) = self.event_inner().await {
let reply = match e.inner {
ChorusError::AuthRequired => NostrReply::Ok(
id,
false,
@ -215,11 +212,16 @@ impl WebSocketService {
)
}
_ => NostrReply::Ok(id, false, NostrReplyPrefix::Error, format!("{}", e)),
},
};
self.websocket.send(Message::text(reply.as_json())).await?;
Ok(())
};
self.websocket.send(Message::text(reply.as_json())).await?;
self.replied = true;
Err(e)
} else {
let reply = NostrReply::Ok(id, true, NostrReplyPrefix::None, "".to_string());
self.websocket.send(Message::text(reply.as_json())).await?;
self.replied = true;
Ok(())
}
}
async fn event_inner(&mut self) -> Result<(), Error> {
@ -272,15 +274,16 @@ impl WebSocketService {
let subid = unsafe { std::str::from_utf8_unchecked(&self.buffer[..outlen]) };
// If we have that subscription
let reply = if self.subscriptions.contains_key(subid) {
if self.subscriptions.contains_key(subid) {
// Remove it, and let them know
self.subscriptions.remove(subid);
NostrReply::Closed(subid, NostrReplyPrefix::None, "".to_owned())
let reply = NostrReply::Closed(subid, NostrReplyPrefix::None, "".to_owned());
self.websocket.send(Message::text(reply.as_json())).await?;
self.replied = true;
Ok(())
} else {
NostrReply::Notice(format!("no such subscription id: {}", subid))
};
self.websocket.send(Message::text(reply.as_json())).await?;
Ok(())
Err(ChorusError::NoSuchSubscription.into())
}
}
pub async fn auth(&mut self, msg: &str, mut inpos: usize) -> Result<(), Error> {
@ -295,18 +298,22 @@ impl WebSocketService {
let id = event.id();
// Always return an OK message, based on the results of our auth_inner
let reply = match self.auth_inner().await {
Ok(()) => NostrReply::Ok(id, true, NostrReplyPrefix::None, "".to_string()),
Err(e) => match e.inner {
ChorusError::AuthFailure(s) => {
NostrReply::Ok(id, false, NostrReplyPrefix::Invalid, s)
if let Err(e) = self.auth_inner().await {
let reply = match e.inner {
ChorusError::AuthFailure(_) => {
NostrReply::Ok(id, false, NostrReplyPrefix::Invalid, format!("{e}"))
}
_ => NostrReply::Ok(id, false, NostrReplyPrefix::Error, format!("{}", e)),
},
};
self.websocket.send(Message::text(reply.as_json())).await?;
Ok(())
_ => NostrReply::Ok(id, false, NostrReplyPrefix::Error, format!("{e}")),
};
self.websocket.send(Message::text(reply.as_json())).await?;
self.replied = true;
Err(e)
} else {
let reply = NostrReply::Ok(id, true, NostrReplyPrefix::None, "".to_string());
self.websocket.send(Message::text(reply.as_json())).await?;
self.replied = true;
Ok(())
}
}
async fn auth_inner(&mut self) -> Result<(), Error> {