Add test for rescan and recovery

This test simulates a typical recovery flow for a user
This commit is contained in:
Novo 2023-05-23 17:27:36 +01:00
parent 54ac3c7d49
commit 0c4c347011

View File

@ -1,5 +1,6 @@
from fixtures import *
from test_framework.utils import wait_for, get_txid, spend_coins
from test_framework.utils import wait_for, get_txid, spend_coins, RpcError, COIN, sign_and_broadcast
from test_framework.serializations import PSBT
def get_coin(lianad, outpoint_or_txid):
@ -303,3 +304,38 @@ def test_deposit_replacement(lianad, bitcoind):
addr = lianad.rpc.getnewaddress()["address"]
bitcoind.rpc.sendtoaddress(addr, 2)
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 1)
def test_rescan_and_recovery(lianad, bitcoind):
"""Test user recovery flow"""
# Get initial_tip to use for rescan later
initial_tip = bitcoind.rpc.getblockheader(bitcoind.rpc.getbestblockhash())
# Start by getting a few coins
destination = lianad.rpc.getnewaddress()["address"]
txid = bitcoind.rpc.sendtoaddress(destination, 0.5)
bitcoind.generate_block(1, wait_for_mempool=txid)
wait_for(
lambda: lianad.rpc.getinfo()["block_height"] == bitcoind.rpc.getblockcount()
)
assert len(lianad.rpc.listcoins()["coins"]) == 1
# Advance the blocktime by >2h in median-time past for rescan
added_time = 60 * 60 * 3
bitcoind.rpc.setmocktime(initial_tip["time"] + added_time)
bitcoind.generate_block(12)
# Clear lianad state
lianad.restart_fresh(bitcoind)
assert len(lianad.rpc.listcoins()["coins"]) == 0
# Start rescan
lianad.rpc.startrescan(initial_tip["time"])
wait_for(lambda: lianad.rpc.getinfo()["rescan_progress"] is None)
# Create a recovery tx that sweeps the first coin.
res = lianad.rpc.createrecovery(bitcoind.rpc.getnewaddress(), 2)
reco_psbt = PSBT.from_base64(res["psbt"])
assert len(reco_psbt.tx.vin) == 1
assert len(reco_psbt.tx.vout) == 1
assert int(0.4999 * COIN) < int(reco_psbt.tx.vout[0].nValue) < int(0.5 * COIN)
sign_and_broadcast(lianad, bitcoind, reco_psbt, recovery=True)