From 46a94d6c8eb144484a4b0df2ddbd48b32d4d096f Mon Sep 17 00:00:00 2001 From: Antoine Poinsot Date: Fri, 9 Dec 2022 12:17:38 +0100 Subject: [PATCH] qa: test recovery 'sweep' transaction creation --- tests/test_rpc.py | 50 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/tests/test_rpc.py b/tests/test_rpc.py index fb16defe..add3f1cd 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -130,7 +130,7 @@ def test_create_spend(lianad, bitcoind): ) # We can sign it and broadcast it. - sign_and_broadcast(PSBT.from_base64(res["psbt"])) + sign_and_broadcast(lianad, bitcoind, PSBT.from_base64(res["psbt"])) def test_list_spend(lianad, bitcoind): @@ -537,3 +537,51 @@ def test_listtransactions(lianad, bitcoind): assert len(txs) == 3 bit_txids = set(bitcoind.rpc.decoderawtransaction(tx["tx"])["txid"] for tx in txs) assert bit_txids == txids + + +def test_create_recovery(lianad, bitcoind): + """Test the sweep of coins that are available through the timelocked path.""" + # Start by getting a few coins + destinations = { + lianad.rpc.getnewaddress()["address"]: 0.1, + lianad.rpc.getnewaddress()["address"]: 0.2, + lianad.rpc.getnewaddress()["address"]: 0.3, + } + txid = bitcoind.rpc.sendmany("", destinations) + bitcoind.generate_block(1, wait_for_mempool=txid) + wait_for( + lambda: lianad.rpc.getinfo()["block_height"] == bitcoind.rpc.getblockcount() + ) + + # There's nothing to sweep + with pytest.raises( + RpcError, + match="No coin currently available through the timelocked recovery path", + ): + lianad.rpc.createrecovery(bitcoind.rpc.getnewaddress(), 2) + + # Receive another coin, it will be one block after the others + txid = bitcoind.rpc.sendtoaddress(lianad.rpc.getnewaddress()["address"], 0.4) + + # Make the timelock of the 3 first coins mature (we use a csv of 10 in the fixture) + bitcoind.generate_block(9, wait_for_mempool=txid) + + # Now we can create a recovery tx that sweeps the first 3 coins. + res = lianad.rpc.createrecovery(bitcoind.rpc.getnewaddress(), 18) + reco_psbt = PSBT.from_base64(res["psbt"]) + assert len(reco_psbt.tx.vin) == 3, "The last coin's timelock hasn't matured yet" + assert len(reco_psbt.tx.vout) == 1 + assert int(0.5999 * COIN) < int(reco_psbt.tx.vout[0].nValue) < int(0.6 * COIN) + txid = sign_and_broadcast(lianad, bitcoind, reco_psbt, recovery=True) + + # And by mining one more block we'll be able to sweep the last coin. + bitcoind.generate_block(1, wait_for_mempool=txid) + wait_for( + lambda: lianad.rpc.getinfo()["block_height"] == bitcoind.rpc.getblockcount() + ) + res = lianad.rpc.createrecovery(bitcoind.rpc.getnewaddress(), 1) + reco_psbt = PSBT.from_base64(res["psbt"]) + assert len(reco_psbt.tx.vin) == 1 + assert len(reco_psbt.tx.vout) == 1 + assert int(0.39999 * COIN) < int(reco_psbt.tx.vout[0].nValue) < int(0.4 * COIN) + sign_and_broadcast(lianad, bitcoind, reco_psbt, recovery=True)