From 6997adc073981c3c786ab7e1244330fcc1d7afc7 Mon Sep 17 00:00:00 2001 From: Antoine Poinsot Date: Tue, 26 Jul 2022 17:39:55 +0200 Subject: [PATCH 1/2] daemon: bitcoin: introduce the Bitcoin poller --- src/bitcoin/d/mod.rs | 23 ++++++++++++++++++ src/bitcoin/mod.rs | 16 ++++++++++++- src/bitcoin/poller/looper.rs | 46 ++++++++++++++++++++++++++++++++++++ src/bitcoin/poller/mod.rs | 31 ++++++++++++++++++++++++ src/lib.rs | 28 +++++++++++++++++++--- 5 files changed, 140 insertions(+), 4 deletions(-) create mode 100644 src/bitcoin/poller/looper.rs create mode 100644 src/bitcoin/poller/mod.rs diff --git a/src/bitcoin/d/mod.rs b/src/bitcoin/d/mod.rs index a1e3d6ad..a5092bb9 100644 --- a/src/bitcoin/d/mod.rs +++ b/src/bitcoin/d/mod.rs @@ -477,4 +477,27 @@ impl BitcoinD { Ok(()) } + + pub fn sync_progress(&self) -> f64 { + // TODO: don't harass revaultd, be smarter like in revaultd. + roundup_progress( + self.make_node_request("getblockchaininfo", &[]) + .get("verificationprogress") + .and_then(Json::as_f64) + .expect("No valid 'verificationprogress' in getblockchaininfo response?"), + ) + } +} + +// Bitcoind uses a guess for the value of verificationprogress. It will eventually get to +// be 1, and we want to be less conservative. +fn roundup_progress(progress: f64) -> f64 { + let precision = 10u64.pow(5) as f64; + let progress_rounded = (progress * precision + 1.0) as u64; + + if progress_rounded * 10 >= precision as u64 { + 1.0 + } else { + (progress_rounded as f64 / precision) as f64 + } } diff --git a/src/bitcoin/mod.rs b/src/bitcoin/mod.rs index b3af0811..85b15724 100644 --- a/src/bitcoin/mod.rs +++ b/src/bitcoin/mod.rs @@ -2,5 +2,19 @@ ///! ///! Broadcast transactions, poll for new unspent coins, gather fee estimates. pub mod d; +pub mod poller; -pub trait BitcoinInterface {} +use std::sync; + +/// Our Bitcoin backend. +pub trait BitcoinInterface: Send { + /// Get the progress of the block chain synchronization. + /// Returns a percentage between 0 and 1. + fn sync_progress(&self) -> f64; +} + +impl BitcoinInterface for sync::Arc> { + fn sync_progress(&self) -> f64 { + self.read().unwrap().sync_progress() + } +} diff --git a/src/bitcoin/poller/looper.rs b/src/bitcoin/poller/looper.rs new file mode 100644 index 00000000..d0b284e7 --- /dev/null +++ b/src/bitcoin/poller/looper.rs @@ -0,0 +1,46 @@ +use crate::bitcoin::BitcoinInterface; + +use std::{ + sync::{self, atomic}, + thread, time, +}; + +/// Main event loop. Repeatedly polls the Bitcoin interface until told to stop through the +/// `shutdown` atomic. +pub fn looper( + bit: impl BitcoinInterface, + shutdown: sync::Arc, + poll_interval: time::Duration, +) { + let mut last_poll = None; + let mut synced = false; + + while !shutdown.load(atomic::Ordering::Relaxed) || last_poll.is_none() { + let now = time::Instant::now(); + + if let Some(last_poll) = last_poll { + if now.duration_since(last_poll) < poll_interval { + thread::sleep(time::Duration::from_millis(500)); + continue; + } + } + last_poll = Some(now); + + // Don't poll until the Bitcoin backend is fully synced. + if !synced { + let sync_progress = bit.sync_progress(); + log::info!( + "Block chain synchronization progress: {:.2}%", + sync_progress + ); + synced = sync_progress == 1.0; + if !synced { + // Avoid harassing bitcoind.. + // TODO: be smarter, like in revaultd, but more generic too. + #[cfg(not(test))] + thread::sleep(time::Duration::from_secs(30)); + continue; + } + } + } +} diff --git a/src/bitcoin/poller/mod.rs b/src/bitcoin/poller/mod.rs new file mode 100644 index 00000000..745b1757 --- /dev/null +++ b/src/bitcoin/poller/mod.rs @@ -0,0 +1,31 @@ +mod looper; + +use crate::bitcoin::{poller::looper::looper, BitcoinInterface}; + +use std::{ + sync::{self, atomic}, + thread, time, +}; + +/// The Bitcoin poller handler. +pub struct Poller { + handle: thread::JoinHandle<()>, + shutdown: sync::Arc, +} + +impl Poller { + pub fn start(bit: impl BitcoinInterface + 'static, poll_interval: time::Duration) -> Poller { + let shutdown = sync::Arc::from(atomic::AtomicBool::from(false)); + let handle = thread::spawn({ + let shutdown = shutdown.clone(); + move || looper(bit, shutdown, poll_interval) + }); + + Poller { shutdown, handle } + } + + pub fn stop(self) { + self.shutdown.store(true, atomic::Ordering::Relaxed); + self.handle.join().expect("The poller loop must not fail"); + } +} diff --git a/src/lib.rs b/src/lib.rs index 6a2fccb6..462c28d7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,12 +5,15 @@ mod daemonize; mod database; use crate::{ - bitcoin::d::{BitcoinD, BitcoindError}, + bitcoin::{ + d::{BitcoinD, BitcoindError}, + poller, + }, config::{config_folder_path, Config}, database::sqlite::{FreshDbOptions, SqliteDb, SqliteDbError}, }; -use std::{error, fmt, fs, io, path}; +use std::{error, fmt, fs, io, path, sync}; #[cfg(not(test))] use std::{panic, process}; @@ -173,7 +176,6 @@ impl DaemonHandle { } bitcoind.try_load_watchonly_wallet(); bitcoind.sanity_check(&config.main_descriptor, config.bitcoind_config.network)?; - bitcoind.with_retry_limit(None); log::info!("Connection to bitcoind established and checked."); // If we are on a UNIX system and they told us to daemonize, do it now. @@ -190,6 +192,12 @@ impl DaemonHandle { } } + // Spawn the bitcoind poller with a retry limit high enough that we'd fail after that. + let bitcoind = sync::Arc::from(sync::RwLock::from(bitcoind.with_retry_limit(None))); + let bit_poller = + poller::Poller::start(bitcoind.clone(), config.bitcoind_config.poll_interval_secs); + bit_poller.stop(); + Ok(Self {}) } @@ -340,6 +348,18 @@ mod tests { stream.flush().unwrap(); } + // Send them a response to 'getblockchaininfo' saying we are far from being synced + fn complete_sync_check<'a>(server: &net::TcpListener) { + let net_resp = [ + "HTTP/1.1 200\n\r\n{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"verificationprogress\":0.1}}\n".as_bytes(), + ] + .concat(); + let (mut stream, _) = server.accept().unwrap(); + read_til_json_end(&mut stream); + stream.write_all(&net_resp).unwrap(); + stream.flush().unwrap(); + } + #[test] fn daemon_startup() { let tmp_dir = env::temp_dir().join(format!( @@ -410,6 +430,7 @@ mod tests { complete_network_check(&server); complete_wallet_check(&server, &wo_path); complete_desc_check(&server, desc_str); + complete_sync_check(&server); daemon_thread.join().unwrap(); // The datadir is created now, so if we restart it it won't create the wo wallet. @@ -423,6 +444,7 @@ mod tests { complete_network_check(&server); complete_wallet_check(&server, &wo_path); complete_desc_check(&server, desc_str); + complete_sync_check(&server); daemon_thread.join().unwrap(); fs::remove_dir_all(&tmp_dir).unwrap(); From dd37255d7b03051aa8924127112ff8fac6c38ff7 Mon Sep 17 00:00:00 2001 From: Antoine Poinsot Date: Wed, 27 Jul 2022 11:27:21 +0200 Subject: [PATCH 2/2] bitcoin: update our tip in the poller This introduces the DB connection in the poller thread --- src/bitcoin/d/mod.rs | 42 +++++++++++++++-- src/bitcoin/mod.rs | 27 +++++++++++ src/bitcoin/poller/looper.rs | 37 ++++++++++++++- src/bitcoin/poller/mod.rs | 13 ++++-- src/database/mod.rs | 40 ++++++++++++++++- src/database/sqlite/mod.rs | 87 +++++++++++++++++++++++++++++++----- src/lib.rs | 7 ++- 7 files changed, 230 insertions(+), 23 deletions(-) diff --git a/src/bitcoin/d/mod.rs b/src/bitcoin/d/mod.rs index a5092bb9..a10152bc 100644 --- a/src/bitcoin/d/mod.rs +++ b/src/bitcoin/d/mod.rs @@ -1,9 +1,9 @@ ///! Implementation of the Bitcoin interface using bitcoind. ///! ///! We use the RPC interface and a watchonly descriptor wallet. -use crate::config; +use crate::{bitcoin::BlockChainTip, config}; -use std::{fs, io, time::Duration}; +use std::{convert::TryInto, fs, io, str::FromStr, time::Duration}; use jsonrpc::{ arg, @@ -478,15 +478,49 @@ impl BitcoinD { Ok(()) } + fn block_chain_info(&self) -> Json { + self.make_node_request("getblockchaininfo", &[]) + } + pub fn sync_progress(&self) -> f64 { // TODO: don't harass revaultd, be smarter like in revaultd. roundup_progress( - self.make_node_request("getblockchaininfo", &[]) + self.block_chain_info() .get("verificationprogress") .and_then(Json::as_f64) .expect("No valid 'verificationprogress' in getblockchaininfo response?"), ) } + + pub fn chain_tip(&self) -> BlockChainTip { + // We use getblockchaininfo to avoid a race between getblockcount and getblockhash + let chain_info = self.block_chain_info(); + let hash = bitcoin::BlockHash::from_str( + chain_info + .get("bestblockhash") + .and_then(Json::as_str) + .expect("No valid 'bestblockhash' in 'getblockchaininfo' response?"), + ) + .expect("Invalid blockhash from bitcoind?"); + let height: i32 = chain_info + .get("blocks") + .and_then(Json::as_i64) + .expect("No valid 'blocks' in 'getblockchaininfo' response?") + .try_into() + .expect("Must fit by Bitcoin consensus"); + + BlockChainTip { hash, height } + } + + pub fn get_block_hash(&self, height: i32) -> Option { + Some( + self.make_fallible_node_request("getblockhash", ¶ms!(Json::Number(height.into()),)) + .ok()? + .as_str() + .and_then(|s| bitcoin::BlockHash::from_str(s).ok()) + .expect("bitcoind must send valid block hashes"), + ) + } } // Bitcoind uses a guess for the value of verificationprogress. It will eventually get to @@ -495,7 +529,7 @@ fn roundup_progress(progress: f64) -> f64 { let precision = 10u64.pow(5) as f64; let progress_rounded = (progress * precision + 1.0) as u64; - if progress_rounded * 10 >= precision as u64 { + if progress_rounded >= precision as u64 { 1.0 } else { (progress_rounded as f64 / precision) as f64 diff --git a/src/bitcoin/mod.rs b/src/bitcoin/mod.rs index 85b15724..fee4f427 100644 --- a/src/bitcoin/mod.rs +++ b/src/bitcoin/mod.rs @@ -6,15 +6,42 @@ pub mod poller; use std::sync; +use miniscript::bitcoin; + +/// Information about the best block in the chain +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct BlockChainTip { + pub hash: bitcoin::BlockHash, + pub height: i32, +} + /// Our Bitcoin backend. pub trait BitcoinInterface: Send { /// Get the progress of the block chain synchronization. /// Returns a percentage between 0 and 1. fn sync_progress(&self) -> f64; + + /// Get the best block info. + fn chain_tip(&self) -> BlockChainTip; + + /// Check whether this former tip is part of the current best chain. + fn is_in_chain(&self, tip: &BlockChainTip) -> bool; } impl BitcoinInterface for sync::Arc> { fn sync_progress(&self) -> f64 { self.read().unwrap().sync_progress() } + + fn chain_tip(&self) -> BlockChainTip { + self.read().unwrap().chain_tip() + } + + fn is_in_chain(&self, tip: &BlockChainTip) -> bool { + self.read() + .unwrap() + .get_block_hash(tip.height) + .map(|bh| bh == tip.hash) + .unwrap_or(false) + } } diff --git a/src/bitcoin/poller/looper.rs b/src/bitcoin/poller/looper.rs index d0b284e7..7df006fa 100644 --- a/src/bitcoin/poller/looper.rs +++ b/src/bitcoin/poller/looper.rs @@ -1,14 +1,46 @@ -use crate::bitcoin::BitcoinInterface; +use crate::{ + bitcoin::BitcoinInterface, + database::{DatabaseConnection, DatabaseInterface}, +}; use std::{ sync::{self, atomic}, thread, time, }; +fn update_tip(bit: &impl BitcoinInterface, db_conn: &mut Box) { + let bitcoin_tip = bit.chain_tip(); + + let current_tip = match db_conn.chain_tip() { + Some(tip) => tip, + None => { + db_conn.update_tip(&bitcoin_tip); + return; + } + }; + + // If the tip didn't change, there is nothing to update. + if current_tip == bitcoin_tip { + return; + } + + if bitcoin_tip.height > current_tip.height { + // Make sure we are on the same chain. + if bit.is_in_chain(¤t_tip) { + // All good, we just moved forward. Record the new tip. + db_conn.update_tip(&bitcoin_tip); + return; + } + } + + // TODO: reorg handling. +} + /// Main event loop. Repeatedly polls the Bitcoin interface until told to stop through the /// `shutdown` atomic. pub fn looper( bit: impl BitcoinInterface, + db: impl DatabaseInterface, shutdown: sync::Arc, poll_interval: time::Duration, ) { @@ -42,5 +74,8 @@ pub fn looper( continue; } } + + let mut db_conn = db.connection(); + update_tip(&bit, &mut db_conn); } } diff --git a/src/bitcoin/poller/mod.rs b/src/bitcoin/poller/mod.rs index 745b1757..fa42471e 100644 --- a/src/bitcoin/poller/mod.rs +++ b/src/bitcoin/poller/mod.rs @@ -1,6 +1,9 @@ mod looper; -use crate::bitcoin::{poller::looper::looper, BitcoinInterface}; +use crate::{ + bitcoin::{poller::looper::looper, BitcoinInterface}, + database::DatabaseInterface, +}; use std::{ sync::{self, atomic}, @@ -14,11 +17,15 @@ pub struct Poller { } impl Poller { - pub fn start(bit: impl BitcoinInterface + 'static, poll_interval: time::Duration) -> Poller { + pub fn start( + bit: impl BitcoinInterface + 'static, + db: impl DatabaseInterface + 'static, + poll_interval: time::Duration, + ) -> Poller { let shutdown = sync::Arc::from(atomic::AtomicBool::from(false)); let handle = thread::spawn({ let shutdown = shutdown.clone(); - move || looper(bit, shutdown, poll_interval) + move || looper(bit, db, shutdown, poll_interval) }); Poller { shutdown, handle } diff --git a/src/database/mod.rs b/src/database/mod.rs index bf9e9bc9..80312209 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -3,4 +3,42 @@ ///! Record wallet metadata, spent and unspent coins, ongoing transactions. pub mod sqlite; -pub trait DatabaseInterface {} +use crate::{ + bitcoin::BlockChainTip, + database::sqlite::{schema::DbTip, SqliteConn, SqliteDb}, +}; + +pub trait DatabaseInterface: Send { + fn connection(&self) -> Box; +} + +impl DatabaseInterface for SqliteDb { + fn connection(&self) -> Box { + Box::new(self.connection().expect("Database must be available")) + } +} + +pub trait DatabaseConnection { + /// Get the tip of the best chain we've seen. + fn chain_tip(&mut self) -> Option; + + /// Update our best chain seen. + fn update_tip(&mut self, tip: &BlockChainTip); +} + +impl DatabaseConnection for SqliteConn { + fn chain_tip(&mut self) -> Option { + match self.db_tip() { + DbTip { + block_height: Some(height), + block_hash: Some(hash), + .. + } => Some(BlockChainTip { height, hash }), + _ => None, + } + } + + fn update_tip(&mut self, tip: &BlockChainTip) { + self.update_tip(&tip) + } +} diff --git a/src/database/sqlite/mod.rs b/src/database/sqlite/mod.rs index 87ceeb5c..c3bad332 100644 --- a/src/database/sqlite/mod.rs +++ b/src/database/sqlite/mod.rs @@ -6,11 +6,16 @@ ///! ///! We leverage SQLite's `unlock_notify` feature to synchronize writes accross connection. More ///! about it at https://sqlite.org/unlock_notify.html. -mod schema; +pub mod schema; mod utils; -use schema::{DbTip, DbWallet}; -use utils::{create_fresh_db, db_query}; +use crate::{ + bitcoin::BlockChainTip, + database::sqlite::{ + schema::{DbTip, DbWallet}, + utils::{create_fresh_db, db_exec, db_query}, + }, +}; use std::{convert::TryInto, fmt, io, path}; @@ -178,6 +183,19 @@ impl SqliteConn { .pop() .expect("There is always a row in the wallet table") } + + /// Update the network tip. + pub fn update_tip(&mut self, tip: &BlockChainTip) { + db_exec(&mut self.conn, |db_tx| { + db_tx + .execute( + "UPDATE tip SET blockheight = (?1), blockhash = (?2)", + rusqlite::params![tip.height, tip.hash.to_vec()], + ) + .map(|_| ()) + }) + .expect("Database must be available") + } } #[cfg(test)] @@ -185,6 +203,15 @@ mod tests { use super::*; use std::{env, fs, path, process, str::FromStr, thread}; + fn dummy_options() -> FreshDbOptions { + let desc_str = "wsh(andor(pk(03b506a1dbe57b4bf48c95e0c7d417b87dd3b4349d290d2e7e9ba72c912652d80a),older(10000),pk(0295e7f5d12a2061f1fd2286cefec592dff656a19f55f4f01305d6aa56630880ce)))"; + let main_descriptor = Descriptor::::from_str(desc_str).unwrap(); + FreshDbOptions { + bitcoind_network: bitcoin::Network::Bitcoin, + main_descriptor, + } + } + #[test] fn db_startup_sanity_checks() { let tmp_dir = env::temp_dir().join(format!( @@ -202,15 +229,10 @@ mod tests { .to_string() .contains("database file not found")); - let desc_str = "wsh(andor(pk(03b506a1dbe57b4bf48c95e0c7d417b87dd3b4349d290d2e7e9ba72c912652d80a),older(10000),pk(0295e7f5d12a2061f1fd2286cefec592dff656a19f55f4f01305d6aa56630880ce)))"; - let desc = Descriptor::::from_str(desc_str).unwrap(); - let options = FreshDbOptions { - bitcoind_network: bitcoin::Network::Bitcoin, - main_descriptor: desc.clone(), - }; + let options = dummy_options(); let db = SqliteDb::new(db_path.clone(), Some(options.clone())).unwrap(); - db.sanity_check(bitcoin::Network::Testnet, &desc) + db.sanity_check(bitcoin::Network::Testnet, &options.main_descriptor) .unwrap_err() .to_string() .contains("Database was created for network"); @@ -226,9 +248,50 @@ mod tests { // TODO: version check let db = SqliteDb::new(db_path.clone(), Some(options.clone())).unwrap(); - db.sanity_check(bitcoin::Network::Bitcoin, &desc).unwrap(); + db.sanity_check(bitcoin::Network::Bitcoin, &options.main_descriptor) + .unwrap(); let db = SqliteDb::new(db_path.clone(), None).unwrap(); - db.sanity_check(bitcoin::Network::Bitcoin, &desc).unwrap(); + db.sanity_check(bitcoin::Network::Bitcoin, &options.main_descriptor) + .unwrap(); + + fs::remove_dir_all(&tmp_dir).unwrap(); + } + + #[test] + fn db_tip_update() { + let tmp_dir = env::temp_dir().join(format!( + "minisafed-unit-tests-{}-{:?}", + process::id(), + thread::current().id() + )); + fs::create_dir_all(&tmp_dir).unwrap(); + + let db_path: path::PathBuf = [tmp_dir.as_path(), path::Path::new("minisafed.sqlite3")] + .iter() + .collect(); + let options = dummy_options(); + let db = SqliteDb::new(db_path.clone(), Some(options.clone())).unwrap(); + + { + let mut conn = db.connection().unwrap(); + let db_tip = conn.db_tip(); + assert!( + db_tip.block_hash.is_none() + && db_tip.block_height.is_none() + && db_tip.network == options.bitcoind_network + ); + let new_tip = BlockChainTip { + height: 746756, + hash: bitcoin::BlockHash::from_str( + "00000000000000000006d50e4c9fd269ddf690c94f422dff85e96f1a84b3a615", + ) + .unwrap(), + }; + conn.update_tip(&new_tip); + let db_tip = conn.db_tip(); + assert_eq!(db_tip.block_height.unwrap(), new_tip.height); + assert_eq!(db_tip.block_hash.unwrap(), new_tip.hash); + } fs::remove_dir_all(&tmp_dir).unwrap(); } diff --git a/src/lib.rs b/src/lib.rs index 462c28d7..7b5ec5d3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -194,8 +194,11 @@ impl DaemonHandle { // Spawn the bitcoind poller with a retry limit high enough that we'd fail after that. let bitcoind = sync::Arc::from(sync::RwLock::from(bitcoind.with_retry_limit(None))); - let bit_poller = - poller::Poller::start(bitcoind.clone(), config.bitcoind_config.poll_interval_secs); + let bit_poller = poller::Poller::start( + bitcoind.clone(), + db, + config.bitcoind_config.poll_interval_secs, + ); bit_poller.stop(); Ok(Self {})