mirror of
https://github.com/bitcoin/bips.git
synced 2026-04-20 16:28:39 +00:00
BIP-375: Add bitcoin test framework as dependency - deps/bitcoin_test
This commit is contained in:
449
bip-0375/deps/bitcoin_test/messages.py
Normal file
449
bip-0375/deps/bitcoin_test/messages.py
Normal file
@@ -0,0 +1,449 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# Copyright (c) 2010 ArtForz -- public domain half-a-node
|
||||||
|
# Copyright (c) 2012 Jeff Garzik
|
||||||
|
# Copyright (c) 2010-present The Bitcoin Core developers
|
||||||
|
# Distributed under the MIT software license, see the accompanying
|
||||||
|
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||||
|
"""Bitcoin test framework primitive and message structures
|
||||||
|
|
||||||
|
CBlock, CTransaction, CBlockHeader, CTxIn, CTxOut, etc....:
|
||||||
|
data structures that should map to corresponding structures in
|
||||||
|
bitcoin/primitives
|
||||||
|
|
||||||
|
msg_block, msg_tx, msg_headers, etc.:
|
||||||
|
data structures that represent network messages
|
||||||
|
|
||||||
|
ser_*, deser_*: functions that handle serialization/deserialization.
|
||||||
|
|
||||||
|
Classes use __slots__ to ensure extraneous attributes aren't accidentally added
|
||||||
|
by tests, compromising their intended effect.
|
||||||
|
"""
|
||||||
|
|
||||||
|
########################################################################
|
||||||
|
# Adapted from Bitcoin Core test framework messages.py
|
||||||
|
# for BIP-375 PSBT validation tests.
|
||||||
|
########################################################################
|
||||||
|
|
||||||
|
import copy
|
||||||
|
import hashlib
|
||||||
|
import math
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
COIN = 100000000 # 1 btc in satoshis
|
||||||
|
WITNESS_SCALE_FACTOR = 4
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Serialization utilities
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
def hash160(s: bytes) -> bytes:
|
||||||
|
return hashlib.new("ripemd160", sha256(s)).digest()
|
||||||
|
|
||||||
|
|
||||||
|
def sha256(s: bytes) -> bytes:
|
||||||
|
return hashlib.sha256(s).digest()
|
||||||
|
|
||||||
|
|
||||||
|
def hash256(s: bytes) -> bytes:
|
||||||
|
return sha256(sha256(s))
|
||||||
|
|
||||||
|
|
||||||
|
def ser_compact_size(l):
|
||||||
|
r = b""
|
||||||
|
if l < 253:
|
||||||
|
r = l.to_bytes(1, "little")
|
||||||
|
elif l < 0x10000:
|
||||||
|
r = (253).to_bytes(1, "little") + l.to_bytes(2, "little")
|
||||||
|
elif l < 0x100000000:
|
||||||
|
r = (254).to_bytes(1, "little") + l.to_bytes(4, "little")
|
||||||
|
else:
|
||||||
|
r = (255).to_bytes(1, "little") + l.to_bytes(8, "little")
|
||||||
|
return r
|
||||||
|
|
||||||
|
|
||||||
|
def deser_compact_size(f):
|
||||||
|
nit = int.from_bytes(f.read(1), "little")
|
||||||
|
if nit == 253:
|
||||||
|
nit = int.from_bytes(f.read(2), "little")
|
||||||
|
elif nit == 254:
|
||||||
|
nit = int.from_bytes(f.read(4), "little")
|
||||||
|
elif nit == 255:
|
||||||
|
nit = int.from_bytes(f.read(8), "little")
|
||||||
|
return nit
|
||||||
|
|
||||||
|
|
||||||
|
def ser_varint(l):
|
||||||
|
r = b""
|
||||||
|
while True:
|
||||||
|
r = bytes([(l & 0x7f) | (0x80 if len(r) > 0 else 0x00)]) + r
|
||||||
|
if l <= 0x7f:
|
||||||
|
return r
|
||||||
|
l = (l >> 7) - 1
|
||||||
|
|
||||||
|
|
||||||
|
def deser_varint(f):
|
||||||
|
n = 0
|
||||||
|
while True:
|
||||||
|
dat = f.read(1)[0]
|
||||||
|
n = (n << 7) | (dat & 0x7f)
|
||||||
|
if (dat & 0x80) > 0:
|
||||||
|
n += 1
|
||||||
|
else:
|
||||||
|
return n
|
||||||
|
|
||||||
|
|
||||||
|
def deser_string(f):
|
||||||
|
nit = deser_compact_size(f)
|
||||||
|
return f.read(nit)
|
||||||
|
|
||||||
|
|
||||||
|
def ser_string(s):
|
||||||
|
return ser_compact_size(len(s)) + s
|
||||||
|
|
||||||
|
|
||||||
|
def deser_uint256(f):
|
||||||
|
return int.from_bytes(f.read(32), 'little')
|
||||||
|
|
||||||
|
|
||||||
|
def ser_uint256(u):
|
||||||
|
return u.to_bytes(32, 'little')
|
||||||
|
|
||||||
|
|
||||||
|
def uint256_from_str(s):
|
||||||
|
return int.from_bytes(s[:32], 'little')
|
||||||
|
|
||||||
|
|
||||||
|
def uint256_from_compact(c):
|
||||||
|
nbytes = (c >> 24) & 0xFF
|
||||||
|
v = (c & 0xFFFFFF) << (8 * (nbytes - 3))
|
||||||
|
return v
|
||||||
|
|
||||||
|
|
||||||
|
# deser_function_name: Allow for an alternate deserialization function on the
|
||||||
|
# entries in the vector.
|
||||||
|
def deser_vector(f, c, deser_function_name=None):
|
||||||
|
nit = deser_compact_size(f)
|
||||||
|
r = []
|
||||||
|
for _ in range(nit):
|
||||||
|
t = c()
|
||||||
|
if deser_function_name:
|
||||||
|
getattr(t, deser_function_name)(f)
|
||||||
|
else:
|
||||||
|
t.deserialize(f)
|
||||||
|
r.append(t)
|
||||||
|
return r
|
||||||
|
|
||||||
|
|
||||||
|
# ser_function_name: Allow for an alternate serialization function on the
|
||||||
|
# entries in the vector (we use this for serializing the vector of transactions
|
||||||
|
# for a witness block).
|
||||||
|
def ser_vector(l, ser_function_name=None):
|
||||||
|
r = ser_compact_size(len(l))
|
||||||
|
for i in l:
|
||||||
|
if ser_function_name:
|
||||||
|
r += getattr(i, ser_function_name)()
|
||||||
|
else:
|
||||||
|
r += i.serialize()
|
||||||
|
return r
|
||||||
|
|
||||||
|
|
||||||
|
def deser_uint256_vector(f):
|
||||||
|
nit = deser_compact_size(f)
|
||||||
|
r = []
|
||||||
|
for _ in range(nit):
|
||||||
|
t = deser_uint256(f)
|
||||||
|
r.append(t)
|
||||||
|
return r
|
||||||
|
|
||||||
|
|
||||||
|
def ser_uint256_vector(l):
|
||||||
|
r = ser_compact_size(len(l))
|
||||||
|
for i in l:
|
||||||
|
r += ser_uint256(i)
|
||||||
|
return r
|
||||||
|
|
||||||
|
|
||||||
|
def deser_string_vector(f):
|
||||||
|
nit = deser_compact_size(f)
|
||||||
|
r = []
|
||||||
|
for _ in range(nit):
|
||||||
|
t = deser_string(f)
|
||||||
|
r.append(t)
|
||||||
|
return r
|
||||||
|
|
||||||
|
|
||||||
|
def ser_string_vector(l):
|
||||||
|
r = ser_compact_size(len(l))
|
||||||
|
for sv in l:
|
||||||
|
r += ser_string(sv)
|
||||||
|
return r
|
||||||
|
|
||||||
|
# like from_hex, but without the hex part
|
||||||
|
def from_binary(cls, stream):
|
||||||
|
"""deserialize a binary stream (or bytes object) into an object"""
|
||||||
|
# handle bytes object by turning it into a stream
|
||||||
|
was_bytes = isinstance(stream, bytes)
|
||||||
|
if was_bytes:
|
||||||
|
stream = BytesIO(stream)
|
||||||
|
obj = cls()
|
||||||
|
obj.deserialize(stream)
|
||||||
|
if was_bytes:
|
||||||
|
assert len(stream.read()) == 0
|
||||||
|
return obj
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Transaction data structures
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
class COutPoint:
|
||||||
|
__slots__ = ("hash", "n")
|
||||||
|
|
||||||
|
def __init__(self, hash=0, n=0):
|
||||||
|
self.hash = hash
|
||||||
|
self.n = n
|
||||||
|
|
||||||
|
def deserialize(self, f):
|
||||||
|
self.hash = deser_uint256(f)
|
||||||
|
self.n = int.from_bytes(f.read(4), "little")
|
||||||
|
|
||||||
|
def serialize(self):
|
||||||
|
r = b""
|
||||||
|
r += ser_uint256(self.hash)
|
||||||
|
r += self.n.to_bytes(4, "little")
|
||||||
|
return r
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "COutPoint(hash=%064x n=%i)" % (self.hash, self.n)
|
||||||
|
|
||||||
|
class CTxIn:
|
||||||
|
__slots__ = ("nSequence", "prevout", "scriptSig")
|
||||||
|
|
||||||
|
def __init__(self, outpoint=None, scriptSig=b"", nSequence=0):
|
||||||
|
if outpoint is None:
|
||||||
|
self.prevout = COutPoint()
|
||||||
|
else:
|
||||||
|
self.prevout = outpoint
|
||||||
|
self.scriptSig = scriptSig
|
||||||
|
self.nSequence = nSequence
|
||||||
|
|
||||||
|
def deserialize(self, f):
|
||||||
|
self.prevout = COutPoint()
|
||||||
|
self.prevout.deserialize(f)
|
||||||
|
self.scriptSig = deser_string(f)
|
||||||
|
self.nSequence = int.from_bytes(f.read(4), "little")
|
||||||
|
|
||||||
|
def serialize(self):
|
||||||
|
r = b""
|
||||||
|
r += self.prevout.serialize()
|
||||||
|
r += ser_string(self.scriptSig)
|
||||||
|
r += self.nSequence.to_bytes(4, "little")
|
||||||
|
return r
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "CTxIn(prevout=%s scriptSig=%s nSequence=%i)" \
|
||||||
|
% (repr(self.prevout), self.scriptSig.hex(),
|
||||||
|
self.nSequence)
|
||||||
|
|
||||||
|
|
||||||
|
class CTxOut:
|
||||||
|
__slots__ = ("nValue", "scriptPubKey")
|
||||||
|
|
||||||
|
def __init__(self, nValue=0, scriptPubKey=b""):
|
||||||
|
self.nValue = nValue
|
||||||
|
self.scriptPubKey = scriptPubKey
|
||||||
|
|
||||||
|
def deserialize(self, f):
|
||||||
|
self.nValue = int.from_bytes(f.read(8), "little", signed=True)
|
||||||
|
self.scriptPubKey = deser_string(f)
|
||||||
|
|
||||||
|
def serialize(self):
|
||||||
|
r = b""
|
||||||
|
r += self.nValue.to_bytes(8, "little", signed=True)
|
||||||
|
r += ser_string(self.scriptPubKey)
|
||||||
|
return r
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "CTxOut(nValue=%i.%08i scriptPubKey=%s)" \
|
||||||
|
% (self.nValue // COIN, self.nValue % COIN,
|
||||||
|
self.scriptPubKey.hex())
|
||||||
|
|
||||||
|
|
||||||
|
class CScriptWitness:
|
||||||
|
__slots__ = ("stack",)
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
# stack is a vector of strings
|
||||||
|
self.stack = []
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "CScriptWitness(%s)" % \
|
||||||
|
(",".join([x.hex() for x in self.stack]))
|
||||||
|
|
||||||
|
def is_null(self):
|
||||||
|
if self.stack:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class CTxInWitness:
|
||||||
|
__slots__ = ("scriptWitness",)
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.scriptWitness = CScriptWitness()
|
||||||
|
|
||||||
|
def deserialize(self, f):
|
||||||
|
self.scriptWitness.stack = deser_string_vector(f)
|
||||||
|
|
||||||
|
def serialize(self):
|
||||||
|
return ser_string_vector(self.scriptWitness.stack)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return repr(self.scriptWitness)
|
||||||
|
|
||||||
|
def is_null(self):
|
||||||
|
return self.scriptWitness.is_null()
|
||||||
|
|
||||||
|
|
||||||
|
class CTxWitness:
|
||||||
|
__slots__ = ("vtxinwit",)
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.vtxinwit = []
|
||||||
|
|
||||||
|
def deserialize(self, f):
|
||||||
|
for i in range(len(self.vtxinwit)):
|
||||||
|
self.vtxinwit[i].deserialize(f)
|
||||||
|
|
||||||
|
def serialize(self):
|
||||||
|
r = b""
|
||||||
|
# This is different than the usual vector serialization --
|
||||||
|
# we omit the length of the vector, which is required to be
|
||||||
|
# the same length as the transaction's vin vector.
|
||||||
|
for x in self.vtxinwit:
|
||||||
|
r += x.serialize()
|
||||||
|
return r
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "CTxWitness(%s)" % \
|
||||||
|
(';'.join([repr(x) for x in self.vtxinwit]))
|
||||||
|
|
||||||
|
def is_null(self):
|
||||||
|
for x in self.vtxinwit:
|
||||||
|
if not x.is_null():
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class CTransaction:
|
||||||
|
__slots__ = ("nLockTime", "version", "vin", "vout", "wit")
|
||||||
|
|
||||||
|
def __init__(self, tx=None):
|
||||||
|
if tx is None:
|
||||||
|
self.version = 2
|
||||||
|
self.vin = []
|
||||||
|
self.vout = []
|
||||||
|
self.wit = CTxWitness()
|
||||||
|
self.nLockTime = 0
|
||||||
|
else:
|
||||||
|
self.version = tx.version
|
||||||
|
self.vin = copy.deepcopy(tx.vin)
|
||||||
|
self.vout = copy.deepcopy(tx.vout)
|
||||||
|
self.nLockTime = tx.nLockTime
|
||||||
|
self.wit = copy.deepcopy(tx.wit)
|
||||||
|
|
||||||
|
def deserialize(self, f):
|
||||||
|
self.version = int.from_bytes(f.read(4), "little")
|
||||||
|
self.vin = deser_vector(f, CTxIn)
|
||||||
|
flags = 0
|
||||||
|
if len(self.vin) == 0:
|
||||||
|
flags = int.from_bytes(f.read(1), "little")
|
||||||
|
# Not sure why flags can't be zero, but this
|
||||||
|
# matches the implementation in bitcoind
|
||||||
|
if (flags != 0):
|
||||||
|
self.vin = deser_vector(f, CTxIn)
|
||||||
|
self.vout = deser_vector(f, CTxOut)
|
||||||
|
else:
|
||||||
|
self.vout = deser_vector(f, CTxOut)
|
||||||
|
if flags != 0:
|
||||||
|
self.wit.vtxinwit = [CTxInWitness() for _ in range(len(self.vin))]
|
||||||
|
self.wit.deserialize(f)
|
||||||
|
else:
|
||||||
|
self.wit = CTxWitness()
|
||||||
|
self.nLockTime = int.from_bytes(f.read(4), "little")
|
||||||
|
|
||||||
|
def serialize_without_witness(self):
|
||||||
|
r = b""
|
||||||
|
r += self.version.to_bytes(4, "little")
|
||||||
|
r += ser_vector(self.vin)
|
||||||
|
r += ser_vector(self.vout)
|
||||||
|
r += self.nLockTime.to_bytes(4, "little")
|
||||||
|
return r
|
||||||
|
|
||||||
|
# Only serialize with witness when explicitly called for
|
||||||
|
def serialize_with_witness(self):
|
||||||
|
flags = 0
|
||||||
|
if not self.wit.is_null():
|
||||||
|
flags |= 1
|
||||||
|
r = b""
|
||||||
|
r += self.version.to_bytes(4, "little")
|
||||||
|
if flags:
|
||||||
|
dummy = []
|
||||||
|
r += ser_vector(dummy)
|
||||||
|
r += flags.to_bytes(1, "little")
|
||||||
|
r += ser_vector(self.vin)
|
||||||
|
r += ser_vector(self.vout)
|
||||||
|
if flags & 1:
|
||||||
|
if (len(self.wit.vtxinwit) != len(self.vin)):
|
||||||
|
# vtxinwit must have the same length as vin
|
||||||
|
self.wit.vtxinwit = self.wit.vtxinwit[:len(self.vin)]
|
||||||
|
for _ in range(len(self.wit.vtxinwit), len(self.vin)):
|
||||||
|
self.wit.vtxinwit.append(CTxInWitness())
|
||||||
|
r += self.wit.serialize()
|
||||||
|
r += self.nLockTime.to_bytes(4, "little")
|
||||||
|
return r
|
||||||
|
|
||||||
|
# Regular serialization is with witness -- must explicitly
|
||||||
|
# call serialize_without_witness to exclude witness data.
|
||||||
|
def serialize(self):
|
||||||
|
return self.serialize_with_witness()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def wtxid_hex(self):
|
||||||
|
"""Return wtxid (transaction hash with witness) as hex string."""
|
||||||
|
return hash256(self.serialize())[::-1].hex()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def wtxid_int(self):
|
||||||
|
"""Return wtxid (transaction hash with witness) as integer."""
|
||||||
|
return uint256_from_str(hash256(self.serialize_with_witness()))
|
||||||
|
|
||||||
|
@property
|
||||||
|
def txid_hex(self):
|
||||||
|
"""Return txid (transaction hash without witness) as hex string."""
|
||||||
|
return hash256(self.serialize_without_witness())[::-1].hex()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def txid_int(self):
|
||||||
|
"""Return txid (transaction hash without witness) as integer."""
|
||||||
|
return uint256_from_str(hash256(self.serialize_without_witness()))
|
||||||
|
|
||||||
|
def is_valid(self):
|
||||||
|
for tout in self.vout:
|
||||||
|
if tout.nValue < 0 or tout.nValue > 21000000 * COIN:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Calculate the transaction weight using witness and non-witness
|
||||||
|
# serialization size (does NOT use sigops).
|
||||||
|
def get_weight(self):
|
||||||
|
with_witness_size = len(self.serialize_with_witness())
|
||||||
|
without_witness_size = len(self.serialize_without_witness())
|
||||||
|
return (WITNESS_SCALE_FACTOR - 1) * without_witness_size + with_witness_size
|
||||||
|
|
||||||
|
def get_vsize(self):
|
||||||
|
return math.ceil(self.get_weight() / WITNESS_SCALE_FACTOR)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "CTransaction(version=%i vin=%s vout=%s wit=%s nLockTime=%i)" \
|
||||||
|
% (self.version, repr(self.vin), repr(self.vout), repr(self.wit), self.nLockTime)
|
||||||
197
bip-0375/deps/bitcoin_test/psbt.py
Normal file
197
bip-0375/deps/bitcoin_test/psbt.py
Normal file
@@ -0,0 +1,197 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# Copyright (c) 2022-present The Bitcoin Core developers
|
||||||
|
# Distributed under the MIT software license, see the accompanying
|
||||||
|
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||||
|
|
||||||
|
########################################################################
|
||||||
|
# Adapted from Bitcoin Core test framework psbt.py
|
||||||
|
# for BIP-375 PSBT validation tests.
|
||||||
|
########################################################################
|
||||||
|
|
||||||
|
import base64
|
||||||
|
import struct
|
||||||
|
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
from .messages import (
|
||||||
|
CTransaction,
|
||||||
|
deser_string,
|
||||||
|
deser_compact_size,
|
||||||
|
from_binary,
|
||||||
|
ser_compact_size,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# global types
|
||||||
|
PSBT_GLOBAL_UNSIGNED_TX = 0x00
|
||||||
|
PSBT_GLOBAL_XPUB = 0x01
|
||||||
|
PSBT_GLOBAL_TX_VERSION = 0x02
|
||||||
|
PSBT_GLOBAL_FALLBACK_LOCKTIME = 0x03
|
||||||
|
PSBT_GLOBAL_INPUT_COUNT = 0x04
|
||||||
|
PSBT_GLOBAL_OUTPUT_COUNT = 0x05
|
||||||
|
PSBT_GLOBAL_TX_MODIFIABLE = 0x06
|
||||||
|
PSBT_GLOBAL_VERSION = 0xfb
|
||||||
|
PSBT_GLOBAL_PROPRIETARY = 0xfc
|
||||||
|
|
||||||
|
# per-input types
|
||||||
|
PSBT_IN_NON_WITNESS_UTXO = 0x00
|
||||||
|
PSBT_IN_WITNESS_UTXO = 0x01
|
||||||
|
PSBT_IN_PARTIAL_SIG = 0x02
|
||||||
|
PSBT_IN_SIGHASH_TYPE = 0x03
|
||||||
|
PSBT_IN_REDEEM_SCRIPT = 0x04
|
||||||
|
PSBT_IN_WITNESS_SCRIPT = 0x05
|
||||||
|
PSBT_IN_BIP32_DERIVATION = 0x06
|
||||||
|
PSBT_IN_FINAL_SCRIPTSIG = 0x07
|
||||||
|
PSBT_IN_FINAL_SCRIPTWITNESS = 0x08
|
||||||
|
PSBT_IN_POR_COMMITMENT = 0x09
|
||||||
|
PSBT_IN_RIPEMD160 = 0x0a
|
||||||
|
PSBT_IN_SHA256 = 0x0b
|
||||||
|
PSBT_IN_HASH160 = 0x0c
|
||||||
|
PSBT_IN_HASH256 = 0x0d
|
||||||
|
PSBT_IN_PREVIOUS_TXID = 0x0e
|
||||||
|
PSBT_IN_OUTPUT_INDEX = 0x0f
|
||||||
|
PSBT_IN_SEQUENCE = 0x10
|
||||||
|
PSBT_IN_REQUIRED_TIME_LOCKTIME = 0x11
|
||||||
|
PSBT_IN_REQUIRED_HEIGHT_LOCKTIME = 0x12
|
||||||
|
PSBT_IN_TAP_KEY_SIG = 0x13
|
||||||
|
PSBT_IN_TAP_SCRIPT_SIG = 0x14
|
||||||
|
PSBT_IN_TAP_LEAF_SCRIPT = 0x15
|
||||||
|
PSBT_IN_TAP_BIP32_DERIVATION = 0x16
|
||||||
|
PSBT_IN_TAP_INTERNAL_KEY = 0x17
|
||||||
|
PSBT_IN_TAP_MERKLE_ROOT = 0x18
|
||||||
|
PSBT_IN_MUSIG2_PARTICIPANT_PUBKEYS = 0x1a
|
||||||
|
PSBT_IN_MUSIG2_PUB_NONCE = 0x1b
|
||||||
|
PSBT_IN_MUSIG2_PARTIAL_SIG = 0x1c
|
||||||
|
PSBT_IN_PROPRIETARY = 0xfc
|
||||||
|
|
||||||
|
# per-output types
|
||||||
|
PSBT_OUT_REDEEM_SCRIPT = 0x00
|
||||||
|
PSBT_OUT_WITNESS_SCRIPT = 0x01
|
||||||
|
PSBT_OUT_BIP32_DERIVATION = 0x02
|
||||||
|
PSBT_OUT_AMOUNT = 0x03
|
||||||
|
PSBT_OUT_SCRIPT = 0x04
|
||||||
|
PSBT_OUT_TAP_INTERNAL_KEY = 0x05
|
||||||
|
PSBT_OUT_TAP_TREE = 0x06
|
||||||
|
PSBT_OUT_TAP_BIP32_DERIVATION = 0x07
|
||||||
|
PSBT_OUT_MUSIG2_PARTICIPANT_PUBKEYS = 0x08
|
||||||
|
PSBT_OUT_PROPRIETARY = 0xfc
|
||||||
|
|
||||||
|
|
||||||
|
class PSBTMap:
|
||||||
|
"""Class for serializing and deserializing PSBT maps"""
|
||||||
|
|
||||||
|
def __init__(self, map=None):
|
||||||
|
self.map = map if map is not None else {}
|
||||||
|
|
||||||
|
def deserialize(self, f):
|
||||||
|
m = {}
|
||||||
|
while True:
|
||||||
|
k = deser_string(f)
|
||||||
|
if len(k) == 0:
|
||||||
|
break
|
||||||
|
v = deser_string(f)
|
||||||
|
if len(k) == 1:
|
||||||
|
k = k[0]
|
||||||
|
assert k not in m
|
||||||
|
m[k] = v
|
||||||
|
self.map = m
|
||||||
|
|
||||||
|
def serialize(self):
|
||||||
|
m = b""
|
||||||
|
for k,v in self.map.items():
|
||||||
|
if isinstance(k, int) and 0 <= k and k <= 255:
|
||||||
|
k = bytes([k])
|
||||||
|
if isinstance(v, list):
|
||||||
|
assert all(type(elem) is bytes for elem in v)
|
||||||
|
v = b"".join(v) # simply concatenate the byte-strings w/o size prefixes
|
||||||
|
m += ser_compact_size(len(k)) + k
|
||||||
|
m += ser_compact_size(len(v)) + v
|
||||||
|
m += b"\x00"
|
||||||
|
return m
|
||||||
|
|
||||||
|
class PSBT:
|
||||||
|
"""Class for serializing and deserializing PSBTs"""
|
||||||
|
|
||||||
|
def __init__(self, *, g=None, i=None, o=None):
|
||||||
|
self.g = g if g is not None else PSBTMap()
|
||||||
|
self.i = i if i is not None else []
|
||||||
|
self.o = o if o is not None else []
|
||||||
|
self.in_count = len(i) if i is not None else None
|
||||||
|
self.out_count = len(o) if o is not None else None
|
||||||
|
self.version = None
|
||||||
|
|
||||||
|
def deserialize(self, f):
|
||||||
|
assert f.read(5) == b"psbt\xff"
|
||||||
|
self.g = from_binary(PSBTMap, f)
|
||||||
|
|
||||||
|
self.version = 0
|
||||||
|
if PSBT_GLOBAL_VERSION in self.g.map:
|
||||||
|
assert PSBT_GLOBAL_INPUT_COUNT in self.g.map
|
||||||
|
assert PSBT_GLOBAL_OUTPUT_COUNT in self.g.map
|
||||||
|
self.version = struct.unpack("<I", self.g.map[PSBT_GLOBAL_VERSION])[0]
|
||||||
|
assert self.version in [0, 2]
|
||||||
|
if self.version == 2:
|
||||||
|
self.in_count = deser_compact_size(BytesIO(self.g.map[PSBT_GLOBAL_INPUT_COUNT]))
|
||||||
|
self.out_count = deser_compact_size(BytesIO(self.g.map[PSBT_GLOBAL_OUTPUT_COUNT]))
|
||||||
|
else:
|
||||||
|
assert PSBT_GLOBAL_UNSIGNED_TX in self.g.map
|
||||||
|
tx = from_binary(CTransaction, self.g.map[PSBT_GLOBAL_UNSIGNED_TX])
|
||||||
|
self.in_count = len(tx.vin)
|
||||||
|
self.out_count = len(tx.vout)
|
||||||
|
|
||||||
|
self.i = [from_binary(PSBTMap, f) for _ in range(self.in_count)]
|
||||||
|
self.o = [from_binary(PSBTMap, f) for _ in range(self.out_count)]
|
||||||
|
return self
|
||||||
|
|
||||||
|
def serialize(self):
|
||||||
|
assert isinstance(self.g, PSBTMap)
|
||||||
|
assert isinstance(self.i, list) and all(isinstance(x, PSBTMap) for x in self.i)
|
||||||
|
assert isinstance(self.o, list) and all(isinstance(x, PSBTMap) for x in self.o)
|
||||||
|
if self.version is not None and self.version == 2:
|
||||||
|
self.g.map[PSBT_GLOBAL_INPUT_COUNT] = ser_compact_size(len(self.i))
|
||||||
|
self.g.map[PSBT_GLOBAL_OUTPUT_COUNT] = ser_compact_size(len(self.o))
|
||||||
|
|
||||||
|
psbt = [x.serialize() for x in [self.g] + self.i + self.o]
|
||||||
|
return b"psbt\xff" + b"".join(psbt)
|
||||||
|
|
||||||
|
def make_blank(self):
|
||||||
|
"""
|
||||||
|
Remove all fields except for required fields depending on version
|
||||||
|
"""
|
||||||
|
if self.version == 0:
|
||||||
|
for m in self.i + self.o:
|
||||||
|
m.map.clear()
|
||||||
|
|
||||||
|
self.g = PSBTMap(map={PSBT_GLOBAL_UNSIGNED_TX: self.g.map[PSBT_GLOBAL_UNSIGNED_TX]})
|
||||||
|
elif self.version == 2:
|
||||||
|
self.g = PSBTMap(map={
|
||||||
|
PSBT_GLOBAL_TX_VERSION: self.g.map[PSBT_GLOBAL_TX_VERSION],
|
||||||
|
PSBT_GLOBAL_INPUT_COUNT: self.g.map[PSBT_GLOBAL_INPUT_COUNT],
|
||||||
|
PSBT_GLOBAL_OUTPUT_COUNT: self.g.map[PSBT_GLOBAL_OUTPUT_COUNT],
|
||||||
|
PSBT_GLOBAL_VERSION: self.g.map[PSBT_GLOBAL_VERSION],
|
||||||
|
})
|
||||||
|
|
||||||
|
new_i = []
|
||||||
|
for m in self.i:
|
||||||
|
new_i.append(PSBTMap(map={
|
||||||
|
PSBT_IN_PREVIOUS_TXID: m.map[PSBT_IN_PREVIOUS_TXID],
|
||||||
|
PSBT_IN_OUTPUT_INDEX: m.map[PSBT_IN_OUTPUT_INDEX],
|
||||||
|
}))
|
||||||
|
self.i = new_i
|
||||||
|
|
||||||
|
new_o = []
|
||||||
|
for m in self.o:
|
||||||
|
new_o.append(PSBTMap(map={
|
||||||
|
PSBT_OUT_SCRIPT: m.map[PSBT_OUT_SCRIPT],
|
||||||
|
PSBT_OUT_AMOUNT: m.map[PSBT_OUT_AMOUNT],
|
||||||
|
}))
|
||||||
|
self.o = new_o
|
||||||
|
else:
|
||||||
|
assert False
|
||||||
|
|
||||||
|
def to_base64(self):
|
||||||
|
return base64.b64encode(self.serialize()).decode("utf8")
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_base64(cls, b64psbt):
|
||||||
|
return from_binary(cls, base64.b64decode(b64psbt))
|
||||||
Reference in New Issue
Block a user