lib: encapsulate the handling of both threads (poller and RPC server)
This is inspired from the work in
https://github.com/wizardsardine/liana/pull/909 (specifically
d8c59e30ed)
to externalize the management of the poller thread. However, there may
be only one poller thread. Starting more than one can lead to a crash or
potentially to data corruption. Therefore it feels safer to manage it
internally.
Instead of exposing the management of the poller to the user of the
library, we manage both threads inside the `DaemonHandle` data structure
and expose a way for a user to check for errors which may have occured
in any of the threads.
This makes it possible to:
1. Eventually propagate errors from the threads to the user of the
daemon (https://github.com/wizardsardine/liana/pull/909);
2. Communicate internally with the poller thread, for instance to
trigger a poll immediately (following commits).
This commit is contained in:
parent
fd5387f954
commit
b4fe963a5b
@ -70,13 +70,16 @@ fn main() {
|
||||
process::exit(1);
|
||||
});
|
||||
|
||||
let daemon = DaemonHandle::start_default(config).unwrap_or_else(|e| {
|
||||
let handle = DaemonHandle::start_default(config, true).unwrap_or_else(|e| {
|
||||
log::error!("Error starting Liana daemon: {}", e);
|
||||
process::exit(1);
|
||||
});
|
||||
daemon
|
||||
.rpc_server()
|
||||
.expect("JSONRPC server must terminate cleanly");
|
||||
while handle.is_alive() {
|
||||
thread::sleep(time::Duration::from_millis(500));
|
||||
}
|
||||
if let Err(e) = handle.stop() {
|
||||
log::error!("Error stopping Liana daemon: {}", e);
|
||||
}
|
||||
|
||||
// We are always logging to stdout, should it be then piped to the log file (if self) or
|
||||
// not. So just make sure that all messages were actually written.
|
||||
|
||||
@ -4,11 +4,7 @@ use crate::{
|
||||
descriptors,
|
||||
};
|
||||
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
sync::{self, atomic},
|
||||
thread, time,
|
||||
};
|
||||
use std::{collections::HashSet, sync, time};
|
||||
|
||||
use miniscript::bitcoin::{self, secp256k1};
|
||||
|
||||
@ -302,8 +298,8 @@ fn rescan_check(
|
||||
}
|
||||
}
|
||||
|
||||
// If the database chain tip is NULL (first startup), initialize it.
|
||||
fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface) {
|
||||
/// If the database chain tip is NULL (first startup), initialize it.
|
||||
pub fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface) {
|
||||
let mut db_conn = db.connection();
|
||||
|
||||
if db_conn.chain_tip().is_none() {
|
||||
@ -312,7 +308,7 @@ fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface
|
||||
}
|
||||
}
|
||||
|
||||
fn sync_poll_interval() -> time::Duration {
|
||||
pub fn sync_poll_interval() -> time::Duration {
|
||||
// TODO: be smarter, like in revaultd, but more generic too.
|
||||
#[cfg(not(test))]
|
||||
{
|
||||
@ -333,60 +329,3 @@ pub fn poll(
|
||||
updates(&mut db_conn, bit, descs, secp);
|
||||
rescan_check(&mut db_conn, bit, descs, secp);
|
||||
}
|
||||
|
||||
/// Main event loop. Repeatedly polls the Bitcoin interface until told to stop through the
|
||||
/// `shutdown` atomic.
|
||||
pub fn looper(
|
||||
bit: sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
|
||||
db: sync::Arc<sync::Mutex<dyn DatabaseInterface>>,
|
||||
shutdown: sync::Arc<atomic::AtomicBool>,
|
||||
poll_interval: time::Duration,
|
||||
desc: descriptors::LianaDescriptor,
|
||||
) {
|
||||
let mut last_poll = None;
|
||||
let mut synced = false;
|
||||
let descs = [
|
||||
desc.receive_descriptor().clone(),
|
||||
desc.change_descriptor().clone(),
|
||||
];
|
||||
let secp = secp256k1::Secp256k1::verification_only();
|
||||
|
||||
maybe_initialize_tip(&bit, &db);
|
||||
|
||||
while !shutdown.load(atomic::Ordering::Relaxed) || last_poll.is_none() {
|
||||
let now = time::Instant::now();
|
||||
|
||||
if let Some(last_poll) = last_poll {
|
||||
let time_since_poll = now.duration_since(last_poll);
|
||||
let poll_interval = if synced {
|
||||
poll_interval
|
||||
} else {
|
||||
// Until we are synced we poll less often to avoid harassing bitcoind and impeding
|
||||
// the sync. As a function since it's mocked for the tests.
|
||||
sync_poll_interval()
|
||||
};
|
||||
if time_since_poll < poll_interval {
|
||||
thread::sleep(time::Duration::from_millis(500));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
last_poll = Some(now);
|
||||
|
||||
// Don't poll until the Bitcoin backend is fully synced.
|
||||
if !synced {
|
||||
let progress = bit.sync_progress();
|
||||
log::info!(
|
||||
"Block chain synchronization progress: {:.2}% ({} blocks / {} headers)",
|
||||
progress.rounded_up_progress() * 100.0,
|
||||
progress.blocks,
|
||||
progress.headers
|
||||
);
|
||||
synced = progress.is_complete();
|
||||
if !synced {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
poll(&bit, &db, &secp, &descs);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,60 +1,95 @@
|
||||
mod looper;
|
||||
|
||||
use crate::{
|
||||
bitcoin::{poller::looper::looper, BitcoinInterface},
|
||||
database::DatabaseInterface,
|
||||
descriptors,
|
||||
};
|
||||
use crate::{bitcoin::BitcoinInterface, database::DatabaseInterface, descriptors};
|
||||
|
||||
use std::{
|
||||
sync::{self, atomic},
|
||||
thread, time,
|
||||
};
|
||||
|
||||
use miniscript::bitcoin::secp256k1;
|
||||
|
||||
/// The Bitcoin poller handler.
|
||||
pub struct Poller {
|
||||
handle: thread::JoinHandle<()>,
|
||||
shutdown: sync::Arc<atomic::AtomicBool>,
|
||||
bit: sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
|
||||
db: sync::Arc<sync::Mutex<dyn DatabaseInterface>>,
|
||||
secp: secp256k1::Secp256k1<secp256k1::VerifyOnly>,
|
||||
// The receive and change descriptors (in this order).
|
||||
descs: [descriptors::SinglePathLianaDesc; 2],
|
||||
}
|
||||
|
||||
impl Poller {
|
||||
pub fn start(
|
||||
pub fn new(
|
||||
bit: sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
|
||||
db: sync::Arc<sync::Mutex<dyn DatabaseInterface>>,
|
||||
poll_interval: time::Duration,
|
||||
desc: descriptors::LianaDescriptor,
|
||||
) -> Poller {
|
||||
let shutdown = sync::Arc::from(atomic::AtomicBool::from(false));
|
||||
let handle = thread::Builder::new()
|
||||
.name("Bitcoin poller".to_string())
|
||||
.spawn({
|
||||
let shutdown = shutdown.clone();
|
||||
move || looper(bit, db, shutdown, poll_interval, desc)
|
||||
})
|
||||
.expect("Must not fail");
|
||||
let secp = secp256k1::Secp256k1::verification_only();
|
||||
let descs = [
|
||||
desc.receive_descriptor().clone(),
|
||||
desc.change_descriptor().clone(),
|
||||
];
|
||||
|
||||
Poller { shutdown, handle }
|
||||
// On first startup the tip may be NULL. Make sure it's set as the poller relies on it.
|
||||
looper::maybe_initialize_tip(&bit, &db);
|
||||
|
||||
Poller {
|
||||
bit,
|
||||
db,
|
||||
secp,
|
||||
descs,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn trigger_stop(&self) {
|
||||
self.shutdown.store(true, atomic::Ordering::Relaxed);
|
||||
}
|
||||
/// Continuously update our state from the Bitcoin backend.
|
||||
/// - `poll_interval`: how frequently to perform an update.
|
||||
/// - `shutdown`: set to true to stop continuously updating and make this function return.
|
||||
///
|
||||
/// Typically this would run for the whole duration of the program in a thread, and the main
|
||||
/// thread would set the `shutdown` atomic to `true` when shutting down.
|
||||
pub fn poll_forever(
|
||||
&self,
|
||||
poll_interval: time::Duration,
|
||||
shutdown: sync::Arc<atomic::AtomicBool>,
|
||||
) {
|
||||
let mut last_poll = None;
|
||||
let mut synced = false;
|
||||
|
||||
pub fn stop(self) {
|
||||
self.trigger_stop();
|
||||
self.handle.join().expect("The poller loop must not fail");
|
||||
}
|
||||
while !shutdown.load(atomic::Ordering::Relaxed) || last_poll.is_none() {
|
||||
let now = time::Instant::now();
|
||||
|
||||
#[cfg(feature = "nonblocking_shutdown")]
|
||||
pub fn is_stopped(&self) -> bool {
|
||||
// Doc says "This might return true for a brief moment after the thread’s main function has
|
||||
// returned, but before the thread itself has stopped running.". But it's not an issue for
|
||||
// us, as long as the main poller function has returned we are good.
|
||||
self.handle.is_finished()
|
||||
}
|
||||
if let Some(last_poll) = last_poll {
|
||||
let time_since_poll = now.duration_since(last_poll);
|
||||
let poll_interval = if synced {
|
||||
poll_interval
|
||||
} else {
|
||||
// Until we are synced we poll less often to avoid harassing bitcoind and impeding
|
||||
// the sync. As a function since it's mocked for the tests.
|
||||
looper::sync_poll_interval()
|
||||
};
|
||||
if time_since_poll < poll_interval {
|
||||
thread::sleep(time::Duration::from_millis(500));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
last_poll = Some(now);
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn test_stop(&mut self) {
|
||||
self.shutdown.store(true, atomic::Ordering::Relaxed);
|
||||
// Don't poll until the Bitcoin backend is fully synced.
|
||||
if !synced {
|
||||
let progress = self.bit.sync_progress();
|
||||
log::info!(
|
||||
"Block chain synchronization progress: {:.2}% ({} blocks / {} headers)",
|
||||
progress.rounded_up_progress() * 100.0,
|
||||
progress.blocks,
|
||||
progress.headers
|
||||
);
|
||||
synced = progress.is_complete();
|
||||
if !synced {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
looper::poll(&self.bit, &self.db, &self.secp, &self.descs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1251,7 +1251,7 @@ mod tests {
|
||||
fn getinfo() {
|
||||
let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new());
|
||||
// We can query getinfo
|
||||
ms.handle.control.get_info();
|
||||
ms.control().get_info();
|
||||
ms.shutdown();
|
||||
}
|
||||
|
||||
@ -1259,7 +1259,7 @@ mod tests {
|
||||
fn getnewaddress() {
|
||||
let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new());
|
||||
|
||||
let control = &ms.handle.control;
|
||||
let control = &ms.control();
|
||||
// We can get an address
|
||||
let addr = control.get_new_address().address;
|
||||
assert_eq!(
|
||||
@ -1281,7 +1281,7 @@ mod tests {
|
||||
fn listaddresses() {
|
||||
let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new());
|
||||
|
||||
let control = &ms.handle.control;
|
||||
let control = &ms.control();
|
||||
|
||||
let list = control.list_addresses(Some(2), Some(5)).unwrap();
|
||||
|
||||
@ -1412,7 +1412,7 @@ mod tests {
|
||||
),
|
||||
);
|
||||
let ms = DummyLiana::new(dummy_bitcoind, DummyDatabase::new());
|
||||
let control = &ms.handle.control;
|
||||
let control = &ms.control();
|
||||
|
||||
// Arguments sanity checking
|
||||
let dummy_addr =
|
||||
@ -1900,7 +1900,7 @@ mod tests {
|
||||
.insert(dummy_op_a.txid, (dummy_tx.clone(), None));
|
||||
dummy_bitcoind.txs.insert(dummy_op_b.txid, (dummy_tx, None));
|
||||
let ms = DummyLiana::new(dummy_bitcoind, DummyDatabase::new());
|
||||
let control = &ms.handle.control;
|
||||
let control = &ms.control();
|
||||
let mut db_conn = control.db().lock().unwrap().connection();
|
||||
|
||||
// Add two (unconfirmed) coins in DB
|
||||
@ -2046,7 +2046,7 @@ mod tests {
|
||||
let dummy_txid_a = dummy_psbt_a.unsigned_tx.txid();
|
||||
dummy_bitcoind.txs.insert(dummy_txid_a, (dummy_tx_a, None));
|
||||
let ms = DummyLiana::new(dummy_bitcoind, DummyDatabase::new());
|
||||
let control = &ms.handle.control;
|
||||
let control = &ms.control();
|
||||
let mut db_conn = control.db().lock().unwrap().connection();
|
||||
// The spend needs to be in DB before using RBF.
|
||||
assert_eq!(
|
||||
@ -2284,7 +2284,7 @@ mod tests {
|
||||
|
||||
let ms = DummyLiana::new(btc, db);
|
||||
|
||||
let control = &ms.handle.control;
|
||||
let control = &ms.control();
|
||||
|
||||
let transactions = control.list_confirmed_transactions(0, 4, 10).transactions;
|
||||
assert_eq!(transactions.len(), 4);
|
||||
@ -2416,7 +2416,7 @@ mod tests {
|
||||
|
||||
let ms = DummyLiana::new(btc, DummyDatabase::new());
|
||||
|
||||
let control = &ms.handle.control;
|
||||
let control = &ms.control();
|
||||
|
||||
let transactions = control.list_transactions(&[tx1.txid()]).transactions;
|
||||
assert_eq!(transactions.len(), 1);
|
||||
|
||||
@ -122,11 +122,11 @@ fn connection_handler(
|
||||
pub fn rpcserver_loop(
|
||||
listener: net::UnixListener,
|
||||
daemon_control: DaemonControl,
|
||||
shutdown: sync::Arc<atomic::AtomicBool>,
|
||||
) -> Result<(), io::Error> {
|
||||
// Keep it simple. We don't need great performances so just treat each connection in
|
||||
// its thread, with a given maximum number of connections.
|
||||
let connections_counter = sync::Arc::from(atomic::AtomicU32::new(0));
|
||||
let shutdown = sync::Arc::from(atomic::AtomicBool::new(false));
|
||||
|
||||
listener.set_nonblocking(true)?;
|
||||
while !shutdown.load(atomic::Ordering::Relaxed) {
|
||||
@ -400,7 +400,7 @@ mod tests {
|
||||
#[cfg(not(target_os = "macos"))]
|
||||
#[test]
|
||||
fn server_sanity_check() {
|
||||
let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new());
|
||||
let ms = DummyLiana::new_server(DummyBitcoind::new(), DummyDatabase::new());
|
||||
let socket_path: path::PathBuf = [
|
||||
ms.tmp_dir.as_path(),
|
||||
path::Path::new("d"),
|
||||
@ -410,7 +410,6 @@ mod tests {
|
||||
.iter()
|
||||
.collect();
|
||||
|
||||
let t = thread::spawn(move || ms.rpc_server().unwrap());
|
||||
while !socket_path.exists() {
|
||||
thread::sleep(time::Duration::from_millis(100));
|
||||
}
|
||||
@ -426,6 +425,6 @@ mod tests {
|
||||
&[&serde_json::to_vec(&stop_req).unwrap(), b"\n"],
|
||||
);
|
||||
|
||||
t.join().unwrap();
|
||||
ms.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
212
src/lib.rs
212
src/lib.rs
@ -28,7 +28,7 @@ use crate::{
|
||||
},
|
||||
};
|
||||
|
||||
use std::{error, fmt, fs, io, path, sync};
|
||||
use std::{error, fmt, fs, io, path, sync, thread};
|
||||
|
||||
use miniscript::bitcoin::secp256k1;
|
||||
|
||||
@ -261,7 +261,7 @@ pub struct DaemonControl {
|
||||
}
|
||||
|
||||
impl DaemonControl {
|
||||
pub fn new(
|
||||
pub(crate) fn new(
|
||||
config: Config,
|
||||
bitcoin: sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
|
||||
db: sync::Arc<sync::Mutex<dyn DatabaseInterface>>,
|
||||
@ -282,13 +282,28 @@ impl DaemonControl {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DaemonHandle {
|
||||
pub control: DaemonControl,
|
||||
bitcoin_poller: poller::Poller,
|
||||
/// The handle to a Liana daemon. It might either be the handle for a daemon which exposes a
|
||||
/// JSONRPC server or one which exposes its API through a `DaemonControl`.
|
||||
pub enum DaemonHandle {
|
||||
Controller {
|
||||
poller_shutdown: sync::Arc<sync::atomic::AtomicBool>,
|
||||
poller_handle: thread::JoinHandle<()>,
|
||||
control: DaemonControl,
|
||||
},
|
||||
Server {
|
||||
poller_shutdown: sync::Arc<sync::atomic::AtomicBool>,
|
||||
poller_handle: thread::JoinHandle<()>,
|
||||
rpcserver_shutdown: sync::Arc<sync::atomic::AtomicBool>,
|
||||
rpcserver_handle: thread::JoinHandle<Result<(), io::Error>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl DaemonHandle {
|
||||
/// This starts the Liana daemon. Call `shutdown` to shut it down.
|
||||
/// This starts the Liana daemon. A user of this interface should regularly poll the `is_alive`
|
||||
/// method to check for internal errors. To shut down the daemon use the `stop` method.
|
||||
///
|
||||
/// The `with_rpc_server` controls whether we should start a JSONRPC server to receive queries
|
||||
/// or instead return a `DaemonControl` object for a caller to access the daemon's API.
|
||||
///
|
||||
/// You may specify a custom Bitcoin interface through the `bitcoin` parameter. If `None`, the
|
||||
/// default Bitcoin interface (`bitcoind` JSONRPC) will be used.
|
||||
@ -301,6 +316,7 @@ impl DaemonHandle {
|
||||
config: Config,
|
||||
bitcoin: Option<impl BitcoinInterface + 'static>,
|
||||
db: Option<impl DatabaseInterface + 'static>,
|
||||
with_rpc_server: bool,
|
||||
) -> Result<Self, StartupError> {
|
||||
#[cfg(not(test))]
|
||||
setup_panic_hook();
|
||||
@ -353,82 +369,119 @@ impl DaemonHandle {
|
||||
}
|
||||
}
|
||||
|
||||
// Spawn the bitcoind poller with a retry limit high enough that we'd fail after that.
|
||||
let bitcoin_poller = poller::Poller::start(
|
||||
bit.clone(),
|
||||
db.clone(),
|
||||
config.bitcoin_config.poll_interval_secs,
|
||||
config.main_descriptor.clone(),
|
||||
);
|
||||
// Start the poller thread. Keep the thread handle to be able to check if it crashed. Store
|
||||
// an atomic to be able to stop it.
|
||||
let bitcoin_poller =
|
||||
poller::Poller::new(bit.clone(), db.clone(), config.main_descriptor.clone());
|
||||
let poller_shutdown = sync::Arc::from(sync::atomic::AtomicBool::from(false));
|
||||
let poller_handle = thread::Builder::new()
|
||||
.name("Bitcoin Network poller".to_string())
|
||||
.spawn({
|
||||
let poll_interval = config.bitcoin_config.poll_interval_secs;
|
||||
let shutdown = poller_shutdown.clone();
|
||||
move || {
|
||||
log::info!("Bitcoin poller started.");
|
||||
bitcoin_poller.poll_forever(poll_interval, shutdown);
|
||||
log::info!("Bitcoin poller stopped.");
|
||||
}
|
||||
})
|
||||
.expect("Spawning the poller thread must never fail.");
|
||||
|
||||
// Finally, set up the API.
|
||||
// Create the API the external world will use to talk to us, either directly through the Rust
|
||||
// structure or through the JSONRPC server we may setup below.
|
||||
let control = DaemonControl::new(config, bit, db, secp);
|
||||
|
||||
Ok(Self {
|
||||
control,
|
||||
bitcoin_poller,
|
||||
Ok(if with_rpc_server {
|
||||
let rpcserver_shutdown = sync::Arc::from(sync::atomic::AtomicBool::from(false));
|
||||
let rpcserver_handle = thread::Builder::new()
|
||||
.name("Bitcoin Network poller".to_string())
|
||||
.spawn({
|
||||
let shutdown = rpcserver_shutdown.clone();
|
||||
move || {
|
||||
let mut rpc_socket = data_dir;
|
||||
rpc_socket.push("lianad_rpc");
|
||||
let listener = rpcserver_setup(&rpc_socket)?;
|
||||
log::info!("JSONRPC server started.");
|
||||
|
||||
rpcserver_loop(listener, control, shutdown)?;
|
||||
log::info!("JSONRPC server stopped.");
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
.expect("Spawning the RPC server thread should never fail.");
|
||||
|
||||
DaemonHandle::Server {
|
||||
poller_shutdown,
|
||||
poller_handle,
|
||||
rpcserver_shutdown,
|
||||
rpcserver_handle,
|
||||
}
|
||||
} else {
|
||||
DaemonHandle::Controller {
|
||||
poller_shutdown,
|
||||
poller_handle,
|
||||
control,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Start the Liana daemon with the default Bitcoin and database interfaces (`bitcoind` RPC
|
||||
/// and SQLite).
|
||||
pub fn start_default(config: Config) -> Result<DaemonHandle, StartupError> {
|
||||
DaemonHandle::start(config, Option::<BitcoinD>::None, Option::<SqliteDb>::None)
|
||||
pub fn start_default(
|
||||
config: Config,
|
||||
with_rpc_server: bool,
|
||||
) -> Result<DaemonHandle, StartupError> {
|
||||
Self::start(
|
||||
config,
|
||||
Option::<BitcoinD>::None,
|
||||
Option::<SqliteDb>::None,
|
||||
with_rpc_server,
|
||||
)
|
||||
}
|
||||
|
||||
/// Start the JSONRPC server and listen for incoming commands until we die.
|
||||
/// Like DaemonHandle::shutdown(), this stops the Bitcoin poller at teardown.
|
||||
#[cfg(feature = "daemon")]
|
||||
pub fn rpc_server(self) -> Result<(), io::Error> {
|
||||
let DaemonHandle {
|
||||
control,
|
||||
bitcoin_poller: poller,
|
||||
} = self;
|
||||
|
||||
let rpc_socket: path::PathBuf = [
|
||||
control
|
||||
.config
|
||||
.data_dir()
|
||||
.expect("Didn't fail at startup, must not now")
|
||||
.as_path(),
|
||||
path::Path::new(&control.config.bitcoin_config.network.to_string()),
|
||||
path::Path::new("lianad_rpc"),
|
||||
]
|
||||
.iter()
|
||||
.collect();
|
||||
let listener = rpcserver_setup(&rpc_socket)?;
|
||||
log::info!("JSONRPC server started.");
|
||||
|
||||
rpcserver_loop(listener, control)?;
|
||||
log::info!("JSONRPC server stopped.");
|
||||
|
||||
poller.stop();
|
||||
|
||||
Ok(())
|
||||
/// Check whether the daemon is still up and running. This needs to be regularly polled to
|
||||
/// check for internal errors. If this returns `false`, collect the error using the `stop`
|
||||
/// method.
|
||||
pub fn is_alive(&self) -> bool {
|
||||
match self {
|
||||
Self::Controller {
|
||||
ref poller_handle, ..
|
||||
} => !poller_handle.is_finished(),
|
||||
Self::Server {
|
||||
ref poller_handle,
|
||||
ref rpcserver_handle,
|
||||
..
|
||||
} => !poller_handle.is_finished() && !rpcserver_handle.is_finished(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Shut down the Liana daemon.
|
||||
pub fn shutdown(self) {
|
||||
self.bitcoin_poller.stop();
|
||||
}
|
||||
|
||||
/// Tell the daemon to shut down. This will return before the shutdown completes. The structure
|
||||
/// must not be reused after triggering shutdown.
|
||||
#[cfg(feature = "nonblocking_shutdown")]
|
||||
pub fn trigger_shutdown(&self) {
|
||||
self.bitcoin_poller.trigger_stop()
|
||||
}
|
||||
|
||||
/// Whether the daemon has finished shutting down.
|
||||
#[cfg(feature = "nonblocking_shutdown")]
|
||||
pub fn shutdown_complete(&self) -> bool {
|
||||
self.bitcoin_poller.is_stopped()
|
||||
}
|
||||
|
||||
// We need a shutdown utility that does not move for implementing Drop for the DummyLiana
|
||||
#[cfg(test)]
|
||||
pub fn test_shutdown(&mut self) {
|
||||
self.bitcoin_poller.test_stop();
|
||||
/// Stop the Liana daemon. This returns any error which may have occurred.
|
||||
pub fn stop(self) -> Result<(), Box<dyn error::Error>> {
|
||||
match self {
|
||||
Self::Controller {
|
||||
poller_shutdown,
|
||||
poller_handle,
|
||||
..
|
||||
} => {
|
||||
poller_shutdown.store(true, sync::atomic::Ordering::Relaxed);
|
||||
poller_handle.join().expect("Poller thread must not panic");
|
||||
Ok(())
|
||||
}
|
||||
Self::Server {
|
||||
poller_shutdown,
|
||||
poller_handle,
|
||||
rpcserver_shutdown,
|
||||
rpcserver_handle,
|
||||
} => {
|
||||
poller_shutdown.store(true, sync::atomic::Ordering::Relaxed);
|
||||
rpcserver_shutdown.store(true, sync::atomic::Ordering::Relaxed);
|
||||
rpcserver_handle
|
||||
.join()
|
||||
.expect("Poller thread must not panic")?;
|
||||
poller_handle.join().expect("Poller thread must not panic");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -681,11 +734,11 @@ mod tests {
|
||||
};
|
||||
|
||||
// Start the daemon in a new thread so the current one acts as the bitcoind server.
|
||||
let daemon_thread = thread::spawn({
|
||||
let t = thread::spawn({
|
||||
let config = config.clone();
|
||||
move || {
|
||||
let handle = DaemonHandle::start_default(config).unwrap();
|
||||
handle.shutdown();
|
||||
let handle = DaemonHandle::start_default(config, false).unwrap();
|
||||
handle.stop().unwrap();
|
||||
}
|
||||
});
|
||||
complete_sanity_check(&server);
|
||||
@ -697,12 +750,15 @@ mod tests {
|
||||
complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string());
|
||||
complete_tip_init(&server);
|
||||
complete_sync_check(&server);
|
||||
daemon_thread.join().unwrap();
|
||||
t.join().unwrap();
|
||||
|
||||
// The datadir is created now, so if we restart it it won't create the wo wallet.
|
||||
let daemon_thread = thread::spawn(move || {
|
||||
let handle = DaemonHandle::start_default(config).unwrap();
|
||||
handle.shutdown();
|
||||
let t = thread::spawn({
|
||||
let config = config.clone();
|
||||
move || {
|
||||
let handle = DaemonHandle::start_default(config, false).unwrap();
|
||||
handle.stop().unwrap();
|
||||
}
|
||||
});
|
||||
complete_sanity_check(&server);
|
||||
complete_version_check(&server);
|
||||
@ -711,7 +767,7 @@ mod tests {
|
||||
complete_wallet_check(&server, &wo_path);
|
||||
complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string());
|
||||
complete_sync_check(&server);
|
||||
daemon_thread.join().unwrap();
|
||||
t.join().unwrap();
|
||||
|
||||
fs::remove_dir_all(&tmp_dir).unwrap();
|
||||
}
|
||||
|
||||
@ -2,13 +2,13 @@ use crate::{
|
||||
bitcoin::{BitcoinInterface, Block, BlockChainTip, MempoolEntry, SyncProgress, UTxO},
|
||||
config::{BitcoinConfig, Config},
|
||||
database::{BlockInfo, Coin, CoinStatus, DatabaseConnection, DatabaseInterface, LabelItem},
|
||||
descriptors, DaemonHandle,
|
||||
descriptors, DaemonControl, DaemonHandle,
|
||||
};
|
||||
|
||||
use std::convert::TryInto;
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
env, fs, io, path, process,
|
||||
env, fs, path, process,
|
||||
str::FromStr,
|
||||
sync, thread, time,
|
||||
time::{SystemTime, UNIX_EPOCH},
|
||||
@ -464,9 +464,10 @@ pub fn tmp_dir() -> path::PathBuf {
|
||||
|
||||
impl DummyLiana {
|
||||
/// Creates a new DummyLiana interface
|
||||
pub fn new(
|
||||
pub fn _new(
|
||||
bitcoin_interface: impl BitcoinInterface + 'static,
|
||||
database: impl DatabaseInterface + 'static,
|
||||
rpc_server: bool,
|
||||
) -> DummyLiana {
|
||||
let tmp_dir = tmp_dir();
|
||||
fs::create_dir_all(&tmp_dir).unwrap();
|
||||
@ -497,19 +498,37 @@ impl DummyLiana {
|
||||
main_descriptor: desc,
|
||||
};
|
||||
|
||||
let handle = DaemonHandle::start(config, Some(bitcoin_interface), Some(database)).unwrap();
|
||||
let handle =
|
||||
DaemonHandle::start(config, Some(bitcoin_interface), Some(database), rpc_server)
|
||||
.unwrap();
|
||||
DummyLiana { tmp_dir, handle }
|
||||
}
|
||||
|
||||
#[cfg(feature = "daemon")]
|
||||
pub fn rpc_server(self) -> Result<(), io::Error> {
|
||||
self.handle.rpc_server()?;
|
||||
fs::remove_dir_all(&self.tmp_dir)?;
|
||||
Ok(())
|
||||
/// Creates a new DummyLiana interface
|
||||
pub fn new(
|
||||
bitcoin_interface: impl BitcoinInterface + 'static,
|
||||
database: impl DatabaseInterface + 'static,
|
||||
) -> DummyLiana {
|
||||
Self::_new(bitcoin_interface, database, false)
|
||||
}
|
||||
|
||||
/// Creates a new DummyLiana interface which also spins up an RPC server.
|
||||
pub fn new_server(
|
||||
bitcoin_interface: impl BitcoinInterface + 'static,
|
||||
database: impl DatabaseInterface + 'static,
|
||||
) -> DummyLiana {
|
||||
Self::_new(bitcoin_interface, database, true)
|
||||
}
|
||||
|
||||
pub fn control(&self) -> &DaemonControl {
|
||||
match self.handle {
|
||||
DaemonHandle::Controller { ref control, .. } => control,
|
||||
DaemonHandle::Server { .. } => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn shutdown(self) {
|
||||
self.handle.shutdown();
|
||||
fs::remove_dir_all(&self.tmp_dir).unwrap();
|
||||
self.handle.stop().unwrap();
|
||||
fs::remove_dir_all(self.tmp_dir).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user