poller: introduce a communication channel with the poller thread

We'll need to ask the poller thread another thing besides to shut down,
so it's cleaner to start using proper messages.

The mpsc channel in the std lib was buggy for awhile but since they
merged crossbeam and are using this behind the hood now it should be
fine starting with Rust 1.67. That's (slightly) higher than our MSRV but
it's what we use for releases so that's reasonable. See
https://github.com/rust-lang/rust/issues/39364 for details.
This commit is contained in:
Antoine Poinsot 2024-03-15 11:15:53 +01:00
parent f6ce85cfd3
commit 1cf42d9aee
No known key found for this signature in database
GPG Key ID: E13FC145CD3F4304
2 changed files with 57 additions and 40 deletions

View File

@ -3,12 +3,17 @@ mod looper;
use crate::{bitcoin::BitcoinInterface, database::DatabaseInterface, descriptors}; use crate::{bitcoin::BitcoinInterface, database::DatabaseInterface, descriptors};
use std::{ use std::{
sync::{self, atomic}, sync::{self, mpsc},
thread, time, time,
}; };
use miniscript::bitcoin::secp256k1; use miniscript::bitcoin::secp256k1;
#[derive(Debug, Clone)]
pub enum PollerMessage {
Shutdown,
}
/// The Bitcoin poller handler. /// The Bitcoin poller handler.
pub struct Poller { pub struct Poller {
bit: sync::Arc<sync::Mutex<dyn BitcoinInterface>>, bit: sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
@ -50,29 +55,44 @@ impl Poller {
pub fn poll_forever( pub fn poll_forever(
&self, &self,
poll_interval: time::Duration, poll_interval: time::Duration,
shutdown: sync::Arc<atomic::AtomicBool>, receiver: mpsc::Receiver<PollerMessage>,
) { ) {
let mut last_poll = None; let mut last_poll = None;
let mut synced = false; let mut synced = false;
while !shutdown.load(atomic::Ordering::Relaxed) || last_poll.is_none() { loop {
let now = time::Instant::now(); // How long to wait before the next poll.
let time_before_poll = if let Some(last_poll) = last_poll {
if let Some(last_poll) = last_poll { let time_since_poll = time::Instant::now().duration_since(last_poll);
let time_since_poll = now.duration_since(last_poll); // Until we are synced we poll less often to avoid harassing bitcoind and impeding
// the sync. As a function since it's mocked for the tests.
let poll_interval = if synced { let poll_interval = if synced {
poll_interval poll_interval
} else { } else {
// Until we are synced we poll less often to avoid harassing bitcoind and impeding
// the sync. As a function since it's mocked for the tests.
looper::sync_poll_interval() looper::sync_poll_interval()
}; };
if time_since_poll < poll_interval { poll_interval.saturating_sub(time_since_poll)
thread::sleep(time::Duration::from_millis(500)); } else {
continue; // Don't wait before doing the first poll.
time::Duration::ZERO
};
// Wait for the duration of the interval between polls, but listen to messages in the
// meantime.
match receiver.recv_timeout(time_before_poll) {
Ok(PollerMessage::Shutdown) => {
log::info!("Bitcoin poller was told to shut down.");
return;
}
Err(mpsc::RecvTimeoutError::Timeout) => {
// It's been long enough since the last poll.
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
log::error!("Bitcoin poller communication channel got disconnected. Exiting.");
return;
} }
} }
last_poll = Some(now); last_poll = Some(time::Instant::now());
// Don't poll until the Bitcoin backend is fully synced. // Don't poll until the Bitcoin backend is fully synced.
if !synced { if !synced {

View File

@ -28,7 +28,11 @@ use crate::{
}, },
}; };
use std::{error, fmt, fs, io, path, sync, thread}; use std::{
error, fmt, fs, io, path,
sync::{self, mpsc},
thread,
};
use miniscript::bitcoin::secp256k1; use miniscript::bitcoin::secp256k1;
@ -284,12 +288,12 @@ impl DaemonControl {
/// JSONRPC server or one which exposes its API through a `DaemonControl`. /// JSONRPC server or one which exposes its API through a `DaemonControl`.
pub enum DaemonHandle { pub enum DaemonHandle {
Controller { Controller {
poller_shutdown: sync::Arc<sync::atomic::AtomicBool>, poller_sender: mpsc::SyncSender<poller::PollerMessage>,
poller_handle: thread::JoinHandle<()>, poller_handle: thread::JoinHandle<()>,
control: DaemonControl, control: DaemonControl,
}, },
Server { Server {
poller_shutdown: sync::Arc<sync::atomic::AtomicBool>, poller_sender: mpsc::SyncSender<poller::PollerMessage>,
poller_handle: thread::JoinHandle<()>, poller_handle: thread::JoinHandle<()>,
rpcserver_shutdown: sync::Arc<sync::atomic::AtomicBool>, rpcserver_shutdown: sync::Arc<sync::atomic::AtomicBool>,
rpcserver_handle: thread::JoinHandle<Result<(), io::Error>>, rpcserver_handle: thread::JoinHandle<Result<(), io::Error>>,
@ -368,15 +372,14 @@ impl DaemonHandle {
// an atomic to be able to stop it. // an atomic to be able to stop it.
let bitcoin_poller = let bitcoin_poller =
poller::Poller::new(bit.clone(), db.clone(), config.main_descriptor.clone()); poller::Poller::new(bit.clone(), db.clone(), config.main_descriptor.clone());
let poller_shutdown = sync::Arc::from(sync::atomic::AtomicBool::from(false)); let (poller_sender, poller_receiver) = mpsc::sync_channel(0);
let poller_handle = thread::Builder::new() let poller_handle = thread::Builder::new()
.name("Bitcoin Network poller".to_string()) .name("Bitcoin Network poller".to_string())
.spawn({ .spawn({
let poll_interval = config.bitcoin_config.poll_interval_secs; let poll_interval = config.bitcoin_config.poll_interval_secs;
let shutdown = poller_shutdown.clone();
move || { move || {
log::info!("Bitcoin poller started."); log::info!("Bitcoin poller started.");
bitcoin_poller.poll_forever(poll_interval, shutdown); bitcoin_poller.poll_forever(poll_interval, poller_receiver);
log::info!("Bitcoin poller stopped."); log::info!("Bitcoin poller stopped.");
} }
}) })
@ -406,14 +409,14 @@ impl DaemonHandle {
.expect("Spawning the RPC server thread should never fail."); .expect("Spawning the RPC server thread should never fail.");
DaemonHandle::Server { DaemonHandle::Server {
poller_shutdown, poller_sender,
poller_handle, poller_handle,
rpcserver_shutdown, rpcserver_shutdown,
rpcserver_handle, rpcserver_handle,
} }
} else { } else {
DaemonHandle::Controller { DaemonHandle::Controller {
poller_shutdown, poller_sender,
poller_handle, poller_handle,
control, control,
} }
@ -454,21 +457,25 @@ impl DaemonHandle {
pub fn stop(self) -> Result<(), Box<dyn error::Error>> { pub fn stop(self) -> Result<(), Box<dyn error::Error>> {
match self { match self {
Self::Controller { Self::Controller {
poller_shutdown, poller_sender,
poller_handle, poller_handle,
.. ..
} => { } => {
poller_shutdown.store(true, sync::atomic::Ordering::Relaxed); poller_sender
.send(poller::PollerMessage::Shutdown)
.expect("The other end should never have hung up before this.");
poller_handle.join().expect("Poller thread must not panic"); poller_handle.join().expect("Poller thread must not panic");
Ok(()) Ok(())
} }
Self::Server { Self::Server {
poller_shutdown, poller_sender,
poller_handle, poller_handle,
rpcserver_shutdown, rpcserver_shutdown,
rpcserver_handle, rpcserver_handle,
} => { } => {
poller_shutdown.store(true, sync::atomic::Ordering::Relaxed); poller_sender
.send(poller::PollerMessage::Shutdown)
.expect("The other end should never have hung up before this.");
rpcserver_shutdown.store(true, sync::atomic::Ordering::Relaxed); rpcserver_shutdown.store(true, sync::atomic::Ordering::Relaxed);
rpcserver_handle rpcserver_handle
.join() .join()
@ -656,18 +663,6 @@ mod tests {
stream.flush().unwrap(); stream.flush().unwrap();
} }
// Send them a response to 'getblockchaininfo' saying we are far from being synced
fn complete_sync_check(server: &net::TcpListener) {
let net_resp = [
"HTTP/1.1 200\n\r\n{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"verificationprogress\":0.1,\"headers\":1000,\"blocks\":100}}\n".as_bytes(),
]
.concat();
let (mut stream, _) = server.accept().unwrap();
read_til_json_end(&mut stream);
stream.write_all(&net_resp).unwrap();
stream.flush().unwrap();
}
// TODO: we could move the dummy bitcoind thread stuff to the bitcoind module to test the // TODO: we could move the dummy bitcoind thread stuff to the bitcoind module to test the
// bitcoind interface, and use the DummyLiana from testutils to sanity check the startup. // bitcoind interface, and use the DummyLiana from testutils to sanity check the startup.
// Note that startup as checked by this unit test is also tested in the functional test // Note that startup as checked by this unit test is also tested in the functional test
@ -744,7 +739,8 @@ mod tests {
complete_wallet_check(&server, &wo_path); complete_wallet_check(&server, &wo_path);
complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string()); complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string());
complete_tip_init(&server); complete_tip_init(&server);
complete_sync_check(&server); // We don't have to complete the sync check as the poller checks whether it needs to stop
// before checking the bitcoind sync status.
t.join().unwrap(); t.join().unwrap();
// The datadir is created now, so if we restart it it won't create the wo wallet. // The datadir is created now, so if we restart it it won't create the wo wallet.
@ -761,7 +757,8 @@ mod tests {
complete_wallet_loading(&server); complete_wallet_loading(&server);
complete_wallet_check(&server, &wo_path); complete_wallet_check(&server, &wo_path);
complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string()); complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string());
complete_sync_check(&server); // We don't have to complete the sync check as the poller checks whether it needs to stop
// before checking the bitcoind sync status.
t.join().unwrap(); t.join().unwrap();
fs::remove_dir_all(&tmp_dir).unwrap(); fs::remove_dir_all(&tmp_dir).unwrap();