diff --git a/Cargo.lock b/Cargo.lock index 9aaf960..e06871e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -59,6 +59,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51" + [[package]] name = "bincode" version = "1.3.3" @@ -168,6 +174,7 @@ dependencies = [ name = "chorus-lib" version = "1.1.1" dependencies = [ + "base64 0.22.0", "futures", "heed", "hyper", diff --git a/chorus-bin/src/main.rs b/chorus-bin/src/main.rs index 1bd0e2b..950a98b 100644 --- a/chorus-bin/src/main.rs +++ b/chorus-bin/src/main.rs @@ -7,7 +7,7 @@ use crate::globals::GLOBALS; use crate::tls::MaybeTlsStream; use chorus_lib::config::{Config, FriendlyConfig}; use chorus_lib::error::{ChorusError, Error}; -use chorus_lib::ip::SessionExit; +use chorus_lib::ip::{HashedIp, HashedPeer, SessionExit}; use chorus_lib::reply::NostrReply; use chorus_lib::store::Store; use chorus_lib::types::{OwnedFilter, Pubkey}; @@ -22,7 +22,7 @@ use std::error::Error as StdError; use std::fs::OpenOptions; use std::future::Future; use std::io::Read; -use std::net::{IpAddr, SocketAddr}; +use std::net::IpAddr; use std::pin::Pin; use std::sync::atomic::Ordering; use std::task::{Context, Poll}; @@ -119,13 +119,17 @@ async fn main() -> Result<(), Error> { // Accepts network connections and spawn a task to serve each one v = listener.accept() => { - let (tcp_stream, peer_addr) = v?; - let ipaddr = peer_addr.ip(); + let (tcp_stream, hashed_peer) = { + let (tcp_stream, peer_addr) = v?; + let hashed_peer = HashedPeer::new(peer_addr); + (tcp_stream, hashed_peer) + }; - let ip_data = GLOBALS.store.get().unwrap().get_ip_data(ipaddr)?; + let ip_data = GLOBALS.store.get().unwrap().get_ip_data(hashed_peer.ip())?; if ip_data.is_banned() { log::debug!(target: "Client", - "{peer_addr}: Blocking reconnection until {}", + "{}: Blocking reconnection until {}", + hashed_peer.ip(), ip_data.ban_until); continue; } @@ -136,20 +140,20 @@ async fn main() -> Result<(), Error> { match tls_acceptor_clone.accept(tcp_stream).await { Err(e) => log::error!( target: "Client", - "{}: {}", peer_addr, e + "{}: {}", hashed_peer, e ), Ok(tls_stream) => { - if let Err(e) = serve(MaybeTlsStream::Rustls(tls_stream), peer_addr).await { + if let Err(e) = serve(MaybeTlsStream::Rustls(tls_stream), hashed_peer).await { log::error!( target: "Client", - "{}: {}", peer_addr, e + "{}: {}", hashed_peer, e ); } } } }); } else { - serve(MaybeTlsStream::Plain(tcp_stream), peer_addr).await?; + serve(MaybeTlsStream::Plain(tcp_stream), hashed_peer).await?; } } }; @@ -197,9 +201,9 @@ async fn main() -> Result<(), Error> { } // Serve a single network connection -async fn serve(stream: MaybeTlsStream, peer_addr: SocketAddr) -> Result<(), Error> { +async fn serve(stream: MaybeTlsStream, peer: HashedPeer) -> Result<(), Error> { // Serve the network stream with our http server and our HttpService - let service = HttpService { peer: peer_addr }; + let service = HttpService { peer }; let connection = GLOBALS .http_server @@ -214,12 +218,12 @@ async fn serve(stream: MaybeTlsStream, peer_addr: SocketAddr) -> Resu // do nothing } else { // Print in detail - log::error!(target: "Client", "{}: {:?}", peer_addr, src); + log::error!(target: "Client", "{}: {:?}", peer, src); } } else { // Print in less detail let e: Error = he.into(); - log::error!(target: "Client", "{}: {}", peer_addr, e); + log::error!(target: "Client", "{}: {}", peer, e); } } }); @@ -229,7 +233,7 @@ async fn serve(stream: MaybeTlsStream, peer_addr: SocketAddr) -> Resu // This is our per-connection HTTP service struct HttpService { - peer: SocketAddr, + peer: HashedPeer, } impl Service> for HttpService { @@ -253,7 +257,8 @@ impl Service> for HttpService { if let Some(rip) = req.headers().get("x-real-ip") { if let Ok(ripstr) = rip.to_str() { if let Ok(ipaddr) = ripstr.parse::() { - peer.set_ip(ipaddr); + let hashed_ip = HashedIp::new(ipaddr); + peer = HashedPeer::from_parts(hashed_ip, peer.port()); } } } @@ -264,7 +269,7 @@ impl Service> for HttpService { } async fn handle_http_request( - peer: SocketAddr, + peer: HashedPeer, mut request: Request, ) -> Result, Error> { let ua = match request.headers().get("user-agent") { @@ -393,7 +398,7 @@ async fn handle_http_request( } struct WebSocketService { - pub peer: SocketAddr, + pub peer: HashedPeer, pub subscriptions: HashMap>, pub buffer: Vec, pub websocket: WebSocketStream, diff --git a/chorus-bin/src/web.rs b/chorus-bin/src/web.rs index 06d2ae1..7344732 100644 --- a/chorus-bin/src/web.rs +++ b/chorus-bin/src/web.rs @@ -1,10 +1,10 @@ use crate::globals::GLOBALS; use chorus_lib::config::Config; use chorus_lib::error::Error; +use chorus_lib::ip::HashedPeer; use hyper::{Body, Request, Response, StatusCode}; -use std::net::SocketAddr; -pub async fn serve_http(peer: SocketAddr, request: Request) -> Result, Error> { +pub async fn serve_http(peer: HashedPeer, request: Request) -> Result, Error> { log::debug!(target: "Client", "{}: HTTP request for {}", peer, request.uri()); let response = Response::builder() .header("Access-Control-Allow-Origin", "*") @@ -15,7 +15,7 @@ pub async fn serve_http(peer: SocketAddr, request: Request) -> Result Result, Error> { +pub async fn serve_nip11(peer: HashedPeer) -> Result, Error> { log::debug!(target: "Client", "{}: sent NIP-11", peer); let rid = { let config = &*GLOBALS.config.read(); diff --git a/chorus-lib/Cargo.toml b/chorus-lib/Cargo.toml index 392025b..9273948 100644 --- a/chorus-lib/Cargo.toml +++ b/chorus-lib/Cargo.toml @@ -8,6 +8,7 @@ repository = "https://github.com/mikedilger/chorus" edition = "2021" [dependencies] +base64 = "0.22" futures = "0.3" heed = { git = "https://github.com/meilisearch/heed", rev = "64fd6fec293c0dee94855b8267557ce03e7ce5d8" } hyper = { version = "0.14", features = [ "http1", "server", "runtime", "stream" ] } diff --git a/chorus-lib/src/ip.rs b/chorus-lib/src/ip.rs index 769d453..8080293 100644 --- a/chorus-lib/src/ip.rs +++ b/chorus-lib/src/ip.rs @@ -1,5 +1,61 @@ use crate::types::Time; use speedy::{Readable, Writable}; +use std::net::{IpAddr, SocketAddr}; + +#[derive(Debug, Clone, Copy)] +pub struct HashedIp(pub [u8; 20], bool); + +impl std::fmt::Display for HashedIp { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + unsafe { write!(f, "{}", std::str::from_utf8_unchecked(self.0.as_slice())) } + } +} + +impl HashedIp { + pub fn new(ip_addr: IpAddr) -> HashedIp { + use base64::prelude::*; + use secp256k1::hashes::{sha256, Hash}; + let bytes = ip_addr.write_to_vec().unwrap(); + let hashvalue: sha256::Hash = Hash::hash(&bytes); + let tag = BASE64_STANDARD.encode(&hashvalue.as_byte_array()[0..16]); + HashedIp( + tag.as_bytes()[..20].try_into().unwrap(), + ip_addr.is_loopback(), + ) + } + + pub fn is_loopback(&self) -> bool { + self.1 + } +} + +#[derive(Debug, Clone, Copy)] +pub struct HashedPeer(pub HashedIp, pub u16); + +impl std::fmt::Display for HashedPeer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}:{}", self.0, self.1) + } +} + +impl HashedPeer { + pub fn new(peer_addr: SocketAddr) -> HashedPeer { + let hashed_ip = HashedIp::new(peer_addr.ip()); + HashedPeer(hashed_ip, peer_addr.port()) + } + + pub fn from_parts(hashed_ip: HashedIp, port: u16) -> HashedPeer { + HashedPeer(hashed_ip, port) + } + + pub fn ip(&self) -> HashedIp { + self.0 + } + + pub fn port(&self) -> u16 { + self.1 + } +} // Single-session exit condition #[derive(Debug, Clone, Copy, PartialEq)] @@ -92,3 +148,17 @@ impl IpData { } } } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_hashed_ip() { + let ipaddr: std::net::IpAddr = "127.0.0.1".parse().unwrap(); + println!("HashedIP={}", HashedIp::new(ipaddr).unwrap()); + + let socketaddr = std::net::SocketAddr::new(ipaddr, 80); + println!("HashedPEER={}", HashedPeer::new(socketaddr).unwrap()); + } +} diff --git a/chorus-lib/src/store/migrations.rs b/chorus-lib/src/store/migrations.rs index ad4e4ff..8fa6781 100644 --- a/chorus-lib/src/store/migrations.rs +++ b/chorus-lib/src/store/migrations.rs @@ -2,7 +2,7 @@ use super::Store; use crate::error::Error; use heed::RwTxn; -pub const CURRENT_MIGRATION_LEVEL: u32 = 2; +pub const CURRENT_MIGRATION_LEVEL: u32 = 3; impl Store { pub fn migrate(&self) -> Result<(), Error> { @@ -39,6 +39,7 @@ impl Store { match level { 1 => self.migrate_to_1(txn)?, 2 => self.migrate_to_2(txn)?, + 3 => self.migrate_to_3(txn)?, _ => panic!("Unknown migration level {level}"), } @@ -102,4 +103,10 @@ impl Store { Ok(()) } + + // Clear IP data (we are hashing now) + fn migrate_to_3(&self, txn: &mut RwTxn<'_>) -> Result<(), Error> { + self.ip_data.clear(txn)?; + Ok(()) + } } diff --git a/chorus-lib/src/store/mod.rs b/chorus-lib/src/store/mod.rs index d01b455..51b5707 100644 --- a/chorus-lib/src/store/mod.rs +++ b/chorus-lib/src/store/mod.rs @@ -5,7 +5,7 @@ mod migrations; use crate::config::Config; use crate::error::{ChorusError, Error}; -use crate::ip::IpData; +use crate::ip::{HashedIp, IpData}; use crate::types::{Event, Filter, Id, Kind, Pubkey, Time}; use heed::byteorder::BigEndian; use heed::types::{OwnedType, UnalignedSlice, Unit, U64}; @@ -13,7 +13,6 @@ use heed::{Database, Env, EnvFlags, EnvOpenOptions, RwTxn}; use speedy::{Readable, Writable}; use std::collections::BTreeSet; use std::fs; -use std::net::IpAddr; use std::ops::Bound; #[derive(Debug)] @@ -297,8 +296,12 @@ impl Store { } /// Find all events that match the filter - pub fn find_events(&self, filter: Filter, screen: F, config: &Config) - -> Result, Error> + pub fn find_events( + &self, + filter: Filter, + screen: F, + config: &Config, + ) -> Result, Error> where F: Fn(&Event) -> bool, { @@ -909,21 +912,21 @@ impl Store { Ok(()) } - pub fn get_ip_data(&self, ip: IpAddr) -> Result { - let key = ip.write_to_vec()?; + pub fn get_ip_data(&self, ip: HashedIp) -> Result { + let key = &ip.0; let txn = self.env.read_txn()?; - let bytes = match self.ip_data.get(&txn, &key)? { + let bytes = match self.ip_data.get(&txn, key)? { Some(b) => b, None => return Ok(Default::default()), }; Ok(IpData::read_from_buffer(bytes)?) } - pub fn update_ip_data(&self, ip: IpAddr, data: &IpData) -> Result<(), Error> { - let key = ip.write_to_vec()?; + pub fn update_ip_data(&self, ip: HashedIp, data: &IpData) -> Result<(), Error> { + let key = &ip.0; let mut txn = self.env.write_txn()?; let bytes = data.write_to_vec()?; - self.ip_data.put(&mut txn, &key, &bytes)?; + self.ip_data.put(&mut txn, key, &bytes)?; txn.commit()?; Ok(()) }