mirror of
https://github.com/mikedilger/chorus.git
synced 2026-03-04 06:36:27 +00:00
Fix issues around limit in multiple-group scans
This commit is contained in:
parent
4d9bd321ac
commit
c86ab862ef
@ -10,6 +10,7 @@ use heed::byteorder::BigEndian;
|
||||
use heed::types::{OwnedType, UnalignedSlice, Unit, U64};
|
||||
use heed::{Database, Env, EnvFlags, EnvOpenOptions, RwTxn};
|
||||
use speedy::{Readable, Writable};
|
||||
use std::collections::BTreeSet;
|
||||
use std::fs;
|
||||
use std::net::IpAddr;
|
||||
use std::ops::Bound;
|
||||
@ -243,60 +244,83 @@ impl Store {
|
||||
where
|
||||
F: Fn(&Event) -> bool,
|
||||
{
|
||||
let mut output: Vec<Event> = Vec::new();
|
||||
let txn = self.env.read_txn()?;
|
||||
|
||||
// We insert into a BTreeSet to keep them time-ordered
|
||||
let mut output: BTreeSet<Event> = BTreeSet::new();
|
||||
|
||||
if filter.num_ids() > 0 {
|
||||
// Fetch by id
|
||||
for id in filter.ids() {
|
||||
// Stop if limited
|
||||
if output.len() >= filter.limit() as usize {
|
||||
return Ok(output);
|
||||
break;
|
||||
}
|
||||
if let Some(event) = self.get_event_by_id(id)? {
|
||||
// and check each against the rest of the filter
|
||||
if filter.event_matches(&event)? && screen(&event) {
|
||||
output.push(event);
|
||||
output.insert(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if filter.num_authors() > 0 && filter.num_kinds() > 0 {
|
||||
// We may bring since forward if we hit the limit without going back that
|
||||
// far, so we use a mutable since:
|
||||
let mut since = filter.since();
|
||||
|
||||
for author in filter.authors() {
|
||||
'kind: for kind in filter.kinds() {
|
||||
let start_prefix = Self::key_akci(
|
||||
author,
|
||||
kind,
|
||||
filter.until(), // scan goes backwards in time
|
||||
Id([0; 32]),
|
||||
);
|
||||
let end_prefix = Self::key_akci(
|
||||
author,
|
||||
kind,
|
||||
filter.since(), // scan goes backwards in time
|
||||
Id([255; 32]),
|
||||
);
|
||||
let range = (
|
||||
Bound::Included(&*start_prefix),
|
||||
Bound::Excluded(&*end_prefix),
|
||||
);
|
||||
let txn = self.env.read_txn()?;
|
||||
let iter = self.akci.range(&txn, &range)?;
|
||||
for result in iter {
|
||||
// Stop if limited
|
||||
if output.len() >= filter.limit() as usize {
|
||||
return Ok(output);
|
||||
}
|
||||
for kind in filter.kinds() {
|
||||
let iter = {
|
||||
let start_prefix = Self::key_akci(
|
||||
author,
|
||||
kind,
|
||||
filter.until(), // scan goes backwards in time
|
||||
Id([0; 32]),
|
||||
);
|
||||
let end_prefix = Self::key_akci(author, kind, since, Id([255; 32]));
|
||||
let range = (
|
||||
Bound::Included(&*start_prefix),
|
||||
Bound::Excluded(&*end_prefix),
|
||||
);
|
||||
self.akci.range(&txn, &range)?
|
||||
};
|
||||
|
||||
// Count how many we have found of this author-kind pair, so we
|
||||
// can possibly update `since`
|
||||
let mut paircount = 0;
|
||||
|
||||
'per_event: for result in iter {
|
||||
let (_key, offset) = result?;
|
||||
if let Some(event) = self.events.get_event_by_offset(offset)? {
|
||||
// If we have gone beyond since, we can stop early
|
||||
// (We have to check because `since` might change in this loop)
|
||||
if event.created_at() < since {
|
||||
continue 'per_event;
|
||||
}
|
||||
|
||||
// check against the rest of the filter
|
||||
if filter.event_matches(&event)? && screen(&event) {
|
||||
output.push(event);
|
||||
// Accept the event
|
||||
output.insert(event);
|
||||
paircount += 1;
|
||||
|
||||
// Stop this pair if limited
|
||||
if paircount >= filter.limit() as usize {
|
||||
// Since we found the limit just among this pair,
|
||||
// potentially move since forward
|
||||
if event.created_at() > since {
|
||||
since = event.created_at();
|
||||
}
|
||||
break 'per_event;
|
||||
}
|
||||
|
||||
// If kind is replaceable (and not parameterized)
|
||||
// then don't take any more events for this author-kind
|
||||
// pair.
|
||||
// NOTE that this optimization is difficult to implement
|
||||
// for other replaceable event situations
|
||||
if kind.is_replaceable() {
|
||||
continue 'kind;
|
||||
break 'per_event;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -304,41 +328,60 @@ impl Store {
|
||||
}
|
||||
}
|
||||
} else if filter.num_authors() > 0 && !filter.tags()?.is_empty() {
|
||||
// We may bring since forward if we hit the limit without going back that
|
||||
// far, so we use a mutable since:
|
||||
let mut since = filter.since();
|
||||
|
||||
for author in filter.authors() {
|
||||
let tags = filter.tags()?;
|
||||
for mut tag in tags.iter() {
|
||||
if let Some(tag0) = tag.next() {
|
||||
if let Some(tagvalue) = tag.next() {
|
||||
let start_prefix = Self::key_atci(
|
||||
author,
|
||||
tag0[0],
|
||||
tagvalue,
|
||||
filter.until(), // scan goes backwards in time
|
||||
Id([0; 32]),
|
||||
);
|
||||
let end_prefix = Self::key_atci(
|
||||
author,
|
||||
tag0[0],
|
||||
tagvalue,
|
||||
filter.since(), // scan goes backwards in time
|
||||
Id([255; 32]),
|
||||
);
|
||||
let range = (
|
||||
Bound::Included(&*start_prefix),
|
||||
Bound::Excluded(&*end_prefix),
|
||||
);
|
||||
let txn = self.env.read_txn()?;
|
||||
let iter = self.atci.range(&txn, &range)?;
|
||||
for result in iter {
|
||||
// Stop if limited
|
||||
if output.len() >= filter.limit() as usize {
|
||||
return Ok(output);
|
||||
}
|
||||
let iter = {
|
||||
let start_prefix = Self::key_atci(
|
||||
author,
|
||||
tag0[0],
|
||||
tagvalue,
|
||||
filter.until(), // scan goes backwards in time
|
||||
Id([0; 32]),
|
||||
);
|
||||
let end_prefix =
|
||||
Self::key_atci(author, tag0[0], tagvalue, since, Id([255; 32]));
|
||||
let range = (
|
||||
Bound::Included(&*start_prefix),
|
||||
Bound::Excluded(&*end_prefix),
|
||||
);
|
||||
self.atci.range(&txn, &range)?
|
||||
};
|
||||
|
||||
// Count how many we have found of this author-tag pair, so we
|
||||
// can possibly update `since`
|
||||
let mut paircount = 0;
|
||||
|
||||
'per_event: for result in iter {
|
||||
let (_key, offset) = result?;
|
||||
if let Some(event) = self.events.get_event_by_offset(offset)? {
|
||||
// If we have gone beyond since, we can stop early
|
||||
// (We have to check because `since` might change in this loop)
|
||||
if event.created_at() < since {
|
||||
continue 'per_event;
|
||||
}
|
||||
|
||||
// check against the rest of the filter
|
||||
if filter.event_matches(&event)? && screen(&event) {
|
||||
output.push(event);
|
||||
// Accept the event
|
||||
output.insert(event);
|
||||
paircount += 1;
|
||||
|
||||
// Stop this pair if limited
|
||||
if paircount >= filter.limit() as usize {
|
||||
// Since we found the limit just among this pair,
|
||||
// potentially move since forward
|
||||
if event.created_at() > since {
|
||||
since = event.created_at();
|
||||
}
|
||||
break 'per_event;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -347,41 +390,61 @@ impl Store {
|
||||
}
|
||||
}
|
||||
} else if filter.num_kinds() > 0 && !filter.tags()?.is_empty() {
|
||||
// SKIP
|
||||
// We may bring since forward if we hit the limit without going back that
|
||||
// far, so we use a mutable since:
|
||||
let mut since = filter.since();
|
||||
|
||||
for kind in filter.kinds() {
|
||||
let tags = filter.tags()?;
|
||||
for mut tag in tags.iter() {
|
||||
if let Some(tag0) = tag.next() {
|
||||
if let Some(tagvalue) = tag.next() {
|
||||
let start_prefix = Self::key_ktci(
|
||||
kind,
|
||||
tag0[0],
|
||||
tagvalue,
|
||||
filter.until(), // scan goes backwards in time
|
||||
Id([0; 32]),
|
||||
);
|
||||
let end_prefix = Self::key_ktci(
|
||||
kind,
|
||||
tag0[0],
|
||||
tagvalue,
|
||||
filter.since(), // scan goes backwards in time
|
||||
Id([255; 32]),
|
||||
);
|
||||
let range = (
|
||||
Bound::Included(&*start_prefix),
|
||||
Bound::Excluded(&*end_prefix),
|
||||
);
|
||||
let txn = self.env.read_txn()?;
|
||||
let iter = self.ktci.range(&txn, &range)?;
|
||||
for result in iter {
|
||||
// Stop if limited
|
||||
if output.len() >= filter.limit() as usize {
|
||||
return Ok(output);
|
||||
}
|
||||
let iter = {
|
||||
let start_prefix = Self::key_ktci(
|
||||
kind,
|
||||
tag0[0],
|
||||
tagvalue,
|
||||
filter.until(), // scan goes backwards in time
|
||||
Id([0; 32]),
|
||||
);
|
||||
let end_prefix =
|
||||
Self::key_ktci(kind, tag0[0], tagvalue, since, Id([255; 32]));
|
||||
let range = (
|
||||
Bound::Included(&*start_prefix),
|
||||
Bound::Excluded(&*end_prefix),
|
||||
);
|
||||
self.ktci.range(&txn, &range)?
|
||||
};
|
||||
|
||||
// Count how many we have found of this kind-tag pair, so we
|
||||
// can possibly update `since`
|
||||
let mut paircount = 0;
|
||||
|
||||
'per_event: for result in iter {
|
||||
let (_key, offset) = result?;
|
||||
if let Some(event) = self.events.get_event_by_offset(offset)? {
|
||||
// If we have gone beyond since, we can stop early
|
||||
// (We have to check because `since` might change in this loop)
|
||||
if event.created_at() < since {
|
||||
continue 'per_event;
|
||||
}
|
||||
|
||||
// check against the rest of the filter
|
||||
if filter.event_matches(&event)? && screen(&event) {
|
||||
output.push(event);
|
||||
// Accept the event
|
||||
output.insert(event);
|
||||
paircount += 1;
|
||||
|
||||
// Stop this pair if limited
|
||||
if paircount >= filter.limit() as usize {
|
||||
// Since we found the limit just among this pair,
|
||||
// potentially move since forward
|
||||
if event.created_at() > since {
|
||||
since = event.created_at();
|
||||
}
|
||||
break 'per_event;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -390,6 +453,7 @@ impl Store {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// SCRAPE:
|
||||
let maxtime = filter.until().0.min(Time::now().0);
|
||||
let allow =
|
||||
self.allow_scraping || filter.limit() <= 100 || (maxtime - filter.since().0) < 3600;
|
||||
@ -403,7 +467,6 @@ impl Store {
|
||||
|
||||
// This is INEFFICIENT as it scans through EVERY EVENT
|
||||
let mut count = 0;
|
||||
let txn = self.env.read_txn()?;
|
||||
let iter = self.ci.iter(&txn)?;
|
||||
for result in iter {
|
||||
if count >= filter.limit() {
|
||||
@ -412,14 +475,20 @@ impl Store {
|
||||
let (_key, offset) = result?;
|
||||
if let Some(event) = self.events.get_event_by_offset(offset)? {
|
||||
if filter.event_matches(&event)? && screen(&event) {
|
||||
output.push(event);
|
||||
output.insert(event);
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(output)
|
||||
// Convert to a Vec, reverse time order, and apply limit
|
||||
Ok(output
|
||||
.iter()
|
||||
.rev()
|
||||
.take(filter.limit() as usize)
|
||||
.copied()
|
||||
.collect())
|
||||
}
|
||||
|
||||
/// Delete an event by id
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user