diff --git a/src/bin/daemon.rs b/src/bin/daemon.rs index 51e3c134..3917dc11 100644 --- a/src/bin/daemon.rs +++ b/src/bin/daemon.rs @@ -70,13 +70,16 @@ fn main() { process::exit(1); }); - let daemon = DaemonHandle::start_default(config).unwrap_or_else(|e| { + let handle = DaemonHandle::start_default(config, true).unwrap_or_else(|e| { log::error!("Error starting Liana daemon: {}", e); process::exit(1); }); - daemon - .rpc_server() - .expect("JSONRPC server must terminate cleanly"); + while handle.is_alive() { + thread::sleep(time::Duration::from_millis(500)); + } + if let Err(e) = handle.stop() { + log::error!("Error stopping Liana daemon: {}", e); + } // We are always logging to stdout, should it be then piped to the log file (if self) or // not. So just make sure that all messages were actually written. diff --git a/src/bitcoin/poller/looper.rs b/src/bitcoin/poller/looper.rs index bdd7bb9f..93c85749 100644 --- a/src/bitcoin/poller/looper.rs +++ b/src/bitcoin/poller/looper.rs @@ -4,11 +4,7 @@ use crate::{ descriptors, }; -use std::{ - collections::HashSet, - sync::{self, atomic}, - thread, time, -}; +use std::{collections::HashSet, sync, time}; use miniscript::bitcoin::{self, secp256k1}; @@ -302,8 +298,8 @@ fn rescan_check( } } -// If the database chain tip is NULL (first startup), initialize it. -fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface) { +/// If the database chain tip is NULL (first startup), initialize it. +pub fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface) { let mut db_conn = db.connection(); if db_conn.chain_tip().is_none() { @@ -312,7 +308,7 @@ fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface } } -fn sync_poll_interval() -> time::Duration { +pub fn sync_poll_interval() -> time::Duration { // TODO: be smarter, like in revaultd, but more generic too. #[cfg(not(test))] { @@ -333,60 +329,3 @@ pub fn poll( updates(&mut db_conn, bit, descs, secp); rescan_check(&mut db_conn, bit, descs, secp); } - -/// Main event loop. Repeatedly polls the Bitcoin interface until told to stop through the -/// `shutdown` atomic. -pub fn looper( - bit: sync::Arc>, - db: sync::Arc>, - shutdown: sync::Arc, - poll_interval: time::Duration, - desc: descriptors::LianaDescriptor, -) { - let mut last_poll = None; - let mut synced = false; - let descs = [ - desc.receive_descriptor().clone(), - desc.change_descriptor().clone(), - ]; - let secp = secp256k1::Secp256k1::verification_only(); - - maybe_initialize_tip(&bit, &db); - - while !shutdown.load(atomic::Ordering::Relaxed) || last_poll.is_none() { - let now = time::Instant::now(); - - if let Some(last_poll) = last_poll { - let time_since_poll = now.duration_since(last_poll); - let poll_interval = if synced { - poll_interval - } else { - // Until we are synced we poll less often to avoid harassing bitcoind and impeding - // the sync. As a function since it's mocked for the tests. - sync_poll_interval() - }; - if time_since_poll < poll_interval { - thread::sleep(time::Duration::from_millis(500)); - continue; - } - } - last_poll = Some(now); - - // Don't poll until the Bitcoin backend is fully synced. - if !synced { - let progress = bit.sync_progress(); - log::info!( - "Block chain synchronization progress: {:.2}% ({} blocks / {} headers)", - progress.rounded_up_progress() * 100.0, - progress.blocks, - progress.headers - ); - synced = progress.is_complete(); - if !synced { - continue; - } - } - - poll(&bit, &db, &secp, &descs); - } -} diff --git a/src/bitcoin/poller/mod.rs b/src/bitcoin/poller/mod.rs index 7731af2e..1825e8a1 100644 --- a/src/bitcoin/poller/mod.rs +++ b/src/bitcoin/poller/mod.rs @@ -1,60 +1,95 @@ mod looper; -use crate::{ - bitcoin::{poller::looper::looper, BitcoinInterface}, - database::DatabaseInterface, - descriptors, -}; +use crate::{bitcoin::BitcoinInterface, database::DatabaseInterface, descriptors}; use std::{ sync::{self, atomic}, thread, time, }; +use miniscript::bitcoin::secp256k1; + /// The Bitcoin poller handler. pub struct Poller { - handle: thread::JoinHandle<()>, - shutdown: sync::Arc, + bit: sync::Arc>, + db: sync::Arc>, + secp: secp256k1::Secp256k1, + // The receive and change descriptors (in this order). + descs: [descriptors::SinglePathLianaDesc; 2], } impl Poller { - pub fn start( + pub fn new( bit: sync::Arc>, db: sync::Arc>, - poll_interval: time::Duration, desc: descriptors::LianaDescriptor, ) -> Poller { - let shutdown = sync::Arc::from(atomic::AtomicBool::from(false)); - let handle = thread::Builder::new() - .name("Bitcoin poller".to_string()) - .spawn({ - let shutdown = shutdown.clone(); - move || looper(bit, db, shutdown, poll_interval, desc) - }) - .expect("Must not fail"); + let secp = secp256k1::Secp256k1::verification_only(); + let descs = [ + desc.receive_descriptor().clone(), + desc.change_descriptor().clone(), + ]; - Poller { shutdown, handle } + // On first startup the tip may be NULL. Make sure it's set as the poller relies on it. + looper::maybe_initialize_tip(&bit, &db); + + Poller { + bit, + db, + secp, + descs, + } } - pub fn trigger_stop(&self) { - self.shutdown.store(true, atomic::Ordering::Relaxed); - } + /// Continuously update our state from the Bitcoin backend. + /// - `poll_interval`: how frequently to perform an update. + /// - `shutdown`: set to true to stop continuously updating and make this function return. + /// + /// Typically this would run for the whole duration of the program in a thread, and the main + /// thread would set the `shutdown` atomic to `true` when shutting down. + pub fn poll_forever( + &self, + poll_interval: time::Duration, + shutdown: sync::Arc, + ) { + let mut last_poll = None; + let mut synced = false; - pub fn stop(self) { - self.trigger_stop(); - self.handle.join().expect("The poller loop must not fail"); - } + while !shutdown.load(atomic::Ordering::Relaxed) || last_poll.is_none() { + let now = time::Instant::now(); - #[cfg(feature = "nonblocking_shutdown")] - pub fn is_stopped(&self) -> bool { - // Doc says "This might return true for a brief moment after the thread’s main function has - // returned, but before the thread itself has stopped running.". But it's not an issue for - // us, as long as the main poller function has returned we are good. - self.handle.is_finished() - } + if let Some(last_poll) = last_poll { + let time_since_poll = now.duration_since(last_poll); + let poll_interval = if synced { + poll_interval + } else { + // Until we are synced we poll less often to avoid harassing bitcoind and impeding + // the sync. As a function since it's mocked for the tests. + looper::sync_poll_interval() + }; + if time_since_poll < poll_interval { + thread::sleep(time::Duration::from_millis(500)); + continue; + } + } + last_poll = Some(now); - #[cfg(test)] - pub fn test_stop(&mut self) { - self.shutdown.store(true, atomic::Ordering::Relaxed); + // Don't poll until the Bitcoin backend is fully synced. + if !synced { + let progress = self.bit.sync_progress(); + log::info!( + "Block chain synchronization progress: {:.2}% ({} blocks / {} headers)", + progress.rounded_up_progress() * 100.0, + progress.blocks, + progress.headers + ); + synced = progress.is_complete(); + if !synced { + continue; + } + } + + looper::poll(&self.bit, &self.db, &self.secp, &self.descs); + } } } diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 6cb372d7..25af7040 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -1251,7 +1251,7 @@ mod tests { fn getinfo() { let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new()); // We can query getinfo - ms.handle.control.get_info(); + ms.control().get_info(); ms.shutdown(); } @@ -1259,7 +1259,7 @@ mod tests { fn getnewaddress() { let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new()); - let control = &ms.handle.control; + let control = &ms.control(); // We can get an address let addr = control.get_new_address().address; assert_eq!( @@ -1281,7 +1281,7 @@ mod tests { fn listaddresses() { let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new()); - let control = &ms.handle.control; + let control = &ms.control(); let list = control.list_addresses(Some(2), Some(5)).unwrap(); @@ -1412,7 +1412,7 @@ mod tests { ), ); let ms = DummyLiana::new(dummy_bitcoind, DummyDatabase::new()); - let control = &ms.handle.control; + let control = &ms.control(); // Arguments sanity checking let dummy_addr = @@ -1900,7 +1900,7 @@ mod tests { .insert(dummy_op_a.txid, (dummy_tx.clone(), None)); dummy_bitcoind.txs.insert(dummy_op_b.txid, (dummy_tx, None)); let ms = DummyLiana::new(dummy_bitcoind, DummyDatabase::new()); - let control = &ms.handle.control; + let control = &ms.control(); let mut db_conn = control.db().lock().unwrap().connection(); // Add two (unconfirmed) coins in DB @@ -2046,7 +2046,7 @@ mod tests { let dummy_txid_a = dummy_psbt_a.unsigned_tx.txid(); dummy_bitcoind.txs.insert(dummy_txid_a, (dummy_tx_a, None)); let ms = DummyLiana::new(dummy_bitcoind, DummyDatabase::new()); - let control = &ms.handle.control; + let control = &ms.control(); let mut db_conn = control.db().lock().unwrap().connection(); // The spend needs to be in DB before using RBF. assert_eq!( @@ -2284,7 +2284,7 @@ mod tests { let ms = DummyLiana::new(btc, db); - let control = &ms.handle.control; + let control = &ms.control(); let transactions = control.list_confirmed_transactions(0, 4, 10).transactions; assert_eq!(transactions.len(), 4); @@ -2416,7 +2416,7 @@ mod tests { let ms = DummyLiana::new(btc, DummyDatabase::new()); - let control = &ms.handle.control; + let control = &ms.control(); let transactions = control.list_transactions(&[tx1.txid()]).transactions; assert_eq!(transactions.len(), 1); diff --git a/src/jsonrpc/server.rs b/src/jsonrpc/server.rs index dc6f7ca1..424adc2a 100644 --- a/src/jsonrpc/server.rs +++ b/src/jsonrpc/server.rs @@ -122,11 +122,11 @@ fn connection_handler( pub fn rpcserver_loop( listener: net::UnixListener, daemon_control: DaemonControl, + shutdown: sync::Arc, ) -> Result<(), io::Error> { // Keep it simple. We don't need great performances so just treat each connection in // its thread, with a given maximum number of connections. let connections_counter = sync::Arc::from(atomic::AtomicU32::new(0)); - let shutdown = sync::Arc::from(atomic::AtomicBool::new(false)); listener.set_nonblocking(true)?; while !shutdown.load(atomic::Ordering::Relaxed) { @@ -400,7 +400,7 @@ mod tests { #[cfg(not(target_os = "macos"))] #[test] fn server_sanity_check() { - let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new()); + let ms = DummyLiana::new_server(DummyBitcoind::new(), DummyDatabase::new()); let socket_path: path::PathBuf = [ ms.tmp_dir.as_path(), path::Path::new("d"), @@ -410,7 +410,6 @@ mod tests { .iter() .collect(); - let t = thread::spawn(move || ms.rpc_server().unwrap()); while !socket_path.exists() { thread::sleep(time::Duration::from_millis(100)); } @@ -426,6 +425,6 @@ mod tests { &[&serde_json::to_vec(&stop_req).unwrap(), b"\n"], ); - t.join().unwrap(); + ms.shutdown(); } } diff --git a/src/lib.rs b/src/lib.rs index b608b24e..163c14be 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,7 +28,7 @@ use crate::{ }, }; -use std::{error, fmt, fs, io, path, sync}; +use std::{error, fmt, fs, io, path, sync, thread}; use miniscript::bitcoin::secp256k1; @@ -261,7 +261,7 @@ pub struct DaemonControl { } impl DaemonControl { - pub fn new( + pub(crate) fn new( config: Config, bitcoin: sync::Arc>, db: sync::Arc>, @@ -282,13 +282,28 @@ impl DaemonControl { } } -pub struct DaemonHandle { - pub control: DaemonControl, - bitcoin_poller: poller::Poller, +/// The handle to a Liana daemon. It might either be the handle for a daemon which exposes a +/// JSONRPC server or one which exposes its API through a `DaemonControl`. +pub enum DaemonHandle { + Controller { + poller_shutdown: sync::Arc, + poller_handle: thread::JoinHandle<()>, + control: DaemonControl, + }, + Server { + poller_shutdown: sync::Arc, + poller_handle: thread::JoinHandle<()>, + rpcserver_shutdown: sync::Arc, + rpcserver_handle: thread::JoinHandle>, + }, } impl DaemonHandle { - /// This starts the Liana daemon. Call `shutdown` to shut it down. + /// This starts the Liana daemon. A user of this interface should regularly poll the `is_alive` + /// method to check for internal errors. To shut down the daemon use the `stop` method. + /// + /// The `with_rpc_server` controls whether we should start a JSONRPC server to receive queries + /// or instead return a `DaemonControl` object for a caller to access the daemon's API. /// /// You may specify a custom Bitcoin interface through the `bitcoin` parameter. If `None`, the /// default Bitcoin interface (`bitcoind` JSONRPC) will be used. @@ -301,6 +316,7 @@ impl DaemonHandle { config: Config, bitcoin: Option, db: Option, + with_rpc_server: bool, ) -> Result { #[cfg(not(test))] setup_panic_hook(); @@ -353,82 +369,119 @@ impl DaemonHandle { } } - // Spawn the bitcoind poller with a retry limit high enough that we'd fail after that. - let bitcoin_poller = poller::Poller::start( - bit.clone(), - db.clone(), - config.bitcoin_config.poll_interval_secs, - config.main_descriptor.clone(), - ); + // Start the poller thread. Keep the thread handle to be able to check if it crashed. Store + // an atomic to be able to stop it. + let bitcoin_poller = + poller::Poller::new(bit.clone(), db.clone(), config.main_descriptor.clone()); + let poller_shutdown = sync::Arc::from(sync::atomic::AtomicBool::from(false)); + let poller_handle = thread::Builder::new() + .name("Bitcoin Network poller".to_string()) + .spawn({ + let poll_interval = config.bitcoin_config.poll_interval_secs; + let shutdown = poller_shutdown.clone(); + move || { + log::info!("Bitcoin poller started."); + bitcoin_poller.poll_forever(poll_interval, shutdown); + log::info!("Bitcoin poller stopped."); + } + }) + .expect("Spawning the poller thread must never fail."); - // Finally, set up the API. + // Create the API the external world will use to talk to us, either directly through the Rust + // structure or through the JSONRPC server we may setup below. let control = DaemonControl::new(config, bit, db, secp); - Ok(Self { - control, - bitcoin_poller, + Ok(if with_rpc_server { + let rpcserver_shutdown = sync::Arc::from(sync::atomic::AtomicBool::from(false)); + let rpcserver_handle = thread::Builder::new() + .name("Bitcoin Network poller".to_string()) + .spawn({ + let shutdown = rpcserver_shutdown.clone(); + move || { + let mut rpc_socket = data_dir; + rpc_socket.push("lianad_rpc"); + let listener = rpcserver_setup(&rpc_socket)?; + log::info!("JSONRPC server started."); + + rpcserver_loop(listener, control, shutdown)?; + log::info!("JSONRPC server stopped."); + Ok(()) + } + }) + .expect("Spawning the RPC server thread should never fail."); + + DaemonHandle::Server { + poller_shutdown, + poller_handle, + rpcserver_shutdown, + rpcserver_handle, + } + } else { + DaemonHandle::Controller { + poller_shutdown, + poller_handle, + control, + } }) } /// Start the Liana daemon with the default Bitcoin and database interfaces (`bitcoind` RPC /// and SQLite). - pub fn start_default(config: Config) -> Result { - DaemonHandle::start(config, Option::::None, Option::::None) + pub fn start_default( + config: Config, + with_rpc_server: bool, + ) -> Result { + Self::start( + config, + Option::::None, + Option::::None, + with_rpc_server, + ) } - /// Start the JSONRPC server and listen for incoming commands until we die. - /// Like DaemonHandle::shutdown(), this stops the Bitcoin poller at teardown. - #[cfg(feature = "daemon")] - pub fn rpc_server(self) -> Result<(), io::Error> { - let DaemonHandle { - control, - bitcoin_poller: poller, - } = self; - - let rpc_socket: path::PathBuf = [ - control - .config - .data_dir() - .expect("Didn't fail at startup, must not now") - .as_path(), - path::Path::new(&control.config.bitcoin_config.network.to_string()), - path::Path::new("lianad_rpc"), - ] - .iter() - .collect(); - let listener = rpcserver_setup(&rpc_socket)?; - log::info!("JSONRPC server started."); - - rpcserver_loop(listener, control)?; - log::info!("JSONRPC server stopped."); - - poller.stop(); - - Ok(()) + /// Check whether the daemon is still up and running. This needs to be regularly polled to + /// check for internal errors. If this returns `false`, collect the error using the `stop` + /// method. + pub fn is_alive(&self) -> bool { + match self { + Self::Controller { + ref poller_handle, .. + } => !poller_handle.is_finished(), + Self::Server { + ref poller_handle, + ref rpcserver_handle, + .. + } => !poller_handle.is_finished() && !rpcserver_handle.is_finished(), + } } - /// Shut down the Liana daemon. - pub fn shutdown(self) { - self.bitcoin_poller.stop(); - } - - /// Tell the daemon to shut down. This will return before the shutdown completes. The structure - /// must not be reused after triggering shutdown. - #[cfg(feature = "nonblocking_shutdown")] - pub fn trigger_shutdown(&self) { - self.bitcoin_poller.trigger_stop() - } - - /// Whether the daemon has finished shutting down. - #[cfg(feature = "nonblocking_shutdown")] - pub fn shutdown_complete(&self) -> bool { - self.bitcoin_poller.is_stopped() - } - - // We need a shutdown utility that does not move for implementing Drop for the DummyLiana - #[cfg(test)] - pub fn test_shutdown(&mut self) { - self.bitcoin_poller.test_stop(); + /// Stop the Liana daemon. This returns any error which may have occurred. + pub fn stop(self) -> Result<(), Box> { + match self { + Self::Controller { + poller_shutdown, + poller_handle, + .. + } => { + poller_shutdown.store(true, sync::atomic::Ordering::Relaxed); + poller_handle.join().expect("Poller thread must not panic"); + Ok(()) + } + Self::Server { + poller_shutdown, + poller_handle, + rpcserver_shutdown, + rpcserver_handle, + } => { + poller_shutdown.store(true, sync::atomic::Ordering::Relaxed); + rpcserver_shutdown.store(true, sync::atomic::Ordering::Relaxed); + rpcserver_handle + .join() + .expect("Poller thread must not panic")?; + poller_handle.join().expect("Poller thread must not panic"); + Ok(()) + } + } } } @@ -681,11 +734,11 @@ mod tests { }; // Start the daemon in a new thread so the current one acts as the bitcoind server. - let daemon_thread = thread::spawn({ + let t = thread::spawn({ let config = config.clone(); move || { - let handle = DaemonHandle::start_default(config).unwrap(); - handle.shutdown(); + let handle = DaemonHandle::start_default(config, false).unwrap(); + handle.stop().unwrap(); } }); complete_sanity_check(&server); @@ -697,12 +750,15 @@ mod tests { complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string()); complete_tip_init(&server); complete_sync_check(&server); - daemon_thread.join().unwrap(); + t.join().unwrap(); // The datadir is created now, so if we restart it it won't create the wo wallet. - let daemon_thread = thread::spawn(move || { - let handle = DaemonHandle::start_default(config).unwrap(); - handle.shutdown(); + let t = thread::spawn({ + let config = config.clone(); + move || { + let handle = DaemonHandle::start_default(config, false).unwrap(); + handle.stop().unwrap(); + } }); complete_sanity_check(&server); complete_version_check(&server); @@ -711,7 +767,7 @@ mod tests { complete_wallet_check(&server, &wo_path); complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string()); complete_sync_check(&server); - daemon_thread.join().unwrap(); + t.join().unwrap(); fs::remove_dir_all(&tmp_dir).unwrap(); } diff --git a/src/testutils.rs b/src/testutils.rs index 9e2adfc0..35ff27d4 100644 --- a/src/testutils.rs +++ b/src/testutils.rs @@ -2,13 +2,13 @@ use crate::{ bitcoin::{BitcoinInterface, Block, BlockChainTip, MempoolEntry, SyncProgress, UTxO}, config::{BitcoinConfig, Config}, database::{BlockInfo, Coin, CoinStatus, DatabaseConnection, DatabaseInterface, LabelItem}, - descriptors, DaemonHandle, + descriptors, DaemonControl, DaemonHandle, }; use std::convert::TryInto; use std::{ collections::{HashMap, HashSet}, - env, fs, io, path, process, + env, fs, path, process, str::FromStr, sync, thread, time, time::{SystemTime, UNIX_EPOCH}, @@ -464,9 +464,10 @@ pub fn tmp_dir() -> path::PathBuf { impl DummyLiana { /// Creates a new DummyLiana interface - pub fn new( + pub fn _new( bitcoin_interface: impl BitcoinInterface + 'static, database: impl DatabaseInterface + 'static, + rpc_server: bool, ) -> DummyLiana { let tmp_dir = tmp_dir(); fs::create_dir_all(&tmp_dir).unwrap(); @@ -497,19 +498,37 @@ impl DummyLiana { main_descriptor: desc, }; - let handle = DaemonHandle::start(config, Some(bitcoin_interface), Some(database)).unwrap(); + let handle = + DaemonHandle::start(config, Some(bitcoin_interface), Some(database), rpc_server) + .unwrap(); DummyLiana { tmp_dir, handle } } - #[cfg(feature = "daemon")] - pub fn rpc_server(self) -> Result<(), io::Error> { - self.handle.rpc_server()?; - fs::remove_dir_all(&self.tmp_dir)?; - Ok(()) + /// Creates a new DummyLiana interface + pub fn new( + bitcoin_interface: impl BitcoinInterface + 'static, + database: impl DatabaseInterface + 'static, + ) -> DummyLiana { + Self::_new(bitcoin_interface, database, false) + } + + /// Creates a new DummyLiana interface which also spins up an RPC server. + pub fn new_server( + bitcoin_interface: impl BitcoinInterface + 'static, + database: impl DatabaseInterface + 'static, + ) -> DummyLiana { + Self::_new(bitcoin_interface, database, true) + } + + pub fn control(&self) -> &DaemonControl { + match self.handle { + DaemonHandle::Controller { ref control, .. } => control, + DaemonHandle::Server { .. } => unreachable!(), + } } pub fn shutdown(self) { - self.handle.shutdown(); - fs::remove_dir_all(&self.tmp_dir).unwrap(); + self.handle.stop().unwrap(); + fs::remove_dir_all(self.tmp_dir).unwrap(); } }