diff --git a/tests/test_chain.py b/tests/test_chain.py index 6c7cba1d..f1fc9cda 100644 --- a/tests/test_chain.py +++ b/tests/test_chain.py @@ -1,5 +1,6 @@ from fixtures import * -from test_framework.utils import wait_for, get_txid, spend_coins +from test_framework.utils import wait_for, get_txid, spend_coins, RpcError, COIN, sign_and_broadcast +from test_framework.serializations import PSBT def get_coin(lianad, outpoint_or_txid): @@ -303,3 +304,38 @@ def test_deposit_replacement(lianad, bitcoind): addr = lianad.rpc.getnewaddress()["address"] bitcoind.rpc.sendtoaddress(addr, 2) wait_for(lambda: len(lianad.rpc.listcoins()["coins"]) == 1) + +def test_rescan_and_recovery(lianad, bitcoind): + """Test user recovery flow""" + # Get initial_tip to use for rescan later + initial_tip = bitcoind.rpc.getblockheader(bitcoind.rpc.getbestblockhash()) + + # Start by getting a few coins + destination = lianad.rpc.getnewaddress()["address"] + txid = bitcoind.rpc.sendtoaddress(destination, 0.5) + bitcoind.generate_block(1, wait_for_mempool=txid) + wait_for( + lambda: lianad.rpc.getinfo()["block_height"] == bitcoind.rpc.getblockcount() + ) + assert len(lianad.rpc.listcoins()["coins"]) == 1 + + # Advance the blocktime by >2h in median-time past for rescan + added_time = 60 * 60 * 3 + bitcoind.rpc.setmocktime(initial_tip["time"] + added_time) + bitcoind.generate_block(12) + + # Clear lianad state + lianad.restart_fresh(bitcoind) + assert len(lianad.rpc.listcoins()["coins"]) == 0 + + # Start rescan + lianad.rpc.startrescan(initial_tip["time"]) + wait_for(lambda: lianad.rpc.getinfo()["rescan_progress"] is None) + + # Create a recovery tx that sweeps the first coin. + res = lianad.rpc.createrecovery(bitcoind.rpc.getnewaddress(), 2) + reco_psbt = PSBT.from_base64(res["psbt"]) + assert len(reco_psbt.tx.vin) == 1 + assert len(reco_psbt.tx.vout) == 1 + assert int(0.4999 * COIN) < int(reco_psbt.tx.vout[0].nValue) < int(0.5 * COIN) + sign_and_broadcast(lianad, bitcoind, reco_psbt, recovery=True)