#!/usr/bin/env python3 # # Taken then adapted from: # - Initially https://github.com/achow101/psbt-simple-signer/blob/5def3622a09f5bcb76ae79707f0790d050291474/serializations.py # - Then from the October 2022 Bitcoin Core functional test for the new PSBTMap class # # Copyright (c) 2010 ArtForz -- public domain half-a-node # Copyright (c) 2012 Jeff Garzik # Copyright (c) 2010-2016 The Bitcoin Core developers # Copyright (c) 2022 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Bitcoin Object Python Serializations Modified from the test/test_framework/mininode.py file from the Bitcoin repository CTransaction,CTxIn, CTxOut, etc....: data structures that should map to corresponding structures in bitcoin/primitives for transactions only ser_*, deser_*: functions that handle serialization/deserialization """ from io import BytesIO from codecs import encode import struct import binascii import hashlib import copy import base64 def sha256(s): return hashlib.new("sha256", s).digest() def ripemd160(s): return hashlib.new("ripemd160", s).digest() def hash256(s): return sha256(sha256(s)) def hash160(s): return ripemd160(sha256(s)) # Serialization/deserialization tools def ser_compact_size(l): r = b"" if l < 253: r = struct.pack("B", l) elif l < 0x10000: r = struct.pack(">= 32 return rs def uint256_from_str(s): r = 0 t = struct.unpack("> 24) & 0xFF v = (c & 0xFFFFFF) << (8 * (nbytes - 3)) return v def deser_vector(f, c): nit = deser_compact_size(f) r = [] for i in range(nit): t = c() t.deserialize(f) r.append(t) return r # ser_function_name: Allow for an alternate serialization function on the # entries in the vector (we use this for serializing the vector of transactions # for a witness block). def ser_vector(l, ser_function_name=None): r = ser_compact_size(len(l)) for i in l: if ser_function_name: r += getattr(i, ser_function_name)() else: r += i.serialize() return r def deser_uint256_vector(f): nit = deser_compact_size(f) r = [] for i in range(nit): t = deser_uint256(f) r.append(t) return r def ser_uint256_vector(l): r = ser_compact_size(len(l)) for i in l: r += ser_uint256(i) return r def deser_string_vector(f): nit = deser_compact_size(f) r = [] for i in range(nit): t = deser_string(f) r.append(t) return r def ser_string_vector(l): r = ser_compact_size(len(l)) for sv in l: r += ser_string(sv) return r def deser_int_vector(f): nit = deser_compact_size(f) r = [] for i in range(nit): t = struct.unpack(" 42: return (False, None, None) if script[0] != 0 and (script[0] < 81 or script[0] > 96): return (False, None, None) if script[1] + 2 == len(script): return (True, script[0] - 0x50 if script[0] else 0, script[2:]) return (False, None, None) # Objects that map to bitcoind objects, which can be serialized/deserialized MSG_WITNESS_FLAG = 1 << 30 class COutPoint(object): def __init__(self, hash=0, n=0xFFFFFFFF): self.hash = hash self.n = n def deserialize(self, f): self.hash = deser_uint256(f) self.n = struct.unpack(" 21000000 * COIN: return False return True def is_null(self): return len(self.vin) == 0 and len(self.vout) == 0 def __repr__(self): return "CTransaction(nVersion=%i vin=%s vout=%s wit=%s nLockTime=%i)" % ( self.nVersion, repr(self.vin), repr(self.vout), repr(self.wit), self.nLockTime, ) # global types PSBT_GLOBAL_UNSIGNED_TX = 0x00 PSBT_GLOBAL_XPUB = 0x01 PSBT_GLOBAL_TX_VERSION = 0x02 PSBT_GLOBAL_FALLBACK_LOCKTIME = 0x03 PSBT_GLOBAL_INPUT_COUNT = 0x04 PSBT_GLOBAL_OUTPUT_COUNT = 0x05 PSBT_GLOBAL_TX_MODIFIABLE = 0x06 PSBT_GLOBAL_VERSION = 0xFB PSBT_GLOBAL_PROPRIETARY = 0xFC # per-input types PSBT_IN_NON_WITNESS_UTXO = 0x00 PSBT_IN_WITNESS_UTXO = 0x01 PSBT_IN_PARTIAL_SIG = 0x02 PSBT_IN_SIGHASH_TYPE = 0x03 PSBT_IN_REDEEM_SCRIPT = 0x04 PSBT_IN_WITNESS_SCRIPT = 0x05 PSBT_IN_BIP32_DERIVATION = 0x06 PSBT_IN_FINAL_SCRIPTSIG = 0x07 PSBT_IN_FINAL_SCRIPTWITNESS = 0x08 PSBT_IN_POR_COMMITMENT = 0x09 PSBT_IN_RIPEMD160 = 0x0A PSBT_IN_SHA256 = 0x0B PSBT_IN_HASH160 = 0x0C PSBT_IN_HASH256 = 0x0D PSBT_IN_PREVIOUS_TXID = 0x0E PSBT_IN_OUTPUT_INDEX = 0x0F PSBT_IN_SEQUENCE = 0x10 PSBT_IN_REQUIRED_TIME_LOCKTIME = 0x11 PSBT_IN_REQUIRED_HEIGHT_LOCKTIME = 0x12 PSBT_IN_TAP_KEY_SIG = 0x13 PSBT_IN_TAP_SCRIPT_SIG = 0x14 PSBT_IN_TAP_LEAF_SCRIPT = 0x15 PSBT_IN_TAP_BIP32_DERIVATION = 0x16 PSBT_IN_TAP_INTERNAL_KEY = 0x17 PSBT_IN_TAP_MERKLE_ROOT = 0x18 PSBT_IN_PROPRIETARY = 0xFC # per-output types PSBT_OUT_REDEEM_SCRIPT = 0x00 PSBT_OUT_WITNESS_SCRIPT = 0x01 PSBT_OUT_BIP32_DERIVATION = 0x02 PSBT_OUT_AMOUNT = 0x03 PSBT_OUT_SCRIPT = 0x04 PSBT_OUT_TAP_INTERNAL_KEY = 0x05 PSBT_OUT_TAP_TREE = 0x06 PSBT_OUT_TAP_BIP32_DERIVATION = 0x07 PSBT_OUT_PROPRIETARY = 0xFC class PSBTMap: """Class for serializing and deserializing PSBT maps""" def __init__(self, map=None): self.map = map if map is not None else {} # NOTE: this implementation assumes that the keytype from bip174 is always 1 byte, # as it detects mappings (like bip32 derivations, partial sigs, ..) based on this. def deserialize(self, f): m = {} while True: k = deser_string(f) if len(k) == 0: break v = deser_string(f) if len(k) == 1: k = k[0] assert k not in m m[k] = v else: typ, k = k[0], k[1:] if typ not in m: m[typ] = {k: v} else: m[typ][k] = v self.map = m def serialize(self): m = b"" for key_type in sorted(self.map): psbt_val = self.map[key_type] if isinstance(key_type, int) and 0 <= key_type and key_type <= 255: key_type = bytes([key_type]) if isinstance(psbt_val, dict): for key_data, val_data in psbt_val.items(): k = key_type + key_data m += ser_compact_size(len(k)) + k m += ser_compact_size(len(val_data)) + val_data else: m += ser_compact_size(len(key_type)) + key_type m += ser_compact_size(len(psbt_val)) + psbt_val m += b"\x00" return m class PSBT: """Class for serializing and deserializing PSBTs""" def __init__(self, *, g=None, i=None, o=None): self.g = g if g is not None else PSBTMap() self.i = i if i is not None else [] self.o = o if o is not None else [] self.tx = None def deserialize(self, f): assert f.read(5) == b"psbt\xff" self.g = from_binary(PSBTMap, f) assert 0 in self.g.map self.tx = from_binary(CTransaction, self.g.map[0]) self.i = [from_binary(PSBTMap, f) for _ in self.tx.vin] self.o = [from_binary(PSBTMap, f) for _ in self.tx.vout] return self def serialize(self): assert isinstance(self.g, PSBTMap) assert isinstance(self.i, list) and all(isinstance(x, PSBTMap) for x in self.i) assert isinstance(self.o, list) and all(isinstance(x, PSBTMap) for x in self.o) assert 0 in self.g.map tx = from_binary(CTransaction, self.g.map[0]) assert len(tx.vin) == len(self.i) assert len(tx.vout) == len(self.o) psbt = [x.serialize() for x in [self.g] + self.i + self.o] return b"psbt\xff" + b"".join(psbt) def make_blank(self): """ Remove all fields except for PSBT_GLOBAL_UNSIGNED_TX """ for m in self.i + self.o: m.map.clear() self.g = PSBTMap(map={0: self.g.map[0]}) def to_base64(self): return base64.b64encode(self.serialize()).decode("utf8") @classmethod def from_base64(cls, b64psbt): return from_binary(cls, base64.b64decode(b64psbt)) # Sighash serializations def sighash_all_witness(script_code, psbt, i, acp=False): """ Compute the ALL signature hash of the {psbt} 's input {i}. :param acp: if True, use ALL | ANYONECANPAY behaviour. """ # Calculate hashPrevouts and hashSequence if not acp: prevouts_preimage = b"" sequence_preimage = b"" for inputs in psbt.tx.vin: prevouts_preimage += inputs.prevout.serialize() sequence_preimage += struct.pack("