diff --git a/src/lib.rs b/src/lib.rs index 7ac8cd0..11d92a6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ pub mod error; pub mod filestore; pub mod globals; pub mod ip; +mod neg_storage; pub mod nostr; pub mod reply; pub mod tls; @@ -25,6 +26,7 @@ use hyper::{Request, Response}; use hyper_tungstenite::tungstenite; use hyper_tungstenite::{HyperWebsocket, WebSocketStream}; use hyper_util::rt::TokioIo; +use neg_storage::NegentropyStorageVector; use pocket_db::Store; use pocket_types::{Id, OwnedFilter, Pubkey}; use speedy::{Readable, Writable}; @@ -215,6 +217,7 @@ async fn websocket_thread(peer: HashedPeer, websocket: HyperWebsocket, origin: S let mut ws_service = WebSocketService { peer, subscriptions: HashMap::new(), + neg_subscriptions: HashMap::new(), // We start with a 1-page buffer, and grow it if needed. buffer: vec![0; 4096], websocket, @@ -350,6 +353,7 @@ async fn websocket_thread(peer: HashedPeer, websocket: HyperWebsocket, origin: S struct WebSocketService { pub peer: HashedPeer, pub subscriptions: HashMap>, + pub neg_subscriptions: HashMap, pub buffer: Vec, pub websocket: WebSocketStream>, pub last_message: Instant, @@ -425,7 +429,7 @@ impl WebSocketService { tokio::select! { instant = interval.tick() => { // Drop them if they have no subscriptions - if self.subscriptions.is_empty() { + if self.subscriptions.is_empty() && self.neg_subscriptions.is_empty() { // And they are idle for timeout_seconds with no subscriptions if last_message_at + Duration::from_secs(timeout_seconds) < instant { self.wsclose(ChorusError::TimedOut.into()).await?; diff --git a/src/neg_storage.rs b/src/neg_storage.rs new file mode 100644 index 0000000..9df51e4 --- /dev/null +++ b/src/neg_storage.rs @@ -0,0 +1,123 @@ +use negentropy::{Bound, Error, Id, Item, NegentropyStorageBase}; + +// We construct our own NegentropyStorageVector since NegentropyStorageBase +// is not implemented for a &NegentropyStorageVector upstream + +/// Negentropy Storage Vector +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct NegentropyStorageVector { + items: Vec, + sealed: bool, +} + +impl NegentropyStorageBase for &NegentropyStorageVector { + fn size(&self) -> Result { + self.check_sealed()?; + Ok(self.items.len()) + } + + fn get_item(&self, i: usize) -> Result, Error> { + self.check_sealed()?; + Ok(self.items.get(i).copied()) + } + + fn iterate( + &self, + begin: usize, + end: usize, + cb: &mut dyn FnMut(Item, usize) -> Result, + ) -> Result<(), Error> { + self.check_sealed()?; + self.check_bounds(begin, end)?; + + for i in begin..end { + if !cb(self.items[i], i)? { + break; + } + } + + Ok(()) + } + + fn find_lower_bound(&self, mut first: usize, last: usize, value: &Bound) -> usize { + let mut count: usize = last - first; + + while count > 0 { + let mut it: usize = first; + let step: usize = count / 2; + it += step; + + if self.items[it] < value.item { + it += 1; + first = it; + count -= step + 1; + } else { + count = step; + } + } + + first + } +} + +impl NegentropyStorageVector { + /// Create new storage + #[inline] + pub fn new() -> Self { + Self::default() + } + + /// Create new storage with capacity + #[inline] + pub fn with_capacity(capacity: usize) -> Self { + Self { + items: Vec::with_capacity(capacity), + sealed: false, + } + } + + /// Insert item + pub fn insert(&mut self, created_at: u64, id: Id) -> Result<(), Error> { + if self.sealed { + return Err(Error::AlreadySealed); + } + + let elem: Item = Item::with_timestamp_and_id(created_at, id); + self.items.push(elem); + + Ok(()) + } + + /// Seal + pub fn seal(&mut self) -> Result<(), Error> { + if self.sealed { + return Err(Error::AlreadySealed); + } + self.sealed = true; + + self.items.sort(); + self.items.dedup(); + + Ok(()) + } + + /// Unseal + pub fn unseal(&mut self) -> Result<(), Error> { + self.sealed = false; + Ok(()) + } + + fn check_sealed(&self) -> Result<(), Error> { + if !self.sealed { + return Err(Error::NotSealed); + } + Ok(()) + } + + fn check_bounds(&self, begin: usize, end: usize) -> Result<(), Error> { + if begin > end || end > self.items.len() { + return Err(Error::BadRange); + } + Ok(()) + } +} diff --git a/src/nostr.rs b/src/nostr.rs index 1861afe..039abb5 100644 --- a/src/nostr.rs +++ b/src/nostr.rs @@ -1,10 +1,12 @@ use crate::error::{ChorusError, Error}; use crate::globals::GLOBALS; +use crate::neg_storage::NegentropyStorageVector; use crate::reply::{NostrReply, NostrReplyPrefix}; use crate::WebSocketService; use hyper_tungstenite::tungstenite::Message; +use negentropy::{Bytes, Negentropy}; use pocket_types::json::{eat_whitespace, json_unescape, verify_char}; -use pocket_types::{Event, Filter, Hll8, Kind, OwnedFilter, Pubkey, Time}; +use pocket_types::{read_hex, Event, Filter, Hll8, Kind, OwnedFilter, Pubkey, Time}; use url::Url; impl WebSocketService { @@ -32,6 +34,12 @@ impl WebSocketService { self.close(msg, inpos + 6).await?; } else if &input[inpos..inpos + 5] == b"AUTH\"" { self.auth(msg, inpos + 5).await?; + } else if &input[inpos..inpos + 9] == b"NEG-OPEN\"" { + self.neg_open(msg, inpos + 9).await?; + } else if &input[inpos..inpos + 8] == b"NEG-MSG\"" { + self.neg_msg(msg, inpos + 8).await?; + } else if &input[inpos..inpos + 10] == b"NEG-CLOSE\"" { + self.neg_close(msg, inpos + 10).await?; } else { log::warn!(target: "Client", "{}: Received unhandled text message: {}", self.peer, msg); let reply = NostrReply::Notice("Command unrecognized".to_owned()); @@ -435,6 +443,233 @@ impl WebSocketService { Ok(()) } + + pub async fn neg_open(&mut self, msg: &str, mut inpos: usize) -> Result<(), Error> { + let input = msg.as_bytes(); + + // ["NEG-OPEN", "", "", ""] + + eat_whitespace(input, &mut inpos); + verify_char(input, b',', &mut inpos)?; + eat_whitespace(input, &mut inpos); + + let mut outpos = 0; + + // Read the subid into the session buffer + let subid = { + verify_char(input, b'"', &mut inpos)?; + let (inlen, outlen) = json_unescape(&input[inpos..], &mut self.buffer[outpos..])?; + inpos += inlen; + let subid = unsafe { + String::from_utf8_unchecked(self.buffer[outpos..outpos + outlen].to_owned()) + }; + outpos += outlen; + verify_char(input, b'"', &mut inpos)?; // FIXME: json_unescape should eat the closing quote + subid + }; + + // Read the filter into the session buffer + let filter = { + eat_whitespace(input, &mut inpos); + verify_char(input, b',', &mut inpos)?; + // whitespace after the comma is handled within Filter::from_json + let (incount, outcount, filter) = + Filter::from_json(&input[inpos..], &mut self.buffer[outpos..])?; + inpos += incount; + outpos += outcount; + filter.to_owned() + }; + + // Read the negentropy message + let incoming_msg = { + eat_whitespace(input, &mut inpos); + verify_char(input, b',', &mut inpos)?; + eat_whitespace(input, &mut inpos); + verify_char(input, b'"', &mut inpos)?; + let (inlen, outlen) = json_unescape(&input[inpos..], &mut self.buffer[outpos..])?; + inpos += inlen; + verify_char(input, b'"', &mut inpos)?; + let mut msg = vec![0; outlen / 2]; + read_hex!(&self.buffer[outpos..outpos + outlen], &mut msg, outlen / 2)?; + //outpos += outlen; + msg + }; + + // NEG-ERR if the message was empty + if incoming_msg.is_empty() { + let reply = NostrReply::NegErr(&subid, "Empty negentropy message".to_owned()); + self.send(Message::text(reply.as_json())).await?; + return Ok(()); + } + + // If the version is too high, respond with our version number + if incoming_msg[0] != 0x61 { + let reply = NostrReply::NegMsg(&subid, vec![0x61]); + self.send(Message::text(reply.as_json())).await?; + return Ok(()); + } + + let user = self.user; + let authorized_user = authorized_user(&user); + + // Find all matching events + let mut events: Vec<&Event> = Vec::new(); + let screen = |event: &Event| { + let event_flags = event_flags(event, &user); + screen_outgoing_event(event, &event_flags, authorized_user) + }; + let filter_events = { + let config = &*GLOBALS.config.read(); + GLOBALS.store.get().unwrap().find_events( + &filter, + config.allow_scraping, + config.allow_scrape_if_limited_to, + config.allow_scrape_if_max_seconds, + screen, + )? + }; + events.extend(filter_events); + events.sort_by(|a, b| { + a.created_at() + .cmp(&b.created_at()) + .then(a.id().cmp(&b.id())) + }); + events.dedup(); + + let mut nsv = NegentropyStorageVector::with_capacity(events.len()); + for event in &events { + let id = negentropy::Id::from_slice(event.id().as_slice())?; + let time = event.created_at().as_u64(); + nsv.insert(time, id)?; + } + nsv.seal()?; + + // Save the matching events under the subscription Id + self.neg_subscriptions.insert(subid.clone(), nsv); + + // Look it up again immediately + let Some(nsv) = self.neg_subscriptions.get(&subid) else { + return Err(ChorusError::General( + "NEG-OPEN inserted data is immediately missing!".to_owned(), + ) + .into()); + }; + + let mut neg = Negentropy::new(nsv, 1024 * 1024)?; // websocket frame size limit + match neg.reconcile(&Bytes::from(incoming_msg)) { + Ok(response) => { + let reply = NostrReply::NegMsg(&subid, response.as_bytes().to_owned()); + self.send(Message::text(reply.as_json())).await?; + } + Err(e) => { + let reply = NostrReply::NegErr(&subid, format!("{e}")); + self.send(Message::text(reply.as_json())).await?; + } + } + + Ok(()) + } + + pub async fn neg_msg(&mut self, msg: &str, mut inpos: usize) -> Result<(), Error> { + let input = msg.as_bytes(); + + // ["NEG-MSG", "", ""] + eat_whitespace(input, &mut inpos); + verify_char(input, b',', &mut inpos)?; + eat_whitespace(input, &mut inpos); + + let mut outpos = 0; + + // Read the subid into the session buffer + let subid = { + verify_char(input, b'"', &mut inpos)?; + let (inlen, outlen) = json_unescape(&input[inpos..], &mut self.buffer[outpos..])?; + inpos += inlen; + let subid = unsafe { + String::from_utf8_unchecked(self.buffer[outpos..outpos + outlen].to_owned()) + }; + outpos += outlen; + verify_char(input, b'"', &mut inpos)?; // FIXME: json_unescape should eat the closing quote + subid + }; + + // Read the negentropy message + let incoming_msg = { + eat_whitespace(input, &mut inpos); + verify_char(input, b',', &mut inpos)?; + eat_whitespace(input, &mut inpos); + verify_char(input, b'"', &mut inpos)?; + let (inlen, outlen) = json_unescape(&input[inpos..], &mut self.buffer[outpos..])?; + inpos += inlen; + verify_char(input, b'"', &mut inpos)?; + let mut msg = vec![0; outlen / 2]; + read_hex!(&self.buffer[outpos..outpos + outlen], &mut msg, outlen / 2)?; + // outpos += outlen; + msg + }; + + // NEG-ERR if the message was empty + if incoming_msg.is_empty() { + let reply = NostrReply::NegErr(&subid, "Empty negentropy message".to_owned()); + self.send(Message::text(reply.as_json())).await?; + return Ok(()); + } + + // If the version is too high, return an error (version negotiation should + // have already happened in NEG-OPEN) + if incoming_msg[0] != 0x61 { + let reply = NostrReply::NegErr(&subid, "Version mismatch".to_owned()); + self.send(Message::text(reply.as_json())).await?; + return Ok(()); + } + + // Look up the events we have + let Some(nsv) = self.neg_subscriptions.get(&subid) else { + let reply = NostrReply::NegErr(&subid, "Subscription not found".to_owned()); + self.send(Message::text(reply.as_json())).await?; + return Ok(()); + }; + + let mut neg = Negentropy::new(nsv, 1024 * 1024)?; // websocket frame size limit + match neg.reconcile(&Bytes::from(incoming_msg)) { + Ok(response) => { + let reply = NostrReply::NegMsg(&subid, response.as_bytes().to_owned()); + self.send(Message::text(reply.as_json())).await?; + } + Err(e) => { + let reply = NostrReply::NegErr(&subid, format!("{e}")); + self.send(Message::text(reply.as_json())).await?; + } + } + + Ok(()) + } + + pub async fn neg_close(&mut self, msg: &str, mut inpos: usize) -> Result<(), Error> { + let input = msg.as_bytes(); + + // ["NEG-CLOSE", ""] + + eat_whitespace(input, &mut inpos); + verify_char(input, b',', &mut inpos)?; + eat_whitespace(input, &mut inpos); + + // Read the subid into the session buffer + let subid = { + verify_char(input, b'"', &mut inpos)?; + let (inlen, outlen) = json_unescape(&input[inpos..], &mut self.buffer)?; + inpos += inlen; + let subid = unsafe { String::from_utf8_unchecked(self.buffer[..outlen].to_owned()) }; + verify_char(input, b'"', &mut inpos)?; // FIXME: json_unescape should eat the closing quote + subid + }; + + // Close the subscription + self.neg_subscriptions.remove(&subid); + + // No need to reply to the client + Ok(()) + } } async fn screen_incoming_event( diff --git a/src/reply.rs b/src/reply.rs index c3f3501..695d6a0 100644 --- a/src/reply.rs +++ b/src/reply.rs @@ -1,4 +1,4 @@ -use pocket_types::{Event, Hll8, Id}; +use pocket_types::{write_hex, Event, Hll8, Id}; use std::fmt; #[derive(Debug, Clone, Copy)] @@ -40,6 +40,8 @@ pub enum NostrReply<'a> { Closed(&'a str, NostrReplyPrefix, String), Notice(String), Count(&'a str, usize, Option), + NegErr(&'a str, String), + NegMsg(&'a str, Vec), } impl NostrReply<'_> { @@ -61,6 +63,16 @@ impl NostrReply<'_> { format!(r#"["COUNT","{subid}",{{"count":{c}}}]"#) } } + NostrReply::NegErr(subid, reason) => { + format!(r#"["NEG-ERR","{subid}","{reason}"]"#) + } + NostrReply::NegMsg(subid, msg) => { + // write msg as hex + let mut buf: Vec = vec![0; msg.len() * 2]; + write_hex!(msg, &mut buf, msg.len()).unwrap(); + let msg_hex = unsafe { std::str::from_utf8_unchecked(&buf) }; + format!(r#"["NEG-MSG","{subid}","{}"]"#, msg_hex) + } } } }