From c9ef068fa5cb0ec39a9a88ec581434f6abb84048 Mon Sep 17 00:00:00 2001 From: Antoine Poinsot Date: Mon, 15 Aug 2022 14:37:16 +0200 Subject: [PATCH] poller: update our coins on each poll This refactors the entire state update on each poll, to apply database changes at the very end. --- src/bitcoin/poller/looper.rs | 148 ++++++++++++++++++++++++++++++----- src/lib.rs | 13 +++ 2 files changed, 143 insertions(+), 18 deletions(-) diff --git a/src/bitcoin/poller/looper.rs b/src/bitcoin/poller/looper.rs index 6e75d4f5..3c10f537 100644 --- a/src/bitcoin/poller/looper.rs +++ b/src/bitcoin/poller/looper.rs @@ -1,6 +1,6 @@ use crate::{ - bitcoin::BitcoinInterface, - database::{DatabaseConnection, DatabaseInterface}, + bitcoin::{BitcoinInterface, BlockChainTip, UTxO}, + database::{Coin, DatabaseConnection, DatabaseInterface}, }; use std::{ @@ -8,32 +8,143 @@ use std::{ thread, time, }; -fn update_tip(bit: &impl BitcoinInterface, db_conn: &mut Box) { +use miniscript::bitcoin::{self, util::bip32}; + +#[derive(Debug, Clone)] +struct UpdatedCoins { + pub received: Vec, + pub confirmed: Vec<(bitcoin::OutPoint, i32)>, + pub spent: Vec<(bitcoin::OutPoint, bitcoin::Txid)>, +} + +// Update the state of our coins. There may be new unspent, and existing ones may become confirmed +// or spent. +// NOTE: A coin may be updated multiple times at once. That is, a coin may be received, confirmed, +// and spent in a single poll. +fn update_coins( + bit: &impl BitcoinInterface, + db_conn: &mut Box, + previous_tip: &BlockChainTip, +) -> UpdatedCoins { + // Start by fetching newly received coins. + let curr_coins = db_conn.unspent_coins(); + let mut received = Vec::new(); + for utxo in bit.received_coins(&previous_tip) { + // FIXME: have a DB table to query those... + let derivation_index = bip32::ChildNumber::from(0); + // This works because the hash only takes the outpoint into account. + if !curr_coins.contains_key(&utxo.outpoint) { + let UTxO { + outpoint, amount, .. + } = utxo; + let coin = Coin { + outpoint, + amount, + derivation_index, + block_height: None, + spend_txid: None, + }; + received.push(coin); + } + } + + // We need to take the newly received ones into account as well, as they may have been + // confirmed within the previous tip and the current one, and we may not poll this chunk of the + // chain anymore. + let to_be_confirmed: Vec = curr_coins + .values() + .chain(received.iter()) + .filter_map(|coin| { + if coin.block_height.is_none() { + Some(coin.outpoint) + } else { + None + } + }) + .collect(); + let confirmed = bit.confirmed_coins(&to_be_confirmed); + + // We need to take the newly received ones into account as well, as they may have been + // spent within the previous tip and the current one, and we may not poll this chunk of the + // chain anymore. + let to_be_spent: Vec = curr_coins + .values() + .chain(received.iter()) + .filter_map(|coin| { + if coin.spend_txid.is_none() { + Some(coin.outpoint) + } else { + None + } + }) + .collect(); + let spent = bit.spent_coins(&to_be_spent); + + UpdatedCoins { + received, + confirmed, + spent, + } +} + +// Returns the new block chain tip, if it changed. +fn new_tip(bit: &impl BitcoinInterface, current_tip: &BlockChainTip) -> Option { let bitcoin_tip = bit.chain_tip(); - let current_tip = match db_conn.chain_tip() { - Some(tip) => tip, - None => { - db_conn.update_tip(&bitcoin_tip); - return; - } - }; - // If the tip didn't change, there is nothing to update. - if current_tip == bitcoin_tip { - return; + if current_tip == &bitcoin_tip { + return None; } if bitcoin_tip.height > current_tip.height { // Make sure we are on the same chain. if bit.is_in_chain(¤t_tip) { - // All good, we just moved forward. Record the new tip. - db_conn.update_tip(&bitcoin_tip); - return; + // All good, we just moved forward. + return Some(bitcoin_tip); } } // TODO: reorg handling. + None +} + +fn updates(bit: &impl BitcoinInterface, db: &impl DatabaseInterface) { + let mut db_conn = db.connection(); + + // Check if there was a new block before updating ourselves. + let current_tip = db_conn.chain_tip().expect("Always set at first startup"); + let new_tip = new_tip(bit, ¤t_tip); + let latest_tip = new_tip.unwrap_or(current_tip); + + // Then check the state of our coins. Do it even if the tip did not change since last poll, as + // we may have unconfirmed transactions. + let updated_coins = update_coins(bit, &mut db_conn, ¤t_tip); + + // If the tip changed while we were polling our Bitcoin interface, start over. + if bit.chain_tip() != latest_tip { + log::info!("Chain tip changed while we were updating our state. Starting over."); + return updates(bit, db); + } + + // The chain tip did not change since we started our updates. Record them and the latest tip. + // Having the tip in database means that, as far as the chain is concerned, we've got all + // updates up to this block. But not more. + db_conn.new_unspent_coins(&updated_coins.received); + db_conn.confirm_coins(&updated_coins.confirmed); + db_conn.spend_coins(&updated_coins.spent); + if let Some(tip) = new_tip { + db_conn.update_tip(&tip); + } +} + +// If the database chain tip is NULL (first startup), initialize it. +fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface) { + let mut db_conn = db.connection(); + + if db_conn.chain_tip().is_none() { + // TODO: be smarter. We can use the timestamp of the descriptor to get a newer block hash. + db_conn.update_tip(&bit.genesis_block()); + } } /// Main event loop. Repeatedly polls the Bitcoin interface until told to stop through the @@ -47,6 +158,8 @@ pub fn looper( let mut last_poll = None; let mut synced = false; + maybe_initialize_tip(&bit, &db); + while !shutdown.load(atomic::Ordering::Relaxed) || last_poll.is_none() { let now = time::Instant::now(); @@ -75,7 +188,6 @@ pub fn looper( } } - let mut db_conn = db.connection(); - update_tip(&bit, &mut db_conn); + updates(&bit, &db); } } diff --git a/src/lib.rs b/src/lib.rs index 80f0f71c..e1a7431f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -513,6 +513,18 @@ mod tests { stream.flush().unwrap(); } + // Send them a response to 'getblockhash' with the genesis block hash + fn complete_tip_init<'a>(server: &net::TcpListener) { + let net_resp = [ + "HTTP/1.1 200\n\r\n{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f\"}\n".as_bytes(), + ] + .concat(); + let (mut stream, _) = server.accept().unwrap(); + read_til_json_end(&mut stream); + stream.write_all(&net_resp).unwrap(); + stream.flush().unwrap(); + } + // Send them a response to 'getblockchaininfo' saying we are far from being synced fn complete_sync_check<'a>(server: &net::TcpListener) { let net_resp = [ @@ -598,6 +610,7 @@ mod tests { complete_network_check(&server); complete_wallet_check(&server, &wo_path); complete_desc_check(&server, desc_str); + complete_tip_init(&server); complete_sync_check(&server); daemon_thread.join().unwrap();