The GUI uses the InsufficientFunds error to get the missing amount when the user is creating a new spend. It is not straightforward to extract this information in a general way from the RPC error. Instead, this missing amount will be included in the command response. These changes are based on suggestions from darosior and edouardparis.
1346 lines
55 KiB
Python
1346 lines
55 KiB
Python
import pytest
|
|
import random
|
|
import re
|
|
import time
|
|
|
|
from fixtures import *
|
|
from test_framework.serializations import (
|
|
PSBT,
|
|
PSBT_IN_PARTIAL_SIG,
|
|
PSBT_IN_NON_WITNESS_UTXO,
|
|
)
|
|
from test_framework.utils import (
|
|
wait_for,
|
|
COIN,
|
|
RpcError,
|
|
get_txid,
|
|
spend_coins,
|
|
sign_and_broadcast,
|
|
sign_and_broadcast_psbt,
|
|
)
|
|
|
|
|
|
def test_getinfo(lianad):
|
|
res = lianad.rpc.getinfo()
|
|
assert res["version"] == "4.0.0-dev"
|
|
assert res["network"] == "regtest"
|
|
wait_for(lambda: lianad.rpc.getinfo()["block_height"] == 101)
|
|
res = lianad.rpc.getinfo()
|
|
assert res["sync"] == 1.0
|
|
assert "main" in res["descriptors"]
|
|
assert res["rescan_progress"] is None
|
|
|
|
|
|
def test_getaddress(lianad):
|
|
res = lianad.rpc.getnewaddress()
|
|
assert "address" in res
|
|
# We'll get a new one at every call
|
|
assert res["address"] != lianad.rpc.getnewaddress()["address"]
|
|
# new address has derivation_index higher than the previous one
|
|
assert lianad.rpc.getnewaddress()["derivation_index"] == res["derivation_index"] + 2
|
|
|
|
|
|
def test_listaddresses(lianad):
|
|
list = lianad.rpc.listaddresses(2, 5)
|
|
list2 = lianad.rpc.listaddresses(start_index=2, count=5)
|
|
assert list == list2
|
|
assert "addresses" in list
|
|
addr = list["addresses"]
|
|
assert addr[0]["index"] == 2
|
|
assert addr[-1]["index"] == 6
|
|
|
|
list3 = lianad.rpc.listaddresses() # start_index = 0, receive_index = 0
|
|
_ = lianad.rpc.getnewaddress() # start_index = 0, receive_index = 1
|
|
_ = lianad.rpc.getnewaddress() # start_index = 0, receive_index = 2
|
|
list4 = lianad.rpc.listaddresses()
|
|
assert len(list4["addresses"]) == len(list3["addresses"]) + 2 == 2
|
|
list5 = lianad.rpc.listaddresses(0)
|
|
assert list4 == list5
|
|
|
|
# Will explicitly error on invalid start_index.
|
|
with pytest.raises(
|
|
RpcError,
|
|
match=re.escape(
|
|
"Invalid params: Invalid value for \\'start_index\\': \"blabla\""
|
|
),
|
|
):
|
|
lianad.rpc.listaddresses("blabla", None)
|
|
|
|
# Will explicitly error on invalid count.
|
|
with pytest.raises(
|
|
RpcError,
|
|
match=re.escape("Invalid params: Invalid value for \\'count\\': \"blb\""),
|
|
):
|
|
lianad.rpc.listaddresses(0, "blb")
|
|
|
|
|
|
def test_listcoins(lianad, bitcoind):
|
|
# Initially empty
|
|
res = lianad.rpc.listcoins()
|
|
assert "coins" in res
|
|
assert len(res["coins"]) == 0
|
|
|
|
# If we send a coin, we'll get a new entry. Note we monitor for unconfirmed
|
|
# funds as well.
|
|
addr_a = lianad.rpc.getnewaddress()
|
|
txid_a = bitcoind.rpc.sendtoaddress(addr_a["address"], 1)
|
|
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 1)
|
|
res = lianad.rpc.listcoins()["coins"]
|
|
outpoint_a = res[0]["outpoint"]
|
|
assert txid_a == outpoint_a[:64]
|
|
assert res[0]["amount"] == 1 * COIN
|
|
assert res[0]["derivation_index"] == addr_a["derivation_index"]
|
|
assert res[0]["is_change"] == False
|
|
assert res[0]["block_height"] is None
|
|
assert res[0]["spend_info"] is None
|
|
|
|
assert len(lianad.rpc.listcoins(["confirmed", "spent", "spending"])["coins"]) == 0
|
|
assert (
|
|
lianad.rpc.listcoins()
|
|
== lianad.rpc.listcoins([], [outpoint_a])
|
|
== lianad.rpc.listcoins(["unconfirmed"])
|
|
== lianad.rpc.listcoins(["unconfirmed"], [outpoint_a])
|
|
== lianad.rpc.listcoins(["unconfirmed", "confirmed"])
|
|
== lianad.rpc.listcoins(["spent", "unconfirmed", "confirmed"])
|
|
== lianad.rpc.listcoins(["spent", "unconfirmed", "confirmed"], [outpoint_a])
|
|
)
|
|
# If the coin gets confirmed, it'll be marked as such.
|
|
bitcoind.generate_block(1, wait_for_mempool=txid_a)
|
|
block_height = bitcoind.rpc.getblockcount()
|
|
wait_for(lambda: lianad.rpc.listcoins()["coins"][0]["block_height"] == block_height)
|
|
|
|
assert (
|
|
len(lianad.rpc.listcoins())
|
|
== len(lianad.rpc.listcoins(["confirmed"])["coins"])
|
|
== 1
|
|
)
|
|
assert (
|
|
lianad.rpc.listcoins()
|
|
== lianad.rpc.listcoins([], [outpoint_a])
|
|
== lianad.rpc.listcoins(["confirmed"])
|
|
== lianad.rpc.listcoins(["confirmed"], [outpoint_a])
|
|
== lianad.rpc.listcoins(["unconfirmed", "confirmed"])
|
|
== lianad.rpc.listcoins(["spent", "unconfirmed", "confirmed"])
|
|
== lianad.rpc.listcoins(["spent", "unconfirmed", "confirmed"], [outpoint_a])
|
|
)
|
|
|
|
# Same if the coin gets spent.
|
|
spend_tx = spend_coins(lianad, bitcoind, (res[0],))
|
|
spend_txid = get_txid(spend_tx)
|
|
wait_for(lambda: lianad.rpc.listcoins()["coins"][0]["spend_info"] is not None)
|
|
spend_info = lianad.rpc.listcoins()["coins"][0]["spend_info"]
|
|
assert spend_info["txid"] == spend_txid
|
|
assert spend_info["height"] is None
|
|
assert len(lianad.rpc.listcoins(["spent"])["coins"]) == 0
|
|
assert len(lianad.rpc.listcoins(["spending"])["coins"]) == 1
|
|
|
|
# And if this spending tx gets confirmed.
|
|
bitcoind.generate_block(1, wait_for_mempool=spend_txid)
|
|
curr_height = bitcoind.rpc.getblockcount()
|
|
wait_for(lambda: lianad.rpc.getinfo()["block_height"] == curr_height)
|
|
spend_info = lianad.rpc.listcoins()["coins"][0]["spend_info"]
|
|
assert spend_info["txid"] == spend_txid
|
|
assert spend_info["height"] == curr_height
|
|
assert len(lianad.rpc.listcoins(["unconfirmed", "confirmed"])["coins"]) == 0
|
|
assert (
|
|
lianad.rpc.listcoins()
|
|
== lianad.rpc.listcoins(["spent"])
|
|
== lianad.rpc.listcoins(["spent", "unconfirmed", "confirmed"])
|
|
)
|
|
|
|
# Add a second coin.
|
|
addr_b = lianad.rpc.getnewaddress()["address"]
|
|
txid_b = bitcoind.rpc.sendtoaddress(addr_b, 2)
|
|
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 2)
|
|
res = lianad.rpc.listcoins(["unconfirmed"], [])["coins"]
|
|
outpoint_b = res[0]["outpoint"]
|
|
|
|
# We have one unconfirmed coin and one spent coin.
|
|
assert (
|
|
len(lianad.rpc.listcoins()["coins"])
|
|
== len(lianad.rpc.listcoins([], [outpoint_a, outpoint_b])["coins"])
|
|
== len(lianad.rpc.listcoins(["unconfirmed", "spent"])["coins"])
|
|
== len(
|
|
lianad.rpc.listcoins(["unconfirmed", "spent"], [outpoint_a, outpoint_b])[
|
|
"coins"
|
|
]
|
|
)
|
|
== 2
|
|
)
|
|
assert (
|
|
lianad.rpc.listcoins([], [outpoint_b])
|
|
== lianad.rpc.listcoins(["unconfirmed"])
|
|
== lianad.rpc.listcoins(["unconfirmed"], [outpoint_b])
|
|
== lianad.rpc.listcoins(["unconfirmed", "confirmed"])
|
|
== lianad.rpc.listcoins(["spending", "unconfirmed", "confirmed"])
|
|
== lianad.rpc.listcoins(["spending", "unconfirmed", "confirmed"], [outpoint_b])
|
|
)
|
|
|
|
# Now confirm the second coin.
|
|
bitcoind.generate_block(1, wait_for_mempool=txid_b)
|
|
block_height = bitcoind.rpc.getblockcount()
|
|
wait_for(
|
|
lambda: lianad.rpc.listcoins([], [outpoint_b])["coins"][0]["block_height"]
|
|
== block_height
|
|
)
|
|
|
|
# We have one confirmed coin and one spent coin.
|
|
assert (
|
|
len(lianad.rpc.listcoins()["coins"])
|
|
== len(lianad.rpc.listcoins([], [outpoint_a, outpoint_b])["coins"])
|
|
== len(lianad.rpc.listcoins(["confirmed", "spent"])["coins"])
|
|
== len(
|
|
lianad.rpc.listcoins(["confirmed", "spent"], [outpoint_a, outpoint_b])[
|
|
"coins"
|
|
]
|
|
)
|
|
== 2
|
|
)
|
|
assert (
|
|
lianad.rpc.listcoins([], [outpoint_b])
|
|
== lianad.rpc.listcoins(["confirmed"])
|
|
== lianad.rpc.listcoins(["confirmed"], [outpoint_b])
|
|
== lianad.rpc.listcoins(["unconfirmed", "confirmed"])
|
|
== lianad.rpc.listcoins(["unconfirmed", "confirmed", "spending"])
|
|
== lianad.rpc.listcoins(["unconfirmed", "confirmed", "spending"], [outpoint_b])
|
|
)
|
|
|
|
# Add a third coin.
|
|
addr_c = lianad.rpc.getnewaddress()["address"]
|
|
txid_c = bitcoind.rpc.sendtoaddress(addr_c, 3)
|
|
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 3)
|
|
res = lianad.rpc.listcoins(["unconfirmed"], [])["coins"]
|
|
outpoint_c = res[0]["outpoint"]
|
|
|
|
# We have three different statuses: unconfirmed, confirmed and spent.
|
|
assert (
|
|
len(lianad.rpc.listcoins()["coins"])
|
|
== len(lianad.rpc.listcoins([], [outpoint_a, outpoint_b, outpoint_c])["coins"])
|
|
== len(lianad.rpc.listcoins(["unconfirmed", "confirmed", "spent"])["coins"])
|
|
== len(
|
|
lianad.rpc.listcoins(
|
|
["unconfirmed", "confirmed", "spent"],
|
|
[outpoint_a, outpoint_b, outpoint_c],
|
|
)["coins"]
|
|
)
|
|
== 3
|
|
)
|
|
assert (
|
|
lianad.rpc.listcoins([], [outpoint_c])
|
|
== lianad.rpc.listcoins(["unconfirmed"])
|
|
== lianad.rpc.listcoins(["unconfirmed"], [outpoint_c])
|
|
== lianad.rpc.listcoins(["unconfirmed", "spending"])
|
|
== lianad.rpc.listcoins(["spending", "unconfirmed"])
|
|
== lianad.rpc.listcoins(["spending", "unconfirmed", "confirmed"], [outpoint_c])
|
|
)
|
|
|
|
# Spend third coin, even though it is still unconfirmed.
|
|
spend_tx = spend_coins(lianad, bitcoind, (res[0],))
|
|
spend_txid = get_txid(spend_tx)
|
|
wait_for(
|
|
lambda: lianad.rpc.listcoins([], [outpoint_c])["coins"][0]["spend_info"]
|
|
is not None
|
|
)
|
|
|
|
assert len(lianad.rpc.listcoins(["unconfirmed"])["coins"]) == 0
|
|
assert (
|
|
len(lianad.rpc.listcoins()["coins"])
|
|
== len(lianad.rpc.listcoins([], [outpoint_a, outpoint_b, outpoint_c])["coins"])
|
|
== len(lianad.rpc.listcoins(["confirmed", "spending", "spent"])["coins"])
|
|
== len(
|
|
lianad.rpc.listcoins(
|
|
["confirmed", "spending", "spent"], [outpoint_a, outpoint_b, outpoint_c]
|
|
)["coins"]
|
|
)
|
|
== 3
|
|
)
|
|
# The unconfirmed coin now has spending status.
|
|
assert (
|
|
lianad.rpc.listcoins([], [outpoint_c])
|
|
== lianad.rpc.listcoins(["spending"])
|
|
== lianad.rpc.listcoins(["spending"], [outpoint_c])
|
|
== lianad.rpc.listcoins(["spending", "unconfirmed"])
|
|
== lianad.rpc.listcoins(["spending", "unconfirmed"], [outpoint_c])
|
|
)
|
|
|
|
# Add a fourth coin.
|
|
addr_d = lianad.rpc.getnewaddress()["address"]
|
|
txid_d = bitcoind.rpc.sendtoaddress(addr_d, 4)
|
|
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 4)
|
|
res = lianad.rpc.listcoins(["unconfirmed"], [])["coins"]
|
|
outpoint_d = res[0]["outpoint"]
|
|
|
|
# We now have all four statuses.
|
|
assert (
|
|
len(lianad.rpc.listcoins(["unconfirmed"])["coins"])
|
|
== len(lianad.rpc.listcoins(["confirmed"])["coins"])
|
|
== len(lianad.rpc.listcoins(["spending"])["coins"])
|
|
== len(lianad.rpc.listcoins(["spent"])["coins"])
|
|
== 1
|
|
)
|
|
assert (
|
|
len(lianad.rpc.listcoins()["coins"])
|
|
== len(
|
|
lianad.rpc.listcoins([], [outpoint_a, outpoint_b, outpoint_c, outpoint_d])[
|
|
"coins"
|
|
]
|
|
)
|
|
== len(
|
|
lianad.rpc.listcoins(["unconfirmed", "confirmed", "spending", "spent"])[
|
|
"coins"
|
|
]
|
|
)
|
|
== len(
|
|
lianad.rpc.listcoins(
|
|
["unconfirmed", "confirmed", "spending", "spent"],
|
|
[outpoint_a, outpoint_b, outpoint_c, outpoint_d],
|
|
)["coins"]
|
|
)
|
|
== 4
|
|
)
|
|
|
|
# We can filter for specific statuses/outpoints.
|
|
assert (
|
|
sorted(
|
|
lianad.rpc.listcoins(["spending", "spent"])["coins"],
|
|
key=lambda c: c["outpoint"],
|
|
)
|
|
== sorted(
|
|
lianad.rpc.listcoins(["spending", "spent"], [outpoint_a, outpoint_c])[
|
|
"coins"
|
|
],
|
|
key=lambda c: c["outpoint"],
|
|
)
|
|
== sorted(
|
|
lianad.rpc.listcoins(
|
|
["unconfirmed", "confirmed", "spending", "spent"],
|
|
[outpoint_a, outpoint_c],
|
|
)["coins"],
|
|
key=lambda c: c["outpoint"],
|
|
)
|
|
== sorted(
|
|
lianad.rpc.listcoins(
|
|
["spending", "spent"], [outpoint_a, outpoint_b, outpoint_c, outpoint_d]
|
|
)["coins"],
|
|
key=lambda c: c["outpoint"],
|
|
)
|
|
)
|
|
|
|
# Finally, check that we return errors for invalid parameter values.
|
|
for statuses, outpoints in [
|
|
(["fake_status"], []),
|
|
(["spent", "fake_status"], []),
|
|
(["fake_status", "fake_status_2"], []),
|
|
(["confirmed", "spending", "fake_status"], ["fake_outpoint"]),
|
|
(["fake_status"], [outpoint_a, outpoint_b]),
|
|
]:
|
|
with pytest.raises(
|
|
RpcError,
|
|
match=re.escape(
|
|
"Invalid params: Invalid value \"fake_status\" in \\'statuses\\' parameter."
|
|
),
|
|
):
|
|
lianad.rpc.listcoins(statuses, outpoints)
|
|
|
|
for statuses, outpoints in [
|
|
([], ["fake_outpoint"]),
|
|
([], [outpoint_a, "fake_outpoint", outpoint_b]),
|
|
([], [outpoint_a, "fake_outpoint", "fake_outpoint_2"]),
|
|
([], [outpoint_a, outpoint_b, "fake_outpoint"]),
|
|
]:
|
|
with pytest.raises(
|
|
RpcError,
|
|
match=re.escape(
|
|
"Invalid params: Invalid value \"fake_outpoint\" in \\'outpoints\\' parameter."
|
|
),
|
|
):
|
|
lianad.rpc.listcoins(statuses, outpoints)
|
|
|
|
|
|
def test_jsonrpc_server(lianad, bitcoind):
|
|
"""Test passing parameters as a list or a mapping."""
|
|
addr = lianad.rpc.getnewaddress()["address"]
|
|
bitcoind.rpc.sendtoaddress(addr, 1)
|
|
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 1)
|
|
outpoints = [lianad.rpc.listcoins()["coins"][0]["outpoint"]]
|
|
destinations = {
|
|
bitcoind.rpc.getnewaddress(): 20_000,
|
|
}
|
|
res = lianad.rpc.createspend(destinations, outpoints, 18)
|
|
assert "psbt" in res
|
|
res = lianad.rpc.createspend(
|
|
outpoints=outpoints, destinations=destinations, feerate=18
|
|
)
|
|
assert "psbt" in res
|
|
|
|
|
|
def test_create_spend(lianad, bitcoind):
|
|
# Receive a number of coins in different blocks on different addresses, and
|
|
# one more on the same address.
|
|
for _ in range(15):
|
|
addr = lianad.rpc.getnewaddress()["address"]
|
|
txid = bitcoind.rpc.sendtoaddress(addr, 0.01)
|
|
bitcoind.generate_block(1, wait_for_mempool=txid)
|
|
txid = bitcoind.rpc.sendtoaddress(addr, 0.3556)
|
|
bitcoind.generate_block(1, wait_for_mempool=txid)
|
|
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 16)
|
|
|
|
# Stop the daemon, should be a no-op
|
|
lianad.stop()
|
|
lianad.start()
|
|
|
|
# Now create a transaction spending all those coins to a few addresses
|
|
outpoints = [c["outpoint"] for c in lianad.rpc.listcoins()["coins"]]
|
|
destinations = {
|
|
bitcoind.rpc.getnewaddress(): 200_000,
|
|
bitcoind.rpc.getnewaddress(): 400_000,
|
|
bitcoind.rpc.getnewaddress(): 1_000_000,
|
|
}
|
|
res = lianad.rpc.createspend(destinations, outpoints, 18)
|
|
assert "psbt" in res
|
|
|
|
# The transaction must contain a change output.
|
|
spend_psbt = PSBT.from_base64(res["psbt"])
|
|
assert len(spend_psbt.o) == 4
|
|
assert len(spend_psbt.tx.vout) == 4
|
|
|
|
# The transaction must contain the spent transaction for each input.
|
|
# We don't make assumptions about the ordering of PSBT inputs.
|
|
assert sorted(
|
|
[psbt_in.map[PSBT_IN_NON_WITNESS_UTXO] for psbt_in in spend_psbt.i]
|
|
) == sorted(
|
|
[bytes.fromhex(bitcoind.rpc.gettransaction(op[:64])["hex"]) for op in outpoints]
|
|
)
|
|
|
|
# We can sign it and broadcast it.
|
|
sign_and_broadcast(lianad, bitcoind, PSBT.from_base64(res["psbt"]))
|
|
|
|
# Try creating a transaction that spends an immature coinbase deposit.
|
|
addr = lianad.rpc.getnewaddress()["address"]
|
|
bitcoind.rpc.generatetoaddress(1, addr)
|
|
wait_for(
|
|
lambda: lianad.rpc.getinfo()["block_height"] == bitcoind.rpc.getblockcount()
|
|
)
|
|
imma_coin = next(c for c in lianad.rpc.listcoins()["coins"] if c["is_immature"])
|
|
with pytest.raises(RpcError, match=".*is from an immature coinbase transaction."):
|
|
lianad.rpc.createspend(destinations, [imma_coin["outpoint"]], 1)
|
|
|
|
|
|
def test_list_spend(lianad, bitcoind):
|
|
# Start by creating two conflicting Spend PSBTs. The first one will have a change
|
|
# output but not the second one.
|
|
addr = lianad.rpc.getnewaddress()["address"]
|
|
value_a = 0.2567
|
|
bitcoind.rpc.sendtoaddress(addr, value_a)
|
|
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 1)
|
|
outpoints = [c["outpoint"] for c in lianad.rpc.listcoins()["coins"]]
|
|
destinations = {
|
|
bitcoind.rpc.getnewaddress(): int(value_a * COIN // 2),
|
|
}
|
|
res = lianad.rpc.createspend(destinations, outpoints, 6)
|
|
assert "psbt" in res
|
|
|
|
addr = lianad.rpc.getnewaddress()["address"]
|
|
value_b = 0.0987
|
|
bitcoind.rpc.sendtoaddress(addr, value_b)
|
|
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 2)
|
|
outpoints = [c["outpoint"] for c in lianad.rpc.listcoins()["coins"]]
|
|
destinations = {
|
|
bitcoind.rpc.getnewaddress(): int((value_a + value_b) * COIN - 1_000),
|
|
}
|
|
res_b = lianad.rpc.createspend(destinations, outpoints, 2)
|
|
assert "psbt" in res_b
|
|
|
|
# Store them both in DB.
|
|
time_before_update = int(time.time())
|
|
assert len(lianad.rpc.listspendtxs()["spend_txs"]) == 0
|
|
lianad.rpc.updatespend(res["psbt"])
|
|
lianad.rpc.updatespend(res_b["psbt"])
|
|
|
|
# Listing all Spend transactions will list them both. It'll tell us which one has
|
|
# change and which one doesn't.
|
|
list_res = lianad.rpc.listspendtxs()["spend_txs"]
|
|
assert len(list_res) == 2
|
|
first_psbt = next(entry for entry in list_res if entry["psbt"] == res["psbt"])
|
|
assert time_before_update <= first_psbt["updated_at"] <= int(time.time())
|
|
second_psbt = next(entry for entry in list_res if entry["psbt"] == res_b["psbt"])
|
|
assert time_before_update <= second_psbt["updated_at"] <= int(time.time())
|
|
|
|
# If we delete the first one, we'll get only the second one.
|
|
first_psbt = PSBT.from_base64(res["psbt"])
|
|
lianad.rpc.delspendtx(first_psbt.tx.txid().hex())
|
|
list_res = lianad.rpc.listspendtxs()["spend_txs"]
|
|
assert len(list_res) == 1
|
|
assert list_res[0]["psbt"] == res_b["psbt"]
|
|
|
|
# If we delete the second one, result will be empty.
|
|
second_psbt = PSBT.from_base64(res_b["psbt"])
|
|
lianad.rpc.delspendtx(second_psbt.tx.txid().hex())
|
|
list_res = lianad.rpc.listspendtxs()["spend_txs"]
|
|
assert len(list_res) == 0
|
|
|
|
|
|
def test_update_spend(lianad, bitcoind):
|
|
# Start by creating a Spend PSBT
|
|
addr = lianad.rpc.getnewaddress()["address"]
|
|
bitcoind.rpc.sendtoaddress(addr, 0.2567)
|
|
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) > 0)
|
|
outpoints = [c["outpoint"] for c in lianad.rpc.listcoins()["coins"]]
|
|
destinations = {
|
|
bitcoind.rpc.getnewaddress(): 200_000,
|
|
}
|
|
res = lianad.rpc.createspend(destinations, outpoints, 6)
|
|
assert "psbt" in res
|
|
|
|
# Now update it
|
|
assert len(lianad.rpc.listspendtxs()["spend_txs"]) == 0
|
|
lianad.rpc.updatespend(res["psbt"])
|
|
list_res = lianad.rpc.listspendtxs()["spend_txs"]
|
|
assert len(list_res) == 1
|
|
assert list_res[0]["psbt"] == res["psbt"]
|
|
|
|
# We can add a signature and update it
|
|
psbt_sig_a = PSBT.from_base64(res["psbt"])
|
|
dummy_pk_a = bytes.fromhex(
|
|
"0375e00eb72e29da82b89367947f29ef34afb75e8654f6ea368e0acdfd92976b7c"
|
|
)
|
|
dummy_sig_a = bytes.fromhex(
|
|
"304402202b925395cfeaa0171a7a92982bb4891acc4a312cbe7691d8375d36796d5b570a0220378a8ab42832848e15d1aedded5fb360fedbdd6c39226144e527f0f1e19d539801"
|
|
)
|
|
psbt_sig_a.i[0].map[PSBT_IN_PARTIAL_SIG] = {dummy_pk_a: dummy_sig_a}
|
|
psbt_sig_a_ser = psbt_sig_a.to_base64()
|
|
lianad.rpc.updatespend(psbt_sig_a_ser)
|
|
|
|
# We'll get it when querying
|
|
list_res = lianad.rpc.listspendtxs()["spend_txs"]
|
|
assert len(list_res) == 1
|
|
assert list_res[0]["psbt"] == psbt_sig_a_ser
|
|
|
|
# We can add another signature to the empty PSBT and update it again
|
|
psbt_sig_b = PSBT.from_base64(res["psbt"])
|
|
dummy_pk_b = bytes.fromhex(
|
|
"03a1b26313f430c4b15bb1fdce663207659d8cac749a0e53d70eff01874496feff"
|
|
)
|
|
dummy_sig_b = bytes.fromhex(
|
|
"3044022005aebcd649fb8965f0591710fb3704931c3e8118ee60dd44917479f63ceba6d4022018b212900e5a80e9452366894de37f0d02fb9c89f1e94f34fb6ed7fd71c15c4101"
|
|
)
|
|
psbt_sig_b.i[0].map[PSBT_IN_PARTIAL_SIG] = {dummy_pk_b: dummy_sig_b}
|
|
psbt_sig_b_ser = psbt_sig_b.to_base64()
|
|
lianad.rpc.updatespend(psbt_sig_b_ser)
|
|
|
|
# It will have merged both.
|
|
list_res = lianad.rpc.listspendtxs()["spend_txs"]
|
|
assert len(list_res) == 1
|
|
psbt_merged = PSBT.from_base64(list_res[0]["psbt"])
|
|
assert len(psbt_merged.i[0].map[PSBT_IN_PARTIAL_SIG]) == 2
|
|
assert psbt_merged.i[0].map[PSBT_IN_PARTIAL_SIG][dummy_pk_a] == dummy_sig_a
|
|
assert psbt_merged.i[0].map[PSBT_IN_PARTIAL_SIG][dummy_pk_b] == dummy_sig_b
|
|
|
|
|
|
def test_broadcast_spend(lianad, bitcoind):
|
|
# Create a new coin and a spending tx for it.
|
|
addr = lianad.rpc.getnewaddress()["address"]
|
|
bitcoind.rpc.sendtoaddress(addr, 0.2567)
|
|
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) > 0)
|
|
outpoints = [c["outpoint"] for c in lianad.rpc.listcoins()["coins"]]
|
|
destinations = {
|
|
bitcoind.rpc.getnewaddress(): 200_000,
|
|
}
|
|
res = lianad.rpc.createspend(destinations, outpoints, 6)
|
|
psbt = PSBT.from_base64(res["psbt"])
|
|
txid = psbt.tx.txid().hex()
|
|
|
|
# We can't broadcast an unknown Spend
|
|
with pytest.raises(RpcError, match="Unknown spend transaction.*"):
|
|
lianad.rpc.broadcastspend(txid)
|
|
lianad.rpc.updatespend(res["psbt"])
|
|
|
|
# We can't broadcast an unsigned transaction
|
|
with pytest.raises(RpcError, match="Failed to finalize the spend transaction.*"):
|
|
lianad.rpc.broadcastspend(txid)
|
|
signed_psbt = lianad.signer.sign_psbt(PSBT.from_base64(res["psbt"]))
|
|
lianad.rpc.updatespend(signed_psbt.to_base64())
|
|
|
|
# Now we've signed and stored it, the daemon will take care of finalizing
|
|
# the PSBT before broadcasting the transaction.
|
|
lianad.rpc.broadcastspend(txid)
|
|
|
|
|
|
def test_start_rescan(lianad, bitcoind):
|
|
"""Test we successfully retrieve all our transactions after losing state by rescanning."""
|
|
initial_timestamp = int(time.time())
|
|
first_address = lianad.rpc.getnewaddress()
|
|
second_address = lianad.rpc.getnewaddress()
|
|
|
|
# Some utility functions to DRY
|
|
list_coins = lambda: lianad.rpc.listcoins()["coins"]
|
|
unspent_coins = lambda: (
|
|
c for c in lianad.rpc.listcoins()["coins"] if c["spend_info"] is None
|
|
)
|
|
sorted_coins = lambda: sorted(list_coins(), key=lambda c: c["outpoint"])
|
|
|
|
def all_spent(coins):
|
|
unspent = set(c["outpoint"] for c in unspent_coins())
|
|
for c in coins:
|
|
if c["outpoint"] in unspent:
|
|
return False
|
|
return True
|
|
|
|
# We can rescan from one second before the tip timestamp, that's almost a no-op.
|
|
tip_timestamp = bitcoind.rpc.getblockheader(bitcoind.rpc.getbestblockhash())["time"]
|
|
lianad.rpc.startrescan(tip_timestamp - 1)
|
|
wait_for(lambda: lianad.rpc.getinfo()["rescan_progress"] is None)
|
|
# We can't rescan from an insane timestamp though.
|
|
with pytest.raises(RpcError, match="Insane timestamp.*"):
|
|
lianad.rpc.startrescan(tip_timestamp)
|
|
assert lianad.rpc.getinfo()["rescan_progress"] is None
|
|
future_timestamp = tip_timestamp + 60 * 60
|
|
with pytest.raises(RpcError, match="Insane timestamp.*"):
|
|
lianad.rpc.startrescan(future_timestamp)
|
|
assert lianad.rpc.getinfo()["rescan_progress"] is None
|
|
prebitcoin_timestamp = 1231006505 - 1
|
|
with pytest.raises(RpcError, match="Insane timestamp."):
|
|
lianad.rpc.startrescan(prebitcoin_timestamp)
|
|
assert lianad.rpc.getinfo()["rescan_progress"] is None
|
|
|
|
# First, get some coins
|
|
for _ in range(10):
|
|
addr = lianad.rpc.getnewaddress()["address"]
|
|
amount = random.randint(1, COIN * 10) / COIN
|
|
txid = bitcoind.rpc.sendtoaddress(addr, amount)
|
|
bitcoind.generate_block(random.randint(1, 10), wait_for_mempool=txid)
|
|
wait_for(lambda: len(list_coins()) == 10)
|
|
|
|
# Then simulate some regular activity (spend and receive)
|
|
# TODO: instead of having randomness we should lay down all different cases (with or
|
|
# without change, single or multiple inputs, sending externally or to self).
|
|
for _ in range(5):
|
|
addr = lianad.rpc.getnewaddress()["address"]
|
|
amount = random.randint(1, COIN * 10) / COIN
|
|
txid = bitcoind.rpc.sendtoaddress(addr, amount)
|
|
avail = list(unspent_coins())
|
|
to_spend = random.sample(avail, random.randint(1, len(avail)))
|
|
spend_coins(lianad, bitcoind, to_spend)
|
|
bitcoind.generate_block(random.randint(1, 5), wait_for_mempool=2)
|
|
wait_for(lambda: all_spent(to_spend))
|
|
wait_for(
|
|
lambda: lianad.rpc.getinfo()["block_height"] == bitcoind.rpc.getblockcount()
|
|
)
|
|
|
|
# Receiving addresses are derived at much higher indexes now.
|
|
assert lianad.rpc.getnewaddress() not in (first_address, second_address)
|
|
|
|
# Move time forward one day as bitcoind will rescan the last 2 hours of block upon
|
|
# importing a descriptor.
|
|
now = int(time.time())
|
|
added_time = 60 * 60 * 24
|
|
bitcoind.rpc.setmocktime(now + added_time)
|
|
bitcoind.generate_block(10)
|
|
|
|
# Now delete the wallet state. When starting up we'll re-create a fresh database
|
|
# and watchonly wallet. Those won't be aware of past coins for the configured
|
|
# descriptor.
|
|
coins_before = sorted_coins()
|
|
lianad.restart_fresh(bitcoind)
|
|
assert len(list_coins()) == 0
|
|
|
|
# The wallet isn't aware what derivation indexes were used. Necessarily it'll start
|
|
# from 0.
|
|
assert lianad.rpc.getnewaddress() == first_address
|
|
|
|
# Once the rescan is done, we must have detected all previous transactions.
|
|
lianad.rpc.startrescan(initial_timestamp)
|
|
rescan_progress = lianad.rpc.getinfo()["rescan_progress"]
|
|
assert rescan_progress is None or 0 <= rescan_progress <= 1
|
|
wait_for(lambda: lianad.rpc.getinfo()["rescan_progress"] is None)
|
|
wait_for(
|
|
lambda: lianad.rpc.getinfo()["block_height"] == bitcoind.rpc.getblockcount()
|
|
)
|
|
assert coins_before == sorted_coins()
|
|
|
|
# Now that it caught up it noticed which one were used onchain, so it won't reuse
|
|
# this derivation indexes anymore.
|
|
assert lianad.rpc.getnewaddress() not in (first_address, second_address)
|
|
|
|
|
|
def test_listtransactions(lianad, bitcoind):
|
|
"""Test listing of transactions by txid and timespan"""
|
|
|
|
def wait_synced():
|
|
wait_for(
|
|
lambda: lianad.rpc.getinfo()["block_height"] == bitcoind.rpc.getblockcount()
|
|
)
|
|
|
|
best_block = bitcoind.rpc.getbestblockhash()
|
|
initial_timestamp = bitcoind.rpc.getblockheader(best_block)["time"]
|
|
wait_synced()
|
|
|
|
# Deposit multiple coins in a single transaction
|
|
destinations = {
|
|
lianad.rpc.getnewaddress()["address"]: 0.0123456,
|
|
lianad.rpc.getnewaddress()["address"]: 0.0123457,
|
|
lianad.rpc.getnewaddress()["address"]: 0.0123458,
|
|
}
|
|
txid = bitcoind.rpc.sendmany("", destinations)
|
|
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 3)
|
|
bitcoind.generate_block(1, wait_for_mempool=txid)
|
|
|
|
# Mine 12 blocks to force the blocktime to increase
|
|
bitcoind.generate_block(12)
|
|
wait_synced()
|
|
best_block = bitcoind.rpc.getbestblockhash()
|
|
second_timestamp = bitcoind.rpc.getblockheader(best_block)["time"]
|
|
assert second_timestamp > initial_timestamp
|
|
|
|
# Deposit a coin that will be unspent
|
|
addr = lianad.rpc.getnewaddress()["address"]
|
|
txid = bitcoind.rpc.sendtoaddress(addr, 0.123456)
|
|
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 4)
|
|
bitcoind.generate_block(1, wait_for_mempool=txid)
|
|
|
|
# Deposit a coin that will be spent with a change output
|
|
addr = lianad.rpc.getnewaddress()["address"]
|
|
txid = bitcoind.rpc.sendtoaddress(addr, 0.23456)
|
|
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 5)
|
|
bitcoind.generate_block(1, wait_for_mempool=txid)
|
|
outpoint = next(
|
|
c["outpoint"] for c in lianad.rpc.listcoins()["coins"] if txid in c["outpoint"]
|
|
)
|
|
destinations = {
|
|
bitcoind.rpc.getnewaddress(): 100_000,
|
|
}
|
|
res = lianad.rpc.createspend(destinations, [outpoint], 6)
|
|
psbt = PSBT.from_base64(res["psbt"])
|
|
txid = sign_and_broadcast_psbt(lianad, psbt)
|
|
bitcoind.generate_block(1, wait_for_mempool=txid)
|
|
|
|
# Mine 12 blocks to force the blocktime to increase
|
|
bitcoind.generate_block(12)
|
|
wait_synced()
|
|
best_block = bitcoind.rpc.getbestblockhash()
|
|
third_timestamp = bitcoind.rpc.getblockheader(best_block)["time"]
|
|
assert third_timestamp > second_timestamp
|
|
bitcoind.generate_block(12)
|
|
wait_synced()
|
|
|
|
# Deposit a coin that will be spent with a change output and also two new deposits
|
|
addr = lianad.rpc.getnewaddress()["address"]
|
|
txid = bitcoind.rpc.sendtoaddress(addr, 0.3456)
|
|
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 7)
|
|
bitcoind.generate_block(1, wait_for_mempool=txid)
|
|
outpoint = next(
|
|
c["outpoint"] for c in lianad.rpc.listcoins()["coins"] if txid in c["outpoint"]
|
|
)
|
|
destinations = {
|
|
bitcoind.rpc.getnewaddress(): 11_000,
|
|
addr: 12_000, # Even with address reuse! Booooh
|
|
lianad.rpc.getnewaddress()["address"]: 13_000,
|
|
}
|
|
res = lianad.rpc.createspend(destinations, [outpoint], 6)
|
|
psbt = PSBT.from_base64(res["psbt"])
|
|
txid = sign_and_broadcast_psbt(lianad, psbt)
|
|
bitcoind.generate_block(1, wait_for_mempool=txid)
|
|
|
|
# Deposit a coin that will be spending (unconfirmed spend transaction)
|
|
addr = lianad.rpc.getnewaddress()["address"]
|
|
txid = bitcoind.rpc.sendtoaddress(addr, 0.456)
|
|
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 11)
|
|
bitcoind.generate_block(1, wait_for_mempool=txid)
|
|
outpoint = next(
|
|
c["outpoint"] for c in lianad.rpc.listcoins()["coins"] if txid in c["outpoint"]
|
|
)
|
|
destinations = {
|
|
bitcoind.rpc.getnewaddress(): 11_000,
|
|
}
|
|
res = lianad.rpc.createspend(destinations, [outpoint], 6)
|
|
psbt = PSBT.from_base64(res["psbt"])
|
|
txid = sign_and_broadcast_psbt(lianad, psbt)
|
|
|
|
# At this point we have 12 spent and unspent coins, one of them is unconfirmed.
|
|
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 12)
|
|
|
|
# However some of them share the same txid! This is the case of the 3 first coins
|
|
# for instance, or the Spend transactions with multiple outputs at one of our addresses.
|
|
# In total, that's 8 transactions.
|
|
txids = set(c["outpoint"][:-2] for c in lianad.rpc.listcoins()["coins"])
|
|
assert len(txids) == 8
|
|
|
|
# We can query all of them at once using listtransactions. The result contains all
|
|
# the correct transactions as hex, with no duplicate.
|
|
all_txs = lianad.rpc.listtransactions(list(txids))["transactions"]
|
|
assert len(all_txs) == 8
|
|
for tx in all_txs:
|
|
txid = bitcoind.rpc.decoderawtransaction(tx["tx"])["txid"]
|
|
txids.remove(txid) # This will raise an error if it isn't there
|
|
|
|
# We can also query them one by one.
|
|
txids = set(c["outpoint"][:-2] for c in lianad.rpc.listcoins()["coins"])
|
|
for txid in txids:
|
|
txs = lianad.rpc.listtransactions([txid])["transactions"]
|
|
bit_txid = bitcoind.rpc.decoderawtransaction(txs[0]["tx"])["txid"]
|
|
assert bit_txid == txid
|
|
|
|
# We can query all confirmed transactions
|
|
best_block = bitcoind.rpc.getbestblockhash()
|
|
final_timestamp = bitcoind.rpc.getblockheader(best_block)["time"]
|
|
txs = lianad.rpc.listconfirmed(initial_timestamp, final_timestamp, 10)[
|
|
"transactions"
|
|
]
|
|
assert len(txs) == 7, "The last spend tx is unconfirmed"
|
|
for tx in txs:
|
|
txid = bitcoind.rpc.decoderawtransaction(tx["tx"])["txid"]
|
|
txids.remove(txid) # This will raise an error if it isn't there
|
|
|
|
# We can limit the size of the result
|
|
txs = lianad.rpc.listconfirmed(initial_timestamp, final_timestamp, 5)[
|
|
"transactions"
|
|
]
|
|
assert len(txs) == 5
|
|
|
|
# We can restrict the query to a certain time window.
|
|
# First get the txid of all the transactions that happened during this timespan.
|
|
txids = set()
|
|
for coin in lianad.rpc.listcoins()["coins"]:
|
|
if coin["block_height"] is None:
|
|
continue
|
|
block_hash = bitcoind.rpc.getblockhash(coin["block_height"])
|
|
block_time = bitcoind.rpc.getblockheader(block_hash)["time"]
|
|
spend_time = None
|
|
if coin["spend_info"] is not None and coin["spend_info"]["height"] is not None:
|
|
spend_bhash = bitcoind.rpc.getblockhash(coin["spend_info"]["height"])
|
|
spend_time = bitcoind.rpc.getblockheader(spend_bhash)["time"]
|
|
if (block_time >= second_timestamp and block_time <= third_timestamp) or (
|
|
spend_time is not None
|
|
and spend_time >= second_timestamp
|
|
and spend_time <= third_timestamp
|
|
):
|
|
txids.add(coin["outpoint"][:-2])
|
|
# It's all 7 minus the first deposit and the last confirmed spend. So that's 5 of them.
|
|
assert len(txids) == 3
|
|
# Now let's compare with what lianad is giving us.
|
|
txs = lianad.rpc.listconfirmed(second_timestamp, third_timestamp, 10)[
|
|
"transactions"
|
|
]
|
|
assert len(txs) == 3
|
|
bit_txids = set(bitcoind.rpc.decoderawtransaction(tx["tx"])["txid"] for tx in txs)
|
|
assert bit_txids == txids
|
|
|
|
|
|
def test_create_recovery(lianad, bitcoind):
|
|
"""Test the sweep of coins that are available through the timelocked path."""
|
|
# Start by getting a few coins
|
|
destinations = {
|
|
lianad.rpc.getnewaddress()["address"]: 0.1,
|
|
lianad.rpc.getnewaddress()["address"]: 0.2,
|
|
lianad.rpc.getnewaddress()["address"]: 0.3,
|
|
}
|
|
txid = bitcoind.rpc.sendmany("", destinations)
|
|
bitcoind.generate_block(1, wait_for_mempool=txid)
|
|
wait_for(
|
|
lambda: lianad.rpc.getinfo()["block_height"] == bitcoind.rpc.getblockcount()
|
|
)
|
|
|
|
# There's nothing to sweep
|
|
with pytest.raises(
|
|
RpcError,
|
|
match="No coin currently spendable through this timelocked recovery path",
|
|
):
|
|
lianad.rpc.createrecovery(bitcoind.rpc.getnewaddress(), 2)
|
|
|
|
# Receive another coin, it will be one block after the others
|
|
txid = bitcoind.rpc.sendtoaddress(lianad.rpc.getnewaddress()["address"], 0.4)
|
|
|
|
# Make the timelock of the 3 first coins mature (we use a csv of 10 in the fixture)
|
|
bitcoind.generate_block(9, wait_for_mempool=txid)
|
|
|
|
# Now we can create a recovery tx that sweeps the first 3 coins.
|
|
res = lianad.rpc.createrecovery(bitcoind.rpc.getnewaddress(), 18)
|
|
reco_psbt = PSBT.from_base64(res["psbt"])
|
|
assert len(reco_psbt.tx.vin) == 3, "The last coin's timelock hasn't matured yet"
|
|
assert len(reco_psbt.tx.vout) == 1
|
|
assert int(0.5999 * COIN) < int(reco_psbt.tx.vout[0].nValue) < int(0.6 * COIN)
|
|
txid = sign_and_broadcast(lianad, bitcoind, reco_psbt, recovery=True)
|
|
|
|
# And by mining one more block we'll be able to sweep the last coin.
|
|
bitcoind.generate_block(1, wait_for_mempool=txid)
|
|
wait_for(
|
|
lambda: lianad.rpc.getinfo()["block_height"] == bitcoind.rpc.getblockcount()
|
|
)
|
|
res = lianad.rpc.createrecovery(bitcoind.rpc.getnewaddress(), 1)
|
|
reco_psbt = PSBT.from_base64(res["psbt"])
|
|
assert len(reco_psbt.tx.vin) == 1
|
|
assert len(reco_psbt.tx.vout) == 1
|
|
assert int(0.39999 * COIN) < int(reco_psbt.tx.vout[0].nValue) < int(0.4 * COIN)
|
|
sign_and_broadcast(lianad, bitcoind, reco_psbt, recovery=True)
|
|
|
|
|
|
def test_labels(lianad, bitcoind):
|
|
"""Test the creation and updating of labels."""
|
|
# We can set a label for an address.
|
|
addr = lianad.rpc.getnewaddress()["address"]
|
|
lianad.rpc.updatelabels({addr: "first-addr"})
|
|
assert lianad.rpc.getlabels([addr])["labels"] == {addr: "first-addr"}
|
|
# And also update it.
|
|
lianad.rpc.updatelabels({addr: "first-addr-1"})
|
|
assert lianad.rpc.getlabels([addr])["labels"] == {addr: "first-addr-1"}
|
|
# But we can't set a label larger than 100 characters
|
|
with pytest.raises(RpcError, match=".*must be less or equal than 100 characters"):
|
|
lianad.rpc.updatelabels({addr: "".join("a" for _ in range(101))})
|
|
|
|
# We can set a label for a coin.
|
|
sec_addr = lianad.rpc.getnewaddress()["address"]
|
|
txid = bitcoind.rpc.sendtoaddress(sec_addr, 1)
|
|
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 1)
|
|
coin = lianad.rpc.listcoins()["coins"][0]
|
|
lianad.rpc.updatelabels({coin["outpoint"]: "first-coin"})
|
|
assert lianad.rpc.getlabels([coin["outpoint"]])["labels"] == {
|
|
coin["outpoint"]: "first-coin"
|
|
}
|
|
# And also update it.
|
|
lianad.rpc.updatelabels({coin["outpoint"]: "first-coin-1"})
|
|
assert lianad.rpc.getlabels([coin["outpoint"]])["labels"] == {
|
|
coin["outpoint"]: "first-coin-1"
|
|
}
|
|
# Its address though has no label.
|
|
assert lianad.rpc.getlabels([sec_addr])["labels"] == {}
|
|
# But we can receive a coin to the address that has a label set, and query both.
|
|
sec_txid = bitcoind.rpc.sendtoaddress(addr, 1)
|
|
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 2)
|
|
sec_coin = next(
|
|
c for c in lianad.rpc.listcoins()["coins"] if sec_txid in c["outpoint"]
|
|
)
|
|
lianad.rpc.updatelabels({sec_coin["outpoint"]: "sec-coin"})
|
|
res = lianad.rpc.getlabels([sec_coin["outpoint"], addr])["labels"]
|
|
assert len(res) == 2
|
|
assert res[sec_coin["outpoint"]] == "sec-coin"
|
|
assert res[addr] == "first-addr-1"
|
|
# We can also query the labels for both coins, of course.
|
|
res = lianad.rpc.getlabels([coin["outpoint"], sec_coin["outpoint"]])["labels"]
|
|
assert len(res) == 2
|
|
assert res[coin["outpoint"]] == "first-coin-1"
|
|
assert res[sec_coin["outpoint"]] == "sec-coin"
|
|
|
|
# We can set, update and query labels for deposit transactions.
|
|
lianad.rpc.updatelabels({txid: "first-deposit"})
|
|
assert lianad.rpc.getlabels([txid, sec_txid])["labels"] == {txid: "first-deposit"}
|
|
lianad.rpc.updatelabels({txid: "first-deposit-1", sec_txid: "second-deposit"})
|
|
res = lianad.rpc.getlabels([txid, sec_txid])["labels"]
|
|
assert len(res) == 2
|
|
assert res[txid] == "first-deposit-1"
|
|
assert res[sec_txid] == "second-deposit"
|
|
|
|
# We can set and update a label for a spend transaction.
|
|
spend_txid = get_txid(spend_coins(lianad, bitcoind, [coin, sec_coin]))
|
|
lianad.rpc.updatelabels({spend_txid: "spend-tx"})
|
|
assert lianad.rpc.getlabels([spend_txid])["labels"] == {spend_txid: "spend-tx"}
|
|
lianad.rpc.updatelabels({spend_txid: "spend-tx-1"})
|
|
assert lianad.rpc.getlabels([spend_txid])["labels"] == {spend_txid: "spend-tx-1"}
|
|
|
|
# We can set labels for inexistent stuff, as long as the format of the item being
|
|
# labelled is valid.
|
|
inexistent_txid = "".join("0" for _ in range(64))
|
|
inexistent_outpoint = "".join("1" for _ in range(64)) + ":42"
|
|
random_address = bitcoind.rpc.getnewaddress()
|
|
lianad.rpc.updatelabels(
|
|
{
|
|
inexistent_txid: "inex_txid",
|
|
inexistent_outpoint: "inex_outpoint",
|
|
random_address: "bitcoind-addr",
|
|
}
|
|
)
|
|
res = lianad.rpc.getlabels([inexistent_txid, inexistent_outpoint, random_address])[
|
|
"labels"
|
|
]
|
|
assert len(res) == 3
|
|
assert res[inexistent_txid] == "inex_txid"
|
|
assert res[inexistent_outpoint] == "inex_outpoint"
|
|
assert res[random_address] == "bitcoind-addr"
|
|
|
|
# We'll confirm everything, shouldn't affect any of the labels.
|
|
bitcoind.generate_block(1, wait_for_mempool=spend_txid)
|
|
wait_for(
|
|
lambda: bitcoind.rpc.getblockcount() == lianad.rpc.getinfo()["block_height"]
|
|
)
|
|
res = lianad.rpc.getlabels(
|
|
[
|
|
addr,
|
|
sec_addr, # No label for this one.
|
|
txid,
|
|
sec_txid,
|
|
coin["outpoint"],
|
|
sec_coin["outpoint"],
|
|
spend_txid,
|
|
inexistent_txid,
|
|
inexistent_outpoint,
|
|
random_address,
|
|
]
|
|
)["labels"]
|
|
assert len(res) == 9
|
|
assert res[sec_coin["outpoint"]] == "sec-coin"
|
|
assert res[addr] == "first-addr-1"
|
|
assert res[coin["outpoint"]] == "first-coin-1"
|
|
assert res[txid] == "first-deposit-1"
|
|
assert res[sec_txid] == "second-deposit"
|
|
assert res[spend_txid] == "spend-tx-1"
|
|
assert res[inexistent_txid] == "inex_txid"
|
|
assert res[inexistent_outpoint] == "inex_outpoint"
|
|
assert res[random_address] == "bitcoind-addr"
|
|
|
|
# Delete 2 of the labels set above. They shouldn't be returned anymore.
|
|
lianad.rpc.updatelabels(
|
|
{
|
|
addr: None,
|
|
sec_addr: None,
|
|
random_address: "this address is random",
|
|
}
|
|
)
|
|
res = lianad.rpc.getlabels([addr, sec_addr, random_address])["labels"]
|
|
assert len(res) == 1
|
|
assert addr not in res
|
|
assert sec_addr not in res
|
|
assert res[random_address] == "this address is random"
|
|
|
|
|
|
def test_rbfpsbt_bump_fee(lianad, bitcoind):
|
|
"""Test the use of RBF to bump the fee of a transaction."""
|
|
|
|
# Get three coins.
|
|
destinations = {
|
|
lianad.rpc.getnewaddress()["address"]: 0.003,
|
|
lianad.rpc.getnewaddress()["address"]: 0.004,
|
|
lianad.rpc.getnewaddress()["address"]: 0.005,
|
|
}
|
|
txid = bitcoind.rpc.sendmany("", destinations)
|
|
bitcoind.generate_block(1, wait_for_mempool=txid)
|
|
wait_for(lambda: len(lianad.rpc.listcoins(["confirmed"])["coins"]) == 3)
|
|
coins = lianad.rpc.listcoins(["confirmed"])["coins"]
|
|
|
|
# Create a spend that will later be replaced.
|
|
first_outpoints = [c["outpoint"] for c in coins[:2]]
|
|
destinations = {
|
|
bitcoind.rpc.getnewaddress(): 650_000,
|
|
}
|
|
first_res = lianad.rpc.createspend(destinations, first_outpoints, 1)
|
|
first_psbt = PSBT.from_base64(first_res["psbt"])
|
|
# The transaction has a change output.
|
|
assert len(first_psbt.o) == len(first_psbt.tx.vout) == 2
|
|
first_txid = first_psbt.tx.txid().hex()
|
|
# We must provide a valid feerate.
|
|
for bad_feerate in [-1, "foo", 18_446_744_073_709_551_616]:
|
|
with pytest.raises(RpcError, match=f"Invalid 'feerate' parameter."):
|
|
lianad.rpc.rbfpsbt(first_txid, False, bad_feerate)
|
|
# We cannot RBF yet as first PSBT has not been saved.
|
|
with pytest.raises(RpcError, match=f"Unknown spend transaction '{first_txid}'."):
|
|
lianad.rpc.rbfpsbt(first_txid, False, 1)
|
|
# Now save the PSBT.
|
|
lianad.rpc.updatespend(first_res["psbt"])
|
|
# The RBF command succeeds even if transaction has not been signed.
|
|
lianad.rpc.rbfpsbt(first_txid, False, 2)
|
|
# The RBF command also succeeds if transaction has been signed but not broadcast.
|
|
first_psbt = lianad.signer.sign_psbt(first_psbt)
|
|
lianad.rpc.updatespend(first_psbt.to_base64())
|
|
lianad.rpc.rbfpsbt(first_txid, False, 2)
|
|
# Now broadcast the spend and wait for it to be detected.
|
|
lianad.rpc.broadcastspend(first_txid)
|
|
wait_for(
|
|
lambda: all(
|
|
c["spend_info"] is not None and c["spend_info"]["txid"] == first_txid
|
|
for c in lianad.rpc.listcoins([], first_outpoints)["coins"]
|
|
)
|
|
)
|
|
# We can now use RBF, but the feerate must be higher than that of the first transaction.
|
|
with pytest.raises(RpcError, match=f"Feerate too low: 1."):
|
|
lianad.rpc.rbfpsbt(first_txid, False, 1)
|
|
# Using a higher feerate works.
|
|
lianad.rpc.rbfpsbt(first_txid, False, 2)
|
|
# Let's use an even higher feerate.
|
|
rbf_1_res = lianad.rpc.rbfpsbt(first_txid, False, 10)
|
|
rbf_1_psbt = PSBT.from_base64(rbf_1_res["psbt"])
|
|
# The inputs are the same in both (no new inputs needed in the replacement).
|
|
assert sorted(
|
|
psbt_in.map[PSBT_IN_NON_WITNESS_UTXO] for psbt_in in first_psbt.i
|
|
) == sorted(psbt_in.map[PSBT_IN_NON_WITNESS_UTXO] for psbt_in in rbf_1_psbt.i)
|
|
# Check non-change output is the same in both.
|
|
assert first_psbt.tx.vout[0].nValue == rbf_1_psbt.tx.vout[0].nValue
|
|
assert first_psbt.tx.vout[0].scriptPubKey == rbf_1_psbt.tx.vout[0].scriptPubKey
|
|
# Change address is the same but change amount will be lower in the replacement to pay higher fee.
|
|
assert first_psbt.tx.vout[1].nValue > rbf_1_psbt.tx.vout[1].nValue
|
|
assert first_psbt.tx.vout[1].scriptPubKey == rbf_1_psbt.tx.vout[1].scriptPubKey
|
|
# Broadcast the replacement and wait for it to be detected.
|
|
rbf_1_txid = sign_and_broadcast_psbt(lianad, rbf_1_psbt)
|
|
wait_for(
|
|
lambda: all(
|
|
c["spend_info"] is not None and c["spend_info"]["txid"] == rbf_1_txid
|
|
for c in lianad.rpc.listcoins([], first_outpoints)["coins"]
|
|
)
|
|
)
|
|
# If we try to RBF the first transaction again, it will use the first RBF's
|
|
# feerate of 10 sat/vb to set the min feerate, instead of 1 sat/vb of first
|
|
# transaction:
|
|
with pytest.raises(RpcError, match=f"Feerate too low: 10."):
|
|
lianad.rpc.rbfpsbt(first_txid, False, 10)
|
|
# Using 11 for feerate works.
|
|
lianad.rpc.rbfpsbt(first_txid, False, 11)
|
|
# Add a new transaction spending the change from the first RBF.
|
|
desc_1_destinations = {
|
|
bitcoind.rpc.getnewaddress(): 500_000,
|
|
}
|
|
desc_1_outpoints = [f"{rbf_1_txid}:1", coins[2]["outpoint"]]
|
|
wait_for(lambda: len(lianad.rpc.listcoins([], desc_1_outpoints)["coins"]) == 2)
|
|
desc_1_res = lianad.rpc.createspend(desc_1_destinations, desc_1_outpoints, 1)
|
|
desc_1_psbt = PSBT.from_base64(desc_1_res["psbt"])
|
|
assert len(desc_1_psbt.tx.vout) == 2
|
|
desc_1_txid = sign_and_broadcast_psbt(lianad, desc_1_psbt)
|
|
wait_for(
|
|
lambda: all(
|
|
c["spend_info"] is not None and c["spend_info"]["txid"] == desc_1_txid
|
|
for c in lianad.rpc.listcoins([], desc_1_outpoints)["coins"]
|
|
)
|
|
)
|
|
# Add a new transaction spending the change from the first descendant.
|
|
desc_2_destinations = {
|
|
bitcoind.rpc.getnewaddress(): 25_000,
|
|
}
|
|
desc_2_outpoints = [f"{desc_1_txid}:1"]
|
|
wait_for(lambda: len(lianad.rpc.listcoins([], desc_2_outpoints)["coins"]) == 1)
|
|
desc_2_res = lianad.rpc.createspend(desc_2_destinations, desc_2_outpoints, 1)
|
|
desc_2_psbt = PSBT.from_base64(desc_2_res["psbt"])
|
|
assert len(desc_2_psbt.tx.vout) == 2
|
|
desc_2_txid = sign_and_broadcast_psbt(lianad, desc_2_psbt)
|
|
wait_for(
|
|
lambda: all(
|
|
c["spend_info"] is not None and c["spend_info"]["txid"] == desc_2_txid
|
|
for c in lianad.rpc.listcoins([], desc_2_outpoints)["coins"]
|
|
)
|
|
)
|
|
# Now replace the first RBF, which will also remove its descendants.
|
|
rbf_2_res = lianad.rpc.rbfpsbt(rbf_1_txid, False, 11)
|
|
rbf_2_psbt = PSBT.from_base64(rbf_2_res["psbt"])
|
|
# The inputs are the same in both (no new inputs needed in the replacement).
|
|
assert sorted(
|
|
psbt_in.map[PSBT_IN_NON_WITNESS_UTXO] for psbt_in in rbf_1_psbt.i
|
|
) == sorted(psbt_in.map[PSBT_IN_NON_WITNESS_UTXO] for psbt_in in rbf_2_psbt.i)
|
|
# Check non-change output is the same in both.
|
|
assert rbf_1_psbt.tx.vout[0].nValue == rbf_2_psbt.tx.vout[0].nValue
|
|
assert rbf_1_psbt.tx.vout[0].scriptPubKey == rbf_2_psbt.tx.vout[0].scriptPubKey
|
|
# Change address is the same but change amount will be lower in the replacement to pay higher fee.
|
|
assert rbf_1_psbt.tx.vout[1].nValue > rbf_2_psbt.tx.vout[1].nValue
|
|
assert rbf_1_psbt.tx.vout[1].scriptPubKey == rbf_2_psbt.tx.vout[1].scriptPubKey
|
|
|
|
# Broadcast the replacement and wait for it to be detected.
|
|
rbf_2_txid = sign_and_broadcast_psbt(lianad, rbf_2_psbt)
|
|
wait_for(
|
|
lambda: all(
|
|
c["spend_info"] is not None and c["spend_info"]["txid"] == rbf_2_txid
|
|
for c in lianad.rpc.listcoins([], first_outpoints)["coins"]
|
|
)
|
|
)
|
|
# The unconfirmed coins used in the descendant transactions have been removed so that
|
|
# only one of the input coins remains, and its spend info has been wiped so that it is as before.
|
|
assert lianad.rpc.listcoins([], desc_1_outpoints + desc_2_outpoints)["coins"] == [
|
|
coins[2]
|
|
]
|
|
# Now confirm the replacement transaction.
|
|
bitcoind.generate_block(1, wait_for_mempool=rbf_2_txid)
|
|
wait_for(
|
|
lambda: all(
|
|
c["spend_info"]["txid"] == rbf_2_txid
|
|
and c["spend_info"]["height"] is not None
|
|
for c in lianad.rpc.listcoins([], first_outpoints)["coins"]
|
|
)
|
|
)
|
|
|
|
|
|
def test_rbfpsbt_insufficient_funds(lianad, bitcoind):
|
|
"""Trying to increase the fee too much returns the missing funds amount."""
|
|
# Get a coin.
|
|
deposit_txid_1 = bitcoind.rpc.sendtoaddress(
|
|
lianad.rpc.getnewaddress()["address"], 30_000 / COIN
|
|
)
|
|
bitcoind.generate_block(1, wait_for_mempool=deposit_txid_1)
|
|
wait_for(lambda: len(lianad.rpc.listcoins(["confirmed"])["coins"]) == 1)
|
|
|
|
# Create a spend that we will then attempt to replace.
|
|
destinations_1 = {
|
|
bitcoind.rpc.getnewaddress(): 29_800,
|
|
}
|
|
spend_res_1 = lianad.rpc.createspend(destinations_1, [], 1)
|
|
spend_psbt_1 = PSBT.from_base64(spend_res_1["psbt"])
|
|
spend_txid_1 = sign_and_broadcast_psbt(lianad, spend_psbt_1)
|
|
|
|
# We don't have sufficient funds to bump the fee.
|
|
assert "missing" in lianad.rpc.rbfpsbt(spend_txid_1, False, 2)
|
|
# We can still cancel it as the coin has enough value to create a single
|
|
# output at a higher feerate.
|
|
assert "psbt" in lianad.rpc.rbfpsbt(spend_txid_1, True)
|
|
|
|
wait_for(lambda: len(lianad.rpc.listcoins(["confirmed"])["coins"]) == 0)
|
|
# Get another coin.
|
|
deposit_txid_2 = bitcoind.rpc.sendtoaddress(
|
|
lianad.rpc.getnewaddress()["address"], 5_200 / COIN
|
|
)
|
|
bitcoind.generate_block(1, wait_for_mempool=deposit_txid_2)
|
|
wait_for(lambda: len(lianad.rpc.listcoins(["confirmed"])["coins"]) == 1)
|
|
|
|
# Create a spend that we will then attempt to cancel.
|
|
destinations_2 = {
|
|
bitcoind.rpc.getnewaddress(): 5_000,
|
|
}
|
|
spend_res_2 = lianad.rpc.createspend(destinations_2, [], 1)
|
|
spend_psbt_2 = PSBT.from_base64(spend_res_2["psbt"])
|
|
spend_txid_2 = sign_and_broadcast_psbt(lianad, spend_psbt_2)
|
|
|
|
# We don't have enough to create a transaction with feerate 2 sat/vb.
|
|
assert "missing" in lianad.rpc.rbfpsbt(spend_txid_2, True)
|
|
|
|
|
|
def test_rbfpsbt_cancel(lianad, bitcoind):
|
|
"""Test the use of RBF to cancel a transaction."""
|
|
|
|
# Get three coins.
|
|
destinations = {
|
|
lianad.rpc.getnewaddress()["address"]: 0.003,
|
|
lianad.rpc.getnewaddress()["address"]: 0.004,
|
|
lianad.rpc.getnewaddress()["address"]: 0.005,
|
|
}
|
|
txid = bitcoind.rpc.sendmany("", destinations)
|
|
bitcoind.generate_block(1, wait_for_mempool=txid)
|
|
wait_for(lambda: len(lianad.rpc.listcoins(["confirmed"])["coins"]) == 3)
|
|
coins = lianad.rpc.listcoins(["confirmed"])["coins"]
|
|
|
|
# Create a spend that will later be replaced.
|
|
first_outpoints = [c["outpoint"] for c in coins[:2]]
|
|
destinations = {
|
|
bitcoind.rpc.getnewaddress(): 650_000,
|
|
}
|
|
first_res = lianad.rpc.createspend(destinations, first_outpoints, 1)
|
|
first_psbt = PSBT.from_base64(first_res["psbt"])
|
|
# The transaction has a change output.
|
|
assert len(first_psbt.o) == len(first_psbt.tx.vout) == 2
|
|
first_txid = first_psbt.tx.txid().hex()
|
|
# Broadcast the spend and wait for it to be detected.
|
|
first_txid = sign_and_broadcast_psbt(lianad, first_psbt)
|
|
wait_for(
|
|
lambda: all(
|
|
c["spend_info"] is not None and c["spend_info"]["txid"] == first_txid
|
|
for c in lianad.rpc.listcoins([], first_outpoints)["coins"]
|
|
)
|
|
)
|
|
# We can use RBF and let the command choose the min possible feerate (1 larger than previous).
|
|
rbf_1_res = lianad.rpc.rbfpsbt(first_txid, True)
|
|
# But we can't set the feerate explicitly.
|
|
with pytest.raises(
|
|
RpcError,
|
|
match=re.escape(
|
|
"A feerate must not be provided if creating a cancel."
|
|
),
|
|
):
|
|
rbf_1_res = lianad.rpc.rbfpsbt(first_txid, True, 2)
|
|
rbf_1_psbt = PSBT.from_base64(rbf_1_res["psbt"])
|
|
# Replacement only has a single input.
|
|
assert len(rbf_1_psbt.i) == 1
|
|
# This input is one of the two from the previous transaction.
|
|
assert rbf_1_psbt.i[0].map[PSBT_IN_NON_WITNESS_UTXO] in [
|
|
psbt_in.map[PSBT_IN_NON_WITNESS_UTXO] for psbt_in in rbf_1_psbt.i
|
|
]
|
|
# The replacement only has a change output.
|
|
assert len(rbf_1_psbt.tx.vout) == 1
|
|
# Change address is the same but change amount will be higher in the replacement as it is the only output.
|
|
assert first_psbt.tx.vout[1].nValue < rbf_1_psbt.tx.vout[0].nValue
|
|
assert first_psbt.tx.vout[1].scriptPubKey == rbf_1_psbt.tx.vout[0].scriptPubKey
|
|
# Broadcast the replacement and wait for it to be detected.
|
|
rbf_1_txid = sign_and_broadcast_psbt(lianad, rbf_1_psbt)
|
|
# The spend info of the coin used in the replacement will be updated.
|
|
|
|
rbf_1_outpoint = (
|
|
f"{rbf_1_psbt.tx.vin[0].prevout.hash:064x}:{rbf_1_psbt.tx.vin[0].prevout.n}"
|
|
)
|
|
assert rbf_1_outpoint in first_outpoints
|
|
|
|
wait_for(
|
|
lambda: all(
|
|
c["spend_info"] is not None and c["spend_info"]["txid"] == rbf_1_txid
|
|
for c in lianad.rpc.listcoins([], [rbf_1_outpoint])["coins"]
|
|
)
|
|
)
|
|
# The other coin will have its spend info removed.
|
|
wait_for(
|
|
lambda: all(
|
|
c["spend_info"] is None
|
|
for c in lianad.rpc.listcoins(
|
|
[], [op for op in first_outpoints if op != rbf_1_outpoint]
|
|
)["coins"]
|
|
)
|
|
)
|
|
# Add a new transaction spending the only output (change) from the first RBF.
|
|
desc_1_destinations = {
|
|
bitcoind.rpc.getnewaddress(): 500_000,
|
|
}
|
|
desc_1_outpoints = [f"{rbf_1_txid}:0", coins[2]["outpoint"]]
|
|
wait_for(lambda: len(lianad.rpc.listcoins([], desc_1_outpoints)["coins"]) == 2)
|
|
desc_1_res = lianad.rpc.createspend(desc_1_destinations, desc_1_outpoints, 1)
|
|
desc_1_psbt = PSBT.from_base64(desc_1_res["psbt"])
|
|
assert len(desc_1_psbt.tx.vout) == 2
|
|
desc_1_txid = sign_and_broadcast_psbt(lianad, desc_1_psbt)
|
|
wait_for(
|
|
lambda: all(
|
|
c["spend_info"] is not None and c["spend_info"]["txid"] == desc_1_txid
|
|
for c in lianad.rpc.listcoins([], desc_1_outpoints)["coins"]
|
|
)
|
|
)
|
|
# Add a new transaction spending the change from the first descendant.
|
|
desc_2_destinations = {
|
|
bitcoind.rpc.getnewaddress(): 25_000,
|
|
}
|
|
desc_2_outpoints = [f"{desc_1_txid}:1"]
|
|
wait_for(lambda: len(lianad.rpc.listcoins([], desc_2_outpoints)["coins"]) == 1)
|
|
desc_2_res = lianad.rpc.createspend(desc_2_destinations, desc_2_outpoints, 1)
|
|
desc_2_psbt = PSBT.from_base64(desc_2_res["psbt"])
|
|
assert len(desc_2_psbt.tx.vout) == 2
|
|
desc_2_txid = sign_and_broadcast_psbt(lianad, desc_2_psbt)
|
|
wait_for(
|
|
lambda: all(
|
|
c["spend_info"] is not None and c["spend_info"]["txid"] == desc_2_txid
|
|
for c in lianad.rpc.listcoins([], desc_2_outpoints)["coins"]
|
|
)
|
|
)
|
|
# Now cancel the first RBF, which will also remove its descendants.
|
|
rbf_2_res = lianad.rpc.rbfpsbt(rbf_1_txid, True)
|
|
rbf_2_psbt = PSBT.from_base64(rbf_2_res["psbt"])
|
|
#
|
|
assert len(rbf_2_psbt.i) == 1
|
|
assert (
|
|
rbf_1_psbt.i[0].map[PSBT_IN_NON_WITNESS_UTXO]
|
|
== rbf_2_psbt.i[0].map[PSBT_IN_NON_WITNESS_UTXO]
|
|
)
|
|
# The inputs are the same in both (no new inputs needed in the replacement).
|
|
|
|
# Only a single output (change) in the replacement.
|
|
assert len(rbf_2_psbt.tx.vout) == 1
|
|
# Change address is the same but change amount will be lower in the replacement to pay higher fee.
|
|
assert rbf_1_psbt.tx.vout[0].nValue > rbf_2_psbt.tx.vout[0].nValue
|
|
assert rbf_1_psbt.tx.vout[0].scriptPubKey == rbf_2_psbt.tx.vout[0].scriptPubKey
|
|
|
|
# Broadcast the replacement and wait for it to be detected.
|
|
rbf_2_txid = sign_and_broadcast_psbt(lianad, rbf_2_psbt)
|
|
wait_for(
|
|
lambda: all(
|
|
c["spend_info"] is not None and c["spend_info"]["txid"] == rbf_2_txid
|
|
for c in lianad.rpc.listcoins([], [rbf_1_outpoint])["coins"]
|
|
)
|
|
)
|
|
# The unconfirmed coins used in the descendant transactions have been removed so that
|
|
# only one of the input coins remains, and its spend info has been wiped so that it is as before.
|
|
assert lianad.rpc.listcoins([], desc_1_outpoints + desc_2_outpoints)["coins"] == [
|
|
coins[2]
|
|
]
|
|
# Now confirm the replacement transaction.
|
|
bitcoind.generate_block(1, wait_for_mempool=rbf_2_txid)
|
|
wait_for(
|
|
lambda: all(
|
|
c["spend_info"]["txid"] == rbf_2_txid
|
|
and c["spend_info"]["height"] is not None
|
|
for c in lianad.rpc.listcoins([], [rbf_1_outpoint])["coins"]
|
|
)
|
|
)
|