#!/usr/bin/env python3
# Stolen from https://github.com/achow101/psbt-simple-signer/blob/5def3622a09f5bcb76ae79707f0790d050291474/serializations.py
# PSBT serialization was authored by Andrew Chow (achow101)
# Copyright (c) 2010 ArtForz -- public domain half-a-node
# Copyright (c) 2012 Jeff Garzik
# Copyright (c) 2010-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Bitcoin Object Python Serializations

Modified from the test/test_framework/mininode.py file from the
Bitcoin repository

CTransaction,CTxIn, CTxOut, etc....:
    data structures that should map to corresponding structures in
    bitcoin/primitives for transactions only
ser_*, deser_*: functions that handle serialization/deserialization
"""

from io import BytesIO, BufferedReader
from codecs import encode
import struct
import binascii
import hashlib
import copy
import base64


def sha256(s):
    """Return the SHA-256 digest of the bytes ``s``."""
    return hashlib.new("sha256", s).digest()


def ripemd160(s):
    """Return the RIPEMD-160 digest of the bytes ``s``.

    NOTE(review): ``hashlib.new("ripemd160")`` relies on the algorithm being
    provided by the interpreter's crypto backend -- confirm it is available
    on every target platform.
    """
    return hashlib.new("ripemd160", s).digest()


def hash256(s):
    """Return the double SHA-256 digest of ``s``: ``sha256(sha256(s))``."""
    return sha256(sha256(s))


def hash160(s):
    """Return ``ripemd160(sha256(s))``."""
    return ripemd160(sha256(s))


# Serialization/deserialization tools
def ser_compact_size(l):
    """Encode the non-negative integer ``l`` as a CompactSize byte string.

    Values below 253 take a single byte; larger values are a marker byte
    (253, 254 or 255) followed by a little-endian 16-, 32- or 64-bit integer.
    """
    r = b""
    if l < 253:
        r = struct.pack("B", l)
    elif l < 0x10000:
        r = struct.pack("<BH", 253, l)
    elif l < 0x100000000:
        r = struct.pack("<BI", 254, l)
    else:
        r = struct.pack("<BQ", 255, l)
    return r


def deser_compact_size(f):
    """Read one CompactSize integer from the binary stream ``f``.

    Raises ``struct.error`` when the stream ends before the value is
    complete; callers in this file use that to detect end of input.
    """
    nit = struct.unpack("<B", f.read(1))[0]
    if nit == 253:
        nit = struct.unpack("<H", f.read(2))[0]
    elif nit == 254:
        nit = struct.unpack("<I", f.read(4))[0]
    elif nit == 255:
        nit = struct.unpack("<Q", f.read(8))[0]
    return nit


def deser_string(f):
    """Read a CompactSize length from ``f`` and return that many bytes."""
    nit = deser_compact_size(f)
    return f.read(nit)


def ser_string(s):
    """Return the bytes ``s`` prefixed with their CompactSize length."""
    return ser_compact_size(len(s)) + s


def deser_uint256(f):
    """Read a 256-bit little-endian unsigned integer (32 bytes) from ``f``."""
    r = 0
    for i in range(8):
        # Eight 32-bit words, least significant word first.
        t = struct.unpack("<I", f.read(4))[0]
        r += t << (i * 32)
    return r


def ser_uint256(u):
    """Serialize the integer ``u`` as 32 little-endian bytes.

    Only the low 256 bits are written; each 32-bit word is masked before
    packing.
    """
    rs = b""
    for i in range(8):
        rs += struct.pack("<I", u & 0xFFFFFFFF)
        u >>= 32
    return rs


def uint256_from_str(s):
    """Interpret the first 32 bytes of ``s`` as a little-endian integer."""
    r = 0
    t = struct.unpack("<IIIIIIII", s[:32])
    for i in range(8):
        r += t[i] << (i * 32)
    return r


def uint256_from_compact(c):
    """Expand the compact value ``c`` into a full integer.

    The top byte of ``c`` is a byte count and the low three bytes are the
    mantissa, which is shifted left by ``8 * (count - 3)`` bits.
    """
    nbytes = (c >> 24) & 0xFF
    v = (c & 0xFFFFFF) << (8 * (nbytes - 3))
    return v


def deser_vector(f, c):
    """Read a CompactSize count from ``f``, then that many ``c`` objects.

    ``c`` is called with no arguments to create each element, and the
    element's ``deserialize(f)`` method consumes its bytes from the stream.
    """
    nit = deser_compact_size(f)
    r = []
    for i in range(nit):
        t = c()
        t.deserialize(f)
        r.append(t)
    return r


# ser_function_name: Allow for an alternate serialization function on the
# entries in the vector (we use this for serializing the vector of transactions
# for a witness block).
def ser_vector(l, ser_function_name=None): r = ser_compact_size(len(l)) for i in l: if ser_function_name: r += getattr(i, ser_function_name)() else: r += i.serialize() return r def deser_uint256_vector(f): nit = deser_compact_size(f) r = [] for i in range(nit): t = deser_uint256(f) r.append(t) return r def ser_uint256_vector(l): r = ser_compact_size(len(l)) for i in l: r += ser_uint256(i) return r def deser_string_vector(f): nit = deser_compact_size(f) r = [] for i in range(nit): t = deser_string(f) r.append(t) return r def ser_string_vector(l): r = ser_compact_size(len(l)) for sv in l: r += ser_string(sv) return r def deser_int_vector(f): nit = deser_compact_size(f) r = [] for i in range(nit): t = struct.unpack(" 42: return (False, None, None) if script[0] != 0 and (script[0] < 81 or script[0] > 96): return (False, None, None) if script[1] + 2 == len(script): return (True, script[0] - 0x50 if script[0] else 0, script[2:]) return (False, None, None) # Objects that map to bitcoind objects, which can be serialized/deserialized MSG_WITNESS_FLAG = 1 << 30 class COutPoint(object): def __init__(self, hash=0, n=0xFFFFFFFF): self.hash = hash self.n = n def deserialize(self, f): self.hash = deser_uint256(f) self.n = struct.unpack(" 21000000 * COIN: return False return True def is_null(self): return len(self.vin) == 0 and len(self.vout) == 0 def __repr__(self): return "CTransaction(nVersion=%i vin=%s vout=%s wit=%s nLockTime=%i)" % ( self.nVersion, repr(self.vin), repr(self.vout), repr(self.wit), self.nLockTime, ) def DeserializeHDKeypath(f, key, hd_keypaths): if len(key) != 34 and len(key) != 66: raise IOError( "Size of key was not the expected size for the type partial signature pubkey" ) pubkey = key[1:] if pubkey in hd_keypaths: raise IOError( "Duplicate key, input partial signature for pubkey already provided" ) value = deser_string(f) hd_keypaths[pubkey] = struct.unpack("<" + "I" * (len(value) // 4), value) def SerializeHDKeypath(hd_keypaths, type): r = b"" for pubkey, 
path in hd_keypaths.items(): r += ser_string(type + pubkey) packed = struct.pack("<" + "I" * len(path), *path) r += ser_string(packed) return r class PartiallySignedInput: def __init__(self): self.non_witness_utxo = None self.witness_utxo = None self.partial_sigs = {} self.sighash = 0 self.redeem_script = b"" self.witness_script = b"" self.hd_keypaths = {} self.final_script_sig = b"" self.final_script_witness = CTxInWitness() self.unknown = {} def set_null(self): self.non_witness_utxo = None self.witness_utxo = None self.partial_sigs.clear() self.sighash = 0 self.redeem_script = b"" self.witness_script = b"" self.hd_keypaths.clear() self.final_script_sig = b"" self.final_script_witness = CTxInWitness() self.unknown.clear() def deserialize(self, f): while True: # read the key try: key = deser_string(f) except Exception: break # Check for separator if len(key) == 0: break # First byte of key is the type key_type = struct.unpack("b", bytearray([key[0]]))[0] if key_type == 0: if self.non_witness_utxo: raise IOError( "Duplicate Key, input non witness utxo already provided" ) elif len(key) != 1: raise IOError("non witness utxo key is more than one byte type") self.non_witness_utxo = CTransaction() value = BufferedReader(BytesIO(deser_string(f))) self.non_witness_utxo.deserialize(value) self.non_witness_utxo.rehash() elif key_type == 1: if self.witness_utxo: raise IOError("Duplicate Key, input witness utxo already provided") elif len(key) != 1: raise IOError("witness utxo key is more than one byte type") self.witness_utxo = CTxOut() value = BufferedReader(BytesIO(deser_string(f))) self.witness_utxo.deserialize(value) elif key_type == 2: if len(key) != 34 and len(key) != 66: raise IOError( "Size of key was not the expected size for the type partial signature pubkey" ) pubkey = key[1:] if pubkey in self.partial_sigs: raise IOError( "Duplicate key, input partial signature for pubkey already provided" ) sig = deser_string(f) self.partial_sigs[pubkey] = sig elif key_type == 3: 
if self.sighash > 0: raise IOError("Duplicate key, input sighash type already provided") elif len(key) != 1: raise IOError("sighash key is more than one byte type") value = deser_string(f) self.sighash = struct.unpack(" 0: r += ser_string(b"\x03") r += ser_string(struct.pack(" 1: raise IOError("Global unsigned tx key is more than one byte type") # read in value value = BufferedReader(BytesIO(deser_string(f))) self.tx.deserialize(value) # Make sure that all scriptSigs and scriptWitnesses are empty for txin in self.tx.vin: if len(txin.scriptSig) != 0 or not self.tx.wit.is_null(): raise IOError( "Unsigned tx does not have empty scriptSigs and scriptWitnesses" ) else: if key in self.unknown: raise IOError( "Duplicate key, key for unknown value already provided" ) value = deser_string(f) self.unknown[key] = value # make sure that we got an unsigned tx if self.tx.is_null(): raise IOError("No unsigned trasaction was provided") # Read input data for txin in self.tx.vin: input = PartiallySignedInput() input.deserialize(f) self.inputs.append(input) if ( input.non_witness_utxo and input.non_witness_utxo.rehash() and input.non_witness_utxo.sha256 != txin.prevout.sha256 ): raise IOError("Non-witness UTXO does not match outpoint hash") if len(self.inputs) != len(self.tx.vin): raise IOError( "Inputs provided does not match the number of inputs in transaction" ) # Read output data for txout in self.tx.vout: output = PartiallySignedOutput() output.deserialize(f) self.outputs.append(output) if len(self.outputs) != len(self.tx.vout): raise IOError( "Outputs provided does not match the number of outputs in transaction" ) if not self.is_sane(): raise IOError("PSBT is not sane") def serialize(self): r = b"" # magic bytes r += b"psbt\xff" # unsigned tx flag r += b"\x01\x00" # write serialized tx tx = self.tx.serialize_with_witness() r += ser_compact_size(len(tx)) r += tx # separator r += b"\x00" # unknowns for key, value in self.unknown: r += ser_string(key) r += ser_string(value) # 
inputs for input in self.inputs: r += input.serialize() # outputs for output in self.outputs: r += output.serialize() # return hex string return HexToBase64(binascii.hexlify(r)).decode() def is_sane(self): for input in self.inputs: if not input.is_sane(): return False return True # Sighash serializations def sighash_all_witness(script_code, psbt, i, acp=False): """ Compute the ALL signature hash of the {psbt} 's input {i}. :param acp: if True, use ALL | ANYONECANPAY behaviour. """ # Calculate hashPrevouts and hashSequence if not acp: prevouts_preimage = b"" sequence_preimage = b"" for inputs in psbt.tx.vin: prevouts_preimage += inputs.prevout.serialize() sequence_preimage += struct.pack("