jsonrpc: add a 'stop' command

This also implements a simple JSONRPC server sanity check

And introduces a new tmp_dir() test utility, that i didn't have the courage to split from this commit
This commit is contained in:
Antoine Poinsot 2022-08-10 18:30:41 +02:00
parent d03c469967
commit 726209cc0a
No known key found for this signature in database
GPG Key ID: E13FC145CD3F4304
5 changed files with 98 additions and 34 deletions

View File

@ -219,7 +219,8 @@ impl SqliteConn {
#[cfg(test)]
mod tests {
use super::*;
use std::{env, fs, path, process, str::FromStr, thread};
use crate::testutils::*;
use std::{fs, path, str::FromStr};
fn dummy_options() -> FreshDbOptions {
let desc_str = "wsh(andor(pk(03b506a1dbe57b4bf48c95e0c7d417b87dd3b4349d290d2e7e9ba72c912652d80a),older(10000),pk(0295e7f5d12a2061f1fd2286cefec592dff656a19f55f4f01305d6aa56630880ce)))";
@ -232,11 +233,7 @@ mod tests {
#[test]
fn db_startup_sanity_checks() {
let tmp_dir = env::temp_dir().join(format!(
"minisafed-unit-tests-{}-{:?}",
process::id(),
thread::current().id()
));
let tmp_dir = tmp_dir();
fs::create_dir_all(&tmp_dir).unwrap();
let db_path: path::PathBuf = [tmp_dir.as_path(), path::Path::new("minisafed.sqlite3")]
@ -277,11 +274,7 @@ mod tests {
#[test]
fn db_tip_update() {
let tmp_dir = env::temp_dir().join(format!(
"minisafed-unit-tests-{}-{:?}",
process::id(),
thread::current().id()
));
let tmp_dir = tmp_dir();
fs::create_dir_all(&tmp_dir).unwrap();
let db_path: path::PathBuf = [tmp_dir.as_path(), path::Path::new("minisafed.sqlite3")]

View File

@ -8,6 +8,7 @@ pub fn handle_request(control: &DaemonControl, req: Request) -> Result<Response,
let result = match req.method.as_str() {
"getinfo" => serde_json::json!(&control.get_info()),
"getnewaddress" => serde_json::json!(&control.get_new_address()),
"stop" => serde_json::json!({}),
_ => {
return Err(Error::method_not_found());
}

View File

@ -83,12 +83,13 @@ fn read_command(
fn connection_handler(
control: sync::Arc<sync::Mutex<DaemonControl>>,
mut stream: net::UnixStream,
shutdown: sync::Arc<atomic::AtomicBool>,
) -> Result<(), io::Error> {
let mut buf = vec![0; 2048];
let mut end = 0;
let mut cursor = 0;
loop {
while !shutdown.load(atomic::Ordering::Relaxed) {
let req = match read_command(&mut stream, &mut buf, &mut end, &mut cursor)? {
Some(req) => req,
None => {
@ -97,8 +98,10 @@ fn connection_handler(
}
};
// TODO: respond in case of invalid JSON or invalid JSONRPC request.
let req_id = req.id.clone();
if &req.method == "stop" {
shutdown.store(true, atomic::Ordering::Relaxed);
}
log::trace!("JSONRPC request: {:?}", serde_json::to_string(&req));
let response = api::handle_request(&control.lock().unwrap(), req)
@ -109,6 +112,8 @@ fn connection_handler(
return Ok(());
}
}
Ok(())
}
// FIXME: have a decent way to share the DaemonControl between connections. Maybe make it Clone?
@ -120,12 +125,14 @@ pub fn rpcserver_loop(
// Keep it simple. We don't need great performances so just treat each connection in
// its thread, with a given maximum number of connections.
let connections_counter = sync::Arc::from(atomic::AtomicU32::new(0));
let shutdown = sync::Arc::from(atomic::AtomicBool::new(false));
loop {
listener.set_nonblocking(true)?;
while !shutdown.load(atomic::Ordering::Relaxed) {
let (connection, _) = match listener.accept() {
Ok(c) => c,
Err(e) => {
log::error!("Accepting new connection: '{}'", e);
Err(_) => {
thread::sleep(time::Duration::from_millis(100));
continue;
}
};
@ -142,9 +149,10 @@ pub fn rpcserver_loop(
.spawn({
let control = daemon_control.clone();
let counter = connections_counter.clone();
let shutdown = shutdown.clone();
move || {
if let Err(e) = connection_handler(control, connection) {
if let Err(e) = connection_handler(control, connection, shutdown) {
log::error!("Error while handling connection {}: '{}'", handler_id, e);
} else {
log::trace!("Connection {} terminated without error.", handler_id);
@ -153,6 +161,8 @@ pub fn rpcserver_loop(
}
})?;
}
Ok(())
}
// Tries to bind to the socket, if we are told it's already in use try to connect
@ -195,7 +205,10 @@ pub fn rpcserver_setup(socket_path: &path::Path) -> Result<net::UnixListener, io
#[cfg(test)]
mod tests {
use super::*;
use crate::jsonrpc::{Params, ReqId};
use crate::{
jsonrpc::{Params, ReqId},
testutils::*,
};
use std::{env, fs, io::Write, process};
@ -377,4 +390,37 @@ mod tests {
fs::remove_file(&socket_path).unwrap();
}
// TODO: debug on MacOS
#[cfg(not(target_os = "macos"))]
#[test]
fn server_sanity_check() {
let ms = DummyMinisafe::new();
let socket_path: path::PathBuf = [
ms.tmp_dir.as_path(),
path::Path::new("datadir"),
path::Path::new("bitcoin"),
path::Path::new("minisafed_rpc"),
]
.iter()
.collect();
let t = thread::spawn(move || ms.rpc_server().unwrap());
while !socket_path.exists() {
thread::sleep(time::Duration::from_millis(100));
}
let stop_req = Request {
jsonrpc: "2.0".to_string(),
method: "stop".to_string(),
params: None,
id: ReqId::Num(0),
};
write_messages(
&socket_path,
&[&serde_json::to_vec(&stop_req).unwrap(), b"\n"],
);
t.join().unwrap();
}
}

View File

@ -372,13 +372,16 @@ impl DaemonHandle {
#[cfg(all(test, unix))]
mod tests {
use super::*;
use crate::config::{BitcoinConfig, BitcoindConfig};
use crate::{
config::{BitcoinConfig, BitcoindConfig},
testutils::*,
};
use miniscript::{bitcoin, Descriptor, DescriptorPublicKey};
use std::{
env, fs,
fs,
io::{BufRead, BufReader, Write},
net, path, process,
net, path,
str::FromStr,
thread, time,
};
@ -529,11 +532,7 @@ mod tests {
// framework.
#[test]
fn daemon_startup() {
let tmp_dir = env::temp_dir().join(format!(
"minisafed-unit-tests-{}-{:?}",
process::id(),
thread::current().id()
));
let tmp_dir = tmp_dir();
fs::create_dir_all(&tmp_dir).unwrap();
let data_dir: path::PathBuf = [tmp_dir.as_path(), path::Path::new("datadir")]
.iter()

View File

@ -2,10 +2,10 @@ use crate::{
bitcoin::{BitcoinInterface, BlockChainTip},
config::{BitcoinConfig, Config},
database::{DatabaseConnection, DatabaseInterface},
DaemonControl, DaemonHandle,
DaemonHandle,
};
use std::{env, fs, path, process, str::FromStr, sync, thread, time};
use std::{env, fs, io, path, process, str::FromStr, sync, thread, time};
use miniscript::{
bitcoin::{self, util::bip32},
@ -77,17 +77,35 @@ impl DatabaseConnection for DummyDbConn {
}
pub struct DummyMinisafe {
tmp_dir: path::PathBuf,
pub tmp_dir: path::PathBuf,
pub handle: DaemonHandle,
}
static mut COUNTER: sync::atomic::AtomicUsize = sync::atomic::AtomicUsize::new(0);
fn uid() -> usize {
unsafe {
let uid = COUNTER.load(sync::atomic::Ordering::Relaxed);
COUNTER.fetch_add(1, sync::atomic::Ordering::Relaxed);
uid
}
}
pub fn tmp_dir() -> path::PathBuf {
env::temp_dir().join(format!(
"minisafed-unit-tests-{}-{:?}-{}-{}",
process::id(),
thread::current().id(),
time::SystemTime::now()
.duration_since(time::UNIX_EPOCH)
.unwrap()
.subsec_nanos(),
uid(),
))
}
impl DummyMinisafe {
pub fn new() -> DummyMinisafe {
let tmp_dir = env::temp_dir().join(format!(
"minisafed-unit-tests-{}-{:?}",
process::id(),
thread::current().id()
));
let tmp_dir = tmp_dir();
fs::create_dir_all(&tmp_dir).unwrap();
let data_dir: path::PathBuf = [tmp_dir.as_path(), path::Path::new("datadir")]
.iter()
@ -117,6 +135,13 @@ impl DummyMinisafe {
DummyMinisafe { tmp_dir, handle }
}
#[cfg(feature = "jsonrpc_server")]
pub fn rpc_server(self) -> Result<(), io::Error> {
self.handle.rpc_server()?;
fs::remove_dir_all(&self.tmp_dir)?;
Ok(())
}
pub fn shutdown(self) {
self.handle.shutdown();
fs::remove_dir_all(&self.tmp_dir).unwrap();