store: Rename indexes; Improve scraping (use since and until on ci_index)

This commit is contained in:
Mike Dilger 2024-03-05 08:13:57 +13:00
parent fda607a6a6
commit 3ee285b7b7
2 changed files with 111 additions and 75 deletions

View File

@ -47,13 +47,16 @@ impl Store {
fn migrate_to_1(&self, txn: &mut RwTxn<'_>) -> Result<(), Error> {
// Build ci database
let loop_txn = self.env.read_txn()?;
let iter = self.ids.iter(&loop_txn)?;
let iter = self.i_index.iter(&loop_txn)?;
for result in iter {
let (_key, offset) = result?;
if let Some(event) = self.events.get_event_by_offset(offset)? {
// Index in ci
self.ci
.put(txn, &Self::key_ci(event.created_at(), event.id()), &offset)?;
self.ci_index.put(
txn,
&Self::key_ci_index(event.created_at(), event.id()),
&offset,
)?;
}
}

View File

@ -20,11 +20,11 @@ pub struct Store {
general: Database<UnalignedSlice<u8>, UnalignedSlice<u8>>,
events: EventStore,
env: Env,
ids: Database<UnalignedSlice<u8>, OwnedType<usize>>,
ci: Database<UnalignedSlice<u8>, OwnedType<usize>>,
akci: Database<UnalignedSlice<u8>, OwnedType<usize>>,
atci: Database<UnalignedSlice<u8>, OwnedType<usize>>,
ktci: Database<UnalignedSlice<u8>, OwnedType<usize>>,
i_index: Database<UnalignedSlice<u8>, OwnedType<usize>>,
ci_index: Database<UnalignedSlice<u8>, OwnedType<usize>>,
akc_index: Database<UnalignedSlice<u8>, OwnedType<usize>>,
atc_index: Database<UnalignedSlice<u8>, OwnedType<usize>>,
kt_index: Database<UnalignedSlice<u8>, OwnedType<usize>>,
deleted_offsets: Database<U64<BigEndian>, Unit>,
deleted_events: Database<UnalignedSlice<u8>, Unit>,
ip_data: Database<UnalignedSlice<u8>, UnalignedSlice<u8>>,
@ -58,27 +58,27 @@ impl Store {
.database_options()
.types::<UnalignedSlice<u8>, UnalignedSlice<u8>>()
.create(&mut txn)?;
let ids = env
let i_index = env
.database_options()
.types::<UnalignedSlice<u8>, OwnedType<usize>>()
.name("ids")
.create(&mut txn)?;
let ci = env
let ci_index = env
.database_options()
.types::<UnalignedSlice<u8>, OwnedType<usize>>()
.name("ci")
.create(&mut txn)?;
let akci = env
let akc_index = env
.database_options()
.types::<UnalignedSlice<u8>, OwnedType<usize>>()
.name("akci")
.create(&mut txn)?;
let atci = env
let atc_index = env
.database_options()
.types::<UnalignedSlice<u8>, OwnedType<usize>>()
.name("atci")
.create(&mut txn)?;
let ktci = env
let kt_index = env
.database_options()
.types::<UnalignedSlice<u8>, OwnedType<usize>>()
.name("ktci")
@ -99,7 +99,7 @@ impl Store {
.name("ip_data")
.create(&mut txn)?;
if let Ok(count) = ids.len(&txn) {
if let Ok(count) = i_index.len(&txn) {
log::info!("{count} events in storage");
}
if let Ok(count) = deleted_offsets.len(&txn) {
@ -118,11 +118,11 @@ impl Store {
general,
events,
env,
ids,
ci,
akci,
atci,
ktci,
i_index,
ci_index,
akc_index,
atc_index,
kt_index,
deleted_offsets,
deleted_events,
ip_data,
@ -158,7 +158,7 @@ impl Store {
let offset;
// Only if it doesn't already exist
if self.ids.get(&txn, event.id().0.as_slice())?.is_none() {
if self.i_index.get(&txn, event.id().0.as_slice())?.is_none() {
// Reject event if it was deleted
{
let deleted_key = Self::key_deleted_events(event.id(), event.pubkey());
@ -232,7 +232,7 @@ impl Store {
/// Get an event by Id
pub fn get_event_by_id(&self, id: Id) -> Result<Option<Event>, Error> {
let txn = self.env.read_txn()?;
if let Some(offset) = self.ids.get(&txn, id.0.as_slice())? {
if let Some(offset) = self.i_index.get(&txn, id.0.as_slice())? {
self.events.get_event_by_offset(offset)
} else {
Ok(None)
@ -271,18 +271,18 @@ impl Store {
for author in filter.authors() {
for kind in filter.kinds() {
let iter = {
let start_prefix = Self::key_akci(
let start_prefix = Self::key_akc_index(
author,
kind,
filter.until(), // scan goes backwards in time
Id([0; 32]),
);
let end_prefix = Self::key_akci(author, kind, since, Id([255; 32]));
let end_prefix = Self::key_akc_index(author, kind, since, Id([255; 32]));
let range = (
Bound::Included(&*start_prefix),
Bound::Excluded(&*end_prefix),
);
self.akci.range(&txn, &range)?
self.akc_index.range(&txn, &range)?
};
// Count how many we have found of this author-kind pair, so we
@ -338,20 +338,25 @@ impl Store {
if let Some(tag0) = tag.next() {
if let Some(tagvalue) = tag.next() {
let iter = {
let start_prefix = Self::key_atci(
let start_prefix = Self::key_atc_index(
author,
tag0[0],
tagvalue,
filter.until(), // scan goes backwards in time
Id([0; 32]),
);
let end_prefix =
Self::key_atci(author, tag0[0], tagvalue, since, Id([255; 32]));
let end_prefix = Self::key_atc_index(
author,
tag0[0],
tagvalue,
since,
Id([255; 32]),
);
let range = (
Bound::Included(&*start_prefix),
Bound::Excluded(&*end_prefix),
);
self.atci.range(&txn, &range)?
self.atc_index.range(&txn, &range)?
};
// Count how many we have found of this author-tag pair, so we
@ -401,20 +406,25 @@ impl Store {
if let Some(tag0) = tag.next() {
if let Some(tagvalue) = tag.next() {
let iter = {
let start_prefix = Self::key_ktci(
let start_prefix = Self::key_kt_index(
kind,
tag0[0],
tagvalue,
filter.until(), // scan goes backwards in time
Id([0; 32]),
);
let end_prefix =
Self::key_ktci(kind, tag0[0], tagvalue, since, Id([255; 32]));
let end_prefix = Self::key_kt_index(
kind,
tag0[0],
tagvalue,
since,
Id([255; 32]),
);
let range = (
Bound::Included(&*start_prefix),
Bound::Excluded(&*end_prefix),
);
self.ktci.range(&txn, &range)?
self.kt_index.range(&txn, &range)?
};
// Count how many we have found of this kind-tag pair, so we
@ -461,22 +471,31 @@ impl Store {
return Err(ChorusError::Scraper.into());
}
// FIXME: we can use akci to scan author + time
// we can use ktci to scan kind + time
// FIXME: we can use akc_index to scan author + time
// we can use kt_index to scan kind + time
// (but we have no index yet to scan tag + time)
// This is INEFFICIENT as it scans through EVERY EVENT
let mut count = 0;
let iter = self.ci.iter(&txn)?;
// This is INEFFICIENT as it may scan every event created between since and until
let start_prefix = Self::key_ci_index(
filter.until(), // scan goes backwards
Id([0; 32]),
);
let end_prefix = Self::key_ci_index(filter.since(), Id([255; 32]));
let range = (
Bound::Included(&*start_prefix),
Bound::Excluded(&*end_prefix),
);
let iter = self.ci_index.range(&txn, &range)?;
for result in iter {
if count >= filter.limit() {
if output.len() >= filter.limit() as usize {
break;
}
let (_key, offset) = result?;
if let Some(event) = self.events.get_event_by_offset(offset)? {
if filter.event_matches(&event)? && screen(&event) {
output.insert(event);
count += 1;
}
}
}
@ -484,20 +503,20 @@ impl Store {
// Convert to a Vec, reverse time order, and apply limit
Ok(output
.iter()
.rev()
.take(filter.limit() as usize)
.copied()
.collect())
.iter()
.rev()
.take(filter.limit() as usize)
.copied()
.collect())
}
/// Delete an event by id
fn delete(&self, txn: &mut RwTxn<'_>, id: Id) -> Result<(), Error> {
if let Some(offset) = self.ids.get(txn, id.0.as_slice())? {
if let Some(offset) = self.i_index.get(txn, id.0.as_slice())? {
self.set_offset_as_deleted(txn, offset)?;
// Also remove from the id index
self.ids.delete(txn, id.0.as_slice())?;
self.i_index.delete(txn, id.0.as_slice())?;
}
Ok(())
}
@ -505,16 +524,19 @@ impl Store {
// Index the event
fn index(&self, txn: &mut RwTxn<'_>, event: &Event, offset: usize) -> Result<(), Error> {
// Index by id
self.ids.put(txn, event.id().0.as_slice(), &offset)?;
self.i_index.put(txn, event.id().0.as_slice(), &offset)?;
// Index by created_at and id
self.ci
.put(txn, &Self::key_ci(event.created_at(), event.id()), &offset)?;
self.ci_index.put(
txn,
&Self::key_ci_index(event.created_at(), event.id()),
&offset,
)?;
// Index by author and kind (with created_at and id)
self.akci.put(
self.akc_index.put(
txn,
&Self::key_akci(event.pubkey(), event.kind(), event.created_at(), event.id()),
&Self::key_akc_index(event.pubkey(), event.kind(), event.created_at(), event.id()),
&offset,
)?;
@ -524,9 +546,9 @@ impl Store {
if tagname.len() == 1 {
if let Some(tagvalue) = tsi.next() {
// Index by author and tag (with created_at and id)
self.atci.put(
self.atc_index.put(
txn,
&Self::key_atci(
&Self::key_atc_index(
event.pubkey(),
tagname[0],
tagvalue,
@ -537,9 +559,9 @@ impl Store {
)?;
// Index by kind and tag (with created_at and id)
self.ktci.put(
self.kt_index.put(
txn,
&Self::key_ktci(
&Self::key_kt_index(
event.kind(),
tagname[0],
tagvalue,
@ -564,9 +586,9 @@ impl Store {
if tagname.len() == 1 {
if let Some(tagvalue) = tsi.next() {
// Index by author and tag (with created_at and id)
self.atci.delete(
self.atc_index.delete(
txn,
&Self::key_atci(
&Self::key_atc_index(
event.pubkey(),
tagname[0],
tagvalue,
@ -576,9 +598,9 @@ impl Store {
)?;
// Index by kind and tag (with created_at and id)
self.ktci.delete(
self.kt_index.delete(
txn,
&Self::key_ktci(
&Self::key_kt_index(
event.kind(),
tagname[0],
tagvalue,
@ -591,17 +613,17 @@ impl Store {
}
}
self.ci
.delete(txn, &Self::key_ci(event.created_at(), event.id()))?;
self.ci_index
.delete(txn, &Self::key_ci_index(event.created_at(), event.id()))?;
self.akci.delete(
self.akc_index.delete(
txn,
&Self::key_akci(event.pubkey(), event.kind(), event.created_at(), event.id()),
&Self::key_akc_index(event.pubkey(), event.kind(), event.created_at(), event.id()),
)?;
// We leave it in the id index. If someone wants to load the replaced event by id
// they can still do it.
// self.ids.delete(&mut txn, event.id().0.as_slice())?;
// self.i_index.delete(&mut txn, event.id().0.as_slice())?;
Ok(())
}
@ -635,20 +657,20 @@ impl Store {
// this deletes all the events in that group except the most recent one.
fn delete_replaced(&self, txn: &mut RwTxn<'_>, event: &Event) -> Result<(), Error> {
if event.kind().is_replaceable() {
let start_prefix = Self::key_akci(
let start_prefix = Self::key_akc_index(
event.pubkey(),
event.kind(),
Time::max(), // database is ordered in reverse time
Id([0; 32]),
);
let end_prefix =
Self::key_akci(event.pubkey(), event.kind(), Time::min(), Id([255; 32]));
Self::key_akc_index(event.pubkey(), event.kind(), Time::min(), Id([255; 32]));
let range = (
Bound::Included(&*start_prefix),
Bound::Excluded(&*end_prefix),
);
let loop_txn = self.env.read_txn()?;
let iter = self.akci.range(&loop_txn, &range)?;
let iter = self.akc_index.range(&loop_txn, &range)?;
let mut first = true;
for result in iter {
// Keep the first result
@ -666,15 +688,20 @@ impl Store {
let tags = event.tags()?;
if let Some(identifier) = tags.get_value(b"d") {
let start_prefix =
Self::key_atci(event.pubkey(), b'd', identifier, Time::max(), Id([0; 32]));
let end_prefix =
Self::key_atci(event.pubkey(), b'd', identifier, Time::min(), Id([255; 32]));
Self::key_atc_index(event.pubkey(), b'd', identifier, Time::max(), Id([0; 32]));
let end_prefix = Self::key_atc_index(
event.pubkey(),
b'd',
identifier,
Time::min(),
Id([255; 32]),
);
let range = (
Bound::Included(&*start_prefix),
Bound::Excluded(&*end_prefix),
);
let loop_txn = self.env.read_txn()?;
let iter = self.atci.range(&loop_txn, &range)?;
let iter = self.atc_index.range(&loop_txn, &range)?;
let mut first = true;
for result in iter {
// Keep the first result
@ -713,7 +740,7 @@ impl Store {
Ok(())
}
fn key_ci(created_at: Time, id: Id) -> Vec<u8> {
fn key_ci_index(created_at: Time, id: Id) -> Vec<u8> {
let mut key: Vec<u8> =
Vec::with_capacity(std::mem::size_of::<Time>() + std::mem::size_of::<Id>());
key.extend((u64::MAX - created_at.0).to_be_bytes().as_slice());
@ -723,7 +750,7 @@ impl Store {
// For looking up event by Author and Kind
// author(32) + kind(2) + reversecreatedat(8) + id(32)
fn key_akci(author: Pubkey, kind: Kind, created_at: Time, id: Id) -> Vec<u8> {
fn key_akc_index(author: Pubkey, kind: Kind, created_at: Time, id: Id) -> Vec<u8> {
let mut key: Vec<u8> = Vec::with_capacity(
std::mem::size_of::<Pubkey>()
+ std::mem::size_of::<Kind>()
@ -739,7 +766,13 @@ impl Store {
// For looking up event by Author and Tag
// author(32) + tagletter(1) + fixlentag(182) + reversecreatedat(8) + id(32)
fn key_atci(author: Pubkey, letter: u8, tag_value: &[u8], created_at: Time, id: Id) -> Vec<u8> {
fn key_atc_index(
author: Pubkey,
letter: u8,
tag_value: &[u8],
created_at: Time,
id: Id,
) -> Vec<u8> {
const PADLEN: usize = 182;
let mut key: Vec<u8> = Vec::with_capacity(
std::mem::size_of::<Pubkey>()
@ -762,7 +795,7 @@ impl Store {
// For looking up event by Kind and Tag
// kind(2) + tagletter(1) + fixlentag(182) + reversecreatedat(8) + id(32)
fn key_ktci(kind: Kind, letter: u8, tag_value: &[u8], created_at: Time, id: Id) -> Vec<u8> {
fn key_kt_index(kind: Kind, letter: u8, tag_value: &[u8], created_at: Time, id: Id) -> Vec<u8> {
const PADLEN: usize = 182;
let mut key: Vec<u8> = Vec::with_capacity(
std::mem::size_of::<Kind>()