Merge #3: Functional tests framework

7c3289fda95aa79ca62258f14b7afb28548f5541 tests: add a functional test framework, adapted from revaultd (Antoine Poinsot)
cea72d8a63a247c8bcba39ecd487731d045d75eb bitcoind: don't try to match bitcoind's error string when loading wallet (Antoine Poinsot)
096ad68c4bd1b0332835fdbacb5d540a71646217 bitcoind: correctly check for errors when creating wallet (Antoine Poinsot)
5db59b1c892b82f2778949d2a51093507165b2c0 git: add a gitignore (Antoine Poinsot)

Pull request description:

  This adds a Python functional tests framework, taken and adapted from `revaultd`. Since we don't have a JSONRPC interface yet it only contains a single trivial startup functional test... And 2 bug fixes this trivial test uncovered! :)

  CI integration is still TODO, but it's not critical at the moment and i don't have the heart of fighting with it at the moment (we require a non-standard `bitcoind`).

ACKs for top commit:
  darosior:
    ACK 7c3289fda95aa79ca62258f14b7afb28548f5541 -- mostly adapted from revaultd, and tested with the following PRs rebased on this one.

Tree-SHA512: 95bd8ecfe70b1c7bacd17d10f009ec1bcc0211cb66b9545fa25c70b02a0f12d81e7747fc94c00de62f9aa7de17e7eb95183d8a2ffa659628f40313ef6bc43a1d
This commit is contained in:
Antoine Poinsot 2022-07-27 11:33:34 +02:00
commit 55a79b7751
No known key found for this signature in database
GPG Key ID: E13FC145CD3F4304
13 changed files with 1765 additions and 20 deletions

9
.gitignore vendored Normal file
View File

@ -0,0 +1,9 @@
tags
target/
__pycache__
config*.toml
dummy_config.toml
regtest/
venv/
pytest.log
TODO

View File

@ -334,7 +334,9 @@ impl BitcoinD {
);
if let Some(warning) = res.get("warning").map(Json::as_str).flatten() {
return Some(warning.to_string());
if !warning.is_empty() {
return Some(warning.to_string());
}
}
if res.get("name").is_none() {
return Some("Unknown error when create watchonly wallet".to_string());
@ -416,27 +418,14 @@ impl BitcoinD {
Ok(())
}
pub fn maybe_load_watchonly_wallet(&self) -> Result<(), BitcoindError> {
match self.make_fallible_node_request(
/// Try to load the watchonly wallet in bitcoind. It will continue on error (since it's
/// likely the wallet is just already loaded) and log it as info instead.
pub fn try_load_watchonly_wallet(&self) {
if let Err(e) = self.make_fallible_node_request(
"loadwallet",
&params!(Json::String(self.watchonly_wallet_path.clone()),),
) {
Err(e) => {
if e.to_string().contains("is already loaded") {
Ok(())
} else {
Err(e)
}
}
Ok(res) => {
if let Some(warning) = res.get("warning").map(Json::as_str).flatten() {
Err(BitcoindError::WalletLoading(warning.to_string()))
} else if res.get("name").is_none() {
Err(BitcoindError::WalletLoading(res.to_string()))
} else {
Ok(())
}
}
log::info!("Got error '{}' while trying to load watchonly on bitcoind. It is possibly already loaded.", e);
}
}

View File

@ -171,7 +171,7 @@ impl DaemonHandle {
bitcoind.create_watchonly_wallet(&config.main_descriptor)?;
log::info!("Created a new watchonly wallet on bitcoind.");
}
bitcoind.maybe_load_watchonly_wallet()?;
bitcoind.try_load_watchonly_wallet();
bitcoind.sanity_check(&config.main_descriptor, config.bitcoind_config.network)?;
bitcoind.with_retry_limit(None);
log::info!("Connection to bitcoind established and checked.");

53
tests/README.md Normal file
View File

@ -0,0 +1,53 @@
## Minisafed blackbox tests
Here we test `minisafed` by starting it on a regression testing Bitcoin network,
and by then talking to it as an user would, from the outside.
Python scripts are used for the automation, and specifically the [`pytest` framework](https://docs.pytest.org/en/stable/index.html).
Credits: this test framework was taken and adapted from revaultd, which was itself adapted from
[C-lightning's test framework](https://github.com/ElementsProject/lightning/tree/master/contrib/pyln-testing).
### Test dependencies
Functional tests dependencies can be installed using `pip`. Use a virtual environment.
```
# Create a new virtual environment, preferably.
python3 -m venv venv
. venv/bin/activate
# Get the deps
pip install -r tests/requirements.txt
```
Additionaly you need to have `bitcoind` installed on your computer, please
refer to [bitcoincore](https://bitcoincore.org/en/download/) for installation. You may use a
specific `bitcoind` binary by specifying the `BITCOIND_PATH` env var.
### Running the tests
From the root of the repository:
```
pytest tests/
```
### Tips and tricks
#### Logging
We use the [Live Logging](https://docs.pytest.org/en/latest/logging.html#live-logs)
functionality from pytest. It is configured in (`pyproject.toml`)[../pyproject.toml] to
output `INFO`-level to the console. If a test fails, the entire `DEBUG` log is output.
You can override the config at runtime with the `--log-cli-level` option:
```
pytest -vvv --log-cli-level=DEBUG -k test_startup
```
Note that we record all logs from daemons, and we start them with `log_level = "debug"`.
### Test lints
Just use [`black`](https://github.com/psf/black).
### More
See the environment variables in `test_framework/utils.py`.

136
tests/fixtures.py Normal file
View File

@ -0,0 +1,136 @@
from concurrent import futures
from ephemeral_port_reserve import reserve
from test_framework.bitcoind import Bitcoind
from test_framework.minisafed import Minisafed
from test_framework.utils import (
EXECUTOR_WORKERS,
)
import os
import pytest
import shutil
import tempfile
import time
# A dict in which we count how often a particular test has run so far. Used to
# give each attempt its own numbered directory, and avoid clashes.
ATTEMPTS = {}
@pytest.fixture(scope="session")
def test_base_dir():
d = os.getenv("TEST_DIR", "/tmp")
directory = tempfile.mkdtemp(prefix="minisafed-tests-", dir=d)
print("Running tests in {}".format(directory))
yield directory
content = os.listdir(directory)
if content == []:
shutil.rmtree(directory)
else:
print(f"Leaving base dir '{directory}' as it still contains {content}")
# Taken from https://docs.pytest.org/en/latest/example/simple.html#making-test-result-information-available-in-fixtures
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
# execute all other hooks to obtain the report object
outcome = yield
rep = outcome.get_result()
# set a report attribute for each phase of a call, which can
# be "setup", "call", "teardown"
setattr(item, "rep_" + rep.when, rep)
@pytest.fixture
def directory(request, test_base_dir, test_name):
"""Return a per-test specific directory.
This makes a unique test-directory even if a test is rerun multiple times.
"""
global ATTEMPTS
# Auto set value if it isn't in the dict yet
ATTEMPTS[test_name] = ATTEMPTS.get(test_name, 0) + 1
directory = os.path.join(
test_base_dir, "{}_{}".format(test_name, ATTEMPTS[test_name])
)
if not os.path.exists(directory):
os.makedirs(directory)
yield directory
# test_base_dir is at the session scope, so we can't use request.node as mentioned in
# the doc linked in the hook above.
if request.session.testsfailed == 0:
try:
shutil.rmtree(directory)
except Exception:
files = [
os.path.join(dp, f) for dp, _, fn in os.walk(directory) for f in fn
]
print("Directory still contains files:", files)
raise
else:
print(f"Test failed, leaving directory '{directory}' intact")
@pytest.fixture
def test_name(request):
yield request.function.__name__
@pytest.fixture
def executor(test_name):
ex = futures.ThreadPoolExecutor(
max_workers=EXECUTOR_WORKERS, thread_name_prefix=test_name
)
yield ex
ex.shutdown(wait=False)
@pytest.fixture
def bitcoind(directory):
bitcoind = Bitcoind(bitcoin_dir=os.path.join(directory, "bitcoind"))
bitcoind.startup()
bitcoind.rpc.createwallet(bitcoind.rpc.wallet_name, False, False, "", False, True)
bitcoind.rpc.generatetoaddress(101, bitcoind.rpc.getnewaddress())
while bitcoind.rpc.getbalance() < 50:
time.sleep(0.01)
yield bitcoind
bitcoind.cleanup()
@pytest.fixture
def minisafed(bitcoind, directory):
datadir = os.path.join(directory, "minisafed")
os.makedirs(datadir, exist_ok=True)
bitcoind_cookie = os.path.join(bitcoind.bitcoin_dir, "regtest", ".cookie")
main_desc = "wsh(or_d(pk(02869ef67283b4bc9af9d8366efb31f718018bfd5970a69b3d16f22f51228f73dc),and_v(v:pkh(03bb4dc7ed08cc633893f457553ad941ff82195342467d350dbb63773dd17f113b),older(157680))))"
minisafed = Minisafed(
datadir,
main_desc,
bitcoind.rpcport,
bitcoind_cookie,
)
try:
minisafed.start()
yield minisafed
except Exception:
minisafed.cleanup()
raise
minisafed.cleanup()

4
tests/requirements.txt Normal file
View File

@ -0,0 +1,4 @@
pytest==6.2
pytest-xdist==1.31.0
pytest-timeout==1.3.4
ephemeral_port_reserve==1.1.1

View File

View File

@ -0,0 +1,283 @@
# This file was taken from the Bitcoin Core project in September 2021.
# Copyright (c) 2021 The Bitcoin Core developers
#
# Copyright (c) 2011 Jeff Garzik
#
# Previous copyright, from python-jsonrpc/jsonrpc/proxy.py:
#
# Copyright (c) 2007 Jan-Klaas Kollhof
#
# This file is part of jsonrpc.
#
# jsonrpc is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this software; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""HTTP proxy for opening RPC connection to bitcoind.
AuthServiceProxy has the following improvements over python-jsonrpc's
ServiceProxy class:
- HTTP connections persist for the life of the AuthServiceProxy object
(if server supports HTTP/1.1)
- sends protocol 'version', per JSON-RPC 1.1
- sends proper, incrementing 'id'
- sends Basic HTTP authentication headers
- parses all JSON numbers that look like floats as Decimal
- uses standard Python json lib
"""
import base64
import decimal
from http import HTTPStatus
import http.client
import json
import logging
import os
import socket
import time
import urllib.parse
HTTP_TIMEOUT = 300
USER_AGENT = "AuthServiceProxy/0.1"
log = logging.getLogger("BitcoinRPC")
class JSONRPCException(Exception):
def __init__(self, rpc_error, http_status=None):
try:
errmsg = "%(message)s (%(code)i)" % rpc_error
except (KeyError, TypeError):
errmsg = ""
super().__init__(errmsg)
self.error = rpc_error
self.http_status = http_status
def EncodeDecimal(o):
if isinstance(o, decimal.Decimal):
return str(o)
raise TypeError(repr(o) + " is not JSON serializable")
class AuthServiceProxy:
__id_count = 0
# ensure_ascii: escape unicode as \uXXXX, passed to json.dumps
def __init__(
self,
service_url,
service_name=None,
timeout=HTTP_TIMEOUT,
connection=None,
ensure_ascii=True,
):
self.__service_url = service_url
self._service_name = service_name
self.ensure_ascii = ensure_ascii # can be toggled on the fly by tests
self.__url = urllib.parse.urlparse(service_url)
user = (
b"" if self.__url.username is None else self.__url.username.encode("utf8")
)
passwd = (
b"" if self.__url.password is None else self.__url.password.encode("utf8")
)
authpair = user + b":" + passwd
self.__auth_header = b"Basic " + base64.b64encode(authpair)
self.timeout = timeout
self._set_conn(connection)
def __getattr__(self, name):
if name.startswith("__") and name.endswith("__"):
# Python internal stuff
raise AttributeError
if self._service_name is not None:
name = "%s.%s" % (self._service_name, name)
return AuthServiceProxy(self.__service_url, name, connection=self.__conn)
def _request(self, method, path, postdata):
"""
Do a HTTP request, with retry if we get disconnected (e.g. due to a timeout).
This is a workaround for https://bugs.python.org/issue3566 which is fixed in Python 3.5.
"""
headers = {
"Host": self.__url.hostname,
"User-Agent": USER_AGENT,
"Authorization": self.__auth_header,
"Content-type": "application/json",
}
if os.name == "nt":
# Windows somehow does not like to re-use connections
# TODO: Find out why the connection would disconnect occasionally and make it reusable on Windows
# Avoid "ConnectionAbortedError: [WinError 10053] An established connection was aborted by the software in your host machine"
self._set_conn()
try:
self.__conn.request(method, path, postdata, headers)
return self._get_response()
except (BrokenPipeError, ConnectionResetError):
# Python 3.5+ raises BrokenPipeError when the connection was reset
# ConnectionResetError happens on FreeBSD
self.__conn.close()
self.__conn.request(method, path, postdata, headers)
return self._get_response()
except OSError as e:
retry = (
"[WinError 10053] An established connection was aborted by the software in your host machine"
in str(e)
)
# Workaround for a bug on macOS. See https://bugs.python.org/issue33450
retry = retry or ("[Errno 41] Protocol wrong type for socket" in str(e))
if retry:
self.__conn.close()
self.__conn.request(method, path, postdata, headers)
return self._get_response()
else:
raise
def get_request(self, *args, **argsn):
AuthServiceProxy.__id_count += 1
# FIXME: keep this but with a lower degree of verbosity
# log.debug(
# "-{}-> {} {}".format(
# AuthServiceProxy.__id_count,
# self._service_name,
# json.dumps(
# args or argsn, default=EncodeDecimal, ensure_ascii=self.ensure_ascii
# ),
# )
# )
if args and argsn:
raise ValueError("Cannot handle both named and positional arguments")
return {
"version": "1.1",
"method": self._service_name,
"params": args or argsn,
"id": AuthServiceProxy.__id_count,
}
def __call__(self, *args, **argsn):
postdata = json.dumps(
self.get_request(*args, **argsn),
default=EncodeDecimal,
ensure_ascii=self.ensure_ascii,
)
response, status = self._request(
"POST", self.__url.path, postdata.encode("utf-8")
)
if response["error"] is not None:
raise JSONRPCException(response["error"], status)
elif "result" not in response:
raise JSONRPCException(
{"code": -343, "message": "missing JSON-RPC result"}, status
)
elif status != HTTPStatus.OK:
raise JSONRPCException(
{
"code": -342,
"message": "non-200 HTTP status code but no JSON-RPC error",
},
status,
)
else:
return response["result"]
def batch(self, rpc_call_list):
postdata = json.dumps(
list(rpc_call_list), default=EncodeDecimal, ensure_ascii=self.ensure_ascii
)
# FIXME: keep this but with a lower degree of verbosity
# log.debug("--> " + postdata)
response, status = self._request(
"POST", self.__url.path, postdata.encode("utf-8")
)
if status != HTTPStatus.OK:
raise JSONRPCException(
{
"code": -342,
"message": "non-200 HTTP status code but no JSON-RPC error",
},
status,
)
return response
def _get_response(self):
# req_start_time = time.time()
try:
http_response = self.__conn.getresponse()
except socket.timeout:
raise JSONRPCException(
{
"code": -344,
"message": "%r RPC took longer than %f seconds. Consider "
"using larger timeout for calls that take "
"longer to return." % (self._service_name, self.__conn.timeout),
}
)
if http_response is None:
raise JSONRPCException(
{"code": -342, "message": "missing HTTP response from server"}
)
content_type = http_response.getheader("Content-Type")
if content_type != "application/json":
raise JSONRPCException(
{
"code": -342,
"message": "non-JSON HTTP response with '%i %s' from server"
% (http_response.status, http_response.reason),
},
http_response.status,
)
responsedata = http_response.read().decode("utf8")
response = json.loads(responsedata, parse_float=decimal.Decimal)
# FIXME: keep this but with a lower degree of verbosity
# elapsed = time.time() - req_start_time
# if "error" in response and response["error"] is None:
# log.debug(
# "<-%s- [%.6f] %s"
# % (
# response["id"],
# elapsed,
# json.dumps(
# response["result"],
# default=EncodeDecimal,
# ensure_ascii=self.ensure_ascii,
# ),
# )
# )
# else:
# log.debug("<-- [%.6f] %s" % (elapsed, responsedata))
return response, http_response.status
def __truediv__(self, relative_uri):
return AuthServiceProxy(
"{}/{}".format(self.__service_url, relative_uri),
self._service_name,
connection=self.__conn,
)
def _set_conn(self, connection=None):
port = 80 if self.__url.port is None else self.__url.port
if connection:
self.__conn = connection
self.timeout = connection.timeout
elif self.__url.scheme == "https":
self.__conn = http.client.HTTPSConnection(
self.__url.hostname, port, timeout=self.timeout
)
else:
self.__conn = http.client.HTTPConnection(
self.__url.hostname, port, timeout=self.timeout
)

View File

@ -0,0 +1,192 @@
import logging
import os
from decimal import Decimal
from ephemeral_port_reserve import reserve
from test_framework.authproxy import AuthServiceProxy
from test_framework.utils import TailableProc, wait_for, TIMEOUT, BITCOIND_PATH, COIN
class BitcoindRpcInterface:
def __init__(self, data_dir, network, rpc_port):
self.cookie_path = os.path.join(data_dir, network, ".cookie")
self.rpc_port = rpc_port
self.wallet_name = "minisafed-tests"
def __getattr__(self, name):
assert not (name.startswith("__") and name.endswith("__")), "Python internals"
with open(self.cookie_path) as fd:
authpair = fd.read()
service_url = (
f"http://{authpair}@localhost:{self.rpc_port}/wallet/{self.wallet_name}"
)
proxy = AuthServiceProxy(service_url, name)
def f(*args):
return proxy.__call__(*args)
# Make debuggers show <function bitcoin.rpc.name> rather than <function
# bitcoin.rpc.<lambda>>
f.__name__ = name
return f
class Bitcoind(TailableProc):
def __init__(self, bitcoin_dir, rpcport=None):
TailableProc.__init__(self, bitcoin_dir, verbose=False)
if rpcport is None:
rpcport = reserve()
self.bitcoin_dir = bitcoin_dir
self.rpcport = rpcport
self.p2pport = reserve()
self.prefix = "bitcoind"
regtestdir = os.path.join(bitcoin_dir, "regtest")
if not os.path.exists(regtestdir):
os.makedirs(regtestdir)
self.cmd_line = [
BITCOIND_PATH,
"-datadir={}".format(bitcoin_dir),
"-printtoconsole",
"-server",
]
bitcoind_conf = {
"port": self.p2pport,
"rpcport": rpcport,
"debug": 1,
"fallbackfee": Decimal(1000) / COIN,
"rpcthreads": 32,
}
self.conf_file = os.path.join(bitcoin_dir, "bitcoin.conf")
with open(self.conf_file, "w") as f:
f.write("chain=regtest\n")
f.write("[regtest]\n")
for k, v in bitcoind_conf.items():
f.write(f"{k}={v}\n")
self.rpc = BitcoindRpcInterface(bitcoin_dir, "regtest", rpcport)
def start(self):
TailableProc.start(self)
self.wait_for_log("Done loading", timeout=TIMEOUT)
logging.info("Bitcoind started")
def stop(self):
self.rpc.stop()
return TailableProc.stop(self)
# wait_for_mempool can be used to wait for the mempool before generating
# blocks:
# True := wait for at least 1 transation
# int > 0 := wait for at least N transactions
# 'tx_id' := wait for one transaction id given as a string
# ['tx_id1', 'tx_id2'] := wait until all of the specified transaction IDs
def generate_block(self, numblocks=1, wait_for_mempool=0):
if wait_for_mempool:
if isinstance(wait_for_mempool, str):
wait_for_mempool = [wait_for_mempool]
if isinstance(wait_for_mempool, list):
wait_for(
lambda: all(
txid in self.rpc.getrawmempool() for txid in wait_for_mempool
)
)
else:
wait_for(lambda: len(self.rpc.getrawmempool()) >= wait_for_mempool)
old_blockcount = self.rpc.getblockcount()
addr = self.rpc.getnewaddress()
self.rpc.generatetoaddress(numblocks, addr)
wait_for(lambda: self.rpc.getblockcount() == old_blockcount + numblocks)
def get_coins(self, amount_btc):
# subsidy halving is every 150 blocks on regtest, it's a rough estimate
# to avoid looping in most cases
numblocks = amount_btc // 25 + 1
while self.rpc.getbalance() < amount_btc:
self.generate_block(numblocks)
def generate_blocks_censor(self, n, txids):
"""Generate {n} blocks ignoring {txids}"""
fee_delta = 1000000
for txid in txids:
self.rpc.prioritisetransaction(txid, None, -fee_delta)
self.generate_block(n)
for txid in txids:
self.rpc.prioritisetransaction(txid, None, fee_delta)
def generate_empty_blocks(self, n):
"""Generate {n} empty blocks"""
addr = self.rpc.getnewaddress()
for _ in range(n):
self.rpc.generateblock(addr, [])
def simple_reorg(self, height, shift=0):
"""
Reorganize chain by creating a fork at height={height} and:
- If shift >=0:
- re-mine all mempool transactions into {height} + shift
(with shift floored at 1)
- Else:
- don't re-mine the mempool transactions
Note that tx's that become invalid at {height} (because coin maturity,
locktime etc.) are removed from mempool. The length of the new chain
will be original + 1 OR original + {shift}, whichever is larger.
For example: to push tx's backward from height h1 to h2 < h1,
use {height}=h2.
Or to change the txindex of tx's at height h1:
1. A block at height h2 < h1 should contain a non-coinbase tx that can
be pulled forward to h1.
2. Set {height}=h2 and {shift}= h1-h2
"""
orig_len = self.rpc.getblockcount()
old_hash = self.rpc.getblockhash(height)
if height + shift > orig_len:
final_len = height + shift
else:
final_len = 1 + orig_len
self.rpc.invalidateblock(old_hash)
self.wait_for_log(
r"InvalidChainFound: invalid block=.* height={}".format(height)
)
memp = self.rpc.getrawmempool()
if shift < 0:
self.generate_empty_blocks(1 + final_len - height)
elif shift == 0:
self.generate_block(1 + final_len - height, memp)
else:
self.generate_empty_blocks(shift)
self.generate_block(1 + final_len - (height + shift), memp)
self.wait_for_log(r"UpdateTip: new best=.* height={}".format(final_len))
def startup(self):
try:
self.start()
except Exception:
self.stop()
raise
info = self.rpc.getnetworkinfo()
if info["version"] < 220000:
self.rpc.stop()
raise ValueError(
"bitcoind is too old. Minimum supported version is 0.22.0."
" Current is {}".format(info["version"])
)
def cleanup(self):
try:
self.stop()
except Exception:
self.proc.kill()
self.proc.wait()

View File

@ -0,0 +1,55 @@
import os
from test_framework.utils import (
TailableProc,
VERBOSE,
LOG_LEVEL,
MINISAFED_PATH,
)
class Minisafed(TailableProc):
def __init__(
self,
datadir,
main_desc,
bitcoind_rpc_port,
bitcoind_cookie_path,
):
TailableProc.__init__(self, datadir, verbose=VERBOSE)
self.prefix = os.path.split(datadir)[-1]
self.conf_file = os.path.join(datadir, "config.toml")
self.cmd_line = [MINISAFED_PATH, "--conf", f"{self.conf_file}"]
with open(self.conf_file, "w") as f:
f.write(f"data_dir = '{datadir}'\n")
f.write("daemon = false\n")
f.write(f"log_level = '{LOG_LEVEL}'\n")
f.write(f'main_descriptor = "{main_desc}"\n')
f.write("[bitcoind_config]\n")
f.write('network = "regtest"\n')
f.write(f"cookie_path = '{bitcoind_cookie_path}'\n")
f.write(f"addr = '127.0.0.1:{bitcoind_rpc_port}'\n")
f.write("poll_interval_secs = 1\n")
def start(self):
TailableProc.start(self)
self.wait_for_logs(
[
"Database initialized and checked",
"Connection to bitcoind established and checked.",
]
)
def stop(self, timeout=5):
return TailableProc.stop(self)
def cleanup(self):
try:
self.stop()
except Exception:
self.proc.kill()

View File

@ -0,0 +1,811 @@
import bip32
import logging
import os
import random
from ephemeral_port_reserve import reserve
from nacl.public import PrivateKey as Curve25519Private
from test_framework import serializations
from test_framework.bitcoind import BitcoindRpcProxy
from test_framework.coordinatord import Coordinatord
from test_framework.cosignerd import Cosignerd
from test_framework.miradord import Miradord
from test_framework.revaultd import ManagerRevaultd, StakeholderRevaultd, StkManRevaultd
from test_framework.utils import (
get_descriptors,
get_participants,
finalize_input,
wait_for,
TIMEOUT,
WT_PLUGINS_DIR,
)
class RevaultNetwork:
# FIXME: we use a single bitcoind for all the wallets because it's much
# more efficient. Eventually, we may have to test with separate ones.
def __init__(
self,
root_dir,
bitcoind,
executor,
postgres_user,
postgres_pass,
postgres_host="localhost",
):
self.root_dir = root_dir
self.bitcoind = bitcoind
self.daemons = []
self.executor = executor
self.postgres_user = postgres_user
self.postgres_pass = postgres_pass
self.postgres_host = postgres_host
self.coordinator_port = reserve()
self.stk_wallets = []
self.stkman_wallets = []
self.man_wallets = []
self.csv = None
self.emergency_address = None
self.bitcoind_proxy = None
def deploy(
self,
n_stakeholders,
n_managers,
n_stkmanagers=0,
csv=None,
managers_threshold=None,
with_cosigs=True,
with_watchtowers=True,
with_cpfp=True,
bitcoind_rpc_mocks=[],
):
"""
Deploy a revault setup with {n_stakeholders} stakeholders, {n_managers}
managers.
"""
# They didn't provide it, defaults to n_managers
# PS: No I can't just managers_threshold=n_managers in the method's signature :(
if managers_threshold == None:
managers_threshold = n_managers + n_stkmanagers
assert n_stakeholders + n_stkmanagers >= 2, "Not enough stakeholders"
assert n_managers + n_stkmanagers >= 1, "Not enough managers"
assert managers_threshold <= n_managers + n_stkmanagers, "Invalid threshold"
# Connection info to bitcoind. Change the port depending on whether we are proxying
# the daemons' requests.
bitcoind_cookie = os.path.join(self.bitcoind.bitcoin_dir, "regtest", ".cookie")
if len(bitcoind_rpc_mocks) > 0:
self.bitcoind_proxy = BitcoindRpcProxy(
self.bitcoind.rpcport, bitcoind_cookie, bitcoind_rpc_mocks
)
bitcoind_rpcport = self.bitcoind_proxy.rpcport
else:
bitcoind_rpcport = self.bitcoind.rpcport
(
stkonly_keychains,
stkonly_cosig_keychains,
manonly_keychains,
stkman_stk_keychains,
stkman_cosig_keychains,
stkman_man_keychains,
) = get_participants(n_stakeholders, n_managers, n_stkmanagers, with_cosigs)
stks_keychains = stkonly_keychains + stkman_stk_keychains
cosigs_keychains = stkonly_cosig_keychains + stkman_cosig_keychains
mans_keychains = manonly_keychains + stkman_man_keychains
if csv is None:
# Not more than 6 months
csv = random.randint(1, 26784)
self.csv = csv
man_cpfp_seeds = [os.urandom(32) for _ in range(len(manonly_keychains))]
man_cpfp_privs = [
bip32.BIP32.from_seed(seed, network="test") for seed in man_cpfp_seeds
]
stkman_cpfp_seeds = [os.urandom(32) for _ in range(len(stkman_man_keychains))]
stkman_cpfp_privs = [
bip32.BIP32.from_seed(seed, network="test") for seed in stkman_cpfp_seeds
]
cpfp_xpubs = [c.get_xpub() for c in man_cpfp_privs + stkman_cpfp_privs]
stks_xpubs = [stk.get_xpub() for stk in stks_keychains]
cosigs_keys = [cosig.get_static_key().hex() for cosig in cosigs_keychains]
mans_xpubs = [man.get_xpub() for man in mans_keychains]
(self.deposit_desc, self.unvault_desc, self.cpfp_desc) = get_descriptors(
stks_xpubs, cosigs_keys, mans_xpubs, managers_threshold, cpfp_xpubs, csv
)
# Generate a dummy 2of2 to be used as our Emergency address
desc = "wsh(multi(2,cRE7qAArQYnFQK7S1gXFTArFT4UWvh8J2v2EUajRWXbWFvRzxoeF,\
cTzcgRCmHNqUqZuZgvCPLUDXXrQSoVQpZiXQZWQzsLEytcTr6iXi))"
checksum = self.bitcoind.rpc.getdescriptorinfo(desc)["checksum"]
desc = f"{desc}#{checksum}"
self.emergency_address = self.bitcoind.rpc.deriveaddresses(desc)[0]
desc_import = self.bitcoind.rpc.importdescriptors(
[
{
"desc": desc,
"timestamp": "now",
"label": "revault-emergency",
}
]
)
if not desc_import[0]["success"]:
raise Exception(desc_import)
# FIXME: this is getting dirty.. We should re-centralize information
# about each participant in specified data structures
stkonly_cosigners_ports = []
stkman_cosigners_ports = []
# The Noise keys are interdependant, so generate everything in advance
# to avoid roundtrips
coordinator_noisepriv = os.urandom(32)
coordinator_noisepub = bytes(
Curve25519Private(coordinator_noisepriv).public_key
)
(stkonly_noiseprivs, stkonly_noisepubs) = ([], [])
(stkonly_wt_noiseprivs, stkonly_wt_noisepubs) = ([], [])
(stkonly_cosig_noiseprivs, stkonly_cosig_noisepubs) = ([], [])
for i in range(len(stkonly_keychains)):
stkonly_noiseprivs.append(os.urandom(32))
stkonly_noisepubs.append(
bytes(Curve25519Private(stkonly_noiseprivs[i]).public_key)
)
if with_cosigs:
stkonly_cosig_noiseprivs.append(os.urandom(32))
stkonly_cosig_noisepubs.append(
bytes(Curve25519Private(stkonly_cosig_noiseprivs[i]).public_key)
)
# Unused yet
stkonly_wt_noiseprivs.append(os.urandom(32))
stkonly_wt_noisepubs.append(
bytes(Curve25519Private(stkonly_wt_noiseprivs[i]).public_key)
)
(stkman_noiseprivs, stkman_noisepubs) = ([], [])
(stkman_wt_noiseprivs, stkman_wt_noisepubs) = ([], [])
(stkman_cosig_noiseprivs, stkman_cosig_noisepubs) = ([], [])
for i in range(len(stkman_stk_keychains)):
stkman_noiseprivs.append(os.urandom(32))
stkman_noisepubs.append(
bytes(Curve25519Private(stkman_noiseprivs[i]).public_key)
)
if with_cosigs:
stkman_cosig_noiseprivs.append(os.urandom(32))
stkman_cosig_noisepubs.append(
bytes(Curve25519Private(stkman_cosig_noiseprivs[i]).public_key)
)
# Unused yet
stkman_wt_noiseprivs.append(os.urandom(32))
stkman_wt_noisepubs.append(
bytes(Curve25519Private(stkman_wt_noiseprivs[i]).public_key)
)
(man_noiseprivs, man_noisepubs) = ([], [])
for i in range(len(manonly_keychains)):
man_noiseprivs.append(os.urandom(32))
man_noisepubs.append(bytes(Curve25519Private(man_noiseprivs[i]).public_key))
logging.debug(
f"Using Noise pubkeys:\n- Stakeholders: {stkonly_noisepubs + stkman_noisepubs}"
f" (of which {len(stkman_noisepubs)} are also managers)"
f"\n- Managers: {man_noisepubs}\n- Watchtowers:"
f"{stkonly_wt_noisepubs + stkman_wt_noisepubs}\n"
)
# Spin up the "Sync Server"
coord_datadir = os.path.join(self.root_dir, "coordinatord")
os.makedirs(coord_datadir, exist_ok=True)
coordinatord = Coordinatord(
coord_datadir,
coordinator_noisepriv,
man_noisepubs + stkman_noisepubs,
stkonly_noisepubs + stkman_noisepubs,
stkonly_wt_noisepubs + stkman_wt_noisepubs,
self.coordinator_port,
bitcoind_rpcport,
bitcoind_cookie,
self.postgres_user,
self.postgres_pass,
self.postgres_host,
)
coordinatord.start()
self.daemons.append(coordinatord)
cosigners_info = []
for (i, noisepub) in enumerate(stkonly_cosig_noisepubs):
stkonly_cosigners_ports.append(reserve())
cosigners_info.append(
{
"host": f"127.0.0.1:{stkonly_cosigners_ports[i]}",
"noise_key": noisepub,
}
)
for (i, noisepub) in enumerate(stkman_cosig_noisepubs):
stkman_cosigners_ports.append(reserve())
cosigners_info.append(
{
"host": f"127.0.0.1:{stkman_cosigners_ports[i]}",
"noise_key": noisepub,
}
)
# Start daemons in parallel, as it takes a few seconds for each
start_jobs = []
# By default the watchtower should not revault anything
default_wt_plugin = {
"path": os.path.join(WT_PLUGINS_DIR, "revault_nothing.py"),
"conf": {},
}
# Spin up the stakeholders wallets and their cosigning servers
for i, stk in enumerate(stkonly_keychains):
if with_watchtowers:
datadir = os.path.join(self.root_dir, f"miradord-{i}")
os.makedirs(datadir)
wt_listen_port = reserve()
miradord = Miradord(
datadir,
str(self.deposit_desc),
str(self.unvault_desc),
str(self.cpfp_desc),
self.emergency_address,
wt_listen_port,
stkonly_wt_noiseprivs[i],
stkonly_noisepubs[i].hex(),
coordinator_noisepub.hex(),
self.coordinator_port,
bitcoind_rpcport,
bitcoind_cookie,
plugins=[default_wt_plugin],
)
start_jobs.append(self.executor.submit(miradord.start))
self.daemons.append(miradord)
datadir = os.path.join(self.root_dir, f"revaultd-stk-{i}")
os.makedirs(datadir, exist_ok=True)
stk_config = {
"keychain": stk,
"watchtowers": [
{
"host": f"127.0.0.1:{wt_listen_port}",
"noise_key": stkonly_wt_noisepubs[i].hex(),
}
]
if with_watchtowers
else [],
"emergency_address": self.emergency_address,
}
revaultd = StakeholderRevaultd(
datadir,
str(self.deposit_desc),
str(self.unvault_desc),
str(self.cpfp_desc),
stkonly_noiseprivs[i],
coordinator_noisepub.hex(),
self.coordinator_port,
bitcoind_rpcport,
bitcoind_cookie,
stk_config,
wt_process=miradord if with_watchtowers else None,
)
start_jobs.append(self.executor.submit(revaultd.start))
self.stk_wallets.append(revaultd)
if with_cosigs:
datadir = os.path.join(self.root_dir, f"cosignerd-stk-{i}")
os.makedirs(datadir, exist_ok=True)
cosignerd = Cosignerd(
datadir,
stkonly_cosig_noiseprivs[i],
stkonly_cosig_keychains[i].get_bitcoin_priv(),
stkonly_cosigners_ports[i],
man_noisepubs + stkman_noisepubs,
)
start_jobs.append(self.executor.submit(cosignerd.start))
self.daemons.append(cosignerd)
# Spin up the stakeholder-managers wallets and their cosigning servers
for i, stkman in enumerate(stkman_stk_keychains):
if with_watchtowers:
datadir = os.path.join(self.root_dir, f"miradord-stkman-{i}")
os.makedirs(datadir)
wt_listen_port = reserve()
miradord = Miradord(
datadir,
str(self.deposit_desc),
str(self.unvault_desc),
str(self.cpfp_desc),
self.emergency_address,
wt_listen_port,
stkman_wt_noiseprivs[i],
stkman_noisepubs[i].hex(),
coordinator_noisepub.hex(),
self.coordinator_port,
bitcoind_rpcport,
bitcoind_cookie,
plugins=[default_wt_plugin],
)
start_jobs.append(self.executor.submit(miradord.start))
self.daemons.append(miradord)
datadir = os.path.join(self.root_dir, f"revaultd-stkman-{i}")
os.makedirs(datadir, exist_ok=True)
stk_config = {
"keychain": stkman,
"watchtowers": [
{
"host": f"127.0.0.1:{wt_listen_port}",
"noise_key": stkman_wt_noisepubs[i].hex(),
}
]
if with_watchtowers
else [],
"emergency_address": self.emergency_address,
}
man_config = {
"keychain": stkman_man_keychains[i],
"cosigners": cosigners_info,
}
revaultd = StkManRevaultd(
datadir,
str(self.deposit_desc),
str(self.unvault_desc),
str(self.cpfp_desc),
stkman_noiseprivs[i],
coordinator_noisepub.hex(),
self.coordinator_port,
bitcoind_rpcport,
bitcoind_cookie,
stk_config,
man_config,
wt_process=miradord if with_watchtowers else None,
cpfp_seed=stkman_cpfp_seeds[i] if with_cpfp else None,
)
start_jobs.append(self.executor.submit(revaultd.start))
self.stkman_wallets.append(revaultd)
if with_cosigs:
datadir = os.path.join(self.root_dir, f"cosignerd-stkman-{i}")
os.makedirs(datadir, exist_ok=True)
cosignerd = Cosignerd(
datadir,
stkman_cosig_noiseprivs[i],
stkman_cosig_keychains[i].get_bitcoin_priv(),
stkman_cosigners_ports[i],
man_noisepubs + stkman_noisepubs,
)
start_jobs.append(self.executor.submit(cosignerd.start))
self.daemons.append(cosignerd)
# Spin up the managers (only) wallets
for i, man in enumerate(manonly_keychains):
datadir = os.path.join(self.root_dir, f"revaultd-man-{i}")
os.makedirs(datadir, exist_ok=True)
man_config = {"keychain": man, "cosigners": cosigners_info}
daemon = ManagerRevaultd(
datadir,
str(self.deposit_desc),
str(self.unvault_desc),
str(self.cpfp_desc),
man_noiseprivs[i],
coordinator_noisepub.hex(),
self.coordinator_port,
bitcoind_rpcport,
bitcoind_cookie,
man_config,
cpfp_seed=man_cpfp_seeds[i] if with_cpfp else None,
)
start_jobs.append(self.executor.submit(daemon.start))
self.man_wallets.append(daemon)
for j in start_jobs:
j.result(TIMEOUT)
self.daemons += self.stk_wallets + self.stkman_wallets + self.man_wallets
def mans(self):
return self.stkman_wallets + self.man_wallets
def stks(self):
return self.stkman_wallets + self.stk_wallets
def participants(self):
return self.stkman_wallets + self.stk_wallets + self.man_wallets
def man(self, n):
"""Get the {n}th manager (including the stakeholder-managers first)"""
mans = self.stkman_wallets + self.man_wallets
return mans[n]
def stk(self, n):
"""Get the {n}th stakeholder (including the stakeholder-managers first)"""
stks = self.stkman_wallets + self.stk_wallets
return stks[n]
def signed_unvault_psbt(self, deposit, derivation_index):
"""Get the fully-signed Unvault transaction for this deposit.
This will raise if we don't have all the signatures.
"""
psbt_str = self.stks()[0].rpc.listpresignedtransactions([deposit])[
"presigned_transactions"
][0]["unvault"]
psbt = serializations.PSBT()
psbt.deserialize(psbt_str)
finalize_input(self.deposit_desc, psbt.inputs[0], derivation_index)
psbt.tx.wit.vtxinwit.append(psbt.inputs[0].final_script_witness)
return psbt.tx.serialize_with_witness().hex()
def signed_cancel_psbt(self, deposit, derivation_index):
"""Get the fully-signed Cancel transaction for this deposit.
This picks the lowest feerate version.
This will raise if we don't have all the signatures.
"""
psbt_str = self.stks()[0].rpc.listpresignedtransactions([deposit])[
"presigned_transactions"
][0]["cancel"][0]
psbt = serializations.PSBT()
psbt.deserialize(psbt_str)
finalize_input(self.unvault_desc, psbt.inputs[0], derivation_index)
psbt.tx.wit.vtxinwit.append(psbt.inputs[0].final_script_witness)
return psbt.tx.serialize_with_witness().hex()
def get_vault(self, address):
"""Get a vault entry by outpoint or by address"""
for v in self.man(0).rpc.listvaults()["vaults"]:
if v["address"] == address:
return v
def fund(self, amount=None):
"""Deposit coins into the architectures, by paying to the deposit
descriptor and getting the tx 6 blocks confirmations."""
assert (
len(self.man_wallets + self.stkman_wallets) > 0
), "You must have deploy()ed first"
man = self.man(0)
if amount is None:
amount = 49.9999
addr = man.rpc.getdepositaddress()["address"]
txid = self.bitcoind.rpc.sendtoaddress(addr, amount)
man.wait_for_log(f"Got a new unconfirmed deposit at {txid}")
self.bitcoind.generate_block(6, wait_for_mempool=txid)
man.wait_for_log(f"Vault at {txid}.* is now confirmed")
vaults = man.rpc.listvaults(["funded"])["vaults"]
for v in vaults:
if v["txid"] == txid:
for w in self.man_wallets + self.stk_wallets:
w.wait_for_deposits([f"{txid}:{v['vout']}"])
return v
raise Exception(f"Vault created by '{txid}' got in logs but not in listvaults?")
def fundmany(self, amounts=[]):
"""Deposit coins into the architectures in a single transaction"""
assert (
len(self.man_wallets + self.stkman_wallets) > 0
), "You must have deploy()ed first"
assert len(amounts) > 0, "You must provide at least an amount!"
man = self.man(0)
curr_index = 0
vaults = man.rpc.listvaults()["vaults"]
for v in vaults:
if v["derivation_index"] > curr_index:
curr_index = v["derivation_index"]
indexes = list(range(curr_index + 1, curr_index + 1 + len(amounts)))
amounts_sendmany = {}
for i, amount in enumerate(amounts):
amounts_sendmany[man.rpc.getdepositaddress(indexes[i])["address"]] = amount
txid = self.bitcoind.rpc.sendmany("", amounts_sendmany)
man.wait_for_logs(
[f"Got a new unconfirmed deposit at {txid}" for _ in range(len(amounts))],
timeout=TIMEOUT * max(1, len(amounts) / 10),
)
self.bitcoind.generate_block(6, wait_for_mempool=txid)
man.wait_for_logs(
[f"Vault at {txid}.* is now confirmed" for _ in range(len(amounts))],
timeout=TIMEOUT * max(1, len(amounts) / 10),
)
# Return the vaults we created
all_vaults = man.rpc.listvaults(["funded"])["vaults"]
created_vaults = []
for v in all_vaults:
if v["txid"] == txid:
created_vaults.append(v)
assert len(created_vaults) == len(amounts)
return created_vaults
def secure_vault(self, vault):
"""Make all stakeholders share signatures for all revocation txs"""
deposit = f"{vault['txid']}:{vault['vout']}"
for stk in self.stks():
stk.wait_for_deposits([deposit])
psbts = stk.rpc.getrevocationtxs(deposit)
cancel_psbts = [
stk.stk_keychain.sign_revocation_psbt(c, vault["derivation_index"])
for c in psbts["cancel_txs"]
]
emer_psbt = stk.stk_keychain.sign_revocation_psbt(
psbts["emergency_tx"], vault["derivation_index"]
)
unemer_psbt = stk.stk_keychain.sign_revocation_psbt(
psbts["emergency_unvault_tx"], vault["derivation_index"]
)
stk.rpc.revocationtxs(deposit, cancel_psbts, emer_psbt, unemer_psbt)
for w in self.participants():
w.wait_for_secured_vaults([deposit])
def secure_vaults(self, vaults):
"""Secure all these vaults, concurrently."""
sec_jobs = []
for v in vaults:
sec_jobs.append(self.executor.submit(self.secure_vault, v))
for j in sec_jobs:
j.result(TIMEOUT)
def activate_vault(self, vault):
"""Make all stakeholders share signatures for the unvault tx"""
deposit = f"{vault['txid']}:{vault['vout']}"
for stk in self.stks():
stk.wait_for_secured_vaults([deposit])
unvault_psbt = stk.rpc.getunvaulttx(deposit)["unvault_tx"]
unvault_psbt = stk.stk_keychain.sign_unvault_psbt(
unvault_psbt, vault["derivation_index"]
)
stk.rpc.unvaulttx(deposit, unvault_psbt)
for w in self.participants():
w.wait_for_active_vaults([deposit])
def activate_fresh_vaults(self, vaults):
"""Secure then activate all these vaults, concurrently."""
# TODO: i'm sure we don't even need to wait for all sec jobs to be complete
# before starting the activate_vault futures, given a high enough TIMEOUT.
self.secure_vaults(vaults)
act_jobs = []
for v in vaults:
act_jobs.append(self.executor.submit(self.activate_vault, v))
for j in act_jobs:
j.result(TIMEOUT)
def broadcast_unvaults(self, vaults, destinations, feerate, priority=False):
"""
Broadcast the Unvault transactions for these {vaults}, advertizing a
Spend tx spending to these {destinations} (mapping of addresses to
amounts)
"""
man = self.man(0)
deposits = []
deriv_indexes = []
for v in vaults:
deposits.append(f"{v['txid']}:{v['vout']}")
deriv_indexes.append(v["derivation_index"])
man.wait_for_active_vaults(deposits)
spend_tx = man.rpc.getspendtx(deposits, destinations, feerate)["spend_tx"][
"psbt"
]
for man in self.mans():
spend_tx = man.man_keychain.sign_spend_psbt(spend_tx, deriv_indexes)
man.rpc.updatespendtx(spend_tx)
spend_psbt = serializations.PSBT()
spend_psbt.deserialize(spend_tx)
spend_psbt.tx.calc_sha256()
man.rpc.setspendtx(spend_psbt.tx.hash, priority)
return spend_psbt
def unvault_vaults(self, vaults, destinations, feerate, priority=False):
"""
Unvault these {vaults}, advertizing a Spend tx spending to these {destinations}
(mapping of addresses to amounts)
"""
spend_psbt = self.broadcast_unvaults(vaults, destinations, feerate, priority)
deposits = [f"{v['txid']}:{v['vout']}" for v in vaults]
self.bitcoind.generate_block(1, wait_for_mempool=len(deposits))
for w in self.participants():
wait_for(
lambda: len(w.rpc.listvaults(["unvaulted"], deposits)["vaults"])
== len(deposits)
)
return spend_psbt
def spend_vaults_unconfirmed(self, vaults, destinations, feerate, priority=False):
"""
Spend these {vaults} to these {destinations} (mapping of addresses to amounts), not
confirming the Spend transaction.
Make sure to call this only with revault deployment with a low (<500) CSV, or you'll encounter
an ugly timeout from bitcoinlib.
:return: the list of spent deposits along with the Spend PSBT.
"""
assert len(vaults) > 0
man = self.man(0)
deposits = []
deriv_indexes = []
for v in vaults:
deposits.append(f"{v['txid']}:{v['vout']}")
deriv_indexes.append(v["derivation_index"])
for man in self.mans():
man.wait_for_active_vaults(deposits)
spend_tx = man.rpc.getspendtx(deposits, destinations, feerate)["spend_tx"][
"psbt"
]
for man in self.mans():
spend_tx = man.man_keychain.sign_spend_psbt(spend_tx, deriv_indexes)
man.rpc.updatespendtx(spend_tx)
spend_psbt = serializations.PSBT()
spend_psbt.deserialize(spend_tx)
spend_psbt.tx.calc_sha256()
man.rpc.setspendtx(spend_psbt.tx.hash, priority)
self.bitcoind.generate_block(1, wait_for_mempool=len(deposits))
self.bitcoind.generate_block(self.csv)
man.wait_for_log(
f"Succesfully broadcasted Spend tx '{spend_psbt.tx.hash}'",
)
for w in self.participants():
wait_for(
lambda: len(w.rpc.listvaults(["spending"], deposits)["vaults"])
== len(deposits)
)
return deposits, spend_psbt
def spend_vaults(self, vaults, destinations, feerate, priority=False):
"""
Spend these {vaults} to these {destinations} (mapping of addresses to amounts).
Make sure to call this only with revault deployment with a low (<500) CSV, or you'll encounter
an ugly timeout from bitcoinlib.
:return: the list of spent deposits along with the Spend PSBT.
"""
deposits, spend_psbt = self.spend_vaults_unconfirmed(
vaults, destinations, feerate, priority
)
self.bitcoind.generate_block(1, wait_for_mempool=[spend_psbt.tx.hash])
for w in self.participants():
wait_for(
lambda: len(w.rpc.listvaults(["spent"], deposits)["vaults"])
== len(deposits)
)
return deposits, spend_psbt.tx.hash
def _any_spend_data(self, vaults):
addr = self.bitcoind.rpc.getnewaddress()
total_spent = sum(v["amount"] for v in vaults)
feerate = 2
fees = self.compute_spendtx_fees(feerate, len(vaults), 1)
return {addr: total_spent - fees}, feerate
def unvault_vaults_anyhow(self, vaults, priority=False):
"""
Unvault these vaults with a random Spend transaction for a maximum amount and a
fixed feerate.
"""
destinations, feerate = self._any_spend_data(vaults)
return self.unvault_vaults(vaults, destinations, feerate, priority)
def broadcast_unvaults_anyhow(self, vaults, priority=False):
"""
Broadcast the Unvault transactions for these vaults with a random Spend
transaction for a maximum amount and a fixed feerate.
"""
destinations, feerate = self._any_spend_data(vaults)
return self.broadcast_unvaults(vaults, destinations, feerate, priority)
def spend_vaults_anyhow(self, vaults):
"""Spend these vaults to a random address for a maximum amount for a fixed feerate"""
destinations, feerate = self._any_spend_data(vaults)
return self.spend_vaults(vaults, destinations, feerate)
def spend_vaults_anyhow_unconfirmed(self, vaults, priority=False):
"""
Spend these vaults to a random address for a maximum amount for a fixed feerate,
not confirming the Spend transaction.
"""
destinations, feerate = self._any_spend_data(vaults)
return self.spend_vaults_unconfirmed(vaults, destinations, feerate, priority)
def compute_spendtx_fees(
self, spendtx_feerate, n_vaults_spent, n_destinations, with_change=False
):
"""Get the fees necessary to include in a Spend transaction.
This assumes the destinations to be P2WPKH
"""
n_stk = len(self.stks())
n_man = len(self.mans())
# witscript PUSH, keys , Unvault Script overhead, signatures
spend_witness_vb = (
1 + (n_man + n_stk * 2) * 34 + 15 + (n_man + n_stk) * 73
) // 4
# Overhead, P2WPKH, P2WSH, inputs, witnesses
spend_witstrip_vb = (
11
+ 31 * n_destinations
+ 43 * (1 + (1 if with_change else 0))
+ (32 + 4 + 4 + 1) * n_vaults_spent
)
spendtx_vbytes = spend_witstrip_vb + spend_witness_vb * n_vaults_spent
# witscript PUSH, keys , Deposit Script overhead, signatures
unvault_witness_vb = (1 + n_stk * (34 + 73) + 3) // 4
# Overhead, P2WSH * 2, inputs + witness
unvaulttxs_vbytes = (
11 + 43 * 2 + (32 + 4 + 4 + 1) + unvault_witness_vb
) * n_vaults_spent
return (
spendtx_vbytes * spendtx_feerate # Spend fees
+ 2 * 32 * spendtx_vbytes # Spend CPFP
+ unvaulttxs_vbytes * 24 # Unvault fees (6sat/WU feerate)
+ 30_000 * n_vaults_spent # Unvault CPFP
)
def cancel_vault(self, vault):
deposit = f"{vault['txid']}:{vault['vout']}"
for w in self.participants():
wait_for(
lambda: len(
w.rpc.listvaults(
["unvaulting", "unvaulted", "spending"], [deposit]
)["vaults"]
)
== 1
)
self.stk(0).rpc.revault(deposit)
self.bitcoind.generate_block(1, wait_for_mempool=1)
for w in self.participants():
wait_for(
lambda: len(w.rpc.listvaults(["canceled"], [deposit])["vaults"]) == 1
)
def stop_wallets(self):
jobs = [self.executor.submit(w.stop) for w in self.participants()]
for j in jobs:
j.result(TIMEOUT)
def start_wallets(self):
jobs = [self.executor.submit(w.start) for w in self.participants()]
for j in jobs:
j.result(TIMEOUT)
def cleanup(self):
for n in self.daemons:
n.cleanup()
if self.bitcoind_proxy is not None:
self.bitcoind_proxy.stop()

View File

@ -0,0 +1,208 @@
import itertools
import logging
import os
import re
import subprocess
import threading
import time
TIMEOUT = int(os.getenv("TIMEOUT", 20))
EXECUTOR_WORKERS = int(os.getenv("EXECUTOR_WORKERS", 20))
VERBOSE = os.getenv("VERBOSE", "0") == "1"
LOG_LEVEL = os.getenv("LOG_LEVEL", "debug")
assert LOG_LEVEL in ["trace", "debug", "info", "warn", "error"]
DEFAULT_MS_PATH = os.path.join(
os.path.dirname(__file__), "..", "..", "target/debug/minisafed"
)
MINISAFED_PATH = os.getenv("MINISAFED_PATH", DEFAULT_MS_PATH)
DEFAULT_BITCOIND_PATH = "bitcoind"
BITCOIND_PATH = os.getenv("BITCOIND_PATH", DEFAULT_BITCOIND_PATH)
COIN = 10 ** 8
def wait_for(success, timeout=TIMEOUT, debug_fn=None):
"""
Run success() either until it returns True, or until the timeout is reached.
debug_fn is logged at each call to success, it can be useful for debugging
when tests fail.
"""
start_time = time.time()
interval = 0.25
while not success() and time.time() < start_time + timeout:
if debug_fn is not None:
logging.info(debug_fn())
time.sleep(interval)
interval *= 2
if interval > 5:
interval = 5
if time.time() > start_time + timeout:
raise ValueError("Error waiting for {}", success)
class RpcError(ValueError):
def __init__(self, method: str, payload: dict, error: str):
super(ValueError, self).__init__(
"RPC call failed: method: {}, payload: {}, error: {}".format(
method, payload, error
)
)
self.method = method
self.payload = payload
self.error = error
class TailableProc(object):
"""A monitorable process that we can start, stop and tail.
This is the base class for the daemons. It allows us to directly
tail the processes and react to their output.
"""
def __init__(self, outputDir=None, verbose=True):
self.logs = []
self.logs_cond = threading.Condition(threading.RLock())
self.env = os.environ.copy()
self.running = False
self.proc = None
self.outputDir = outputDir
self.logsearch_start = 0
# Set by inherited classes
self.cmd_line = []
self.prefix = ""
# Should we be logging lines we read from stdout?
self.verbose = verbose
# A filter function that'll tell us whether to filter out the line (not
# pass it to the log matcher and not print it to stdout).
self.log_filter = lambda _: False
def start(self, stdin=None, stdout=None, stderr=None):
"""Start the underlying process and start monitoring it."""
logging.debug("Starting '%s'", " ".join(self.cmd_line))
self.proc = subprocess.Popen(
self.cmd_line,
stdin=stdin,
stdout=stdout if stdout else subprocess.PIPE,
stderr=stderr if stderr else subprocess.PIPE,
env=self.env,
)
self.thread = threading.Thread(target=self.tail)
self.thread.daemon = True
self.thread.start()
self.running = True
def save_log(self):
if self.outputDir:
logpath = os.path.join(self.outputDir, "log")
with open(logpath, "w") as f:
for l in self.logs:
f.write(l + "\n")
def stop(self, timeout=10):
self.save_log()
self.proc.terminate()
# Now give it some time to react to the signal
rc = self.proc.wait(timeout)
if rc is None:
self.proc.kill()
self.proc.wait()
self.thread.join()
return self.proc.returncode
def kill(self):
"""Kill process without giving it warning."""
self.proc.kill()
self.proc.wait()
self.thread.join()
def tail(self):
"""Tail the stdout of the process and remember it.
Stores the lines of output produced by the process in
self.logs and signals that a new line was read so that it can
be picked up by consumers.
"""
out = self.proc.stdout.readline
err = self.proc.stderr.readline
for line in itertools.chain(iter(out, ""), iter(err, "")):
if len(line) == 0:
break
if self.log_filter(line.decode("utf-8")):
continue
if self.verbose:
logging.debug(f"{self.prefix}: {line.decode().rstrip()}")
with self.logs_cond:
self.logs.append(str(line.rstrip()))
self.logs_cond.notifyAll()
self.running = False
self.proc.stdout.close()
self.proc.stderr.close()
def is_in_log(self, regex, start=0):
"""Look for `regex` in the logs."""
ex = re.compile(regex)
for l in self.logs[start:]:
if ex.search(l):
logging.debug("Found '%s' in logs", regex)
return l
logging.debug(f"{self.prefix} : Did not find {regex} in logs")
return None
def wait_for_logs(self, regexs, timeout=TIMEOUT):
"""Look for `regexs` in the logs.
We tail the stdout of the process and look for each regex in `regexs`,
starting from last of the previous waited-for log entries (if any). We
fail if the timeout is exceeded or if the underlying process
exits before all the `regexs` were found.
If timeout is None, no time-out is applied.
"""
logging.debug("Waiting for {} in the logs".format(regexs))
exs = [re.compile(r) for r in regexs]
start_time = time.time()
pos = self.logsearch_start
while True:
if timeout is not None and time.time() > start_time + timeout:
print("Time-out: can't find {} in logs".format(exs))
for r in exs:
if self.is_in_log(r):
print("({} was previously in logs!)".format(r))
raise TimeoutError('Unable to find "{}" in logs.'.format(exs))
with self.logs_cond:
if pos >= len(self.logs):
if not self.running:
raise ValueError("Process died while waiting for logs")
self.logs_cond.wait(1)
continue
for r in exs.copy():
self.logsearch_start = pos + 1
if r.search(self.logs[pos]):
logging.debug("Found '%s' in logs", r)
exs.remove(r)
break
if len(exs) == 0:
return self.logs[pos]
pos += 1
def wait_for_log(self, regex, timeout=TIMEOUT):
"""Look for `regex` in the logs.
Convenience wrapper for the common case of only seeking a single entry.
"""
return self.wait_for_logs([regex], timeout)

5
tests/test_misc.py Normal file
View File

@ -0,0 +1,5 @@
from fixtures import *
def test_startup(minisafed):
pass