Implement Negentropy NIP-77 (nostr-protocol/nips PR #1494)

This commit is contained in:
Mike Dilger 2025-01-20 08:40:06 +13:00
parent ac87fc9d29
commit 2e1ca26d3e
No known key found for this signature in database
GPG Key ID: 47581A78D4329BA4
4 changed files with 377 additions and 3 deletions

View File

@ -4,6 +4,7 @@ pub mod error;
pub mod filestore;
pub mod globals;
pub mod ip;
mod neg_storage;
pub mod nostr;
pub mod reply;
pub mod tls;
@ -25,6 +26,7 @@ use hyper::{Request, Response};
use hyper_tungstenite::tungstenite;
use hyper_tungstenite::{HyperWebsocket, WebSocketStream};
use hyper_util::rt::TokioIo;
use neg_storage::NegentropyStorageVector;
use pocket_db::Store;
use pocket_types::{Id, OwnedFilter, Pubkey};
use speedy::{Readable, Writable};
@ -215,6 +217,7 @@ async fn websocket_thread(peer: HashedPeer, websocket: HyperWebsocket, origin: S
let mut ws_service = WebSocketService {
peer,
subscriptions: HashMap::new(),
neg_subscriptions: HashMap::new(),
// We start with a 1-page buffer, and grow it if needed.
buffer: vec![0; 4096],
websocket,
@ -350,6 +353,7 @@ async fn websocket_thread(peer: HashedPeer, websocket: HyperWebsocket, origin: S
struct WebSocketService {
pub peer: HashedPeer,
pub subscriptions: HashMap<String, Vec<OwnedFilter>>,
pub neg_subscriptions: HashMap<String, NegentropyStorageVector>,
pub buffer: Vec<u8>,
pub websocket: WebSocketStream<TokioIo<Upgraded>>,
pub last_message: Instant,
@ -425,7 +429,7 @@ impl WebSocketService {
tokio::select! {
instant = interval.tick() => {
// Drop them if they have no subscriptions
if self.subscriptions.is_empty() {
if self.subscriptions.is_empty() && self.neg_subscriptions.is_empty() {
// And they are idle for timeout_seconds with no subscriptions
if last_message_at + Duration::from_secs(timeout_seconds) < instant {
self.wsclose(ChorusError::TimedOut.into()).await?;

123
src/neg_storage.rs Normal file
View File

@ -0,0 +1,123 @@
use negentropy::{Bound, Error, Id, Item, NegentropyStorageBase};
// We construct our own NegentropyStorageVector since NegentropyStorageBase
// is not implemented for a &NegentropyStorageVector upstream
/// Negentropy Storage Vector
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct NegentropyStorageVector {
items: Vec<Item>,
sealed: bool,
}
impl NegentropyStorageBase for &NegentropyStorageVector {
fn size(&self) -> Result<usize, Error> {
self.check_sealed()?;
Ok(self.items.len())
}
fn get_item(&self, i: usize) -> Result<Option<Item>, Error> {
self.check_sealed()?;
Ok(self.items.get(i).copied())
}
fn iterate(
&self,
begin: usize,
end: usize,
cb: &mut dyn FnMut(Item, usize) -> Result<bool, Error>,
) -> Result<(), Error> {
self.check_sealed()?;
self.check_bounds(begin, end)?;
for i in begin..end {
if !cb(self.items[i], i)? {
break;
}
}
Ok(())
}
fn find_lower_bound(&self, mut first: usize, last: usize, value: &Bound) -> usize {
let mut count: usize = last - first;
while count > 0 {
let mut it: usize = first;
let step: usize = count / 2;
it += step;
if self.items[it] < value.item {
it += 1;
first = it;
count -= step + 1;
} else {
count = step;
}
}
first
}
}
impl NegentropyStorageVector {
/// Create new storage
#[inline]
pub fn new() -> Self {
Self::default()
}
/// Create new storage with capacity
#[inline]
pub fn with_capacity(capacity: usize) -> Self {
Self {
items: Vec::with_capacity(capacity),
sealed: false,
}
}
/// Insert item
pub fn insert(&mut self, created_at: u64, id: Id) -> Result<(), Error> {
if self.sealed {
return Err(Error::AlreadySealed);
}
let elem: Item = Item::with_timestamp_and_id(created_at, id);
self.items.push(elem);
Ok(())
}
/// Seal
pub fn seal(&mut self) -> Result<(), Error> {
if self.sealed {
return Err(Error::AlreadySealed);
}
self.sealed = true;
self.items.sort();
self.items.dedup();
Ok(())
}
/// Unseal
pub fn unseal(&mut self) -> Result<(), Error> {
self.sealed = false;
Ok(())
}
fn check_sealed(&self) -> Result<(), Error> {
if !self.sealed {
return Err(Error::NotSealed);
}
Ok(())
}
fn check_bounds(&self, begin: usize, end: usize) -> Result<(), Error> {
if begin > end || end > self.items.len() {
return Err(Error::BadRange);
}
Ok(())
}
}

View File

@ -1,10 +1,12 @@
use crate::error::{ChorusError, Error};
use crate::globals::GLOBALS;
use crate::neg_storage::NegentropyStorageVector;
use crate::reply::{NostrReply, NostrReplyPrefix};
use crate::WebSocketService;
use hyper_tungstenite::tungstenite::Message;
use negentropy::{Bytes, Negentropy};
use pocket_types::json::{eat_whitespace, json_unescape, verify_char};
use pocket_types::{Event, Filter, Hll8, Kind, OwnedFilter, Pubkey, Time};
use pocket_types::{read_hex, Event, Filter, Hll8, Kind, OwnedFilter, Pubkey, Time};
use url::Url;
impl WebSocketService {
@ -32,6 +34,12 @@ impl WebSocketService {
self.close(msg, inpos + 6).await?;
} else if &input[inpos..inpos + 5] == b"AUTH\"" {
self.auth(msg, inpos + 5).await?;
} else if &input[inpos..inpos + 9] == b"NEG-OPEN\"" {
self.neg_open(msg, inpos + 9).await?;
} else if &input[inpos..inpos + 8] == b"NEG-MSG\"" {
self.neg_msg(msg, inpos + 8).await?;
} else if &input[inpos..inpos + 10] == b"NEG-CLOSE\"" {
self.neg_close(msg, inpos + 10).await?;
} else {
log::warn!(target: "Client", "{}: Received unhandled text message: {}", self.peer, msg);
let reply = NostrReply::Notice("Command unrecognized".to_owned());
@ -435,6 +443,233 @@ impl WebSocketService {
Ok(())
}
pub async fn neg_open(&mut self, msg: &str, mut inpos: usize) -> Result<(), Error> {
let input = msg.as_bytes();
// ["NEG-OPEN", "<subid>", "<filter>", "<hex-message>"]
eat_whitespace(input, &mut inpos);
verify_char(input, b',', &mut inpos)?;
eat_whitespace(input, &mut inpos);
let mut outpos = 0;
// Read the subid into the session buffer
let subid = {
verify_char(input, b'"', &mut inpos)?;
let (inlen, outlen) = json_unescape(&input[inpos..], &mut self.buffer[outpos..])?;
inpos += inlen;
let subid = unsafe {
String::from_utf8_unchecked(self.buffer[outpos..outpos + outlen].to_owned())
};
outpos += outlen;
verify_char(input, b'"', &mut inpos)?; // FIXME: json_unescape should eat the closing quote
subid
};
// Read the filter into the session buffer
let filter = {
eat_whitespace(input, &mut inpos);
verify_char(input, b',', &mut inpos)?;
// whitespace after the comma is handled within Filter::from_json
let (incount, outcount, filter) =
Filter::from_json(&input[inpos..], &mut self.buffer[outpos..])?;
inpos += incount;
outpos += outcount;
filter.to_owned()
};
// Read the negentropy message
let incoming_msg = {
eat_whitespace(input, &mut inpos);
verify_char(input, b',', &mut inpos)?;
eat_whitespace(input, &mut inpos);
verify_char(input, b'"', &mut inpos)?;
let (inlen, outlen) = json_unescape(&input[inpos..], &mut self.buffer[outpos..])?;
inpos += inlen;
verify_char(input, b'"', &mut inpos)?;
let mut msg = vec![0; outlen / 2];
read_hex!(&self.buffer[outpos..outpos + outlen], &mut msg, outlen / 2)?;
//outpos += outlen;
msg
};
// NEG-ERR if the message was empty
if incoming_msg.is_empty() {
let reply = NostrReply::NegErr(&subid, "Empty negentropy message".to_owned());
self.send(Message::text(reply.as_json())).await?;
return Ok(());
}
// If the version is too high, respond with our version number
if incoming_msg[0] != 0x61 {
let reply = NostrReply::NegMsg(&subid, vec![0x61]);
self.send(Message::text(reply.as_json())).await?;
return Ok(());
}
let user = self.user;
let authorized_user = authorized_user(&user);
// Find all matching events
let mut events: Vec<&Event> = Vec::new();
let screen = |event: &Event| {
let event_flags = event_flags(event, &user);
screen_outgoing_event(event, &event_flags, authorized_user)
};
let filter_events = {
let config = &*GLOBALS.config.read();
GLOBALS.store.get().unwrap().find_events(
&filter,
config.allow_scraping,
config.allow_scrape_if_limited_to,
config.allow_scrape_if_max_seconds,
screen,
)?
};
events.extend(filter_events);
events.sort_by(|a, b| {
a.created_at()
.cmp(&b.created_at())
.then(a.id().cmp(&b.id()))
});
events.dedup();
let mut nsv = NegentropyStorageVector::with_capacity(events.len());
for event in &events {
let id = negentropy::Id::from_slice(event.id().as_slice())?;
let time = event.created_at().as_u64();
nsv.insert(time, id)?;
}
nsv.seal()?;
// Save the matching events under the subscription Id
self.neg_subscriptions.insert(subid.clone(), nsv);
// Look it up again immediately
let Some(nsv) = self.neg_subscriptions.get(&subid) else {
return Err(ChorusError::General(
"NEG-OPEN inserted data is immediately missing!".to_owned(),
)
.into());
};
let mut neg = Negentropy::new(nsv, 1024 * 1024)?; // websocket frame size limit
match neg.reconcile(&Bytes::from(incoming_msg)) {
Ok(response) => {
let reply = NostrReply::NegMsg(&subid, response.as_bytes().to_owned());
self.send(Message::text(reply.as_json())).await?;
}
Err(e) => {
let reply = NostrReply::NegErr(&subid, format!("{e}"));
self.send(Message::text(reply.as_json())).await?;
}
}
Ok(())
}
pub async fn neg_msg(&mut self, msg: &str, mut inpos: usize) -> Result<(), Error> {
let input = msg.as_bytes();
// ["NEG-MSG", "<subid>", "<hex-message>"]
eat_whitespace(input, &mut inpos);
verify_char(input, b',', &mut inpos)?;
eat_whitespace(input, &mut inpos);
let mut outpos = 0;
// Read the subid into the session buffer
let subid = {
verify_char(input, b'"', &mut inpos)?;
let (inlen, outlen) = json_unescape(&input[inpos..], &mut self.buffer[outpos..])?;
inpos += inlen;
let subid = unsafe {
String::from_utf8_unchecked(self.buffer[outpos..outpos + outlen].to_owned())
};
outpos += outlen;
verify_char(input, b'"', &mut inpos)?; // FIXME: json_unescape should eat the closing quote
subid
};
// Read the negentropy message
let incoming_msg = {
eat_whitespace(input, &mut inpos);
verify_char(input, b',', &mut inpos)?;
eat_whitespace(input, &mut inpos);
verify_char(input, b'"', &mut inpos)?;
let (inlen, outlen) = json_unescape(&input[inpos..], &mut self.buffer[outpos..])?;
inpos += inlen;
verify_char(input, b'"', &mut inpos)?;
let mut msg = vec![0; outlen / 2];
read_hex!(&self.buffer[outpos..outpos + outlen], &mut msg, outlen / 2)?;
// outpos += outlen;
msg
};
// NEG-ERR if the message was empty
if incoming_msg.is_empty() {
let reply = NostrReply::NegErr(&subid, "Empty negentropy message".to_owned());
self.send(Message::text(reply.as_json())).await?;
return Ok(());
}
// If the version is too high, return an error (version negotiation should
// have already happened in NEG-OPEN)
if incoming_msg[0] != 0x61 {
let reply = NostrReply::NegErr(&subid, "Version mismatch".to_owned());
self.send(Message::text(reply.as_json())).await?;
return Ok(());
}
// Look up the events we have
let Some(nsv) = self.neg_subscriptions.get(&subid) else {
let reply = NostrReply::NegErr(&subid, "Subscription not found".to_owned());
self.send(Message::text(reply.as_json())).await?;
return Ok(());
};
let mut neg = Negentropy::new(nsv, 1024 * 1024)?; // websocket frame size limit
match neg.reconcile(&Bytes::from(incoming_msg)) {
Ok(response) => {
let reply = NostrReply::NegMsg(&subid, response.as_bytes().to_owned());
self.send(Message::text(reply.as_json())).await?;
}
Err(e) => {
let reply = NostrReply::NegErr(&subid, format!("{e}"));
self.send(Message::text(reply.as_json())).await?;
}
}
Ok(())
}
pub async fn neg_close(&mut self, msg: &str, mut inpos: usize) -> Result<(), Error> {
let input = msg.as_bytes();
// ["NEG-CLOSE", "<subid>"]
eat_whitespace(input, &mut inpos);
verify_char(input, b',', &mut inpos)?;
eat_whitespace(input, &mut inpos);
// Read the subid into the session buffer
let subid = {
verify_char(input, b'"', &mut inpos)?;
let (inlen, outlen) = json_unescape(&input[inpos..], &mut self.buffer)?;
inpos += inlen;
let subid = unsafe { String::from_utf8_unchecked(self.buffer[..outlen].to_owned()) };
verify_char(input, b'"', &mut inpos)?; // FIXME: json_unescape should eat the closing quote
subid
};
// Close the subscription
self.neg_subscriptions.remove(&subid);
// No need to reply to the client
Ok(())
}
}
async fn screen_incoming_event(

View File

@ -1,4 +1,4 @@
use pocket_types::{Event, Hll8, Id};
use pocket_types::{write_hex, Event, Hll8, Id};
use std::fmt;
#[derive(Debug, Clone, Copy)]
@ -40,6 +40,8 @@ pub enum NostrReply<'a> {
Closed(&'a str, NostrReplyPrefix, String),
Notice(String),
Count(&'a str, usize, Option<Hll8>),
NegErr(&'a str, String),
NegMsg(&'a str, Vec<u8>),
}
impl NostrReply<'_> {
@ -61,6 +63,16 @@ impl NostrReply<'_> {
format!(r#"["COUNT","{subid}",{{"count":{c}}}]"#)
}
}
NostrReply::NegErr(subid, reason) => {
format!(r#"["NEG-ERR","{subid}","{reason}"]"#)
}
NostrReply::NegMsg(subid, msg) => {
// write msg as hex
let mut buf: Vec<u8> = vec![0; msg.len() * 2];
write_hex!(msg, &mut buf, msg.len()).unwrap();
let msg_hex = unsafe { std::str::from_utf8_unchecked(&buf) };
format!(r#"["NEG-MSG","{subid}","{}"]"#, msg_hex)
}
}
}
}