Merge #538: Add test for rescan and recovery

0c4c347011e1c558ede08e6ca929e9c658fa6947 Add test for rescan and recovery This test simulates a typical recovery flow for a user (Novo)

Pull request description:

  Resolves #202
  This PR adds a new test that simulates the typical flow for a user trying to recover their coins

ACKs for top commit:
  darosior:
    utACK 0c4c347011e1c558ede08e6ca929e9c658fa6947

Tree-SHA512: e15add35ea53e475cdf57d79030905a8ae9c48562e3b73bc936c64b9a6c0b78ae0606f181957dbff9a2d2a02e947f8bdffced25689ef8e02fbabe7ebb753fc1b
This commit is contained in:
Antoine Poinsot 2023-06-06 11:28:22 +02:00
commit 4f8fbc60a9
No known key found for this signature in database
GPG Key ID: E13FC145CD3F4304

View File

@ -1,5 +1,6 @@
from fixtures import *
from test_framework.utils import wait_for, get_txid, spend_coins
from test_framework.utils import wait_for, get_txid, spend_coins, RpcError, COIN, sign_and_broadcast
from test_framework.serializations import PSBT
def get_coin(lianad, outpoint_or_txid):
@ -303,3 +304,38 @@ def test_deposit_replacement(lianad, bitcoind):
addr = lianad.rpc.getnewaddress()["address"]
bitcoind.rpc.sendtoaddress(addr, 2)
wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 1)
def test_rescan_and_recovery(lianad, bitcoind):
"""Test user recovery flow"""
# Get initial_tip to use for rescan later
initial_tip = bitcoind.rpc.getblockheader(bitcoind.rpc.getbestblockhash())
# Start by getting a few coins
destination = lianad.rpc.getnewaddress()["address"]
txid = bitcoind.rpc.sendtoaddress(destination, 0.5)
bitcoind.generate_block(1, wait_for_mempool=txid)
wait_for(
lambda: lianad.rpc.getinfo()["block_height"] == bitcoind.rpc.getblockcount()
)
assert len(lianad.rpc.listcoins()["coins"]) == 1
# Advance the blocktime by >2h in median-time past for rescan
added_time = 60 * 60 * 3
bitcoind.rpc.setmocktime(initial_tip["time"] + added_time)
bitcoind.generate_block(12)
# Clear lianad state
lianad.restart_fresh(bitcoind)
assert len(lianad.rpc.listcoins()["coins"]) == 0
# Start rescan
lianad.rpc.startrescan(initial_tip["time"])
wait_for(lambda: lianad.rpc.getinfo()["rescan_progress"] is None)
# Create a recovery tx that sweeps the first coin.
res = lianad.rpc.createrecovery(bitcoind.rpc.getnewaddress(), 2)
reco_psbt = PSBT.from_base64(res["psbt"])
assert len(reco_psbt.tx.vin) == 1
assert len(reco_psbt.tx.vout) == 1
assert int(0.4999 * COIN) < int(reco_psbt.tx.vout[0].nValue) < int(0.5 * COIN)
sign_and_broadcast(lianad, bitcoind, reco_psbt, recovery=True)