From 3ee285b7b7a261d14e2a19cdec04d6f77860f991 Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Tue, 5 Mar 2024 08:13:57 +1300 Subject: [PATCH] store: Rename indexes; Improve scraping (use since and until on ci_index) --- chorus-lib/src/store/migrations.rs | 9 +- chorus-lib/src/store/mod.rs | 177 +++++++++++++++++------------ 2 files changed, 111 insertions(+), 75 deletions(-) diff --git a/chorus-lib/src/store/migrations.rs b/chorus-lib/src/store/migrations.rs index 4e6cf11..7760c09 100644 --- a/chorus-lib/src/store/migrations.rs +++ b/chorus-lib/src/store/migrations.rs @@ -47,13 +47,16 @@ impl Store { fn migrate_to_1(&self, txn: &mut RwTxn<'_>) -> Result<(), Error> { // Build ci database let loop_txn = self.env.read_txn()?; - let iter = self.ids.iter(&loop_txn)?; + let iter = self.i_index.iter(&loop_txn)?; for result in iter { let (_key, offset) = result?; if let Some(event) = self.events.get_event_by_offset(offset)? { // Index in ci - self.ci - .put(txn, &Self::key_ci(event.created_at(), event.id()), &offset)?; + self.ci_index.put( + txn, + &Self::key_ci_index(event.created_at(), event.id()), + &offset, + )?; } } diff --git a/chorus-lib/src/store/mod.rs b/chorus-lib/src/store/mod.rs index 3773364..ac401a2 100644 --- a/chorus-lib/src/store/mod.rs +++ b/chorus-lib/src/store/mod.rs @@ -20,11 +20,11 @@ pub struct Store { general: Database, UnalignedSlice>, events: EventStore, env: Env, - ids: Database, OwnedType>, - ci: Database, OwnedType>, - akci: Database, OwnedType>, - atci: Database, OwnedType>, - ktci: Database, OwnedType>, + i_index: Database, OwnedType>, + ci_index: Database, OwnedType>, + akc_index: Database, OwnedType>, + atc_index: Database, OwnedType>, + kt_index: Database, OwnedType>, deleted_offsets: Database, Unit>, deleted_events: Database, Unit>, ip_data: Database, UnalignedSlice>, @@ -58,27 +58,27 @@ impl Store { .database_options() .types::, UnalignedSlice>() .create(&mut txn)?; - let ids = env + let i_index = env .database_options() .types::, OwnedType>() .name("ids") .create(&mut txn)?; - let ci = env + let ci_index = env .database_options() .types::, OwnedType>() .name("ci") .create(&mut txn)?; - let akci = env + let akc_index = env .database_options() .types::, OwnedType>() .name("akci") .create(&mut txn)?; - let atci = env + let atc_index = env .database_options() .types::, OwnedType>() .name("atci") .create(&mut txn)?; - let ktci = env + let kt_index = env .database_options() .types::, OwnedType>() .name("ktci") @@ -99,7 +99,7 @@ impl Store { .name("ip_data") .create(&mut txn)?; - if let Ok(count) = ids.len(&txn) { + if let Ok(count) = i_index.len(&txn) { log::info!("{count} events in storage"); } if let Ok(count) = deleted_offsets.len(&txn) { @@ -118,11 +118,11 @@ impl Store { general, events, env, - ids, - ci, - akci, - atci, - ktci, + i_index, + ci_index, + akc_index, + atc_index, + kt_index, deleted_offsets, deleted_events, ip_data, @@ -158,7 +158,7 @@ impl Store { let offset; // Only if it doesn't already exist - if self.ids.get(&txn, event.id().0.as_slice())?.is_none() { + if self.i_index.get(&txn, event.id().0.as_slice())?.is_none() { // Reject event if it was deleted { let deleted_key = Self::key_deleted_events(event.id(), event.pubkey()); @@ -232,7 +232,7 @@ impl Store { /// Get an event by Id pub fn get_event_by_id(&self, id: Id) -> Result, Error> { let txn = self.env.read_txn()?; - if let Some(offset) = self.ids.get(&txn, id.0.as_slice())? { + if let Some(offset) = self.i_index.get(&txn, id.0.as_slice())? { self.events.get_event_by_offset(offset) } else { Ok(None) @@ -271,18 +271,18 @@ impl Store { for author in filter.authors() { for kind in filter.kinds() { let iter = { - let start_prefix = Self::key_akci( + let start_prefix = Self::key_akc_index( author, kind, filter.until(), // scan goes backwards in time Id([0; 32]), ); - let end_prefix = Self::key_akci(author, kind, since, Id([255; 32])); + let end_prefix = Self::key_akc_index(author, kind, since, Id([255; 32])); let range = ( Bound::Included(&*start_prefix), Bound::Excluded(&*end_prefix), ); - self.akci.range(&txn, &range)? + self.akc_index.range(&txn, &range)? }; // Count how many we have found of this author-kind pair, so we @@ -338,20 +338,25 @@ impl Store { if let Some(tag0) = tag.next() { if let Some(tagvalue) = tag.next() { let iter = { - let start_prefix = Self::key_atci( + let start_prefix = Self::key_atc_index( author, tag0[0], tagvalue, filter.until(), // scan goes backwards in time Id([0; 32]), ); - let end_prefix = - Self::key_atci(author, tag0[0], tagvalue, since, Id([255; 32])); + let end_prefix = Self::key_atc_index( + author, + tag0[0], + tagvalue, + since, + Id([255; 32]), + ); let range = ( Bound::Included(&*start_prefix), Bound::Excluded(&*end_prefix), ); - self.atci.range(&txn, &range)? + self.atc_index.range(&txn, &range)? }; // Count how many we have found of this author-tag pair, so we @@ -401,20 +406,25 @@ impl Store { if let Some(tag0) = tag.next() { if let Some(tagvalue) = tag.next() { let iter = { - let start_prefix = Self::key_ktci( + let start_prefix = Self::key_kt_index( kind, tag0[0], tagvalue, filter.until(), // scan goes backwards in time Id([0; 32]), ); - let end_prefix = - Self::key_ktci(kind, tag0[0], tagvalue, since, Id([255; 32])); + let end_prefix = Self::key_kt_index( + kind, + tag0[0], + tagvalue, + since, + Id([255; 32]), + ); let range = ( Bound::Included(&*start_prefix), Bound::Excluded(&*end_prefix), ); - self.ktci.range(&txn, &range)? + self.kt_index.range(&txn, &range)? }; // Count how many we have found of this kind-tag pair, so we @@ -461,22 +471,31 @@ impl Store { return Err(ChorusError::Scraper.into()); } - // FIXME: we can use akci to scan author + time - // we can use ktci to scan kind + time + // FIXME: we can use akc_index to scan author + time + // we can use kt_index to scan kind + time // (but we have no index yet to scan tag + time) - // This is INEFFICIENT as it scans through EVERY EVENT - let mut count = 0; - let iter = self.ci.iter(&txn)?; + // This is INEFFICIENT as it scans through many events + + let start_prefix = Self::key_ci_index( + filter.until(), // scan goes backwards + Id([0; 32]), + ); + let end_prefix = Self::key_ci_index(filter.since(), Id([255; 32])); + let range = ( + Bound::Included(&*start_prefix), + Bound::Excluded(&*end_prefix), + ); + + let iter = self.ci_index.range(&txn, &range)?; for result in iter { - if count >= filter.limit() { + if output.len() >= filter.limit() as usize { break; } let (_key, offset) = result?; if let Some(event) = self.events.get_event_by_offset(offset)? { if filter.event_matches(&event)? && screen(&event) { output.insert(event); - count += 1; } } } @@ -484,20 +503,20 @@ impl Store { // Convert to a Vec, reverse time order, and apply limit Ok(output - .iter() - .rev() - .take(filter.limit() as usize) - .copied() - .collect()) + .iter() + .rev() + .take(filter.limit() as usize) + .copied() + .collect()) } /// Delete an event by id fn delete(&self, txn: &mut RwTxn<'_>, id: Id) -> Result<(), Error> { - if let Some(offset) = self.ids.get(txn, id.0.as_slice())? { + if let Some(offset) = self.i_index.get(txn, id.0.as_slice())? { self.set_offset_as_deleted(txn, offset)?; // Also remove from the id index - self.ids.delete(txn, id.0.as_slice())?; + self.i_index.delete(txn, id.0.as_slice())?; } Ok(()) } @@ -505,16 +524,19 @@ impl Store { // Index the event fn index(&self, txn: &mut RwTxn<'_>, event: &Event, offset: usize) -> Result<(), Error> { // Index by id - self.ids.put(txn, event.id().0.as_slice(), &offset)?; + self.i_index.put(txn, event.id().0.as_slice(), &offset)?; // Index by created_at and id - self.ci - .put(txn, &Self::key_ci(event.created_at(), event.id()), &offset)?; + self.ci_index.put( + txn, + &Self::key_ci_index(event.created_at(), event.id()), + &offset, + )?; // Index by author and kind (with created_at and id) - self.akci.put( + self.akc_index.put( txn, - &Self::key_akci(event.pubkey(), event.kind(), event.created_at(), event.id()), + &Self::key_akc_index(event.pubkey(), event.kind(), event.created_at(), event.id()), &offset, )?; @@ -524,9 +546,9 @@ impl Store { if tagname.len() == 1 { if let Some(tagvalue) = tsi.next() { // Index by author and tag (with created_at and id) - self.atci.put( + self.atc_index.put( txn, - &Self::key_atci( + &Self::key_atc_index( event.pubkey(), tagname[0], tagvalue, @@ -537,9 +559,9 @@ impl Store { )?; // Index by kind and tag (with created_at and id) - self.ktci.put( + self.kt_index.put( txn, - &Self::key_ktci( + &Self::key_kt_index( event.kind(), tagname[0], tagvalue, @@ -564,9 +586,9 @@ impl Store { if tagname.len() == 1 { if let Some(tagvalue) = tsi.next() { // Index by author and tag (with created_at and id) - self.atci.delete( + self.atc_index.delete( txn, - &Self::key_atci( + &Self::key_atc_index( event.pubkey(), tagname[0], tagvalue, @@ -576,9 +598,9 @@ impl Store { )?; // Index by kind and tag (with created_at and id) - self.ktci.delete( + self.kt_index.delete( txn, - &Self::key_ktci( + &Self::key_kt_index( event.kind(), tagname[0], tagvalue, @@ -591,17 +613,17 @@ impl Store { } } - self.ci - .delete(txn, &Self::key_ci(event.created_at(), event.id()))?; + self.ci_index + .delete(txn, &Self::key_ci_index(event.created_at(), event.id()))?; - self.akci.delete( + self.akc_index.delete( txn, - &Self::key_akci(event.pubkey(), event.kind(), event.created_at(), event.id()), + &Self::key_akc_index(event.pubkey(), event.kind(), event.created_at(), event.id()), )?; // We leave it in the id map. If someone wants to load the replaced event by id // they can still do it. - // self.ids.delete(&mut txn, event.id().0.as_slice())?; + // self.i_index.delete(&mut txn, event.id().0.as_slice())?; Ok(()) } @@ -635,20 +657,20 @@ impl Store { // this deletes all the events in that group except the most recent one. fn delete_replaced(&self, txn: &mut RwTxn<'_>, event: &Event) -> Result<(), Error> { if event.kind().is_replaceable() { - let start_prefix = Self::key_akci( + let start_prefix = Self::key_akc_index( event.pubkey(), event.kind(), Time::max(), // database is ordered in reverse time Id([0; 32]), ); let end_prefix = - Self::key_akci(event.pubkey(), event.kind(), Time::min(), Id([255; 32])); + Self::key_akc_index(event.pubkey(), event.kind(), Time::min(), Id([255; 32])); let range = ( Bound::Included(&*start_prefix), Bound::Excluded(&*end_prefix), ); let loop_txn = self.env.read_txn()?; - let iter = self.akci.range(&loop_txn, &range)?; + let iter = self.akc_index.range(&loop_txn, &range)?; let mut first = true; for result in iter { // Keep the first result @@ -666,15 +688,20 @@ impl Store { let tags = event.tags()?; if let Some(identifier) = tags.get_value(b"d") { let start_prefix = - Self::key_atci(event.pubkey(), b'd', identifier, Time::max(), Id([0; 32])); - let end_prefix = - Self::key_atci(event.pubkey(), b'd', identifier, Time::min(), Id([255; 32])); + Self::key_atc_index(event.pubkey(), b'd', identifier, Time::max(), Id([0; 32])); + let end_prefix = Self::key_atc_index( + event.pubkey(), + b'd', + identifier, + Time::min(), + Id([255; 32]), + ); let range = ( Bound::Included(&*start_prefix), Bound::Excluded(&*end_prefix), ); let loop_txn = self.env.read_txn()?; - let iter = self.atci.range(&loop_txn, &range)?; + let iter = self.atc_index.range(&loop_txn, &range)?; let mut first = true; for result in iter { // Keep the first result @@ -713,7 +740,7 @@ impl Store { Ok(()) } - fn key_ci(created_at: Time, id: Id) -> Vec { + fn key_ci_index(created_at: Time, id: Id) -> Vec { let mut key: Vec = Vec::with_capacity(std::mem::size_of::