# liana/tests/test_rpc.py
# 2025-07-22 10:38:42 +02:00
#
# 1963 lines
# 80 KiB
# Python

import pytest
import random
import re
import time
from fixtures import *
from test_framework.serializations import (
PSBT,
PSBT_IN_PARTIAL_SIG,
PSBT_IN_NON_WITNESS_UTXO,
)
from test_framework.utils import (
wait_for,
COIN,
RpcError,
get_txid,
spend_coins,
sign_and_broadcast,
sign_and_broadcast_psbt,
USE_TAPROOT,
)
# Bound used by the derivation-index tests in this file: they expect
# `MAX_DERIV + 1` to be rejected with "Unhardened or overflowing BIP32
# derivation index.", while `MAX_DERIV - 1` is accepted as a parameter.
MAX_DERIV = 2**31 - 1
def test_getinfo(lianad):
    """Check the fields reported by `getinfo` on a fresh wallet.

    The response is inspected three times: immediately, once the daemon
    reports block height 101, and again after waiting longer than one poll
    interval.
    """
    info = lianad.rpc.getinfo()
    assert "timestamp" in info
    for field, expected in (("version", "12.0-dev"), ("network", "regtest")):
        assert info[field] == expected

    # Wait until the daemon reports height 101 before checking sync state.
    wait_for(lambda: lianad.rpc.getinfo()["block_height"] == 101)
    info = lianad.rpc.getinfo()
    assert info["sync"] == 1.0
    assert "main" in info["descriptors"]
    assert info["rescan_progress"] is None
    previous_poll = info["last_poll_timestamp"]
    assert previous_poll is not None

    # Sleep for more than one poll interval: the poll timestamp must advance.
    time.sleep(lianad.poll_interval_secs + 1)
    info = lianad.rpc.getinfo()
    assert info["last_poll_timestamp"] > previous_poll
    for field in ("receive_index", "change_index"):
        assert info[field] == 0
def test_update_derivation_indexes(lianad):
    """Exercise `updatederivationindexes` with valid and invalid parameters.

    Each accepted call is checked both through its own return value and
    through the indexes subsequently reported by `getinfo`.
    """
    start = lianad.rpc.getinfo()
    assert start["receive_index"] == 0
    assert start["change_index"] == 0

    # (positional args, keyword args, expected (receive, change) afterwards).
    # Requests at or below the current value leave the index untouched.
    accepted = [
        ((0, 0), {}, (0, 0)),
        ((), {"receive": 3}, (3, 0)),
        ((), {"change": 4}, (3, 4)),
        ((), {"receive": 1, "change": 2}, (3, 4)),
        ((5, 6), {}, (5, 6)),
        ((0, 0), {}, (5, 6)),
    ]
    for args, kwargs, (want_receive, want_change) in accepted:
        ret = lianad.rpc.updatederivationindexes(*args, **kwargs)
        info = lianad.rpc.getinfo()
        assert info["receive_index"] == want_receive
        assert info["change_index"] == want_change
        assert ret["receive"] == want_receive
        assert ret["change"] == want_change

    # Invalid indexes must be rejected with an explicit error.
    overflow = "Unhardened or overflowing BIP32 derivation index."
    rejected = [
        ((-1,), {}, "Invalid params: Invalid value for 'receive' param"),
        ((0, -1), {}, "Invalid params: Invalid value for 'change' param"),
        ((MAX_DERIV + 1, 2), {}, overflow),
        ((0, MAX_DERIV + 1), {}, overflow),
        ((), {"receive": (MAX_DERIV + 1)}, overflow),
        ((), {"change": (MAX_DERIV + 1)}, overflow),
        ((), {}, "Invalid params: Missing 'receive' or 'change' parameter"),
    ]
    for args, kwargs, message in rejected:
        with pytest.raises(RpcError, match=re.escape(message)):
            lianad.rpc.updatederivationindexes(*args, **kwargs)

    # Asking for a very large change index only moves it 1000 past the
    # previous value; the receive index is unaffected.
    before = lianad.rpc.updatederivationindexes(0, 0)
    prev_receive, prev_change = before["receive"], before["change"]
    ret = lianad.rpc.updatederivationindexes(0, (MAX_DERIV - 1))
    assert ret["receive"] == prev_receive
    assert ret["change"] == prev_change + 1000

    # Same check, this time for the receive index.
    before = lianad.rpc.updatederivationindexes(0, 0)
    prev_receive, prev_change = before["receive"], before["change"]
    ret = lianad.rpc.updatederivationindexes((MAX_DERIV - 1), 0)
    assert ret["receive"] == prev_receive + 1000
    assert ret["change"] == prev_change
def test_getaddress(lianad):
    """Check that `getnewaddress` returns a fresh address and bumps the receive index."""
    first = lianad.rpc.getnewaddress()
    assert "address" in first
    # A new wallet hands out index 1 first.
    first_index = first["derivation_index"]
    assert first_index == 1

    # A second call must return a different address.
    second = lianad.rpc.getnewaddress()
    assert first["address"] != second["address"]

    # The third address is two derivation steps after the first one.
    third = lianad.rpc.getnewaddress()
    assert third["derivation_index"] == first_index + 2

    info = lianad.rpc.getinfo()
    assert info["receive_index"] == first_index + 2  # 3 == 1 + 2
    assert info["change_index"] == 0
def test_listaddresses(lianad):
    """Check `listaddresses` ranges, defaults and parameter validation."""
    # Positional and keyword parameters must give the same answer.
    positional = lianad.rpc.listaddresses(2, 5)
    by_keyword = lianad.rpc.listaddresses(start_index=2, count=5)
    assert positional == by_keyword
    assert "addresses" in positional
    entries = positional["addresses"]
    assert entries[0]["index"] == 2
    assert entries[-1]["index"] == 6

    # Without parameters the listing starts at index 0; on a fresh wallet
    # that is a single entry (index 0).
    before = lianad.rpc.listaddresses()
    # Reveal two more receive addresses (indexes 1 and 2).
    for _ in range(2):
        lianad.rpc.getnewaddress()
    # The default listing now covers indexes 0, 1 and 2.
    after = lianad.rpc.listaddresses()
    assert len(after["addresses"]) == len(before["addresses"]) + 2 == 3
    from_zero = lianad.rpc.listaddresses(0)
    assert after == from_zero

    # An invalid start_index is rejected explicitly.
    bad_start = "Invalid params: Invalid value for \\'start_index\\': \"blabla\""
    with pytest.raises(RpcError, match=re.escape(bad_start)):
        lianad.rpc.listaddresses("blabla", None)

    # And so is an invalid count.
    bad_count = "Invalid params: Invalid value for \\'count\\': \"blb\""
    with pytest.raises(RpcError, match=re.escape(bad_count)):
        lianad.rpc.listaddresses(0, "blb")
def test_listrevealedaddresses(lianad, bitcoind):
    """Exercise `listrevealedaddresses`: paging, labels, use counts and filters."""

    def reveal(*params):
        """Call `listrevealedaddresses` with the given positional parameters."""
        return lianad.rpc.listrevealedaddresses(*params)

    def check_page(page, continue_from, rows):
        """Assert the cursor, the size and every entry of one response page.

        `rows` holds one `(index, address, used_count, label)` tuple per
        expected entry, in response order.
        """
        if continue_from is None:
            assert page["continue_from"] is None
        else:
            assert page["continue_from"] == continue_from
        assert len(page["addresses"]) == len(rows)
        for entry, (index, address, used_count, label) in zip(page["addresses"], rows):
            assert entry["index"] == index
            assert entry["address"] == address
            assert entry["used_count"] == used_count
            if label is None:
                assert entry["label"] is None
            else:
                assert entry["label"] == label

    def fund(destinations, confirmed_total):
        """Pay `destinations`, mine a block and wait for the confirmed coin count."""
        txid = bitcoind.rpc.sendmany("", destinations)
        bitcoind.generate_block(1, wait_for_mempool=txid)
        wait_for(
            lambda: len(lianad.rpc.listcoins(["confirmed"])["coins"]) == confirmed_total
        )

    # Reference addresses; receive[i] is the receive address at index i.
    reference = lianad.rpc.listaddresses(0, 10)["addresses"]
    receive = [reference[0]["receive"]]

    def row(index, used_count=0, label=None):
        """Build the expected tuple for the receive address at `index`."""
        return (index, receive[index], used_count, label)

    label_1 = "my test label 1"
    label_5 = "my test label 5"

    # Index 0 counts as revealed from the start and nothing follows it.
    check_page(reveal(False, False, 10), None, [row(0)])

    # Reveal seven more receive addresses (indexes 1 to 7).
    receive.extend(lianad.rpc.getnewaddress()["address"] for _ in range(7))
    assert lianad.rpc.getinfo()["receive_index"] == 7

    # Label two of them.
    lianad.rpc.updatelabels({receive[1]: label_1, receive[5]: label_5})

    # Omitting start_index, passing None, passing a value above the last
    # revealed index or passing a hardened index all start at the last index.
    for start_index in (None, 100, 4_294_967_295):
        assert reveal(False, False, 10) == reveal(False, False, 10, start_index)

    # Page through all revealed addresses three at a time, newest first,
    # feeding each returned continue_from back in as start_index.
    check_page(reveal(False, False, 3), 4, [row(7), row(6), row(5, label=label_5)])
    check_page(reveal(False, False, 3, 4), 1, [row(4), row(3), row(2)])
    # The final page is shorter than the limit and has no continuation.
    check_page(reveal(False, False, 3, 1), None, [row(1, label=label_1), row(0)])

    # Receive funds at three of the addresses.
    fund({receive[2]: 0.003, receive[4]: 0.004, receive[7]: 0.005}, 3)

    # Those addresses are now reported as used once.
    check_page(
        reveal(False, False, 3),
        4,
        [row(7, used_count=1), row(6), row(5, label=label_5)],
    )
    check_page(
        reveal(False, False, 3, 4),
        1,
        [row(4, used_count=1), row(3), row(2, used_count=1)],
    )

    # Used addresses can be excluded: index 7 and index 4 are skipped.
    check_page(reveal(False, True, 3), 2, [row(6), row(5, label=label_5), row(3)])
    # Continuing from the returned value also skips used index 2.
    check_page(reveal(False, True, 3, 2), None, [row(1, label=label_1), row(0)])

    # Pay two of the addresses again, then one of them a third time.
    fund({receive[2]: 0.0031, receive[4]: 0.0041}, 5)
    fund({receive[2]: 0.0032}, 6)

    # The use counts reflect the extra payments.
    check_page(
        reveal(False, False, 3, 4),
        1,
        [row(4, used_count=2), row(3), row(2, used_count=3)],
    )

    # A limit of 0 gives no addresses and a cursor equal to the start index.
    check_page(reveal(False, False, 0), 7, [])

    # On the change side only index 0 is revealed.
    assert lianad.rpc.getinfo()["receive_index"] == 7
    assert lianad.rpc.getinfo()["change_index"] == 0
    check_page(reveal(True, False, 3), None, [(0, reference[0]["change"], 0, None)])
def test_listcoins(lianad, bitcoind):
    """Exercise `listcoins` and its `statuses` / `outpoints` filters.

    Four coins are received one after another and moved through the
    unconfirmed, confirmed, spending and spent statuses. After each step the
    filtered and unfiltered responses are compared. Finally, invalid filter
    values must be rejected with an explicit error.
    """

    def coins(*params):
        """Return the "coins" list of a `listcoins` call made with `params`."""
        return lianad.rpc.listcoins(*params)["coins"]

    def sorted_coins(*params):
        """Same as `coins`, ordered by outpoint so that lists can be compared."""
        return sorted(coins(*params), key=lambda c: c["outpoint"])

    # Initially empty
    res = lianad.rpc.listcoins()
    assert "coins" in res
    assert len(res["coins"]) == 0

    # If we send a coin, we'll get a new entry. Note we monitor for unconfirmed
    # funds as well.
    addr_a = lianad.rpc.getnewaddress()
    txid_a = bitcoind.rpc.sendtoaddress(addr_a["address"], 1)
    wait_for(lambda: len(coins()) == 1)
    res = coins()
    outpoint_a = res[0]["outpoint"]
    assert txid_a == outpoint_a[:64]
    assert res[0]["amount"] == 1 * COIN
    assert res[0]["derivation_index"] == addr_a["derivation_index"]
    assert res[0]["is_change"] is False
    assert res[0]["block_height"] is None
    assert res[0]["spend_info"] is None
    assert res[0]["is_from_self"] is False
    assert len(coins(["confirmed", "spent", "spending"])) == 0
    assert (
        lianad.rpc.listcoins()
        == lianad.rpc.listcoins([], [outpoint_a])
        == lianad.rpc.listcoins(["unconfirmed"])
        == lianad.rpc.listcoins(["unconfirmed"], [outpoint_a])
        == lianad.rpc.listcoins(["unconfirmed", "confirmed"])
        == lianad.rpc.listcoins(["spent", "unconfirmed", "confirmed"])
        == lianad.rpc.listcoins(["spent", "unconfirmed", "confirmed"], [outpoint_a])
    )

    # If the coin gets confirmed, it'll be marked as such.
    bitcoind.generate_block(1, wait_for_mempool=txid_a)
    block_height = bitcoind.rpc.getblockcount()
    wait_for(lambda: coins()[0]["block_height"] == block_height)
    # Count the coins themselves: the length of the response dict is always 1
    # (its single "coins" key) and would make this check vacuous.
    assert len(coins()) == len(coins(["confirmed"])) == 1
    assert (
        lianad.rpc.listcoins()
        == lianad.rpc.listcoins([], [outpoint_a])
        == lianad.rpc.listcoins(["confirmed"])
        == lianad.rpc.listcoins(["confirmed"], [outpoint_a])
        == lianad.rpc.listcoins(["unconfirmed", "confirmed"])
        == lianad.rpc.listcoins(["spent", "unconfirmed", "confirmed"])
        == lianad.rpc.listcoins(["spent", "unconfirmed", "confirmed"], [outpoint_a])
    )
    assert coins()[0]["is_from_self"] is False

    # Same if the coin gets spent.
    spend_tx = spend_coins(lianad, bitcoind, (res[0],))
    spend_txid = get_txid(spend_tx)
    wait_for(lambda: coins()[0]["spend_info"] is not None)
    spend_info = coins()[0]["spend_info"]
    assert spend_info["txid"] == spend_txid
    assert spend_info["height"] is None
    assert len(coins(["spent"])) == 0
    assert len(coins(["spending"])) == 1
    assert coins(["spending"])[0]["is_from_self"] is False

    # And if this spending tx gets confirmed.
    bitcoind.generate_block(1, wait_for_mempool=spend_txid)
    curr_height = bitcoind.rpc.getblockcount()
    wait_for(lambda: lianad.rpc.getinfo()["block_height"] == curr_height)
    spend_info = coins()[0]["spend_info"]
    assert spend_info["txid"] == spend_txid
    assert spend_info["height"] == curr_height
    assert len(coins(["unconfirmed", "confirmed"])) == 0
    assert (
        lianad.rpc.listcoins()
        == lianad.rpc.listcoins(["spent"])
        == lianad.rpc.listcoins(["spent", "unconfirmed", "confirmed"])
    )
    assert len(coins()) == 1
    assert coins()[0]["is_from_self"] is False

    # Add a second coin.
    addr_b = lianad.rpc.getnewaddress()["address"]
    txid_b = bitcoind.rpc.sendtoaddress(addr_b, 2)
    wait_for(lambda: len(coins()) == 2)
    res = coins(["unconfirmed"], [])
    outpoint_b = res[0]["outpoint"]
    assert res[0]["is_from_self"] is False
    # We have one unconfirmed coin and one spent coin.
    assert (
        len(coins())
        == len(coins([], [outpoint_a, outpoint_b]))
        == len(coins(["unconfirmed", "spent"]))
        == len(coins(["unconfirmed", "spent"], [outpoint_a, outpoint_b]))
        == 2
    )
    assert (
        lianad.rpc.listcoins([], [outpoint_b])
        == lianad.rpc.listcoins(["unconfirmed"])
        == lianad.rpc.listcoins(["unconfirmed"], [outpoint_b])
        == lianad.rpc.listcoins(["unconfirmed", "confirmed"])
        == lianad.rpc.listcoins(["spending", "unconfirmed", "confirmed"])
        == lianad.rpc.listcoins(["spending", "unconfirmed", "confirmed"], [outpoint_b])
    )
    assert coins([], [outpoint_b])[0]["is_from_self"] is False

    # Now confirm the second coin.
    bitcoind.generate_block(1, wait_for_mempool=txid_b)
    block_height = bitcoind.rpc.getblockcount()
    wait_for(lambda: coins([], [outpoint_b])[0]["block_height"] == block_height)
    assert coins([], [outpoint_b])[0]["is_from_self"] is False
    # We have one confirmed coin and one spent coin.
    assert (
        len(coins())
        == len(coins([], [outpoint_a, outpoint_b]))
        == len(coins(["confirmed", "spent"]))
        == len(coins(["confirmed", "spent"], [outpoint_a, outpoint_b]))
        == 2
    )
    assert (
        lianad.rpc.listcoins([], [outpoint_b])
        == lianad.rpc.listcoins(["confirmed"])
        == lianad.rpc.listcoins(["confirmed"], [outpoint_b])
        == lianad.rpc.listcoins(["unconfirmed", "confirmed"])
        == lianad.rpc.listcoins(["unconfirmed", "confirmed", "spending"])
        == lianad.rpc.listcoins(["unconfirmed", "confirmed", "spending"], [outpoint_b])
    )

    # Add a third coin.
    addr_c = lianad.rpc.getnewaddress()["address"]
    bitcoind.rpc.sendtoaddress(addr_c, 3)
    wait_for(lambda: len(coins()) == 3)
    res = coins(["unconfirmed"], [])
    outpoint_c = res[0]["outpoint"]
    # We have three different statuses: unconfirmed, confirmed and spent.
    assert (
        len(coins())
        == len(coins([], [outpoint_a, outpoint_b, outpoint_c]))
        == len(coins(["unconfirmed", "confirmed", "spent"]))
        == len(
            coins(
                ["unconfirmed", "confirmed", "spent"],
                [outpoint_a, outpoint_b, outpoint_c],
            )
        )
        == 3
    )
    assert (
        lianad.rpc.listcoins([], [outpoint_c])
        == lianad.rpc.listcoins(["unconfirmed"])
        == lianad.rpc.listcoins(["unconfirmed"], [outpoint_c])
        == lianad.rpc.listcoins(["unconfirmed", "spending"])
        == lianad.rpc.listcoins(["spending", "unconfirmed"])
        == lianad.rpc.listcoins(["spending", "unconfirmed", "confirmed"], [outpoint_c])
    )

    # Spend third coin, even though it is still unconfirmed.
    spend_tx = spend_coins(lianad, bitcoind, (res[0],))
    spend_txid = get_txid(spend_tx)
    wait_for(lambda: coins([], [outpoint_c])[0]["spend_info"] is not None)
    assert len(coins(["unconfirmed"])) == 0
    assert (
        len(coins())
        == len(coins([], [outpoint_a, outpoint_b, outpoint_c]))
        == len(coins(["confirmed", "spending", "spent"]))
        == len(
            coins(
                ["confirmed", "spending", "spent"],
                [outpoint_a, outpoint_b, outpoint_c],
            )
        )
        == 3
    )
    # The unconfirmed coin now has spending status.
    assert (
        lianad.rpc.listcoins([], [outpoint_c])
        == lianad.rpc.listcoins(["spending"])
        == lianad.rpc.listcoins(["spending"], [outpoint_c])
        == lianad.rpc.listcoins(["spending", "unconfirmed"])
        == lianad.rpc.listcoins(["spending", "unconfirmed"], [outpoint_c])
    )

    # Add a fourth coin.
    addr_d = lianad.rpc.getnewaddress()["address"]
    bitcoind.rpc.sendtoaddress(addr_d, 4)
    wait_for(lambda: len(coins()) == 4)
    res = coins(["unconfirmed"], [])
    outpoint_d = res[0]["outpoint"]
    # We now have all four statuses.
    assert (
        len(coins(["unconfirmed"]))
        == len(coins(["confirmed"]))
        == len(coins(["spending"]))
        == len(coins(["spent"]))
        == 1
    )
    all_statuses = ["unconfirmed", "confirmed", "spending", "spent"]
    all_outpoints = [outpoint_a, outpoint_b, outpoint_c, outpoint_d]
    assert (
        len(coins())
        == len(coins([], all_outpoints))
        == len(coins(all_statuses))
        == len(coins(all_statuses, all_outpoints))
        == 4
    )
    # We can filter for specific statuses/outpoints.
    assert (
        sorted_coins(["spending", "spent"])
        == sorted_coins(["spending", "spent"], [outpoint_a, outpoint_c])
        == sorted_coins(all_statuses, [outpoint_a, outpoint_c])
        == sorted_coins(["spending", "spent"], all_outpoints)
    )

    # Finally, check that we return errors for invalid parameter values.
    for statuses, outpoints in [
        (["fake_status"], []),
        (["spent", "fake_status"], []),
        (["fake_status", "fake_status_2"], []),
        (["confirmed", "spending", "fake_status"], ["fake_outpoint"]),
        (["fake_status"], [outpoint_a, outpoint_b]),
    ]:
        with pytest.raises(
            RpcError,
            match=re.escape(
                "Invalid params: Invalid value \"fake_status\" in \\'statuses\\' parameter."
            ),
        ):
            lianad.rpc.listcoins(statuses, outpoints)
    for statuses, outpoints in [
        ([], ["fake_outpoint"]),
        ([], [outpoint_a, "fake_outpoint", outpoint_b]),
        ([], [outpoint_a, "fake_outpoint", "fake_outpoint_2"]),
        ([], [outpoint_a, outpoint_b, "fake_outpoint"]),
    ]:
        with pytest.raises(
            RpcError,
            match=re.escape(
                "Invalid params: Invalid value \"fake_outpoint\" in \\'outpoints\\' parameter."
            ),
        ):
            lianad.rpc.listcoins(statuses, outpoints)
def test_jsonrpc_server(lianad, bitcoind):
    """Test passing parameters as a list or a mapping."""
    # Fund the wallet with a single coin to spend from.
    deposit_addr = lianad.rpc.getnewaddress()["address"]
    bitcoind.rpc.sendtoaddress(deposit_addr, 1)
    wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 1)

    coin = lianad.rpc.listcoins()["coins"][0]
    outpoints = [coin["outpoint"]]
    destinations = {bitcoind.rpc.getnewaddress(): 20_000}

    # Positional parameters.
    positional = lianad.rpc.createspend(destinations, outpoints, 18)
    assert "psbt" in positional

    # Named parameters, given in a different order.
    named = lianad.rpc.createspend(
        outpoints=outpoints, destinations=destinations, feerate=18
    )
    assert "psbt" in named
def test_create_spend(lianad, bitcoind):
    """Create, inspect, sign and broadcast a spend of many coins.

    Also checks that a coin from an immature coinbase transaction is refused.
    """
    # Fifteen deposits, each to a fresh address and in its own block...
    for _ in range(15):
        addr = lianad.rpc.getnewaddress()["address"]
        txid = bitcoind.rpc.sendtoaddress(addr, 0.01)
        bitcoind.generate_block(1, wait_for_mempool=txid)
    # ...plus a sixteenth one reusing the last address.
    txid = bitcoind.rpc.sendtoaddress(addr, 0.3556)
    bitcoind.generate_block(1, wait_for_mempool=txid)
    wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 16)

    # Restarting the daemon must not change anything.
    lianad.stop()
    lianad.start()

    # Spend every coin to three external addresses.
    wallet_coins = lianad.rpc.listcoins()["coins"]
    outpoints = [coin["outpoint"] for coin in wallet_coins]
    destinations = {
        bitcoind.rpc.getnewaddress(): amount
        for amount in (200_000, 400_000, 1_000_000)
    }
    res = lianad.rpc.createspend(destinations, outpoints, 18)
    assert "psbt" in res

    # Three destinations plus one change output.
    spend_psbt = PSBT.from_base64(res["psbt"])
    assert len(spend_psbt.o) == 4
    assert len(spend_psbt.tx.vout) == 4

    # Fifteen receive addresses were handed out, starting at index 1.
    assert lianad.rpc.getinfo()["receive_index"] == 15
    # `createspend` took the next change index and persisted it.
    assert lianad.rpc.getinfo()["change_index"] == 1

    # For P2WSH every input carries its previous transaction; for Taproot none
    # does. Input ordering is not assumed.
    if not USE_TAPROOT:
        attached_txs = sorted(
            psbt_in.map[PSBT_IN_NON_WITNESS_UTXO] for psbt_in in spend_psbt.i
        )
        expected_txs = sorted(
            bytes.fromhex(bitcoind.rpc.gettransaction(op[:64])["hex"])
            for op in outpoints
        )
        assert attached_txs == expected_txs
    else:
        assert all(
            PSBT_IN_NON_WITNESS_UTXO not in psbt_in.map for psbt_in in spend_psbt.i
        )

    # The PSBT can be signed and broadcast.
    sign_and_broadcast(lianad, bitcoind, PSBT.from_base64(res["psbt"]))

    # Mine a block paying the wallet directly, then try to spend that
    # immature coinbase coin: it must be refused.
    addr = lianad.rpc.getnewaddress()["address"]
    bitcoind.rpc.generatetoaddress(1, addr)
    wait_for(
        lambda: lianad.rpc.getinfo()["block_height"] == bitcoind.rpc.getblockcount()
    )
    immature = next(
        coin for coin in lianad.rpc.listcoins()["coins"] if coin["is_immature"]
    )
    with pytest.raises(RpcError, match=".*is from an immature coinbase transaction."):
        lianad.rpc.createspend(destinations, [immature["outpoint"]], 1)
def test_list_spend(lianad, bitcoind):
    """Exercise `listspendtxs` (including its `txids` filter) and `delspendtx`.

    Two conflicting Spend PSBTs are created and stored, listed with and
    without a filter, then deleted one after the other.
    """
    # Start by creating two conflicting Spend PSBTs. The first one will have a change
    # output but not the second one.
    addr = lianad.rpc.getnewaddress()["address"]
    value_a = 0.2567
    bitcoind.rpc.sendtoaddress(addr, value_a)
    wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 1)
    outpoints = [c["outpoint"] for c in lianad.rpc.listcoins()["coins"]]
    destinations = {
        bitcoind.rpc.getnewaddress(): int(value_a * COIN // 2),
    }
    res = lianad.rpc.createspend(destinations, outpoints, 6)
    assert "psbt" in res

    addr = lianad.rpc.getnewaddress()["address"]
    value_b = 0.0987
    bitcoind.rpc.sendtoaddress(addr, value_b)
    wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 2)
    outpoints = [c["outpoint"] for c in lianad.rpc.listcoins()["coins"]]
    destinations = {
        bitcoind.rpc.getnewaddress(): int((value_a + value_b) * COIN - 1_000),
    }
    res_b = lianad.rpc.createspend(destinations, outpoints, 2)
    assert "psbt" in res_b

    # Store them both in DB.
    time_before_update = int(time.time())
    assert len(lianad.rpc.listspendtxs()["spend_txs"]) == 0
    lianad.rpc.updatespend(res["psbt"])
    lianad.rpc.updatespend(res_b["psbt"])

    # Check 'txids' parameter
    list_res = lianad.rpc.listspendtxs()["spend_txs"]
    txid = PSBT.from_base64(list_res[0]["psbt"]).tx.txid().hex()
    filtered_txs = lianad.rpc.listspendtxs(txids=[txid])["spend_txs"]
    # Check the number of returned transactions, not the number of keys in the
    # response dict, and do so before indexing into the list.
    assert len(filtered_txs) == 1
    assert filtered_txs[0]["psbt"] == list_res[0]["psbt"]
    with pytest.raises(
        RpcError, match="Filter list is empty, should supply None instead."
    ):
        lianad.rpc.listspendtxs(txids=[])
    # Anything that is not a list of valid txids is rejected.
    for bad_txids in ([txid, 123], [0], [123], ["abc"], ["123"]):
        with pytest.raises(
            RpcError, match="Invalid params: Invalid 'txids' parameter."
        ):
            lianad.rpc.listspendtxs(txids=bad_txids)

    # Listing all Spend transactions will list them both, each with an update
    # time between the moment we stored them and now.
    list_res = lianad.rpc.listspendtxs()["spend_txs"]
    assert len(list_res) == 2
    first_entry = next(entry for entry in list_res if entry["psbt"] == res["psbt"])
    assert time_before_update <= first_entry["updated_at"] <= int(time.time())
    second_entry = next(entry for entry in list_res if entry["psbt"] == res_b["psbt"])
    assert time_before_update <= second_entry["updated_at"] <= int(time.time())

    # If we delete the first one, we'll get only the second one.
    first_psbt = PSBT.from_base64(res["psbt"])
    lianad.rpc.delspendtx(first_psbt.tx.txid().hex())
    list_res = lianad.rpc.listspendtxs()["spend_txs"]
    assert len(list_res) == 1
    assert list_res[0]["psbt"] == res_b["psbt"]

    # If we delete the second one, result will be empty.
    second_psbt = PSBT.from_base64(res_b["psbt"])
    lianad.rpc.delspendtx(second_psbt.tx.txid().hex())
    list_res = lianad.rpc.listspendtxs()["spend_txs"]
    assert len(list_res) == 0
def test_update_spend(lianad, bitcoind):
    """Check that `updatespend` stores a PSBT and merges signatures added later."""

    def stored_spends():
        """Return the Spend transactions currently listed by the daemon."""
        return lianad.rpc.listspendtxs()["spend_txs"]

    # Fund the wallet and create a Spend PSBT from everything it holds.
    deposit_addr = lianad.rpc.getnewaddress()["address"]
    bitcoind.rpc.sendtoaddress(deposit_addr, 0.2567)
    wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) > 0)
    outpoints = [coin["outpoint"] for coin in lianad.rpc.listcoins()["coins"]]
    destinations = {bitcoind.rpc.getnewaddress(): 200_000}
    res = lianad.rpc.createspend(destinations, outpoints, 6)
    assert "psbt" in res
    unsigned_b64 = res["psbt"]

    def with_partial_sig(pubkey, signature):
        """Serialize a copy of the unsigned PSBT carrying one partial signature."""
        psbt = PSBT.from_base64(unsigned_b64)
        psbt.i[0].map[PSBT_IN_PARTIAL_SIG] = {pubkey: signature}
        return psbt.to_base64()

    # Nothing is stored until the first update.
    assert len(stored_spends()) == 0
    lianad.rpc.updatespend(unsigned_b64)
    listed = stored_spends()
    assert len(listed) == 1
    assert listed[0]["psbt"] == unsigned_b64

    # Store a version carrying a first (dummy) signature.
    dummy_pk_a = bytes.fromhex(
        "0375e00eb72e29da82b89367947f29ef34afb75e8654f6ea368e0acdfd92976b7c"
    )
    dummy_sig_a = bytes.fromhex(
        "304402202b925395cfeaa0171a7a92982bb4891acc4a312cbe7691d8375d36796d5b570a0220378a8ab42832848e15d1aedded5fb360fedbdd6c39226144e527f0f1e19d539801"
    )
    psbt_a_b64 = with_partial_sig(dummy_pk_a, dummy_sig_a)
    lianad.rpc.updatespend(psbt_a_b64)

    # That version is what the daemon returns now.
    listed = stored_spends()
    assert len(listed) == 1
    assert listed[0]["psbt"] == psbt_a_b64

    # Store another version, built from the unsigned PSBT, with a second
    # (dummy) signature only.
    dummy_pk_b = bytes.fromhex(
        "03a1b26313f430c4b15bb1fdce663207659d8cac749a0e53d70eff01874496feff"
    )
    dummy_sig_b = bytes.fromhex(
        "3044022005aebcd649fb8965f0591710fb3704931c3e8118ee60dd44917479f63ceba6d4022018b212900e5a80e9452366894de37f0d02fb9c89f1e94f34fb6ed7fd71c15c4101"
    )
    psbt_b_b64 = with_partial_sig(dummy_pk_b, dummy_sig_b)
    lianad.rpc.updatespend(psbt_b_b64)

    # The stored PSBT now holds both signatures.
    listed = stored_spends()
    assert len(listed) == 1
    merged_sigs = PSBT.from_base64(listed[0]["psbt"]).i[0].map[PSBT_IN_PARTIAL_SIG]
    assert len(merged_sigs) == 2
    assert merged_sigs[dummy_pk_a] == dummy_sig_a
    assert merged_sigs[dummy_pk_b] == dummy_sig_b
def test_broadcast_spend(lianad, bitcoind):
    """Check `broadcastspend` refuses unknown or unsigned Spends and accepts a signed one."""
    # Fund the wallet with one deposit and wait for the daemon to list it.
    deposit_address = lianad.rpc.getnewaddress()["address"]
    bitcoind.rpc.sendtoaddress(deposit_address, 0.2567)
    wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) > 0)

    # Draft a Spend of all the known coins, paying an address owned by bitcoind.
    coin_outpoints = [coin["outpoint"] for coin in lianad.rpc.listcoins()["coins"]]
    recipient = bitcoind.rpc.getnewaddress()
    created = lianad.rpc.createspend({recipient: 200_000}, coin_outpoints, 6)
    unsigned_b64 = created["psbt"]
    spend_txid = PSBT.from_base64(unsigned_b64).tx.txid().hex()

    # A Spend the daemon hasn't stored yet can't be broadcast.
    with pytest.raises(RpcError, match="Unknown spend transaction.*"):
        lianad.rpc.broadcastspend(spend_txid)

    # Once stored, it still can't be broadcast as long as it's unsigned.
    lianad.rpc.updatespend(unsigned_b64)
    with pytest.raises(RpcError, match="Failed to finalize the spend transaction.*"):
        lianad.rpc.broadcastspend(spend_txid)

    # Sign a fresh copy of the PSBT and store it: the daemon finalizes the PSBT
    # by itself before broadcasting the transaction.
    signed = lianad.signer.sign_psbt(PSBT.from_base64(unsigned_b64))
    lianad.rpc.updatespend(signed.to_base64())
    lianad.rpc.broadcastspend(spend_txid)
# Use a descriptor that includes hardened derivation paths so that we can check
# there is no problem regarding the use of `h` and `'`.
def test_start_rescan_does_not_error(lianad_with_deriv_paths, bitcoind):
    """Check that starting a rescan from just before the chain tip succeeds."""
    best_hash = bitcoind.rpc.getbestblockhash()
    tip_header = bitcoind.rpc.getblockheader(best_hash)
    # One second before the tip's timestamp.
    rescan_from = tip_header["time"] - 1
    lianad_with_deriv_paths.rpc.startrescan(rescan_from)
def test_start_rescan(lianad, bitcoind):
    """Check that a rescan recovers all our coins after the wallet state was wiped.

    Invalid rescan timestamps are rejected first. Then random wallet activity is
    generated, the daemon is restarted from a fresh state, and a rescan from the
    time the test started must bring back the exact same set of coins.
    """
    initial_timestamp = int(time.time())
    first_address = lianad.rpc.getnewaddress()
    second_address = lianad.rpc.getnewaddress()

    # Local helpers, to keep the scenario below readable.
    def list_coins():
        return lianad.rpc.listcoins()["coins"]

    def unspent_coins():
        return [coin for coin in list_coins() if coin["spend_info"] is None]

    def sorted_coins():
        return sorted(list_coins(), key=lambda coin: coin["outpoint"])

    def all_spent(coins):
        unspent_outpoints = {coin["outpoint"] for coin in unspent_coins()}
        return all(coin["outpoint"] not in unspent_outpoints for coin in coins)

    def rescan_done():
        return lianad.rpc.getinfo()["rescan_progress"] is None

    def caught_up():
        return lianad.rpc.getinfo()["block_height"] == bitcoind.rpc.getblockcount()

    def assert_rescan_rejected(timestamp, error_pattern):
        with pytest.raises(RpcError, match=error_pattern):
            lianad.rpc.startrescan(timestamp)
        assert lianad.rpc.getinfo()["rescan_progress"] is None

    # Rescanning from one second before the tip timestamp is accepted, and is
    # almost a no-op.
    tip_hash = bitcoind.rpc.getbestblockhash()
    tip_timestamp = bitcoind.rpc.getblockheader(tip_hash)["time"]
    lianad.rpc.startrescan(tip_timestamp - 1)
    wait_for(rescan_done)

    # Insane timestamps are refused: the tip's own timestamp, one in the future...
    assert_rescan_rejected(tip_timestamp, "Insane timestamp.*")
    assert_rescan_rejected(tip_timestamp + 60 * 60, "Insane timestamp.*")
    # ... and one from before the genesis block.
    genesis_hash = bitcoind.rpc.getblockhash(0)
    genesis_timestamp = bitcoind.rpc.getblock(genesis_hash)["time"]
    assert_rescan_rejected(genesis_timestamp - 1, "Insane timestamp.")

    # The timestamp of the genesis block itself is fine.
    lianad.rpc.startrescan(genesis_timestamp)
    wait_for(rescan_done)

    # Receive 10 coins of random value, each confirmed by a random number of blocks.
    for _ in range(10):
        deposit_address = lianad.rpc.getnewaddress()["address"]
        deposit_value = random.randint(1, COIN * 10) / COIN
        deposit_txid = bitcoind.rpc.sendtoaddress(deposit_address, deposit_value)
        n_blocks = random.randint(1, 10)
        bitcoind.generate_block(n_blocks, wait_for_mempool=deposit_txid)
    wait_for(lambda: len(list_coins()) == 10)

    # Then simulate some regular activity (spend and receive)
    # TODO: instead of having randomness we should lay down all different cases (with or
    # without change, single or multiple inputs, sending externally or to self).
    for _ in range(5):
        deposit_address = lianad.rpc.getnewaddress()["address"]
        deposit_value = random.randint(1, COIN * 10) / COIN
        bitcoind.rpc.sendtoaddress(deposit_address, deposit_value)
        candidates = unspent_coins()
        n_inputs = random.randint(1, len(candidates))
        to_spend = random.sample(candidates, n_inputs)
        spend_coins(lianad, bitcoind, to_spend)
        # Both the deposit and the spend must be in the mempool before mining.
        bitcoind.generate_block(random.randint(1, 5), wait_for_mempool=2)
        wait_for(lambda: all_spent(to_spend))
    wait_for(caught_up)

    # Receiving addresses are derived at much higher indexes now.
    fresh_address = lianad.rpc.getnewaddress()
    assert fresh_address not in (first_address, second_address)

    # Move time forward one day as bitcoind will rescan the last 2 hours of block upon
    # importing a descriptor.
    one_day = 60 * 60 * 24
    bitcoind.rpc.setmocktime(int(time.time()) + one_day)
    bitcoind.generate_block(10)

    # Now delete the wallet state. When starting up we'll re-create a fresh database
    # and watchonly wallet. Those won't be aware of past coins for the configured
    # descriptor.
    coins_before = sorted_coins()
    lianad.restart_fresh(bitcoind)
    if BITCOIN_BACKEND_TYPE is BitcoinBackendType.Bitcoind:
        assert len(list_coins()) == 0
        # The wallet isn't aware what derivation indexes were used. Necessarily
        # it'll start from 0.
        assert lianad.rpc.getnewaddress() == first_address

    # Once the rescan is done, we must have detected all previous transactions.
    lianad.rpc.startrescan(initial_timestamp)
    progress = lianad.rpc.getinfo()["rescan_progress"]
    assert progress is None or 0 <= progress <= 1
    wait_for(rescan_done)
    wait_for(caught_up)
    assert coins_before == sorted_coins()

    # Now that it caught up it noticed which addresses were used onchain, so it
    # won't reuse these derivation indexes anymore.
    fresh_address = lianad.rpc.getnewaddress()
    assert fresh_address not in (first_address, second_address)
def test_listtransactions(lianad, bitcoind):
    """Test listing of transactions by txid and timespan.

    Builds a history of deposits and spends separated by batches of blocks (so
    that block timestamps increase), then checks `listtransactions` (lookup by
    txid) and `listconfirmed` (lookup by time window, with a result limit).
    """

    def wait_synced():
        # Block until lianad's reported height matches bitcoind's block count.
        wait_for(
            lambda: lianad.rpc.getinfo()["block_height"] == bitcoind.rpc.getblockcount()
        )

    best_block = bitcoind.rpc.getbestblockhash()
    initial_timestamp = bitcoind.rpc.getblockheader(best_block)["time"]
    wait_synced()
    # Deposit multiple coins in a single transaction
    destinations = {
        lianad.rpc.getnewaddress()["address"]: 0.0123456,
        lianad.rpc.getnewaddress()["address"]: 0.0123457,
        lianad.rpc.getnewaddress()["address"]: 0.0123458,
    }
    txid = bitcoind.rpc.sendmany("", destinations)
    wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 3)
    bitcoind.generate_block(1, wait_for_mempool=txid)
    # Mine 12 blocks to force the blocktime to increase
    bitcoind.generate_block(12)
    wait_synced()
    # Start of the time window queried at the end of the test.
    best_block = bitcoind.rpc.getbestblockhash()
    second_timestamp = bitcoind.rpc.getblockheader(best_block)["time"]
    assert second_timestamp > initial_timestamp
    # Deposit a coin that will be unspent
    addr = lianad.rpc.getnewaddress()["address"]
    txid = bitcoind.rpc.sendtoaddress(addr, 0.123456)
    wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 4)
    bitcoind.generate_block(1, wait_for_mempool=txid)
    # Deposit a coin that will be spent with a change output
    addr = lianad.rpc.getnewaddress()["address"]
    txid = bitcoind.rpc.sendtoaddress(addr, 0.23456)
    wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 5)
    bitcoind.generate_block(1, wait_for_mempool=txid)
    # Pick the coin created by the deposit we just made.
    outpoint = next(
        c["outpoint"] for c in lianad.rpc.listcoins()["coins"] if txid in c["outpoint"]
    )
    destinations = {
        bitcoind.rpc.getnewaddress(): 100_000,
    }
    res = lianad.rpc.createspend(destinations, [outpoint], 6)
    psbt = PSBT.from_base64(res["psbt"])
    txid = sign_and_broadcast_psbt(lianad, psbt)
    bitcoind.generate_block(1, wait_for_mempool=txid)
    # Mine 12 blocks to force the blocktime to increase
    bitcoind.generate_block(12)
    wait_synced()
    # End of the time window queried at the end of the test.
    best_block = bitcoind.rpc.getbestblockhash()
    third_timestamp = bitcoind.rpc.getblockheader(best_block)["time"]
    assert third_timestamp > second_timestamp
    bitcoind.generate_block(12)
    wait_synced()
    # Deposit a coin that will be spent with a change output and also two new deposits
    addr = lianad.rpc.getnewaddress()["address"]
    txid = bitcoind.rpc.sendtoaddress(addr, 0.3456)
    # The 5 coins from above, the change coin of the first spend and this deposit.
    wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 7)
    bitcoind.generate_block(1, wait_for_mempool=txid)
    outpoint = next(
        c["outpoint"] for c in lianad.rpc.listcoins()["coins"] if txid in c["outpoint"]
    )
    destinations = {
        bitcoind.rpc.getnewaddress(): 11_000,
        addr: 12_000,  # Even with address reuse! Booooh
        lianad.rpc.getnewaddress()["address"]: 13_000,
    }
    res = lianad.rpc.createspend(destinations, [outpoint], 6)
    psbt = PSBT.from_base64(res["psbt"])
    txid = sign_and_broadcast_psbt(lianad, psbt)
    bitcoind.generate_block(1, wait_for_mempool=txid)
    # Deposit a coin that will be spending (unconfirmed spend transaction)
    addr = lianad.rpc.getnewaddress()["address"]
    txid = bitcoind.rpc.sendtoaddress(addr, 0.456)
    # The previous spend added 3 coins (two outputs to our addresses plus,
    # presumably, its change), and this deposit adds one more.
    wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 11)
    bitcoind.generate_block(1, wait_for_mempool=txid)
    outpoint = next(
        c["outpoint"] for c in lianad.rpc.listcoins()["coins"] if txid in c["outpoint"]
    )
    destinations = {
        bitcoind.rpc.getnewaddress(): 11_000,
    }
    res = lianad.rpc.createspend(destinations, [outpoint], 6)
    psbt = PSBT.from_base64(res["psbt"])
    # This last spend is deliberately left unconfirmed: no block is mined after it.
    txid = sign_and_broadcast_psbt(lianad, psbt)
    # At this point we have 12 spent and unspent coins, one of them is unconfirmed.
    wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 12)
    # However some of them share the same txid! This is the case of the 3 first coins
    # for instance, or the Spend transactions with multiple outputs at one of our addresses.
    # In total, that's 8 transactions.
    # NOTE: `[:-2]` strips the trailing ":<vout>" and so assumes a single-digit
    # output index.
    txids = set(c["outpoint"][:-2] for c in lianad.rpc.listcoins()["coins"])
    assert len(txids) == 8
    # We can query all of them at once using listtransactions. The result contains all
    # the correct transactions as hex, with no duplicate.
    all_txs = lianad.rpc.listtransactions(list(txids))["transactions"]
    assert len(all_txs) == 8
    for tx in all_txs:
        txid = bitcoind.rpc.decoderawtransaction(tx["tx"])["txid"]
        txids.remove(txid)  # This will raise an error if it isn't there
    # We can also query them one by one.
    txids = set(c["outpoint"][:-2] for c in lianad.rpc.listcoins()["coins"])
    for txid in txids:
        txs = lianad.rpc.listtransactions([txid])["transactions"]
        bit_txid = bitcoind.rpc.decoderawtransaction(txs[0]["tx"])["txid"]
        assert bit_txid == txid
    # We can query all confirmed transactions
    best_block = bitcoind.rpc.getbestblockhash()
    final_timestamp = bitcoind.rpc.getblockheader(best_block)["time"]
    # The third argument caps the number of returned transactions (see the
    # limited query below).
    txs = lianad.rpc.listconfirmed(initial_timestamp, final_timestamp, 10)[
        "transactions"
    ]
    assert len(txs) == 7, "The last spend tx is unconfirmed"
    for tx in txs:
        txid = bitcoind.rpc.decoderawtransaction(tx["tx"])["txid"]
        txids.remove(txid)  # This will raise an error if it isn't there
    # We can limit the size of the result
    txs = lianad.rpc.listconfirmed(initial_timestamp, final_timestamp, 5)[
        "transactions"
    ]
    assert len(txs) == 5
    # We can restrict the query to a certain time window.
    # First get the txid of all the transactions that happened during this timespan.
    txids = set()
    for coin in lianad.rpc.listcoins()["coins"]:
        # Unconfirmed coins have no block time to compare against.
        if coin["block_height"] is None:
            continue
        block_hash = bitcoind.rpc.getblockhash(coin["block_height"])
        block_time = bitcoind.rpc.getblockheader(block_hash)["time"]
        spend_time = None
        if coin["spend_info"] is not None and coin["spend_info"]["height"] is not None:
            spend_bhash = bitcoind.rpc.getblockhash(coin["spend_info"]["height"])
            spend_time = bitcoind.rpc.getblockheader(spend_bhash)["time"]
        # Keep the transaction that created the coin if the coin was either
        # confirmed or spent within the window.
        if (block_time >= second_timestamp and block_time <= third_timestamp) or (
            spend_time is not None
            and spend_time >= second_timestamp
            and spend_time <= third_timestamp
        ):
            txids.add(coin["outpoint"][:-2])
    # Only the transactions confirmed between the second and third timestamps are
    # expected: the deposit left unspent, the deposit that was then spent, and that
    # spend itself. So that's 3 of them.
    assert len(txids) == 3
    # Now let's compare with what lianad is giving us.
    txs = lianad.rpc.listconfirmed(second_timestamp, third_timestamp, 10)[
        "transactions"
    ]
    assert len(txs) == 3
    bit_txids = set(bitcoind.rpc.decoderawtransaction(tx["tx"])["txid"] for tx in txs)
    assert bit_txids == txids
def test_create_recovery(lianad, bitcoind):
    """Test the sweep of coins that are available through the timelocked path.

    Checks the errors returned while no coin is mature, then that matured coins
    can be swept (selected implicitly or through explicit outpoints), and
    finally that a coin received later can be swept once it matured as well.
    """
    # Generate blocks in order to test locktime set correctly.
    bitcoind.generate_block(200)
    # Start by getting a few coins
    destinations = {
        lianad.rpc.getnewaddress()["address"]: 0.1,
        lianad.rpc.getnewaddress()["address"]: 0.2,
        lianad.rpc.getnewaddress()["address"]: 0.3,
    }
    txid = bitcoind.rpc.sendmany("", destinations)
    bitcoind.generate_block(1, wait_for_mempool=txid)
    wait_for(
        lambda: lianad.rpc.getinfo()["block_height"] == bitcoind.rpc.getblockcount()
    )
    first_outpoints = [c["outpoint"] for c in lianad.rpc.listcoins()["coins"]]
    # There's nothing to sweep
    # NOTE(review): the positional arguments of `createrecovery` appear to be
    # (address, feerate, timelock, outpoints) - confirm against the RPC docs.
    with pytest.raises(
        RpcError,
        match="No coin currently spendable through this timelocked recovery path",
    ):
        lianad.rpc.createrecovery(bitcoind.rpc.getnewaddress(), 2)
    # Same if we specify timelock:
    with pytest.raises(
        RpcError,
        match="No coin currently spendable through this timelocked recovery path",
    ):
        lianad.rpc.createrecovery(bitcoind.rpc.getnewaddress(), 2, 10)
    # And if we use empty array for outpoints:
    with pytest.raises(
        RpcError,
        match="No coin currently spendable through this timelocked recovery path",
    ):
        lianad.rpc.createrecovery(bitcoind.rpc.getnewaddress(), 2, 10, [])
    # If we specify a coin, the error will be different:
    with pytest.raises(
        RpcError,
        match=f"Coin at '{first_outpoints[0]}' is not recoverable with timelock '10'",
    ):
        lianad.rpc.createrecovery(
            bitcoind.rpc.getnewaddress(), 2, 10, [f"{first_outpoints[0]}"]
        )
    # Receive another coin, it will be one block after the others
    txid = bitcoind.rpc.sendtoaddress(lianad.rpc.getnewaddress()["address"], 0.4)
    # Make the timelock of the 3 first coins mature (we use a csv of 10 in the fixture)
    bitcoind.generate_block(9, wait_for_mempool=txid)
    # Now we can create a recovery tx that sweeps the first 3 coins.
    wait_for(
        lambda: lianad.rpc.getinfo()["block_height"] == bitcoind.rpc.getblockcount()
    )
    # Outpoint of the coin created by the latest deposit.
    new_outpoint = [
        c["outpoint"] for c in lianad.rpc.listcoins()["coins"] if txid in c["outpoint"]
    ][0]
    reco_address = bitcoind.rpc.getnewaddress()
    res = lianad.rpc.createrecovery(reco_address, 18)
    reco_psbt = PSBT.from_base64(res["psbt"])
    # Do the same passing all three coins explicitly:
    res_op = lianad.rpc.createrecovery(reco_address, 18, 10, first_outpoints)
    reco_psbt_op = PSBT.from_base64(res_op["psbt"])
    # Check locktime being set correctly.
    tip_height = bitcoind.rpc.getblockcount()
    assert tip_height > 100
    locktime = reco_psbt.tx.nLockTime
    # The locktime is expected to be at most 100 blocks behind the tip.
    assert tip_height - 100 <= locktime <= tip_height
    assert len(reco_psbt.tx.vin) == 3, "The last coin's timelock hasn't matured yet"
    assert len(reco_psbt.tx.vout) == len(reco_psbt_op.tx.vout) == 1
    # The inputs are the same for both explicit and implicit outpoints:
    assert sorted(i.prevout.serialize() for i in reco_psbt.tx.vin) == sorted(
        i.prevout.serialize() for i in reco_psbt_op.tx.vin
    )
    assert reco_psbt.tx.vout[0].nValue == reco_psbt_op.tx.vout[0].nValue
    assert reco_psbt.tx.vout[0].scriptPubKey == reco_psbt_op.tx.vout[0].scriptPubKey
    # The single output carries the 0.1 + 0.2 + 0.3 BTC swept, minus a small fee.
    assert int(0.5999 * COIN) < int(reco_psbt.tx.vout[0].nValue) < int(0.6 * COIN)
    # Now use only 2 of the 3 coins:
    res_op_2 = lianad.rpc.createrecovery(
        bitcoind.rpc.getnewaddress(), 18, 10, first_outpoints[:2]
    )
    reco_psbt_op_2 = PSBT.from_base64(res_op_2["psbt"])
    assert len(reco_psbt_op_2.tx.vin) == 2
    # Format the inputs' prevouts as "<txid hex>:<vout>" to compare them with the
    # outpoint strings returned by `listcoins`.
    assert sorted(
        f"{i.prevout.hash:064x}:{i.prevout.n}" for i in reco_psbt_op_2.tx.vin
    ) == sorted(first_outpoints[:2])
    # If we try to include the newest coin, an error will be returned:
    with pytest.raises(
        RpcError,
        match=f"Coin at '{new_outpoint}' is not recoverable with timelock '10'",
    ):
        lianad.rpc.createrecovery(
            bitcoind.rpc.getnewaddress(), 2, 10, [first_outpoints[0], new_outpoint]
        )
    txid = sign_and_broadcast(lianad, bitcoind, reco_psbt, recovery=True)
    # And by mining one more block we'll be able to sweep the last coin.
    bitcoind.generate_block(1, wait_for_mempool=txid)
    wait_for(
        lambda: lianad.rpc.getinfo()["block_height"] == bitcoind.rpc.getblockcount()
    )
    res = lianad.rpc.createrecovery(bitcoind.rpc.getnewaddress(), 1)
    reco_psbt = PSBT.from_base64(res["psbt"])
    assert len(reco_psbt.tx.vin) == 1
    assert len(reco_psbt.tx.vout) == 1
    # The single output carries the 0.4 BTC of the last coin, minus a small fee.
    assert int(0.39999 * COIN) < int(reco_psbt.tx.vout[0].nValue) < int(0.4 * COIN)
    sign_and_broadcast(lianad, bitcoind, reco_psbt, recovery=True)
def test_labels(lianad, bitcoind):
    """Test the creation and updating of labels.

    Labels are set, updated, queried and deleted for addresses, coins (outpoints)
    and transactions (txids), including items the wallet doesn't know about.
    """
    # We can set a label for an address.
    addr = lianad.rpc.getnewaddress()["address"]
    lianad.rpc.updatelabels({addr: "first-addr"})
    assert lianad.rpc.getlabels([addr])["labels"] == {addr: "first-addr"}
    # And also update it.
    lianad.rpc.updatelabels({addr: "first-addr-1"})
    assert lianad.rpc.getlabels([addr])["labels"] == {addr: "first-addr-1"}
    # But we can't set a label larger than 100 characters
    with pytest.raises(RpcError, match=".*must be less or equal than 100 characters"):
        lianad.rpc.updatelabels({addr: "".join("a" for _ in range(101))})
    # We can set a label for a coin.
    sec_addr = lianad.rpc.getnewaddress()["address"]
    txid = bitcoind.rpc.sendtoaddress(sec_addr, 1)
    wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 1)
    coin = lianad.rpc.listcoins()["coins"][0]
    lianad.rpc.updatelabels({coin["outpoint"]: "first-coin"})
    assert lianad.rpc.getlabels([coin["outpoint"]])["labels"] == {
        coin["outpoint"]: "first-coin"
    }
    # And also update it.
    lianad.rpc.updatelabels({coin["outpoint"]: "first-coin-1"})
    assert lianad.rpc.getlabels([coin["outpoint"]])["labels"] == {
        coin["outpoint"]: "first-coin-1"
    }
    # Its address though has no label.
    assert lianad.rpc.getlabels([sec_addr])["labels"] == {}
    # But we can receive a coin to the address that has a label set, and query both.
    sec_txid = bitcoind.rpc.sendtoaddress(addr, 1)
    wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 2)
    sec_coin = next(
        c for c in lianad.rpc.listcoins()["coins"] if sec_txid in c["outpoint"]
    )
    lianad.rpc.updatelabels({sec_coin["outpoint"]: "sec-coin"})
    res = lianad.rpc.getlabels([sec_coin["outpoint"], addr])["labels"]
    assert len(res) == 2
    assert res[sec_coin["outpoint"]] == "sec-coin"
    assert res[addr] == "first-addr-1"
    # We can also query the labels for both coins, of course.
    res = lianad.rpc.getlabels([coin["outpoint"], sec_coin["outpoint"]])["labels"]
    assert len(res) == 2
    assert res[coin["outpoint"]] == "first-coin-1"
    assert res[sec_coin["outpoint"]] == "sec-coin"
    # We can set, update and query labels for deposit transactions.
    lianad.rpc.updatelabels({txid: "first-deposit"})
    # Items without a label (here `sec_txid`) are simply absent from the result.
    assert lianad.rpc.getlabels([txid, sec_txid])["labels"] == {txid: "first-deposit"}
    lianad.rpc.updatelabels({txid: "first-deposit-1", sec_txid: "second-deposit"})
    res = lianad.rpc.getlabels([txid, sec_txid])["labels"]
    assert len(res) == 2
    assert res[txid] == "first-deposit-1"
    assert res[sec_txid] == "second-deposit"
    # We can set and update a label for a spend transaction.
    spend_txid = get_txid(spend_coins(lianad, bitcoind, [coin, sec_coin]))
    lianad.rpc.updatelabels({spend_txid: "spend-tx"})
    assert lianad.rpc.getlabels([spend_txid])["labels"] == {spend_txid: "spend-tx"}
    lianad.rpc.updatelabels({spend_txid: "spend-tx-1"})
    assert lianad.rpc.getlabels([spend_txid])["labels"] == {spend_txid: "spend-tx-1"}
    # We can set labels for inexistent stuff, as long as the format of the item being
    # labelled is valid.
    inexistent_txid = "".join("0" for _ in range(64))
    inexistent_outpoint = "".join("1" for _ in range(64)) + ":42"
    random_address = bitcoind.rpc.getnewaddress()
    lianad.rpc.updatelabels(
        {
            inexistent_txid: "inex_txid",
            inexistent_outpoint: "inex_outpoint",
            random_address: "bitcoind-addr",
        }
    )
    res = lianad.rpc.getlabels([inexistent_txid, inexistent_outpoint, random_address])[
        "labels"
    ]
    assert len(res) == 3
    assert res[inexistent_txid] == "inex_txid"
    assert res[inexistent_outpoint] == "inex_outpoint"
    assert res[random_address] == "bitcoind-addr"
    # We'll confirm everything, shouldn't affect any of the labels.
    bitcoind.generate_block(1, wait_for_mempool=spend_txid)
    wait_for(
        lambda: bitcoind.rpc.getblockcount() == lianad.rpc.getinfo()["block_height"]
    )
    res = lianad.rpc.getlabels(
        [
            addr,
            sec_addr,  # No label for this one.
            txid,
            sec_txid,
            coin["outpoint"],
            sec_coin["outpoint"],
            spend_txid,
            inexistent_txid,
            inexistent_outpoint,
            random_address,
        ]
    )["labels"]
    # 10 items were queried but `sec_addr` has no label, hence 9 results.
    assert len(res) == 9
    assert res[sec_coin["outpoint"]] == "sec-coin"
    assert res[addr] == "first-addr-1"
    assert res[coin["outpoint"]] == "first-coin-1"
    assert res[txid] == "first-deposit-1"
    assert res[sec_txid] == "second-deposit"
    assert res[spend_txid] == "spend-tx-1"
    assert res[inexistent_txid] == "inex_txid"
    assert res[inexistent_outpoint] == "inex_outpoint"
    assert res[random_address] == "bitcoind-addr"
    # Setting a label to `None` deletes it. Delete the label of `addr`, also pass
    # `None` for `sec_addr` (which never had a label), and update a third label in
    # the same call. Deleted labels shouldn't be returned anymore.
    lianad.rpc.updatelabels(
        {
            addr: None,
            sec_addr: None,
            random_address: "this address is random",
        }
    )
    res = lianad.rpc.getlabels([addr, sec_addr, random_address])["labels"]
    assert len(res) == 1
    assert addr not in res
    assert sec_addr not in res
    assert res[random_address] == "this address is random"
def test_labels_bip329(lianad, bitcoind):
    """Test the export of labels through `getlabelsbip329`, including pagination.

    Labels 5 addresses, 5 coins and 5 transactions, then checks that all 15 labels
    are returned by a single large query and that querying them in batches of 5
    never returns the same label in two different batches.
    """

    def label_found(name, labels):
        """Return whether any entry of `labels` carries the label text `name`."""
        return any(label["label"] == name for label in labels)

    # Label 5 addresses
    addresses = []
    for i in range(5):
        addr = lianad.rpc.getnewaddress()["address"]
        addresses.append(addr)
        lianad.rpc.updatelabels({addr: f"addr{i}"})

    # Receive 5 coins, waiting for each of them to be detected by the daemon.
    txids = []
    for i in range(5):
        addr = lianad.rpc.getnewaddress()["address"]
        txids.append(bitcoind.rpc.sendtoaddress(addr, 1))
        wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == i + 1)

    # Label the 5 coins
    coins = lianad.rpc.listcoins()["coins"]
    assert len(coins) == 5
    for i, coin in enumerate(coins):
        lianad.rpc.updatelabels({coin["outpoint"]: f"coin{i}"})

    # Label the 5 deposit transactions
    for i, txid in enumerate(txids):
        lianad.rpc.updatelabels({txid: f"tx{i}"})

    # Get BIP-0329 labels
    # NOTE(review): the two arguments look like (offset, limit) - confirm
    # against the RPC documentation.
    bip329_labels = lianad.rpc.getlabelsbip329(0, 100)["labels"]
    assert len(bip329_labels) == 15

    # All transactions are labelled
    for i in range(len(txids)):
        assert label_found(f"tx{i}", bip329_labels)
    # All addresses are labelled
    for i in range(len(addresses)):
        assert label_found(f"addr{i}", bip329_labels)
    # All coins are labelled
    for i in range(len(coins)):
        assert label_found(f"coin{i}", bip329_labels)

    # There is no conflict between batches: each batch is full, and no label
    # text of a batch was already returned by a previous one.
    seen_names = set()
    for offset in (0, 5, 10):
        batch = lianad.rpc.getlabelsbip329(offset, 5)["labels"]
        assert len(batch) == 5
        batch_names = {label["label"] for label in batch}
        assert seen_names.isdisjoint(batch_names)
        seen_names |= batch_names
def test_rbfpsbt_bump_fee(lianad, bitcoind):
    """Test the use of RBF to bump the fee of a transaction.

    Covers parameter validation, replacing a Spend before and after it was
    broadcast, the minimum feerate enforced for a replacement, and the
    replacement of a transaction that has unconfirmed descendants.
    """
    # Generate blocks in order to test locktime set correctly.
    bitcoind.generate_block(200)
    # Get three coins.
    destinations = {
        lianad.rpc.getnewaddress()["address"]: 0.003,
        lianad.rpc.getnewaddress()["address"]: 0.004,
        lianad.rpc.getnewaddress()["address"]: 0.005,
    }
    txid = bitcoind.rpc.sendmany("", destinations)
    bitcoind.generate_block(1, wait_for_mempool=txid)
    wait_for(lambda: len(lianad.rpc.listcoins(["confirmed"])["coins"]) == 3)
    coins = lianad.rpc.listcoins(["confirmed"])["coins"]
    assert all(c["is_from_self"] is False for c in coins)
    # Create a spend that will later be replaced.
    first_outpoints = [c["outpoint"] for c in coins[:2]]
    destinations = {
        bitcoind.rpc.getnewaddress(): 650_000,
    }
    first_res = lianad.rpc.createspend(destinations, first_outpoints, 1)
    first_psbt = PSBT.from_base64(first_res["psbt"])
    # The transaction has a change output.
    assert len(first_psbt.o) == len(first_psbt.tx.vout) == 2
    first_txid = first_psbt.tx.txid().hex()
    # We must provide a valid feerate.
    # NOTE(review): the positional arguments of `rbfpsbt` appear to be
    # (txid, is_cancel, feerate) - confirm against the RPC docs.
    for bad_feerate in [-1, "foo", 18_446_744_073_709_551_616]:
        with pytest.raises(RpcError, match=f"Invalid 'feerate' parameter."):
            lianad.rpc.rbfpsbt(first_txid, False, bad_feerate)
    # We cannot RBF yet as first PSBT has not been saved.
    with pytest.raises(RpcError, match=f"Unknown spend transaction '{first_txid}'."):
        lianad.rpc.rbfpsbt(first_txid, False, 1)
    # Now save the PSBT.
    lianad.rpc.updatespend(first_res["psbt"])
    # The RBF command succeeds even if transaction has not been signed.
    lianad.rpc.rbfpsbt(first_txid, False, 2)
    # The RBF command also succeeds if transaction has been signed but not broadcast.
    first_psbt = lianad.signer.sign_psbt(first_psbt)
    lianad.rpc.updatespend(first_psbt.to_base64())
    lianad.rpc.rbfpsbt(first_txid, False, 2)
    # Now broadcast the spend and wait for it to be detected.
    lianad.rpc.broadcastspend(first_txid)
    wait_for(
        lambda: all(
            c["spend_info"] is not None and c["spend_info"]["txid"] == first_txid
            for c in lianad.rpc.listcoins([], first_outpoints)["coins"]
        )
    )
    # The change output is from self as its parent is confirmed.
    lc_res = lianad.rpc.listcoins(["unconfirmed"], [])["coins"]
    assert (
        len(lc_res) == 1
        and first_txid in lc_res[0]["outpoint"]
        and lc_res[0]["is_from_self"] is True
    )
    # We can now use RBF, but the feerate must be higher than that of the first transaction.
    with pytest.raises(RpcError, match=f"Feerate 1 too low for minimum feerate 2."):
        lianad.rpc.rbfpsbt(first_txid, False, 1)
    # Using a higher feerate works.
    lianad.rpc.rbfpsbt(first_txid, False, 2)
    # We can still use RBF if the PSBT is no longer in the DB.
    lianad.rpc.delspendtx(first_txid)
    lianad.rpc.rbfpsbt(first_txid, False, 2)
    # Let's use an even higher feerate.
    rbf_1_res = lianad.rpc.rbfpsbt(first_txid, False, 10)
    rbf_1_psbt = PSBT.from_base64(rbf_1_res["psbt"])
    # Check the locktime is being set.
    tip_height = bitcoind.rpc.getblockcount()
    locktime = rbf_1_psbt.tx.nLockTime
    assert tip_height - 100 <= locktime <= tip_height
    # The inputs are the same in both (no new inputs needed in the replacement).
    assert sorted(i.prevout.serialize() for i in first_psbt.tx.vin) == sorted(
        i.prevout.serialize() for i in rbf_1_psbt.tx.vin
    )
    # Check non-change output is the same in both.
    assert first_psbt.tx.vout[0].nValue == rbf_1_psbt.tx.vout[0].nValue
    assert first_psbt.tx.vout[0].scriptPubKey == rbf_1_psbt.tx.vout[0].scriptPubKey
    # Change address is the same but change amount will be lower in the replacement to pay higher fee.
    assert first_psbt.tx.vout[1].nValue > rbf_1_psbt.tx.vout[1].nValue
    assert first_psbt.tx.vout[1].scriptPubKey == rbf_1_psbt.tx.vout[1].scriptPubKey
    # Broadcast the replacement and wait for it to be detected.
    rbf_1_txid = sign_and_broadcast_psbt(lianad, rbf_1_psbt)
    wait_for(
        lambda: all(
            c["spend_info"] is not None and c["spend_info"]["txid"] == rbf_1_txid
            for c in lianad.rpc.listcoins([], first_outpoints)["coins"]
        )
    )
    # The change output of the replacement is also from self as its parent is confirmed.
    lc_res = lianad.rpc.listcoins(["unconfirmed"], [])["coins"]
    assert (
        len(lc_res) == 1
        and rbf_1_txid in lc_res[0]["outpoint"]
        and lc_res[0]["is_from_self"] is True
    )
    mempool_rbf_1 = bitcoind.rpc.getmempoolentry(rbf_1_txid)
    # Note that in the mempool entry, "ancestor" includes rbf_1_txid itself.
    # Fees are scaled by COIN, presumably from BTC to sats, so this is in sat/vb.
    rbf_1_feerate = (
        mempool_rbf_1["fees"]["ancestor"] * COIN / mempool_rbf_1["ancestorsize"]
    )
    assert 10 <= rbf_1_feerate < 10.25
    # If we try to RBF the first transaction again, it will not be possible as we
    # deleted the PSBT above and the tx is no longer part of our wallet's
    # spending txs (even though it's saved in the DB).
    with pytest.raises(RpcError, match=f"Unknown spend transaction '{first_txid}'."):
        lianad.rpc.rbfpsbt(first_txid, False, 2)
    # If we resave the PSBT, then we can use RBF and it will use the first RBF's
    # feerate to set the min feerate, instead of 1 sat/vb of the first transaction:
    lianad.rpc.updatespend(first_psbt.to_base64())
    with pytest.raises(
        RpcError,
        match=f"Feerate {int(rbf_1_feerate)} too low for minimum feerate {int(rbf_1_feerate) + 1}.",
    ):
        lianad.rpc.rbfpsbt(first_txid, False, int(rbf_1_feerate))
    # Using 1 more for feerate works.
    feerate = int(rbf_1_feerate) + 1
    lianad.rpc.rbfpsbt(first_txid, False, feerate)
    # Add a new transaction spending the change from the first RBF.
    desc_1_destinations = {
        bitcoind.rpc.getnewaddress(): 500_000,
    }
    # The change output is taken to be at index 1, as in the `vout[1]` checks above.
    desc_1_outpoints = [f"{rbf_1_txid}:1", coins[2]["outpoint"]]
    wait_for(lambda: len(lianad.rpc.listcoins([], desc_1_outpoints)["coins"]) == 2)
    desc_1_res = lianad.rpc.createspend(desc_1_destinations, desc_1_outpoints, 1)
    desc_1_psbt = PSBT.from_base64(desc_1_res["psbt"])
    assert len(desc_1_psbt.tx.vout) == 2
    desc_1_txid = sign_and_broadcast_psbt(lianad, desc_1_psbt)
    wait_for(
        lambda: all(
            c["spend_info"] is not None and c["spend_info"]["txid"] == desc_1_txid
            for c in lianad.rpc.listcoins([], desc_1_outpoints)["coins"]
        )
    )
    lc_res = [c for c in lianad.rpc.listcoins(["unconfirmed"], [])["coins"]]
    assert (
        len(lc_res) == 1
        and desc_1_txid in lc_res[0]["outpoint"]
        and lc_res[0]["is_from_self"] is True
    )
    # Add a new transaction spending the change from the first descendant.
    desc_2_destinations = {
        bitcoind.rpc.getnewaddress(): 25_000,
    }
    desc_2_outpoints = [f"{desc_1_txid}:1"]
    wait_for(lambda: len(lianad.rpc.listcoins([], desc_2_outpoints)["coins"]) == 1)
    desc_2_res = lianad.rpc.createspend(desc_2_destinations, desc_2_outpoints, 1)
    desc_2_psbt = PSBT.from_base64(desc_2_res["psbt"])
    assert len(desc_2_psbt.tx.vout) == 2
    desc_2_txid = sign_and_broadcast_psbt(lianad, desc_2_psbt)
    wait_for(
        lambda: all(
            c["spend_info"] is not None and c["spend_info"]["txid"] == desc_2_txid
            for c in lianad.rpc.listcoins([], desc_2_outpoints)["coins"]
        )
    )
    lc_res = [c for c in lianad.rpc.listcoins(["unconfirmed"], [])["coins"]]
    assert (
        len(lc_res) == 1
        and desc_2_txid in lc_res[0]["outpoint"]
        and lc_res[0]["is_from_self"] is True
    )
    # Now replace the first RBF, which will also remove its descendants.
    rbf_2_res = lianad.rpc.rbfpsbt(rbf_1_txid, False, feerate)
    rbf_2_psbt = PSBT.from_base64(rbf_2_res["psbt"])
    # The inputs are the same in both (no new inputs needed in the replacement).
    assert sorted(i.prevout.serialize() for i in rbf_1_psbt.tx.vin) == sorted(
        i.prevout.serialize() for i in rbf_2_psbt.tx.vin
    )
    # Check non-change output is the same in both.
    assert rbf_1_psbt.tx.vout[0].nValue == rbf_2_psbt.tx.vout[0].nValue
    assert rbf_1_psbt.tx.vout[0].scriptPubKey == rbf_2_psbt.tx.vout[0].scriptPubKey
    # Change address is the same but change amount will be lower in the replacement to pay higher fee.
    assert rbf_1_psbt.tx.vout[1].nValue > rbf_2_psbt.tx.vout[1].nValue
    assert rbf_1_psbt.tx.vout[1].scriptPubKey == rbf_2_psbt.tx.vout[1].scriptPubKey
    # Broadcast the replacement and wait for it to be detected.
    rbf_2_txid = sign_and_broadcast_psbt(lianad, rbf_2_psbt)
    wait_for(
        lambda: all(
            c["spend_info"] is not None and c["spend_info"]["txid"] == rbf_2_txid
            for c in lianad.rpc.listcoins([], first_outpoints)["coins"]
        )
    )
    lc_res = [c for c in lianad.rpc.listcoins(["unconfirmed"], [])["coins"]]
    assert (
        len(lc_res) == 1
        and rbf_2_txid in lc_res[0]["outpoint"]
        and lc_res[0]["is_from_self"] is True
    )
    # The unconfirmed coins used in the descendant transactions have been removed so that
    # only one of the input coins remains, and its spend info has been wiped so that it is as before.
    assert lianad.rpc.listcoins([], desc_1_outpoints + desc_2_outpoints)["coins"] == [
        coins[2]
    ]
    # Now confirm the replacement transaction.
    bitcoind.generate_block(1, wait_for_mempool=rbf_2_txid)
    wait_for(
        lambda: all(
            c["spend_info"]["txid"] == rbf_2_txid
            and c["spend_info"]["height"] is not None
            for c in lianad.rpc.listcoins([], first_outpoints)["coins"]
        )
    )
    final_coins = lianad.rpc.listcoins()["coins"]
    # We have the three original coins plus the change output from the last RBF.
    # (`lc_res` still holds that change coin, as listed while it was unconfirmed.)
    assert len(final_coins) == 4
    assert len(coins + lc_res) == 4
    for fc in final_coins:
        assert fc["outpoint"] in [c["outpoint"] for c in coins + lc_res]
        # Original coins are not from self, but RBF change output is.
        assert fc["is_from_self"] is (fc["outpoint"] in [c["outpoint"] for c in lc_res])
def test_rbfpsbt_insufficient_funds(lianad, bitcoind):
    """Check that `rbfpsbt` reports missing funds when the coin cannot pay the fee.

    Two scenarios are exercised, each with a single freshly received coin:

    * a spend that cannot be fee-bumped (the response has a "missing" entry)
      but can still be cancelled (the response has a "psbt" entry);
    * a spend of a smaller coin that cannot even be cancelled.
    """

    def confirmed_count():
        # Number of coins the wallet currently lists with the "confirmed" status.
        return len(lianad.rpc.listcoins(["confirmed"])["coins"])

    # Receive a first coin and wait for the wallet to see it confirmed.
    receive_addr_1 = lianad.rpc.getnewaddress()["address"]
    deposit_txid_1 = bitcoind.rpc.sendtoaddress(receive_addr_1, 30_000 / COIN)
    bitcoind.generate_block(1, wait_for_mempool=deposit_txid_1)
    wait_for(lambda: confirmed_count() == 1)

    # Spend almost all of it; this is the transaction we will try to replace.
    destinations_1 = {bitcoind.rpc.getnewaddress(): 29_800}
    psbt_b64_1 = lianad.rpc.createspend(destinations_1, [], 1)["psbt"]
    spend_txid_1 = sign_and_broadcast_psbt(lianad, PSBT.from_base64(psbt_b64_1))

    # Bumping the fee is not possible: there are not enough funds left over.
    if USE_TAPROOT:
        feerate = 3
    else:
        feerate = 2
    bump_res = lianad.rpc.rbfpsbt(spend_txid_1, False, feerate)
    assert "missing" in bump_res

    # Cancelling still works, since the coin has enough value to fund a single
    # output at a higher feerate.
    cancel_res_1 = lianad.rpc.rbfpsbt(spend_txid_1, True)
    assert "psbt" in cancel_res_1
    wait_for(lambda: confirmed_count() == 0)

    # Receive a second, smaller coin.
    receive_addr_2 = lianad.rpc.getnewaddress()["address"]
    deposit_txid_2 = bitcoind.rpc.sendtoaddress(receive_addr_2, 5_200 / COIN)
    bitcoind.generate_block(1, wait_for_mempool=deposit_txid_2)
    wait_for(lambda: confirmed_count() == 1)

    # Spend it; this is the transaction we will try to cancel.
    destinations_2 = {bitcoind.rpc.getnewaddress(): 5_000}
    psbt_b64_2 = lianad.rpc.createspend(destinations_2, [], 1)["psbt"]
    spend_txid_2 = sign_and_broadcast_psbt(lianad, PSBT.from_base64(psbt_b64_2))

    # This time even a cancel is out of reach: we don't have enough to create
    # a transaction with feerate 2 sat/vb.
    cancel_res_2 = lianad.rpc.rbfpsbt(spend_txid_2, True)
    assert "missing" in cancel_res_2
def test_rbfpsbt_cancel(lianad, bitcoind):
    """Test the use of RBF to cancel a transaction.

    Steps:

    1. Receive three coins and spend two of them in a transaction that has a
       change output.
    2. Cancel that spend with `rbfpsbt(txid, True)`. The replacement must have
       a single input (one of the original two) and a single output paying to
       the original change script. Passing an explicit feerate for a cancel
       must raise an `RpcError`.
    3. Chain two descendant spends on top of the cancel's only output.
    4. Cancel the cancel. The descendants' unconfirmed coins must disappear,
       the third original coin must be back to its initial state, and the
       final replacement must confirm.
    """

    def wait_until_spent_by(outpoints, txid):
        # Poll until every listed coin reports `txid` as its spending transaction.
        wait_for(
            lambda: all(
                c["spend_info"] is not None and c["spend_info"]["txid"] == txid
                for c in lianad.rpc.listcoins([], outpoints)["coins"]
            )
        )

    # Get three coins.
    deposit_destinations = {
        lianad.rpc.getnewaddress()["address"]: 0.003,
        lianad.rpc.getnewaddress()["address"]: 0.004,
        lianad.rpc.getnewaddress()["address"]: 0.005,
    }
    txid = bitcoind.rpc.sendmany("", deposit_destinations)
    bitcoind.generate_block(1, wait_for_mempool=txid)
    wait_for(lambda: len(lianad.rpc.listcoins(["confirmed"])["coins"]) == 3)
    coins = lianad.rpc.listcoins(["confirmed"])["coins"]

    # Create a spend that will later be replaced.
    first_outpoints = [c["outpoint"] for c in coins[:2]]
    spend_destinations = {
        bitcoind.rpc.getnewaddress(): 650_000,
    }
    first_res = lianad.rpc.createspend(spend_destinations, first_outpoints, 1)
    first_psbt = PSBT.from_base64(first_res["psbt"])
    # The transaction has a change output.
    assert len(first_psbt.o) == len(first_psbt.tx.vout) == 2
    # Broadcast the spend and wait for it to be detected.
    first_txid = sign_and_broadcast_psbt(lianad, first_psbt)
    wait_until_spent_by(first_outpoints, first_txid)

    # We can use RBF and let the command choose the min possible feerate (1 larger than previous).
    rbf_1_res = lianad.rpc.rbfpsbt(first_txid, True)
    # But we can't set the feerate explicitly. The result is deliberately not
    # bound to a name: the call is expected to raise, and `rbf_1_res` above
    # must remain the successful response.
    with pytest.raises(
        RpcError,
        match=re.escape("A feerate must not be provided if creating a cancel."),
    ):
        lianad.rpc.rbfpsbt(first_txid, True, 2)
    rbf_1_psbt = PSBT.from_base64(rbf_1_res["psbt"])
    # Replacement only has a single input.
    assert len(rbf_1_psbt.i) == 1
    # This input is one of the two from the previous transaction.
    assert rbf_1_psbt.tx.vin[0].prevout.serialize() in [
        i.prevout.serialize() for i in first_psbt.tx.vin
    ]
    # The replacement only has a change output.
    assert len(rbf_1_psbt.tx.vout) == 1
    # Change address is the same but change amount will be higher in the replacement as it is the only output.
    assert first_psbt.tx.vout[1].nValue < rbf_1_psbt.tx.vout[0].nValue
    assert first_psbt.tx.vout[1].scriptPubKey == rbf_1_psbt.tx.vout[0].scriptPubKey

    # Broadcast the replacement and wait for it to be detected.
    rbf_1_txid = sign_and_broadcast_psbt(lianad, rbf_1_psbt)
    # The spend info of the coin used in the replacement will be updated.
    rbf_1_outpoint = (
        f"{rbf_1_psbt.tx.vin[0].prevout.hash:064x}:{rbf_1_psbt.tx.vin[0].prevout.n}"
    )
    assert rbf_1_outpoint in first_outpoints
    wait_until_spent_by([rbf_1_outpoint], rbf_1_txid)
    # The other coin will have its spend info removed.
    wait_for(
        lambda: all(
            c["spend_info"] is None
            for c in lianad.rpc.listcoins(
                [], [op for op in first_outpoints if op != rbf_1_outpoint]
            )["coins"]
        )
    )

    # Add a new transaction spending the only output (change) from the first RBF.
    desc_1_destinations = {
        bitcoind.rpc.getnewaddress(): 500_000,
    }
    desc_1_outpoints = [f"{rbf_1_txid}:0", coins[2]["outpoint"]]
    wait_for(lambda: len(lianad.rpc.listcoins([], desc_1_outpoints)["coins"]) == 2)
    desc_1_res = lianad.rpc.createspend(desc_1_destinations, desc_1_outpoints, 1)
    desc_1_psbt = PSBT.from_base64(desc_1_res["psbt"])
    assert len(desc_1_psbt.tx.vout) == 2
    desc_1_txid = sign_and_broadcast_psbt(lianad, desc_1_psbt)
    wait_until_spent_by(desc_1_outpoints, desc_1_txid)

    # Add a new transaction spending the change from the first descendant.
    desc_2_destinations = {
        bitcoind.rpc.getnewaddress(): 25_000,
    }
    desc_2_outpoints = [f"{desc_1_txid}:1"]
    wait_for(lambda: len(lianad.rpc.listcoins([], desc_2_outpoints)["coins"]) == 1)
    desc_2_res = lianad.rpc.createspend(desc_2_destinations, desc_2_outpoints, 1)
    desc_2_psbt = PSBT.from_base64(desc_2_res["psbt"])
    assert len(desc_2_psbt.tx.vout) == 2
    desc_2_txid = sign_and_broadcast_psbt(lianad, desc_2_psbt)
    wait_until_spent_by(desc_2_outpoints, desc_2_txid)

    # Now cancel the first RBF, which will also remove its descendants.
    rbf_2_res = lianad.rpc.rbfpsbt(rbf_1_txid, True)
    rbf_2_psbt = PSBT.from_base64(rbf_2_res["psbt"])
    # The inputs are the same in both (no new inputs needed in the replacement).
    assert len(rbf_2_psbt.tx.vin) == 1
    assert (
        rbf_1_psbt.tx.vin[0].prevout.serialize()
        == rbf_2_psbt.tx.vin[0].prevout.serialize()
    )
    # Only a single output (change) in the replacement.
    assert len(rbf_2_psbt.tx.vout) == 1
    # Change address is the same but change amount will be lower in the replacement to pay higher fee.
    assert rbf_1_psbt.tx.vout[0].nValue > rbf_2_psbt.tx.vout[0].nValue
    assert rbf_1_psbt.tx.vout[0].scriptPubKey == rbf_2_psbt.tx.vout[0].scriptPubKey

    # Broadcast the replacement and wait for it to be detected.
    rbf_2_txid = sign_and_broadcast_psbt(lianad, rbf_2_psbt)
    wait_until_spent_by([rbf_1_outpoint], rbf_2_txid)
    # The unconfirmed coins used in the descendant transactions have been removed so that
    # only one of the input coins remains, and its spend info has been wiped so that it is as before.
    assert lianad.rpc.listcoins([], desc_1_outpoints + desc_2_outpoints)["coins"] == [
        coins[2]
    ]

    # Now confirm the replacement transaction.
    bitcoind.generate_block(1, wait_for_mempool=rbf_2_txid)
    wait_for(
        lambda: all(
            c["spend_info"]["txid"] == rbf_2_txid
            and c["spend_info"]["height"] is not None
            for c in lianad.rpc.listcoins([], [rbf_1_outpoint])["coins"]
        )
    )