From c86ab862ef3df873268dd3a4957889af0c1ce312 Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Sat, 2 Mar 2024 13:03:02 +1300 Subject: [PATCH] Fix issues around limit in multiple-group scans --- chorus-lib/src/store/mod.rs | 237 +++++++++++++++++++++++------------- 1 file changed, 153 insertions(+), 84 deletions(-) diff --git a/chorus-lib/src/store/mod.rs b/chorus-lib/src/store/mod.rs index ef5f4ad..3773364 100644 --- a/chorus-lib/src/store/mod.rs +++ b/chorus-lib/src/store/mod.rs @@ -10,6 +10,7 @@ use heed::byteorder::BigEndian; use heed::types::{OwnedType, UnalignedSlice, Unit, U64}; use heed::{Database, Env, EnvFlags, EnvOpenOptions, RwTxn}; use speedy::{Readable, Writable}; +use std::collections::BTreeSet; use std::fs; use std::net::IpAddr; use std::ops::Bound; @@ -243,60 +244,83 @@ impl Store { where F: Fn(&Event) -> bool, { - let mut output: Vec = Vec::new(); + let txn = self.env.read_txn()?; + + // We insert into a BTreeSet to keep them time-ordered + let mut output: BTreeSet = BTreeSet::new(); if filter.num_ids() > 0 { // Fetch by id for id in filter.ids() { // Stop if limited if output.len() >= filter.limit() as usize { - return Ok(output); + break; } if let Some(event) = self.get_event_by_id(id)? { // and check each against the rest of the filter if filter.event_matches(&event)? && screen(&event) { - output.push(event); + output.insert(event); } } } } else if filter.num_authors() > 0 && filter.num_kinds() > 0 { + // We may bring since forward if we hit the limit without going back that + // far, so we use a mutable since: + let mut since = filter.since(); + for author in filter.authors() { - 'kind: for kind in filter.kinds() { - let start_prefix = Self::key_akci( - author, - kind, - filter.until(), // scan goes backwards in time - Id([0; 32]), - ); - let end_prefix = Self::key_akci( - author, - kind, - filter.since(), // scan goes backwards in time - Id([255; 32]), - ); - let range = ( - Bound::Included(&*start_prefix), - Bound::Excluded(&*end_prefix), - ); - let txn = self.env.read_txn()?; - let iter = self.akci.range(&txn, &range)?; - for result in iter { - // Stop if limited - if output.len() >= filter.limit() as usize { - return Ok(output); - } + for kind in filter.kinds() { + let iter = { + let start_prefix = Self::key_akci( + author, + kind, + filter.until(), // scan goes backwards in time + Id([0; 32]), + ); + let end_prefix = Self::key_akci(author, kind, since, Id([255; 32])); + let range = ( + Bound::Included(&*start_prefix), + Bound::Excluded(&*end_prefix), + ); + self.akci.range(&txn, &range)? + }; + + // Count how many we have found of this author-kind pair, so we + // can possibly update `since` + let mut paircount = 0; + + 'per_event: for result in iter { let (_key, offset) = result?; if let Some(event) = self.events.get_event_by_offset(offset)? { + // If we have gone beyond since, we can stop early + // (We have to check because `since` might change in this loop) + if event.created_at() < since { + continue 'per_event; + } + // check against the rest of the filter if filter.event_matches(&event)? && screen(&event) { - output.push(event); + // Accept the event + output.insert(event); + paircount += 1; + + // Stop this pair if limited + if paircount >= filter.limit() as usize { + // Since we found the limit just among this pair, + // potentially move since forward + if event.created_at() > since { + since = event.created_at(); + } + break 'per_event; + } + // If kind is replaceable (and not parameterized) // then don't take any more events for this author-kind // pair. // NOTE that this optimization is difficult to implement // for other replaceable event situations if kind.is_replaceable() { - continue 'kind; + break 'per_event; } } } @@ -304,41 +328,60 @@ impl Store { } } } else if filter.num_authors() > 0 && !filter.tags()?.is_empty() { + // We may bring since forward if we hit the limit without going back that + // far, so we use a mutable since: + let mut since = filter.since(); + for author in filter.authors() { let tags = filter.tags()?; for mut tag in tags.iter() { if let Some(tag0) = tag.next() { if let Some(tagvalue) = tag.next() { - let start_prefix = Self::key_atci( - author, - tag0[0], - tagvalue, - filter.until(), // scan goes backwards in time - Id([0; 32]), - ); - let end_prefix = Self::key_atci( - author, - tag0[0], - tagvalue, - filter.since(), // scan goes backwards in time - Id([255; 32]), - ); - let range = ( - Bound::Included(&*start_prefix), - Bound::Excluded(&*end_prefix), - ); - let txn = self.env.read_txn()?; - let iter = self.atci.range(&txn, &range)?; - for result in iter { - // Stop if limited - if output.len() >= filter.limit() as usize { - return Ok(output); - } + let iter = { + let start_prefix = Self::key_atci( + author, + tag0[0], + tagvalue, + filter.until(), // scan goes backwards in time + Id([0; 32]), + ); + let end_prefix = + Self::key_atci(author, tag0[0], tagvalue, since, Id([255; 32])); + let range = ( + Bound::Included(&*start_prefix), + Bound::Excluded(&*end_prefix), + ); + self.atci.range(&txn, &range)? + }; + + // Count how many we have found of this author-tag pair, so we + // can possibly update `since` + let mut paircount = 0; + + 'per_event: for result in iter { let (_key, offset) = result?; if let Some(event) = self.events.get_event_by_offset(offset)? { + // If we have gone beyond since, we can stop early + // (We have to check because `since` might change in this loop) + if event.created_at() < since { + continue 'per_event; + } + // check against the rest of the filter if filter.event_matches(&event)? && screen(&event) { - output.push(event); + // Accept the event + output.insert(event); + paircount += 1; + + // Stop this pair if limited + if paircount >= filter.limit() as usize { + // Since we found the limit just among this pair, + // potentially move since forward + if event.created_at() > since { + since = event.created_at(); + } + break 'per_event; + } } } } @@ -347,41 +390,61 @@ impl Store { } } } else if filter.num_kinds() > 0 && !filter.tags()?.is_empty() { + // SKIP + // We may bring since forward if we hit the limit without going back that + // far, so we use a mutable since: + let mut since = filter.since(); + for kind in filter.kinds() { let tags = filter.tags()?; for mut tag in tags.iter() { if let Some(tag0) = tag.next() { if let Some(tagvalue) = tag.next() { - let start_prefix = Self::key_ktci( - kind, - tag0[0], - tagvalue, - filter.until(), // scan goes backwards in time - Id([0; 32]), - ); - let end_prefix = Self::key_ktci( - kind, - tag0[0], - tagvalue, - filter.since(), // scan goes backwards in time - Id([255; 32]), - ); - let range = ( - Bound::Included(&*start_prefix), - Bound::Excluded(&*end_prefix), - ); - let txn = self.env.read_txn()?; - let iter = self.ktci.range(&txn, &range)?; - for result in iter { - // Stop if limited - if output.len() >= filter.limit() as usize { - return Ok(output); - } + let iter = { + let start_prefix = Self::key_ktci( + kind, + tag0[0], + tagvalue, + filter.until(), // scan goes backwards in time + Id([0; 32]), + ); + let end_prefix = + Self::key_ktci(kind, tag0[0], tagvalue, since, Id([255; 32])); + let range = ( + Bound::Included(&*start_prefix), + Bound::Excluded(&*end_prefix), + ); + self.ktci.range(&txn, &range)? + }; + + // Count how many we have found of this kind-tag pair, so we + // can possibly update `since` + let mut paircount = 0; + + 'per_event: for result in iter { let (_key, offset) = result?; if let Some(event) = self.events.get_event_by_offset(offset)? { + // If we have gone beyond since, we can stop early + // (We have to check because `since` might change in this loop) + if event.created_at() < since { + continue 'per_event; + } + // check against the rest of the filter if filter.event_matches(&event)? && screen(&event) { - output.push(event); + // Accept the event + output.insert(event); + paircount += 1; + + // Stop this pair if limited + if paircount >= filter.limit() as usize { + // Since we found the limit just among this pair, + // potentially move since forward + if event.created_at() > since { + since = event.created_at(); + } + break 'per_event; + } } } } @@ -390,6 +453,7 @@ impl Store { } } } else { + // SCRAPE: let maxtime = filter.until().0.min(Time::now().0); let allow = self.allow_scraping || filter.limit() <= 100 || (maxtime - filter.since().0) < 3600; @@ -403,7 +467,6 @@ impl Store { // This is INEFFICIENT as it scans through EVERY EVENT let mut count = 0; - let txn = self.env.read_txn()?; let iter = self.ci.iter(&txn)?; for result in iter { if count >= filter.limit() { @@ -412,14 +475,20 @@ impl Store { let (_key, offset) = result?; if let Some(event) = self.events.get_event_by_offset(offset)? { if filter.event_matches(&event)? && screen(&event) { - output.push(event); + output.insert(event); count += 1; } } } } - Ok(output) + // Convert to a Vec, reverse time order, and apply limit + Ok(output + .iter() + .rev() + .take(filter.limit() as usize) + .copied() + .collect()) } /// Delete an event by id