liana/tests/test_rpc.py
Antoine Poinsot 4bd65213e2
commands: swap the outpoints and destinations arguments to createspend
The outpoints argument could eventually be made optional in order to
introduce some automated coin selection. So it makes more sense for it
to be after a required parameter, the destinations.
2022-11-17 15:15:22 +01:00

359 lines
15 KiB
Python

import os
import pytest
import random
import shutil
import time
from fixtures import *
from test_framework.serializations import PSBT, PSBT_IN_PARTIAL_SIG
from test_framework.utils import wait_for, COIN, RpcError, get_txid, spend_coins
def test_getinfo(minisafed):
res = minisafed.rpc.getinfo()
assert res["version"] == "0.1"
assert res["network"] == "regtest"
wait_for(lambda: minisafed.rpc.getinfo()["blockheight"] == 101)
res = minisafed.rpc.getinfo()
assert res["sync"] == 1.0
assert "main" in res["descriptors"]
assert res["rescan_progress"] is None
def test_getaddress(minisafed):
res = minisafed.rpc.getnewaddress()
assert "address" in res
# We'll get a new one at every call
assert res["address"] != minisafed.rpc.getnewaddress()["address"]
def test_listcoins(minisafed, bitcoind):
# Initially empty
res = minisafed.rpc.listcoins()
assert "coins" in res
assert len(res["coins"]) == 0
# If we send a coin, we'll get a new entry. Note we monitor for unconfirmed
# funds as well.
addr = minisafed.rpc.getnewaddress()["address"]
txid = bitcoind.rpc.sendtoaddress(addr, 1)
wait_for(lambda: len(minisafed.rpc.listcoins()["coins"]) == 1)
res = minisafed.rpc.listcoins()["coins"]
assert txid == res[0]["outpoint"][:64]
assert res[0]["amount"] == 1 * COIN
assert res[0]["block_height"] is None
assert res[0]["spend_info"] is None
# If the coin gets confirmed, it'll be marked as such.
bitcoind.generate_block(1, wait_for_mempool=txid)
block_height = bitcoind.rpc.getblockcount()
wait_for(
lambda: minisafed.rpc.listcoins()["coins"][0]["block_height"] == block_height
)
# Same if the coin gets spent.
spend_tx = spend_coins(minisafed, bitcoind, (res[0],))
spend_txid = get_txid(spend_tx)
wait_for(lambda: minisafed.rpc.listcoins()["coins"][0]["spend_info"] is not None)
spend_info = minisafed.rpc.listcoins()["coins"][0]["spend_info"]
assert spend_info["txid"] == spend_txid
assert spend_info["height"] is None
# And if this spending tx gets confirmed.
bitcoind.generate_block(1, wait_for_mempool=spend_txid)
curr_height = bitcoind.rpc.getblockcount()
wait_for(lambda: minisafed.rpc.getinfo()["blockheight"] == curr_height)
spend_info = minisafed.rpc.listcoins()["coins"][0]["spend_info"]
assert spend_info["txid"] == spend_txid
assert spend_info["height"] == curr_height
def test_jsonrpc_server(minisafed, bitcoind):
"""Test passing parameters as a list or a mapping."""
addr = minisafed.rpc.getnewaddress()["address"]
bitcoind.rpc.sendtoaddress(addr, 1)
wait_for(lambda: len(minisafed.rpc.listcoins()["coins"]) == 1)
outpoints = [minisafed.rpc.listcoins()["coins"][0]["outpoint"]]
destinations = {
bitcoind.rpc.getnewaddress(): 20_000,
}
res = minisafed.rpc.createspend(destinations, outpoints, 18)
assert "psbt" in res
res = minisafed.rpc.createspend(
outpoints=outpoints, destinations=destinations, feerate=18
)
assert "psbt" in res
def test_create_spend(minisafed, bitcoind):
# Receive a number of coins in different blocks on different addresses, and
# one more on the same address.
for _ in range(15):
addr = minisafed.rpc.getnewaddress()["address"]
txid = bitcoind.rpc.sendtoaddress(addr, 0.01)
bitcoind.generate_block(1, wait_for_mempool=txid)
txid = bitcoind.rpc.sendtoaddress(addr, 0.3556)
bitcoind.generate_block(1, wait_for_mempool=txid)
wait_for(lambda: len(minisafed.rpc.listcoins()["coins"]) == 16)
# Stop the daemon, should be a no-op
minisafed.stop()
minisafed.start()
# Now create a transaction spending all those coins to a few addresses
outpoints = [c["outpoint"] for c in minisafed.rpc.listcoins()["coins"]]
destinations = {
bitcoind.rpc.getnewaddress(): 200_000,
bitcoind.rpc.getnewaddress(): 400_000,
bitcoind.rpc.getnewaddress(): 1_000_000,
}
res = minisafed.rpc.createspend(destinations, outpoints, 18)
assert "psbt" in res
# The transaction must contain a change output.
spend_psbt = PSBT.from_base64(res["psbt"])
assert len(spend_psbt.o) == 4
assert len(spend_psbt.tx.vout) == 4
# We can sign it and broadcast it.
signed_psbt = minisafed.sign_psbt(PSBT.from_base64(res["psbt"]))
finalized_psbt = minisafed.finalize_psbt(signed_psbt)
tx = finalized_psbt.tx.serialize_with_witness().hex()
bitcoind.rpc.sendrawtransaction(tx)
def test_list_spend(minisafed, bitcoind):
# Start by creating two conflicting Spend PSBTs. The first one will have a change
# output but not the second one.
addr = minisafed.rpc.getnewaddress()["address"]
value_a = 0.2567
bitcoind.rpc.sendtoaddress(addr, value_a)
wait_for(lambda: len(minisafed.rpc.listcoins()["coins"]) == 1)
outpoints = [c["outpoint"] for c in minisafed.rpc.listcoins()["coins"]]
destinations = {
bitcoind.rpc.getnewaddress(): int(value_a * COIN // 2),
}
res = minisafed.rpc.createspend(destinations, outpoints, 6)
assert "psbt" in res
addr = minisafed.rpc.getnewaddress()["address"]
value_b = 0.0987
bitcoind.rpc.sendtoaddress(addr, value_b)
wait_for(lambda: len(minisafed.rpc.listcoins()["coins"]) == 2)
outpoints = [c["outpoint"] for c in minisafed.rpc.listcoins()["coins"]]
destinations = {
bitcoind.rpc.getnewaddress(): int((value_a + value_b) * COIN - 1_000),
}
res_b = minisafed.rpc.createspend(destinations, outpoints, 2)
assert "psbt" in res_b
# Store them both in DB.
assert len(minisafed.rpc.listspendtxs()["spend_txs"]) == 0
minisafed.rpc.updatespend(res["psbt"])
minisafed.rpc.updatespend(res_b["psbt"])
# Listing all Spend transactions will list them both. It'll tell us which one has
# change and which one doesn't.
list_res = minisafed.rpc.listspendtxs()["spend_txs"]
assert len(list_res) == 2
first_psbt = next(entry for entry in list_res if entry["psbt"] == res["psbt"])
assert first_psbt["change_index"] == 1
second_psbt = next(entry for entry in list_res if entry["psbt"] == res_b["psbt"])
assert second_psbt["change_index"] is None
# If we delete the first one, we'll get only the second one.
first_psbt = PSBT.from_base64(res["psbt"])
minisafed.rpc.delspendtx(first_psbt.tx.txid().hex())
list_res = minisafed.rpc.listspendtxs()["spend_txs"]
assert len(list_res) == 1
assert list_res[0]["psbt"] == res_b["psbt"]
# If we delete the second one, result will be empty.
second_psbt = PSBT.from_base64(res_b["psbt"])
minisafed.rpc.delspendtx(second_psbt.tx.txid().hex())
list_res = minisafed.rpc.listspendtxs()["spend_txs"]
assert len(list_res) == 0
def test_update_spend(minisafed, bitcoind):
# Start by creating a Spend PSBT
addr = minisafed.rpc.getnewaddress()["address"]
bitcoind.rpc.sendtoaddress(addr, 0.2567)
wait_for(lambda: len(minisafed.rpc.listcoins()["coins"]) > 0)
outpoints = [c["outpoint"] for c in minisafed.rpc.listcoins()["coins"]]
destinations = {
bitcoind.rpc.getnewaddress(): 200_000,
}
res = minisafed.rpc.createspend(destinations, outpoints, 6)
assert "psbt" in res
# Now update it
assert len(minisafed.rpc.listspendtxs()["spend_txs"]) == 0
minisafed.rpc.updatespend(res["psbt"])
list_res = minisafed.rpc.listspendtxs()["spend_txs"]
assert len(list_res) == 1
assert list_res[0]["psbt"] == res["psbt"]
# We can add a signature and update it
psbt_sig_a = PSBT.from_base64(res["psbt"])
dummy_pk_a = bytes.fromhex(
"0375e00eb72e29da82b89367947f29ef34afb75e8654f6ea368e0acdfd92976b7c"
)
dummy_sig_a = bytes.fromhex(
"304402202b925395cfeaa0171a7a92982bb4891acc4a312cbe7691d8375d36796d5b570a0220378a8ab42832848e15d1aedded5fb360fedbdd6c39226144e527f0f1e19d539801"
)
psbt_sig_a.i[0].map[PSBT_IN_PARTIAL_SIG] = {dummy_pk_a: dummy_sig_a}
psbt_sig_a_ser = psbt_sig_a.to_base64()
minisafed.rpc.updatespend(psbt_sig_a_ser)
# We'll get it when querying
list_res = minisafed.rpc.listspendtxs()["spend_txs"]
assert len(list_res) == 1
assert list_res[0]["psbt"] == psbt_sig_a_ser
# We can add another signature to the empty PSBT and update it again
psbt_sig_b = PSBT.from_base64(res["psbt"])
dummy_pk_b = bytes.fromhex(
"03a1b26313f430c4b15bb1fdce663207659d8cac749a0e53d70eff01874496feff"
)
dummy_sig_b = bytes.fromhex(
"3044022005aebcd649fb8965f0591710fb3704931c3e8118ee60dd44917479f63ceba6d4022018b212900e5a80e9452366894de37f0d02fb9c89f1e94f34fb6ed7fd71c15c4101"
)
psbt_sig_b.i[0].map[PSBT_IN_PARTIAL_SIG] = {dummy_pk_b: dummy_sig_b}
psbt_sig_b_ser = psbt_sig_b.to_base64()
minisafed.rpc.updatespend(psbt_sig_b_ser)
# It will have merged both.
list_res = minisafed.rpc.listspendtxs()["spend_txs"]
assert len(list_res) == 1
psbt_merged = PSBT.from_base64(list_res[0]["psbt"])
assert len(psbt_merged.i[0].map[PSBT_IN_PARTIAL_SIG]) == 2
assert psbt_merged.i[0].map[PSBT_IN_PARTIAL_SIG][dummy_pk_a] == dummy_sig_a
assert psbt_merged.i[0].map[PSBT_IN_PARTIAL_SIG][dummy_pk_b] == dummy_sig_b
def test_broadcast_spend(minisafed, bitcoind):
# Create a new coin and a spending tx for it.
addr = minisafed.rpc.getnewaddress()["address"]
bitcoind.rpc.sendtoaddress(addr, 0.2567)
wait_for(lambda: len(minisafed.rpc.listcoins()["coins"]) > 0)
outpoints = [c["outpoint"] for c in minisafed.rpc.listcoins()["coins"]]
destinations = {
bitcoind.rpc.getnewaddress(): 200_000,
}
res = minisafed.rpc.createspend(destinations, outpoints, 6)
psbt = PSBT.from_base64(res["psbt"])
txid = psbt.tx.txid().hex()
# We can't broadcast an unknown Spend
with pytest.raises(RpcError, match="Unknown spend transaction.*"):
minisafed.rpc.broadcastspend(txid)
minisafed.rpc.updatespend(res["psbt"])
# We can't broadcast an unsigned transaction
with pytest.raises(RpcError, match="Failed to finalize the spend transaction.*"):
minisafed.rpc.broadcastspend(txid)
signed_psbt = minisafed.sign_psbt(PSBT.from_base64(res["psbt"]))
minisafed.rpc.updatespend(signed_psbt.to_base64())
# Now we've signed and stored it, the daemon will take care of finalizing
# the PSBT before broadcasting the transaction.
minisafed.rpc.broadcastspend(txid)
def test_start_rescan(minisafed, bitcoind):
"""Test we successfully retrieve all our transactions after losing state by rescanning."""
initial_timestamp = int(time.time())
first_address = minisafed.rpc.getnewaddress()
second_address = minisafed.rpc.getnewaddress()
# Some utility functions to DRY
list_coins = lambda: minisafed.rpc.listcoins()["coins"]
unspent_coins = lambda: (
c for c in minisafed.rpc.listcoins()["coins"] if c["spend_info"] is None
)
sorted_coins = lambda: sorted(list_coins(), key=lambda c: c["outpoint"])
def all_spent(coins):
unspent = set(c["outpoint"] for c in unspent_coins())
for c in coins:
if c["outpoint"] in unspent:
return False
return True
# We can rescan from one second before the tip timestamp, that's almost a no-op.
tip_timestamp = bitcoind.rpc.getblockheader(bitcoind.rpc.getbestblockhash())["time"]
minisafed.rpc.startrescan(tip_timestamp - 1)
wait_for(lambda: minisafed.rpc.getinfo()["rescan_progress"] is None)
# We can't rescan from an insane timestamp though.
with pytest.raises(RpcError, match="Insane timestamp.*"):
minisafed.rpc.startrescan(tip_timestamp)
assert minisafed.rpc.getinfo()["rescan_progress"] is None
future_timestamp = tip_timestamp + 60 * 60
with pytest.raises(RpcError, match="Insane timestamp.*"):
minisafed.rpc.startrescan(future_timestamp)
assert minisafed.rpc.getinfo()["rescan_progress"] is None
prebitcoin_timestamp = 1231006505 - 1
with pytest.raises(RpcError, match="Insane timestamp."):
minisafed.rpc.startrescan(prebitcoin_timestamp)
assert minisafed.rpc.getinfo()["rescan_progress"] is None
# First, get some coins
for _ in range(10):
addr = minisafed.rpc.getnewaddress()["address"]
amount = random.randint(1, COIN * 10) / COIN
txid = bitcoind.rpc.sendtoaddress(addr, amount)
bitcoind.generate_block(random.randint(1, 10), wait_for_mempool=txid)
wait_for(lambda: len(list_coins()) == 10)
# Then simulate some regular activity (spend and receive)
# TODO: instead of having randomness we should lay down all different cases (with or
# without change, single or multiple inputs, sending externally or to self).
for _ in range(5):
addr = minisafed.rpc.getnewaddress()["address"]
amount = random.randint(1, COIN * 10) / COIN
txid = bitcoind.rpc.sendtoaddress(addr, amount)
avail = list(unspent_coins())
to_spend = random.sample(avail, random.randint(1, len(avail)))
spend_coins(minisafed, bitcoind, to_spend)
bitcoind.generate_block(random.randint(1, 5), wait_for_mempool=2)
wait_for(lambda: all_spent(to_spend))
wait_for(
lambda: minisafed.rpc.getinfo()["blockheight"] == bitcoind.rpc.getblockcount()
)
# Receiving addresses are derived at much higher indexes now.
assert minisafed.rpc.getnewaddress() not in (first_address, second_address)
# Move time forward one day as bitcoind will rescan the last 2 hours of block upon
# importing a descriptor.
now = int(time.time())
added_time = 60 * 60 * 24
bitcoind.rpc.setmocktime(now + added_time)
bitcoind.generate_block(10)
# Now delete the wallet state. When starting up we'll re-create a fresh database
# and watchonly wallet. Those won't be aware of past coins for the configured
# descriptor.
coins_before = sorted_coins()
minisafed.restart_fresh(bitcoind)
assert len(list_coins()) == 0
# The wallet isn't aware what derivation indexes were used. Necessarily it'll start
# from 0.
assert minisafed.rpc.getnewaddress() == first_address
# Once the rescan is done, we must have detected all previous transactions.
minisafed.rpc.startrescan(initial_timestamp)
rescan_progress = minisafed.rpc.getinfo()["rescan_progress"]
assert rescan_progress is None or 0 <= rescan_progress <= 1
wait_for(lambda: minisafed.rpc.getinfo()["rescan_progress"] is None)
wait_for(
lambda: minisafed.rpc.getinfo()["blockheight"] == bitcoind.rpc.getblockcount()
)
assert coins_before == sorted_coins()
# Now that it caught up it noticed which one were used onchain, so it won't reuse
# this derivation indexes anymore.
assert minisafed.rpc.getnewaddress() not in (first_address, second_address)