Merge #1010: Update our state upon broadcasting a transaction

58c71c794a050a2df460aec241c90efe89cf9ae0 lib: gate the RPC server availability on the 'daemon' feature (Antoine Poinsot)
b7fde6a9e433afed1f23cabbab59093d953b2d54 commands: update our state immediately after broadcasting a tx (Antoine Poinsot)
1cf42d9aeec34b184d93484ea25d9a843a45bc3f poller: introduce a communication channel with the poller thread (Antoine Poinsot)
f6ce85cfd32f669f6eb061ed8b367f4182512b95 lib: remove the panic hook. (Antoine Poinsot)
b4fe963a5b2817ef07f3d96ddcc3c71f9a6606d3 lib: encapsulate the handling of both threads (poller and RPC server) (Antoine Poinsot)
fd5387f954948303cf8403925062f577ae7bc49e poller: use the same database connection across one update round (Antoine Poinsot)
ea6923e2c075416882304c263b29be99ac24f824 poller: make the updating process into its own function. (Antoine Poinsot)

Pull request description:

  Fixes https://github.com/wizardsardine/liana/issues/887.

  This takes a couple commits from #909 but takes the approach from there in another direction: we don't externalize the poller, since only a single instance must be ran. Instead we properly keep track of the (up to) two threads we manage in the `DaemonHandle` and provide a way for a user of the library to check for errors in any of the threads.

  This approach allows us to 1) communicate with the poller thread from inside the Liana library/daemon (here we leverage this to tell it to poll) 2) eventually (#909) expose all internal errors from the library to the user instead of panic'ing internally.

  See the commit messages for details.

ACKs for top commit:
  darosior:
    ACK 58c71c794a050a2df460aec241c90efe89cf9ae0 -- did another pass and Edouard tested this in the GUI.

Tree-SHA512: 0ab436b2a187f9d124ed8861a47f03bb1e9252cdc4f3b5c4308db07be738c78b2ea3f07dc0a9586e3d5bd34f071a1e2a2569cad30676c9cc004e39260ebb94ca
This commit is contained in:
Antoine Poinsot 2024-03-22 11:49:12 +01:00
commit 2aa8874456
No known key found for this signature in database
GPG Key ID: E13FC145CD3F4304
7 changed files with 357 additions and 235 deletions

View File

@ -70,13 +70,16 @@ fn main() {
process::exit(1); process::exit(1);
}); });
let daemon = DaemonHandle::start_default(config).unwrap_or_else(|e| { let handle = DaemonHandle::start_default(config, true).unwrap_or_else(|e| {
log::error!("Error starting Liana daemon: {}", e); log::error!("Error starting Liana daemon: {}", e);
process::exit(1); process::exit(1);
}); });
daemon while handle.is_alive() {
.rpc_server() thread::sleep(time::Duration::from_millis(500));
.expect("JSONRPC server must terminate cleanly"); }
if let Err(e) = handle.stop() {
log::error!("Error stopping Liana daemon: {}", e);
}
// We are always logging to stdout, should it be then piped to the log file (if self) or // We are always logging to stdout, should it be then piped to the log file (if self) or
// not. So just make sure that all messages were actually written. // not. So just make sure that all messages were actually written.

View File

@ -4,11 +4,7 @@ use crate::{
descriptors, descriptors,
}; };
use std::{ use std::{collections::HashSet, sync, time};
collections::HashSet,
sync::{self, atomic},
thread, time,
};
use miniscript::bitcoin::{self, secp256k1}; use miniscript::bitcoin::{self, secp256k1};
@ -208,13 +204,11 @@ fn new_tip(bit: &impl BitcoinInterface, current_tip: &BlockChainTip) -> TipUpdat
} }
fn updates( fn updates(
db_conn: &mut Box<dyn DatabaseConnection>,
bit: &impl BitcoinInterface, bit: &impl BitcoinInterface,
db: &impl DatabaseInterface,
descs: &[descriptors::SinglePathLianaDesc], descs: &[descriptors::SinglePathLianaDesc],
secp: &secp256k1::Secp256k1<secp256k1::VerifyOnly>, secp: &secp256k1::Secp256k1<secp256k1::VerifyOnly>,
) { ) {
let mut db_conn = db.connection();
// Check if there was a new block before updating ourselves. // Check if there was a new block before updating ourselves.
let current_tip = db_conn.chain_tip().expect("Always set at first startup"); let current_tip = db_conn.chain_tip().expect("Always set at first startup");
let latest_tip = match new_tip(bit, &current_tip) { let latest_tip = match new_tip(bit, &current_tip) {
@ -225,18 +219,18 @@ fn updates(
// between our former chain and the new one, then restart fresh. // between our former chain and the new one, then restart fresh.
db_conn.rollback_tip(&new_tip); db_conn.rollback_tip(&new_tip);
log::info!("Tip was rolled back to '{}'.", new_tip); log::info!("Tip was rolled back to '{}'.", new_tip);
return updates(bit, db, descs, secp); return updates(db_conn, bit, descs, secp);
} }
}; };
// Then check the state of our coins. Do it even if the tip did not change since last poll, as // Then check the state of our coins. Do it even if the tip did not change since last poll, as
// we may have unconfirmed transactions. // we may have unconfirmed transactions.
let updated_coins = update_coins(bit, &mut db_conn, &current_tip, descs, secp); let updated_coins = update_coins(bit, db_conn, &current_tip, descs, secp);
// If the tip changed while we were polling our Bitcoin interface, start over. // If the tip changed while we were polling our Bitcoin interface, start over.
if bit.chain_tip() != latest_tip { if bit.chain_tip() != latest_tip {
log::info!("Chain tip changed while we were updating our state. Starting over."); log::info!("Chain tip changed while we were updating our state. Starting over.");
return updates(bit, db, descs, secp); return updates(db_conn, bit, descs, secp);
} }
// The chain tip did not change since we started our updates. Record them and the latest tip. // The chain tip did not change since we started our updates. Record them and the latest tip.
@ -258,13 +252,12 @@ fn updates(
// Check if there is any rescan of the backend ongoing or one that just finished. // Check if there is any rescan of the backend ongoing or one that just finished.
fn rescan_check( fn rescan_check(
db_conn: &mut Box<dyn DatabaseConnection>,
bit: &impl BitcoinInterface, bit: &impl BitcoinInterface,
db: &impl DatabaseInterface,
descs: &[descriptors::SinglePathLianaDesc], descs: &[descriptors::SinglePathLianaDesc],
secp: &secp256k1::Secp256k1<secp256k1::VerifyOnly>, secp: &secp256k1::Secp256k1<secp256k1::VerifyOnly>,
) { ) {
log::debug!("Checking the state of an ongoing rescan if there is any"); log::debug!("Checking the state of an ongoing rescan if there is any");
let mut db_conn = db.connection();
// Check if there is an ongoing rescan. If there isn't and we previously asked for a rescan of // Check if there is an ongoing rescan. If there isn't and we previously asked for a rescan of
// the backend, we treat it as completed. // the backend, we treat it as completed.
@ -299,14 +292,14 @@ fn rescan_check(
"Rolling back our internal tip to '{}' to update our internal state with past transactions.", "Rolling back our internal tip to '{}' to update our internal state with past transactions.",
rescan_tip rescan_tip
); );
updates(bit, db, descs, secp) updates(db_conn, bit, descs, secp)
} else { } else {
log::debug!("No ongoing rescan."); log::debug!("No ongoing rescan.");
} }
} }
// If the database chain tip is NULL (first startup), initialize it. /// If the database chain tip is NULL (first startup), initialize it.
fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface) { pub fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface) {
let mut db_conn = db.connection(); let mut db_conn = db.connection();
if db_conn.chain_tip().is_none() { if db_conn.chain_tip().is_none() {
@ -315,7 +308,7 @@ fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface
} }
} }
fn sync_poll_interval() -> time::Duration { pub fn sync_poll_interval() -> time::Duration {
// TODO: be smarter, like in revaultd, but more generic too. // TODO: be smarter, like in revaultd, but more generic too.
#[cfg(not(test))] #[cfg(not(test))]
{ {
@ -325,60 +318,14 @@ fn sync_poll_interval() -> time::Duration {
time::Duration::from_secs(0) time::Duration::from_secs(0)
} }
/// Main event loop. Repeatedly polls the Bitcoin interface until told to stop through the /// Update our state from the Bitcoin backend.
/// `shutdown` atomic. pub fn poll(
pub fn looper( bit: &sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
bit: sync::Arc<sync::Mutex<dyn BitcoinInterface>>, db: &sync::Arc<sync::Mutex<dyn DatabaseInterface>>,
db: sync::Arc<sync::Mutex<dyn DatabaseInterface>>, secp: &secp256k1::Secp256k1<secp256k1::VerifyOnly>,
shutdown: sync::Arc<atomic::AtomicBool>, descs: &[descriptors::SinglePathLianaDesc],
poll_interval: time::Duration,
desc: descriptors::LianaDescriptor,
) { ) {
let mut last_poll = None; let mut db_conn = db.connection();
let mut synced = false; updates(&mut db_conn, bit, descs, secp);
let descs = [ rescan_check(&mut db_conn, bit, descs, secp);
desc.receive_descriptor().clone(),
desc.change_descriptor().clone(),
];
let secp = secp256k1::Secp256k1::verification_only();
maybe_initialize_tip(&bit, &db);
while !shutdown.load(atomic::Ordering::Relaxed) || last_poll.is_none() {
let now = time::Instant::now();
if let Some(last_poll) = last_poll {
let time_since_poll = now.duration_since(last_poll);
let poll_interval = if synced {
poll_interval
} else {
// Until we are synced we poll less often to avoid harassing bitcoind and impeding
// the sync. As a function since it's mocked for the tests.
sync_poll_interval()
};
if time_since_poll < poll_interval {
thread::sleep(time::Duration::from_millis(500));
continue;
}
}
last_poll = Some(now);
// Don't poll until the Bitcoin backend is fully synced.
if !synced {
let progress = bit.sync_progress();
log::info!(
"Block chain synchronization progress: {:.2}% ({} blocks / {} headers)",
progress.rounded_up_progress() * 100.0,
progress.blocks,
progress.headers
);
synced = progress.is_complete();
if !synced {
continue;
}
}
updates(&bit, &db, &descs, &secp);
rescan_check(&bit, &db, &descs, &secp);
}
} }

View File

@ -1,60 +1,128 @@
mod looper; mod looper;
use crate::{ use crate::{bitcoin::BitcoinInterface, database::DatabaseInterface, descriptors};
bitcoin::{poller::looper::looper, BitcoinInterface},
database::DatabaseInterface,
descriptors,
};
use std::{ use std::{
sync::{self, atomic}, sync::{self, mpsc},
thread, time, time,
}; };
use miniscript::bitcoin::secp256k1;
#[derive(Debug, Clone)]
pub enum PollerMessage {
Shutdown,
/// Ask the Bitcoin poller to poll immediately, get notified through the passed channel once
/// it's done.
PollNow(mpsc::SyncSender<()>),
}
/// The Bitcoin poller handler. /// The Bitcoin poller handler.
pub struct Poller { pub struct Poller {
handle: thread::JoinHandle<()>, bit: sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
shutdown: sync::Arc<atomic::AtomicBool>, db: sync::Arc<sync::Mutex<dyn DatabaseInterface>>,
secp: secp256k1::Secp256k1<secp256k1::VerifyOnly>,
// The receive and change descriptors (in this order).
descs: [descriptors::SinglePathLianaDesc; 2],
} }
impl Poller { impl Poller {
pub fn start( pub fn new(
bit: sync::Arc<sync::Mutex<dyn BitcoinInterface>>, bit: sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
db: sync::Arc<sync::Mutex<dyn DatabaseInterface>>, db: sync::Arc<sync::Mutex<dyn DatabaseInterface>>,
poll_interval: time::Duration,
desc: descriptors::LianaDescriptor, desc: descriptors::LianaDescriptor,
) -> Poller { ) -> Poller {
let shutdown = sync::Arc::from(atomic::AtomicBool::from(false)); let secp = secp256k1::Secp256k1::verification_only();
let handle = thread::Builder::new() let descs = [
.name("Bitcoin poller".to_string()) desc.receive_descriptor().clone(),
.spawn({ desc.change_descriptor().clone(),
let shutdown = shutdown.clone(); ];
move || looper(bit, db, shutdown, poll_interval, desc)
})
.expect("Must not fail");
Poller { shutdown, handle } // On first startup the tip may be NULL. Make sure it's set as the poller relies on it.
looper::maybe_initialize_tip(&bit, &db);
Poller {
bit,
db,
secp,
descs,
}
} }
pub fn trigger_stop(&self) { /// Continuously update our state from the Bitcoin backend.
self.shutdown.store(true, atomic::Ordering::Relaxed); /// - `poll_interval`: how frequently to perform an update.
} /// - `shutdown`: set to true to stop continuously updating and make this function return.
///
/// Typically this would run for the whole duration of the program in a thread, and the main
/// thread would set the `shutdown` atomic to `true` when shutting down.
pub fn poll_forever(
&self,
poll_interval: time::Duration,
receiver: mpsc::Receiver<PollerMessage>,
) {
let mut last_poll = None;
let mut synced = false;
pub fn stop(self) { loop {
self.trigger_stop(); // How long to wait before the next poll.
self.handle.join().expect("The poller loop must not fail"); let time_before_poll = if let Some(last_poll) = last_poll {
} let time_since_poll = time::Instant::now().duration_since(last_poll);
// Until we are synced we poll less often to avoid harassing bitcoind and impeding
// the sync. As a function since it's mocked for the tests.
let poll_interval = if synced {
poll_interval
} else {
looper::sync_poll_interval()
};
poll_interval.saturating_sub(time_since_poll)
} else {
// Don't wait before doing the first poll.
time::Duration::ZERO
};
#[cfg(feature = "nonblocking_shutdown")] // Wait for the duration of the interval between polls, but listen to messages in the
pub fn is_stopped(&self) -> bool { // meantime.
// Doc says "This might return true for a brief moment after the threads main function has match receiver.recv_timeout(time_before_poll) {
// returned, but before the thread itself has stopped running.". But it's not an issue for Ok(PollerMessage::Shutdown) => {
// us, as long as the main poller function has returned we are good. log::info!("Bitcoin poller was told to shut down.");
self.handle.is_finished() return;
} }
Ok(PollerMessage::PollNow(sender)) => {
// We've been asked to poll, don't wait any further and signal completion to
// the caller.
last_poll = Some(time::Instant::now());
looper::poll(&self.bit, &self.db, &self.secp, &self.descs);
if let Err(e) = sender.send(()) {
log::error!("Error sending immediate poll completion signal: {}.", e);
}
continue;
}
Err(mpsc::RecvTimeoutError::Timeout) => {
// It's been long enough since the last poll.
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
log::error!("Bitcoin poller communication channel got disconnected. Exiting.");
return;
}
}
last_poll = Some(time::Instant::now());
#[cfg(test)] // Don't poll until the Bitcoin backend is fully synced.
pub fn test_stop(&mut self) { if !synced {
self.shutdown.store(true, atomic::Ordering::Relaxed); let progress = self.bit.sync_progress();
log::info!(
"Block chain synchronization progress: {:.2}% ({} blocks / {} headers)",
progress.rounded_up_progress() * 100.0,
progress.blocks,
progress.headers
);
synced = progress.is_complete();
if !synced {
continue;
}
}
looper::poll(&self.bit, &self.db, &self.secp, &self.descs);
}
} }
} }

View File

@ -8,6 +8,7 @@ use crate::{
bitcoin::BitcoinInterface, bitcoin::BitcoinInterface,
database::{Coin, DatabaseConnection, DatabaseInterface}, database::{Coin, DatabaseConnection, DatabaseInterface},
descriptors, descriptors,
poller::PollerMessage,
spend::{ spend::{
create_spend, AddrInfo, AncestorInfo, CandidateCoin, CreateSpendRes, SpendCreationError, create_spend, AddrInfo, AncestorInfo, CandidateCoin, CreateSpendRes, SpendCreationError,
SpendOutputAddress, SpendTxFees, TxGetter, SpendOutputAddress, SpendTxFees, TxGetter,
@ -24,7 +25,8 @@ use utils::{
use std::{ use std::{
collections::{hash_map, HashMap, HashSet}, collections::{hash_map, HashMap, HashSet},
fmt, sync, fmt,
sync::{self, mpsc},
}; };
use miniscript::{ use miniscript::{
@ -688,7 +690,18 @@ impl DaemonControl {
let final_tx = spend_psbt.extract_tx_unchecked_fee_rate(); let final_tx = spend_psbt.extract_tx_unchecked_fee_rate();
self.bitcoin self.bitcoin
.broadcast_tx(&final_tx) .broadcast_tx(&final_tx)
.map_err(CommandError::TxBroadcast) .map_err(CommandError::TxBroadcast)?;
// Finally, update our state with the changes from this transaction.
let (tx, rx) = mpsc::sync_channel(0);
if let Err(e) = self.poller_sender.send(PollerMessage::PollNow(tx)) {
log::error!("Error requesting update from poller: {}", e);
}
if let Err(e) = rx.recv() {
log::error!("Error receiving completion signal from poller: {}", e);
}
Ok(())
} }
/// Create PSBT to replace the given transaction using RBF. /// Create PSBT to replace the given transaction using RBF.
@ -1251,7 +1264,7 @@ mod tests {
fn getinfo() { fn getinfo() {
let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new()); let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new());
// We can query getinfo // We can query getinfo
ms.handle.control.get_info(); ms.control().get_info();
ms.shutdown(); ms.shutdown();
} }
@ -1259,7 +1272,7 @@ mod tests {
fn getnewaddress() { fn getnewaddress() {
let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new()); let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new());
let control = &ms.handle.control; let control = &ms.control();
// We can get an address // We can get an address
let addr = control.get_new_address().address; let addr = control.get_new_address().address;
assert_eq!( assert_eq!(
@ -1281,7 +1294,7 @@ mod tests {
fn listaddresses() { fn listaddresses() {
let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new()); let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new());
let control = &ms.handle.control; let control = &ms.control();
let list = control.list_addresses(Some(2), Some(5)).unwrap(); let list = control.list_addresses(Some(2), Some(5)).unwrap();
@ -1412,7 +1425,7 @@ mod tests {
), ),
); );
let ms = DummyLiana::new(dummy_bitcoind, DummyDatabase::new()); let ms = DummyLiana::new(dummy_bitcoind, DummyDatabase::new());
let control = &ms.handle.control; let control = &ms.control();
// Arguments sanity checking // Arguments sanity checking
let dummy_addr = let dummy_addr =
@ -1900,7 +1913,7 @@ mod tests {
.insert(dummy_op_a.txid, (dummy_tx.clone(), None)); .insert(dummy_op_a.txid, (dummy_tx.clone(), None));
dummy_bitcoind.txs.insert(dummy_op_b.txid, (dummy_tx, None)); dummy_bitcoind.txs.insert(dummy_op_b.txid, (dummy_tx, None));
let ms = DummyLiana::new(dummy_bitcoind, DummyDatabase::new()); let ms = DummyLiana::new(dummy_bitcoind, DummyDatabase::new());
let control = &ms.handle.control; let control = &ms.control();
let mut db_conn = control.db().lock().unwrap().connection(); let mut db_conn = control.db().lock().unwrap().connection();
// Add two (unconfirmed) coins in DB // Add two (unconfirmed) coins in DB
@ -2046,7 +2059,7 @@ mod tests {
let dummy_txid_a = dummy_psbt_a.unsigned_tx.txid(); let dummy_txid_a = dummy_psbt_a.unsigned_tx.txid();
dummy_bitcoind.txs.insert(dummy_txid_a, (dummy_tx_a, None)); dummy_bitcoind.txs.insert(dummy_txid_a, (dummy_tx_a, None));
let ms = DummyLiana::new(dummy_bitcoind, DummyDatabase::new()); let ms = DummyLiana::new(dummy_bitcoind, DummyDatabase::new());
let control = &ms.handle.control; let control = &ms.control();
let mut db_conn = control.db().lock().unwrap().connection(); let mut db_conn = control.db().lock().unwrap().connection();
// The spend needs to be in DB before using RBF. // The spend needs to be in DB before using RBF.
assert_eq!( assert_eq!(
@ -2284,7 +2297,7 @@ mod tests {
let ms = DummyLiana::new(btc, db); let ms = DummyLiana::new(btc, db);
let control = &ms.handle.control; let control = &ms.control();
let transactions = control.list_confirmed_transactions(0, 4, 10).transactions; let transactions = control.list_confirmed_transactions(0, 4, 10).transactions;
assert_eq!(transactions.len(), 4); assert_eq!(transactions.len(), 4);
@ -2416,7 +2429,7 @@ mod tests {
let ms = DummyLiana::new(btc, DummyDatabase::new()); let ms = DummyLiana::new(btc, DummyDatabase::new());
let control = &ms.handle.control; let control = &ms.control();
let transactions = control.list_transactions(&[tx1.txid()]).transactions; let transactions = control.list_transactions(&[tx1.txid()]).transactions;
assert_eq!(transactions.len(), 1); assert_eq!(transactions.len(), 1);

View File

@ -122,11 +122,11 @@ fn connection_handler(
pub fn rpcserver_loop( pub fn rpcserver_loop(
listener: net::UnixListener, listener: net::UnixListener,
daemon_control: DaemonControl, daemon_control: DaemonControl,
shutdown: sync::Arc<atomic::AtomicBool>,
) -> Result<(), io::Error> { ) -> Result<(), io::Error> {
// Keep it simple. We don't need great performances so just treat each connection in // Keep it simple. We don't need great performances so just treat each connection in
// its thread, with a given maximum number of connections. // its thread, with a given maximum number of connections.
let connections_counter = sync::Arc::from(atomic::AtomicU32::new(0)); let connections_counter = sync::Arc::from(atomic::AtomicU32::new(0));
let shutdown = sync::Arc::from(atomic::AtomicBool::new(false));
listener.set_nonblocking(true)?; listener.set_nonblocking(true)?;
while !shutdown.load(atomic::Ordering::Relaxed) { while !shutdown.load(atomic::Ordering::Relaxed) {
@ -400,7 +400,7 @@ mod tests {
#[cfg(not(target_os = "macos"))] #[cfg(not(target_os = "macos"))]
#[test] #[test]
fn server_sanity_check() { fn server_sanity_check() {
let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new()); let ms = DummyLiana::new_server(DummyBitcoind::new(), DummyDatabase::new());
let socket_path: path::PathBuf = [ let socket_path: path::PathBuf = [
ms.tmp_dir.as_path(), ms.tmp_dir.as_path(),
path::Path::new("d"), path::Path::new("d"),
@ -410,7 +410,6 @@ mod tests {
.iter() .iter()
.collect(); .collect();
let t = thread::spawn(move || ms.rpc_server().unwrap());
while !socket_path.exists() { while !socket_path.exists() {
thread::sleep(time::Duration::from_millis(100)); thread::sleep(time::Duration::from_millis(100));
} }
@ -426,6 +425,6 @@ mod tests {
&[&serde_json::to_vec(&stop_req).unwrap(), b"\n"], &[&serde_json::to_vec(&stop_req).unwrap(), b"\n"],
); );
t.join().unwrap(); ms.shutdown();
} }
} }

View File

@ -28,12 +28,16 @@ use crate::{
}, },
}; };
use std::{error, fmt, fs, io, path, sync}; use std::{
error, fmt, fs, io, path,
sync::{self, mpsc},
thread,
};
use miniscript::bitcoin::secp256k1; use miniscript::bitcoin::secp256k1;
#[cfg(not(test))] #[cfg(not(test))]
use std::{panic, process}; use std::panic;
// A panic in any thread should stop the main thread, and print the panic. // A panic in any thread should stop the main thread, and print the panic.
#[cfg(not(test))] #[cfg(not(test))]
fn setup_panic_hook() { fn setup_panic_hook() {
@ -60,8 +64,6 @@ fn setup_panic_hook() {
info, info,
bt bt
); );
process::exit(1);
})); }));
} }
@ -255,21 +257,24 @@ fn setup_bitcoind(
pub struct DaemonControl { pub struct DaemonControl {
config: Config, config: Config,
bitcoin: sync::Arc<sync::Mutex<dyn BitcoinInterface>>, bitcoin: sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
poller_sender: mpsc::SyncSender<poller::PollerMessage>,
// FIXME: Should we require Sync on DatabaseInterface rather than using a Mutex? // FIXME: Should we require Sync on DatabaseInterface rather than using a Mutex?
db: sync::Arc<sync::Mutex<dyn DatabaseInterface>>, db: sync::Arc<sync::Mutex<dyn DatabaseInterface>>,
secp: secp256k1::Secp256k1<secp256k1::VerifyOnly>, secp: secp256k1::Secp256k1<secp256k1::VerifyOnly>,
} }
impl DaemonControl { impl DaemonControl {
pub fn new( pub(crate) fn new(
config: Config, config: Config,
bitcoin: sync::Arc<sync::Mutex<dyn BitcoinInterface>>, bitcoin: sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
poller_sender: mpsc::SyncSender<poller::PollerMessage>,
db: sync::Arc<sync::Mutex<dyn DatabaseInterface>>, db: sync::Arc<sync::Mutex<dyn DatabaseInterface>>,
secp: secp256k1::Secp256k1<secp256k1::VerifyOnly>, secp: secp256k1::Secp256k1<secp256k1::VerifyOnly>,
) -> DaemonControl { ) -> DaemonControl {
DaemonControl { DaemonControl {
config, config,
bitcoin, bitcoin,
poller_sender,
db, db,
secp, secp,
} }
@ -282,25 +287,39 @@ impl DaemonControl {
} }
} }
pub struct DaemonHandle { /// The handle to a Liana daemon. It might either be the handle for a daemon which exposes a
pub control: DaemonControl, /// JSONRPC server or one which exposes its API through a `DaemonControl`.
bitcoin_poller: poller::Poller, pub enum DaemonHandle {
Controller {
poller_sender: mpsc::SyncSender<poller::PollerMessage>,
poller_handle: thread::JoinHandle<()>,
control: DaemonControl,
},
#[cfg(feature = "daemon")]
Server {
poller_sender: mpsc::SyncSender<poller::PollerMessage>,
poller_handle: thread::JoinHandle<()>,
rpcserver_shutdown: sync::Arc<sync::atomic::AtomicBool>,
rpcserver_handle: thread::JoinHandle<Result<(), io::Error>>,
},
} }
impl DaemonHandle { impl DaemonHandle {
/// This starts the Liana daemon. Call `shutdown` to shut it down. /// This starts the Liana daemon. A user of this interface should regularly poll the `is_alive`
/// method to check for internal errors. To shut down the daemon use the `stop` method.
///
/// The `with_rpc_server` controls whether we should start a JSONRPC server to receive queries
/// or instead return a `DaemonControl` object for a caller to access the daemon's API.
/// ///
/// You may specify a custom Bitcoin interface through the `bitcoin` parameter. If `None`, the /// You may specify a custom Bitcoin interface through the `bitcoin` parameter. If `None`, the
/// default Bitcoin interface (`bitcoind` JSONRPC) will be used. /// default Bitcoin interface (`bitcoind` JSONRPC) will be used.
/// You may specify a custom Database interface through the `db` parameter. If `None`, the /// You may specify a custom Database interface through the `db` parameter. If `None`, the
/// default Database interface (SQLite) will be used. /// default Database interface (SQLite) will be used.
///
/// **Note**: we internally use threads, and set a panic hook. A downstream application must
/// not overwrite this panic hook.
pub fn start( pub fn start(
config: Config, config: Config,
bitcoin: Option<impl BitcoinInterface + 'static>, bitcoin: Option<impl BitcoinInterface + 'static>,
db: Option<impl DatabaseInterface + 'static>, db: Option<impl DatabaseInterface + 'static>,
#[cfg(feature = "daemon")] with_rpc_server: bool,
) -> Result<Self, StartupError> { ) -> Result<Self, StartupError> {
#[cfg(not(test))] #[cfg(not(test))]
setup_panic_hook(); setup_panic_hook();
@ -353,82 +372,126 @@ impl DaemonHandle {
} }
} }
// Spawn the bitcoind poller with a retry limit high enough that we'd fail after that. // Start the poller thread. Keep the thread handle to be able to check if it crashed. Store
let bitcoin_poller = poller::Poller::start( // an atomic to be able to stop it.
bit.clone(), let bitcoin_poller =
db.clone(), poller::Poller::new(bit.clone(), db.clone(), config.main_descriptor.clone());
config.bitcoin_config.poll_interval_secs, let (poller_sender, poller_receiver) = mpsc::sync_channel(0);
config.main_descriptor.clone(), let poller_handle = thread::Builder::new()
); .name("Bitcoin Network poller".to_string())
.spawn({
let poll_interval = config.bitcoin_config.poll_interval_secs;
move || {
log::info!("Bitcoin poller started.");
bitcoin_poller.poll_forever(poll_interval, poller_receiver);
log::info!("Bitcoin poller stopped.");
}
})
.expect("Spawning the poller thread must never fail.");
// Finally, set up the API. // Create the API the external world will use to talk to us, either directly through the Rust
let control = DaemonControl::new(config, bit, db, secp); // structure or through the JSONRPC server we may setup below.
let control = DaemonControl::new(config, bit, poller_sender.clone(), db, secp);
Ok(Self { #[cfg(feature = "daemon")]
if with_rpc_server {
let rpcserver_shutdown = sync::Arc::from(sync::atomic::AtomicBool::from(false));
let rpcserver_handle = thread::Builder::new()
.name("Bitcoin Network poller".to_string())
.spawn({
let shutdown = rpcserver_shutdown.clone();
move || {
let mut rpc_socket = data_dir;
rpc_socket.push("lianad_rpc");
let listener = rpcserver_setup(&rpc_socket)?;
log::info!("JSONRPC server started.");
rpcserver_loop(listener, control, shutdown)?;
log::info!("JSONRPC server stopped.");
Ok(())
}
})
.expect("Spawning the RPC server thread should never fail.");
return Ok(DaemonHandle::Server {
poller_sender,
poller_handle,
rpcserver_shutdown,
rpcserver_handle,
});
}
Ok(DaemonHandle::Controller {
poller_sender,
poller_handle,
control, control,
bitcoin_poller,
}) })
} }
/// Start the Liana daemon with the default Bitcoin and database interfaces (`bitcoind` RPC /// Start the Liana daemon with the default Bitcoin and database interfaces (`bitcoind` RPC
/// and SQLite). /// and SQLite).
pub fn start_default(config: Config) -> Result<DaemonHandle, StartupError> { pub fn start_default(
DaemonHandle::start(config, Option::<BitcoinD>::None, Option::<SqliteDb>::None) config: Config,
#[cfg(feature = "daemon")] with_rpc_server: bool,
) -> Result<DaemonHandle, StartupError> {
Self::start(
config,
Option::<BitcoinD>::None,
Option::<SqliteDb>::None,
#[cfg(feature = "daemon")]
with_rpc_server,
)
} }
/// Start the JSONRPC server and listen for incoming commands until we die. /// Check whether the daemon is still up and running. This needs to be regularly polled to
/// Like DaemonHandle::shutdown(), this stops the Bitcoin poller at teardown. /// check for internal errors. If this returns `false`, collect the error using the `stop`
#[cfg(feature = "daemon")] /// method.
pub fn rpc_server(self) -> Result<(), io::Error> { pub fn is_alive(&self) -> bool {
let DaemonHandle { match self {
control, Self::Controller {
bitcoin_poller: poller, ref poller_handle, ..
} = self; } => !poller_handle.is_finished(),
#[cfg(feature = "daemon")]
let rpc_socket: path::PathBuf = [ Self::Server {
control ref poller_handle,
.config ref rpcserver_handle,
.data_dir() ..
.expect("Didn't fail at startup, must not now") } => !poller_handle.is_finished() && !rpcserver_handle.is_finished(),
.as_path(), }
path::Path::new(&control.config.bitcoin_config.network.to_string()),
path::Path::new("lianad_rpc"),
]
.iter()
.collect();
let listener = rpcserver_setup(&rpc_socket)?;
log::info!("JSONRPC server started.");
rpcserver_loop(listener, control)?;
log::info!("JSONRPC server stopped.");
poller.stop();
Ok(())
} }
/// Shut down the Liana daemon. /// Stop the Liana daemon. This returns any error which may have occurred.
pub fn shutdown(self) { pub fn stop(self) -> Result<(), Box<dyn error::Error>> {
self.bitcoin_poller.stop(); match self {
} Self::Controller {
poller_sender,
/// Tell the daemon to shut down. This will return before the shutdown completes. The structure poller_handle,
/// must not be reused after triggering shutdown. ..
#[cfg(feature = "nonblocking_shutdown")] } => {
pub fn trigger_shutdown(&self) { poller_sender
self.bitcoin_poller.trigger_stop() .send(poller::PollerMessage::Shutdown)
} .expect("The other end should never have hung up before this.");
poller_handle.join().expect("Poller thread must not panic");
/// Whether the daemon has finished shutting down. Ok(())
#[cfg(feature = "nonblocking_shutdown")] }
pub fn shutdown_complete(&self) -> bool { #[cfg(feature = "daemon")]
self.bitcoin_poller.is_stopped() Self::Server {
} poller_sender,
poller_handle,
// We need a shutdown utility that does not move for implementing Drop for the DummyLiana rpcserver_shutdown,
#[cfg(test)] rpcserver_handle,
pub fn test_shutdown(&mut self) { } => {
self.bitcoin_poller.test_stop(); poller_sender
.send(poller::PollerMessage::Shutdown)
.expect("The other end should never have hung up before this.");
rpcserver_shutdown.store(true, sync::atomic::Ordering::Relaxed);
rpcserver_handle
.join()
.expect("Poller thread must not panic")?;
poller_handle.join().expect("Poller thread must not panic");
Ok(())
}
}
} }
} }
@ -608,18 +671,6 @@ mod tests {
stream.flush().unwrap(); stream.flush().unwrap();
} }
// Send them a response to 'getblockchaininfo' saying we are far from being synced
fn complete_sync_check(server: &net::TcpListener) {
let net_resp = [
"HTTP/1.1 200\n\r\n{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"verificationprogress\":0.1,\"headers\":1000,\"blocks\":100}}\n".as_bytes(),
]
.concat();
let (mut stream, _) = server.accept().unwrap();
read_til_json_end(&mut stream);
stream.write_all(&net_resp).unwrap();
stream.flush().unwrap();
}
// TODO: we could move the dummy bitcoind thread stuff to the bitcoind module to test the // TODO: we could move the dummy bitcoind thread stuff to the bitcoind module to test the
// bitcoind interface, and use the DummyLiana from testutils to sanity check the startup. // bitcoind interface, and use the DummyLiana from testutils to sanity check the startup.
// Note that startup as checked by this unit test is also tested in the functional test // Note that startup as checked by this unit test is also tested in the functional test
@ -681,11 +732,16 @@ mod tests {
}; };
// Start the daemon in a new thread so the current one acts as the bitcoind server. // Start the daemon in a new thread so the current one acts as the bitcoind server.
let daemon_thread = thread::spawn({ let t = thread::spawn({
let config = config.clone(); let config = config.clone();
move || { move || {
let handle = DaemonHandle::start_default(config).unwrap(); let handle = DaemonHandle::start_default(
handle.shutdown(); config,
#[cfg(feature = "daemon")]
false,
)
.unwrap();
handle.stop().unwrap();
} }
}); });
complete_sanity_check(&server); complete_sanity_check(&server);
@ -696,13 +752,22 @@ mod tests {
complete_wallet_check(&server, &wo_path); complete_wallet_check(&server, &wo_path);
complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string()); complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string());
complete_tip_init(&server); complete_tip_init(&server);
complete_sync_check(&server); // We don't have to complete the sync check as the poller checks whether it needs to stop
daemon_thread.join().unwrap(); // before checking the bitcoind sync status.
t.join().unwrap();
// The datadir is created now, so if we restart it it won't create the wo wallet. // The datadir is created now, so if we restart it it won't create the wo wallet.
let daemon_thread = thread::spawn(move || { let t = thread::spawn({
let handle = DaemonHandle::start_default(config).unwrap(); let config = config.clone();
handle.shutdown(); move || {
let handle = DaemonHandle::start_default(
config,
#[cfg(feature = "daemon")]
false,
)
.unwrap();
handle.stop().unwrap();
}
}); });
complete_sanity_check(&server); complete_sanity_check(&server);
complete_version_check(&server); complete_version_check(&server);
@ -710,8 +775,9 @@ mod tests {
complete_wallet_loading(&server); complete_wallet_loading(&server);
complete_wallet_check(&server, &wo_path); complete_wallet_check(&server, &wo_path);
complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string()); complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string());
complete_sync_check(&server); // We don't have to complete the sync check as the poller checks whether it needs to stop
daemon_thread.join().unwrap(); // before checking the bitcoind sync status.
t.join().unwrap();
fs::remove_dir_all(&tmp_dir).unwrap(); fs::remove_dir_all(&tmp_dir).unwrap();
} }

View File

@ -2,13 +2,13 @@ use crate::{
bitcoin::{BitcoinInterface, Block, BlockChainTip, MempoolEntry, SyncProgress, UTxO}, bitcoin::{BitcoinInterface, Block, BlockChainTip, MempoolEntry, SyncProgress, UTxO},
config::{BitcoinConfig, Config}, config::{BitcoinConfig, Config},
database::{BlockInfo, Coin, CoinStatus, DatabaseConnection, DatabaseInterface, LabelItem}, database::{BlockInfo, Coin, CoinStatus, DatabaseConnection, DatabaseInterface, LabelItem},
descriptors, DaemonHandle, descriptors, DaemonControl, DaemonHandle,
}; };
use std::convert::TryInto; use std::convert::TryInto;
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
env, fs, io, path, process, env, fs, path, process,
str::FromStr, str::FromStr,
sync, thread, time, sync, thread, time,
time::{SystemTime, UNIX_EPOCH}, time::{SystemTime, UNIX_EPOCH},
@ -464,9 +464,10 @@ pub fn tmp_dir() -> path::PathBuf {
impl DummyLiana { impl DummyLiana {
/// Creates a new DummyLiana interface /// Creates a new DummyLiana interface
pub fn new( pub fn _new(
bitcoin_interface: impl BitcoinInterface + 'static, bitcoin_interface: impl BitcoinInterface + 'static,
database: impl DatabaseInterface + 'static, database: impl DatabaseInterface + 'static,
rpc_server: bool,
) -> DummyLiana { ) -> DummyLiana {
let tmp_dir = tmp_dir(); let tmp_dir = tmp_dir();
fs::create_dir_all(&tmp_dir).unwrap(); fs::create_dir_all(&tmp_dir).unwrap();
@ -497,19 +498,44 @@ impl DummyLiana {
main_descriptor: desc, main_descriptor: desc,
}; };
let handle = DaemonHandle::start(config, Some(bitcoin_interface), Some(database)).unwrap(); let handle = DaemonHandle::start(
config,
Some(bitcoin_interface),
Some(database),
#[cfg(feature = "daemon")]
rpc_server,
)
.unwrap();
DummyLiana { tmp_dir, handle } DummyLiana { tmp_dir, handle }
} }
/// Creates a new DummyLiana interface
pub fn new(
bitcoin_interface: impl BitcoinInterface + 'static,
database: impl DatabaseInterface + 'static,
) -> DummyLiana {
Self::_new(bitcoin_interface, database, false)
}
/// Creates a new DummyLiana interface which also spins up an RPC server.
#[cfg(feature = "daemon")] #[cfg(feature = "daemon")]
pub fn rpc_server(self) -> Result<(), io::Error> { pub fn new_server(
self.handle.rpc_server()?; bitcoin_interface: impl BitcoinInterface + 'static,
fs::remove_dir_all(&self.tmp_dir)?; database: impl DatabaseInterface + 'static,
Ok(()) ) -> DummyLiana {
Self::_new(bitcoin_interface, database, true)
}
pub fn control(&self) -> &DaemonControl {
match self.handle {
DaemonHandle::Controller { ref control, .. } => control,
#[cfg(feature = "daemon")]
DaemonHandle::Server { .. } => unreachable!(),
}
} }
pub fn shutdown(self) { pub fn shutdown(self) {
self.handle.shutdown(); self.handle.stop().unwrap();
fs::remove_dir_all(&self.tmp_dir).unwrap(); fs::remove_dir_all(self.tmp_dir).unwrap();
} }
} }