poller: update our coins on each poll

This refactors the entire state update on each poll, to apply database
changes at the very end.
This commit is contained in:
Antoine Poinsot 2022-08-15 14:37:16 +02:00
parent c6a25adfcd
commit c9ef068fa5
No known key found for this signature in database
GPG Key ID: E13FC145CD3F4304
2 changed files with 143 additions and 18 deletions

View File

@ -1,6 +1,6 @@
use crate::{
bitcoin::BitcoinInterface,
database::{DatabaseConnection, DatabaseInterface},
bitcoin::{BitcoinInterface, BlockChainTip, UTxO},
database::{Coin, DatabaseConnection, DatabaseInterface},
};
use std::{
@ -8,32 +8,143 @@ use std::{
thread, time,
};
fn update_tip(bit: &impl BitcoinInterface, db_conn: &mut Box<dyn DatabaseConnection>) {
use miniscript::bitcoin::{self, util::bip32};
#[derive(Debug, Clone)]
struct UpdatedCoins {
pub received: Vec<Coin>,
pub confirmed: Vec<(bitcoin::OutPoint, i32)>,
pub spent: Vec<(bitcoin::OutPoint, bitcoin::Txid)>,
}
// Update the state of our coins. There may be new unspent, and existing ones may become confirmed
// or spent.
// NOTE: A coin may be updated multiple times at once. That is, a coin may be received, confirmed,
// and spent in a single poll.
fn update_coins(
bit: &impl BitcoinInterface,
db_conn: &mut Box<dyn DatabaseConnection>,
previous_tip: &BlockChainTip,
) -> UpdatedCoins {
// Start by fetching newly received coins.
let curr_coins = db_conn.unspent_coins();
let mut received = Vec::new();
for utxo in bit.received_coins(&previous_tip) {
// FIXME: have a DB table to query those...
let derivation_index = bip32::ChildNumber::from(0);
// This works because the hash only takes the outpoint into account.
if !curr_coins.contains_key(&utxo.outpoint) {
let UTxO {
outpoint, amount, ..
} = utxo;
let coin = Coin {
outpoint,
amount,
derivation_index,
block_height: None,
spend_txid: None,
};
received.push(coin);
}
}
// We need to take the newly received ones into account as well, as they may have been
// confirmed within the previous tip and the current one, and we may not poll this chunk of the
// chain anymore.
let to_be_confirmed: Vec<bitcoin::OutPoint> = curr_coins
.values()
.chain(received.iter())
.filter_map(|coin| {
if coin.block_height.is_none() {
Some(coin.outpoint)
} else {
None
}
})
.collect();
let confirmed = bit.confirmed_coins(&to_be_confirmed);
// We need to take the newly received ones into account as well, as they may have been
// spent within the previous tip and the current one, and we may not poll this chunk of the
// chain anymore.
let to_be_spent: Vec<bitcoin::OutPoint> = curr_coins
.values()
.chain(received.iter())
.filter_map(|coin| {
if coin.spend_txid.is_none() {
Some(coin.outpoint)
} else {
None
}
})
.collect();
let spent = bit.spent_coins(&to_be_spent);
UpdatedCoins {
received,
confirmed,
spent,
}
}
// Returns the new block chain tip, if it changed.
fn new_tip(bit: &impl BitcoinInterface, current_tip: &BlockChainTip) -> Option<BlockChainTip> {
let bitcoin_tip = bit.chain_tip();
let current_tip = match db_conn.chain_tip() {
Some(tip) => tip,
None => {
db_conn.update_tip(&bitcoin_tip);
return;
}
};
// If the tip didn't change, there is nothing to update.
if current_tip == bitcoin_tip {
return;
if current_tip == &bitcoin_tip {
return None;
}
if bitcoin_tip.height > current_tip.height {
// Make sure we are on the same chain.
if bit.is_in_chain(&current_tip) {
// All good, we just moved forward. Record the new tip.
db_conn.update_tip(&bitcoin_tip);
return;
// All good, we just moved forward.
return Some(bitcoin_tip);
}
}
// TODO: reorg handling.
None
}
fn updates(bit: &impl BitcoinInterface, db: &impl DatabaseInterface) {
let mut db_conn = db.connection();
// Check if there was a new block before updating ourselves.
let current_tip = db_conn.chain_tip().expect("Always set at first startup");
let new_tip = new_tip(bit, &current_tip);
let latest_tip = new_tip.unwrap_or(current_tip);
// Then check the state of our coins. Do it even if the tip did not change since last poll, as
// we may have unconfirmed transactions.
let updated_coins = update_coins(bit, &mut db_conn, &current_tip);
// If the tip changed while we were polling our Bitcoin interface, start over.
if bit.chain_tip() != latest_tip {
log::info!("Chain tip changed while we were updating our state. Starting over.");
return updates(bit, db);
}
// The chain tip did not change since we started our updates. Record them and the latest tip.
// Having the tip in database means that, as far as the chain is concerned, we've got all
// updates up to this block. But not more.
db_conn.new_unspent_coins(&updated_coins.received);
db_conn.confirm_coins(&updated_coins.confirmed);
db_conn.spend_coins(&updated_coins.spent);
if let Some(tip) = new_tip {
db_conn.update_tip(&tip);
}
}
// If the database chain tip is NULL (first startup), initialize it.
fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface) {
let mut db_conn = db.connection();
if db_conn.chain_tip().is_none() {
// TODO: be smarter. We can use the timestamp of the descriptor to get a newer block hash.
db_conn.update_tip(&bit.genesis_block());
}
}
/// Main event loop. Repeatedly polls the Bitcoin interface until told to stop through the
@ -47,6 +158,8 @@ pub fn looper(
let mut last_poll = None;
let mut synced = false;
maybe_initialize_tip(&bit, &db);
while !shutdown.load(atomic::Ordering::Relaxed) || last_poll.is_none() {
let now = time::Instant::now();
@ -75,7 +188,6 @@ pub fn looper(
}
}
let mut db_conn = db.connection();
update_tip(&bit, &mut db_conn);
updates(&bit, &db);
}
}

View File

@ -513,6 +513,18 @@ mod tests {
stream.flush().unwrap();
}
// Send them a response to 'getblockhash' with the genesis block hash
fn complete_tip_init<'a>(server: &net::TcpListener) {
let net_resp = [
"HTTP/1.1 200\n\r\n{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f\"}\n".as_bytes(),
]
.concat();
let (mut stream, _) = server.accept().unwrap();
read_til_json_end(&mut stream);
stream.write_all(&net_resp).unwrap();
stream.flush().unwrap();
}
// Send them a response to 'getblockchaininfo' saying we are far from being synced
fn complete_sync_check<'a>(server: &net::TcpListener) {
let net_resp = [
@ -598,6 +610,7 @@ mod tests {
complete_network_check(&server);
complete_wallet_check(&server, &wo_path);
complete_desc_check(&server, desc_str);
complete_tip_init(&server);
complete_sync_check(&server);
daemon_thread.join().unwrap();