Merge #6: Bitcoin interface poller
dd37255d7b03051aa8924127112ff8fac6c38ff7 bitcoin: update our tip in the poller (Antoine Poinsot)
6997adc073981c3c786ab7e1244330fcc1d7afc7 daemon: bitcoin: introduce the Bitcoin poller (Antoine Poinsot)
Pull request description:
This introduces the Bitcoin poller, an event loop in a new thread that continuously poll our Bitcoin backend to update our state. This is kept minimal; only introduce the necessary components (Bitcoin interface, DB interface) and as such it only takes care of updating the best tip at the moment.
ACKs for top commit:
darosior:
self-ACK dd37255d7b03051aa8924127112ff8fac6c38ff7 -- again, it was inspired and adapted from revaultd, where a similar logic was reviewed and well tested. This PR was tested with the following one, #9.
Tree-SHA512: d630659c75f172754ecfb4df7892d55184b20c848eff4a6184fb45669d57aa87de9377740b5c1bccf7829d859aed8d040c84f3040bb95d478dc41fbd7db157bf
This commit is contained in:
commit
06b009a479
@ -1,9 +1,9 @@
|
||||
///! Implementation of the Bitcoin interface using bitcoind.
|
||||
///!
|
||||
///! We use the RPC interface and a watchonly descriptor wallet.
|
||||
use crate::config;
|
||||
use crate::{bitcoin::BlockChainTip, config};
|
||||
|
||||
use std::{fs, io, time::Duration};
|
||||
use std::{convert::TryInto, fs, io, str::FromStr, time::Duration};
|
||||
|
||||
use jsonrpc::{
|
||||
arg,
|
||||
@ -477,4 +477,61 @@ impl BitcoinD {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn block_chain_info(&self) -> Json {
|
||||
self.make_node_request("getblockchaininfo", &[])
|
||||
}
|
||||
|
||||
pub fn sync_progress(&self) -> f64 {
|
||||
// TODO: don't harass revaultd, be smarter like in revaultd.
|
||||
roundup_progress(
|
||||
self.block_chain_info()
|
||||
.get("verificationprogress")
|
||||
.and_then(Json::as_f64)
|
||||
.expect("No valid 'verificationprogress' in getblockchaininfo response?"),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn chain_tip(&self) -> BlockChainTip {
|
||||
// We use getblockchaininfo to avoid a race between getblockcount and getblockhash
|
||||
let chain_info = self.block_chain_info();
|
||||
let hash = bitcoin::BlockHash::from_str(
|
||||
chain_info
|
||||
.get("bestblockhash")
|
||||
.and_then(Json::as_str)
|
||||
.expect("No valid 'bestblockhash' in 'getblockchaininfo' response?"),
|
||||
)
|
||||
.expect("Invalid blockhash from bitcoind?");
|
||||
let height: i32 = chain_info
|
||||
.get("blocks")
|
||||
.and_then(Json::as_i64)
|
||||
.expect("No valid 'blocks' in 'getblockchaininfo' response?")
|
||||
.try_into()
|
||||
.expect("Must fit by Bitcoin consensus");
|
||||
|
||||
BlockChainTip { hash, height }
|
||||
}
|
||||
|
||||
pub fn get_block_hash(&self, height: i32) -> Option<bitcoin::BlockHash> {
|
||||
Some(
|
||||
self.make_fallible_node_request("getblockhash", ¶ms!(Json::Number(height.into()),))
|
||||
.ok()?
|
||||
.as_str()
|
||||
.and_then(|s| bitcoin::BlockHash::from_str(s).ok())
|
||||
.expect("bitcoind must send valid block hashes"),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Bitcoind uses a guess for the value of verificationprogress. It will eventually get to
|
||||
// be 1, and we want to be less conservative.
|
||||
fn roundup_progress(progress: f64) -> f64 {
|
||||
let precision = 10u64.pow(5) as f64;
|
||||
let progress_rounded = (progress * precision + 1.0) as u64;
|
||||
|
||||
if progress_rounded >= precision as u64 {
|
||||
1.0
|
||||
} else {
|
||||
(progress_rounded as f64 / precision) as f64
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,5 +2,46 @@
|
||||
///!
|
||||
///! Broadcast transactions, poll for new unspent coins, gather fee estimates.
|
||||
pub mod d;
|
||||
pub mod poller;
|
||||
|
||||
pub trait BitcoinInterface {}
|
||||
use std::sync;
|
||||
|
||||
use miniscript::bitcoin;
|
||||
|
||||
/// Information about the best block in the chain
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub struct BlockChainTip {
|
||||
pub hash: bitcoin::BlockHash,
|
||||
pub height: i32,
|
||||
}
|
||||
|
||||
/// Our Bitcoin backend.
|
||||
pub trait BitcoinInterface: Send {
|
||||
/// Get the progress of the block chain synchronization.
|
||||
/// Returns a percentage between 0 and 1.
|
||||
fn sync_progress(&self) -> f64;
|
||||
|
||||
/// Get the best block info.
|
||||
fn chain_tip(&self) -> BlockChainTip;
|
||||
|
||||
/// Check whether this former tip is part of the current best chain.
|
||||
fn is_in_chain(&self, tip: &BlockChainTip) -> bool;
|
||||
}
|
||||
|
||||
impl BitcoinInterface for sync::Arc<sync::RwLock<d::BitcoinD>> {
|
||||
fn sync_progress(&self) -> f64 {
|
||||
self.read().unwrap().sync_progress()
|
||||
}
|
||||
|
||||
fn chain_tip(&self) -> BlockChainTip {
|
||||
self.read().unwrap().chain_tip()
|
||||
}
|
||||
|
||||
fn is_in_chain(&self, tip: &BlockChainTip) -> bool {
|
||||
self.read()
|
||||
.unwrap()
|
||||
.get_block_hash(tip.height)
|
||||
.map(|bh| bh == tip.hash)
|
||||
.unwrap_or(false)
|
||||
}
|
||||
}
|
||||
|
||||
81
src/bitcoin/poller/looper.rs
Normal file
81
src/bitcoin/poller/looper.rs
Normal file
@ -0,0 +1,81 @@
|
||||
use crate::{
|
||||
bitcoin::BitcoinInterface,
|
||||
database::{DatabaseConnection, DatabaseInterface},
|
||||
};
|
||||
|
||||
use std::{
|
||||
sync::{self, atomic},
|
||||
thread, time,
|
||||
};
|
||||
|
||||
fn update_tip(bit: &impl BitcoinInterface, db_conn: &mut Box<dyn DatabaseConnection>) {
|
||||
let bitcoin_tip = bit.chain_tip();
|
||||
|
||||
let current_tip = match db_conn.chain_tip() {
|
||||
Some(tip) => tip,
|
||||
None => {
|
||||
db_conn.update_tip(&bitcoin_tip);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// If the tip didn't change, there is nothing to update.
|
||||
if current_tip == bitcoin_tip {
|
||||
return;
|
||||
}
|
||||
|
||||
if bitcoin_tip.height > current_tip.height {
|
||||
// Make sure we are on the same chain.
|
||||
if bit.is_in_chain(¤t_tip) {
|
||||
// All good, we just moved forward. Record the new tip.
|
||||
db_conn.update_tip(&bitcoin_tip);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: reorg handling.
|
||||
}
|
||||
|
||||
/// Main event loop. Repeatedly polls the Bitcoin interface until told to stop through the
|
||||
/// `shutdown` atomic.
|
||||
pub fn looper(
|
||||
bit: impl BitcoinInterface,
|
||||
db: impl DatabaseInterface,
|
||||
shutdown: sync::Arc<atomic::AtomicBool>,
|
||||
poll_interval: time::Duration,
|
||||
) {
|
||||
let mut last_poll = None;
|
||||
let mut synced = false;
|
||||
|
||||
while !shutdown.load(atomic::Ordering::Relaxed) || last_poll.is_none() {
|
||||
let now = time::Instant::now();
|
||||
|
||||
if let Some(last_poll) = last_poll {
|
||||
if now.duration_since(last_poll) < poll_interval {
|
||||
thread::sleep(time::Duration::from_millis(500));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
last_poll = Some(now);
|
||||
|
||||
// Don't poll until the Bitcoin backend is fully synced.
|
||||
if !synced {
|
||||
let sync_progress = bit.sync_progress();
|
||||
log::info!(
|
||||
"Block chain synchronization progress: {:.2}%",
|
||||
sync_progress
|
||||
);
|
||||
synced = sync_progress == 1.0;
|
||||
if !synced {
|
||||
// Avoid harassing bitcoind..
|
||||
// TODO: be smarter, like in revaultd, but more generic too.
|
||||
#[cfg(not(test))]
|
||||
thread::sleep(time::Duration::from_secs(30));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
let mut db_conn = db.connection();
|
||||
update_tip(&bit, &mut db_conn);
|
||||
}
|
||||
}
|
||||
38
src/bitcoin/poller/mod.rs
Normal file
38
src/bitcoin/poller/mod.rs
Normal file
@ -0,0 +1,38 @@
|
||||
mod looper;
|
||||
|
||||
use crate::{
|
||||
bitcoin::{poller::looper::looper, BitcoinInterface},
|
||||
database::DatabaseInterface,
|
||||
};
|
||||
|
||||
use std::{
|
||||
sync::{self, atomic},
|
||||
thread, time,
|
||||
};
|
||||
|
||||
/// The Bitcoin poller handler.
|
||||
pub struct Poller {
|
||||
handle: thread::JoinHandle<()>,
|
||||
shutdown: sync::Arc<atomic::AtomicBool>,
|
||||
}
|
||||
|
||||
impl Poller {
|
||||
pub fn start(
|
||||
bit: impl BitcoinInterface + 'static,
|
||||
db: impl DatabaseInterface + 'static,
|
||||
poll_interval: time::Duration,
|
||||
) -> Poller {
|
||||
let shutdown = sync::Arc::from(atomic::AtomicBool::from(false));
|
||||
let handle = thread::spawn({
|
||||
let shutdown = shutdown.clone();
|
||||
move || looper(bit, db, shutdown, poll_interval)
|
||||
});
|
||||
|
||||
Poller { shutdown, handle }
|
||||
}
|
||||
|
||||
pub fn stop(self) {
|
||||
self.shutdown.store(true, atomic::Ordering::Relaxed);
|
||||
self.handle.join().expect("The poller loop must not fail");
|
||||
}
|
||||
}
|
||||
@ -3,4 +3,42 @@
|
||||
///! Record wallet metadata, spent and unspent coins, ongoing transactions.
|
||||
pub mod sqlite;
|
||||
|
||||
pub trait DatabaseInterface {}
|
||||
use crate::{
|
||||
bitcoin::BlockChainTip,
|
||||
database::sqlite::{schema::DbTip, SqliteConn, SqliteDb},
|
||||
};
|
||||
|
||||
pub trait DatabaseInterface: Send {
|
||||
fn connection(&self) -> Box<dyn DatabaseConnection>;
|
||||
}
|
||||
|
||||
impl DatabaseInterface for SqliteDb {
|
||||
fn connection(&self) -> Box<dyn DatabaseConnection> {
|
||||
Box::new(self.connection().expect("Database must be available"))
|
||||
}
|
||||
}
|
||||
|
||||
pub trait DatabaseConnection {
|
||||
/// Get the tip of the best chain we've seen.
|
||||
fn chain_tip(&mut self) -> Option<BlockChainTip>;
|
||||
|
||||
/// Update our best chain seen.
|
||||
fn update_tip(&mut self, tip: &BlockChainTip);
|
||||
}
|
||||
|
||||
impl DatabaseConnection for SqliteConn {
|
||||
fn chain_tip(&mut self) -> Option<BlockChainTip> {
|
||||
match self.db_tip() {
|
||||
DbTip {
|
||||
block_height: Some(height),
|
||||
block_hash: Some(hash),
|
||||
..
|
||||
} => Some(BlockChainTip { height, hash }),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn update_tip(&mut self, tip: &BlockChainTip) {
|
||||
self.update_tip(&tip)
|
||||
}
|
||||
}
|
||||
|
||||
@ -6,11 +6,16 @@
|
||||
///!
|
||||
///! We leverage SQLite's `unlock_notify` feature to synchronize writes accross connection. More
|
||||
///! about it at https://sqlite.org/unlock_notify.html.
|
||||
mod schema;
|
||||
pub mod schema;
|
||||
mod utils;
|
||||
|
||||
use schema::{DbTip, DbWallet};
|
||||
use utils::{create_fresh_db, db_query};
|
||||
use crate::{
|
||||
bitcoin::BlockChainTip,
|
||||
database::sqlite::{
|
||||
schema::{DbTip, DbWallet},
|
||||
utils::{create_fresh_db, db_exec, db_query},
|
||||
},
|
||||
};
|
||||
|
||||
use std::{convert::TryInto, fmt, io, path};
|
||||
|
||||
@ -178,6 +183,19 @@ impl SqliteConn {
|
||||
.pop()
|
||||
.expect("There is always a row in the wallet table")
|
||||
}
|
||||
|
||||
/// Update the network tip.
|
||||
pub fn update_tip(&mut self, tip: &BlockChainTip) {
|
||||
db_exec(&mut self.conn, |db_tx| {
|
||||
db_tx
|
||||
.execute(
|
||||
"UPDATE tip SET blockheight = (?1), blockhash = (?2)",
|
||||
rusqlite::params![tip.height, tip.hash.to_vec()],
|
||||
)
|
||||
.map(|_| ())
|
||||
})
|
||||
.expect("Database must be available")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@ -185,6 +203,15 @@ mod tests {
|
||||
use super::*;
|
||||
use std::{env, fs, path, process, str::FromStr, thread};
|
||||
|
||||
fn dummy_options() -> FreshDbOptions {
|
||||
let desc_str = "wsh(andor(pk(03b506a1dbe57b4bf48c95e0c7d417b87dd3b4349d290d2e7e9ba72c912652d80a),older(10000),pk(0295e7f5d12a2061f1fd2286cefec592dff656a19f55f4f01305d6aa56630880ce)))";
|
||||
let main_descriptor = Descriptor::<DescriptorPublicKey>::from_str(desc_str).unwrap();
|
||||
FreshDbOptions {
|
||||
bitcoind_network: bitcoin::Network::Bitcoin,
|
||||
main_descriptor,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn db_startup_sanity_checks() {
|
||||
let tmp_dir = env::temp_dir().join(format!(
|
||||
@ -202,15 +229,10 @@ mod tests {
|
||||
.to_string()
|
||||
.contains("database file not found"));
|
||||
|
||||
let desc_str = "wsh(andor(pk(03b506a1dbe57b4bf48c95e0c7d417b87dd3b4349d290d2e7e9ba72c912652d80a),older(10000),pk(0295e7f5d12a2061f1fd2286cefec592dff656a19f55f4f01305d6aa56630880ce)))";
|
||||
let desc = Descriptor::<DescriptorPublicKey>::from_str(desc_str).unwrap();
|
||||
let options = FreshDbOptions {
|
||||
bitcoind_network: bitcoin::Network::Bitcoin,
|
||||
main_descriptor: desc.clone(),
|
||||
};
|
||||
let options = dummy_options();
|
||||
|
||||
let db = SqliteDb::new(db_path.clone(), Some(options.clone())).unwrap();
|
||||
db.sanity_check(bitcoin::Network::Testnet, &desc)
|
||||
db.sanity_check(bitcoin::Network::Testnet, &options.main_descriptor)
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("Database was created for network");
|
||||
@ -226,9 +248,50 @@ mod tests {
|
||||
// TODO: version check
|
||||
|
||||
let db = SqliteDb::new(db_path.clone(), Some(options.clone())).unwrap();
|
||||
db.sanity_check(bitcoin::Network::Bitcoin, &desc).unwrap();
|
||||
db.sanity_check(bitcoin::Network::Bitcoin, &options.main_descriptor)
|
||||
.unwrap();
|
||||
let db = SqliteDb::new(db_path.clone(), None).unwrap();
|
||||
db.sanity_check(bitcoin::Network::Bitcoin, &desc).unwrap();
|
||||
db.sanity_check(bitcoin::Network::Bitcoin, &options.main_descriptor)
|
||||
.unwrap();
|
||||
|
||||
fs::remove_dir_all(&tmp_dir).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn db_tip_update() {
|
||||
let tmp_dir = env::temp_dir().join(format!(
|
||||
"minisafed-unit-tests-{}-{:?}",
|
||||
process::id(),
|
||||
thread::current().id()
|
||||
));
|
||||
fs::create_dir_all(&tmp_dir).unwrap();
|
||||
|
||||
let db_path: path::PathBuf = [tmp_dir.as_path(), path::Path::new("minisafed.sqlite3")]
|
||||
.iter()
|
||||
.collect();
|
||||
let options = dummy_options();
|
||||
let db = SqliteDb::new(db_path.clone(), Some(options.clone())).unwrap();
|
||||
|
||||
{
|
||||
let mut conn = db.connection().unwrap();
|
||||
let db_tip = conn.db_tip();
|
||||
assert!(
|
||||
db_tip.block_hash.is_none()
|
||||
&& db_tip.block_height.is_none()
|
||||
&& db_tip.network == options.bitcoind_network
|
||||
);
|
||||
let new_tip = BlockChainTip {
|
||||
height: 746756,
|
||||
hash: bitcoin::BlockHash::from_str(
|
||||
"00000000000000000006d50e4c9fd269ddf690c94f422dff85e96f1a84b3a615",
|
||||
)
|
||||
.unwrap(),
|
||||
};
|
||||
conn.update_tip(&new_tip);
|
||||
let db_tip = conn.db_tip();
|
||||
assert_eq!(db_tip.block_height.unwrap(), new_tip.height);
|
||||
assert_eq!(db_tip.block_hash.unwrap(), new_tip.hash);
|
||||
}
|
||||
|
||||
fs::remove_dir_all(&tmp_dir).unwrap();
|
||||
}
|
||||
|
||||
31
src/lib.rs
31
src/lib.rs
@ -8,12 +8,15 @@ pub mod descriptors;
|
||||
pub use miniscript;
|
||||
|
||||
use crate::{
|
||||
bitcoin::d::{BitcoinD, BitcoindError},
|
||||
bitcoin::{
|
||||
d::{BitcoinD, BitcoindError},
|
||||
poller,
|
||||
},
|
||||
config::{config_folder_path, Config},
|
||||
database::sqlite::{FreshDbOptions, SqliteDb, SqliteDbError},
|
||||
};
|
||||
|
||||
use std::{error, fmt, fs, io, path};
|
||||
use std::{error, fmt, fs, io, path, sync};
|
||||
|
||||
#[cfg(not(test))]
|
||||
use std::{panic, process};
|
||||
@ -176,7 +179,6 @@ impl DaemonHandle {
|
||||
}
|
||||
bitcoind.try_load_watchonly_wallet();
|
||||
bitcoind.sanity_check(&config.main_descriptor, config.bitcoind_config.network)?;
|
||||
bitcoind.with_retry_limit(None);
|
||||
log::info!("Connection to bitcoind established and checked.");
|
||||
|
||||
// If we are on a UNIX system and they told us to daemonize, do it now.
|
||||
@ -193,6 +195,15 @@ impl DaemonHandle {
|
||||
}
|
||||
}
|
||||
|
||||
// Spawn the bitcoind poller with a retry limit high enough that we'd fail after that.
|
||||
let bitcoind = sync::Arc::from(sync::RwLock::from(bitcoind.with_retry_limit(None)));
|
||||
let bit_poller = poller::Poller::start(
|
||||
bitcoind.clone(),
|
||||
db,
|
||||
config.bitcoind_config.poll_interval_secs,
|
||||
);
|
||||
bit_poller.stop();
|
||||
|
||||
Ok(Self {})
|
||||
}
|
||||
|
||||
@ -343,6 +354,18 @@ mod tests {
|
||||
stream.flush().unwrap();
|
||||
}
|
||||
|
||||
// Send them a response to 'getblockchaininfo' saying we are far from being synced
|
||||
fn complete_sync_check<'a>(server: &net::TcpListener) {
|
||||
let net_resp = [
|
||||
"HTTP/1.1 200\n\r\n{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"verificationprogress\":0.1}}\n".as_bytes(),
|
||||
]
|
||||
.concat();
|
||||
let (mut stream, _) = server.accept().unwrap();
|
||||
read_til_json_end(&mut stream);
|
||||
stream.write_all(&net_resp).unwrap();
|
||||
stream.flush().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn daemon_startup() {
|
||||
let tmp_dir = env::temp_dir().join(format!(
|
||||
@ -413,6 +436,7 @@ mod tests {
|
||||
complete_network_check(&server);
|
||||
complete_wallet_check(&server, &wo_path);
|
||||
complete_desc_check(&server, desc_str);
|
||||
complete_sync_check(&server);
|
||||
daemon_thread.join().unwrap();
|
||||
|
||||
// The datadir is created now, so if we restart it it won't create the wo wallet.
|
||||
@ -426,6 +450,7 @@ mod tests {
|
||||
complete_network_check(&server);
|
||||
complete_wallet_check(&server, &wo_path);
|
||||
complete_desc_check(&server, desc_str);
|
||||
complete_sync_check(&server);
|
||||
daemon_thread.join().unwrap();
|
||||
|
||||
fs::remove_dir_all(&tmp_dir).unwrap();
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user